# rand.py
import inspect
import operator
import random
import re
import struct

import aiohttp
  7. async def _pyrandom(min_val, max_val, is_dice=False):
  8. if is_dice:
  9. return [random.randint(1, max_val) for _ in range(min_val)]
  10. return random.randint(min_val, max_val)
  11. async def _get(url):
  12. async with aiohttp.ClientSession() as session:
  13. async with session.get(url) as resp:
  14. return await resp.read()
  15. async def generate_unbiased_numbers(data, min_val, max_val, count):
  16. mod = max_val - min_val + 1
  17. max_acceptable = (1 << 64) // mod * mod
  18. numbers = []
  19. data_len = len(data)
  20. bytes_needed = count * 8
  21. if data_len < bytes_needed:
  22. return None
  23. for i in range(count):
  24. chunk = data[i * 8 : (i + 1) * 8]
  25. number = struct.unpack("<Q", chunk)[0]
  26. while number >= max_acceptable:
  27. number = number >> 1
  28. numbers.append((number % mod) + min_val)
  29. return numbers
  30. async def generate(source, min_val, max_val, count=1):
  31. total_bytes = count * 8
  32. try:
  33. data = await _get(f"{source}{total_bytes}")
  34. except Exception:
  35. data = b""
  36. if len(data) >= total_bytes:
  37. result = await generate_unbiased_numbers(data, min_val, max_val, count)
  38. if result is not None:
  39. return result
  40. if count == 1:
  41. return [await _pyrandom(min_val, max_val)]
  42. return await _pyrandom(min_val, max_val, is_dice=True)
  43. async def _trng_yebisu(min_val, max_val, is_dice=False):
  44. count = min_val if is_dice else 1
  45. actual_min = 1 if is_dice else min_val
  46. actual_max = max_val if is_dice else max_val
  47. numbers = await generate(
  48. "https://yebi.su/api/pool?count=", actual_min, actual_max, count
  49. )
  50. return numbers if is_dice else numbers[0]
  51. async def randint(min_val, max_val):
  52. try:
  53. return await _trng_yebisu(min_val, max_val)
  54. except Exception:
  55. return await _pyrandom(min_val, max_val)
  56. async def rolldices(count, sides):
  57. try:
  58. return await _trng_yebisu(count, sides, is_dice=True)
  59. except Exception:
  60. return await _pyrandom(count, sides, is_dice=True)
  61. async def _roll(count, sides):
  62. if count <= 0:
  63. raise ValueError("Количество костей должно быть больше нуля.")
  64. if sides <= 0:
  65. raise ValueError("Количество сторон должно быть больше нуля.")
  66. return await rolldices(count, sides)
  67. OPS = {
  68. "+": operator.add,
  69. "-": operator.sub,
  70. "*": operator.mul,
  71. "/": operator.truediv,
  72. "%": operator.mod,
  73. "^": operator.pow,
  74. "..": randint,
  75. }
  76. OPS_KEYS = "|".join(map(lambda k: "(" + re.escape(k) + ")", OPS.keys()))
  77. T_NAME = re.compile(r"([abce-zа-ге-йл-яA-ZА-Я]+)")
  78. T_COLON = re.compile(r"(:)")
  79. T_DICE = re.compile(r"(d|д|к)")
  80. T_MINUS = re.compile(r"(-)")
  81. T_OP = re.compile(f"({OPS_KEYS})")
  82. T_DIGIT = re.compile(r"(\d+)")
  83. T_OPEN_PAREN = re.compile(r"(\()")
  84. T_CLOSE_PAREN = re.compile(r"(\))")
  85. T_WS = re.compile(r"([ \t\r\n]+)")
  86. TOKEN_NAMES = {
  87. T_NAME: "имя",
  88. T_COLON: "двоеточие",
  89. T_DICE: "кость",
  90. T_OP: "оператор",
  91. T_DIGIT: "число",
  92. T_OPEN_PAREN: "открывающая скобка",
  93. T_CLOSE_PAREN: "закрывающая скобка",
  94. }
  95. class Value:
  96. def __init__(self, value):
  97. if not isinstance(value, list):
  98. value = [int(value)]
  99. self.value = value
  100. def __int__(self):
  101. return sum(map(int, self.value))
  102. def __repr__(self):
  103. return str(self.value[0] if len(self.value) == 1 else self.value)
  104. def __iter__(self):
  105. return iter(self.value)
  106. def __next__(self):
  107. return next(self.value)
  108. def __index__(self, index):
  109. return self.value[index]
  110. def __len__(self):
  111. return len(self.value)
  112. def __eq__(self, other):
  113. if not isinstance(other, Value):
  114. return False
  115. return self.value == other.value
  116. def apply(self, what, *args):
  117. args = list(map(int, args))
  118. return Value(what(int(self), *args))
  119. class Dices:
  120. def __init__(self, text):
  121. self.text = text.strip()
  122. self.position = 0
  123. self.names = {}
  124. self.rolls = []
  125. self.result = None
  126. self._rolls = []
  127. def __repr__(self):
  128. if self.result:
  129. buffer = ""
  130. for count, sides, roll, is_range in self._rolls:
  131. if is_range:
  132. buffer += f"{count}..{sides}: {roll}\n"
  133. else:
  134. buffer += f"{'' if count == 1 else count}d{sides}: {roll}\n"
  135. for roll, result in zip(self.rolls, self.result):
  136. count, sides, roll, is_range = roll
  137. if is_range:
  138. continue
  139. buffer += f"{'' if count == 1 else count}d{sides}: "
  140. roll_sum = int(roll)
  141. result_sum = int(result)
  142. if result_sum == roll_sum:
  143. results = ", ".join(map(str, result))
  144. if "," in results:
  145. results = f"[{results}]"
  146. buffer += results
  147. if "," in results:
  148. buffer += f" ({result_sum})"
  149. buffer += "\n"
  150. else:
  151. difference = result_sum - roll_sum
  152. results = f"{'' if len(roll) == 1 else f'{roll_sum} -> '}{roll_sum}{'' if difference < 0 else '+'}{difference}"
  153. buffer += f"{roll} -> {results} ({int(result)})\n"
  154. return f"{self.text}\n{buffer}= {int(self.result)}"
  155. return self.text
  156. def _skip_ws(self):
  157. match = T_WS.match(self.text, self.position)
  158. if match:
  159. self.position += len(match.group(0))
  160. def _done(self):
  161. self._skip_ws()
  162. return self.position >= len(self.text)
  163. def _match(self, what, skip_ws=True):
  164. if skip_ws:
  165. self._skip_ws()
  166. match = what.match(self.text, self.position)
  167. if match:
  168. self.position += len(match.group(0))
  169. return match.groups()
  170. def _expected(self, expected):
  171. raise SyntaxError(
  172. f"Неожиданный ввод на позиции `#{self.position + 1}`: ожидалось: `{expected}`."
  173. )
  174. def _expect(self, what):
  175. match = self._match(what)
  176. if not match:
  177. self._expected(TOKEN_NAMES[what])
  178. return match
  179. async def _parse_dice(self, left=1):
  180. if self._match(T_OPEN_PAREN):
  181. right = await self._parse_expr()
  182. self._expect(T_CLOSE_PAREN)
  183. else:
  184. right = await self._parse_atom()
  185. left = int(left)
  186. right = int(right)
  187. if left > 1000 or right > 1000:
  188. raise SyntaxError("Слишком длинное число.")
  189. roll = Value(await _roll(left, right))
  190. self._rolls.append((left, right, roll, False))
  191. return roll
  192. async def _parse_atom(self):
  193. if self._match(T_OPEN_PAREN):
  194. expr = await self._parse_expr()
  195. self._expect(T_CLOSE_PAREN)
  196. if self._match(T_DICE, skip_ws=False):
  197. return await self._parse_dice(expr)
  198. return expr
  199. elif self._match(T_MINUS):
  200. value = await self._parse_atom()
  201. return value.apply(operator.neg)
  202. elif match := self._match(T_DIGIT):
  203. try:
  204. left = int(match[0])
  205. except ValueError:
  206. raise SyntaxError("Слишком длинное число.")
  207. if match := self._match(T_DICE, skip_ws=False):
  208. return await self._parse_dice(left)
  209. return Value(left)
  210. elif self._match(T_DICE):
  211. return await self._parse_dice()
  212. elif match := self._match(T_NAME):
  213. name = match[0].upper()
  214. if name not in self.names:
  215. raise NameError(f"Неизвестная переменная: `{match[0]}`.")
  216. expr = self.names[name]
  217. if self._match(T_DICE, skip_ws=False):
  218. return await self._parse_dice(expr)
  219. return expr
  220. self._expected("число, кость или переменная")
  221. async def _parse_expr(self):
  222. left = await self._parse_atom()
  223. if op := self._match(T_OP):
  224. is_range = op[0] == ".."
  225. op = OPS[op[0]]
  226. right = await self._parse_expr()
  227. if is_range or inspect.iscoroutinefunction(op):
  228. left = int(left)
  229. right = int(right)
  230. try:
  231. r = await op(left, right)
  232. except Exception:
  233. r = 0
  234. if not isinstance(r, Value):
  235. r = Value(r)
  236. if is_range:
  237. self._rolls.append((left, right, r, True))
  238. left = r
  239. else:
  240. left = left.apply(op, right)
  241. elif self._match(T_COLON):
  242. right = self._expect(T_NAME)[0].upper()
  243. self.names[right] = left
  244. return left
  245. async def _parse_exprs(self):
  246. exprs = []
  247. while not self._done():
  248. if len(exprs) >= 10:
  249. raise SyntaxError("Слишком длинное число.")
  250. rolls_count = len(self._rolls)
  251. expr = await self._parse_expr()
  252. if len(self._rolls) == rolls_count:
  253. raise SyntaxError("Выражение не содержит бросков.")
  254. self.rolls.append(self._rolls.pop(-1))
  255. exprs.append(expr)
  256. if not exprs:
  257. raise SyntaxError("Выражение не должно быть пустым.")
  258. return Value(exprs)
  259. async def roll(self, vars={}):
  260. self.names = {str(k).upper(): Value(vars[k]) for k in vars}
  261. self.position = 0
  262. self.rolls = []
  263. self._rolls = []
  264. self.result = await self._parse_exprs()
  265. return self
  266. async def roll_dices(dices, vars={}):
  267. dices = Dices(dices)
  268. try:
  269. await dices.roll(vars=vars)
  270. except (ValueError, SyntaxError, NameError) as e:
  271. return str(e)
  272. except ZeroDivisionError:
  273. raise "Попытка деления на ноль."
  274. return str(dices)