main.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. import os
  2. import time
  3. import random
  4. import string
  5. import asyncio
  6. from datetime import datetime, timedelta
  7. from collections import namedtuple
  8. import aiohttp
  9. from dotenv import load_dotenv
  10. from telethon import TelegramClient, events
  11. from telethon.utils import get_peer_id, get_display_name
  12. from telethon.tl.functions.channels import EditBannedRequest
  13. from telethon.tl.types import ChatBannedRights
  14. from captcha.image import ImageCaptcha
  15. def get_user_name(user):
  16. full_name = get_display_name(user)
  17. if not full_name:
  18. full_name = user.username
  19. if not full_name:
  20. full_name = "user"
  21. return full_name
  22. def get_link_to_user(user):
  23. full_name = get_user_name(user)
  24. if user.username:
  25. return f"[{full_name}](@{user.username})"
  26. return f"[{full_name}](tg://user?id={user.id})"
  27. load_dotenv()
  28. API_ID = int(os.getenv("API_ID"))
  29. API_HASH = os.getenv("API_HASH")
  30. BOT_TOKEN = os.getenv("BOT_TOKEN")
  31. ALLOWED_GROUPS = set(
  32. map(lambda cid: int("-100" + cid), os.getenv("ALLOWED_GROUPS").split(","))
  33. )
  34. CAPTCHA_WAIT_TIME = int(os.getenv("CAPTCHA_WAIT_TIME", 80))
  35. MAX_CAPTCHA_ATTEMPTS = int(os.getenv("MAX_CAPTCHA_ATTEMPTS", 8))
  36. USE_LOLS_API = os.getenv("USE_LOLS_API", "yes") == "yes"
  37. bot = TelegramClient("captcha_bot", API_ID, API_HASH).start(bot_token=BOT_TOKEN)
  38. pending = {}
  39. async def lols_check(user_id):
  40. try:
  41. async with aiohttp.ClientSession() as session:
  42. async with session.get(f"https://api.lols.bot/account?id={user_id}") as response:
  43. data = await response.json()
  44. banned = data.get("banned", False)
  45. scammer = data.get("scammer", False)
  46. return not banned and not scammer
  47. except Exception:
  48. pass
  49. return True
  50. class PendingUser:
  51. def __init__(self, peer_id, user_id):
  52. self.peer_id = peer_id
  53. self.user_id = user_id
  54. self.text = None
  55. self.message_id = None
  56. self.captcha = None
  57. self.captcha_im = None
  58. self.join_ts = time.time()
  59. self.retries = 0
  60. @property
  61. def expired(self):
  62. return time.time() - self.join_ts >= CAPTCHA_WAIT_TIME
  63. async def start(self, event):
  64. if USE_LOLS_API:
  65. if not await lols_check(self.user_id):
  66. await self.kick(forever=True)
  67. return
  68. await self.update_captcha()
  69. self.text = f"Welcome, {get_link_to_user(event.user)}! Please write a message with the **four latin letters (case insensitive)** that appear in this image to verify that you are a human. If you don't solve the captcha in **{CAPTCHA_WAIT_TIME}** seconds, you will be automatically kicked out of the group."
  70. message = await bot.send_message(
  71. event.chat_id,
  72. self.text,
  73. file=self.captcha_im,
  74. )
  75. self.message_id = message
  76. async def update_captcha(self, only_text=False):
  77. if not only_text:
  78. self.captcha = "".join(random.sample(string.ascii_uppercase, 4))
  79. self.captcha_im = ImageCaptcha().generate(self.captcha)
  80. self.captcha_im.name = "captcha.png"
  81. if self.message_id is not None:
  82. try:
  83. attempts = MAX_CAPTCHA_ATTEMPTS - self.retries
  84. message = await bot.edit_message(
  85. self.peer_id,
  86. self.message_id,
  87. text=self.text
  88. + f"\n\nYou have **{attempts}** attempt{'' if attempts == 1 else 's'} left.",
  89. file=None if only_text else self.captcha_im,
  90. )
  91. self.message_id = message.id
  92. except Exception:
  93. pass
  94. async def check_captcha(self, captcha):
  95. captcha = captcha.strip().upper()
  96. if captcha == self.captcha:
  97. del pending[self.user_id]
  98. try:
  99. await bot.delete_messages(self.peer_id, self.message_id)
  100. except Exception:
  101. pass
  102. return True
  103. self.retries += 1
  104. if self.retries > MAX_CAPTCHA_ATTEMPTS:
  105. await self.kick()
  106. else:
  107. await self.update_captcha(
  108. only_text=not (
  109. len(captcha) == 4
  110. and all(map(lambda c: c in string.ascii_uppercase, captcha))
  111. )
  112. )
  113. return False
  114. async def remove(self):
  115. del pending[self.user_id]
  116. if self.message_id is not None:
  117. try:
  118. await bot.delete_messages(self.peer_id, self.message_id)
  119. except Exception:
  120. pass
  121. async def kick(self, forever=False):
  122. if self.user_id not in pending:
  123. return
  124. await self.remove()
  125. try:
  126. await bot(
  127. EditBannedRequest(
  128. self.peer_id,
  129. self.user_id,
  130. ChatBannedRights(
  131. until_date=None if forever else datetime.now() + timedelta(minutes=5),
  132. view_messages=True,
  133. ),
  134. )
  135. )
  136. except Exception:
  137. pass
  138. @bot.on(events.ChatAction)
  139. async def handler(event):
  140. if event.chat_id not in ALLOWED_GROUPS:
  141. return
  142. if (event.user_left or event.user_kicked) and event.user_id in pending:
  143. await pending[event.user_id].remove()
  144. return
  145. if not event.user_joined:
  146. return
  147. pending[event.user_id] = PendingUser(peer_id=event.chat_id, user_id=event.user_id)
  148. await pending[event.user_id].start(event)
  149. @bot.on(events.NewMessage)
  150. async def handler(event):
  151. peer_id = get_peer_id(event.peer_id)
  152. if peer_id not in ALLOWED_GROUPS:
  153. return
  154. pending_user = pending.get(event.sender.id)
  155. if not pending_user:
  156. return
  157. try:
  158. await event.delete()
  159. except Exception:
  160. pass
  161. if pending_user.message_id is None:
  162. return
  163. text = getattr(event, "text", "")
  164. await pending_user.check_captcha(text)
  165. async def check_pending():
  166. while True:
  167. for user_id in dict(pending):
  168. pending_user = pending[user_id]
  169. if pending_user.expired:
  170. await pending_user.kick()
  171. await asyncio.sleep(1)
  172. if __name__ == "__main__":
  173. bot.loop.create_task(check_pending())
  174. bot.start()
  175. bot.run_until_disconnected()