txlyre 2 месяцев назад
Сommit
11de2469f2
3 измененных файлов с 183 добавлено и 0 удалено
  1. 4 0
      .gitignore
  2. 177 0
      main.py
  3. 2 0
      requirements.txt

+ 4 - 0
.gitignore

@@ -0,0 +1,4 @@
+__pycache__
+.venv
+*.session
+.env

+ 177 - 0
main.py

@@ -0,0 +1,177 @@
+import os
+import time
+import random
+import string
+import asyncio
+from collections import namedtuple
+
+from dotenv import load_dotenv
+from telethon import TelegramClient, events
+from telethon.utils import get_peer_id, get_display_name
+from telethon.tl.functions.channels import EditBannedRequest
+from telethon.tl.types import ChatBannedRights
+from captcha.image import ImageCaptcha
+
+
+def get_user_name(user):
+    full_name = get_display_name(user)
+    if not full_name:
+        full_name = user.username
+
+    if not full_name:
+        full_name = "user"
+
+    return full_name
+
+
+def get_link_to_user(user):
+    full_name = get_user_name(user)
+
+    if user.username:
+        return f"[{full_name}](@{user.username})"
+
+    return f"[{full_name}](tg://user?id={user.id})"
+
+
+load_dotenv()
+
+API_ID = int(os.getenv("API_ID"))
+API_HASH = os.getenv("API_HASH")
+BOT_TOKEN = os.getenv("BOT_TOKEN")
+ALLOWED_GROUPS = set(map(int, os.getenv("ALLOWED_GROUPS").split(",")))
+CAPTCHA_WAIT_TIME = int(os.getenv("CAPTCHA_WAIT_TIME", 80))
+MAX_CAPTCHA_ATTEMPTS = int(os.getenv("MAX_CAPTCHA_ATTEMPTS", 8))
+
+bot = TelegramClient("captcha_bot", API_ID, API_HASH).start(bot_token=BOT_TOKEN)
+pending = {}
+
+
+class PendingUser:
+    def __init__(self, peer_id, user_id):
+        self.peer_id = peer_id
+        self.user_id = user_id
+
+        self.message_id = None
+
+        self.captcha = None
+        self.captcha_im = None
+
+        self.join_ts = time.time()
+        self.retries = 0
+
+    @property
+    def expired(self):
+        return time.time() - self.join_ts >= CAPTCHA_WAIT_TIME
+
+    async def start(self, event):
+        await self.update_captcha()
+
+        message = await bot.send_message(
+            event.chat_id,
+            f"Welcome, {get_link_to_user(event.user)}! Please write a message with the letters that appear in this image to verify that you are a human. If you don't solve the captcha in {CAPTCHA_WAIT_TIME} seconds, you will be automatically kicked out of the group.",
+            file=self.captcha_im,
+        )
+
+        self.message_id = message
+
+    async def update_captcha(self):
+        self.captcha = "".join(random.sample(string.ascii_uppercase, 4))
+        self.captcha_im = ImageCaptcha().generate(self.captcha)
+
+        self.captcha_im.name = "captcha.png"
+
+        if self.message_id is not None:
+            await bot.edit_message(self.peer_id, self.message_id, file=self.captcha_im)
+
+    async def check_captcha(self, captcha):
+        captcha = captcha.strip().upper()
+
+        if captcha == self.captcha:
+            del pending[self.user_id]
+
+            try:
+                await bot.delete_messages(self.peer_id, self.message_id)
+            except Exception:
+                pass
+
+            return True
+
+        self.retries += 1
+
+        if self.retries > MAX_CAPTCHA_ATTEMPTS:
+            await self.kick()
+        else:
+            await self.update_captcha()
+
+        return False
+
+    async def remove(self):
+        del pending[self.user_id]
+
+        try:
+            await bot.delete_messages(self.peer_id, self.message_id)
+        except Exception:
+            pass
+
+    async def kick(self):
+        if self.user_id not in pending:
+            return
+
+        await self.remove()
+
+        try:
+            await bot(
+                EditBannedRequest(
+                    self.peer_id,
+                    self.user_id,
+                    ChatBannedRights(until_date=None, view_messages=True),
+                )
+            )
+        except Exception:
+            pass
+
+
[email protected](events.ChatAction)
+async def handler(event):
+    if (event.user_left or event.user_kicked) and event.user_id in pending:
+        await pending[event.user_id].remove()
+
+        return
+
+    if not event.user_joined:
+        return
+
+    pending[event.user_id] = PendingUser(peer_id=event.chat_id, user_id=event.user_id)
+
+    await pending[event.user_id].start(event)
+
+
[email protected](events.NewMessage)
+async def handler(event):
+    pending_user = pending.get(event.sender.id)
+    if not pending_user:
+        return
+
+    try:
+        await event.delete()
+    except Exception:
+        pass
+
+    await pending_user.check_captcha(event.text)
+
+
+async def check_pending():
+    while True:
+        for user_id in dict(pending):
+            pending_user = pending[user_id]
+
+            if pending_user.expired:
+                await pending_user.kick()
+
+        await asyncio.sleep(1)
+
+
+if __name__ == "__main__":
+    bot.loop.create_task(check_pending())
+    bot.start()
+    bot.run_until_disconnected()

+ 2 - 0
requirements.txt

@@ -0,0 +1,2 @@
+python-dotenv
+telethon