byafn.py 17 KB


  1. import os
  2. import sys
  3. import time
  4. import random
  5. import struct
  6. import asyncio
  7. from collections import namedtuple
  8. import zstd
  9. import cbor2
  10. import ecies
  11. import hjson
  12. import aiofiles
  13. import aiologger
  14. try:
  15. import aiofiles.os
  16. except AttributeError:
  17. os.link = os.symlink
  18. import aiofiles.os
  19. from Crypto.Hash import SHA256
  20. from Crypto.Random import get_random_bytes
  21. PINGS_COUNT = 3
  22. PING_TIMEOUT = 10
  23. HANDSHAKE_TIMEOUT = 10
  24. BROADCAST_TIMEOUT = 300
  25. HEARTBEAT = 10
  26. WATCHER_INTERVAL = 30
  27. CACHE_LIFETIME = 3600
  28. MAX_TTL = 7
  29. MAX_PAYLOAD_SIZE = 1024*1024*64
  30. CHUNK_SIZE = 512
  31. COOLDOWN = 0.1
  32. MAX_PIECE_SIZE = 1024*1024*10
  33. config = {}
  34. peers = []
  35. cache = []
  36. logger = aiologger.Logger.with_default_handlers(
  37. formatter=aiologger.formatters.base.Formatter(
  38. fmt='%(asctime)s %(levelname)s: %(message)s'
  39. )
  40. )
  41. def sha256(data):
  42. hash = SHA256.new()
  43. hash.update(data)
  44. return hash.digest()
  45. def chunks(l, n):
  46. for i in range(0, len(l), n):
  47. yield l[i:i + n]
  48. class Error(Exception): pass
  49. async def cleanup_pieces(age):
  50. count = 0
  51. for filename in aiofiles.os.listdir(
  52. config['StoragePath']
  53. ):
  54. path = os.path.join(
  55. config['StoragePath'],
  56. filename
  57. )
  58. access_time = await aiofiles.os.path.getatime(
  59. path
  60. )
  61. if time.time() - access_time > age:
  62. await logger.info(f'Purging `{filename}\' from the storage.')
  63. await aiofiles.os.remove(
  64. path
  65. )
  66. count += 1
  67. return count
  68. async def is_piece_exists(hash):
  69. if await aiofiles.os.path.isfile(
  70. os.path.join(
  71. config['StoragePath'],
  72. hash.hex()
  73. )
  74. ):
  75. return True
  76. return False
  77. async def save_piece(data, hash=None):
  78. if not hash:
  79. hash = sha256(data)
  80. path = os.path.join(
  81. config['StoragePath'],
  82. hash.hex()
  83. )
  84. async with aiofiles.open(
  85. path,
  86. 'wb'
  87. ) as f:
  88. data = zstd.compress(data)
  89. await f.write(data)
  90. return hash
  91. async def read_piece(hash):
  92. path = os.path.join(
  93. config['StoragePath'],
  94. hash.hex()
  95. )
  96. async with aiofiles.open(
  97. path,
  98. 'rb'
  99. ) as f:
  100. data = await f.read()
  101. data = zstd.decompress(data)
  102. if sha256(data) != hash:
  103. await aiofiles.os.remove(path)
  104. raise ValueError('piece actual checksum (i.e. on disk) and expected checksum do not match')
  105. os.utime(path)
  106. return data
  107. CachedMessage = namedtuple(
  108. 'CachedMessage',
  109. 'kind uid ts'
  110. )
  111. class Message:
  112. QUERY = 0xa
  113. QUERY_HIT = 0xb
  114. NOT_AVAILABLE = 0xc
  115. def __init__(self, kind, uid=None, **fields):
  116. self.kind = kind
  117. self.uid = uid if uid else get_random_bytes(16)
  118. self.fields = fields
  119. if self.kind == Message.QUERY:
  120. if 'hash' not in self.fields\
  121. or type(self.fields['hash']) != bytes\
  122. or len(self.fields['hash']) != 32:
  123. raise ValueError('malformed `QUERY\' message: illegal or missing `hash\' field')
  124. if 'ttl' not in self.fields\
  125. or type(self.fields['ttl']) != int\
  126. or self.fields['ttl'] < 0:
  127. raise ValueError('malformed `QUERY\' message: illegal or missing `ttl\' field')
  128. elif self.kind == Message.QUERY_HIT:
  129. if 'data' not in self.fields\
  130. or type(self.fields['data']) != bytes\
  131. or len(self.fields['data']) < 1\
  132. or len(self.fields['data']) > MAX_PIECE_SIZE:
  133. raise ValueError('malformed `QUERY_HIT\' message: illegal or missing `data\' field')
  134. elif self.kind == Message.NOT_AVAILABLE:
  135. if self.fields:
  136. raise ValueError('malformed `NOT_AVAILABLE\' message: unexpected payload')
  137. elif self.kind == ServiceMessage.PING:
  138. if self.fields:
  139. raise ValueError('malformed `PING\' message: unexpected payload')
  140. elif self.kind == ServiceMessage.PONG:
  141. if self.fields:
  142. raise ValueError('malformed `PONG\' message: unexpected payload')
  143. elif self.kind == ServiceMessage.CLOSE:
  144. if self.fields:
  145. raise ValueError('malformed `CLOSE\' message: unexpected payload')
  146. def __getattr__(self, field):
  147. if field not in self.fields:
  148. raise Error(f'missing required field `{field}\'')
  149. return self.fields[field]
  150. def cache(self):
  151. for message in cache:
  152. if (
  153. message.uid == self.uid and
  154. message.kind == self.kind
  155. ):
  156. return False
  157. cache.append(
  158. CachedMessage(
  159. kind=self.kind,
  160. uid=self.uid,
  161. ts=time.time()
  162. )
  163. )
  164. return True
  165. def is_response_for(self, message):
  166. if self.uid != message.uid:
  167. return False
  168. if message.kind == ServiceMessage.PING:
  169. return self.kind == ServiceMessage.PONG
  170. elif message.kind == Message.QUERY:
  171. return self.kind in (
  172. Message.QUERY_HIT,
  173. Message.NOT_AVAILABLE
  174. )
  175. return False
  176. class ServiceMessage(Message):
  177. HELLO = 0x0
  178. CHALLENGE = 0x1
  179. ANSWER = 0x2
  180. FINISH = 0x3
  181. PING = 0x4
  182. PONG = 0x5
  183. CLOSE = 0x6
  184. class Peer:
  185. def __init__(self, reader, writer, address=None):
  186. self.reader = reader
  187. self.writer = writer
  188. self.address = address
  189. self.key = None
  190. self.queue = []
  191. self.is_open = True
  192. self.send_lock = asyncio.Lock()
  193. self.receive_lock = asyncio.Lock()
  194. self.last_message_ts = -1
  195. async def write(peer, data):
  196. peer.writer.write(data)
  197. await peer.writer.drain()
  198. async def read(peer, size):
  199. buffer = b''
  200. while len(buffer) < size:
  201. buffer += await peer.reader.read(size - len(buffer))
  202. return buffer
  203. async def send(peer, message):
  204. if type(message) is ServiceMessage:
  205. buffer = bytes([message.kind])
  206. if message.fields:
  207. payload = cbor2.dumps(message.fields)
  208. buffer += struct.pack('<H', len(payload))
  209. buffer += payload
  210. else:
  211. buffer += bytes(2)
  212. async with peer.send_lock:
  213. await write(peer, buffer)
  214. return
  215. payload = b''
  216. chunks_count = 0
  217. if message.fields:
  218. payload = cbor2.dumps(message.fields)
  219. payload = zstd.compress(payload)
  220. payload = ecies.encrypt(
  221. peer.key,
  222. payload
  223. )
  224. chunks_count = max(1, len(payload) // CHUNK_SIZE)
  225. buffer = b'\xff' + ecies.encrypt(
  226. peer.key,
  227. bytes([message.kind]) +
  228. message.uid +
  229. struct.pack('<H', chunks_count)
  230. )
  231. async with peer.send_lock:
  232. await write(peer, buffer)
  233. if chunks_count:
  234. for chunk in chunks(payload, CHUNK_SIZE):
  235. await write(
  236. peer,
  237. struct.pack('<H', len(chunk)) +
  238. chunk
  239. )
  240. async def receive(peer):
  241. async with peer.receive_lock:
  242. kind = (await read(peer, 1))[0]
  243. if kind != 0xff:
  244. if kind > ServiceMessage.CLOSE:
  245. raise Error('unecrypted non-service messages are not allowed')
  246. length = struct.unpack('<H', await read(peer, 2))[0]
  247. if length > MAX_PAYLOAD_SIZE:
  248. raise Error('payload is too large')
  249. payload = {}
  250. if length:
  251. payload = await read(peer, length)
  252. payload = cbor2.loads(payload)
  253. return ServiceMessage(
  254. kind,
  255. **payload
  256. )
  257. head = await read(peer, 116)
  258. head = ecies.decrypt(
  259. config['Secret'],
  260. head
  261. )
  262. kind = head[0]
  263. uid = head[1:17]
  264. chunks_count = struct.unpack('<H', head[17:19])[0]
  265. payload = {}
  266. if chunks_count:
  267. payload = b''
  268. if chunks_count * CHUNK_SIZE > MAX_PAYLOAD_SIZE:
  269. raise Error('payload is too large')
  270. for _ in range(chunks_count):
  271. length = struct.unpack('<H', await read(peer, 2))[0]
  272. if not length or length > CHUNK_SIZE:
  273. raise Error('illegal chunk length')
  274. payload += await read(peer, length)
  275. payload = ecies.decrypt(
  276. config['Secret'],
  277. payload
  278. )
  279. payload = zstd.decompress(payload)
  280. payload = cbor2.loads(payload)
  281. return Message(
  282. kind,
  283. uid=uid,
  284. **payload
  285. )
  286. async def close(peer, gracefully=True):
  287. if not peer.is_open:
  288. return
  289. if gracefully:
  290. try:
  291. await send(
  292. peer,
  293. ServiceMessage(
  294. ServiceMessage.CLOSE
  295. )
  296. )
  297. except:
  298. pass
  299. peer.writer.close()
  300. peer.is_open = False
  301. try:
  302. await asyncio.wait_for(
  303. peer.writer.wait_closed(),
  304. timeout=3
  305. )
  306. except:
  307. pass
  308. async def wait_response(peer, message):
  309. while peer.is_open:
  310. for other_message in peer.queue:
  311. if other_message.is_response_for(message):
  312. peer.queue.remove(other_message)
  313. return other_message
  314. await asyncio.sleep(0)
  315. async def communicate(peer, message, timeout=None):
  316. await send(peer, message)
  317. answer = await asyncio.wait_for(
  318. wait_response(
  319. peer,
  320. message
  321. ),
  322. timeout=timeout
  323. )
  324. if not answer:
  325. raise Error('communication timeout')
  326. return answer
  327. async def ping(peer):
  328. await communicate(
  329. peer,
  330. ServiceMessage(
  331. ServiceMessage.PING
  332. ),
  333. timeout=PING_TIMEOUT
  334. )
  335. async def respond(peer, message, kind, is_service=False, **data):
  336. await send(
  337. peer,
  338. (ServiceMessage if is_service else Message)(
  339. kind,
  340. uid=message.uid,
  341. **data
  342. )
  343. )
  344. async def query(hash, uid=None, ttl=0, filter=None):
  345. if await is_piece_exists(hash):
  346. return await read_piece(hash)
  347. answer = await broadcast(
  348. Message(
  349. Message.QUERY,
  350. uid=uid,
  351. hash=hash,
  352. ttl=ttl
  353. ),
  354. message_filter=lambda answer: answer.kind == Message.QUERY_HIT and sha256(answer.data) == hash,
  355. peer_filter=filter
  356. )
  357. if not answer:
  358. return None
  359. await save_piece(
  360. answer.data,
  361. hash
  362. )
  363. return answer.data
  364. async def broadcast(message, message_filter=None, peer_filter=None):
  365. if message.ttl >= MAX_TTL:
  366. return
  367. message.fields['ttl'] += 1
  368. for peer in random.sample(peers, len(peers)):
  369. if not peer.is_open:
  370. continue
  371. if peer_filter and not peer_filter(peer):
  372. continue
  373. try:
  374. answer = await communicate(
  375. peer,
  376. message,
  377. timeout=BROADCAST_TIMEOUT
  378. )
  379. if message_filter and message_filter(answer):
  380. return answer
  381. except:
  382. continue
  383. async def tick(peer):
  384. while peer.is_open:
  385. attempts = PINGS_COUNT
  386. while True:
  387. try:
  388. await ping(peer)
  389. except:
  390. attempts -= 1
  391. if attempts < 1:
  392. self.close(False)
  393. return
  394. break
  395. await asyncio.sleep(HEARTBEAT)
  396. async def handshake(peer):
  397. await send(
  398. peer,
  399. ServiceMessage(
  400. ServiceMessage.HELLO,
  401. key=config['Key']
  402. )
  403. )
  404. answer = await receive(peer)
  405. if answer.kind != ServiceMessage.HELLO:
  406. raise Error('handshake failed: illegal initial message')
  407. key = answer.key
  408. if key == config['Key']:
  409. raise Error('handshake failed: looping connection')
  410. for peer in peers.copy():
  411. if peer.key == key:
  412. if not peer.is_open:
  413. peers.remove(peer)
  414. continue
  415. raise Error('handshake failed: duplicated connection')
  416. data = get_random_bytes(16)
  417. await send(
  418. peer,
  419. ServiceMessage(
  420. ServiceMessage.CHALLENGE,
  421. data=ecies.encrypt(
  422. key,
  423. data
  424. )
  425. )
  426. )
  427. answer = await receive(peer)
  428. if answer.kind != ServiceMessage.CHALLENGE:
  429. raise Error('handshake failed: illegal challenge initiation message')
  430. await send(
  431. peer,
  432. ServiceMessage(
  433. ServiceMessage.ANSWER,
  434. data=ecies.decrypt(
  435. config['Secret'],
  436. answer.data
  437. )
  438. )
  439. )
  440. answer = await receive(peer)
  441. if answer.kind != ServiceMessage.ANSWER:
  442. raise Error('handshake failed: illegal challenge answer message')
  443. if answer.data != data:
  444. raise Error('handshake failed: challenge data mismatch')
  445. await send(
  446. peer,
  447. ServiceMessage(
  448. ServiceMessage.FINISH
  449. )
  450. )
  451. answer = await receive(peer)
  452. if answer.kind != ServiceMessage.FINISH:
  453. raise Error('handshake failed: illegal finish message')
  454. peer.key = key
  455. async def cooldown(peer):
  456. delta = time.time() - peer.last_message_ts
  457. if delta < COOLDOWN:
  458. await asyncio.sleep(COOLDOWN - delta)
  459. peer.last_message_ts = time.time()
  460. async def serve(peer):
  461. await asyncio.wait_for(
  462. handshake(peer),
  463. timeout=HANDSHAKE_TIMEOUT
  464. )
  465. asyncio.create_task(
  466. tick(peer)
  467. )
  468. if peer.address:
  469. await logger.info(f'Connected to {peer.address}.')
  470. while peer.is_open:
  471. message = await receive(peer)
  472. await cooldown(peer)
  473. if not message.cache():
  474. if message.kind == Message.QUERY:
  475. await respond(
  476. peer,
  477. message,
  478. Message.NOT_AVAILABLE
  479. )
  480. continue
  481. if message.kind in (
  482. ServiceMessage.PONG,
  483. Message.QUERY_HIT,
  484. Message.NOT_AVAILABLE
  485. ):
  486. peer.queue.append(message)
  487. continue
  488. if message.kind == ServiceMessage.PING:
  489. await respond(
  490. peer,
  491. message,
  492. ServiceMessage.PONG,
  493. is_service=True
  494. )
  495. elif message.kind == ServiceMessage.CLOSE:
  496. await close(peer, False)
  497. elif message.kind == Message.QUERY:
  498. answer = await query(
  499. message.hash,
  500. uid=message.uid,
  501. ttl=message.ttl,
  502. filter=lambda other_peer: other_peer.key not in (peer.key, config['Key'])
  503. )
  504. if not answer:
  505. await respond(
  506. peer,
  507. message,
  508. Message.NOT_AVAILABLE
  509. )
  510. continue
  511. await respond(
  512. peer,
  513. message,
  514. Message.QUERY_HIT,
  515. data=answer
  516. )
  517. else:
  518. raise Error(f'unknown message kind: {hex(message.kind)}')
  519. async def accept(reader, writer, address=None):
  520. peer = Peer(reader, writer, address)
  521. peers.append(peer)
  522. try:
  523. await serve(peer)
  524. except Exception as e:
  525. if peer.address:
  526. await logger.warning(f'Connection lost to {peer.address}: {e}')
  527. finally:
  528. await close(peer)
  529. async def dial(address):
  530. parts = address.split(':')
  531. host = ':'.join(parts[:-1])
  532. port = int(parts[-1])
  533. try:
  534. reader, writer = await asyncio.open_connection(
  535. host,
  536. port
  537. )
  538. except Exception as e:
  539. dummy_peer = Peer(None, None, address)
  540. dummy_peer.is_open = False
  541. peers.append(dummy_peer)
  542. await logger.error(f'Dial {address}: {e}')
  543. return
  544. asyncio.create_task(
  545. accept(reader, writer, address)
  546. )
  547. async def listen():
  548. try:
  549. server = await asyncio.start_server(
  550. accept,
  551. config['ListenAddress'],
  552. int(config['ListenPort'])
  553. )
  554. except Exception as e:
  555. await logger.error(f'Bind {config["ListenAddress"]}:{config["ListenPort"]}: {e}')
  556. return
  557. await logger.info(f'Listening at {config["ListenAddress"]}:{config["ListenPort"]}')
  558. async with server:
  559. await server.serve_forever()
  560. async def watcher():
  561. while True:
  562. for peer in peers.copy():
  563. if not peer.is_open:
  564. peers.remove(peer)
  565. if peer.address:
  566. asyncio.create_task(
  567. dial(peer.address)
  568. )
  569. continue
  570. for message in cache.copy():
  571. if time.time() - message.ts > CACHE_LIFETIME:
  572. cache.remove(message)
  573. await asyncio.sleep(WATCHER_INTERVAL)
  574. async def shutdown(delay):
  575. await asyncio.sleep(delay)
  576. await logger.info('Performing graceful shutdown')
  577. for peer in peers:
  578. if not peer.is_open:
  579. continue
  580. try:
  581. await send(
  582. peer,
  583. ServiceMessage(
  584. ServiceMessage.CLOSE
  585. )
  586. )
  587. except:
  588. pass
  589. while True:
  590. os.kill(os.getpid(), 2)
  591. async def accept_admin(reader, writer):
  592. try:
  593. length = struct.unpack('<I', await reader.read(4))[0]
  594. request = cbor2.loads(await reader.read(length))
  595. response = {}
  596. if 'store' in request:
  597. hash = await save_piece(
  598. request['store']['piece']
  599. )
  600. response['hash'] = hash
  601. elif 'query' in request:
  602. piece = await query(
  603. request['query']['hash']
  604. )
  605. if piece:
  606. response['piece'] = piece
  607. elif 'shutdown' in request:
  608. delay = int(request['shutdown']['delay'])
  609. await logger.info(f'Requested shutdown in {delay}sec.')
  610. asyncio.create_task(
  611. shutdown(delay)
  612. )
  613. elif 'cleanup' in request:
  614. age = int(request['cleanup']['age'])
  615. response['removed_count'] = await cleanup_pieces(
  616. age
  617. )
  618. else:
  619. raise Error('unrecognized command')
  620. except Exception as e:
  621. await logger.error(f'Process request on admin socket: {e}')
  622. response = cbor2.dumps(response)
  623. writer.write(struct.pack('<I', len(response)))
  624. writer.write(response)
  625. await writer.drain()
  626. async def listen_admin():
  627. try:
  628. server = await asyncio.start_unix_server(
  629. accept_admin,
  630. config['AdminSocketPath'],
  631. )
  632. except Exception as e:
  633. await logger.error(f'Bind {config["AdminSocketPath"]}: {e}')
  634. return
  635. async with server:
  636. await server.serve_forever()
  637. async def main():
  638. global config
  639. if len(sys.argv) < 2:
  640. print(f'Usage: {sys.argv[0]} <config.conf>')
  641. return
  642. try:
  643. async with aiofiles.open(
  644. sys.argv[1],
  645. 'r'
  646. ) as f:
  647. config = hjson.loads(await f.read())
  648. except Exception as e:
  649. await logger.error(f'Load configuration `{sys.argv[1]}\': {e}')
  650. return
  651. if not await aiofiles.os.path.isdir(
  652. config['StoragePath']
  653. ):
  654. await aiofiles.os.mkdir(
  655. config['StoragePath']
  656. )
  657. asyncio.create_task(
  658. watcher()
  659. )
  660. for address in config['Peers']:
  661. await dial(address)
  662. asyncio.create_task(
  663. listen_admin()
  664. )
  665. await listen()
  666. try:
  667. asyncio.run(main())
  668. except KeyboardInterrupt:
  669. print('Interrupted.')