yafn.py 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506
  1. import os
  2. import os.path
  3. import sys
  4. import uuid
  5. import time
  6. import zlib
  7. import random
  8. import struct
  9. import socket
  10. import pathlib
  11. import threading
  12. import urllib.request
  13. import zmq
  14. import cbor2
  15. from Crypto.Hash import SHA256, RIPEMD160
  16. from Crypto.Cipher import AES, Salsa20, PKCS1_OAEP
  17. from Crypto.Random import get_random_bytes
  18. from Crypto.PublicKey import RSA
  19. from . import log
  20. ALPHABET = '286USqFxzKsmMBP9c4TOyECkefQZ7otHAjYh5aN1WiLRprnIGwgulV0dDX'
  21. def encode_uid(uid):
  22. result = ''
  23. n = int.from_bytes(uid, 'big')
  24. while n >= 58:
  25. m = n % 58
  26. n //= 58
  27. result += ALPHABET[m]
  28. if n > 0:
  29. result += ALPHABET[n]
  30. return result
  31. class Timer(threading.Thread):
  32. def __init__(self, interval, callback, preflight=0):
  33. super().__init__()
  34. self.interval = interval
  35. self.callback = callback
  36. self.preflight = preflight
  37. self.event = threading.Event()
  38. def run(self):
  39. if self.preflight:
  40. time.sleep(self.preflight)
  41. self.callback()
  42. while not self.event.wait(self.interval):
  43. self.callback()
  44. def spawn_thread(target, *args):
  45. thread = threading.Thread(
  46. target=target,
  47. args=args
  48. )
  49. thread.daemon = True
  50. thread.start()
  51. def adler32(data):
  52. return zlib.adler32(data) & 0xffffffff
  53. def ripemd160(data):
  54. hash = RIPEMD160.new()
  55. hash.update(data)
  56. return hash.digest()
  57. def sha256(data):
  58. hash = SHA256.new()
  59. hash.update(data)
  60. return hash.digest()
  61. def generate_uid(pubkey):
  62. pubkey = pubkey.public_key()
  63. n = pubkey.n.to_bytes(128, 'big')
  64. e = pubkey.e.to_bytes(3, 'big')
  65. return ripemd160(sha256(n + e))
  66. def RSA_generate_keypair():
  67. keypair = RSA.generate(1024)
  68. return keypair
  69. def RSA_encrypt(data, key):
  70. key = key.public_key()
  71. cipher = PKCS1_OAEP.new(key)
  72. return cipher.encrypt(data)
  73. def RSA_decrypt(data, key):
  74. cipher = PKCS1_OAEP.new(key)
  75. return cipher.decrypt(data)
  76. def AES_encrypt(data, key):
  77. cipher = AES.new(key, AES.MODE_GCM)
  78. nonce = cipher.nonce
  79. data, tag = cipher.encrypt_and_digest(data)
  80. return (data, nonce, tag)
  81. def AES_decrypt(data, key, nonce, tag):
  82. cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
  83. data = cipher.decrypt(data)
  84. cipher.verify(tag)
  85. return data
  86. def RSA_AES_hybrid_encrypt(data, public_key):
  87. public_key = public_key.public_key()
  88. cipher = PKCS1_OAEP.new(public_key)
  89. key = get_random_bytes(32)
  90. data, nonce, tag = AES_encrypt(data, key)
  91. key = key + nonce + tag
  92. key = cipher.encrypt(key)
  93. return (data, key)
  94. def RSA_AES_hybrid_decrypt(data, key, private_key):
  95. cipher = PKCS1_OAEP.new(private_key)
  96. key = cipher.decrypt(key)
  97. if len(key) != 64:
  98. raise ValueError
  99. key, nonce, tag = key[:32], key[32:48], key[48:64]
  100. return AES_decrypt(data, key, nonce, tag)
  101. def salsa20_create_encryptor():
  102. key = get_random_bytes(32)
  103. cipher = Salsa20.new(key=key)
  104. return (cipher.nonce + key, cipher)
  105. def salsa20_create_decryptor(key):
  106. if len(key) != 40:
  107. raise ValueError
  108. cipher = Salsa20.new(key=key[8:], nonce=key[:8])
  109. return cipher
  110. def chunked(data, size):
  111. return [data[i:i+size] for i in range(0, len(data), size)]
  112. class MessageKind:
  113. PING = b'P'
  114. PONG = b'O'
  115. BYE = b'B'
  116. QUERY = b'Q'
  117. QUERY_HIT = b'H'
  118. NOTAVAIL = b'N'
  119. CRAWL = b'C'
  120. MAP = b'M'
  121. ANNOUNCE = b'A'
  122. class Message:
  123. def __init__(self, kind, uid=None, **fields):
  124. self.kind = kind
  125. self.uid = uid if uid else get_random_bytes(16)
  126. self.__dict__.update(**fields)
  127. self._timestamp = time.time()
  128. @property
  129. def age(self):
  130. return time.time() - self._timestamp
  131. class Part:
  132. def __init__(self, data, size, n, checksum):
  133. self.data = data
  134. self.size = size
  135. self.n = n
  136. self.checksum = checksum
  137. class Piece:
  138. HEADER = b'\x80YAFN-PIECE\x00\x00'
  139. HEADER_SIZE = len(HEADER)
  140. PIECE_SIZE = 512 * 1024
  141. PART_SIZE = 1024
  142. def __init__(self, timestamp, hash, parts, parts_count):
  143. self.timestamp = timestamp
  144. self.hash = hash
  145. self.parts = parts
  146. self.parts_count = parts_count
  147. def join(self):
  148. data = b''
  149. for part in self.parts:
  150. data += part.data
  151. return data
  152. def dump(self, fd):
  153. fd.write(Piece.HEADER)
  154. fd.write(self.hash)
  155. fd.write(struct.pack('!Q', int(self.timestamp)))
  156. fd.write(struct.pack('!H', self.parts_count))
  157. for order, part in zip(range(self.parts_count), self.parts):
  158. fd.write(struct.pack('!H', order))
  159. fd.write(struct.pack('!L', part.checksum))
  160. fd.write(struct.pack('!H', part.size))
  161. fd.write(part.data)
  162. @staticmethod
  163. def load(fd):
  164. header = fd.read(Piece.HEADER_SIZE)
  165. if header != Piece.HEADER:
  166. raise ValueError
  167. hash = fd.read(32)
  168. if len(hash) != 32:
  169. raise ValueError
  170. timestamp = fd.read(8)
  171. timestamp = struct.unpack('!Q', timestamp)[0]
  172. parts_count = fd.read(2)
  173. parts_count = struct.unpack('!H', parts_count)[0]
  174. if parts_count < 1:
  175. raise ValueError
  176. actual_hash = SHA256.new()
  177. parts = []
  178. for n in range(parts_count):
  179. order = fd.read(2)
  180. order = struct.unpack('!H', order)[0]
  181. if order != n:
  182. raise ValueError
  183. checksum = fd.read(4)
  184. checksum = struct.unpack('!L', checksum)[0]
  185. size = fd.read(2)
  186. size = struct.unpack('!H', size)[0]
  187. if size < 1:
  188. raise ValueError
  189. data = fd.read(size)
  190. if len(data) != size:
  191. raise ValueError
  192. if adler32(data) != checksum:
  193. raise ValueError
  194. parts.append(Part(
  195. data,
  196. size,
  197. order,
  198. checksum
  199. ))
  200. actual_hash.update(data)
  201. if len(parts) != parts_count:
  202. raise ValueError
  203. if actual_hash.digest() != hash:
  204. raise ValueError
  205. return Piece(
  206. timestamp,
  207. hash,
  208. parts,
  209. parts_count
  210. )
  211. @staticmethod
  212. def create(data):
  213. hash = SHA256.new()
  214. hash.update(data)
  215. parts = chunked(data, Piece.PART_SIZE)
  216. return Piece(
  217. time.time(),
  218. hash.digest(),
  219. [
  220. Part(
  221. part,
  222. len(part),
  223. n,
  224. adler32(part)
  225. ) for n, part in zip(range(len(parts)), parts)
  226. ],
  227. len(parts)
  228. )
  229. class Storage:
  230. YAFN_DIR = os.path.join(
  231. pathlib.Path.home(),
  232. 'yafn'
  233. )
  234. STORAGE_DIR = os.path.join(
  235. YAFN_DIR,
  236. '.storage'
  237. )
  238. KEYPAIR_FILE = os.path.join(
  239. YAFN_DIR,
  240. 'keypair.pem',
  241. )
  242. TRACKERS_FILE = os.path.join(
  243. YAFN_DIR,
  244. 'trackers.txt'
  245. )
  246. @staticmethod
  247. def setup():
  248. if not os.path.isdir(Storage.YAFN_DIR):
  249. os.mkdir(Storage.YAFN_DIR)
  250. if not os.path.isdir(Storage.STORAGE_DIR):
  251. os.mkdir(Storage.STORAGE_DIR)
  252. if not os.path.isfile(Storage.TRACKERS_FILE):
  253. open(Storage.TRACKERS_FILE, 'w').close()
  254. @staticmethod
  255. def get_keypair():
  256. if not os.path.isfile(Storage.KEYPAIR_FILE):
  257. keypair = RSA_generate_keypair()
  258. with open(Storage.KEYPAIR_FILE, 'wb') as f:
  259. f.write(keypair.export_key())
  260. return keypair
  261. with open(Storage.KEYPAIR_FILE, 'rb') as f:
  262. keypair = RSA.import_key(f.read())
  263. return keypair
  264. @staticmethod
  265. def get_trackers():
  266. if not os.path.isfile(Storage.TRACKERS_FILE):
  267. return []
  268. with open(Storage.TRACKERS_FILE, 'r') as f:
  269. lines = f.readlines()
  270. lines = map(lambda line: line.strip(), lines)
  271. lines = filter(bool, lines)
  272. return list(set(lines))
  273. @staticmethod
  274. def find_piece(hash):
  275. path = os.path.join(Storage.STORAGE_DIR, hash.hex())
  276. if not os.path.isfile(path):
  277. return None
  278. with open(path, 'rb') as f:
  279. try:
  280. piece = Piece.load(f)
  281. if piece.hash != hash:
  282. os.remove(path)
  283. return None
  284. return piece
  285. except:
  286. return None
  287. @staticmethod
  288. def save_piece(piece):
  289. if type(piece) is not Piece:
  290. piece = Piece.create(piece)
  291. path = os.path.join(Storage.STORAGE_DIR, piece.hash.hex())
  292. with open(path, 'wb') as f:
  293. piece.dump(f)
  294. return piece
  295. @staticmethod
  296. def list_pieces():
  297. pieces = set()
  298. files = os.listdir(Storage.STORAGE_DIR)
  299. for file in files:
  300. piece = Storage.find_piece(bytes.fromhex(file))
  301. pieces.add(piece.hash)
  302. return pieces
  303. class YAFNError(Exception): pass
  304. class CachedMessage:
  305. def __init__(self, kind, uid):
  306. self.kind = kind
  307. self.uid = uid
  308. self._timestamp = time.time()
  309. @property
  310. def age(self):
  311. return time.time() - self._timestamp
  312. class Map:
  313. def __init__(self, uid, submaps):
  314. self.uid = uid
  315. self.submaps = submaps
  316. self.submaps_count = len(submaps)
  317. def dump(self):
  318. data = b''
  319. data += self.uid
  320. data += struct.pack('!H', self.submaps_count)
  321. for submap in self.submaps:
  322. data += submap.dump()
  323. return data
  324. def split(self):
  325. data = self.dump()
  326. return chunked(data, Piece.PART_SIZE)
  327. @staticmethod
  328. def _drain(data):
  329. if len(data) < 20 + 2:
  330. raise ValueError
  331. uid = data[:20]
  332. submaps_count = struct.unpack('!H', data[20:22])[0]
  333. data = data[22:]
  334. submaps = []
  335. while submaps_count > 0:
  336. data, submap = Map._drain(data)
  337. submaps.append(submap)
  338. submaps_count -= 1
  339. return data, Map(uid, submaps)
  340. @staticmethod
  341. def create(data):
  342. data, map = Map._drain(data)
  343. if len(data) > 0:
  344. raise ValueError
  345. return map
  346. class Connection:
  347. def __init__(self, peer, socket, addr, is_inbound=True, reconnect_attempts=0):
  348. self._peer = peer
  349. self._socket = socket
  350. self.addr = addr
  351. self.is_inbound = is_inbound
  352. self._reconnect_attempts = reconnect_attempts
  353. self.uid = None
  354. self.is_alive = True
  355. self.near_pieces = set()
  356. self._near_pieces_purge_timestamp = time.time()
  357. self._send_lock = threading.Lock()
  358. self._dont_reconnect = False
  359. self._timestamp = time.time()
  360. self._queries_pending = 0
  361. self._messages_pending = 0
  362. self._queue = []
  363. self._cache = []
  364. self._remote_pubkey = None
  365. self._watchdog = Timer(30 if self.is_inbound else 60, self._watch)
  366. self._watchdog.start()
  367. self._announcer = Timer(60*10, self.announce)
  368. self._announcer.start()
  369. def _watch(self):
  370. if not self.is_alive:
  371. return
  372. if not self._remote_pubkey:
  373. self.close()
  374. return
  375. attempts = 0
  376. while True:
  377. attempts += 1
  378. if attempts > 3:
  379. self.close()
  380. return
  381. message = Message(MessageKind.PING)
  382. try:
  383. self.send(message)
  384. except:
  385. continue
  386. response = self.wait(
  387. lambda m: m.uid == message.uid and m.kind == MessageKind.PONG,
  388. timeout=15
  389. )
  390. if not response:
  391. continue
  392. break
  393. self._cache = [
  394. message for message in self._cache if message.age < 60*60
  395. ]
  396. self._queue = [
  397. message for message in self._queue if message.age < 60*5
  398. ]
  399. if time.time() - self._near_pieces_purge_timestamp > 60*60*8:
  400. self.near_pieces = set()
  401. self._near_pieces_purge_timestamp = time.time()
  402. def _recvall(self, size):
  403. buffer = b''
  404. while len(buffer) != size:
  405. buffer += self._socket.recv(size - len(buffer))
  406. return buffer
  407. def _sendall(self, data):
  408. self._socket.sendall(data)
  409. @property
  410. def age(self):
  411. return time.time() - self._timestamp
  412. @property
  413. def is_ok(self):
  414. if self.is_alive:
  415. return True
  416. try:
  417. self._peer.connections.remove(self)
  418. except:
  419. pass
  420. return False
  421. def is_cached(self, message):
  422. for other_message in self._cache:
  423. if message.uid == other_message.uid:
  424. return True
  425. return False
  426. def cache(self, message):
  427. if not self.is_cached(message):
  428. self._cache.append(CachedMessage(message.kind, message.uid))
  429. def wait(self, tester, timeout=60):
  430. start_ts = time.time()
  431. while time.time() - start_ts < timeout:
  432. queue = self._queue.copy()
  433. for message in queue:
  434. if tester(message):
  435. try:
  436. self._queue.remove(message)
  437. except:
  438. pass
  439. return message
  440. if not self.is_alive:
  441. return None
  442. time.sleep(1)
  443. def close(self):
  444. if not self.is_alive:
  445. return
  446. try:
  447. self.send(Message(MessageKind.BYE))
  448. except:
  449. pass
  450. finally:
  451. try:
  452. self._socket.close()
  453. except:
  454. pass
  455. self._watchdog.event.set()
  456. self._announcer.event.set()
  457. try:
  458. self._peer.connections.remove(self)
  459. except:
  460. pass
  461. self.is_alive = False
  462. if not self.is_inbound:
  463. log.warning(f'`{self.addr}` ({self.encoded_uid if self.uid else "n/a"}): Connection lost.')
  464. if self._dont_reconnect or self._reconnect_attempts >= 5:
  465. return
  466. self._reconnect_attempts += 1
  467. time.sleep(10 * self._reconnect_attempts)
  468. self._peer.connect_to(self.addr, self._reconnect_attempts)
  469. def query(self, hash, mid, ttl=7):
  470. self._queries_pending += 1
  471. try:
  472. self.send(
  473. Message(
  474. MessageKind.QUERY,
  475. uid=mid,
  476. hash=hash,
  477. ttl=ttl
  478. )
  479. )
  480. response = self.wait(
  481. lambda m: m.uid == mid and m.kind in (MessageKind.QUERY_HIT, MessageKind.NOTAVAIL),
  482. timeout=60*8
  483. )
  484. if response and response.kind == MessageKind.QUERY_HIT:
  485. if response.piece.hash != hash:
  486. return None
  487. return response.piece
  488. except:
  489. pass
  490. finally:
  491. self._queries_pending -= 1
  492. def crawl(self, mid, ttl=7):
  493. try:
  494. self.send(
  495. Message(
  496. MessageKind.CRAWL,
  497. uid=mid,
  498. ttl=ttl
  499. )
  500. )
  501. response = self.wait(
  502. lambda m: m.uid == mid and m.kind in (MessageKind.MAP, MessageKind.NOTAVAIL),
  503. timeout=60*10
  504. )
  505. if response and response.kind == MessageKind.MAP:
  506. return response.map
  507. except:
  508. pass
  509. def announce(self):
  510. if not self.is_alive:
  511. return
  512. pieces = Storage.list_pieces()
  513. if not pieces:
  514. return
  515. try:
  516. self.send(
  517. Message(
  518. MessageKind.ANNOUNCE,
  519. pieces=pieces
  520. )
  521. )
  522. except:
  523. pass
  524. def handshake(self):
  525. self._sendall(b'YAFN HELLO' + self._peer.pubkey)
  526. data = self._recvall(10 + 162)
  527. head = data[:10]
  528. if head != b'YAFN HELLO':
  529. raise YAFNError
  530. remote_pubkey = RSA.import_key(data[10:]).public_key()
  531. remote_uid = generate_uid(remote_pubkey)
  532. if remote_uid == self._peer.uid or remote_uid in self._peer.connections:
  533. self._dont_reconnect = True
  534. raise YAFNError
  535. random_data = get_random_bytes(16)
  536. data = RSA_encrypt(random_data, remote_pubkey)
  537. self._sendall(b'CHECK' + data)
  538. data = self._recvall(5 + 128)
  539. head = data[:5]
  540. if head != b'CHECK':
  541. raise YAFNError
  542. remote_data = data[5:]
  543. remote_data = RSA_decrypt(remote_data, self._peer.keypair)
  544. self._sendall(b'CHECKED' + remote_data)
  545. data = self._recvall(7 + 16)
  546. head = data[:7]
  547. if head != b'CHECKED':
  548. raise YAFNError
  549. if data[7:] != random_data:
  550. raise YAFNError
  551. self._sendall(b'FINISH')
  552. head = self._recvall(6)
  553. if head != b'FINISH':
  554. raise YAFNError
  555. self._remote_pubkey = remote_pubkey
  556. self.uid = remote_uid
  557. self.encoded_uid = encode_uid(self.uid)
  558. if not self.is_inbound:
  559. log.info(f'`{self.addr}` ({self.encoded_uid}): Connection successful.')
  560. self._reconnect_attempts = 0
  561. def _receive_parts(self, key, parts_count):
  562. data = b''
  563. total = 0
  564. cipher = salsa20_create_decryptor(key)
  565. try:
  566. self._socket.settimeout(5)
  567. while total < parts_count:
  568. head = self._recvall(6)
  569. checksum = head[:4]
  570. checksum = struct.unpack('!L', checksum)[0]
  571. part_size = head[4:6]
  572. part_size = struct.unpack('!H', part_size)[0]
  573. part = self._recvall(part_size)
  574. part = cipher.decrypt(part)
  575. if adler32(part) != checksum:
  576. raise YAFNError
  577. data += part
  578. total += 1
  579. finally:
  580. self._socket.settimeout(None)
  581. return data
  582. def receive(self):
  583. head = self._recvall(4 + 2 + 16 + 128)
  584. checksum = head[:4]
  585. size = head[4:6]
  586. uid = head[6:22]
  587. key = head[22:150]
  588. checksum = struct.unpack('!L', checksum)[0]
  589. size = struct.unpack('!H', size)[0]
  590. if size > 1024:
  591. raise YAFNError
  592. message = self._recvall(size)
  593. message = RSA_AES_hybrid_decrypt(message, key, self._peer.keypair)
  594. if adler32(message) != checksum:
  595. raise YAFNError
  596. kind = message[:1]
  597. payload = message[1:]
  598. payload_size = len(payload)
  599. fields = {}
  600. if kind in (
  601. MessageKind.PING,
  602. MessageKind.PONG,
  603. MessageKind.BYE,
  604. MessageKind.NOTAVAIL
  605. ):
  606. if payload_size != 0:
  607. raise YAFNError
  608. elif kind == MessageKind.QUERY:
  609. if payload_size != 32 + 1:
  610. raise YAFNError
  611. hash = payload[:32]
  612. ttl = payload[32]
  613. if ttl > 7:
  614. raise YAFNError
  615. fields['hash'] = hash
  616. fields['ttl'] = ttl
  617. elif kind == MessageKind.QUERY_HIT:
  618. if payload_size != 40 + 2:
  619. raise YAFNError
  620. key = payload[:40]
  621. parts_count = struct.unpack('!H', payload[40:42])[0]
  622. data = self._receive_parts(key, parts_count)
  623. fields['piece'] = Piece.create(data)
  624. elif kind == MessageKind.CRAWL:
  625. if payload_size != 1:
  626. raise YAFNError
  627. ttl = payload[0]
  628. if ttl > 7:
  629. raise YAFNError
  630. fields['ttl'] = ttl
  631. elif kind == MessageKind.MAP:
  632. if payload_size != 40 + 2:
  633. raise YAFNError
  634. key = payload[:40]
  635. parts_count = struct.unpack('!H', payload[40:42])[0]
  636. data = self._receive_parts(key, parts_count)
  637. fields['map'] = Map.create(data)
  638. elif kind == MessageKind.ANNOUNCE:
  639. if payload_size != 40 + 4:
  640. raise YAFNError
  641. key = payload[:40]
  642. parts_count = struct.unpack('!L', payload[40:44])[0]
  643. data = self._receive_parts(key, parts_count)
  644. if len(data) < 32 or len(data) % 32 != 0:
  645. raise YAFNError
  646. fields['pieces'] = set(chunked(data, 32))
  647. else:
  648. raise YAFNError
  649. return Message(kind, uid, **fields)
  650. def _send_parts(self, cipher, parts):
  651. for part in parts:
  652. checksum = adler32(part)
  653. data = cipher.encrypt(part)
  654. self._sendall(struct.pack('!L', checksum) + struct.pack('!H', len(data)))
  655. self._sendall(data)
  656. def send(self, message):
  657. head = b''
  658. payload = message.kind
  659. if message.kind == MessageKind.QUERY:
  660. payload += message.hash
  661. payload += bytes([message.ttl])
  662. elif message.kind == MessageKind.CRAWL:
  663. payload += bytes([message.ttl])
  664. elif message.kind in (
  665. MessageKind.QUERY_HIT,
  666. MessageKind.MAP,
  667. MessageKind.ANNOUNCE
  668. ):
  669. key, cipher = salsa20_create_encryptor()
  670. if message.kind == MessageKind.MAP:
  671. map_parts = message.map.split()
  672. elif message.kind == MessageKind.ANNOUNCE:
  673. splitted_pieces = b''.join(message.pieces)
  674. splitted_pieces = chunked(splitted_pieces, Piece.PART_SIZE)
  675. payload += key
  676. if message.kind == MessageKind.ANNOUNCE:
  677. payload += struct.pack('!L', len(splitted_pieces))
  678. else:
  679. payload += struct.pack('!H', len(map_parts) if message.kind == MessageKind.MAP else message.piece.parts_count)
  680. checksum = adler32(payload)
  681. payload, key = RSA_AES_hybrid_encrypt(payload, self._remote_pubkey)
  682. head += struct.pack('!L', checksum)
  683. head += struct.pack('!H', len(payload))
  684. head += message.uid
  685. head += key
  686. try:
  687. self._send_lock.acquire()
  688. self._sendall(head + payload)
  689. if message.kind == MessageKind.QUERY_HIT:
  690. self._send_parts(cipher, [
  691. part.data for part in message.piece.parts
  692. ])
  693. elif message.kind == MessageKind.MAP:
  694. self._send_parts(cipher, map_parts)
  695. elif message.kind == MessageKind.ANNOUNCE:
  696. self._send_parts(cipher, splitted_pieces)
  697. finally:
  698. self._send_lock.release()
  699. def _process(self, message):
  700. try:
  701. if message.kind == MessageKind.PING:
  702. self.send(
  703. Message(
  704. MessageKind.PONG,
  705. uid=message.uid
  706. )
  707. )
  708. elif message.kind == MessageKind.BYE:
  709. self.close()
  710. elif message.kind == MessageKind.QUERY:
  711. if self._queries_pending >= 3 or message.ttl < 1 or self.is_cached(message):
  712. self.send(
  713. Message(
  714. MessageKind.NOTAVAIL,
  715. uid=message.uid
  716. )
  717. )
  718. return
  719. self.cache(message)
  720. piece = self._peer.query(message.hash, message.uid, message.ttl - 1, self)
  721. if piece:
  722. self.send(
  723. Message(
  724. MessageKind.QUERY_HIT,
  725. uid=message.uid,
  726. piece=piece
  727. )
  728. )
  729. return
  730. self.send(
  731. Message(
  732. MessageKind.NOTAVAIL,
  733. uid=message.uid
  734. )
  735. )
  736. elif message.kind == MessageKind.CRAWL:
  737. if message.ttl <= 1:
  738. self.send(
  739. Message(
  740. MessageKind.NOTAVAIL,
  741. uid=message.uid
  742. )
  743. )
  744. return
  745. if self.is_cached(message):
  746. self.send(
  747. Message(
  748. MessageKind.MAP,
  749. uid=message.uid,
  750. map=self._peer.crawl(None, flat=True)
  751. )
  752. )
  753. return
  754. self.cache(message)
  755. map = self._peer.crawl(message.uid, message.ttl - 1, self)
  756. self.send(
  757. Message(
  758. MessageKind.MAP,
  759. uid=message.uid,
  760. map=map
  761. )
  762. )
  763. elif message.kind == MessageKind.ANNOUNCE:
  764. self.near_pieces.union(message.pieces)
  765. except:
  766. self.close()
  767. def listen(self):
  768. try:
  769. self.handshake()
  770. except:
  771. if not self.is_inbound:
  772. log.error(f'`{self.addr}` ({self.encoded_uid}): Handshake problem.')
  773. self.close()
  774. return
  775. self._peer.connections.append(self)
  776. self.announce()
  777. while self.is_alive:
  778. try:
  779. message = self.receive()
  780. except:
  781. self.close()
  782. return
  783. if message.kind in (
  784. MessageKind.PING,
  785. MessageKind.ANNOUNCE
  786. ):
  787. if self.is_cached(message):
  788. continue
  789. self.cache(message)
  790. elif message.kind in (
  791. MessageKind.PONG,
  792. MessageKind.QUERY_HIT,
  793. MessageKind.NOTAVAIL,
  794. MessageKind.MAP
  795. ):
  796. self._queue.append(message)
  797. continue
  798. spawn_thread(self._process, message)
  799. class Interface:
  800. def __init__(self, conn, peer=None):
  801. self._conn = conn
  802. self._peer = peer
  803. def close(self):
  804. self._conn.close()
  805. def _contact(self, command, data=b''):
  806. self._conn.send(command + data)
  807. response = self._conn.recv()
  808. return (response[:4], response[4:])
  809. def save(self, piece):
  810. response, data = self._contact(b'SAVE', piece)
  811. if response == b'SAVD':
  812. return data
  813. def query(self, hash):
  814. response, data = self._contact(b'FIND', hash)
  815. if response == b'QHIT':
  816. return data
  817. def crawl(self):
  818. response, data = self._contact(b'CRWL')
  819. if response == b'NMAP':
  820. return Map.create(data)
  821. def announce(self):
  822. response, _ = self._contact(b'ANON')
  823. return response == b'DONE'
  824. def listen(self):
  825. while True:
  826. query = self._conn.recv()
  827. if len(query) < 4:
  828. continue
  829. command = query[:4]
  830. data = query[4:]
  831. try:
  832. if command == b'SAVE':
  833. if len(data) < 1 or len(data) > Piece.PIECE_SIZE:
  834. raise YAFNError
  835. key, cipher = salsa20_create_encryptor()
  836. checksum = adler32(data)
  837. data = zlib.compress(data, 9)[2:-4]
  838. data = cipher.encrypt(data)
  839. piece = Storage.save_piece(data)
  840. self._conn.send(b'SAVD' + piece.hash + key + struct.pack('!L', checksum))
  841. elif command == b'FIND':
  842. if len(data) != 32 + 40 + 4:
  843. raise YAFNError
  844. hash = data[:32]
  845. key = data[32:72]
  846. checksum = struct.unpack('!L', data[72:76])[0]
  847. piece = self._peer.query(hash, uuid.uuid1().bytes)
  848. if piece:
  849. data = piece.join()
  850. cipher = salsa20_create_decryptor(key)
  851. data = cipher.decrypt(data)
  852. data = zlib.decompress(data, -15)
  853. if adler32(data) != checksum:
  854. raise YAFNError
  855. self._conn.send(b'QHIT' + data)
  856. continue
  857. self._conn.send(b'NOTA')
  858. elif command == b'CRWL':
  859. if len(data) != 0:
  860. raise YAFNError
  861. map = self._peer.crawl(uuid.uuid1().bytes)
  862. self._conn.send(b'NMAP' + map.dump())
  863. elif command == b'ANON':
  864. if len(data) != 0:
  865. raise YAFNError
  866. self._peer.announce()
  867. self._conn.send(b'DONE')
  868. else:
  869. raise YAFNError
  870. except:
  871. self._conn.send(b'FAIL')
  872. @staticmethod
  873. def create(peer):
  874. context = zmq.Context()
  875. conn = context.socket(zmq.REP)
  876. while True:
  877. try:
  878. conn.bind('tcp://*:49872')
  879. except:
  880. log.error('Failed to bind the interface.')
  881. time.sleep(10)
  882. continue
  883. break
  884. return Interface(conn, peer)
  885. @staticmethod
  886. def connect():
  887. context = zmq.Context()
  888. conn = context.socket(zmq.REQ)
  889. conn.connect('tcp://127.0.0.1:49872')
  890. return Interface(conn)
  891. class Tracker:
  892. def __init__(self, host):
  893. self.host = host
  894. self.disabled_for = 0
  895. self.disabled = False
  896. def _request(self, remote_addr=None):
  897. try:
  898. request = urllib.request.Request(f'http://{self.host}:49873/track')
  899. if remote_addr:
  900. request.add_header('YAFN-Remote-Address', remote_addr)
  901. with urllib.request.urlopen(request, timeout=30) as resp:
  902. code = resp.getcode()
  903. data = resp.read()
  904. return (code, data)
  905. except:
  906. return None
  907. return None
  908. def _contact(self, remote_addr=None):
  909. resp = self._request(remote_addr)
  910. if not resp or resp[0] != 200:
  911. return None
  912. try:
  913. data = cbor2.loads(resp[1])
  914. if 'remote_addr' not in data\
  915. or type(data['remote_addr']) is not str:
  916. return None
  917. if 'is_accessible' not in data\
  918. or type(data['is_accessible']) is not bool:
  919. return None
  920. if 'peers' not in data\
  921. or type(data['peers']) is not list\
  922. or not all(map(
  923. lambda peer: type(peer) is dict
  924. and 'address' in peer and type(peer['address']) is str
  925. and 'uid' in peer and type(peer['uid']) is bytes and len(peer['uid']) == 20
  926. and 'latency' in peer and type(peer['latency']) is int and peer['latency'] >= 0
  927. and 'last_check' in peer and type(peer['last_check']) is int and peer['last_check'] >= 0,
  928. data['peers']
  929. )):
  930. return None
  931. return data
  932. except:
  933. return None
  934. def contact(self, remote_addr=None):
  935. if self.disabled:
  936. if self.disabled_for > 0:
  937. self.disabled_for -= 1
  938. return None
  939. else:
  940. self.disabled = False
  941. data = self._contact(remote_addr)
  942. if not data:
  943. self.disabled_for += 1
  944. if self.disabled_for > 3:
  945. self.disabled = True
  946. return data
  947. class Peer:
  948. def __init__(self):
  949. self.connections = []
  950. self.trackers = []
  951. self.remote_addr = None
  952. Storage.setup()
  953. self.keypair = Storage.get_keypair()
  954. self.uid = generate_uid(self.keypair)
  955. self.pubkey = self.keypair.public_key().export_key('DER')
  956. trackers = Storage.get_trackers()
  957. for host in trackers:
  958. self.trackers.append(Tracker(host))
  959. def _discover_peers(self):
  960. peers = set()
  961. for tracker in self.trackers:
  962. data = tracker.contact(self.remote_addr)
  963. if data is None:
  964. if tracker.disabled_for <= 1:
  965. log.error(f'Tracker `{tracker.host}` contact problem.')
  966. continue
  967. if data['is_accessible']:
  968. if not self.remote_addr:
  969. self.remote_addr = data['remote_addr']
  970. log.info(f'Remote address: `{self.remote_addr}`.')
  971. peers = peers.union({
  972. peer['address'] for peer in data['peers']
  973. })
  974. return peers
  975. def _serve(self):
  976. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  977. while True:
  978. try:
  979. s.bind(('0.0.0.0', 49871))
  980. s.listen()
  981. except:
  982. log.error('Failed to bind the port.')
  983. time.sleep(15)
  984. continue
  985. break
  986. log.info('Ready to accept incoming connections.')
  987. while True:
  988. try:
  989. conn, remote = s.accept()
  990. except:
  991. continue
  992. conn = Connection(
  993. self,
  994. conn,
  995. remote[0]
  996. )
  997. spawn_thread(conn.listen)
  998. def _connect_to(self, host, reconnect_attempts=0):
  999. log.info(f'`{host}`: Connecting...')
  1000. try:
  1001. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  1002. s.connect((host, 49871))
  1003. except:
  1004. log.error(f'`{host}`: Connection problem.')
  1005. if reconnect_attempts < 5:
  1006. time.sleep(10 * max(reconnect_attempts, 1))
  1007. self.connect_to(host, reconnect_attempts + 1)
  1008. return
  1009. conn = Connection(
  1010. self,
  1011. s,
  1012. host,
  1013. False,
  1014. reconnect_attempts
  1015. )
  1016. conn.listen()
  1017. def _connect_to_everyone(self):
  1018. if not self.trackers:
  1019. return
  1020. addrs = [
  1021. conn.addr for conn in self.connections.copy()
  1022. ]
  1023. peers = self._discover_peers()
  1024. peers = [
  1025. addr for addr in peers if addr not in addrs
  1026. ]
  1027. if not peers:
  1028. return
  1029. log.info(f'Discovered {len(peers)} peer{"s" if len(peers) != 1 else ""}.')
  1030. for addr in peers:
  1031. self.connect_to(addr)
  1032. def query(self, hash, mid, ttl=7, came_from=None):
  1033. piece = Storage.find_piece(hash)
  1034. if piece:
  1035. return piece
  1036. if ttl < 1:
  1037. return None
  1038. connections = self.connections.copy()
  1039. inbound_connections = []
  1040. outbound_connections = []
  1041. for conn in connections:
  1042. if came_from and came_from.uid == conn.uid:
  1043. continue
  1044. if not conn.is_ok:
  1045. continue
  1046. if conn.is_inbound:
  1047. inbound_connections.append(conn)
  1048. else:
  1049. outbound_connections.append(conn)
  1050. random.shuffle(inbound_connections)
  1051. random.shuffle(outbound_connections)
  1052. if came_from and came_from.is_inbound:
  1053. connections = inbound_connections + outbound_connections
  1054. else:
  1055. connections = outbound_connections + inbound_connections
  1056. for conn in connections:
  1057. if not conn.is_ok:
  1058. continue
  1059. if hash in conn.near_pieces:
  1060. piece = conn.query(hash, mid, ttl)
  1061. if piece:
  1062. return Storage.save_piece(piece)
  1063. for conn in connections:
  1064. if not conn.is_ok:
  1065. continue
  1066. piece = conn.query(hash, mid, ttl)
  1067. if piece:
  1068. return Storage.save_piece(piece)
  1069. def crawl(self, mid, ttl=7, came_from=None, flat=False):
  1070. connections = self.connections.copy()
  1071. submaps = []
  1072. for conn in connections:
  1073. if came_from and came_from.uid == conn.uid:
  1074. continue
  1075. if not conn.is_ok:
  1076. continue
  1077. if flat:
  1078. submaps.append(Map(conn.uid, []))
  1079. else:
  1080. map = conn.crawl(mid, ttl)
  1081. if map:
  1082. submaps.append(map)
  1083. return Map(self.uid, submaps)
  1084. def announce(self):
  1085. connections = self.connections.copy()
  1086. for conn in connections:
  1087. if not conn.is_ok:
  1088. continue
  1089. conn.announce()
  1090. def connect_to(self, addr, reconnect_attempts=0):
  1091. if addr == self.remote_addr:
  1092. return
  1093. for conn in self.connections.copy():
  1094. if conn.addr == addr:
  1095. return
  1096. spawn_thread(self._connect_to, addr, reconnect_attempts)
  1097. def start(self, remote_addr=None):
  1098. log.info('Starting up.')
  1099. log.info(f'Peer UID: {encode_uid(self.uid)}.')
  1100. if remote_addr:
  1101. self.remote_addr = remote_addr
  1102. log.info(f'Remote address: `{self.remote_addr}`.')
  1103. spawn_thread(self._serve)
  1104. Timer(
  1105. 60*5,
  1106. self._connect_to_everyone,
  1107. 2
  1108. ).start()
  1109. Interface.create(self).listen()