import asyncio from concurrent.futures import ProcessPoolExecutor from itertools import groupby import bitstring from linuxpy.video.device import Device from Crypto.Hash import BLAKE2b from loguru import logger logger.level("INFO") def chunks(lst, n): for i in range(0, len(lst), n): yield lst[i: i + n] def extract_lsbs(data): logger.info("Extract LSBs.") buffer = [] data = [k for k, _ in groupby(data)] data = chunks(data, 8) for chunk in data: if len(chunk) != 8: break ba = bitstring.BitArray(8) for i, byte in zip(range(len(chunk)), chunk): ba[i] = byte & 1 buffer.append(ba.u) return bytes(buffer) def whiten(data: bytes) -> bytes: logger.info("Whitening.") buffer = b"" for chunk in chunks(data, 32): if len(chunk) < 32: break buffer += BLAKE2b.new(data=chunk, digest_bits=256).digest() return buffer def video2_extractor(frame_bytes: bytes): logger.debug(f"frames_in {len(frame_bytes)}") chunked_bytes = chunks(frame_bytes, 8) newdata = b"" for chunk in chunked_bytes: newdata += bytes(a ^ b for a, b in zip(chunk[:4], chunk[4:])) logger.debug(f"newdata {len(newdata)}") out_bytes = extract_lsbs(newdata) out_bytes = whiten(out_bytes) logger.info(f"Sample ready: {len(out_bytes)}b.") return out_bytes class FrameTakeWorker: def __init__(self, device_id=0, processes=3): self.processes = processes self.executor_pool = ProcessPoolExecutor(max_workers=3) logger.info(self.executor_pool) self.video_source = Device.from_id(device_id) self.video_source.open() self.video_source.set_format( 1, self.video_source.info.frame_sizes[0].width, self.video_source.info.frame_sizes[0].height, pixel_format="YUYV", ) self.frame_gen = self.video_source.__iter__() async def get_frame(self): return next(self.frame_gen) def __del__(self): self.frame_gen.close() async def frame_extractor(self, frame_bytes: bytes): loop = asyncio.get_event_loop() logger.info(f"Input bytes: {len(frame_bytes)}") output = await loop.run_in_executor(self.executor_pool, video2_extractor, frame_bytes) return output async def generator_worker(self): while True: logger.info("get frame") frame = await self.get_frame() logger.info(f"{frame}") data = await self.frame_extractor(bytes(frame)) logger.info(f"{len(data)}") async def run_workers(self): tasks = [self.generator_worker() for _ in range(self.processes)] await asyncio.gather(*tasks) pass if __name__ == "__main__": ftw = FrameTakeWorker(1) asyncio.run(ftw.run_workers())