|
@@ -93,6 +93,19 @@ def extract_wav(path):
|
|
|
|
|
|
return bytes(data)
|
|
return bytes(data)
|
|
|
|
|
|
|
|
+def extract_video(path):
|
|
|
|
+ logger.info(f"Extract video: '{path}'.")
|
|
|
|
+
|
|
|
|
+ with tempfile.TemporaryDirectory() as tmpd:
|
|
|
|
+ run_check(
|
|
|
|
+ f"ffmpeg -hide_banner -loglevel error -y -i {path} -vf mpdecimate -r 1/1 {tmpd}/%d.bmp"
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ data = b""
|
|
|
|
+ for filename in os.listdir(tmpd):
|
|
|
|
+ data += extract_image(os.path.join(tmpd, filename))
|
|
|
|
+
|
|
|
|
+ return data
|
|
|
|
|
|
def extract_lsbs(data):
|
|
def extract_lsbs(data):
|
|
logger.info("Extract LSBs.")
|
|
logger.info("Extract LSBs.")
|
|
@@ -130,16 +143,7 @@ def read_video(source, duration=60):
|
|
f"ffmpeg -hide_banner -loglevel error -y -i {source} -t {duration} -acodec copy -vcodec copy {tmpf.name}"
|
|
f"ffmpeg -hide_banner -loglevel error -y -i {source} -t {duration} -acodec copy -vcodec copy {tmpf.name}"
|
|
)
|
|
)
|
|
|
|
|
|
- with tempfile.TemporaryDirectory() as tmpd:
|
|
|
|
- run_check(
|
|
|
|
- f"ffmpeg -hide_banner -loglevel error -y -i {tmpf.name} -vf mpdecimate -r 1/1 {tmpd}/%d.bmp"
|
|
|
|
- )
|
|
|
|
-
|
|
|
|
- data = b""
|
|
|
|
- for filename in os.listdir(tmpd):
|
|
|
|
- data += extract_image(os.path.join(tmpd, filename))
|
|
|
|
-
|
|
|
|
- return data
|
|
|
|
|
|
+ return extract_video(tmpf.name)
|
|
|
|
|
|
|
|
|
|
def read_audio(source, duration=60):
|
|
def read_audio(source, duration=60):
|
|
@@ -159,26 +163,17 @@ def read_audio_video(source, duration=60):
|
|
f"ffmpeg -hide_banner -loglevel error -y -i {source} -t {duration} -acodec copy -vcodec copy {tmpf.name}"
|
|
f"ffmpeg -hide_banner -loglevel error -y -i {source} -t {duration} -acodec copy -vcodec copy {tmpf.name}"
|
|
)
|
|
)
|
|
|
|
|
|
- with tempfile.TemporaryDirectory() as tmpd:
|
|
|
|
- run_check(
|
|
|
|
- f"ffmpeg -hide_banner -loglevel error -y -i {tmpf.name} -vf mpdecimate -r 1/1 {tmpd}/%d.bmp"
|
|
|
|
- )
|
|
|
|
-
|
|
|
|
- data_a = b""
|
|
|
|
- for filename in sorted(
|
|
|
|
- os.listdir(tmpd), key=lambda filename: int(filename.split(".")[0])
|
|
|
|
- ):
|
|
|
|
- data_a += extract_image(os.path.join(tmpd, filename))
|
|
|
|
|
|
+ data_a = extract_video(tmpf.name)
|
|
|
|
|
|
- tmpf2 = NamedTemporaryFile(suffix=".wav", mode=None)
|
|
|
|
|
|
+ tmpf2 = NamedTemporaryFile(suffix=".wav", mode=None)
|
|
|
|
|
|
- run_check(
|
|
|
|
- f"ffmpeg -hide_banner -loglevel error -y -i {tmpf.name} -vn -ar 48000 -f s16le -acodec pcm_s16le {tmpf2.name}"
|
|
|
|
- )
|
|
|
|
|
|
+ run_check(
|
|
|
|
+ f"ffmpeg -hide_banner -loglevel error -y -i {tmpf.name} -vn -ar 48000 -f s16le -acodec pcm_s16le {tmpf2.name}"
|
|
|
|
+ )
|
|
|
|
|
|
- data_b = extract_wav(tmpf2.name)
|
|
|
|
|
|
+ data_b = extract_wav(tmpf2.name)
|
|
|
|
|
|
- return bytes(a ^ b for a, b in zip(data_a, data_b))
|
|
|
|
|
|
+ return bytes(a ^ b for a, b in zip(data_a, data_b))
|
|
|
|
|
|
|
|
|
|
def read_rdseed(_, amount=16):
|
|
def read_rdseed(_, amount=16):
|