| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387 | 
							- import os
 
- import os.path
 
- import sys
 
- import time
 
- import queue
 
- import argparse
 
- import tempfile
 
- import threading
 
- import subprocess
 
- from itertools import groupby
 
- import cv2
 
- import requests
 
- import bitstring
 
- from loguru import logger
 
- from Crypto.Hash import BLAKE2b
 
- try:
 
-     import rdrand
 
- except ImportError:
 
-     logger.warning("RdRand is not available.")
 
- try:
 
-     from linuxpy.video.device import Device, VideoCapture
 
- except ImportError:
 
-     logger.warning("linuxpy is not available.")
 
- tmp_dir = None
 
- push_timeout = 10
 
- class TemporaryFile:
 
-     def __init__(self, name, io, delete):
 
-         self.name = name
 
-         self.__io = io
 
-         self.__delete = delete
 
-     def __getattr__(self, k):
 
-         return getattr(self.__io, k)
 
-     def __del__(self):
 
-         if self.__delete:
 
-             try:
 
-                 os.unlink(self.name)
 
-             except FileNotFoundError:
 
-                 pass
 
- def NamedTemporaryFile(
 
-     mode="w+b", bufsize=-1, suffix="", prefix="tmp", dir=None, delete=True
 
- ):
 
-     if not dir:
 
-         dir = tempfile.gettempdir()
 
-     name = os.path.join(dir, prefix + os.urandom(32).hex() + suffix)
 
-     if mode is None:
 
-         return TemporaryFile(name, None, delete)
 
-     fh = open(name, "w+b", bufsize)
 
-     if mode != "w+b":
 
-         fh.close()
 
-         fh = open(name, mode)
 
-     return TemporaryFile(name, fh, delete)
 
- def chunks(lst, n):
 
-     for i in range(0, len(lst), n):
 
-         yield lst[i : i + n]
 
- def run_check(cmd):
 
-     logger.info(f"Executing '{cmd}'.")
 
-     subprocess.check_output(
 
-         cmd,
 
-         stderr=subprocess.STDOUT,
 
-         timeout=100
 
-     )
 
- def extract_image(path):
 
-     logger.info(f"Extract image '{path}'.")
 
-     im = cv2.imread(path)
 
-     data = []
 
-     rows, cols, _ = im.shape
 
-     for i in range(rows):
 
-         for j in range(cols):
 
-             r, g, b = im[i, j]
 
-             data.extend((r, g, b))
 
-     return bytes(data)
 
- def extract_wav(path):
 
-     logger.info(f"Extract audio: '{path}'.")
 
-     data = []
 
-     with open(path, "rb") as f:
 
-         for sample in chunks(f.read()[44:], 2):
 
-             data.append(sample[0])
 
-     return bytes(data)
 
- def extract_video(path):
 
-     logger.info(f"Extract video: '{path}'.")
 
-     with tempfile.TemporaryDirectory(dir=tmp_dir) as tmpd:
 
-         run_check(
 
-             f"ffmpeg -hide_banner -loglevel error -y -i {path} -vf mpdecimate -r 1/1 {tmpd}/%d.bmp"
 
-         )
 
-         data = b""
 
-         for filename in sorted(
 
-             os.listdir(tmpd), key=lambda filename: int(filename.split(".")[0])
 
-         ):
 
-             data += extract_image(os.path.join(tmpd, filename))
 
-         return data
 
- def extract_lsbs(data):
 
-     logger.info("Extract LSBs.")
 
-     buffer = []
 
-     data = [k for k, _ in groupby(data)]
 
-     data = chunks(data, 8)
 
-     for chunk in data:
 
-         if len(chunk) != 8:
 
-             break
 
-         ba = bitstring.BitArray(8)
 
-         for i, byte in zip(range(len(chunk)), chunk):
 
-             ba[i] = byte & 1
 
-         buffer.append(ba.u)
 
-     return bytes(buffer)
 
- def whiten(data):
 
-     logger.info("Whitening.")
 
-     buffer = b""
 
-     for chunk in chunks(data, 32):
 
-         if len(chunk) < 32:
 
-             break
 
-         buffer += BLAKE2b.new(data=chunk, digest_bits=256).digest()
 
-     return buffer
 
- def read_video(source, duration=60):
 
-     tmpf = NamedTemporaryFile(suffix=".mkv", mode=None, dir=tmp_dir)
 
-     run_check(
 
-         f"ffmpeg -hide_banner -loglevel error -y -i {source} -t {duration} -acodec copy -vcodec copy {tmpf.name}"
 
-     )
 
-     return extract_video(tmpf.name)
 
- def read_audio(source, duration=60):
 
-     tmpf = NamedTemporaryFile(suffix=".wav", mode=None, dir=tmp_dir)
 
-     run_check(
 
-         f"ffmpeg -hide_banner -loglevel error -y -f alsa -i {source} -t {duration} -ar 44100 -f s16le -acodec pcm_s16le {tmpf.name}"
 
-     )
 
-     return extract_wav(tmpf.name)
 
- def read_audio_video(source, duration=60):
 
-     tmpf = NamedTemporaryFile(suffix=".mkv", mode=None, dir=tmp_dir)
 
-     run_check(
 
-         f"ffmpeg -hide_banner -loglevel error -y -i {source} -t {duration} -acodec copy -vcodec copy {tmpf.name}"
 
-     )
 
-     data_a = extract_video(tmpf.name)
 
-     tmpf2 = NamedTemporaryFile(suffix=".wav", mode=None, dir=tmp_dir)
 
-     run_check(
 
-         f"ffmpeg -hide_banner -loglevel error -y -i {tmpf.name} -vn -ar 44100 -f s16le -acodec pcm_s16le {tmpf2.name}"
 
-     )
 
-     data_b = extract_wav(tmpf2.name)
 
-     return bytes(a ^ b for a, b in zip(data_a, data_b))
 
- def read_rdseed(_, amount=16):
 
-     data = rdrand.rdseed_get_bytes(amount)
 
-     if len(data) != amount or data.count(0) == amount:
 
-         raise ValueError("bad data")
 
-     return data
 
- def sample(source, source_type, multiplier=1):
 
-     match source_type:
 
-         case "video":
 
-             sampler = read_video
 
-             multiplier *= 60
 
-         case "audio":
 
-             sampler = read_audio
 
-             multiplier *= 60
 
-         case "video+audio":
 
-             sampler = read_audio_video
 
-             multiplier *= 60
 
-         case "rdseed":
 
-             sampler = read_rdseed
 
-         case _:
 
-             raise ValueError(source_type)
 
-     multiplier = int(multiplier)
 
-     if multiplier < 1:
 
-         raise ValueError(multiplier)
 
-     logger.info("Sampling...")
 
-     data = sampler(source, multiplier)
 
-     logger.info(f"Sample ready: {len(data)}b.")
 
-     if source_type != "rdseed":
 
-         data = extract_lsbs(data)
 
-         data = whiten(data)
 
-     return data
 
- def video2_extractor(q, q2):
 
-     while True:
 
-         data = q2.get()
 
-         data = chunks(data, 8)
 
-         newdata = b""
 
-         for chunk in data:
 
-             newdata += bytes(a ^ b for a, b in zip(chunk[:4], chunk[4:]))
 
-         data = extract_lsbs(newdata)
 
-         data = whiten(data)
 
-         logger.info(f"Sample ready: {len(data)}b.")
 
-         q.put(data)
 
- def video2_sampler(q, q2, source):
 
-     with Device.from_id(abs(int(source))) as device:
 
-         device.set_format(
 
-             1,
 
-             device.info.frame_sizes[0].width,
 
-             device.info.frame_sizes[0].height,
 
-             pixel_format="YUYV",
 
-         )
 
-         last = 0
 
-         for frame in device:
 
-             new = time.monotonic()
 
-             if new - last > 10:
 
-                 q2.put(bytes(frame))
 
-                 last = new
 
- def push(pool_url, data, secret):
 
-     logger.info(f"Pushing {len(data)}b.")
 
-     resp = requests.post(
 
-         f"{pool_url}/api/pool",
 
-         data=data,
 
-         headers={"X-Secret": secret},
 
-         timeout=(push_timeout, push_timeout),
 
-     )
 
-     (logger.success if resp.status_code == 200 else logger.error)(
 
-         f"{resp.status_code}: {resp.text}"
 
-     )
 
- def puller(queue, source, source_type, multiplier, sample_interval):
 
-     while True:
 
-         try:
 
-             data = sample(source, source_type, multiplier)
 
-         except KeyboardInterrupt:
 
-             logger.info("Interrupted by user.")
 
-             sys.exit(0)
 
-         except Exception as e:
 
-             logger.error(f"Pull exception: {e}")
 
-             if sample_interval:
 
-                 time.sleep(sample_interval)
 
-             continue
 
-         for piece in chunks(data, 1024 * 500):
 
-             queue.put(piece)
 
-         if sample_interval:
 
-             time.sleep(sample_interval)
 
- def pusher(queue, pool_url, secret, cooldown=0):
 
-     while True:
 
-         piece = queue.get()
 
-         try:
 
-             push(pool_url, piece, secret)
 
-         except KeyboardInterrupt:
 
-             logger.info("Interrupted by user.")
 
-             sys.exit(0)
 
-         except Exception as e:
 
-             logger.error(f"Push exception: {e}")
 
-             queue.put(piece)
 
-         if cooldown:
 
-             time.sleep(cooldown)
 
- if __name__ == "__main__":
 
-     parser = argparse.ArgumentParser()
 
-     parser.add_argument("--source", type=str, required=True)
 
-     parser.add_argument("--source-type", type=str, default="video+audio")
 
-     parser.add_argument("--multiplier", type=float, default=1)
 
-     parser.add_argument("--secret-file", type=str, default="./.secret")
 
-     parser.add_argument("--cooldown", type=int, default=0)
 
-     parser.add_argument("--sample-interval", type=int, default=0)
 
-     parser.add_argument("--pool-url", type=str, default="https://yebi.su")
 
-     parser.add_argument("--push-timeout", type=int, default=10)
 
-     parser.add_argument("--tmp-dir", type=str)
 
-     args = parser.parse_args()
 
-     if args.tmp_dir and os.path.isdir(args.tmp_dir):
 
-         tmp_dir = args.tmp_dir
 
-         logger.info(f"Changed temp-dir: '{tmp_dir}'")
 
-     push_timeout = max(args.push_timeout, 1)
 
-     with open(args.secret_file, "r") as f:
 
-         lines = f.read().strip().split("\n")
 
-         ident = lines[0].strip()
 
-         secret = lines[1].strip()
 
-         secret = f"{ident} {secret}"
 
-     q = queue.Queue()
 
-     pusher_th = threading.Thread(
 
-         target=pusher, args=(q, args.pool_url, secret, args.cooldown)
 
-     )
 
-     if args.source_type == "linuxvideo":
 
-         q2 = queue.Queue()
 
-         threading.Thread(target=video2_sampler, args=(q, q2, args.source)).start()
 
-         threading.Thread(target=video2_extractor, args=(q, q2)).start()
 
-     else:
 
-         threading.Thread(
 
-             target=puller, args=(q, args.source, args.source_type, args.multiplier, args.sample_interval)
 
-         ).start()
 
-     pusher_th = threading.Thread(
 
-         target=pusher, args=(q, args.pool_url, secret, args.cooldown)
 
-     )
 
-     pusher_th.start()
 
-     pusher_th.join()
 
 
  |