123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
import argparse
import asyncio
import hashlib
import io
import math
import random
import sys
import time
from concurrent.futures import ProcessPoolExecutor
from itertools import groupby

import aiohttp
import bitstring
import numpy as np
from Crypto.Hash import BLAKE2b
from linuxpy.video.device import Device
from loguru import logger
# Configure loguru to emit INFO and above. The original call
# `logger.level("INFO")` only *looks up* the level record -- it does not
# change any sink's threshold -- so DEBUG messages were still emitted.
logger.remove()
logger.add(sys.stderr, level="INFO")
def split_array_numpy(data):
    """Deinterleave *data* into two byte strings of alternating 32-bit words.

    Words at even positions form the first output, words at odd positions
    the second. *data*'s length must be a multiple of 4 bytes.
    """
    words = np.frombuffer(data, dtype=np.uint32)
    evens, odds = words[0::2], words[1::2]
    return evens.tobytes(), odds.tobytes()
def xor_arrays_numpy(arr1, arr2):
    """Return the bytewise XOR of two equal-length buffers.

    Both buffers are viewed as arrays of 32-bit words, so their lengths
    must match and be multiples of 4 bytes.
    """
    lhs = np.frombuffer(arr1, dtype=np.uint32)
    rhs = np.frombuffer(arr2, dtype=np.uint32)
    return (lhs ^ rhs).tobytes()
def chunks(in_bytes: bytes, take_n: int) -> bytes:
    """Yield successive ``take_n``-byte pieces of ``in_bytes``.

    A trailing piece shorter than ``take_n`` is silently discarded --
    only full-size chunks are ever yielded.
    """
    stream = io.BytesIO(in_bytes)
    while True:
        piece = stream.read(take_n)
        if len(piece) != take_n:
            return
        yield piece
def extract_lsbs(data):
    """Collapse runs of repeated bytes, then pack surviving LSBs into bytes.

    First, consecutive duplicate bytes are reduced to a single occurrence
    (a simple de-biasing step). Then the least-significant bit of each
    surviving byte is packed MSB-first, eight bits per output byte; any
    leftover bits that do not fill a whole byte are discarded.
    """
    deduped = bytearray()
    prev = None
    for value in data:
        if value != prev:
            deduped.append(value)
        prev = value

    packed = bytearray()
    usable = len(deduped) - len(deduped) % 8
    for start in range(0, usable, 8):
        acc = 0
        for value in deduped[start:start + 8]:
            acc = (acc << 1) | (value & 1)
        packed.append(acc)
    return bytes(packed)
def whiten(data: bytes) -> bytes:
    """Whiten extracted entropy by hashing each 32-byte chunk with BLAKE2b-256.

    Every full 32-byte input chunk maps to its 32-byte BLAKE2b digest; a
    trailing partial chunk is discarded (chunks() only yields full pieces,
    so the original ``len(chunk) < 32`` guard was dead code).

    Uses stdlib ``hashlib.blake2b(digest_size=32)`` instead of
    pycryptodome's BLAKE2b -- the digests are identical -- and joins the
    digests once instead of quadratic ``bytes +=`` accumulation.
    """
    logger.info("Whitening.")
    return b"".join(
        hashlib.blake2b(chunk, digest_size=32).digest()
        for chunk in chunks(data, 32)
    )
def video2_extractor(frame_bytes: bytes):
    """Derive whitened random bytes from one raw frame buffer.

    Pipeline: deinterleave the frame into two 32-bit-word streams, XOR
    them together, collapse runs and pack LSBs, then hash-whiten the
    result. Removed the commented-out pure-Python XOR loop that the
    numpy helpers replaced.
    """
    logger.debug(f"frames_in {len(frame_bytes)}")
    first_part, second_part = split_array_numpy(frame_bytes)
    mixed = xor_arrays_numpy(first_part, second_part)
    logger.debug(f"new data {len(mixed)}")
    out_bytes = whiten(extract_lsbs(mixed))
    logger.info(f"Sample ready: {len(out_bytes)}b.")
    return out_bytes
class GeneratorWorker:
    """Pulls frames from a V4L2 device, extracts entropy in a process pool,
    and periodically POSTs the collected bytes to a remote pool endpoint.

    Fixes vs. original:
      * ``max_workers`` was hard-coded to 3, ignoring the ``processes`` arg.
      * frame timing computed ``start - end`` (always negative), so the
        throttle branch was unconditionally taken with an inflated sleep.
      * ``response.text`` is a coroutine and was logged un-awaited
        (printing the coroutine repr, never the body).
      * the pusher now skips the POST when the queue yielded no data.
    """

    def __init__(self, pool_url: str, auth: str, device_id=0, processes=3, frame_timeout=1, push_timeout=10):
        self.processes = processes
        # Honour the `processes` argument (was hard-coded to 3).
        self.executor_pool = ProcessPoolExecutor(max_workers=processes)
        logger.info(self.executor_pool)
        self.video_source = Device.from_id(device_id)
        self.video_source.open()
        self.video_source.set_format(
            1,
            self.video_source.info.frame_sizes[0].width,
            self.video_source.info.frame_sizes[0].height,
            pixel_format="YUYV",
        )
        self.frame_gen = self.video_source.__iter__()
        self.frame_timeout = frame_timeout
        self.pool_url = f"{pool_url}/api/pool"
        self.auth = auth
        self.push_timeout = push_timeout
        self.bytes_queue = asyncio.Queue(maxsize=16)

    async def get_frame(self):
        # NOTE(review): next() on the device iterator blocks the event loop
        # until a frame arrives -- consider run_in_executor if frame latency
        # ever matters. Behaviour kept as-is.
        return next(self.frame_gen)

    def __del__(self):
        # Best-effort release of the capture iterator.
        if self.frame_gen:
            self.frame_gen.close()

    async def frame_extractor(self, frame_bytes: bytes):
        """Run the CPU-heavy extractor in the process pool."""
        loop = asyncio.get_event_loop()
        logger.info(f"Take frame bytes: {len(frame_bytes)}")
        return await loop.run_in_executor(self.executor_pool, video2_extractor, frame_bytes)

    async def generator_worker(self):
        """Continuously grab frames, extract entropy, and enqueue it."""
        await asyncio.sleep(random.uniform(1, 3))
        while True:
            started = time.monotonic()
            frame = await self.get_frame()
            data = await self.frame_extractor(bytes(frame))
            await self.bytes_queue.put(data)
            # Elapsed time is end - start (the original had it negated).
            elapsed = time.monotonic() - started
            if elapsed < self.frame_timeout:
                await asyncio.sleep(math.ceil(self.frame_timeout - elapsed))
            else:
                await asyncio.sleep(random.uniform(1, 3))

    async def push_worker(self):
        """Periodically drain the queue and POST the bytes to the pool."""
        await asyncio.sleep(self.push_timeout)
        while True:
            parts = [await self.bytes_queue.get() for _ in range(self.bytes_queue.qsize())]
            data_bytes = b"".join(parts)
            # Nothing collected this round -> don't POST an empty body.
            if data_bytes:
                async with aiohttp.ClientSession() as session:
                    async with session.post(self.pool_url, data=data_bytes, headers={"X-Secret": self.auth}) as response:
                        # response.text() must be awaited; the original
                        # logged the bare coroutine object.
                        body = await response.text()
                        if response.status == 200:
                            logger.success(f"{response.status}: {body}")
                        else:
                            logger.error(f"{response.status}: {body}")
            await asyncio.sleep(self.push_timeout)

    async def run_workers(self):
        """Run `processes` generator tasks plus one push task until cancelled."""
        tasks = [self.generator_worker() for _ in range(self.processes)]
        tasks.append(self.push_worker())
        await asyncio.gather(*tasks)
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    # Device ids are integers (GeneratorWorker's default is 0); parse as
    # int so Device.from_id receives the expected type instead of a str.
    parser.add_argument("--source", type=int, required=True)
    parser.add_argument("--secret-file", type=str, default="./.secret")
    parser.add_argument("--frame-timeout", type=int, default=1)
    parser.add_argument("--pool-url", type=str, default="https://yebi.su")
    parser.add_argument("--push-timeout", type=int, default=10)
    parser.add_argument("--tmp-dir", type=str)  # accepted but currently unused
    args = parser.parse_args()

    # Secret file layout: line 1 = identity, line 2 = secret.
    with open(args.secret_file, "r") as f:
        lines = f.read().strip().split("\n")
    if len(lines) < 2:
        raise SystemExit(f"{args.secret_file}: expected two lines (ident, secret)")
    secret = f"{lines[0].strip()} {lines[1].strip()}"

    ftw = GeneratorWorker(
        pool_url=args.pool_url,
        auth=secret,
        device_id=args.source,
        frame_timeout=args.frame_timeout,
        push_timeout=args.push_timeout,
    )
    asyncio.run(ftw.run_workers())
|