utils.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. import re
  2. from uuid import uuid4
  3. from enum import IntEnum
  4. from datetime import datetime
  5. from ipaddress import ip_address
  6. from collections import namedtuple
  7. from aiofiles.os import mkdir
  8. from aiofiles.os import path
  9. from telethon.utils import get_display_name
  10. from telethon.tl.types import (
  11. MessageEntityBold,
  12. MessageEntityItalic,
  13. MessageEntityStrike,
  14. MessageEntityCode,
  15. MessageEntityPre,
  16. MessageEntitySpoiler,
  17. MessageEntityUnderline,
  18. MessageEntityUrl,
  19. MessageEntityMention,
  20. MessageEntityBotCommand,
  21. )
  22. Command = namedtuple("Command", "name argc args args_string")
  23. Age = namedtuple("Age", "days_until age age_now date_string")
  24. ANSI_ESCAPE_RE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
  25. class Kind(IntEnum):
  26. CANNOT_APPLY_TO_SELF = 0
  27. CAN_APPLY_TO_SELF = 1
  28. NO_TARGET = 2
  29. NO_TARGET_MAYBE = 3
  30. def parse_command(text):
  31. text = text.strip()
  32. if not text.startswith("/"):
  33. raise ValueError
  34. text = text.split(" ")
  35. if len(text) < 1:
  36. raise ValueError
  37. command = text[0][1:].lower()
  38. if "@" in command:
  39. command = command.split("@")
  40. command = command[0]
  41. args = text[1:]
  42. argc = len(args)
  43. args_string = " ".join(args)
  44. return Command(name=command, argc=argc, args=args, args_string=args_string)
  45. def get_user_name(user):
  46. full_name = get_display_name(user)
  47. if not full_name:
  48. full_name = user.username
  49. if not full_name:
  50. full_name = "?"
  51. return full_name
  52. def get_link_to_user(user):
  53. full_name = get_user_name(user)
  54. if user.username:
  55. return f"[{full_name}](@{user.username})"
  56. return f"[{full_name}](tg://user?id={user.id})"
  57. def is_valid_name(name):
  58. return name.isidentifier()
  59. def make_temporary_filename(ext):
  60. uid = uuid4().hex
  61. return f"tmp_{uid}.{ext}"
  62. CACHE_DIR = "./cache"
  63. async def make_cache_filename(id, ext):
  64. if not await path.isdir(CACHE_DIR):
  65. await mkdir(CACHE_DIR)
  66. return f"{CACHE_DIR}/{id}.{ext}"
  67. def parse_kind(kind):
  68. kind = int(kind)
  69. if kind < 0 or kind > 3:
  70. raise ValueError
  71. return kind
  72. def calculate_age(date):
  73. now = datetime.now().date()
  74. birthday_date = date.replace(year=now.year)
  75. if now > birthday_date:
  76. birthday_date = date.replace(year=now.year + 1)
  77. delta = birthday_date - now
  78. age = (birthday_date - date).days // 365
  79. age_now = age - 1
  80. return Age(
  81. days_until=delta.days,
  82. age=age,
  83. age_now=age_now,
  84. date_string=date.strftime("%d.%m.%Y"),
  85. )
  86. DELIMITERS = {
  87. MessageEntityBold: "**",
  88. MessageEntityItalic: "__",
  89. MessageEntityStrike: "~~",
  90. MessageEntityCode: "`",
  91. MessageEntityPre: "`",
  92. MessageEntityUnderline: "\ue000",
  93. MessageEntitySpoiler: "\ue001",
  94. MessageEntityUrl: "\ue002",
  95. MessageEntityMention: "\ue003",
  96. MessageEntityBotCommand: "\ue003",
  97. }
  98. class LookupTable:
  99. def __init__(self):
  100. self._start = {}
  101. self._end = {}
  102. def insert(self, entity):
  103. delimiter = DELIMITERS.get(type(entity))
  104. if not delimiter:
  105. return
  106. start = entity.offset
  107. end = entity.offset + entity.length
  108. if start in self._start:
  109. self._start[start].append(delimiter)
  110. else:
  111. self._start[start] = [delimiter]
  112. if end in self._end:
  113. self._end[end].insert(0, delimiter)
  114. else:
  115. self._end[end] = [delimiter]
  116. def _lookup(self, position):
  117. if position in self._start:
  118. return "".join(self._start[position])
  119. elif position in self._end:
  120. return "".join(self._end[position])
  121. def process(self, text):
  122. text = list(text) + [""]
  123. result = ""
  124. for position, character in zip(range(len(text)), text):
  125. delimiter = self._lookup(position)
  126. if delimiter:
  127. result += delimiter
  128. result += character
  129. return result
  130. def unparse(text, entities):
  131. table = LookupTable()
  132. for entity in entities:
  133. table.insert(entity)
  134. return table.process(text)
  135. def is_valid_ip(ip):
  136. try:
  137. ip_address(ip)
  138. except:
  139. return False
  140. return True
  141. def remove_ansi_escapes(text):
  142. return ANSI_ESCAPE_RE.sub("", text)