main.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
import os
import os.path
import sys
import time
import queue
import shlex
import argparse
import tempfile
import threading
import subprocess
from itertools import groupby

import cv2
import requests
import bitstring
from loguru import logger
from Crypto.Hash import BLAKE2b
  16. try:
  17. import rdrand
  18. except ImportError:
  19. logger.warning("RdRand is not available.")
  20. try:
  21. from linuxpy.video.device import Device, VideoCapture
  22. except ImportError:
  23. logger.warning("linuxpy is not available.")
# Directory handed as ``dir=`` to the temporary file/directory helpers below.
# None lets them fall back to the system default; the entry point replaces it
# with --tmp-dir when that option names an existing directory.
tmp_dir = None
# Timeout in seconds passed to requests.post() in push(), used for both
# elements of the timeout tuple; the entry point overrides it from
# --push-timeout.
push_timeout = 10
  26. class TemporaryFile:
  27. def __init__(self, name, io, delete):
  28. self.name = name
  29. self.__io = io
  30. self.__delete = delete
  31. def __getattr__(self, k):
  32. return getattr(self.__io, k)
  33. def __del__(self):
  34. if self.__delete:
  35. try:
  36. os.unlink(self.name)
  37. except FileNotFoundError:
  38. pass
  39. def NamedTemporaryFile(
  40. mode="w+b", bufsize=-1, suffix="", prefix="tmp", dir=None, delete=True
  41. ):
  42. if not dir:
  43. dir = tempfile.gettempdir()
  44. name = os.path.join(dir, prefix + os.urandom(32).hex() + suffix)
  45. if mode is None:
  46. return TemporaryFile(name, None, delete)
  47. fh = open(name, "w+b", bufsize)
  48. if mode != "w+b":
  49. fh.close()
  50. fh = open(name, mode)
  51. return TemporaryFile(name, fh, delete)
  52. def chunks(lst, n):
  53. for i in range(0, len(lst), n):
  54. yield lst[i : i + n]
  55. def run_check(cmd):
  56. logger.info(f"Executing '{cmd}'.")
  57. subprocess.check_output(
  58. cmd,
  59. stderr=subprocess.STDOUT,
  60. timeout=100,
  61. shell=True
  62. )
  63. def extract_image(path):
  64. logger.info(f"Extract image '{path}'.")
  65. im = cv2.imread(path)
  66. data = []
  67. rows, cols, _ = im.shape
  68. for i in range(rows):
  69. for j in range(cols):
  70. r, g, b = im[i, j]
  71. data.extend((r, g, b))
  72. return bytes(data)
  73. def extract_wav(path):
  74. logger.info(f"Extract audio: '{path}'.")
  75. data = []
  76. with open(path, "rb") as f:
  77. for sample in chunks(f.read()[44:], 2):
  78. data.append(sample[0])
  79. return bytes(data)
  80. def extract_video(path):
  81. logger.info(f"Extract video: '{path}'.")
  82. with tempfile.TemporaryDirectory(dir=tmp_dir) as tmpd:
  83. run_check(
  84. f"ffmpeg -hide_banner -loglevel error -y -i {path} -vf mpdecimate -r 1/1 {tmpd}/%d.bmp"
  85. )
  86. data = b""
  87. for filename in sorted(
  88. os.listdir(tmpd), key=lambda filename: int(filename.split(".")[0])
  89. ):
  90. data += extract_image(os.path.join(tmpd, filename))
  91. return data
  92. def extract_lsbs(data):
  93. logger.info("Extract LSBs.")
  94. buffer = []
  95. data = [k for k, _ in groupby(data)]
  96. data = chunks(data, 8)
  97. for chunk in data:
  98. if len(chunk) != 8:
  99. break
  100. ba = bitstring.BitArray(8)
  101. for i, byte in zip(range(len(chunk)), chunk):
  102. ba[i] = byte & 1
  103. buffer.append(ba.u)
  104. return bytes(buffer)
  105. def whiten(data):
  106. logger.info("Whitening.")
  107. buffer = b""
  108. for chunk in chunks(data, 32):
  109. if len(chunk) < 32:
  110. break
  111. buffer += BLAKE2b.new(data=chunk, digest_bits=256).digest()
  112. return buffer
  113. def read_video(source, duration=60):
  114. tmpf = NamedTemporaryFile(suffix=".mkv", mode=None, dir=tmp_dir)
  115. run_check(
  116. f"ffmpeg -hide_banner -loglevel error -y -i {source} -t {duration} -acodec copy -vcodec copy {tmpf.name}"
  117. )
  118. return extract_video(tmpf.name)
  119. def read_audio(source, duration=60):
  120. tmpf = NamedTemporaryFile(suffix=".wav", mode=None, dir=tmp_dir)
  121. run_check(
  122. f"ffmpeg -hide_banner -loglevel error -y -f alsa -i {source} -t {duration} -ar 44100 -f s16le -acodec pcm_s16le {tmpf.name}"
  123. )
  124. return extract_wav(tmpf.name)
  125. def read_audio_video(source, duration=60):
  126. tmpf = NamedTemporaryFile(suffix=".mkv", mode=None, dir=tmp_dir)
  127. run_check(
  128. f"ffmpeg -hide_banner -loglevel error -y -i {source} -t {duration} -acodec copy -vcodec copy {tmpf.name}"
  129. )
  130. data_a = extract_video(tmpf.name)
  131. tmpf2 = NamedTemporaryFile(suffix=".wav", mode=None, dir=tmp_dir)
  132. run_check(
  133. f"ffmpeg -hide_banner -loglevel error -y -i {tmpf.name} -vn -ar 44100 -f s16le -acodec pcm_s16le {tmpf2.name}"
  134. )
  135. data_b = extract_wav(tmpf2.name)
  136. return bytes(a ^ b for a, b in zip(data_a, data_b))
  137. def read_rdseed(_, amount=16):
  138. data = rdrand.rdseed_get_bytes(amount)
  139. if len(data) != amount or data.count(0) == amount:
  140. raise ValueError("bad data")
  141. return data
  142. def sample(source, source_type, multiplier=1):
  143. match source_type:
  144. case "video":
  145. sampler = read_video
  146. multiplier *= 60
  147. case "audio":
  148. sampler = read_audio
  149. multiplier *= 60
  150. case "video+audio":
  151. sampler = read_audio_video
  152. multiplier *= 60
  153. case "rdseed":
  154. sampler = read_rdseed
  155. case _:
  156. raise ValueError(source_type)
  157. multiplier = int(multiplier)
  158. if multiplier < 1:
  159. raise ValueError(multiplier)
  160. logger.info("Sampling...")
  161. data = sampler(source, multiplier)
  162. logger.info(f"Sample ready: {len(data)}b.")
  163. if source_type != "rdseed":
  164. data = extract_lsbs(data)
  165. data = whiten(data)
  166. return data
  167. def video2_extractor(q, q2):
  168. while True:
  169. data = q2.get()
  170. data = chunks(data, 8)
  171. newdata = b""
  172. for chunk in data:
  173. newdata += bytes(a ^ b for a, b in zip(chunk[:4], chunk[4:]))
  174. data = extract_lsbs(newdata)
  175. data = whiten(data)
  176. logger.info(f"Sample ready: {len(data)}b.")
  177. q.put(data)
  178. def video2_sampler(q, q2, source):
  179. with Device.from_id(abs(int(source))) as device:
  180. device.set_format(
  181. 1,
  182. device.info.frame_sizes[0].width,
  183. device.info.frame_sizes[0].height,
  184. pixel_format="YUYV",
  185. )
  186. last = 0
  187. for frame in device:
  188. new = time.monotonic()
  189. if new - last > 10:
  190. q2.put(bytes(frame))
  191. last = new
  192. def push(pool_url, data, secret):
  193. logger.info(f"Pushing {len(data)}b.")
  194. resp = requests.post(
  195. f"{pool_url}/api/pool",
  196. data=data,
  197. headers={"X-Secret": secret},
  198. timeout=(push_timeout, push_timeout),
  199. )
  200. (logger.success if resp.status_code == 200 else logger.error)(
  201. f"{resp.status_code}: {resp.text}"
  202. )
  203. def puller(queue, source, source_type, multiplier, sample_interval):
  204. while True:
  205. try:
  206. data = sample(source, source_type, multiplier)
  207. except KeyboardInterrupt:
  208. logger.info("Interrupted by user.")
  209. sys.exit(0)
  210. except Exception as e:
  211. logger.error(f"Pull exception: {e}")
  212. if sample_interval:
  213. time.sleep(sample_interval)
  214. continue
  215. for piece in chunks(data, 1024 * 500):
  216. queue.put(piece)
  217. if sample_interval:
  218. time.sleep(sample_interval)
  219. def pusher(queue, pool_url, secret, cooldown=0):
  220. while True:
  221. piece = queue.get()
  222. try:
  223. push(pool_url, piece, secret)
  224. except KeyboardInterrupt:
  225. logger.info("Interrupted by user.")
  226. sys.exit(0)
  227. except Exception as e:
  228. logger.error(f"Push exception: {e}")
  229. queue.put(piece)
  230. if cooldown:
  231. time.sleep(cooldown)
  232. if __name__ == "__main__":
  233. parser = argparse.ArgumentParser()
  234. parser.add_argument("--source", type=str, required=True)
  235. parser.add_argument("--source-type", type=str, default="video+audio")
  236. parser.add_argument("--multiplier", type=float, default=1)
  237. parser.add_argument("--secret-file", type=str, default="./.secret")
  238. parser.add_argument("--cooldown", type=int, default=0)
  239. parser.add_argument("--sample-interval", type=int, default=0)
  240. parser.add_argument("--pool-url", type=str, default="https://yebi.su")
  241. parser.add_argument("--push-timeout", type=int, default=10)
  242. parser.add_argument("--tmp-dir", type=str)
  243. args = parser.parse_args()
  244. if args.tmp_dir and os.path.isdir(args.tmp_dir):
  245. tmp_dir = args.tmp_dir
  246. logger.info(f"Changed temp-dir: '{tmp_dir}'")
  247. push_timeout = max(args.push_timeout, 1)
  248. with open(args.secret_file, "r") as f:
  249. lines = f.read().strip().split("\n")
  250. ident = lines[0].strip()
  251. secret = lines[1].strip()
  252. secret = f"{ident} {secret}"
  253. q = queue.Queue()
  254. pusher_th = threading.Thread(
  255. target=pusher, args=(q, args.pool_url, secret, args.cooldown)
  256. )
  257. if args.source_type == "linuxvideo":
  258. q2 = queue.Queue()
  259. threading.Thread(target=video2_sampler, args=(q, q2, args.source)).start()
  260. threading.Thread(target=video2_extractor, args=(q, q2)).start()
  261. else:
  262. threading.Thread(
  263. target=puller, args=(q, args.source, args.source_type, args.multiplier, args.sample_interval)
  264. ).start()
  265. pusher_th = threading.Thread(
  266. target=pusher, args=(q, args.pool_url, secret, args.cooldown)
  267. )
  268. pusher_th.start()
  269. pusher_th.join()