txlyre, 2 years ago
Current commit
89f7d2ee28
2 files changed, 1158 insertions and 0 deletions
  1. byafn.py (+786, -0)
  2. byafnctl.py (+372, -0)

byafn.py (+786, -0)

@@ -0,0 +1,786 @@
+import os
+import sys
+import time
+import random
+import struct
+import asyncio
+
+from collections import namedtuple
+
+import zstd
+import cbor2
+import ecies
+import hjson
+import aiofiles
+import aiologger
+
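+# aiofiles.os resolves os.link at import time; if the platform lacks it,
+# substitute os.symlink and retry the import.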
+try:
+  import aiofiles.os
+except AttributeError:
+  os.link = os.symlink
+
+  import aiofiles.os
+
+from Crypto.Hash import SHA256
+from Crypto.Random import get_random_bytes
+
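+# Protocol tunables: timeouts and intervals are in seconds, sizes in bytes.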
+PINGS_COUNT       = 3
+PING_TIMEOUT      = 10
+
+HANDSHAKE_TIMEOUT = 10
+BROADCAST_TIMEOUT = 300
+
+HEARTBEAT         = 10
+WATCHER_INTERVAL  = 30
+
+CACHE_LIFETIME    = 3600
+
+MAX_TTL           = 7
+MAX_DISTANCE      = 8
+
+MAX_PAYLOAD_SIZE  = 1024*1024*64
+CHUNK_SIZE        = 512
+
+RATELIMIT         = 0.3
+
+config = {}
+
+peers = []
+cache = []
+
+logger = aiologger.Logger.with_default_handlers(
+  formatter=aiologger.formatters.base.Formatter(
+    fmt='%(asctime)s %(levelname)s: %(message)s'
+  )
+)
+
+def sha256(data):
+  hash = SHA256.new()
+  hash.update(data)
+
+  return hash.digest()
+
+def chunks(l, n):
+  for i in range(0, len(l), n):
+    yield l[i:i + n]
+
+class Error(Exception): pass
+
+async def is_piece_exists(hash):
+  if await aiofiles.os.path.isfile(
+    os.path.join(
+      config['StoragePath'],
+      hash.hex()
+    )
+  ):
+    return True
+
+  return False
+
+async def save_piece(data, hash=None):
+  if not hash:
+    hash = sha256(data)
+
+  path = os.path.join(
+    config['StoragePath'],
+    hash.hex()
+  )
+
+  async with aiofiles.open(
+    path,
+    'wb'
+  ) as f:
+    data = zstd.compress(data)
+
+    await f.write(data)
+
+  return hash
+
+async def read_piece(hash):
+  path = os.path.join(
+    config['StoragePath'],
+    hash.hex()
+  )
+
+  async with aiofiles.open(
+    path,
+    'rb'
+  ) as f:
+    data = await f.read()
+    data = zstd.decompress(data)
+
+    if sha256(data) != hash:
+      await aiofiles.os.remove(path)
+
+      raise ValueError
+
+    return data
+
+CachedMessage = namedtuple(
+  'CachedMessage', 
+  'kind uid ts'
+)
+
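+# Application-level messages: QUERY asks peers for a piece by hash,
+# QUERY_HIT carries the piece data back, NOT_AVAILABLE signals a miss.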
+class Message:
+  QUERY         = 0xa
+  QUERY_HIT     = 0xb
+  NOT_AVAILABLE = 0xc
+
+  def __init__(self, kind, uid=None, **fields):
+    self.kind = kind
+    self.uid = uid if uid else get_random_bytes(16)
+    self.fields = fields
+
+  def __getattr__(self, field):
+    return self.fields[field]
+
+  def cache(self):
+    for message in cache:
+      if (
+        message.uid == self.uid and
+        message.kind == self.kind
+      ):
+        return False
+
+    cache.append(
+      CachedMessage(
+        kind=self.kind,
+        uid=self.uid,
+        ts=time.time()
+      )
+    )
+
+    return True
+
+  def is_response_for(self, message):
+    if self.uid != message.uid:
+      return False
+
+    if message.kind == ServiceMessage.PING:
+      return self.kind == ServiceMessage.PONG
+    elif message.kind == Message.QUERY:
+      return self.kind in (
+        Message.QUERY_HIT,
+        Message.NOT_AVAILABLE
+      )
+
+    return False
+
+class ServiceMessage(Message):
+  HELLO     = 0x0
+  CHALLENGE = 0x1
+  ANSWER    = 0x2
+  FINISH    = 0x3
+
+  PING      = 0x4
+  PONG      = 0x5
+
+  CLOSE     = 0x6
+
+class Peer:
+  def __init__(self, reader, writer, address=None):
+    self.reader = reader
+    self.writer = writer
+    self.address = address
+
+    self.key = None
+    self.queue = []
+    self.is_open = True
+    self.send_lock = asyncio.Lock()
+    self.receive_lock = asyncio.Lock()
+    self.ticks = 0
+    self.last_message_ts = time.time()
+
+async def write(peer, data):
+  peer.writer.write(data)
+  await peer.writer.drain()
+
+async def read(peer, size):
+  buffer = b''
+
+  while len(buffer) < size:
+    data = await peer.reader.read(size - len(buffer))
+
+    # read() returns b'' on EOF; bail out instead of busy-looping forever.
+    if not data:
+      raise Error('connection closed')
+
+    buffer += data
+
+  return buffer
+
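+# Wire format: service messages go out in plaintext as [kind:1][len:2 LE][CBOR];
+# everything else starts with 0xff, an ECIES-encrypted header (kind, 16-byte uid,
+# chunk count) and then length-prefixed chunks of the ECIES-encrypted,
+# zstd-compressed CBOR payload.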
+async def send(peer, message):
+  if type(message) is ServiceMessage:
+    buffer = bytes([message.kind])
+
+    if message.fields:
+      payload = cbor2.dumps(message.fields)
+
+      buffer += struct.pack('<H', len(payload))
+      buffer += payload
+    else:
+      buffer += bytes(2)
+
+    async with peer.send_lock:
+      await write(peer, buffer)
+
+      await asyncio.sleep(RATELIMIT)
+
+    return
+
+  payload = b''
+  chunks_count = 0
+
+  if message.fields:
+    payload = cbor2.dumps(message.fields)
+    payload = zstd.compress(payload)
+    payload = ecies.encrypt(
+      peer.key,
+      payload
+    )
+
+    # Ceiling division so a partial final chunk is counted in the header.
+    chunks_count = max(1, -(-len(payload) // CHUNK_SIZE))
+
+  buffer = b'\xff' + ecies.encrypt(
+    peer.key,
+    bytes([message.kind]) +
+    message.uid +
+    struct.pack('<H', chunks_count)
+  )
+
+  async with peer.send_lock:
+    await write(peer, buffer)
+
+    if chunks_count:
+      for chunk in chunks(payload, CHUNK_SIZE):
+        await write(
+          peer,
+          struct.pack('<H', len(chunk)) +
+          chunk
+        )
+
+    await asyncio.sleep(RATELIMIT)
+
+async def receive(peer):
+  async with peer.receive_lock:
+    if time.time() - peer.last_message_ts < RATELIMIT:
+      raise Error(f'rate limit (={RATELIMIT}s.) exceeded for incoming messages')
+
+    peer.last_message_ts = time.time()
+
+    kind = (await read(peer, 1))[0]
+
+    if kind != 0xff:
+      if kind > ServiceMessage.CLOSE:
+        raise Error('unencrypted non-service messages are not allowed')
+
+      length = struct.unpack('<H', await read(peer, 2))[0]
+
+      payload = {}
+
+      if length:
+        payload = await read(peer, length)
+        payload = cbor2.loads(payload)
+
+      return ServiceMessage(
+        kind,
+        **payload
+      )
+
+    head = await read(peer, 116)
+
+    head = ecies.decrypt(
+      config['Secret'],
+      head
+    )
+
+    kind = head[0]
+    uid = head[1:17]
+    chunks_count = struct.unpack('<H', head[17:19])[0]
+
+    payload = {}
+
+    if chunks_count:
+      payload = b''
+
+      if chunks_count * CHUNK_SIZE > MAX_PAYLOAD_SIZE:
+        raise Error('payload is too large')
+
+      for _ in range(chunks_count):
+        length = struct.unpack('<H', await read(peer, 2))[0]
+
+        if not length or length > CHUNK_SIZE:
+          raise Error('illegal chunk length')
+
+        payload += await read(peer, length)
+
+      payload = ecies.decrypt(
+        config['Secret'],
+        payload
+      )
+      payload = zstd.decompress(payload)
+      payload = cbor2.loads(payload)
+
+    return Message(
+      kind,
+      uid=uid,
+      **payload
+    )
+
+async def close(peer, gracefully=True):
+  if not peer.is_open:
+    return
+
+  if gracefully:
+    try:
+      await send(
+        peer,
+        ServiceMessage(
+          ServiceMessage.CLOSE
+        )
+      )
+    except:
+      pass
+
+  peer.writer.close()
+
+  peer.is_open = False
+
+  try:
+    await asyncio.wait_for(
+      peer.writer.wait_closed(),
+      timeout=3
+    )
+  except:
+    pass
+
+async def wait_response(peer, message):
+  while peer.is_open:
+    for other_message in peer.queue:
+      if other_message.is_response_for(message):
+        peer.queue.remove(other_message)
+
+        return other_message
+
+    await asyncio.sleep(1)
+
+async def communicate(peer, message, timeout=None):
+  await send(peer, message)
+  
+  answer = await asyncio.wait_for(
+    wait_response(
+      peer,
+      message
+    ),
+    timeout=timeout
+  )
+
+  if not answer:
+    raise Error('communication timeout')
+
+  return answer
+
+async def ping(peer):
+  await communicate(
+    peer,
+    ServiceMessage(
+      ServiceMessage.PING
+    ),
+    timeout=PING_TIMEOUT
+  )
+
+async def respond(peer, message, kind, is_service=False, **data):
+  await send(
+    peer,
+    (ServiceMessage if is_service else Message)(
+      kind,
+      uid=message.uid,
+      **data
+    )
+  )
+
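+# Answer from local storage when possible, otherwise flood a QUERY to the
+# network and persist whatever valid piece comes back.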
+async def query(hash, uid=None, ttl=0, filter=None):
+  if await is_piece_exists(hash):
+    return await read_piece(hash)
+
+  answer = await broadcast(
+    Message(
+      Message.QUERY,
+      uid=uid,
+      hash=hash,
+      ttl=ttl
+    ),
+    fn=lambda answer: answer.kind == Message.QUERY_HIT and sha256(answer.data) == hash,
+    filter=filter
+  )
+
+  if not answer:
+    return None
+
+  await save_piece(
+    answer.data,
+    hash
+  )
+
+  return answer.data
+
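+# Flood the message to every open (optionally filtered) peer after bumping its
+# TTL; return the first answer accepted by fn.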
+async def broadcast(message, fn=None, filter=None):
+  if message.ttl >= MAX_TTL:
+    return
+
+  message.fields['ttl'] += 1
+
+  for peer in peers:
+    if not peer.is_open:
+      continue
+
+    if filter and not filter(peer):
+      continue
+
+    try:
+      answer = await communicate(
+        peer,
+        message,
+        timeout=BROADCAST_TIMEOUT
+      )
+
+      if fn and fn(answer):
+        return answer
+    except:
+      continue
+
+async def tick(peer):
+  while peer.is_open:
+    if peer.ticks > 0 and peer.ticks % 3 == 0:
+      attempts = PINGS_COUNT
+
+      while True:
+        try:
+          await ping(peer)
+
+          break
+        except:
+          attempts -= 1
+
+          if attempts < 1:
+            await close(peer, False)
+
+            return
+
+    peer.ticks += 1
+
+    await asyncio.sleep(HEARTBEAT)
+
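+# Mutual authentication: exchange HELLO public keys, then each side proves it
+# holds the matching secret key by decrypting the other's random CHALLENGE
+# (returned via ANSWER) before both confirm with FINISH.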
+async def handshake(peer):
+  await send(
+    peer,
+    ServiceMessage(
+      ServiceMessage.HELLO,
+      key=config['Key']
+    )
+  )
+
+  answer = await receive(peer)
+  if answer.kind != ServiceMessage.HELLO:
+    raise Error
+
+  key = answer.key
+
+  if key == config['Key']:
+    raise Error
+
+  # Don't shadow the peer being handshaken while scanning for duplicates.
+  for other_peer in peers.copy():
+    if other_peer.key == key:
+      if not other_peer.is_open:
+        peers.remove(other_peer)
+
+        continue
+
+      raise Error
+
+  data = get_random_bytes(16)
+
+  await send(
+    peer,
+    ServiceMessage(
+      ServiceMessage.CHALLENGE,
+      data=ecies.encrypt(
+        key,
+        data
+      )
+    )
+  )
+
+  answer = await receive(peer)
+  if answer.kind != ServiceMessage.CHALLENGE:
+    raise Error
+
+  await send(
+    peer,
+    ServiceMessage(
+      ServiceMessage.ANSWER,
+      data=ecies.decrypt(
+        config['Secret'],
+        answer.data
+      )
+    )
+  )
+
+  answer = await receive(peer)
+  if answer.kind != ServiceMessage.ANSWER:
+    raise Error
+
+  if answer.data != data:
+    raise Error
+
+  await send(
+    peer,
+    ServiceMessage(
+      ServiceMessage.FINISH
+    )
+  )
+
+  answer = await receive(peer)
+  if answer.kind != ServiceMessage.FINISH:
+    raise Error
+
+  peer.key = key
+
+async def serve(peer):
+  await asyncio.wait_for(
+    handshake(peer),
+    timeout=HANDSHAKE_TIMEOUT
+  )
+
+  asyncio.get_event_loop().create_task(
+    tick(peer)
+  )
+
+  if peer.address:
+    await logger.info(f'Connected to {peer.address}')
+
+  while peer.is_open:
+    message = await receive(peer)
+
+    if not message.cache():
+      if message.kind == Message.QUERY:
+        await respond(
+          peer,
+          message,
+          Message.NOT_AVAILABLE
+        )
+
+      continue
+
+    if message.kind in (
+      ServiceMessage.PONG,
+      Message.QUERY_HIT,
+      Message.NOT_AVAILABLE
+    ):
+      peer.queue.append(message)
+
+      continue
+
+    if message.kind == ServiceMessage.PING:
+      await respond(
+        peer,
+        message,
+        ServiceMessage.PONG,
+        is_service=True
+      )
+    elif message.kind == ServiceMessage.CLOSE:
+      await close(peer, False)
+    elif message.kind == Message.QUERY:
+      answer = await query(
+        message.hash,
+        uid=message.uid,
+        ttl=message.ttl,
+        filter=lambda other_peer: other_peer.key not in (peer.key, config['Key'])
+      )
+
+      if not answer:
+        await respond(
+          peer,
+          message,
+          Message.NOT_AVAILABLE
+        )
+
+        continue
+
+      await respond(
+        peer,
+        message,
+        Message.QUERY_HIT,
+        data=answer
+      )
+    else:
+      raise Error(f'unknown message kind={message.kind}')
+
+async def accept(reader, writer, address=None):
+  peer = Peer(reader, writer, address)
+  peers.append(peer)
+
+  try:
+    await serve(peer)
+  except Exception as e:
+    if peer.address:
+      await logger.warning(f'Connection lost {peer.address}: {e}')
+  finally:
+    await close(peer)
+
+async def dial(address):
+  parts = address.split(':')
+
+  host = ':'.join(parts[:-1])
+  port = int(parts[-1])
+
+  try:
+     reader, writer = await asyncio.open_connection(
+       host,
+       port
+     )
+  except Exception as e:
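+    # Register a closed placeholder so the watcher keeps retrying this address.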
+    dummy_peer = Peer(None, None, address)
+    dummy_peer.is_open = False
+    peers.append(dummy_peer)
+
+    await logger.error(f'Dial {address}: {e}')
+
+    return
+
+  asyncio.get_event_loop().create_task(
+    accept(reader, writer, address)
+  )
+
+async def listen():
+  server = await asyncio.start_server(
+    accept,
+    config['ListenAddress'],
+    int(config['ListenPort'])
+  )
+
+  await logger.info(f'Listening at {config["ListenAddress"]}:{config["ListenPort"]}')
+
+  async with server:
+    await server.serve_forever()
+
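+# Housekeeping loop: drop closed peers (re-dialing those with a known address)
+# and expire cached message IDs.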
+async def watcher():
+  while True:
+    for peer in peers.copy():
+      if not peer.is_open:
+        peers.remove(peer)
+
+        if peer.address:
+          asyncio.get_event_loop().create_task(
+            dial(peer.address)
+          )
+
+        continue      
+
+    for message in cache.copy():
+      if time.time() - message.ts > CACHE_LIFETIME:
+        cache.remove(message)
+
+    await asyncio.sleep(WATCHER_INTERVAL)
+
+async def shutdown(delay):
+  await asyncio.sleep(delay)
+
+  await logger.info('Performing graceful shutdown')
+
+  for peer in peers:
+    if not peer.is_open:
+      continue
+
+    try:
+      await send(
+        peer,
+        ServiceMessage(
+          ServiceMessage.CLOSE
+        )
+      )
+    except:
+      pass
+
+  while True:
+    os.kill(os.getpid(), 2)
+
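+# Admin socket protocol: a 4-byte little-endian length followed by a CBOR
+# request; the response uses the same framing.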
+async def accept_admin(reader, writer):
+  response = {}
+
+  try:
+    length = struct.unpack('<I', await reader.readexactly(4))[0]
+    request = cbor2.loads(await reader.readexactly(length))
+
+    if 'store' in request:
+      hash = await save_piece(
+        request['store']['piece']
+      )
+
+      response['hash'] = hash
+    elif 'query' in request:
+      piece = await query(
+        request['query']['hash']
+      )
+
+      if piece:
+        response['piece'] = piece
+    elif 'shutdown' in request:
+      delay = int(request['shutdown']['delay'])
+
+      await logger.info(f'Requested shutdown in {delay}sec.')
+
+      asyncio.get_event_loop().create_task(
+        shutdown(delay)
+      )
+    else:
+      raise Error('unrecognized command')
+  except Exception as e:
+    await logger.error(f'Process request on admin socket: {e}')
+
+  response = cbor2.dumps(response)
+  
+  writer.write(struct.pack('<I', len(response)))
+  writer.write(response)
+
+  await writer.drain()
+
+async def listen_admin():
+  server = await asyncio.start_unix_server(
+    accept_admin,
+    config['AdminSocketPath'],
+  )
+
+  async with server:
+    await server.serve_forever()
+
+async def main():
+  global config
+
+  if len(sys.argv) < 2:
+    print(f'usage: {sys.argv[0]} <config.conf>')
+
+    return
+
+  try:
+    async with aiofiles.open(
+      sys.argv[1],
+      'r'
+    ) as f:
+      config = hjson.loads(await f.read())
+  except Exception as e:
+    await logger.error(f'Load configuration `{sys.argv[1]}\': {e}')
+
+    return
+
+  if not await aiofiles.os.path.isdir(config['StoragePath']):
+    await aiofiles.os.mkdir(config['StoragePath'])
+
+  asyncio.get_event_loop().create_task(
+    watcher()
+  )
+
+  for address in config['Peers']:
+    await dial(address)
+
+  asyncio.get_event_loop().create_task(
+    listen_admin()
+  )
+
+  await listen()
+
+try:
+  asyncio.run(main())
+except KeyboardInterrupt:
+  print('Interrupted')

byafnctl.py (+372, -0)

@@ -0,0 +1,372 @@
+import re
+import os
+import os.path
+import sys
+import time
+import struct
+import socket
+import pathlib
+import logging
+import argparse
+
+import tqdm
+import cbor2
+
+from Crypto.Hash import SHA256
+from Crypto.Cipher import Salsa20
+from Crypto.Random import get_random_bytes
+
+logging.basicConfig(
+  format="%(asctime)s %(levelname)s: %(message)s", 
+  level=logging.INFO
+)
+
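+# Metafile layout: magic header, filename length + UTF-8 filename, file size,
+# SHA-256 of the plaintext, optional 40-byte Salsa20 nonce+key ('Y'/'N' flag),
+# piece count, then one 32-byte SHA-256 per piece.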
+class Metafile:
+  HEADER = b'\x80BYAFN-METAFILE\x00\x00'
+  HEADER_LEN = len(HEADER)
+
+  def __init__(self, filename, size, checksum, pieces, key=None):
+    self.filename = filename
+    self.size = size
+    self.checksum = checksum
+    self.pieces = pieces
+    self.key = key
+
+  def save(self, f):
+    f.write(Metafile.HEADER)
+
+    filename = self.filename.encode('UTF-8')
+    filename_len = len(filename)
+  
+    f.write(struct.pack('!I', filename_len))
+    f.write(filename)
+
+    f.write(struct.pack('!L', self.size))
+
+    f.write(self.checksum)
+
+    if self.key:
+      f.write(b'Y')
+      f.write(self.key)
+    else:
+      f.write(b'N')
+      
+    pieces_count = len(self.pieces)
+
+    f.write(struct.pack('!L', pieces_count))
+    
+    for hash in self.pieces:
+      f.write(hash)
+
+  @staticmethod
+  def load(f):
+    header = f.read(Metafile.HEADER_LEN)
+    if header != Metafile.HEADER:
+      raise ValueError
+
+    filename_len = f.read(4)
+    filename_len = struct.unpack('!I', filename_len)[0]
+
+    filename = f.read(filename_len)
+    if len(filename) != filename_len:
+      raise ValueError
+
+    filename = filename.decode('UTF-8')
+    filename = pathlib.Path(filename).name
+
+    size = f.read(4)
+    size = struct.unpack('!I', size)[0]
+
+    checksum = f.read(32)
+    if len(checksum) != 32:
+      raise ValueError
+
+    is_encrypted = f.read(1)
+    if is_encrypted == b'Y':
+      key = f.read(40)
+      if len(key) != 40:
+        raise ValueError
+    elif is_encrypted == b'N':
+      key = None
+    else:
+      raise ValueError
+
+    pieces_count = f.read(4)
+    pieces_count = struct.unpack('!L', pieces_count)[0]
+
+    if pieces_count < 1:
+      raise ValueError
+
+    pieces = []
+
+    while pieces_count > 0:
+      piece_hash = f.read(32)
+      if len(piece_hash) != 32:
+        raise ValueError
+
+      pieces.append(piece_hash)
+
+      pieces_count -= 1
+
+    return Metafile(filename, size, checksum, pieces, key)
+
+def sha256(data):
+  hash = SHA256.new()
+  hash.update(data)
+
+  return hash.digest()
+
+def salsa20_create_encryptor():
+  key = get_random_bytes(32)
+  cipher = Salsa20.new(key=key)
+
+  return (cipher.nonce + key, cipher)
+
+def salsa20_create_decryptor(key):
+  if len(key) != 40:
+    raise ValueError
+
+  cipher = Salsa20.new(key=key[8:], nonce=key[:8])
+  
+  return cipher
+
+def parse_file_size(size):
+  units = {
+    'b': 1,
+    'kb': 1e3,
+    'mb': 1e6
+  }
+
+  match = re.match(r'(\d+)([km]?b)', size.strip().lower())
+  if not match:
+    raise ValueError
+
+  size, unit = int(match.group(1)), units[match.group(2)]
+
+  return int(size * unit)
+
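+# Talk to the daemon over its unix admin socket: 4-byte little-endian length
+# followed by a CBOR-encoded command, with the response framed the same way.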
+def send_command(**command):
+  try:
+    conn = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+    conn.connect(args.admin_socket_path)
+
+    command = cbor2.dumps(command)
+    
+    conn.sendall(struct.pack('<I', len(command)))
+    conn.sendall(command)
+
+    length = struct.unpack('<I', conn.recv(4))[0]
+
+    # recv() may return less than requested, so accumulate until complete.
+    response = b''
+    while len(response) < length:
+      response += conn.recv(length - len(response))
+
+    response = cbor2.loads(response)
+
+    conn.close()
+  except Exception as e:
+    logging.error(f'Contacting the admin socket `{args.admin_socket_path}\': {e}')
+
+    sys.exit(1)
+
+  return response
+
+parser = argparse.ArgumentParser()
+parser.add_argument(
+  '-G', '--genconf',
+  help='Generate configuration file',
+  action='store_true'
+)
+
+parser.add_argument(
+  '--listen-address',
+  help='Set ListenAddress (use together with --genconf)',
+  type=str, default='0.0.0.0'
+)
+
+parser.add_argument(
+  '-p', '--listen-port',
+  help='Set ListenPort (use together with --genconf)',
+  type=int, default=42424
+)
+
+parser.add_argument(
+  '-s', '--storage-path',
+  help='Set StoragePath (use together with --genconf)',
+  type=str, default='./storage'
+)
+
+parser.add_argument(
+  '-a', '--admin-socket-path',
+  help='Set AdminSocketPath (use together with --genconf, --share or --query)',
+  type=str, default='./adminsocket'
+)
+
+parser.add_argument(
+  '-S', '--share',
+  help='Share a file',
+  type=argparse.FileType('rb')
+)
+
+parser.add_argument(
+  '-e', '--encrypt',
+  help='Encrypt file before sharing (use together with --share)',
+  action='store_true'
+)
+
+parser.add_argument(
+  '-Q', '--query',
+  help='Query a file',
+  type=argparse.FileType('rb')
+)
+
+parser.add_argument(
+  '-o', '--output',
+  help='Set output file',
+  type=argparse.FileType('wb')
+)
+
+parser.add_argument(
+  '-P', '--piece-size',
+  help='Set piece size',
+  type=parse_file_size, default='16kb'
+)
+
+parser.add_argument(
+  '--shutdown',
+  help='Request graceful shutdown of the peer',
+  type=int
+)
+
+args = parser.parse_args()
+
+if args.genconf:
+  import ecies
+
+  key = ecies.utils.generate_eth_key()
+  pk = key.public_key.to_hex()
+  sk = key.to_hex()
+
+  if not args.output:
+    output = sys.stdout.buffer
+  else:
+    output = args.output
+
+  output.write(bytes(f'''{{
+  Peers: []
+
+  ListenAddress: {args.listen_address}
+  ListenPort: {args.listen_port}
+
+  Key: {pk}
+  Secret: {sk}
+
+  StoragePath: {os.path.abspath(args.storage_path)}
+  AdminSocketPath: {os.path.abspath(args.admin_socket_path)}
+}}''', 'UTF-8'))
+
+if args.share:
+  with args.share as f:
+    filename = pathlib.Path(f.name).name
+
+    f.seek(0, os.SEEK_END)
+    file_size = f.tell()
+    f.seek(0)
+
+    checksum = SHA256.new()
+
+    if args.encrypt:
+      key, cipher = salsa20_create_encryptor()
+
+    pieces_count = max(1, -(-file_size // args.piece_size))
+    progress = tqdm.trange(pieces_count)
+    pieces = []
+    while True:
+      piece = f.read(args.piece_size)
+      if not piece:
+        break
+
+      checksum.update(piece)
+
+      if args.encrypt:
+        piece = cipher.encrypt(piece)
+
+      hash = send_command(store={
+        'piece': piece
+      })['hash']
+
+      pieces.append(hash)
+      progress.update(1)
+
+    metafile = Metafile(
+      filename,
+      file_size,
+      checksum.digest(),
+      pieces,
+      key=key if args.encrypt else None
+    )
+
+    if not args.output:
+      output = open(f'{filename}.byafn', 'wb')
+    else:
+      output = args.output
+
+    metafile.save(output)
+
+    output.close()
+
+if args.query:
+  with args.query as f:
+    metafile = Metafile.load(f)
+
+    if not args.output:
+      output = open(metafile.filename, 'wb')
+    else:
+      output = args.output
+
+    if metafile.key:
+      cipher = salsa20_create_decryptor(metafile.key)
+
+    checksum = SHA256.new()
+
+    progress = tqdm.trange(len(metafile.pieces))
+    for piece in metafile.pieces:
+      interval = 10
+      while True:
+        data = send_command(query={
+          'hash': piece
+        })
+
+        if 'piece' not in data:
+          logging.error(f'Piece `{piece.hex()}\' is not available')
+          logging.info(f'Retrying in {interval}sec.')
+
+          time.sleep(interval)
+
+          if interval < 120:
+            interval += 10
+
+          continue
+
+        break
+
+      piece = data['piece']
+
+      if metafile.key:
+        piece = cipher.decrypt(piece)
+
+      checksum.update(piece)
+
+      output.write(piece)
+
+      progress.update(1)
+
+    # Verify the checksum only after all pieces have been written; the running
+    # digest cannot match the full-file checksum mid-transfer.
+    if checksum.digest() != metafile.checksum:
+      logging.error('Integrity check failed')
+
+      sys.exit(1)
+
+    output.close()
+
+if args.shutdown is not None:
+  send_command(shutdown={
+    'delay': args.shutdown
+  })
+
+  logging.info('Shutdown command sent')