import argparse import asyncio import io import random import time from concurrent.futures import ProcessPoolExecutor from itertools import groupby import numpy as np import bitstring from linuxpy.video.device import Device from Crypto.Hash import BLAKE2b import aiohttp from loguru import logger import math logger.level("INFO") def split_array_numpy(data): data_np = np.frombuffer(data, dtype=np.uint32) first = data_np[::2].tobytes() second = data_np[1::2].tobytes() return first, second def xor_arrays_numpy(arr1, arr2): a = np.frombuffer(arr1, dtype=np.uint32) b = np.frombuffer(arr2, dtype=np.uint32) return np.bitwise_xor(a, b).tobytes() def chunks(in_bytes: bytes, take_n: int) -> bytes: bytes_buffer = io.BytesIO(in_bytes) bytes_send = bytes_buffer.read(take_n) while len(bytes_send) == take_n: yield bytes_send bytes_send = bytes_buffer.read(take_n) def extract_lsbs(data): deduced_data = bytearray() previous_byte = None for byte in data: if byte != previous_byte: deduced_data.append(byte) previous_byte = byte buffer = bytearray() chunk_size = 8 num_full_chunks = len(deduced_data) // chunk_size for i in range(num_full_chunks): result_byte = 0 start_index = i * chunk_size for j in range(chunk_size): result_byte |= ((deduced_data[start_index + j] & 1) << (7 - j)) buffer.append(result_byte) return bytes(buffer) def whiten(data: bytes) -> bytes: logger.info("Whitening.") buffer = b"" for chunk in chunks(data, 32): if len(chunk) < 32: break buffer += BLAKE2b.new(data=chunk, digest_bits=256).digest() return buffer def video2_extractor(frame_bytes: bytes): logger.debug(f"frames_in {len(frame_bytes)}") # chunked_bytes = chunks(frame_bytes, 8) # newdata = b"" first_part, second_part = split_array_numpy(frame_bytes) new_data = xor_arrays_numpy(first_part, second_part) # for chunk in chunked_bytes: # newdata += bytes(a ^ b for a, b in zip(chunk[:4], chunk[4:])) logger.debug(f"new data {len(new_data)}") out_bytes = extract_lsbs(new_data) out_bytes = whiten(out_bytes) logger.info(f"Sample ready: {len(out_bytes)}b.") return out_bytes class GeneratorWorker: def __init__(self, pool_url: str, auth: str, device_id=0, processes=3, frame_timeout=1, push_timeout=10): self.processes = processes self.executor_pool = ProcessPoolExecutor(max_workers=3) logger.info(self.executor_pool) self.video_source = Device.from_id(device_id) self.video_source.open() self.video_source.set_format( 1, self.video_source.info.frame_sizes[0].width, self.video_source.info.frame_sizes[0].height, pixel_format="YUYV", ) self.frame_gen = self.video_source.__iter__() self.frame_timeout = frame_timeout self.pool_url = f"{pool_url}/api/pool" self.auth = auth self.push_timeout = push_timeout self.bytes_queue = asyncio.Queue(maxsize=16) async def get_frame(self): return next(self.frame_gen) def __del__(self): if self.frame_gen: self.frame_gen.close() async def frame_extractor(self, frame_bytes: bytes): loop = asyncio.get_event_loop() logger.info(f"Take frame bytes: {len(frame_bytes)}") output = await loop.run_in_executor(self.executor_pool, video2_extractor, frame_bytes) return output async def generator_worker(self): await asyncio.sleep(random.uniform(1, 3)) while True: get_frame = time.monotonic() frame = await self.get_frame() data = await self.frame_extractor(bytes(frame)) await self.bytes_queue.put(data) put_frame = time.monotonic() frame_time = get_frame - put_frame if frame_time < self.frame_timeout: await asyncio.sleep(math.ceil(self.frame_timeout - frame_time)) else: await asyncio.sleep(random.uniform(1, 3)) async def push_worker(self): await asyncio.sleep(self.push_timeout) while True: data_bytes = b"" for _ in range(self.bytes_queue.qsize()): data_bytes += await self.bytes_queue.get() async with aiohttp.ClientSession() as session: async with session.post(self.pool_url, data=data_bytes, headers={"X-Secret": self.auth}) as response: if response.status == 200: logger.success(f"{response.status}: {response.text}") else: logger.error(f"{response.status}: {response.text}") await asyncio.sleep(self.push_timeout) async def run_workers(self): tasks = [self.generator_worker() for _ in range(self.processes)] tasks.append(self.push_worker()) await asyncio.gather(*tasks) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--source", type=str, required=True) # parser.add_argument("--source-type", type=str, default="video") parser.add_argument("--secret-file", type=str, default="./.secret") parser.add_argument("--frame-timeout", type=int, default=1) parser.add_argument("--pool-url", type=str, default="https://yebi.su") parser.add_argument("--push-timeout", type=int, default=10) parser.add_argument("--tmp-dir", type=str) args = parser.parse_args() with open(args.secret_file, "r") as f: lines = f.read().strip().split("\n") ident = lines[0].strip() secret = lines[1].strip() secret = f"{ident} {secret}" ftw = GeneratorWorker(pool_url=args.pool_url, auth=secret, device_id=args.source, frame_timeout=args.frame_timeout, push_timeout=args.push_timeout) asyncio.run(ftw.run_workers())