|
@@ -1,9 +1,11 @@
|
|
|
import os
|
|
|
import os.path
|
|
|
+import sys
|
|
|
import time
|
|
|
+import queue
|
|
|
import argparse
|
|
|
import tempfile
|
|
|
-import traceback
|
|
|
+import threading
|
|
|
|
|
|
import cv2
|
|
|
import requests
|
|
@@ -223,6 +225,9 @@ def sample(source, source_type, multiplier=1):
|
|
|
|
|
|
|
|
|
def push(pool_url, data, secret):
|
|
|
+ logger.info(f"Pushing {len(data)}b.")
|
|
|
+
|
|
|
+
|
|
|
resp = requests.post(
|
|
|
f"{pool_url}/api/pool", data=data[: 1024**2], headers={"X-Secret": secret}
|
|
|
)
|
|
@@ -232,6 +237,40 @@ def push(pool_url, data, secret):
|
|
|
)
|
|
|
|
|
|
|
|
|
+def puller(queue, source, source_type, multiplier):
|
|
|
+ while True:
|
|
|
+ try:
|
|
|
+ data = sample(source, source_type, multiplier=1)
|
|
|
+ except KeyboardInterrupt:
|
|
|
+ logger.info("Interrupted by user.")
|
|
|
+
|
|
|
+ sys.exit(0)
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Pull exception: {e}")
|
|
|
+
|
|
|
+ continue
|
|
|
+
|
|
|
+ for piece in chunks(data, 512):
|
|
|
+ queue.put(piece)
|
|
|
+
|
|
|
+
|
|
|
+def pusher(queue, pool_url, secret, cooldown=0):
|
|
|
+ while True:
|
|
|
+ piece = queue.get()
|
|
|
+
|
|
|
+ try:
|
|
|
+ push(pool_url, piece, secret)
|
|
|
+ except KeyboardInterrupt:
|
|
|
+ logger.info("Interrupted by user.")
|
|
|
+
|
|
|
+ sys.exit(0)
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Push exception: {e}")
|
|
|
+
|
|
|
+ if cooldown:
|
|
|
+ time.sleep(cooldown)
|
|
|
+
|
|
|
+
|
|
|
if __name__ == "__main__":
|
|
|
parser = argparse.ArgumentParser()
|
|
|
parser.add_argument("--source", type=str, required=True)
|
|
@@ -250,18 +289,13 @@ if __name__ == "__main__":
|
|
|
|
|
|
secret = f"{ident} {secret}"
|
|
|
|
|
|
- while True:
|
|
|
- try:
|
|
|
- data = sample(args.source, args.source_type, args.multiplier)
|
|
|
+ q = queue.Queue()
|
|
|
+ threading.Thread(
|
|
|
+ target=puller, args=(q, args.source, args.source_type, args.multiplier)
|
|
|
+ ).start()
|
|
|
|
|
|
- push(args.pool_url, data, secret)
|
|
|
- except KeyboardInterrupt:
|
|
|
- logger.info("Interrupted by user.")
|
|
|
-
|
|
|
- break
|
|
|
- except Exception as e:
|
|
|
- traceback.print_exc()
|
|
|
-
|
|
|
- logger.error(e)
|
|
|
-
|
|
|
- time.sleep(args.cooldown)
|
|
|
+ pusher_th = threading.Thread(
|
|
|
+ target=pusher, args=(q, args.pool_url, secret, args.cooldown)
|
|
|
+ )
|
|
|
+ pusher_th.start()
|
|
|
+ pusher_th.join()
|