taruu před 2 měsíci
rodič
revize
cca59f386d
2 změnil soubory, kde provedl 435 přidání a 327 odebrání
  1. 46 327
      main.py
  2. 389 0
      old_main.py

+ 46 - 327
main.py

@@ -1,128 +1,21 @@
-import os
-import os.path
-import sys
-import time
-import queue
-import argparse
-import tempfile
-import threading
-import subprocess
+import asyncio
+from concurrent.futures import ProcessPoolExecutor
 from itertools import groupby
 
-import cv2
-import requests
+import aiomultiprocess
 import bitstring
-from loguru import logger
+from linuxpy.io import GeventIO
+from linuxpy.video.device import Device
 from Crypto.Hash import BLAKE2b
 
-try:
-    import rdrand
-except ImportError:
-    logger.warning("RdRand is not available.")
-
-try:
-    from linuxpy.video.device import Device, VideoCapture
-except ImportError:
-    logger.warning("linuxpy is not available.")
-
-tmp_dir = None
-push_timeout = 10
-
-
-class TemporaryFile:
-    def __init__(self, name, io, delete):
-        self.name = name
-        self.__io = io
-        self.__delete = delete
-
-    def __getattr__(self, k):
-        return getattr(self.__io, k)
-
-    def __del__(self):
-        if self.__delete:
-            try:
-                os.unlink(self.name)
-            except FileNotFoundError:
-                pass
-
-
-def NamedTemporaryFile(
-    mode="w+b", bufsize=-1, suffix="", prefix="tmp", dir=None, delete=True
-):
-    if not dir:
-        dir = tempfile.gettempdir()
-
-    name = os.path.join(dir, prefix + os.urandom(32).hex() + suffix)
-
-    if mode is None:
-        return TemporaryFile(name, None, delete)
-
-    fh = open(name, "w+b", bufsize)
-    if mode != "w+b":
-        fh.close()
-        fh = open(name, mode)
+from loguru import logger
 
-    return TemporaryFile(name, fh, delete)
+logger.level("INFO")
 
 
 def chunks(lst, n):
     for i in range(0, len(lst), n):
-        yield lst[i : i + n]
-
-
-def run_check(cmd):
-    logger.info(f"Executing '{cmd}'.")
-
-    subprocess.check_output(
-        cmd,
-        stderr=subprocess.STDOUT,
-        timeout=100,
-        shell=True
-    )
-
-def extract_image(path):
-    logger.info(f"Extract image '{path}'.")
-
-    im = cv2.imread(path)
-
-    data = []
-    rows, cols, _ = im.shape
-    for i in range(rows):
-        for j in range(cols):
-            r, g, b = im[i, j]
-
-            data.extend((r, g, b))
-
-    return bytes(data)
-
-
-def extract_wav(path):
-    logger.info(f"Extract audio: '{path}'.")
-
-    data = []
-
-    with open(path, "rb") as f:
-        for sample in chunks(f.read()[44:], 2):
-            data.append(sample[0])
-
-    return bytes(data)
-
-
-def extract_video(path):
-    logger.info(f"Extract video: '{path}'.")
-
-    with tempfile.TemporaryDirectory(dir=tmp_dir) as tmpd:
-        run_check(
-            f"ffmpeg -hide_banner -loglevel error -y -i {path} -vf mpdecimate -r 1/1 {tmpd}/%d.bmp"
-        )
-
-        data = b""
-        for filename in sorted(
-            os.listdir(tmpd), key=lambda filename: int(filename.split(".")[0])
-        ):
-            data += extract_image(os.path.join(tmpd, filename))
-
-        return data
+        yield lst[i: i + n]
 
 
 def extract_lsbs(data):
@@ -146,7 +39,7 @@ def extract_lsbs(data):
     return bytes(buffer)
 
 
-def whiten(data):
+def whiten(data: bytes) -> bytes:
     logger.info("Whitening.")
 
     buffer = b""
@@ -159,230 +52,56 @@ def whiten(data):
     return buffer
 
 
-def read_video(source, duration=60):
-    tmpf = NamedTemporaryFile(suffix=".mkv", mode=None, dir=tmp_dir)
-
-    run_check(
-        f"ffmpeg -hide_banner -loglevel error -y -i {source} -t {duration} -acodec copy -vcodec copy {tmpf.name}"
-    )
-
-    return extract_video(tmpf.name)
-
-
-def read_audio(source, duration=60):
-    tmpf = NamedTemporaryFile(suffix=".wav", mode=None, dir=tmp_dir)
-
-    run_check(
-        f"ffmpeg -hide_banner -loglevel error -y -f alsa -i {source} -t {duration} -ar 44100 -f s16le -acodec pcm_s16le {tmpf.name}"
-    )
-
-    return extract_wav(tmpf.name)
-
-
-def read_audio_video(source, duration=60):
-    tmpf = NamedTemporaryFile(suffix=".mkv", mode=None, dir=tmp_dir)
+def video2_extractor(frame_bytes: bytes):
+    logger.debug(f"frames_in {len(frame_bytes)}")
+    chunked_bytes = chunks(frame_bytes, 8)
+    newdata = b""
 
-    run_check(
-        f"ffmpeg -hide_banner -loglevel error -y -i {source} -t {duration} -acodec copy -vcodec copy {tmpf.name}"
-    )
+    for chunk in chunked_bytes:
+        newdata += bytes(a ^ b for a, b in zip(chunk[:4], chunk[4:]))
 
-    data_a = extract_video(tmpf.name)
+    logger.debug(f"newdata {len(newdata)}")
+    out_bytes = extract_lsbs(newdata)
+    out_bytes = whiten(out_bytes)
 
-    tmpf2 = NamedTemporaryFile(suffix=".wav", mode=None, dir=tmp_dir)
+    logger.info(f"Sample ready: {len(out_bytes)}b.")
+    return out_bytes
 
-    run_check(
-        f"ffmpeg -hide_banner -loglevel error -y -i {tmpf.name} -vn -ar 44100 -f s16le -acodec pcm_s16le {tmpf2.name}"
-    )
 
-    data_b = extract_wav(tmpf2.name)
+class FrameTakeWorker:
+    def __init__(self, device_id=0, processes=3):
+        self.executor_pool = ProcessPoolExecutor(max_workers=3)
+        logger.info(self.executor_pool)
+        self.video_source = Device.from_id(device_id)
+        self.video_source.open()
 
-    return bytes(a ^ b for a, b in zip(data_a, data_b))
-
-
-def read_rdseed(_, amount=16):
-    data = rdrand.rdseed_get_bytes(amount)
-    if len(data) != amount or data.count(0) == amount:
-        raise ValueError("bad data")
-
-    return data
-
-
-def sample(source, source_type, multiplier=1):
-    match source_type:
-        case "video":
-            sampler = read_video
-            multiplier *= 60
-
-        case "audio":
-            sampler = read_audio
-            multiplier *= 60
-
-        case "video+audio":
-            sampler = read_audio_video
-            multiplier *= 60
-
-        case "rdseed":
-            sampler = read_rdseed
-
-        case _:
-            raise ValueError(source_type)
-
-    multiplier = int(multiplier)
-    if multiplier < 1:
-        raise ValueError(multiplier)
-
-    logger.info("Sampling...")
-
-    data = sampler(source, multiplier)
-
-    logger.info(f"Sample ready: {len(data)}b.")
-
-    if source_type != "rdseed":
-        data = extract_lsbs(data)
-        data = whiten(data)
-
-    return data
-
-
-def video2_extractor(q, q2):
-    while True:
-        data = q2.get()
-        data = chunks(data, 8)
-
-        newdata = b""
-        for chunk in data:
-            newdata += bytes(a ^ b for a, b in zip(chunk[:4], chunk[4:]))
-
-        data = extract_lsbs(newdata)
-        data = whiten(data)
-
-        logger.info(f"Sample ready: {len(data)}b.")
-
-        q.put(data)
-
-
-def video2_sampler(q, q2, source):
-    with Device.from_id(abs(int(source))) as device:
-        device.set_format(
+        self.video_source.set_format(
             1,
-            device.info.frame_sizes[0].width,
-            device.info.frame_sizes[0].height,
+            self.video_source.info.frame_sizes[0].width,
+            self.video_source.info.frame_sizes[0].height,
             pixel_format="YUYV",
         )
 
-        last = 0
-        for frame in device:
-            new = time.monotonic()
-
-            if new - last > 10:
-                q2.put(bytes(frame))
-
-                last = new
-
-
-def push(pool_url, data, secret):
-    logger.info(f"Pushing {len(data)}b.")
-
-    resp = requests.post(
-        f"{pool_url}/api/pool",
-        data=data,
-        headers={"X-Secret": secret},
-        timeout=(push_timeout, push_timeout),
-    )
-
-    (logger.success if resp.status_code == 200 else logger.error)(
-        f"{resp.status_code}: {resp.text}"
-    )
-
-
-def puller(queue, source, source_type, multiplier, sample_interval):
-    while True:
-        try:
-            data = sample(source, source_type, multiplier)
-        except KeyboardInterrupt:
-            logger.info("Interrupted by user.")
-
-            sys.exit(0)
-        except Exception as e:
-            logger.error(f"Pull exception: {e}")
-
-            if sample_interval:
-                time.sleep(sample_interval)
-
-            continue
-
-        for piece in chunks(data, 1024 * 500):
-            queue.put(piece)
-
-        if sample_interval:
-            time.sleep(sample_interval)
-
-
-def pusher(queue, pool_url, secret, cooldown=0):
-    while True:
-        piece = queue.get()
-
-        try:
-            push(pool_url, piece, secret)
-        except KeyboardInterrupt:
-            logger.info("Interrupted by user.")
+        self.frame_gen = self.video_source.__iter__()
 
-            sys.exit(0)
-        except Exception as e:
-            logger.error(f"Push exception: {e}")
+    async def get_frame(self):
+        return next(self.frame_gen)
 
-            queue.put(piece)
+    async def frame_extractor(self, frame_bytes: bytes):
+        loop = asyncio.get_event_loop()
+        logger.info(f"Input bytes: {len(frame_bytes)}")
+        output = await loop.run_in_executor(self.executor_pool, video2_extractor, frame_bytes)
+        return output
 
-        if cooldown:
-            time.sleep(cooldown)
+    async def test(self):
+        while True:
+            logger.info("get frame")
+            frame = await self.get_frame()
+            logger.info(f"{frame}")
+            data = await self.frame_extractor(bytes(frame))
+            logger.info(f"{len(data)}")
 
 
 if __name__ == "__main__":
-    parser = argparse.ArgumentParser()
-    parser.add_argument("--source", type=str, required=True)
-    parser.add_argument("--source-type", type=str, default="video+audio")
-    parser.add_argument("--multiplier", type=float, default=1)
-    parser.add_argument("--secret-file", type=str, default="./.secret")
-    parser.add_argument("--cooldown", type=int, default=0)
-    parser.add_argument("--sample-interval", type=int, default=0)
-    parser.add_argument("--pool-url", type=str, default="https://yebi.su")
-    parser.add_argument("--push-timeout", type=int, default=10)
-    parser.add_argument("--tmp-dir", type=str)
-
-    args = parser.parse_args()
-
-    if args.tmp_dir and os.path.isdir(args.tmp_dir):
-        tmp_dir = args.tmp_dir
-
-        logger.info(f"Changed temp-dir: '{tmp_dir}'")
-
-    push_timeout = max(args.push_timeout, 1)
-
-    with open(args.secret_file, "r") as f:
-        lines = f.read().strip().split("\n")
-        ident = lines[0].strip()
-        secret = lines[1].strip()
-
-        secret = f"{ident} {secret}"
-
-    q = queue.Queue()
-
-    pusher_th = threading.Thread(
-        target=pusher, args=(q, args.pool_url, secret, args.cooldown)
-    )
-
-    if args.source_type == "linuxvideo":
-        q2 = queue.Queue()
-
-        threading.Thread(target=video2_sampler, args=(q, q2, args.source)).start()
-        threading.Thread(target=video2_extractor, args=(q, q2)).start()
-    else:
-        threading.Thread(
-            target=puller, args=(q, args.source, args.source_type, args.multiplier, args.sample_interval)
-        ).start()
-
-    pusher_th = threading.Thread(
-        target=pusher, args=(q, args.pool_url, secret, args.cooldown)
-    )
-    pusher_th.start()
-    pusher_th.join()
+    ftw = FrameTakeWorker(0)
+    asyncio.run(ftw.test())

+ 389 - 0
old_main.py

@@ -0,0 +1,389 @@
+import os
+import os.path
+import sys
+import time
+import queue
+import argparse
+import tempfile
+import threading
+import subprocess
+from itertools import groupby
+
+import cv2
+import requests
+import bitstring
+from loguru import logger
+from Crypto.Hash import BLAKE2b
+
+try:
+    import rdrand
+except ImportError:
+    logger.warning("RdRand is not available.")
+
+try:
+    from linuxpy.video.device import Device, VideoCapture
+except ImportError:
+    logger.warning("linuxpy is not available.")
+
+tmp_dir = None
+push_timeout = 10
+
+
+class TemporaryFile:
+    def __init__(self, name, io, delete):
+        self.name = name
+        self.__io = io
+        self.__delete = delete
+
+    def __getattr__(self, k):
+        return getattr(self.__io, k)
+
+    def __del__(self):
+        if self.__delete:
+            try:
+                os.unlink(self.name)
+            except FileNotFoundError:
+                pass
+
+
+def NamedTemporaryFile(
+    mode="w+b", bufsize=-1, suffix="", prefix="tmp", dir=None, delete=True
+):
+    if not dir:
+        dir = tempfile.gettempdir()
+
+    name = os.path.join(dir, prefix + os.urandom(32).hex() + suffix)
+
+    if mode is None:
+        return TemporaryFile(name, None, delete)
+
+    fh = open(name, "w+b", bufsize)
+    if mode != "w+b":
+        fh.close()
+        fh = open(name, mode)
+
+    return TemporaryFile(name, fh, delete)
+
+
+def chunks(lst, n):
+    for i in range(0, len(lst), n):
+        yield lst[i : i + n]
+
+
+def run_check(cmd):
+    logger.info(f"Executing '{cmd}'.")
+
+    subprocess.check_output(
+        cmd,
+        stderr=subprocess.STDOUT,
+        timeout=100,
+        shell=True
+    )
+
+def extract_image(path):
+    logger.info(f"Extract image '{path}'.")
+
+    im = cv2.imread(path)
+
+    data = []
+    rows, cols, _ = im.shape
+    for i in range(rows):
+        for j in range(cols):
+            r, g, b = im[i, j]
+
+            data.extend((r, g, b))
+
+    return bytes(data)
+
+
+def extract_wav(path):
+    logger.info(f"Extract audio: '{path}'.")
+
+    data = []
+
+    with open(path, "rb") as f:
+        for sample in chunks(f.read()[44:], 2):
+            data.append(sample[0])
+
+    return bytes(data)
+
+
+def extract_video(path):
+    logger.info(f"Extract video: '{path}'.")
+
+    with tempfile.TemporaryDirectory(dir=tmp_dir) as tmpd:
+        run_check(
+            f"ffmpeg -hide_banner -loglevel error -y -i {path} -vf mpdecimate -r 1/1 {tmpd}/%d.bmp"
+        )
+
+        data = b""
+        for filename in sorted(
+            os.listdir(tmpd), key=lambda filename: int(filename.split(".")[0])
+        ):
+            data += extract_image(os.path.join(tmpd, filename))
+
+        return data
+
+
+def extract_lsbs(data):
+    logger.info("Extract LSBs.")
+
+    buffer = []
+
+    data = [k for k, _ in groupby(data)]
+    data = chunks(data, 8)
+
+    for chunk in data:
+        if len(chunk) != 8:
+            break
+
+        ba = bitstring.BitArray(8)
+        for i, byte in zip(range(len(chunk)), chunk):
+            ba[i] = byte & 1
+
+        buffer.append(ba.u)
+
+    return bytes(buffer)
+
+
+def whiten(data):
+    logger.info("Whitening.")
+
+    buffer = b""
+    for chunk in chunks(data, 32):
+        if len(chunk) < 32:
+            break
+
+        buffer += BLAKE2b.new(data=chunk, digest_bits=256).digest()
+
+    return buffer
+
+
+def read_video(source, duration=60):
+    tmpf = NamedTemporaryFile(suffix=".mkv", mode=None, dir=tmp_dir)
+
+    run_check(
+        f"ffmpeg -hide_banner -loglevel error -y -i {source} -t {duration} -acodec copy -vcodec copy {tmpf.name}"
+    )
+
+    return extract_video(tmpf.name)
+
+
+def read_audio(source, duration=60):
+    tmpf = NamedTemporaryFile(suffix=".wav", mode=None, dir=tmp_dir)
+
+    run_check(
+        f"ffmpeg -hide_banner -loglevel error -y -f alsa -i {source} -t {duration} -ar 44100 -f s16le -acodec pcm_s16le {tmpf.name}"
+    )
+
+    return extract_wav(tmpf.name)
+
+
+def read_audio_video(source, duration=60):
+    tmpf = NamedTemporaryFile(suffix=".mkv", mode=None, dir=tmp_dir)
+
+    run_check(
+        f"ffmpeg -hide_banner -loglevel error -y -i {source} -t {duration} -acodec copy -vcodec copy {tmpf.name}"
+    )
+
+    data_a = extract_video(tmpf.name)
+
+    tmpf2 = NamedTemporaryFile(suffix=".wav", mode=None, dir=tmp_dir)
+
+    run_check(
+        f"ffmpeg -hide_banner -loglevel error -y -i {tmpf.name} -vn -ar 44100 -f s16le -acodec pcm_s16le {tmpf2.name}"
+    )
+
+    data_b = extract_wav(tmpf2.name)
+
+    return bytes(a ^ b for a, b in zip(data_a, data_b))
+
+
+def read_rdseed(_, amount=16):
+    data = rdrand.rdseed_get_bytes(amount)
+    if len(data) != amount or data.count(0) == amount:
+        raise ValueError("bad data")
+
+    return data
+
+
+def sample(source, source_type, multiplier=1):
+    match source_type:
+        case "video":
+            sampler = read_video
+            multiplier *= 60
+
+        case "audio":
+            sampler = read_audio
+            multiplier *= 60
+
+        case "video+audio":
+            sampler = read_audio_video
+            multiplier *= 60
+
+        case "rdseed":
+            sampler = read_rdseed
+
+        case _:
+            raise ValueError(source_type)
+
+    multiplier = int(multiplier)
+    if multiplier < 1:
+        raise ValueError(multiplier)
+
+    logger.info("Sampling...")
+
+    data = sampler(source, multiplier)
+
+    logger.info(f"Sample ready: {len(data)}b.")
+
+    if source_type != "rdseed":
+        data = extract_lsbs(data)
+        data = whiten(data)
+
+    return data
+
+
+def video2_extractor(q, q2):
+    while True:
+        data = q2.get()
+        data = chunks(data, 8)
+
+        newdata = b""
+        for chunk in data:
+            newdata += bytes(a ^ b for a, b in zip(chunk[:4], chunk[4:]))
+
+        data = extract_lsbs(newdata)
+        data = whiten(data)
+
+        logger.info(f"Sample ready: {len(data)}b.")
+
+        q.put(data)
+
+
+def video2_sampler(q, q2, source):
+    with Device.from_id(abs(int(source))) as device:
+        device.set_format(
+            1,
+            device.info.frame_sizes[0].width,
+            device.info.frame_sizes[0].height,
+            pixel_format="YUYV",
+        )
+
+        last = 0
+        for frame in device:
+            new = time.monotonic()
+
+            if new - last > 10:
+                q2.put(bytes(frame))
+
+                last = new
+
+
+def push(pool_url, data, secret):
+    logger.info(f"Pushing {len(data)}b.")
+
+    resp = requests.post(
+        f"{pool_url}/api/pool",
+        data=data,
+        headers={"X-Secret": secret},
+        timeout=(push_timeout, push_timeout),
+    )
+
+    (logger.success if resp.status_code == 200 else logger.error)(
+        f"{resp.status_code}: {resp.text}"
+    )
+
+
+def puller(queue, source, source_type, multiplier, sample_interval):
+    while True:
+        try:
+            data = sample(source, source_type, multiplier)
+        except KeyboardInterrupt:
+            logger.info("Interrupted by user.")
+
+            sys.exit(0)
+        except Exception as e:
+            logger.error(f"Pull exception: {e}")
+
+            if sample_interval:
+                time.sleep(sample_interval)
+
+            continue
+
+        for piece in chunks(data, 1024 * 500):
+            queue.put(piece)
+
+        if sample_interval:
+            time.sleep(sample_interval)
+
+
+def pusher(queue, pool_url, secret, cooldown=0):
+    while True:
+        piece = queue.get()
+
+        try:
+            push(pool_url, piece, secret)
+        except KeyboardInterrupt:
+            logger.info("Interrupted by user.")
+
+            sys.exit(0)
+        except Exception as e:
+            logger.error(f"Push exception: {e}")
+
+            queue.put(piece)
+
+        if cooldown:
+            time.sleep(cooldown)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--source", type=str, required=True)
+    parser.add_argument("--source-type", type=str, default="video+audio")
+    parser.add_argument("--multiplier", type=float, default=1)
+    parser.add_argument("--secret-file", type=str, default="./.secret")
+    parser.add_argument("--cooldown", type=int, default=0)
+    parser.add_argument("--sample-interval", type=int, default=0)
+    parser.add_argument("--pool-url", type=str, default="https://yebi.su")
+    parser.add_argument("--push-timeout", type=int, default=10)
+    parser.add_argument("--tmp-dir", type=str)
+
+    args = parser.parse_args()
+
+    if args.tmp_dir and os.path.isdir(args.tmp_dir):
+        tmp_dir = args.tmp_dir
+
+        logger.info(f"Changed temp-dir: '{tmp_dir}'")
+
+    push_timeout = max(args.push_timeout, 1)
+
+    with open(args.secret_file, "r") as f:
+        lines = f.read().strip().split("\n")
+        ident = lines[0].strip()
+        secret = lines[1].strip()
+
+        secret = f"{ident} {secret}"
+
+    q = queue.Queue()
+
+
+    pusher_th = threading.Thread(
+        target=pusher, args=(q, args.pool_url, secret, args.cooldown)
+    )
+
+    if args.source_type == "linuxvideo":
+        q2 = queue.Queue()
+
+        threading.Thread(target=video2_sampler, args=(q, q2, args.source)).start()
+        threading.Thread(target=video2_extractor, args=(q, q2)).start()
+    else:
+        threading.Thread(
+            target=puller, args=(q, args.source, args.source_type, args.multiplier, args.sample_interval)
+        ).start()
+
+    pusher_th = threading.Thread(
+        target=pusher, args=(q, args.pool_url, secret, args.cooldown)
+    )
+    pusher_th.start()
+    pusher_th.join()