123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291 |
- from random import uniform
- from asyncio import sleep
- from tortoise.contrib.postgres.functions import Random
- from telethon.utils import get_input_document
- from telethon.tl.functions.stickers import (
- CreateStickerSetRequest,
- AddStickerToSetRequest,
- )
- from telethon.tl.functions.messages import UploadMediaRequest, GetStickerSetRequest
- from telethon.tl.types import (
- InputStickerSetID,
- InputStickerSetShortName,
- InputStickerSetItem,
- InputMediaUploadedDocument,
- InputPeerSelf,
- )
- from tortoise.expressions import F
- from models import (
- Action,
- Gif,
- StickerPack,
- Admin,
- BirthDay,
- VPNServer,
- AllowedChat,
- MarkovChat,
- )
- from utils import is_valid_name, is_valid_ip
- from config import config
- async def is_admin(bot, user):
- admin = await bot.get_entity(config.ADMIN)
- if user.id == admin.id:
- return True
- admin = await Admin.filter(user_id=user.id)
- if admin:
- return True
- return False
- async def add_admin(user):
- await Admin(user_id=user.id).save()
- async def delete_admin(user):
- admin = await Admin.filter(user_id=user.id).first()
- if not admin:
- raise IndexError
- await admin.delete()
- async def create_action(name, template, kind):
- if not is_valid_name(name):
- raise SyntaxError
- await Action(name=name, template=template, kind=kind).save()
- async def find_action(name):
- if not is_valid_name(name):
- raise SyntaxError
- return await Action.filter(name=name).first()
- async def delete_action(name):
- action = await find_action(name)
- if not action:
- raise NameError
- gifs = await action.gifs.all()
- for gif in gifs:
- await gif.delete()
- await action.delete()
- async def add_gif(action, file_id):
- await Gif(action=action, file_id=file_id).save()
- async def get_random_gif(action):
- return await action.gifs.all().annotate(order=Random()).order_by("order").first()
- async def create_new_pack(bot, sticker):
- last_pack = await StickerPack.all().order_by("-id").first()
- set_id = last_pack.id + 1 if last_pack else 1
- user = await bot.get_entity(config.USER)
- me = await bot.get_me()
- bot_username = me.username
- pack = await bot(
- CreateStickerSetRequest(
- user_id=user.id,
- title=f"Messages #{set_id}.",
- short_name=f"messages{set_id}_by_{bot_username}",
- stickers=[sticker],
- )
- )
- sid = pack.set.id
- hash = pack.set.access_hash
- await StickerPack(short_name=pack.set.short_name, sid=sid, hash=hash).save()
- return pack
- async def get_current_pack(bot):
- pack = await StickerPack.all().order_by("-id").first()
- if not pack or pack.stickers_count >= 119:
- return None
- return pack
- async def add_sticker(bot, file, emoji):
- file = await bot.upload_file(file)
- file = InputMediaUploadedDocument(file, "image/png", [])
- file = await bot(UploadMediaRequest(InputPeerSelf(), file))
- file = get_input_document(file)
- sticker = InputStickerSetItem(document=file, emoji=emoji)
- pack = await get_current_pack(bot)
- if not pack:
- pack = await create_new_pack(bot, sticker)
- else:
- await StickerPack.filter(id=pack.id).update(
- stickers_count=F("stickers_count") + 1
- )
- pack = await bot(
- AddStickerToSetRequest(
- stickerset=InputStickerSetID(id=pack.sid, access_hash=pack.hash),
- sticker=sticker,
- )
- )
- return get_input_document(pack.documents[-1])
- async def get_birthdays(peer_id):
- return await BirthDay.filter(peer_id=peer_id).all()
- async def add_or_update_birthday(peer_id, user, date):
- birthday = await BirthDay.filter(peer_id=peer_id, user_id=user.id).first()
- if birthday:
- await BirthDay.filter(id=birthday.id).update(date=date)
- return False
- await BirthDay(peer_id=peer_id, user_id=user.id, date=date).save()
- return True
- async def get_all_birthdays():
- return await BirthDay.all()
- async def add_server(name, ip):
- if not is_valid_ip(ip):
- raise ValueError
- await VPNServer(name=name, ip=ip).save()
- async def add_server(name, ip):
- if not is_valid_name(name):
- raise SyntaxError
- if not is_valid_ip(ip):
- raise ValueError
- await VPNServer(name=name, ip=ip).save()
- async def delete_server(name):
- if not is_valid_name(name):
- raise SyntaxError
- server = await VPNServer.filter(name=name).first()
- if not server:
- raise IndexError
- await server.delete()
- async def list_servers():
- servers = await VPNServer.all()
- if not servers:
- return "*пусто*"
- return ", ".join(map(lambda server: server.name, servers))
- async def get_server_ip(name):
- if not is_valid_name(name):
- raise SyntaxError
- server = await VPNServer.filter(name=name).first()
- if not server:
- raise IndexError
- return server.ip
- async def add_allowed(peer_id):
- await AllowedChat(peer_id=peer_id).save()
- async def delete_allowed(peer_id):
- chat = await AllowedChat.filter(peer_id=peer_id).first()
- if not chat:
- raise IndexError
- await chat.delete()
- async def is_allowed(peer_id):
- return await AllowedChat.filter(peer_id=peer_id).exists()
- async def is_markov_enabled(peer_id):
- return await MarkovChat.filter(peer_id=peer_id).exists()
- async def enable_markov(peer_id):
- await MarkovChat(peer_id=peer_id).save()
- async def set_markov_options(peer_id, **options):
- chat = await MarkovChat.filter(peer_id=peer_id).first()
- if not chat:
- raise IndexError
- await MarkovChat.filter(id=chat.id).update(**options)
- async def get_markov_option(peer_id, option):
- chat = await MarkovChat.filter(peer_id=peer_id).first()
- if not chat:
- raise IndexError
- return getattr(chat, option)
- async def disable_markov(peer_id):
- chat = await MarkovChat.filter(peer_id=peer_id).first()
- if not chat:
- raise IndexError
- await chat.delete()
- async def list_markov_chats():
- return await MarkovChat.all()
- async def markov_say(bot, peer_id, reply_to=None):
- if not bot.markov.is_ready:
- return
- text = bot.markov.generate()
- async with bot.action(peer_id, "typing"):
- amount = 0
- for _ in range(len(text)):
- amount += round(uniform(0.05, 0.2), 2)
- await sleep(min(amount, 8))
- await bot.send_message(peer_id, message=text, reply_to=reply_to)
|