123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391 |
- import re
- import random
- import struct
- import inspect
- import operator
- import aiohttp
- async def _pyrandom(min_val, max_val, is_dice=False):
- if is_dice:
- return [random.randint(1, max_val) for _ in range(min_val)]
- return random.randint(min_val, max_val)
- async def _get(url):
- async with aiohttp.ClientSession() as session:
- async with session.get(url) as resp:
- return await resp.read()
- async def generate_unbiased_numbers(data, min_val, max_val, count):
- mod = max_val - min_val + 1
- max_acceptable = (1 << 64) // mod * mod
- numbers = []
- data_len = len(data)
- bytes_needed = count * 8
- if data_len < bytes_needed:
- return None
- for i in range(count):
- chunk = data[i * 8 : (i + 1) * 8]
- number = struct.unpack("<Q", chunk)[0]
- while number >= max_acceptable:
- number = number >> 1
- numbers.append((number % mod) + min_val)
- return numbers
- async def generate(source, min_val, max_val, count=1):
- total_bytes = count * 8
- try:
- data = await _get(f"{source}{total_bytes}")
- except Exception:
- data = b""
- if len(data) >= total_bytes:
- result = await generate_unbiased_numbers(data, min_val, max_val, count)
- if result is not None:
- return result
- if count == 1:
- return [await _pyrandom(min_val, max_val)]
- return await _pyrandom(min_val, max_val, is_dice=True)
- async def _trng_yebisu(min_val, max_val, is_dice=False):
- count = min_val if is_dice else 1
- actual_min = 1 if is_dice else min_val
- actual_max = max_val if is_dice else max_val
- numbers = await generate(
- "https://yebi.su/api/pool?count=", actual_min, actual_max, count
- )
- return numbers if is_dice else numbers[0]
- async def randint(min_val, max_val):
- try:
- return await _trng_yebisu(min_val, max_val)
- except Exception:
- return await _pyrandom(min_val, max_val)
- async def rolldices(count, sides):
- try:
- return await _trng_yebisu(count, sides, is_dice=True)
- except Exception:
- return await _pyrandom(count, sides, is_dice=True)
- async def _roll(count, sides):
- if count <= 0:
- raise ValueError("Количество костей должно быть больше нуля.")
- if sides <= 0:
- raise ValueError("Количество сторон должно быть больше нуля.")
- return await rolldices(count, sides)
- OPS = {
- "+": operator.add,
- "-": operator.sub,
- "*": operator.mul,
- "/": operator.truediv,
- "%": operator.mod,
- "^": operator.pow,
- "..": randint,
- }
- OPS_KEYS = "|".join(map(lambda k: "(" + re.escape(k) + ")", OPS.keys()))
- T_NAME = re.compile(r"([abce-zа-ге-йл-яA-ZА-Я]+)")
- T_COLON = re.compile(r"(:)")
- T_DICE = re.compile(r"(d|д|к)")
- T_MINUS = re.compile(r"(-)")
- T_OP = re.compile(f"({OPS_KEYS})")
- T_DIGIT = re.compile(r"(\d+)")
- T_OPEN_PAREN = re.compile(r"(\()")
- T_CLOSE_PAREN = re.compile(r"(\))")
- T_WS = re.compile(r"([ \t\r\n]+)")
- TOKEN_NAMES = {
- T_NAME: "имя",
- T_COLON: "двоеточие",
- T_DICE: "кость",
- T_OP: "оператор",
- T_DIGIT: "число",
- T_OPEN_PAREN: "открывающая скобка",
- T_CLOSE_PAREN: "закрывающая скобка",
- }
- class Value:
- def __init__(self, value):
- if not isinstance(value, list):
- value = [int(value)]
- self.value = value
- def __int__(self):
- return sum(map(int, self.value))
- def __repr__(self):
- return str(self.value[0] if len(self.value) == 1 else self.value)
- def __iter__(self):
- return iter(self.value)
- def __next__(self):
- return next(self.value)
- def __index__(self, index):
- return self.value[index]
- def __len__(self):
- return len(self.value)
- def __eq__(self, other):
- if not isinstance(other, Value):
- return False
- return self.value == other.value
- def apply(self, what, *args):
- args = list(map(int, args))
- return Value(what(int(self), *args))
- class Dices:
- def __init__(self, text):
- self.text = text.strip()
- self.position = 0
- self.names = {}
- self.rolls = []
- self.result = None
- self._rolls = []
- def __repr__(self):
- if self.result:
- buffer = ""
- for count, sides, roll, is_range in self._rolls:
- if is_range:
- buffer += f"{count}..{sides}: {roll}\n"
- else:
- buffer += f"{'' if count == 1 else count}d{sides}: {roll}\n"
- for roll, result in zip(self.rolls, self.result):
- count, sides, roll, is_range = roll
- if is_range:
- buffer += f"{count}..{sides}: "
- else:
- buffer += f"{'' if count == 1 else count}d{sides}: "
- roll_sum = int(roll)
- result_sum = int(result)
- if result_sum == roll_sum:
- results = ", ".join(map(str, result))
- if "," in results:
- results = f"[{results}]"
- buffer += results
- if "," in results:
- buffer += f" ({result_sum})"
- buffer += "\n"
- else:
- difference = result_sum - roll_sum
- results = f"{'' if len(roll) == 1 else f'{roll_sum} -> '}{roll_sum}{'' if difference < 0 else '+'}{difference}"
- buffer += f"{roll} -> {results} ({int(result)})\n"
- return f"{self.text}\n{buffer}= {int(self.result)}"
- return self.text
- def _skip_ws(self):
- match = T_WS.match(self.text, self.position)
- if match:
- self.position += len(match.group(0))
- def _done(self):
- self._skip_ws()
- return self.position >= len(self.text)
- def _match(self, what, skip_ws=True):
- if skip_ws:
- self._skip_ws()
- match = what.match(self.text, self.position)
- if match:
- self.position += len(match.group(0))
- return match.groups()
- def _expected(self, expected):
- raise SyntaxError(
- f"Неожиданный ввод на позиции `#{self.position + 1}`: ожидалось: `{expected}`."
- )
- def _expect(self, what):
- match = self._match(what)
- if not match:
- self._expected(TOKEN_NAMES[what])
- return match
- async def _parse_dice(self, left=1):
- if self._match(T_OPEN_PAREN):
- right = await self._parse_expr()
- self._expect(T_CLOSE_PAREN)
- else:
- right = await self._parse_atom()
- left = int(left)
- right = int(right)
- if left > 1000 or right > 1000:
- raise SyntaxError("Слишком длинное число.")
- roll = Value(await _roll(left, right))
- self._rolls.append((left, right, roll, False))
- return roll
- async def _parse_atom(self):
- if self._match(T_OPEN_PAREN):
- expr = await self._parse_expr()
- self._expect(T_CLOSE_PAREN)
- if self._match(T_DICE, skip_ws=False):
- return await self._parse_dice(expr)
- return expr
- elif self._match(T_MINUS):
- value = await self._parse_atom()
- return value.apply(operator.neg)
- elif match := self._match(T_DIGIT):
- try:
- left = int(match[0])
- except ValueError:
- raise SyntaxError("Слишком длинное число.")
- if match := self._match(T_DICE, skip_ws=False):
- return await self._parse_dice(left)
- return Value(left)
- elif self._match(T_DICE):
- return await self._parse_dice()
- elif match := self._match(T_NAME):
- name = match[0].upper()
- if name not in self.names:
- raise NameError(f"Неизвестная переменная: `{match[0]}`.")
- expr = self.names[name]
- if self._match(T_DICE, skip_ws=False):
- return await self._parse_dice(expr)
- return expr
- self._expected("число, кость или переменная")
- async def _parse_expr(self):
- left = await self._parse_atom()
- if op := self._match(T_OP):
- is_range = op[0] == ".."
- op = OPS[op[0]]
- right = await self._parse_expr()
- if is_range or inspect.iscoroutinefunction(op):
- left = int(left)
- right = int(right)
- try:
- r = await op(left, right)
- except Exception:
- r = 0
- if not isinstance(r, Value):
- r = Value(r)
- if is_range:
- self._rolls.append((left, right, r, True))
- left = r
- else:
- left = left.apply(op, right)
- elif self._match(T_COLON):
- right = self._expect(T_NAME)[0].upper()
- self.names[right] = left
- return left
- async def _parse_exprs(self):
- exprs = []
- while not self._done():
- if len(exprs) >= 10:
- raise SyntaxError("Слишком длинное число.")
- rolls_count = len(self._rolls)
- expr = await self._parse_expr()
- if len(self._rolls) == rolls_count:
- raise SyntaxError("Выражение не содержит бросков.")
- self.rolls.append(self._rolls.pop(-1))
- exprs.append(expr)
- if not exprs:
- raise SyntaxError("Выражение не должно быть пустым.")
- return Value(exprs)
- async def roll(self, vars={}):
- self.names = {str(k).upper(): Value(vars[k]) for k in vars}
- self.position = 0
- self.rolls = []
- self._rolls = []
- self.result = await self._parse_exprs()
- return self
- async def roll_dices(dices, vars={}):
- dices = Dices(dices)
- try:
- await dices.roll(vars=vars)
- except (ValueError, SyntaxError, NameError) as e:
- return str(e)
- except ZeroDivisionError:
- raise "Попытка деления на ноль."
- return str(dices)
|