utils.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. import re
  2. from uuid import uuid4
  3. from enum import IntEnum
  4. from datetime import datetime
  5. from ipaddress import ip_address
  6. from collections import namedtuple
  7. from aiofiles.os import mkdir
  8. from aiofiles.os import path
  9. from telethon.utils import get_display_name
  10. from telethon.tl.types import (
  11. MessageEntityBold,
  12. MessageEntityItalic,
  13. MessageEntityStrike,
  14. MessageEntityCode,
  15. MessageEntityPre,
  16. MessageEntitySpoiler,
  17. MessageEntityUnderline,
  18. MessageEntityUrl,
  19. MessageEntityMention,
  20. MessageEntityBotCommand
  21. )
  22. Command = namedtuple(
  23. 'Command',
  24. 'name argc args args_string'
  25. )
  26. Age = namedtuple(
  27. 'Age',
  28. 'days_until age age_now date_string'
  29. )
  30. ANSI_ESCAPE_RE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
  31. class Kind(IntEnum):
  32. CANNOT_APPLY_TO_SELF = 0
  33. CAN_APPLY_TO_SELF = 1
  34. NO_TARGET = 2
  35. NO_TARGET_MAYBE = 3
  36. def parse_command(text):
  37. text = text.strip()
  38. if not text.startswith('/'):
  39. raise ValueError
  40. text = text.split(' ')
  41. if len(text) < 1:
  42. raise ValueError
  43. command = text[0][1:].lower()
  44. if '@' in command:
  45. command = command.split('@')
  46. command = command[0]
  47. args = text[1:]
  48. argc = len(args)
  49. args_string = ' '.join(args)
  50. return Command(
  51. name=command,
  52. argc=argc,
  53. args=args,
  54. args_string=args_string
  55. )
  56. def get_user_name(user):
  57. full_name = get_display_name(user)
  58. if not full_name:
  59. full_name = user.username
  60. if not full_name:
  61. full_name = '?'
  62. return full_name
  63. def get_link_to_user(user):
  64. full_name = get_user_name(user)
  65. if user.username:
  66. return f'[{full_name}](@{user.username})'
  67. return f'[{full_name}](tg://user?id={user.id})'
  68. def is_valid_name(name):
  69. return name.isidentifier()
  70. def make_temporary_filename(ext):
  71. uid = uuid4().hex
  72. return f'tmp_{uid}.{ext}'
  73. CACHE_DIR = './cache'
  74. async def make_cache_filename(id, ext):
  75. if not await path.isdir(CACHE_DIR):
  76. await mkdir(CACHE_DIR)
  77. return f'{CACHE_DIR}/{id}.{ext}'
  78. def parse_kind(kind):
  79. kind = int(kind)
  80. if kind < 0 or kind > 3:
  81. raise ValueError
  82. return kind
  83. def calculate_age(date):
  84. now = datetime.now().date()
  85. birthday_date = date.replace(year=now.year)
  86. if now > birthday_date:
  87. birthday_date = date.replace(year=now.year + 1)
  88. delta = birthday_date - now
  89. age = (birthday_date - date).days // 365
  90. age_now = age - 1
  91. return Age(
  92. days_until=delta.days,
  93. age=age,
  94. age_now=age_now,
  95. date_string=date.strftime('%d.%m.%Y')
  96. )
  97. DELIMITERS = {
  98. MessageEntityBold: '**',
  99. MessageEntityItalic: '__',
  100. MessageEntityStrike: '~~',
  101. MessageEntityCode: '`',
  102. MessageEntityPre: '`',
  103. MessageEntityUnderline: '\ue000',
  104. MessageEntitySpoiler: '\ue001',
  105. MessageEntityUrl: '\ue002',
  106. MessageEntityMention: '\ue003',
  107. MessageEntityBotCommand: '\ue003'
  108. }
  109. class LookupTable:
  110. def __init__(self):
  111. self._start = {}
  112. self._end = {}
  113. def insert(self, entity):
  114. delimiter = DELIMITERS.get(type(entity))
  115. if not delimiter:
  116. return
  117. start = entity.offset
  118. end = entity.offset + entity.length
  119. if start in self._start:
  120. self._start[start].append(delimiter)
  121. else:
  122. self._start[start] = [delimiter]
  123. if end in self._end:
  124. self._end[end].insert(0, delimiter)
  125. else:
  126. self._end[end] = [delimiter]
  127. def _lookup(self, position):
  128. if position in self._start:
  129. return ''.join(self._start[position])
  130. elif position in self._end:
  131. return ''.join(self._end[position])
  132. def process(self, text):
  133. text = list(text) + ['']
  134. result = ''
  135. for position, character in zip(range(len(text)), text):
  136. delimiter = self._lookup(position)
  137. if delimiter:
  138. result += delimiter
  139. result += character
  140. return result
  141. def unparse(text, entities):
  142. table = LookupTable()
  143. for entity in entities:
  144. table.insert(entity)
  145. return table.process(text)
  146. def is_valid_ip(ip):
  147. try:
  148. ip_address(ip)
  149. except:
  150. return False
  151. return True
  152. def remove_ansi_escapes(text):
  153. return ANSI_ESCAPE_RE.sub('', text)