main.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
import argparse
import asyncio
import io
import math
import random
import time
from concurrent.futures import ProcessPoolExecutor
from itertools import groupby
from typing import Iterator

import aiohttp
import bitstring
import numpy as np
from Crypto.Hash import BLAKE2b
from linuxpy.video.device import Device
from loguru import logger
# NOTE(review): loguru's ``logger.level(name)`` appears to only look up (or
# register) a level and return it; it presumably does not change what the
# default sink emits, so DEBUG messages are likely still logged. Confirm, and if
# an INFO threshold is intended configure it with ``logger.remove()`` followed
# by ``logger.add(sink, level="INFO")``.
logger.level("INFO")
  16. def split_array_numpy(data):
  17. data_np = np.frombuffer(data, dtype=np.uint32)
  18. first = data_np[::2].tobytes()
  19. second = data_np[1::2].tobytes()
  20. return first, second
  21. def xor_arrays_numpy(arr1, arr2):
  22. a = np.frombuffer(arr1, dtype=np.uint32)
  23. b = np.frombuffer(arr2, dtype=np.uint32)
  24. return np.bitwise_xor(a, b).tobytes()
  25. def chunks(in_bytes: bytes, take_n: int) -> bytes:
  26. bytes_buffer = io.BytesIO(in_bytes)
  27. bytes_send = bytes_buffer.read(take_n)
  28. while len(bytes_send) == take_n:
  29. yield bytes_send
  30. bytes_send = bytes_buffer.read(take_n)
  31. def extract_lsbs(data):
  32. deduced_data = bytearray()
  33. previous_byte = None
  34. for byte in data:
  35. if byte != previous_byte:
  36. deduced_data.append(byte)
  37. previous_byte = byte
  38. buffer = bytearray()
  39. chunk_size = 8
  40. num_full_chunks = len(deduced_data) // chunk_size
  41. for i in range(num_full_chunks):
  42. result_byte = 0
  43. start_index = i * chunk_size
  44. for j in range(chunk_size):
  45. result_byte |= ((deduced_data[start_index + j] & 1) << (7 - j))
  46. buffer.append(result_byte)
  47. return bytes(buffer)
  48. def whiten(data: bytes) -> bytes:
  49. logger.info("Whitening.")
  50. buffer = b""
  51. for chunk in chunks(data, 32):
  52. if len(chunk) < 32:
  53. break
  54. buffer += BLAKE2b.new(data=chunk, digest_bits=256).digest()
  55. return buffer
  56. def video2_extractor(frame_bytes: bytes):
  57. logger.debug(f"frames_in {len(frame_bytes)}")
  58. # chunked_bytes = chunks(frame_bytes, 8)
  59. # newdata = b""
  60. first_part, second_part = split_array_numpy(frame_bytes)
  61. new_data = xor_arrays_numpy(first_part, second_part)
  62. # for chunk in chunked_bytes:
  63. # newdata += bytes(a ^ b for a, b in zip(chunk[:4], chunk[4:]))
  64. logger.debug(f"new data {len(new_data)}")
  65. out_bytes = extract_lsbs(new_data)
  66. out_bytes = whiten(out_bytes)
  67. logger.info(f"Sample ready: {len(out_bytes)}b.")
  68. return out_bytes
  69. class GeneratorWorker:
  70. def __init__(self, pool_url: str, auth: str, device_id=0, processes=3, frame_timeout=1, push_timeout=10):
  71. self.processes = processes
  72. self.executor_pool = ProcessPoolExecutor(max_workers=3)
  73. logger.info(self.executor_pool)
  74. self.video_source = Device.from_id(device_id)
  75. self.video_source.open()
  76. self.video_source.set_format(
  77. 1,
  78. self.video_source.info.frame_sizes[0].width,
  79. self.video_source.info.frame_sizes[0].height,
  80. pixel_format="YUYV",
  81. )
  82. self.frame_gen = self.video_source.__iter__()
  83. self.frame_timeout = frame_timeout
  84. self.pool_url = f"{pool_url}/api/pool"
  85. self.auth = auth
  86. self.push_timeout = push_timeout
  87. self.bytes_queue = asyncio.Queue(maxsize=16)
  88. async def get_frame(self):
  89. return next(self.frame_gen)
  90. def __del__(self):
  91. if self.frame_gen:
  92. self.frame_gen.close()
  93. async def frame_extractor(self, frame_bytes: bytes):
  94. loop = asyncio.get_event_loop()
  95. logger.info(f"Take frame bytes: {len(frame_bytes)}")
  96. output = await loop.run_in_executor(self.executor_pool, video2_extractor, frame_bytes)
  97. return output
  98. async def generator_worker(self):
  99. await asyncio.sleep(random.uniform(1, 3))
  100. while True:
  101. get_frame = time.monotonic()
  102. frame = await self.get_frame()
  103. data = await self.frame_extractor(bytes(frame))
  104. await self.bytes_queue.put(data)
  105. put_frame = time.monotonic()
  106. frame_time = get_frame - put_frame
  107. if frame_time < self.frame_timeout:
  108. await asyncio.sleep(math.ceil(self.frame_timeout - frame_time))
  109. else:
  110. await asyncio.sleep(random.uniform(1, 3))
  111. async def push_worker(self):
  112. await asyncio.sleep(self.push_timeout)
  113. while True:
  114. data_bytes = b""
  115. for _ in range(self.bytes_queue.qsize()):
  116. data_bytes += await self.bytes_queue.get()
  117. async with aiohttp.ClientSession() as session:
  118. async with session.post(self.pool_url, data=data_bytes, headers={"X-Secret": self.auth}) as response:
  119. if response.status == 200:
  120. logger.success(f"{response.status}: {response.text}")
  121. else:
  122. logger.error(f"{response.status}: {response.text}")
  123. await asyncio.sleep(self.push_timeout)
  124. async def run_workers(self):
  125. tasks = [self.generator_worker() for _ in range(self.processes)]
  126. tasks.append(self.push_worker())
  127. await asyncio.gather(*tasks)
  128. if __name__ == "__main__":
  129. parser = argparse.ArgumentParser()
  130. parser.add_argument("--source", type=str, required=True)
  131. # parser.add_argument("--source-type", type=str, default="video")
  132. parser.add_argument("--secret-file", type=str, default="./.secret")
  133. parser.add_argument("--frame-timeout", type=int, default=1)
  134. parser.add_argument("--pool-url", type=str, default="https://yebi.su")
  135. parser.add_argument("--push-timeout", type=int, default=10)
  136. parser.add_argument("--tmp-dir", type=str)
  137. args = parser.parse_args()
  138. with open(args.secret_file, "r") as f:
  139. lines = f.read().strip().split("\n")
  140. ident = lines[0].strip()
  141. secret = lines[1].strip()
  142. secret = f"{ident} {secret}"
  143. ftw = GeneratorWorker(pool_url=args.pool_url, auth=secret,
  144. device_id=args.source,
  145. frame_timeout=args.frame_timeout,
  146. push_timeout=args.push_timeout)
  147. asyncio.run(ftw.run_workers())