| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370 | import osimport os.pathimport sysimport timeimport queueimport argparseimport tempfileimport threadingfrom itertools import groupbyimport cv2import requestsimport bitstringfrom loguru import loggerfrom Crypto.Hash import BLAKE2btry:    import rdrandexcept ImportError:    logger.warning("RdRand is not available.")try:    from linuxpy.video.device import Device, VideoCaptureexcept ImportError:    logger.warning("linuxpy is not available.")tmp_dir = Nonepush_timeout = 10class TemporaryFile:    def __init__(self, name, io, delete):        self.name = name        self.__io = io        self.__delete = delete    def __getattr__(self, k):        return getattr(self.__io, k)    def __del__(self):        if self.__delete:            try:                os.unlink(self.name)            except FileNotFoundError:                passdef NamedTemporaryFile(    mode="w+b", bufsize=-1, suffix="", prefix="tmp", dir=None, delete=True):    if not dir:        dir = tempfile.gettempdir()    name = os.path.join(dir, prefix + os.urandom(32).hex() + suffix)    if mode is None:        return TemporaryFile(name, None, delete)    fh = open(name, "w+b", bufsize)    if mode != "w+b":        fh.close()        fh = open(name, mode)    return TemporaryFile(name, fh, delete)def chunks(lst, n):    for i in range(0, len(lst), n):        yield lst[i : i + n]def run_check(cmd):    logger.info(f"Executing '{cmd}'.")    if os.system(cmd) != 0:        raise ValueError("Exit code != 0.")def extract_image(path):    logger.info(f"Extract image '{path}'.")    im = cv2.imread(path)    data = []    rows, cols, _ = im.shape    for i in range(rows):        for j in range(cols):            r, g, b = im[i, j]            data.extend((r, g, b))    return bytes(data)def extract_wav(path):    logger.info(f"Extract audio: '{path}'.")    data = []    with open(path, "rb") as f:        for sample in chunks(f.read()[44:], 2):            data.append(sample[0])    return bytes(data)def extract_video(path):    logger.info(f"Extract video: '{path}'.")    with tempfile.TemporaryDirectory(dir=tmp_dir) as tmpd:        run_check(            f"ffmpeg -hide_banner -loglevel error -y -i {path} -vf mpdecimate -r 1/1 {tmpd}/%d.bmp"        )        data = b""        for filename in sorted(            os.listdir(tmpd), key=lambda filename: int(filename.split(".")[0])        ):            data += extract_image(os.path.join(tmpd, filename))        return datadef extract_lsbs(data):    logger.info("Extract LSBs.")    buffer = []    data = [k for k, _ in groupby(data)]    data = chunks(data, 8)    for chunk in data:        if len(chunk) != 8:            break        ba = bitstring.BitArray(8)        for i, byte in zip(range(len(chunk)), chunk):            ba[i] = byte & 1        buffer.append(ba.u)    return bytes(buffer)def whiten(data):    logger.info("Whitening.")    buffer = b""    for chunk in chunks(data, 32):        if len(chunk) < 32:            break        buffer += BLAKE2b.new(data=chunk, digest_bits=256).digest()    return bufferdef read_video(source, duration=60):    tmpf = NamedTemporaryFile(suffix=".mkv", mode=None, dir=tmp_dir)    run_check(        f"ffmpeg -hide_banner -loglevel error -y -i {source} -t {duration} -acodec copy -vcodec copy {tmpf.name}"    )    return extract_video(tmpf.name)def read_audio(source, duration=60):    tmpf = NamedTemporaryFile(suffix=".wav", mode=None, dir=tmp_dir)    run_check(        f"ffmpeg -hide_banner -loglevel error -y -f alsa -i {source} -t {duration} -ar 44100 -f s16le -acodec pcm_s16le {tmpf.name}"    )    return extract_wav(tmpf.name)def read_audio_video(source, duration=60):    tmpf = NamedTemporaryFile(suffix=".mkv", mode=None, dir=tmp_dir)    run_check(        f"ffmpeg -hide_banner -loglevel error -y -i {source} -t {duration} -acodec copy -vcodec copy {tmpf.name}"    )    data_a = extract_video(tmpf.name)    tmpf2 = NamedTemporaryFile(suffix=".wav", mode=None, dir=tmp_dir)    run_check(        f"ffmpeg -hide_banner -loglevel error -y -i {tmpf.name} -vn -ar 44100 -f s16le -acodec pcm_s16le {tmpf2.name}"    )    data_b = extract_wav(tmpf2.name)    return bytes(a ^ b for a, b in zip(data_a, data_b))def read_rdseed(_, amount=16):    data = rdrand.rdseed_get_bytes(amount)    if len(data) != amount or data.count(0) == amount:        raise ValueError("bad data")    return datadef sample(source, source_type, multiplier=1):    match source_type:        case "video":            sampler = read_video            multiplier *= 60        case "audio":            sampler = read_audio            multiplier *= 60        case "video+audio":            sampler = read_audio_video            multiplier *= 60        case "rdseed":            sampler = read_rdseed        case _:            raise ValueError(source_type)    multiplier = int(multiplier)    if multiplier < 1:        raise ValueError(multiplier)    logger.info("Sampling...")    data = sampler(source, multiplier)    logger.info(f"Sample ready: {len(data)}b.")    if source_type != "rdseed":        data = extract_lsbs(data)        data = whiten(data)    return datadef video2_extractor(q, q2):    while True:        data = extract_lsbs(q2.get())        data = whiten(data)        logger.info(f"Sample ready: {len(data)}b.")        q.put(data)def video2_sampler(q, q2, source):    with Device.from_id(abs(int(source))) as device:        device.set_format(            1,            device.info.frame_sizes[0].width,            device.info.frame_sizes[0].height,            pixel_format="YUYV",        )        last = 0        for frame in device:            new = time.monotonic()            if new - last > 10:                q2.put(bytes(frame))                last = newdef push(pool_url, data, secret):    logger.info(f"Pushing {len(data)}b.")    resp = requests.post(        f"{pool_url}/api/pool",        data=data,        headers={"X-Secret": secret},        timeout=(push_timeout, push_timeout),    )    (logger.success if resp.status_code == 200 else logger.error)(        f"{resp.status_code}: {resp.text}"    )def puller(queue, source, source_type, multiplier):    while True:        try:            data = sample(source, source_type, multiplier)        except KeyboardInterrupt:            logger.info("Interrupted by user.")            sys.exit(0)        except Exception as e:            logger.error(f"Pull exception: {e}")            continue        for piece in chunks(data, 1024 * 500):            queue.put(piece)def pusher(queue, pool_url, secret, cooldown=0):    while True:        piece = queue.get()        try:            push(pool_url, piece, secret)        except KeyboardInterrupt:            logger.info("Interrupted by user.")            sys.exit(0)        except Exception as e:            logger.error(f"Push exception: {e}")            queue.put(piece)        if cooldown:            time.sleep(cooldown)if __name__ == "__main__":    parser = argparse.ArgumentParser()    parser.add_argument("--source", type=str, required=True)    parser.add_argument("--source-type", type=str, default="video+audio")    parser.add_argument("--multiplier", type=float, default=1)    parser.add_argument("--secret-file", type=str, default="./.secret")    parser.add_argument("--cooldown", type=int, default=0)    parser.add_argument("--pool-url", type=str, default="https://yebi.su")    parser.add_argument("--push-timeout", type=int, default=10)    parser.add_argument("--tmp-dir", type=str)    args = parser.parse_args()    if args.tmp_dir and os.path.isdir(args.tmp_dir):        tmp_dir = args.tmp_dir        logger.info(f"Changed temp-dir: '{tmp_dir}'")    push_timeout = max(args.push_timeout, 1)    with open(args.secret_file, "r") as f:        lines = f.read().strip().split("\n")        ident = lines[0].strip()        secret = lines[1].strip()        secret = f"{ident} {secret}"    q = queue.Queue()    pusher_th = threading.Thread(        target=pusher, args=(q, args.pool_url, secret, args.cooldown)    )    if args.source_type == "linuxvideo":        q2 = queue.Queue()        threading.Thread(target=video2_sampler, args=(q, 12, args.source)).start()        threading.Thread(target=video2_extractor, args=(q, q2)).start()    else:        threading.Thread(            target=puller, args=(q, args.source, args.source_type, args.multiplier)        ).start()    pusher_th = threading.Thread(        target=pusher, args=(q, args.pool_url, secret, args.cooldown)    )    pusher_th.start()    pusher_th.join()
 |