__main__.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. import time
  2. import socket
  3. import asyncio
  4. import ipaddress
  5. import cbor2
  6. from aiohttp import web
  7. from Crypto.Hash import SHA256, RIPEMD160
  8. from Crypto.PublicKey import RSA
  def ripemd160(data):
      """Return the RIPEMD-160 digest of ``data``.

      Args:
          data: Bytes to hash.

      Returns:
          The value of ``digest()`` on a fresh ``RIPEMD160`` hash object
          that was fed ``data``.
      """
      hasher = RIPEMD160.new()
      hasher.update(data)
      return hasher.digest()
  def sha256(data):
      """Return the SHA-256 digest of ``data``.

      Args:
          data: Bytes to hash.

      Returns:
          The value of ``digest()`` on a fresh ``SHA256`` hash object that
          was fed ``data``.
      """
      hasher = SHA256.new()
      hasher.update(data)
      return hasher.digest()
  def generate_uid(pubkey):
      """Derive a peer UID from an encoded RSA key.

      The key is loaded with ``RSA.import_key`` and reduced to its public
      part. The UID is ``ripemd160(sha256(n || e))`` where ``n`` is the
      modulus as 128 big-endian bytes and ``e`` the exponent as 3
      big-endian bytes.

      Args:
          pubkey: Encoded key in any form ``RSA.import_key`` accepts.

      Returns:
          The RIPEMD-160 digest computed as described above.

      Raises:
          OverflowError: From ``int.to_bytes`` if the modulus needs more
              than 128 bytes or the exponent more than 3.
      """
      public = RSA.import_key(pubkey).public_key()
      modulus = public.n.to_bytes(128, 'big')
      exponent = public.e.to_bytes(3, 'big')
      return ripemd160(sha256(modulus + exponent))
  23. async def _readall(reader, size):
  24. buffer = b''
  25. while len(buffer) < size:
  26. buffer += await reader.read(size - len(buffer))
  27. return buffer
  class Peer:
      """A remote node known to the tracker, with its reachability state.

      Attributes:
          addr: Host name or IP address the peer was registered under.
          uid: Value returned by ``generate_uid`` for the key bytes in the
              peer's hello message; ``None`` until a check gets that far.
          last_check: ``time.time()`` at the start of the latest check
              (0 before the first check).
          failed_checks: Number of consecutive failed checks.
          works: Whether the latest check succeeded.
          latency: Seconds the latest successful check took, from address
              resolution to the flushed reply (0 before the first success).
      """

      # Wire-protocol constants used by check().
      PORT = 49871
      HELLO_MAGIC = b'YAFN HELLO'
      HELLO_SIZE = 172
      CHECK_REPLY = b'YAFN TRACKER CHECK' + b'\x00' * 154
      IO_TIMEOUT = 5
      CLOSE_TIMEOUT = 2

      def __init__(self, addr):
          """Create a peer for ``addr`` that has not been checked yet."""
          self.addr = addr
          self.uid = None
          self.last_check = 0
          self.failed_checks = 0
          self.works = False
          self.latency = 0

      async def check(self):
          """Probe the peer and update its reachability state.

          The probe resolves ``addr`` and requires a globally routable
          address. It then connects to ``PORT``, reads a ``HELLO_SIZE``-byte
          message that must start with ``HELLO_MAGIC``, derives ``uid`` from
          the rest of that message, and sends ``CHECK_REPLY``.

          Returns:
              True if every step succeeded; False otherwise. On failure
              ``failed_checks`` is incremented and ``works`` is cleared.
              On success ``failed_checks`` is reset, ``works`` is set and
              ``latency`` is updated.
          """
          self.last_check = time.time()
          # Monotonic clock for the interval so wall-clock jumps cannot
          # produce a negative or inflated latency.
          started = time.monotonic()
          writer = None
          try:
              # gethostbyname blocks, so run it in the default executor
              # instead of stalling every other coroutine on the loop.
              loop = asyncio.get_running_loop()
              resolved = await asyncio.wait_for(
                  loop.run_in_executor(None, socket.gethostbyname, self.addr),
                  timeout=self.IO_TIMEOUT
              )
              ip = ipaddress.ip_address(resolved)
              if not ip.is_global:
                  raise ValueError('peer address is not globally routable')
              reader, writer = await asyncio.wait_for(
                  asyncio.open_connection(str(ip), self.PORT),
                  timeout=self.IO_TIMEOUT
              )
              hello = await asyncio.wait_for(
                  _readall(reader, self.HELLO_SIZE),
                  timeout=self.IO_TIMEOUT
              )
              if len(hello) != self.HELLO_SIZE:
                  raise ValueError('truncated hello message')
              if not hello.startswith(self.HELLO_MAGIC):
                  raise ValueError('unexpected hello magic')
              # Presumably the bytes after the magic are the peer's encoded
              # RSA public key; generate_uid decides what is acceptable.
              self.uid = generate_uid(hello[len(self.HELLO_MAGIC):])
              writer.write(self.CHECK_REPLY)
              await asyncio.wait_for(
                  writer.drain(),
                  timeout=self.IO_TIMEOUT
              )
              # Measured before teardown so closing the socket does not
              # count towards the reported latency.
              elapsed = time.monotonic() - started
          except Exception:
              # Any failure marks the peer as down. BaseException, such as
              # task cancellation, is deliberately left to propagate.
              self.failed_checks += 1
              self.works = False
              return False
          finally:
              if writer is not None:
                  try:
                      writer.close()
                      await asyncio.wait_for(
                          writer.wait_closed(),
                          timeout=self.CLOSE_TIMEOUT
                      )
                  except Exception:
                      # Best-effort teardown: a peer that resets or stalls
                      # while closing must not change the check's outcome.
                      pass
          self.failed_checks = 0
          self.works = True
          self.latency = elapsed
          return True
  class Tracker:
      """Registry of peers that have passed at least one reachability check.

      Attributes:
          peers: List of registered ``Peer`` objects, in registration order.
      """

      def __init__(self):
          """Create a tracker with no registered peers."""
          self.peers = []

      def _find(self, addr):
          """Return the registered peer whose ``addr`` equals ``addr``, or None."""
          for peer in self.peers:
              if peer.addr == addr:
                  return peer
          return None

      async def add(self, addr):
          """Check ``addr`` and register it if it is reachable.

          An address that is already registered is simply re-checked.

          Args:
              addr: Host name or IP address of the candidate peer.

          Returns:
              The result of the peer's ``check()``: True if reachable.
          """
          known = self._find(addr)
          if known is not None:
              return await known.check()
          peer = Peer(addr)
          if not await peer.check():
              return False
          # check() yields to the event loop, so another request for the
          # same address may have registered it meanwhile. Look again to
          # avoid a duplicate entry.
          if self._find(addr) is None:
              self.peers.append(peer)
          return True

      async def watch(self):
          """Re-check every registered peer forever, once per minute.

          A peer is dropped when a check fails and its ``failed_checks``
          has reached 3. This coroutine never returns on its own.
          """
          while True:
              dead = []
              # Iterate a snapshot: add() may append to self.peers while a
              # check below is awaiting.
              for peer in list(self.peers):
                  if not await peer.check() and peer.failed_checks >= 3:
                      dead.append(peer)
              for peer in dead:
                  if peer in self.peers:
                      self.peers.remove(peer)
              await asyncio.sleep(60)


  # Single tracker instance shared by the HTTP handler and the watcher.
  tracker = Tracker()
  async def request_handler(request):
      """Handle ``GET /track``: register the caller and list the other peers.

      The caller's address comes from the ``YAFN-Remote-Address`` header
      when present, otherwise from ``request.remote``. The response body
      is a CBOR map with the keys ``remote_addr``, ``is_accessible`` and
      ``peers``. ``peers`` holds every working peer except the caller,
      sorted by ``(last_check, latency)`` ascending.

      Args:
          request: The incoming aiohttp request.

      Returns:
          A ``web.Response`` whose body is the CBOR-encoded map.
      """
      # NOTE(review): the header is trusted as-is, so any client can make
      # the tracker probe an address of its choosing. Presumably it is set
      # by a reverse proxy; confirm that untrusted clients cannot send it.
      remote_addr = request.headers.get('YAFN-Remote-Address', request.remote)
      is_accessible = await tracker.add(remote_addr)

      listing = []
      for peer in list(tracker.peers):
          if not peer.works or peer.addr == remote_addr:
              continue
          listing.append({
              'address': peer.addr,
              'uid': peer.uid,
              'latency': int(peer.latency),
              'last_check': int(peer.last_check)
          })
      listing = sorted(
          listing,
          key=lambda entry: (entry['last_check'], entry['latency'])
      )

      payload = cbor2.dumps({
          'remote_addr': remote_addr,
          'is_accessible': is_accessible,
          'peers': listing
      })
      return web.Response(body=payload)
  async def _watch_peers(app):
      """aiohttp cleanup context that runs ``tracker.watch()`` while the app is up.

      Args:
          app: The application being started; unused, but required by the
              ``cleanup_ctx`` callback signature.
      """
      # Keep a strong reference for the application's lifetime: the event
      # loop itself only holds weak references to tasks.
      watcher = asyncio.ensure_future(tracker.watch())
      yield
      # Shutdown: request cancellation and let the runner's final task
      # cleanup reap the task.
      watcher.cancel()


  async def main():
      """Build the tracker's aiohttp application.

      Returns:
          A ``web.Application`` that serves ``GET /track`` and runs the
          peer watcher in the background between startup and cleanup.
      """
      app = web.Application()
      app.cleanup_ctx.append(_watch_peers)
      app.add_routes([
          web.get('/track', request_handler),
      ])
      return app


  # Start the server only when executed as a script or via ``python -m``,
  # not as a side effect of importing this module.
  if __name__ == '__main__':
      web.run_app(main(), port=49873)