|
@@ -3,6 +3,7 @@ import time
|
|
|
import random
|
|
|
import string
|
|
|
import asyncio
|
|
|
+from datetime import datetime, timedelta
|
|
|
from collections import namedtuple
|
|
|
|
|
|
from dotenv import load_dotenv
|
|
@@ -38,7 +39,9 @@ load_dotenv()
|
|
|
API_ID = int(os.getenv("API_ID"))
|
|
|
API_HASH = os.getenv("API_HASH")
|
|
|
BOT_TOKEN = os.getenv("BOT_TOKEN")
|
|
|
-ALLOWED_GROUPS = set(map(lambda cid: int("-100" + cid), os.getenv("ALLOWED_GROUPS").split(",")))
|
|
|
+ALLOWED_GROUPS = set(
|
|
|
+ map(lambda cid: int("-100" + cid), os.getenv("ALLOWED_GROUPS").split(","))
|
|
|
+)
|
|
|
CAPTCHA_WAIT_TIME = int(os.getenv("CAPTCHA_WAIT_TIME", 80))
|
|
|
MAX_CAPTCHA_ATTEMPTS = int(os.getenv("MAX_CAPTCHA_ATTEMPTS", 8))
|
|
|
|
|
@@ -51,6 +54,7 @@ class PendingUser:
|
|
|
self.peer_id = peer_id
|
|
|
self.user_id = user_id
|
|
|
|
|
|
+ self.text = None
|
|
|
self.message_id = None
|
|
|
|
|
|
self.captcha = None
|
|
@@ -66,22 +70,38 @@ class PendingUser:
|
|
|
async def start(self, event):
|
|
|
await self.update_captcha()
|
|
|
|
|
|
+ self.text = f"Welcome, {get_link_to_user(event.user)}! Please write a message with the **four latin letters (case insensitive)** that appear in this image to verify that you are a human. If you don't solve the captcha in **{CAPTCHA_WAIT_TIME}** seconds, you will be automatically kicked out of the group."
|
|
|
+
|
|
|
message = await bot.send_message(
|
|
|
event.chat_id,
|
|
|
- f"Welcome, {get_link_to_user(event.user)}! Please write a message with the letters that appear in this image to verify that you are a human. If you don't solve the captcha in {CAPTCHA_WAIT_TIME} seconds, you will be automatically kicked out of the group.",
|
|
|
+ self.text,
|
|
|
file=self.captcha_im,
|
|
|
)
|
|
|
|
|
|
self.message_id = message
|
|
|
|
|
|
- async def update_captcha(self):
|
|
|
- self.captcha = "".join(random.sample(string.ascii_uppercase, 4))
|
|
|
- self.captcha_im = ImageCaptcha().generate(self.captcha)
|
|
|
+ async def update_captcha(self, only_text=False):
|
|
|
+ if not only_text:
|
|
|
+ self.captcha = "".join(random.sample(string.ascii_uppercase, 4))
|
|
|
+ self.captcha_im = ImageCaptcha().generate(self.captcha)
|
|
|
|
|
|
- self.captcha_im.name = "captcha.png"
|
|
|
+ self.captcha_im.name = "captcha.png"
|
|
|
|
|
|
if self.message_id is not None:
|
|
|
- await bot.edit_message(self.peer_id, self.message_id, file=self.captcha_im)
|
|
|
+ try:
|
|
|
+ attempts = MAX_CAPTCHA_ATTEMPTS - self.retries
|
|
|
+
|
|
|
+ message = await bot.edit_message(
|
|
|
+ self.peer_id,
|
|
|
+ self.message_id,
|
|
|
+ text=self.text
|
|
|
+ + f"\n\nYou have **{attempts}** attempt{'' if attempts == 1 else 's'} left.",
|
|
|
+ file=None if only_text else self.captcha_im,
|
|
|
+ )
|
|
|
+
|
|
|
+ self.message_id = message.id
|
|
|
+ except Exception:
|
|
|
+ pass
|
|
|
|
|
|
async def check_captcha(self, captcha):
|
|
|
captcha = captcha.strip().upper()
|
|
@@ -101,7 +121,12 @@ class PendingUser:
|
|
|
if self.retries > MAX_CAPTCHA_ATTEMPTS:
|
|
|
await self.kick()
|
|
|
else:
|
|
|
- await self.update_captcha()
|
|
|
+ await self.update_captcha(
|
|
|
+ only_text=not (
|
|
|
+ len(captcha) == 4
|
|
|
+ and all(map(lambda c: c in string.ascii_uppercase, captcha))
|
|
|
+ )
|
|
|
+ )
|
|
|
|
|
|
return False
|
|
|
|
|
@@ -124,7 +149,10 @@ class PendingUser:
|
|
|
EditBannedRequest(
|
|
|
self.peer_id,
|
|
|
self.user_id,
|
|
|
- ChatBannedRights(until_date=None, view_messages=True),
|
|
|
+ ChatBannedRights(
|
|
|
+ until_date=datetime.now() + timedelta(minutes=5),
|
|
|
+ view_messages=True,
|
|
|
+ ),
|
|
|
)
|
|
|
)
|
|
|
except Exception:
|
|
@@ -164,7 +192,9 @@ async def handler(event):
|
|
|
except Exception:
|
|
|
pass
|
|
|
|
|
|
- await pending_user.check_captcha(event.text)
|
|
|
+ text = getattr(event, "text", "")
|
|
|
+
|
|
|
+ await pending_user.check_captcha(text)
|
|
|
|
|
|
|
|
|
async def check_pending():
|