|
@@ -189,7 +189,8 @@ class Peer:
|
|
|
self.send_lock = asyncio.Lock()
|
|
|
self.receive_lock = asyncio.Lock()
|
|
|
self.ticks = 0
|
|
|
- self.last_message_ts = -1
|
|
|
+ self.last_message_in_ts = -1
|
|
|
+ self.last_message_out_ts = -1
|
|
|
|
|
|
async def write(peer, data):
|
|
|
peer.writer.write(data)
|
|
@@ -203,7 +204,22 @@ async def read(peer, size):
|
|
|
|
|
|
return buffer
|
|
|
|
|
|
+async def cooldown(peer, out=True):
|
|
|
+ if out:
|
|
|
+ diff = time.time() - peer.last_message_out_ts
|
|
|
+ if diff < RATELIMIT:
|
|
|
+ await asyncio.sleep(RATELIMIT - diff)
|
|
|
+
|
|
|
+ peer.last_message_out_ts = time.time()
|
|
|
+ else:
|
|
|
+ if time.time() - peer.last_message_in_ts < RATELIMIT:
|
|
|
+ raise Error(f'rate limit (={RATELIMIT}s.) exceeded for incoming messages')
|
|
|
+
|
|
|
+ peer.last_message_in_ts = time.time()
|
|
|
+
|
|
|
async def send(peer, message):
|
|
|
+ await cooldown(peer)
|
|
|
+
|
|
|
if type(message) is ServiceMessage:
|
|
|
buffer = bytes([message.kind])
|
|
|
|
|
@@ -218,8 +234,6 @@ async def send(peer, message):
|
|
|
async with peer.send_lock:
|
|
|
await write(peer, buffer)
|
|
|
|
|
|
- await asyncio.sleep(RATELIMIT)
|
|
|
-
|
|
|
return
|
|
|
|
|
|
payload = b''
|
|
@@ -253,15 +267,10 @@ async def send(peer, message):
|
|
|
chunk
|
|
|
)
|
|
|
|
|
|
- await asyncio.sleep(RATELIMIT)
|
|
|
-
|
|
|
async def receive(peer):
|
|
|
- async with peer.receive_lock:
|
|
|
- if time.time() - peer.last_message_ts < RATELIMIT:
|
|
|
- raise Error(f'rate limit (={RATELIMIT}s.) exceeded for incoming messages')
|
|
|
-
|
|
|
- peer.last_message_ts = time.time()
|
|
|
+ await cooldown(peer, False)
|
|
|
|
|
|
+ async with peer.receive_lock:
|
|
|
kind = (await read(peer, 1))[0]
|
|
|
|
|
|
if kind != 0xff:
|