main.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  import asyncio
  import logging
  import os
  import random
  import string
  import time
  from collections import namedtuple
  from datetime import datetime, timedelta, timezone

  import aiohttp
  from captcha.image import ImageCaptcha
  from dotenv import load_dotenv
  from telethon import TelegramClient, events
  from telethon.tl.functions.channels import EditBannedRequest
  from telethon.tl.types import ChatBannedRights
  from telethon.utils import get_display_name, get_peer_id
  def get_user_name(user):
      """Return a printable name for *user*.

      Tries the display name first, then the username, and finally falls
      back to the literal string ``"user"`` when both are empty.
      """
      return get_display_name(user) or user.username or "user"
  def get_link_to_user(user):
      """Build a Markdown-style ``[name](target)`` link that points at *user*.

      The target is ``@username`` when the user has a username and a
      ``tg://user?id=...`` URL otherwise.
      """
      label = get_user_name(user)
      target = f"@{user.username}" if user.username else f"tg://user?id={user.id}"
      return f"[{label}]({target})"
  def _require_env(name):
      """Return the value of environment variable *name*.

      Raises:
          RuntimeError: if the variable is unset or blank, so a broken
              configuration is reported by name instead of surfacing later
              as an unrelated TypeError/AttributeError.
      """
      value = os.getenv(name)
      if value is None or not value.strip():
          raise RuntimeError(f"Missing required environment variable: {name}")
      return value.strip()


  def _parse_group_ids(raw):
      """Parse a comma-separated list of group ids into a set of ints.

      Each entry gets the ``-100`` prefix prepended before conversion, which
      is the form the handlers compare chat ids against. Surrounding
      whitespace and empty entries (e.g. a trailing comma) are ignored.
      """
      entries = (part.strip() for part in raw.split(","))
      return {int("-100" + entry) for entry in entries if entry}


  # Configuration is read from the environment (optionally via a .env file).
  load_dotenv()

  API_ID = int(_require_env("API_ID"))
  API_HASH = _require_env("API_HASH")
  BOT_TOKEN = _require_env("BOT_TOKEN")
  ALLOWED_GROUPS = _parse_group_ids(_require_env("ALLOWED_GROUPS"))

  # Seconds a newcomer has to solve the captcha before being kicked.
  CAPTCHA_WAIT_TIME = int(os.getenv("CAPTCHA_WAIT_TIME", "80"))
  # Number of wrong answers tolerated before the user is kicked.
  MAX_CAPTCHA_ATTEMPTS = int(os.getenv("MAX_CAPTCHA_ATTEMPTS", "8"))
  # Only the exact value "yes" enables the LOLS anti-spam lookup.
  USE_LOLS_API = os.getenv("USE_LOLS_API", "yes") == "yes"

  # The client is created and logged in as a bot at import time.
  bot = TelegramClient("captcha_bot", API_ID, API_HASH).start(bot_token=BOT_TOKEN)

  # user_id -> PendingUser for everyone who has not solved the captcha yet.
  pending = {}
  async def lols_check(user_id):
      """Ask the LOLS API whether *user_id* is flagged.

      Args:
          user_id: numeric Telegram user id to look up.

      Returns:
          True when the response marks the account as ``banned`` or
          ``scammer``; False otherwise. Any failure (network error, timeout,
          malformed response) also yields False, so an unavailable API never
          blocks a user by itself.
      """
      try:
          # Bound the lookup explicitly; without this the request falls back
          # to aiohttp's much longer default and would stall the join handler.
          timeout = aiohttp.ClientTimeout(total=10)
          async with aiohttp.ClientSession(timeout=timeout) as session:
              async with session.get(
                  "https://api.lols.bot/account", params={"id": user_id}
              ) as response:
                  data = await response.json()
          return bool(data.get("banned") or data.get("scammer"))
      except Exception:
          # Deliberate best effort: report the problem, treat the user as clean.
          logging.getLogger(__name__).warning(
              "LOLS lookup failed for user %s", user_id, exc_info=True
          )
          return False
  class PendingUser:
      """State and actions for one user who joined but has not passed the captcha.

      Instances are looked up through the module-level ``pending`` dict by
      ``user_id`` and take themselves out of it when the user answers
      correctly, is removed, or is kicked.
      """

      _log = logging.getLogger(__name__)

      def __init__(self, peer_id, user_id):
          """Record who joined which chat and start the expiry clock."""
          self.peer_id = peer_id  # chat the captcha is posted in
          self.user_id = user_id
          self.text = None  # welcome text, filled in by start()
          self.message_id = None  # id of the captcha message once it was sent
          self.captcha = None  # expected answer (four uppercase letters)
          self.captcha_im = None  # generated image sent as the message file
          self.join_ts = time.time()
          self.retries = 0  # wrong answers so far

      @property
      def expired(self):
          """True once ``CAPTCHA_WAIT_TIME`` seconds have passed since creation."""
          return time.time() - self.join_ts >= CAPTCHA_WAIT_TIME

      async def start(self, event):
          """Screen the newcomer and, unless flagged, post the captcha message.

          Args:
              event: the join event; its ``user`` and ``chat_id`` are used.
          """
          # lols_check() is truthy for accounts reported as banned/scammer, so
          # those are the ones banned outright; everybody else gets a captcha.
          if USE_LOLS_API and await lols_check(self.user_id):
              await self.kick(forever=True)
              return
          await self.update_captcha()
          self.text = (
              f"Welcome, {get_link_to_user(event.user)}! Please write a message with "
              "the **four latin letters (case insensitive)** that appear in this "
              "image to verify that you are a human. If you don't solve the captcha "
              f"in **{CAPTCHA_WAIT_TIME}** seconds, you will be automatically kicked "
              "out of the group."
          )
          message = await bot.send_message(
              event.chat_id,
              self.text,
              file=self.captcha_im,
          )
          # Store the id only, the same thing update_captcha() stores after an edit.
          self.message_id = message.id

      async def update_captcha(self, only_text=False):
          """Refresh the challenge and, if already posted, the captcha message.

          Args:
              only_text: when true the current captcha and image are kept and
                  only the "attempts left" line of the message is updated.
          """
          if not only_text:
              self.captcha = "".join(random.sample(string.ascii_uppercase, 4))
              self.captcha_im = ImageCaptcha().generate(self.captcha)
              self.captcha_im.name = "captcha.png"
          if self.message_id is None:
              # Nothing has been posted yet (first call from start()).
              return
          attempts = MAX_CAPTCHA_ATTEMPTS - self.retries
          plural = "" if attempts == 1 else "s"
          try:
              message = await bot.edit_message(
                  self.peer_id,
                  self.message_id,
                  text=self.text
                  + f"\n\nYou have **{attempts}** attempt{plural} left.",
                  file=None if only_text else self.captcha_im,
              )
              self.message_id = message.id
          except Exception:
              # Best effort: a failed edit must not break captcha handling.
              self._log.warning(
                  "Could not update captcha message for user %s",
                  self.user_id,
                  exc_info=True,
              )

      async def check_captcha(self, captcha):
          """Compare a message from the user with the expected captcha.

          Args:
              captcha: raw message text; whitespace and case are ignored.

          Returns:
              True if the answer was correct (the user is no longer pending),
              False otherwise.
          """
          answer = captcha.strip().upper()
          if answer == self.captcha:
              pending.pop(self.user_id, None)
              await self._delete_challenge()
              return True
          self.retries += 1
          # ">=": the message counts down MAX_CAPTCHA_ATTEMPTS - retries, so the
          # user has to go when that count reaches zero, not one try later.
          if self.retries >= MAX_CAPTCHA_ATTEMPTS:
              await self.kick()
          else:
              # Only a plausible four-letter guess earns a fresh image; anything
              # else just updates the attempts counter.
              looks_like_guess = len(answer) == 4 and all(
                  char in string.ascii_uppercase for char in answer
              )
              await self.update_captcha(only_text=not looks_like_guess)
          return False

      async def remove(self):
          """Forget this user and delete the captcha message if one was sent."""
          # pop() instead of del: another code path may already have removed us.
          pending.pop(self.user_id, None)
          await self._delete_challenge()

      async def kick(self, forever=False):
          """Remove the user from the chat via a ban on viewing messages.

          Args:
              forever: when true no expiry date is set; otherwise the ban is
                  requested to end five minutes from now.
          """
          if self.user_id not in pending:
              return
          await self.remove()
          try:
              # An aware UTC timestamp keeps the five-minute window independent
              # of the host's local timezone.
              until_date = (
                  None if forever else datetime.now(timezone.utc) + timedelta(minutes=5)
              )
              rights = ChatBannedRights(until_date=until_date, view_messages=True)
              await bot(EditBannedRequest(self.peer_id, self.user_id, rights))
          except Exception:
              # Best effort: e.g. missing admin rights must not crash the caller.
              self._log.warning(
                  "Could not kick user %s from %s",
                  self.user_id,
                  self.peer_id,
                  exc_info=True,
              )

      async def _delete_challenge(self):
          """Delete the captcha message, ignoring (but logging) failures."""
          if self.message_id is None:
              return
          try:
              await bot.delete_messages(self.peer_id, self.message_id)
          except Exception:
              self._log.warning(
                  "Could not delete captcha message for user %s",
                  self.user_id,
                  exc_info=True,
              )
  @bot.on(events.ChatAction)
  async def handler(event):
      """React to membership changes in the allowed groups.

      A pending user who leaves or is kicked is forgotten; a user who joins
      is registered in ``pending`` and handed the captcha flow.
      """
      if event.chat_id in ALLOWED_GROUPS:
          departed = event.user_left or event.user_kicked
          if departed and event.user_id in pending:
              await pending[event.user_id].remove()
          elif event.user_joined:
              newcomer = PendingUser(peer_id=event.chat_id, user_id=event.user_id)
              pending[event.user_id] = newcomer
              await newcomer.start(event)
  @bot.on(events.NewMessage)
  async def handler(event):
      """Treat every message from a pending user as a captcha answer.

      Messages outside the allowed groups or from users who are not pending
      are ignored. Otherwise the message is deleted and its text is checked
      against the user's captcha.
      """
      if get_peer_id(event.peer_id) not in ALLOWED_GROUPS:
          return
      # Use sender_id rather than sender.id: the sender entity may not be
      # available on the event, and dereferencing None would raise here.
      pending_user = pending.get(event.sender_id)
      if pending_user is None:
          return
      try:
          await event.delete()
      except Exception:
          # Best effort: a failed delete must not stop the captcha check.
          logging.getLogger(__name__).warning(
              "Could not delete message from pending user %s",
              event.sender_id,
              exc_info=True,
          )
      # Media-only messages may carry no text; check_captcha() needs a string.
      text = getattr(event, "text", None) or ""
      await pending_user.check_captcha(text)
  async def check_pending():
      """Run forever, kicking pending users whose captcha time has run out.

      The pending users are scanned once per second.
      """
      while True:
          # Iterate over a snapshot of the objects, not the keys: kick() awaits,
          # so other handlers can remove entries in the meantime and a later
          # pending[user_id] lookup would raise KeyError and end this task.
          for pending_user in list(pending.values()):
              if not pending_user.expired:
                  continue
              try:
                  await pending_user.kick()
              except Exception:
                  # One failing user must not stop expiry checks for everyone.
                  logging.getLogger(__name__).warning(
                      "Expiry check failed for user %s",
                      pending_user.user_id,
                      exc_info=True,
                  )
          await asyncio.sleep(1)
  if __name__ == "__main__":
      # Keep a reference to the task: the event loop only holds weak references
      # to tasks, so an unreferenced one may be garbage-collected mid-run.
      expiry_task = bot.loop.create_task(check_pending())
      # The client was already started with the bot token when it was created;
      # this second call is kept from the original.
      bot.start()
      bot.run_until_disconnected()