Taruu 2 mesiacov pred
rodič
commit
fcf1359d16
1 zmenil súbory, kde vykonal 65 pridanie a 8 odobranie
  1. 65 8
      main.py

+ 65 - 8
main.py

@@ -1,13 +1,18 @@
+import argparse
 import asyncio
 import io
+import random
+import time
 from concurrent.futures import ProcessPoolExecutor
 from itertools import groupby
 import numpy as np
 import bitstring
 from linuxpy.video.device import Device
 from Crypto.Hash import BLAKE2b
+import aiohttp
 
 from loguru import logger
+import math
 
 logger.level("INFO")
 
@@ -89,8 +94,8 @@ def video2_extractor(frame_bytes: bytes):
     return out_bytes
 
 
-class FrameTakeWorker:
-    def __init__(self, device_id=0, processes=3):
+class GeneratorWorker:
+    def __init__(self, pool_url: str, auth: str, device_id=0, processes=3, frame_timeout=1, push_timeout=10):
         self.processes = processes
         self.executor_pool = ProcessPoolExecutor(max_workers=3)
         logger.info(self.executor_pool)
@@ -105,6 +110,13 @@ class FrameTakeWorker:
         )
 
         self.frame_gen = self.video_source.__iter__()
+        self.frame_timeout = frame_timeout
+
+        self.pool_url = f"{pool_url}/api/pool"
+        self.auth = auth
+        self.push_timeout = push_timeout
+
+        self.bytes_queue = asyncio.Queue(maxsize=16)
 
     async def get_frame(self):
         return next(self.frame_gen)
@@ -115,24 +127,69 @@ class FrameTakeWorker:
 
     async def frame_extractor(self, frame_bytes: bytes):
         loop = asyncio.get_event_loop()
-        logger.info(f"Input bytes: {len(frame_bytes)}")
+        logger.info(f"Take frame bytes: {len(frame_bytes)}")
         output = await loop.run_in_executor(self.executor_pool, video2_extractor, frame_bytes)
         return output
 
     async def generator_worker(self):
+        await asyncio.sleep(random.uniform(1, 3))
         while True:
-            logger.info("get frame")
+            get_frame = time.monotonic()
             frame = await self.get_frame()
-            logger.info(f"{frame}")
             data = await self.frame_extractor(bytes(frame))
-            logger.info(f"{len(data)}")
+            await self.bytes_queue.put(data)
+            put_frame = time.monotonic()
+            frame_time = get_frame - put_frame
+
+            if frame_time < self.frame_timeout:
+                await asyncio.sleep(math.ceil(self.frame_timeout - frame_time))
+            else:
+                await asyncio.sleep(random.uniform(1, 3))
+
+    async def push_worker(self):
+        await asyncio.sleep(self.push_timeout)
+        while True:
+            data_bytes = b""
+
+            for _ in range(self.bytes_queue.qsize()):
+                data_bytes += await self.bytes_queue.get()
+
+            async with aiohttp.ClientSession() as session:
+                async with session.post(self.pool_url, data=data_bytes, headers={"X-Secret": self.auth}) as response:
+                    if response.status == 200:
+                        logger.success(f"{response.status}: {response.text}")
+                    else:
+                        logger.error(f"{response.status}: {response.text}")
+            await asyncio.sleep(self.push_timeout)
 
     async def run_workers(self):
         tasks = [self.generator_worker() for _ in range(self.processes)]
+        tasks.append(self.push_worker())
         await asyncio.gather(*tasks)
-        pass
 
 
 if __name__ == "__main__":
-    ftw = FrameTakeWorker(0, 3)
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--source", type=str, required=True)
+    # parser.add_argument("--source-type", type=str, default="video")
+    parser.add_argument("--secret-file", type=str, default="./.secret")
+    parser.add_argument("--frame-timeout", type=int, default=1)
+    parser.add_argument("--pool-url", type=str, default="https://yebi.su")
+    parser.add_argument("--push-timeout", type=int, default=10)
+    parser.add_argument("--tmp-dir", type=str)
+
+    args = parser.parse_args()
+
+    with open(args.secret_file, "r") as f:
+        lines = f.read().strip().split("\n")
+        ident = lines[0].strip()
+        secret = lines[1].strip()
+
+        secret = f"{ident} {secret}"
+
+    ftw = GeneratorWorker(pool_url=args.pool_url, auth=secret,
+                          device_id=args.source,
+                          frame_timeout=args.frame_timeout,
+                          push_timeout=args.push_timeout)
+
     asyncio.run(ftw.run_workers())