main.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
import asyncio
import sys
from concurrent.futures import ProcessPoolExecutor
from itertools import groupby

import bitstring
from Crypto.Hash import BLAKE2b
from linuxpy.video.device import Device
from loguru import logger
  8. logger.level("INFO")
  9. def chunks(lst, n):
  10. for i in range(0, len(lst), n):
  11. yield lst[i: i + n]
  12. def extract_lsbs(data):
  13. logger.info("Extract LSBs.")
  14. buffer = []
  15. data = [k for k, _ in groupby(data)]
  16. data = chunks(data, 8)
  17. for chunk in data:
  18. if len(chunk) != 8:
  19. break
  20. ba = bitstring.BitArray(8)
  21. for i, byte in zip(range(len(chunk)), chunk):
  22. ba[i] = byte & 1
  23. buffer.append(ba.u)
  24. return bytes(buffer)
  25. def whiten(data: bytes) -> bytes:
  26. logger.info("Whitening.")
  27. buffer = b""
  28. for chunk in chunks(data, 32):
  29. if len(chunk) < 32:
  30. break
  31. buffer += BLAKE2b.new(data=chunk, digest_bits=256).digest()
  32. return buffer
  33. def video2_extractor(frame_bytes: bytes):
  34. logger.debug(f"frames_in {len(frame_bytes)}")
  35. chunked_bytes = chunks(frame_bytes, 8)
  36. newdata = b""
  37. for chunk in chunked_bytes:
  38. newdata += bytes(a ^ b for a, b in zip(chunk[:4], chunk[4:]))
  39. logger.debug(f"newdata {len(newdata)}")
  40. out_bytes = extract_lsbs(newdata)
  41. out_bytes = whiten(out_bytes)
  42. logger.info(f"Sample ready: {len(out_bytes)}b.")
  43. return out_bytes
  44. class FrameTakeWorker:
  45. def __init__(self, device_id=0, processes=3):
  46. self.processes = processes
  47. self.executor_pool = ProcessPoolExecutor(max_workers=3)
  48. logger.info(self.executor_pool)
  49. self.video_source = Device.from_id(device_id)
  50. self.video_source.open()
  51. self.video_source.set_format(
  52. 1,
  53. self.video_source.info.frame_sizes[0].width,
  54. self.video_source.info.frame_sizes[0].height,
  55. pixel_format="YUYV",
  56. )
  57. self.frame_gen = self.video_source.__iter__()
  58. async def get_frame(self):
  59. return next(self.frame_gen)
  60. def __del__(self):
  61. self.frame_gen.close()
  62. async def frame_extractor(self, frame_bytes: bytes):
  63. loop = asyncio.get_event_loop()
  64. logger.info(f"Input bytes: {len(frame_bytes)}")
  65. output = await loop.run_in_executor(self.executor_pool, video2_extractor, frame_bytes)
  66. return output
  67. async def generator_worker(self):
  68. while True:
  69. logger.info("get frame")
  70. frame = await self.get_frame()
  71. logger.info(f"{frame}")
  72. data = await self.frame_extractor(bytes(frame))
  73. logger.info(f"{len(data)}")
  74. async def run_workers(self):
  75. tasks = [self.generator_worker() for _ in range(self.processes)]
  76. await asyncio.gather(*tasks)
  77. pass
  78. if __name__ == "__main__":
  79. ftw = FrameTakeWorker(1)
  80. asyncio.run(ftw.run_workers())