123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
import asyncio
import sys
from concurrent.futures import ProcessPoolExecutor
from itertools import groupby

import bitstring
from Crypto.Hash import BLAKE2b
from linuxpy.video.device import Device
from loguru import logger
# ``logger.level("INFO")`` only looks up the level definition; it does not
# filter any records. Swap loguru's default handler for one whose minimum
# level is INFO so that DEBUG records are actually suppressed.
logger.remove()
logger.add(sys.stderr, level="INFO")
def chunks(lst, n):
    """Yield consecutive slices of ``lst``, each at most ``n`` items long.

    The last slice is shorter than ``n`` when ``len(lst)`` is not a multiple
    of ``n``. Works on anything that supports ``len()`` and slicing.
    """
    starts = range(0, len(lst), n)
    yield from (lst[start:start + n] for start in starts)
def extract_lsbs(data):
    """Pack the least-significant bits of ``data`` into bytes, eight per byte.

    Runs of consecutive equal values are first collapsed to a single value.
    The remaining values are consumed eight at a time; the low bit of the
    first value in each group becomes the most significant bit of the output
    byte. A trailing group with fewer than eight values is discarded.

    Args:
        data: Iterable of integers (for example a ``bytes`` object).

    Returns:
        The packed bits as ``bytes``.
    """
    logger.info("Extract LSBs.")
    # Keep one value per run of identical neighbours.
    deduplicated = [value for value, _ in groupby(data)]
    packed = bytearray()
    for group in chunks(deduplicated, 8):
        if len(group) != 8:
            # Only the final group can be short; drop it.
            break
        octet = 0
        for value in group:
            # Shifting left before OR-ing leaves the first value's bit in
            # the MSB position, matching an MSB-first 8-bit field.
            octet = (octet << 1) | (value & 1)
        packed.append(octet)
    return bytes(packed)
def whiten(data: bytes) -> bytes:
    """Replace every full 32-byte block of ``data`` with its BLAKE2b-256 digest.

    Args:
        data: Raw bytes to whiten.

    Returns:
        The concatenated 32-byte digests, one per full input block. Trailing
        bytes that do not fill a 32-byte block are dropped, so the result is
        empty when ``data`` is shorter than 32 bytes.
    """
    logger.info("Whitening.")
    # Collect the digests and join once instead of growing a bytes object
    # with ``+=``, which copies the whole buffer on every iteration.
    digests = [
        BLAKE2b.new(data=block, digest_bits=256).digest()
        for block in chunks(data, 32)
        if len(block) == 32
    ]
    return b"".join(digests)
def video2_extractor(frame_bytes: bytes) -> bytes:
    """Turn the raw bytes of one frame into whitened output bytes.

    Each 8-byte group of the frame is folded to 4 bytes by XOR-ing its first
    four bytes with its last four. The folded bytes are then passed through
    ``extract_lsbs`` and ``whiten``.

    Args:
        frame_bytes: Raw frame content.

    Returns:
        The whitened bytes produced from this frame.
    """
    logger.debug(f"frames_in {len(frame_bytes)}")
    # A bytearray grows in place; repeated ``bytes +=`` would re-copy the
    # accumulated data for every 8-byte group of the frame.
    folded = bytearray()
    for group in chunks(frame_bytes, 8):
        # zip() stops at the shorter half, so a short trailing group only
        # contributes the byte pairs it actually has.
        folded.extend(a ^ b for a, b in zip(group[:4], group[4:]))
    newdata = bytes(folded)
    logger.debug(f"newdata {len(newdata)}")
    out_bytes = whiten(extract_lsbs(newdata))
    logger.info(f"Sample ready: {len(out_bytes)}b.")
    return out_bytes
class FrameTakeWorker:
    """Pull frames from a video device and process them in a process pool."""

    def __init__(self, device_id=0, processes=3):
        """Open the video device and create the worker pool.

        Args:
            device_id: Identifier passed to ``Device.from_id``.
            processes: Number of pool processes, and also the number of
                concurrent ``generator_worker`` coroutines started by
                ``run_workers``.
        """
        self.processes = processes
        # Size the pool from the argument instead of a hard-coded 3 so the
        # pool and the number of worker coroutines stay in step.
        self.executor_pool = ProcessPoolExecutor(max_workers=processes)
        logger.info(self.executor_pool)
        self.video_source = Device.from_id(device_id)
        self.video_source.open()
        # Use the first frame size the device reports.
        frame_size = self.video_source.info.frame_sizes[0]
        self.video_source.set_format(
            1,  # presumably the V4L2 video-capture buffer type -- TODO confirm
            frame_size.width,
            frame_size.height,
            pixel_format="YUYV",
        )
        self.frame_gen = iter(self.video_source)

    async def get_frame(self):
        """Return the next frame from the device iterator.

        ``next()`` is a synchronous call, so this coroutine does not yield to
        the event loop while it waits for the frame.
        """
        return next(self.frame_gen)

    async def frame_extractor(self, frame_bytes: bytes):
        """Run ``video2_extractor`` on ``frame_bytes`` in the process pool.

        Args:
            frame_bytes: Raw frame content.

        Returns:
            Whatever ``video2_extractor`` returns for this frame.
        """
        # Inside a coroutine the running loop is the one to schedule on.
        loop = asyncio.get_running_loop()
        logger.info(f"Input bytes: {len(frame_bytes)}")
        return await loop.run_in_executor(
            self.executor_pool, video2_extractor, frame_bytes
        )

    async def generator_worker(self):
        """Loop forever: fetch a frame, extract from it, log the output size."""
        while True:
            logger.info("get frame")
            frame = await self.get_frame()
            logger.info(f"{frame}")
            data = await self.frame_extractor(bytes(frame))
            logger.info(f"{len(data)}")

    async def run_workers(self):
        """Run ``self.processes`` copies of ``generator_worker`` concurrently."""
        workers = [self.generator_worker() for _ in range(self.processes)]
        await asyncio.gather(*workers)
if __name__ == "__main__":
    # Script entry point: build a worker for video device 1 and drive its
    # worker coroutines on a fresh event loop.
    ftw = FrameTakeWorker(device_id=1)
    asyncio.run(ftw.run_workers())
|