"""Miscellaneous helpers.

Covers command parsing, user display names and links, temporary/cache file
names, birthday arithmetic, conversion of message entities back to inline
markup, and small validation utilities.
"""

import re
from uuid import uuid4
from enum import IntEnum
from datetime import datetime
from ipaddress import ip_address
from collections import namedtuple

from aiofiles.os import mkdir
from aiofiles.os import path

from telethon.utils import get_display_name
from telethon.tl.types import (
    MessageEntityBold,
    MessageEntityItalic,
    MessageEntityStrike,
    MessageEntityCode,
    MessageEntityPre,
    MessageEntitySpoiler,
    MessageEntityUnderline,
    MessageEntityUrl,
    MessageEntityMention,
    MessageEntityBotCommand,
)


Command = namedtuple("Command", "name argc args args_string")
Age = namedtuple("Age", "days_until age age_now date_string")

# Matches ANSI escape sequences (two-character escapes and CSI sequences).
ANSI_ESCAPE_RE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")


class Kind(IntEnum):
    """Integer codes accepted by :func:`parse_kind`."""

    CANNOT_APPLY_TO_SELF = 0
    CAN_APPLY_TO_SELF = 1
    NO_TARGET = 2
    NO_TARGET_MAYBE = 3


def parse_command(text: str) -> Command:
    """Split a ``/command[@bot] arg ...`` string into a :class:`Command`.

    The command name is lower-cased and anything from the first ``@`` on is
    dropped. Arguments are separated by single spaces, so consecutive spaces
    yield empty arguments and ``args_string`` keeps the original spacing.

    Raises:
        ValueError: if the stripped text does not start with ``/``.
    """
    text = text.strip()
    if not text.startswith("/"):
        raise ValueError("command text must start with '/'")

    # str.split(" ") always returns at least one element, so ``head`` exists.
    head, *args = text.split(" ")
    name = head[1:].lower().partition("@")[0]

    return Command(
        name=name,
        argc=len(args),
        args=args,
        args_string=" ".join(args),
    )


def get_user_name(user) -> str:
    """Return the display name of *user*, else its username, else ``"?"``."""
    return get_display_name(user) or user.username or "?"


def get_link_to_user(user) -> str:
    """Return a ``[name](target)`` link to *user*.

    The target is ``@username`` when the user has a username, otherwise a
    ``tg://user?id=...`` URL built from ``user.id``.
    """
    full_name = get_user_name(user)
    if user.username:
        return f"[{full_name}](@{user.username})"
    return f"[{full_name}](tg://user?id={user.id})"


def is_valid_name(name: str) -> bool:
    """Return whether *name* is a valid Python identifier."""
    return name.isidentifier()


def make_temporary_filename(ext) -> str:
    """Return a unique ``tmp_<hex>.<ext>`` file name (no file is created)."""
    uid = uuid4().hex
    return f"tmp_{uid}.{ext}"


CACHE_DIR = "./cache"


async def make_cache_filename(id, ext) -> str:
    """Return ``CACHE_DIR/<id>.<ext>``, creating ``CACHE_DIR`` if missing."""
    if not await path.isdir(CACHE_DIR):
        try:
            await mkdir(CACHE_DIR)
        except FileExistsError:
            # Another task may have created the directory between the check
            # and the mkdir; only fail if it is still not a directory.
            if not await path.isdir(CACHE_DIR):
                raise
    return f"{CACHE_DIR}/{id}.{ext}"


def parse_kind(kind) -> int:
    """Convert *kind* to an int and check that it is a valid :class:`Kind`.

    Returns:
        The plain integer value.

    Raises:
        ValueError: if *kind* is not an integer literal or is not a member
            value of :class:`Kind`.
    """
    value = int(kind)
    Kind(value)  # raises ValueError for values outside the enum
    return value


def _birthday_in_year(date, year):
    """Return *date* moved to *year*; 29 February falls back to 28 February."""
    try:
        return date.replace(year=year)
    except ValueError:
        # 29 February does not exist in a non-leap year.
        return date.replace(year=year, day=28)


def calculate_age(date) -> Age:
    """Compute upcoming-birthday information for the birth date *date*.

    Returns:
        An :class:`Age` with the number of days until the next birthday
        (``0`` if it is today), the age reached on that birthday, that age
        minus one, and *date* formatted as ``DD.MM.YYYY``.
    """
    now = datetime.now().date()

    birthday_date = _birthday_in_year(date, now.year)
    if now > birthday_date:
        birthday_date = _birthday_in_year(date, now.year + 1)

    delta = birthday_date - now
    age = birthday_date.year - date.year
    # NOTE(review): on the birthday itself (days_until == 0) age_now is still
    # age - 1; confirm that callers expect this.
    age_now = age - 1

    return Age(
        days_until=delta.days,
        age=age,
        age_now=age_now,
        date_string=date.strftime("%d.%m.%Y"),
    )


# Delimiter emitted on both sides of each supported entity type. The
# private-use code points are presumably placeholders that a consumer
# replaces later -- verify against the caller.
DELIMITERS = {
    MessageEntityBold: "**",
    MessageEntityItalic: "__",
    MessageEntityStrike: "~~",
    MessageEntityCode: "`",
    MessageEntityPre: "`",
    MessageEntityUnderline: "\ue000",
    MessageEntitySpoiler: "\ue001",
    MessageEntityUrl: "\ue002",
    MessageEntityMention: "\ue003",
    MessageEntityBotCommand: "\ue003",
}


class LookupTable:
    """Maps text offsets to the delimiters to insert at those offsets."""

    def __init__(self):
        self._start = {}
        self._end = {}

    def insert(self, entity):
        """Register *entity*; entity types without a delimiter are ignored."""
        delimiter = DELIMITERS.get(type(entity))
        if not delimiter:
            return

        start = entity.offset
        end = start + entity.length

        self._start.setdefault(start, []).append(delimiter)
        # Prepend so that entities sharing an end offset are closed in the
        # reverse of their insertion order.
        self._end.setdefault(end, []).insert(0, delimiter)

    def _lookup(self, position):
        """Return the delimiters for *position* (empty string if none).

        Closing delimiters come first so that an entity ending where another
        one starts is closed before the next one is opened.
        """
        closing = "".join(self._end.get(position, ()))
        opening = "".join(self._start.get(position, ()))
        return closing + opening

    def process(self, text):
        """Return *text* with all registered delimiters inserted.

        Entity offsets are interpreted as UTF-16 code units, so a character
        outside the Basic Multilingual Plane advances the position by two.
        """
        parts = []
        position = 0
        for character in text:
            parts.append(self._lookup(position))
            parts.append(character)
            position += 2 if ord(character) > 0xFFFF else 1
        # Delimiters that sit exactly at the end of the text.
        parts.append(self._lookup(position))
        return "".join(parts)


def unparse(text, entities):
    """Re-insert delimiters for *entities* into *text*.

    *entities* may be ``None`` or empty, in which case *text* is returned
    unchanged.
    """
    table = LookupTable()
    for entity in entities or ():
        table.insert(entity)
    return table.process(text)


def is_valid_ip(ip) -> bool:
    """Return whether *ip* is a valid IPv4 or IPv6 address."""
    try:
        ip_address(ip)
    except ValueError:
        return False
    return True


def remove_ansi_escapes(text: str) -> str:
    """Return *text* with ANSI escape sequences removed."""
    return ANSI_ESCAPE_RE.sub("", text)