txlyre преди 1 ден
родител
ревизия
32c6f4730a
променени са 2 файла, в които са добавени 387 реда и са изтрити 14 реда
  1. 32 14
      commands.py
  2. 355 0
      rand.py

+ 32 - 14
commands.py

@@ -1,11 +1,9 @@
 from io import BytesIO
-from sys import executable
 from struct import pack
 from asyncio import create_subprocess_shell
 from asyncio.subprocess import PIPE
 from datetime import datetime
 
-from ujson import dumps
 from tortoise.exceptions import IntegrityError
 from telethon.utils import get_display_name, get_peer_id
 from telethon.tl.types import MessageEntityCode
@@ -48,6 +46,7 @@ from utils import (
     calculate_age,
     unparse,
 )
+from rand import roll_dices
 
 
 class Handler:
@@ -255,7 +254,7 @@ async def markov_handler(bot, event, command):
     if command.args[0] == "enable":
         try:
             await enable_markov(peer_id)
-        except:
+        except Exception:
             await event.reply("Ошибка!!!!!")
 
             return
@@ -264,7 +263,7 @@ async def markov_handler(bot, event, command):
     elif command.args[0] == "disable":
         try:
             await disable_markov(peer_id)
-        except:
+        except Exception:
             await event.reply("Ошибка!!!!!")
 
             return
@@ -277,7 +276,7 @@ async def markov_handler(bot, event, command):
             await set_markov_options(
                 peer_id, **{command.args[1]: float(command.args[2])}
             )
-        except:
+        except Exception:
             await event.reply("Ошибка!!!!!")
 
             return
@@ -286,7 +285,7 @@ async def markov_handler(bot, event, command):
     elif command.args[0] == "get" and command.argc == 2:
         try:
             await event.reply(str(await get_markov_option(peer_id, command.args[1])))
-        except:
+        except Exception:
             await event.reply("Ошибка!!!!!")
     elif command.args[0] == "say":
         if not bot.markov.is_ready:
@@ -493,7 +492,7 @@ async def vpn_handler(bot, event, command):
                 data = await resp.json()
 
                 profile = data["profile"]
-        except:
+        except Exception:
             await event.reply(
                 "Произошла ошибка при попытке обращения к API сервера… :("
             )
@@ -505,7 +504,7 @@ async def vpn_handler(bot, event, command):
             await bot.get_entity(sender_id),
             f"Ваш файл конфигурации WireGuard (сервер {command.args[0]}):\n\n```{profile}```",
         )
-    except:
+    except Exception:
         await event.reply(
             "Произошла ошибка при отправке файла конфигурации в Ваши личные сообщения… :с"
         )
@@ -526,7 +525,7 @@ async def run_handler(bot, event, command):
                     text = text.decode("UTF-8")
 
                     await event.reply(f"Доступные языки:\n`{text}`")
-            except:
+            except Exception:
                 await event.reply("Произошла ошибка при попытке обращения к API… :(")
 
         return
@@ -559,14 +558,14 @@ async def sylvy_handler(bot, event, command):
                     return
 
                 result = data["data"]["result"]
-        except:
+        except Exception:
             await event.reply("Произошла ошибка при попытке обращения к API Sylvy… :(")
 
             return
 
     if result["status"] == "TIMEOUT":
         await event.reply(
-            f"Максимальное время исполнения истекло (более тридцати секунд)!!!"
+            "Максимальное время исполнения истекло (более тридцати секунд)!!!"
         )
 
         return
@@ -622,17 +621,35 @@ async def say_handler(bot, event, command):
         try:
             try:
                 await event.delete()
-            except:
+            except Exception:
                 pass
 
             await markov_say(bot, get_peer_id(event.peer_id), init_state=init_state)
-        except:
+        except Exception:
             try:
                 await markov_say(bot, get_peer_id(event.peer_id))
-            except:
+            except Exception:
                 await event.reply("Ошибка :(")
 
 
+async def roll_handler(bot, event, command):
+    if command.argc < 1:
+        await event.reply("Пожалуйста, не оставляйте ввод пустым!")
+
+        return
+
+    try:
+        text = await roll_dices(command.args_string)
+    except Exception:
+        await event.reply("Ошибка :(")
+
+        return
+
+    await event.reply(
+        text, formatting_entities=[MessageEntityCode(offset=0, length=len(text))]
+    )
+
+
 COMMANDS = {
     "newadmin": Handler(newadmin_handler, is_restricted=True),
     "deladmin": Handler(deladmin_handler, is_restricted=True),
@@ -650,4 +667,5 @@ COMMANDS = {
     "sylvy": Handler(sylvy_handler, is_public=True),
     "run": Handler(run_handler, is_public=True),
     "say": Handler(say_handler, is_public=True),
+    "roll": Handler(roll_handler, is_public=True),
 }

+ 355 - 0
rand.py

@@ -0,0 +1,355 @@
+import re
+import random
+import struct
+import operator
+
+import aiohttp
+
+
+async def _pyrandom(min_val, max_val, is_dice=False):
+    if is_dice:
+        return [random.randint(1, max_val) for _ in range(min_val)]
+    return random.randint(min_val, max_val)
+
+
+async def _get(url):
+    async with aiohttp.ClientSession() as session:
+        async with session.get(url) as resp:
+            return await resp.read()
+
+
+async def generate_unbiased_numbers(data, min_val, max_val, count):
+    mod = max_val - min_val + 1
+    max_acceptable = (1 << 64) // mod * mod
+    numbers = []
+    data_len = len(data)
+    bytes_needed = count * 8
+
+    if data_len < bytes_needed:
+        return None
+
+    for i in range(count):
+        chunk = data[i * 8 : (i + 1) * 8]
+        number = struct.unpack("<Q", chunk)[0]
+
+        while number >= max_acceptable:
+            number = number >> 1
+
+        numbers.append((number % mod) + min_val)
+
+    return numbers
+
+
+async def generate(source, min_val, max_val, count=1):
+    total_bytes = count * 8
+    try:
+        data = await _get(f"{source}{total_bytes}")
+    except Exception:
+        data = b""
+
+    if len(data) >= total_bytes:
+        result = await generate_unbiased_numbers(data, min_val, max_val, count)
+        if result is not None:
+            return result
+
+    if count == 1:
+        return [await _pyrandom(min_val, max_val)]
+    return await _pyrandom(min_val, max_val, is_dice=True)
+
+
+async def _trng_yebisu(min_val, max_val, is_dice=False):
+    count = min_val if is_dice else 1
+    actual_min = 1 if is_dice else min_val
+    actual_max = max_val if is_dice else max_val
+
+    numbers = await generate(
+        "https://yebi.su/api/pool?count=", actual_min, actual_max, count
+    )
+
+    return numbers if is_dice else numbers[0]
+
+
+async def rolldices(count, sides):
+    try:
+        return await _trng_yebisu(count, sides, is_dice=True)
+    except Exception:
+        return await _pyrandom(count, sides, is_dice=True)
+
+
+async def _roll(count, sides):
+    if count <= 0:
+        raise ValueError("Количество костей должно быть больше нуля.")
+
+    if sides <= 0:
+        raise ValueError("Количество сторон должно быть больше нуля.")
+
+    return await rolldices(count, sides)
+
+
+OPS = {
+    "+": operator.add,
+    "-": operator.sub,
+    "*": operator.mul,
+    "/": operator.truediv,
+    "%": operator.mod,
+    "^": operator.pow,
+}
+OPS_KEYS = "".join(OPS.keys()).replace("-", r"\-")
+
+T_NAME = re.compile(r"([abce-zа-ге-йл-яA-ZА-Я]+)")
+T_COLON = re.compile(r"(:)")
+T_DICE = re.compile(r"(d|д|к)")
+T_MINUS = re.compile(r"(-)")
+T_OP = re.compile(f"([{OPS_KEYS}])")
+T_DIGIT = re.compile(r"(\d+)")
+T_OPEN_PAREN = re.compile(r"(\()")
+T_CLOSE_PAREN = re.compile(r"(\))")
+T_WS = re.compile(r"([ \t\r\n]+)")
+
+TOKEN_NAMES = {
+    T_NAME: "имя",
+    T_COLON: "двоеточие",
+    T_DICE: "кость",
+    T_OP: "оператор",
+    T_DIGIT: "число",
+    T_OPEN_PAREN: "открывающая скобка",
+    T_CLOSE_PAREN: "закрывающая скобка",
+}
+
+
+class Value:
+    def __init__(self, value):
+        if not isinstance(value, list):
+            value = [int(value)]
+
+        self.value = value
+
+    def __int__(self):
+        return sum(map(int, self.value))
+
+    def __repr__(self):
+        return str(self.value[0] if len(self.value) == 1 else self.value)
+
+    def __iter__(self):
+        return iter(self.value)
+
+    def __next__(self):
+        return next(self.value)
+
+    def __index__(self, index):
+        return self.value[index]
+
+    def __len__(self):
+        return len(self.value)
+
+    def __eq__(self, other):
+        if not isinstance(other, Value):
+            return False
+
+        return self.value == other.value
+
+    def apply(self, what, *args):
+        args = list(map(int, args))
+
+        return Value(what(int(self), *args))
+
+
+class Dices:
+    def __init__(self, text):
+        self.text = text.strip()
+        self.position = 0
+
+        self.names = {}
+        self.rolls = []
+        self.result = None
+
+        self._rolls = []
+
+    def __repr__(self):
+        if self.result:
+            buffer = ""
+
+            for count, sides, roll in self._rolls:
+                buffer += f"{'' if count == 1 else count}d{sides}: {roll}\n"
+
+            for roll, result in zip(self.rolls, self.result):
+                count, sides, roll = roll
+
+                buffer += f"{'' if count == 1 else count}d{sides}: "
+
+                roll_sum = int(roll)
+                result_sum = int(result)
+                if result_sum == roll_sum:
+                    results = ", ".join(map(str, result))
+
+                    if "," in results:
+                        results = f"[{results}]"
+
+                    buffer += results
+
+                    if "," in results:
+                        buffer += f" ({result_sum})"
+
+                    buffer += "\n"
+                else:
+                    difference = result_sum - roll_sum
+                    results = f"{'' if len(roll) == 1 else f'{roll_sum} -> '}{roll_sum}{'' if difference < 0 else '+'}{difference}"
+
+                    buffer += f"{roll} -> {results} ({int(result)})\n"
+
+            return f"```\n{self.text}\n{buffer}= {int(self.result)}```"
+
+        return self.text
+
+    def _skip_ws(self):
+        match = T_WS.match(self.text, self.position)
+        if match:
+            self.position += len(match.group(0))
+
+    def _done(self):
+        self._skip_ws()
+
+        return self.position >= len(self.text)
+
+    def _match(self, what, skip_ws=True):
+        if skip_ws:
+            self._skip_ws()
+
+        match = what.match(self.text, self.position)
+        if match:
+            self.position += len(match.group(0))
+
+            return match.groups()
+
+    def _expected(self, expected):
+        raise SyntaxError(
+            f"Неожиданный ввод на позиции `#{self.position + 1}`: ожидалось: `{expected}`."
+        )
+
+    def _expect(self, what):
+        match = self._match(what)
+        if not match:
+            self._expected(TOKEN_NAMES[what])
+
+        return match
+
+    async def _parse_dice(self, left=1):
+        if self._match(T_OPEN_PAREN):
+            right = await self._parse_expr()
+
+            self._expect(T_CLOSE_PAREN)
+        else:
+            right = await self._parse_atom()
+
+        left = int(left)
+        right = int(right)
+
+        if left > 1000 or right > 1000:
+            raise SyntaxError("Слишком длинное число.")
+
+        roll = Value(await _roll(left, right))
+
+        self._rolls.append((left, right, roll))
+
+        return roll
+
+    async def _parse_atom(self):
+        if self._match(T_OPEN_PAREN):
+            expr = await self._parse_expr()
+
+            self._expect(T_CLOSE_PAREN)
+
+            if self._match(T_DICE, skip_ws=False):
+                return await self._parse_dice(expr)
+
+            return expr
+        elif self._match(T_MINUS):
+            value = await self._parse_atom()
+
+            return value.apply(operator.neg)
+        elif match := self._match(T_DIGIT):
+            try:
+                left = int(match[0])
+            except ValueError:
+                raise SyntaxError("Слишком длинное число.")
+
+            if match := self._match(T_DICE, skip_ws=False):
+                return await self._parse_dice(left)
+
+            return Value(left)
+        elif self._match(T_DICE):
+            return await self._parse_dice()
+        elif match := self._match(T_NAME):
+            name = match[0].upper()
+
+            if name not in self.names:
+                raise NameError(f"Неизвестная переменная: `{match[0]}`.")
+
+            expr = self.names[name]
+
+            if self._match(T_DICE, skip_ws=False):
+                return await self._parse_dice(expr)
+
+            return expr
+
+        self._expected("число, кость или переменная")
+
+    async def _parse_expr(self):
+        left = await self._parse_atom()
+
+        if op := self._match(T_OP):
+            op = OPS[op[0]]
+            right = await self._parse_expr()
+
+            left = left.apply(op, right)
+        elif self._match(T_COLON):
+            right = self._expect(T_NAME)[0].upper()
+
+            self.names[right] = left
+
+        return left
+
+    async def _parse_exprs(self):
+        exprs = []
+        while not self._done():
+            if len(exprs) >= 10:
+                raise SyntaxError("Слишком длинное число.")
+
+            rolls_count = len(self._rolls)
+
+            expr = await self._parse_expr()
+
+            if len(self._rolls) == rolls_count:
+                raise SyntaxError("Выражение не содержит бросков.")
+
+            self.rolls.append(self._rolls.pop(-1))
+
+            exprs.append(expr)
+
+        if not exprs:
+            raise SyntaxError("Выражение не должно быть пустым.")
+
+        return Value(exprs)
+
+    async def roll(self, vars={}):
+        self.names = {str(k).upper(): Value(vars[k]) for k in vars}
+        self.position = 0
+        self.rolls = []
+        self._rolls = []
+
+        self.result = await self._parse_exprs()
+
+        return self
+
+
+async def roll_dices(dices, vars={}):
+    dices = Dices(dices)
+
+    try:
+        await dices.roll(vars=vars)
+    except (ValueError, SyntaxError, NameError) as e:
+        return str(e)
+    except ZeroDivisionError:
+        raise "Попытка деления на ноль."
+
+    return str(dices)