123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377 |
- import os
- import os.path
- import sys
- import time
- import queue
- import argparse
- import tempfile
- import threading
- from itertools import groupby
- import cv2
- import requests
- import bitstring
- from loguru import logger
- from Crypto.Hash import BLAKE2b
# Optional, platform-specific dependencies. A failed import is only logged
# as a warning; the imported names are then left unbound, so the code paths
# that use them (read_rdseed for rdrand, video2_sampler for Device) will
# fail when they are actually invoked.
try:
    import rdrand
except ImportError:
    logger.warning("RdRand is not available.")

try:
    from linuxpy.video.device import Device, VideoCapture
except ImportError:
    logger.warning("linuxpy is not available.")

# Directory handed as ``dir=`` to the temp-file / temp-directory helpers below.
# None is passed through unchanged; the __main__ block may override it
# from --tmp-dir.
tmp_dir = None

# Value used for both elements of the timeout tuple given to requests.post
# in push(); the __main__ block may override it from --push-timeout.
push_timeout = 10
class TemporaryFile:
    """Pair a file path with an optional open handle.

    Attributes that are not found on the wrapper itself are looked up on the
    wrapped handle, so the object can be used like the file object it holds.
    When ``delete`` is true, the path is unlinked when the wrapper is
    finalized.
    """

    def __init__(self, name, io, delete):
        """Store the path, the wrapped handle (may be ``None``) and the delete flag."""
        self.name = name
        self.__io = io
        self.__delete = delete

    def __getattr__(self, k):
        """Forward an attribute that is missing on the wrapper to the handle.

        Raises:
            AttributeError: if the handle lacks the attribute, or if the
                wrapper's own private state has not been initialised.
        """
        # __getattr__ only runs after normal lookup has failed. If the missing
        # name is one of our own name-mangled fields (the instance was built
        # without __init__), reading self.__io below would re-enter this
        # method forever, so fail with a plain AttributeError instead.
        if k.startswith("_TemporaryFile__"):
            raise AttributeError(k)
        return getattr(self.__io, k)

    def __del__(self):
        """Unlink the file on finalization when deletion was requested."""
        try:
            delete = self.__delete
        except AttributeError:
            # __init__ never completed; there is nothing to clean up.
            return
        if delete:
            try:
                os.unlink(self.name)
            except FileNotFoundError:
                # Already gone (never created, or removed by someone else).
                pass
def NamedTemporaryFile(
    mode="w+b", bufsize=-1, suffix="", prefix="tmp", dir=None, delete=True
):
    """Build a randomly named path and wrap it in a ``TemporaryFile``.

    The name is ``prefix`` + 64 hex characters from ``os.urandom`` +
    ``suffix`` inside ``dir`` (the system temp directory when ``dir`` is
    falsy).

    With ``mode=None`` nothing is created on disk and the wrapper holds no
    handle. Otherwise the file is first created with ``"w+b"`` (using
    ``bufsize``) and, if another mode was requested, closed and reopened in
    that mode.
    """
    base_dir = dir if dir else tempfile.gettempdir()
    random_part = os.urandom(32).hex()
    path = os.path.join(base_dir, prefix + random_part + suffix)

    handle = None
    if mode is not None:
        handle = open(path, "w+b", bufsize)
        if mode != "w+b":
            # The first open only creates the file; switch to the wanted mode.
            handle.close()
            handle = open(path, mode)
    return TemporaryFile(path, handle, delete)
def chunks(lst, n):
    """Lazily yield consecutive slices of *lst*, each at most *n* items long."""
    yield from (lst[start : start + n] for start in range(0, len(lst), n))
def run_check(cmd):
    """Run *cmd* via ``os.system`` and raise ``ValueError`` on a non-zero status."""
    logger.info(f"Executing '{cmd}'.")
    status = os.system(cmd)
    if status == 0:
        return
    raise ValueError("Exit code != 0.")
def extract_image(path):
    """Return the raw pixel samples of the image at *path* as bytes.

    The image is decoded with ``cv2.imread`` and its samples are emitted row
    by row, pixel by pixel, channel by channel, in the order OpenCV stores
    them.

    Raises:
        ValueError: if OpenCV cannot read or decode the file.
    """
    logger.info(f"Extract image '{path}'.")
    im = cv2.imread(path)
    if im is None:
        # imread reports failure by returning None instead of raising.
        raise ValueError(f"Cannot read image '{path}'.")
    # Assumes imread's default flags, i.e. an 8-bit, 3-channel array.
    # Serialising it in C order gives the same byte sequence as visiting every
    # pixel in a Python loop, without the per-pixel interpreter overhead.
    return im.tobytes()
def extract_wav(path):
    """Return the first byte of every 2-byte sample in the file at *path*.

    The first 44 bytes of the file are skipped; from the remainder every
    second byte is kept, starting with the first. A trailing odd byte is
    kept as well. Files of 44 bytes or fewer yield ``b""``.
    """
    logger.info(f"Extract audio: '{path}'.")
    with open(path, "rb") as f:
        payload = f.read()
    # One stride slice replaces a per-sample Python loop over the whole file.
    return payload[44::2]
def extract_video(path):
    """Dump frames of the video at *path* to BMP files and return their pixels.

    ffmpeg is run with the ``mpdecimate`` filter and ``-r 1/1`` into a
    temporary directory (created under the module-level ``tmp_dir``), writing
    numbered ``<n>.bmp`` files. The files are read back in numeric order and
    their pixel bytes concatenated.

    Raises:
        ValueError: if ffmpeg exits with a non-zero status (from run_check).
    """
    logger.info(f"Extract video: '{path}'.")
    with tempfile.TemporaryDirectory(dir=tmp_dir) as tmpd:
        run_check(
            f"ffmpeg -hide_banner -loglevel error -y -i {path} -vf mpdecimate -r 1/1 {tmpd}/%d.bmp"
        )
        # Sort numerically: a plain string sort would put "10.bmp" before "2.bmp".
        frame_names = sorted(
            os.listdir(tmpd), key=lambda filename: int(filename.split(".")[0])
        )
        # Join once instead of growing a bytes object frame by frame, which
        # re-copies everything accumulated so far on every iteration.
        return b"".join(
            extract_image(os.path.join(tmpd, filename)) for filename in frame_names
        )
def extract_lsbs(data):
    """Pack the least-significant bits of *data* into bytes.

    Runs of consecutive equal values are first collapsed to a single value.
    The remaining values are taken eight at a time; the lowest bit of each
    becomes one bit of an output byte, first value in the most significant
    position. A trailing group of fewer than eight values is discarded.

    Args:
        data: iterable of integers (e.g. a ``bytes`` object).

    Returns:
        The packed bits as ``bytes``.
    """
    logger.info("Extract LSBs.")
    deduped = [value for value, _ in groupby(data)]
    # Only whole groups of eight contribute to the output.
    usable = len(deduped) - len(deduped) % 8
    packed = bytearray()
    for start in range(0, usable, 8):
        byte = 0
        for value in deduped[start : start + 8]:
            # Shift-and-or builds the byte MSB-first without allocating a
            # bit container per output byte.
            byte = (byte << 1) | (value & 1)
        packed.append(byte)
    return bytes(packed)
def whiten(data):
    """Replace every full 32-byte block of *data* with its BLAKE2b-256 digest.

    Blocks are hashed independently and the digests concatenated in order.
    A trailing block shorter than 32 bytes is dropped.
    """
    logger.info("Whitening.")
    digests = []
    for chunk in chunks(data, 32):
        if len(chunk) < 32:
            break
        digests.append(BLAKE2b.new(data=chunk, digest_bits=256).digest())
    # Join once; appending to a bytes object re-copies the whole buffer for
    # every 32-byte digest.
    return b"".join(digests)
def read_video(source, duration=60):
    """Record *duration* seconds from *source* with ffmpeg and extract its frames.

    The recording goes to a randomly named ``.mkv`` path under ``tmp_dir``
    and is then handed to ``extract_video``.
    """
    capture = NamedTemporaryFile(suffix=".mkv", mode=None, dir=tmp_dir)
    command = (
        f"ffmpeg -hide_banner -loglevel error -y -i {source} -t {duration} "
        f"-acodec copy -vcodec copy {capture.name}"
    )
    run_check(command)
    return extract_video(capture.name)
def read_audio(source, duration=60):
    """Record *duration* seconds from the ALSA input *source* and extract samples.

    ffmpeg writes 44100 Hz ``pcm_s16le`` data to a randomly named ``.wav``
    path under ``tmp_dir``, which is then handed to ``extract_wav``.
    """
    capture = NamedTemporaryFile(suffix=".wav", mode=None, dir=tmp_dir)
    command = (
        f"ffmpeg -hide_banner -loglevel error -y -f alsa -i {source} -t {duration} "
        f"-ar 44100 -f s16le -acodec pcm_s16le {capture.name}"
    )
    run_check(command)
    return extract_wav(capture.name)
def read_audio_video(source, duration=60):
    """Record *source* once and XOR its video bytes with its audio bytes.

    The recording is stored in a temporary ``.mkv``; its frames are extracted
    with ``extract_video``. The audio track of that same recording is then
    converted to a temporary ``.wav`` and read with ``extract_wav``. The
    result is the byte-wise XOR of both streams, as long as the shorter one.
    """
    recording = NamedTemporaryFile(suffix=".mkv", mode=None, dir=tmp_dir)
    record_cmd = (
        f"ffmpeg -hide_banner -loglevel error -y -i {source} -t {duration} "
        f"-acodec copy -vcodec copy {recording.name}"
    )
    run_check(record_cmd)
    video_bytes = extract_video(recording.name)

    audio_file = NamedTemporaryFile(suffix=".wav", mode=None, dir=tmp_dir)
    audio_cmd = (
        f"ffmpeg -hide_banner -loglevel error -y -i {recording.name} "
        f"-vn -ar 44100 -f s16le -acodec pcm_s16le {audio_file.name}"
    )
    run_check(audio_cmd)
    audio_bytes = extract_wav(audio_file.name)

    # zip stops at the shorter stream, so the surplus of the longer is dropped.
    mixed = (v ^ s for v, s in zip(video_bytes, audio_bytes))
    return bytes(mixed)
def read_rdseed(_, amount=16):
    """Fetch *amount* bytes via ``rdrand.rdseed_get_bytes``.

    The first argument is ignored; it exists so the function has the same
    call shape as the other samplers.

    Raises:
        ValueError: if the result has the wrong length or is all zero bytes.
    """
    seed = rdrand.rdseed_get_bytes(amount)
    wrong_length = len(seed) != amount
    if wrong_length or seed.count(0) == amount:
        raise ValueError("bad data")
    return seed
def sample(source, source_type, multiplier=1):
    """Collect one whitened sample from *source*.

    *source_type* selects the sampler: ``"video"``, ``"audio"``,
    ``"video+audio"`` or ``"rdseed"``. For the three audio/video types
    *multiplier* is scaled by 60 before being truncated to an int and passed
    to the sampler as its second argument; for ``"rdseed"`` it is passed
    unscaled. Audio/video data goes through ``extract_lsbs`` before
    ``whiten``; rdseed data is whitened directly.

    Raises:
        ValueError: for an unknown *source_type*, or when the integer
            multiplier ends up below 1.
    """
    if source_type == "rdseed":
        sampler = read_rdseed
    elif source_type == "video":
        sampler = read_video
    elif source_type == "audio":
        sampler = read_audio
    elif source_type == "video+audio":
        sampler = read_audio_video
    else:
        raise ValueError(source_type)

    from_hardware_rng = source_type == "rdseed"
    if not from_hardware_rng:
        multiplier *= 60
    multiplier = int(multiplier)
    if multiplier < 1:
        raise ValueError(multiplier)

    logger.info("Sampling...")
    raw = sampler(source, multiplier)
    logger.info(f"Sample ready: {len(raw)}b.")

    if from_hardware_rng:
        return whiten(raw)
    return whiten(extract_lsbs(raw))
def video2_extractor(q, q2):
    """Turn raw frames from *q2* into whitened samples on *q*, forever.

    Each frame is cut into 8-byte groups; the first four bytes of a group are
    XORed with its last four. The folded bytes then go through
    ``extract_lsbs`` and ``whiten`` before being queued on *q*. This function
    never returns.
    """
    while True:
        frame = q2.get()
        # Accumulate in a bytearray: growing a bytes object four bytes at a
        # time re-copies the whole buffer on every step.
        folded = bytearray()
        for chunk in chunks(frame, 8):
            # zip truncates, so a short trailing group folds only the pairs
            # it actually has.
            folded.extend(a ^ b for a, b in zip(chunk[:4], chunk[4:]))
        data = extract_lsbs(bytes(folded))
        data = whiten(data)
        logger.info(f"Sample ready: {len(data)}b.")
        q.put(data)
def video2_sampler(q, q2, source):
    """Read frames from a linuxpy video device and queue one on *q2* periodically.

    *source* is converted with ``abs(int(source))`` and used as the device
    id. The device is set to its first reported frame size with pixel format
    ``"YUYV"``. A frame is queued only when more than 10 seconds (monotonic
    clock) have passed since the previously queued one. *q* is accepted but
    not used here.
    """
    device_id = abs(int(source))
    with Device.from_id(device_id) as device:
        frame_size = device.info.frame_sizes[0]
        # NOTE(review): the leading 1 is presumably the buffer type expected
        # by linuxpy's set_format - confirm against the linuxpy docs.
        device.set_format(
            1,
            frame_size.width,
            frame_size.height,
            pixel_format="YUYV",
        )
        last_pushed = 0
        for frame in device:
            now = time.monotonic()
            if now - last_pushed <= 10:
                continue
            q2.put(bytes(frame))
            last_pushed = now
def push(pool_url, data, secret):
    """POST *data* to ``{pool_url}/api/pool`` and log the response.

    *secret* is sent in the ``X-Secret`` header. The module-level
    ``push_timeout`` is used for both elements of the requests timeout tuple.
    A 200 response is logged at success level, anything else at error level;
    the response is not returned.
    """
    logger.info(f"Pushing {len(data)}b.")
    timeouts = (push_timeout, push_timeout)
    resp = requests.post(
        f"{pool_url}/api/pool",
        data=data,
        headers={"X-Secret": secret},
        timeout=timeouts,
    )
    report = f"{resp.status_code}: {resp.text}"
    if resp.status_code == 200:
        logger.success(report)
    else:
        logger.error(report)
def puller(queue, source, source_type, multiplier):
    """Sample forever and enqueue the results in pieces of at most 512000 bytes.

    A failed sampling attempt is logged and retried immediately. A
    ``KeyboardInterrupt`` is logged and followed by ``sys.exit(0)``.
    """
    piece_size = 1024 * 500
    while True:
        try:
            data = sample(source, source_type, multiplier)
        except KeyboardInterrupt:
            logger.info("Interrupted by user.")
            sys.exit(0)
        except Exception as e:
            logger.error(f"Pull exception: {e}")
        else:
            for offset in range(0, len(data), piece_size):
                queue.put(data[offset : offset + piece_size])
def pusher(queue, pool_url, secret, cooldown=0):
    """Take pieces from *queue* forever and push each one to the pool.

    If pushing raises, the error is logged and the piece is put back on the
    queue. After every attempt the thread sleeps for *cooldown* seconds when
    that value is truthy. A ``KeyboardInterrupt`` is logged and followed by
    ``sys.exit(0)``.
    """
    while True:
        payload = queue.get()
        try:
            push(pool_url, payload, secret)
        except KeyboardInterrupt:
            logger.info("Interrupted by user.")
            sys.exit(0)
        except Exception as exc:
            logger.error(f"Push exception: {exc}")
            # Re-queue so the piece is retried instead of being lost.
            queue.put(payload)
        if not cooldown:
            continue
        time.sleep(cooldown)
if __name__ == "__main__":
    # Command-line entry point: parse options, load the pool credentials,
    # start the producer thread(s) for the chosen source type and one pusher
    # thread, then block on the pusher.
    parser = argparse.ArgumentParser()
    parser.add_argument("--source", type=str, required=True)
    parser.add_argument(
        "--source-type",
        type=str,
        default="video+audio",
        # Reject unknown types up front; otherwise the puller thread would
        # loop forever logging the same ValueError from sample().
        choices=("video", "audio", "video+audio", "rdseed", "linuxvideo"),
    )
    parser.add_argument("--multiplier", type=float, default=1)
    parser.add_argument("--secret-file", type=str, default="./.secret")
    parser.add_argument("--cooldown", type=int, default=0)
    parser.add_argument("--pool-url", type=str, default="https://yebi.su")
    parser.add_argument("--push-timeout", type=int, default=10)
    parser.add_argument("--tmp-dir", type=str)
    args = parser.parse_args()

    if args.tmp_dir:
        if os.path.isdir(args.tmp_dir):
            tmp_dir = args.tmp_dir
            logger.info(f"Changed temp-dir: '{tmp_dir}'")
        else:
            # Keep the default, but say so instead of ignoring the option silently.
            logger.warning(f"Ignoring temp-dir '{args.tmp_dir}': not a directory.")

    # Never allow a timeout below one second.
    push_timeout = max(args.push_timeout, 1)

    # The secret file holds the identifier on its first line and the secret
    # on its second; both are sent together as "<ident> <secret>".
    with open(args.secret_file, "r") as f:
        lines = f.read().strip().split("\n")
    if len(lines) < 2:
        parser.error(
            f"secret file '{args.secret_file}' must contain two lines: "
            "identifier and secret"
        )
    ident = lines[0].strip()
    secret = lines[1].strip()
    secret = f"{ident} {secret}"

    q = queue.Queue()

    if args.source_type == "linuxvideo":
        # Frames flow: video2_sampler -> q2 -> video2_extractor -> q.
        q2 = queue.Queue()
        threading.Thread(target=video2_sampler, args=(q, q2, args.source)).start()
        threading.Thread(target=video2_extractor, args=(q, q2)).start()
    else:
        threading.Thread(
            target=puller, args=(q, args.source, args.source_type, args.multiplier)
        ).start()

    # Single pusher thread draining q towards the pool.
    pusher_th = threading.Thread(
        target=pusher, args=(q, args.pool_url, secret, args.cooldown)
    )
    pusher_th.start()
    pusher_th.join()
|