chess0.py 6.6 KB


  1. import re
  2. import time
  3. import chess
  4. import chess.svg
  5. import chess.engine
  6. class GameOver(Exception):
  7. pass
  8. class IllegalMove(Exception):
  9. pass
  10. def board2svg(board, size=256, shallow=False, orientation=None):
  11. if orientation is None:
  12. orientation = board.turn
  13. arrows = []
  14. if board.move_stack and board.move_stack[-1] != chess.Move.null():
  15. arrows.append((board.move_stack[-1].from_square, board.move_stack[-1].to_square))
  16. if not shallow and len(board.move_stack) > 1 and board.move_stack[-2] != chess.Move.null():
  17. arrows.append((board.move_stack[-2].from_square, board.move_stack[-2].to_square))
  18. if board.is_check():
  19. return chess.svg.board(board, orientation=orientation, size=size, arrows=arrows, fill={checker: "#cc0000cc" for checker in board.checkers()}, check=board.king(board.turn))
  20. return chess.svg.board(board, orientation=orientation, size=size, arrows=arrows)
  21. class ChessSession:
  22. def __init__(self, engine):
  23. self.engine = engine
  24. self.board = chess.Board()
  25. self.ts = time.time()
  26. self.move_ts = time.time()
  27. def parse_move(self, move):
  28. move = move.strip()
  29. try:
  30. move = self.board.parse_san(move)
  31. except (chess.InvalidMoveError, chess.IllegalMoveError, chess.AmbiguousMoveError):
  32. raise IllegalMove(move)
  33. return move
  34. def check_game_over(self):
  35. outcome = self.board.outcome(claim_draw=True)
  36. if outcome is not None:
  37. raise GameOver(f"{outcome.result()} {outcome.termination.name}")
  38. elif self.board.status() & chess.STATUS_NO_WHITE_KING:
  39. raise GameOver(f"0-1 CHECKMATE")
  40. elif self.board.status() & chess.STATUS_NO_BLACK_KING:
  41. raise GameOver(f"1-0 CHECKMATE")
  42. elif not self.board.is_valid():
  43. raise GameOver(f"Некорректное состояние доски.")
  44. async def move(self, move=None):
  45. if move is not None:
  46. move = self.parse_move(move)
  47. else:
  48. move = await self.engine.play(self.board, chess.engine.Limit(nodes=1))
  49. move = move.move
  50. self.board.push(move)
  51. self.move_ts = time.time()
  52. self.check_game_over()
  53. def from_moves(self, moves, strict=True):
  54. self.board.reset()
  55. moves = moves.strip()
  56. if moves.startswith("1."):
  57. text = moves
  58. moves = []
  59. while text and len(moves) < 600:
  60. match = re.match(r"^(?:\d+\. ?([^.\s]+) ([^.\s]+))", text)
  61. if not match:
  62. raise IllegalMove
  63. moves.extend((match.group(1), match.group(2)))
  64. text = text[match.end() - match.start():].strip()
  65. else:
  66. moves = moves.split(" ")
  67. if len(moves) > 600 or (strict and len(moves) % 2 != 0):
  68. raise IllegalMove
  69. for i, move in zip(range(len(moves)), moves):
  70. move = self.parse_move(move)
  71. if strict and move == chess.Move.null() and i % 2 != 0:
  72. raise IllegalMove(move)
  73. self.board.push(move)
  74. self.check_game_over()
  75. def skip(self):
  76. self.board.push(chess.Move.null())
  77. self.move_ts = time.time()
  78. class ChessManager:
  79. def __init__(self, engine_path):
  80. self.engine_path = engine_path
  81. self.engine = None
  82. self.sessions = {}
  83. async def cleanup(self):
  84. for id in list(self.sessions.keys()):
  85. session = self.sessions[id]
  86. if (
  87. time.time() - session.move_ts >= 60 * 60 * 12
  88. or time.time() - session.ts >= 60 * 60 * 24 * 7
  89. ):
  90. await self.end(id)
  91. async def begin(self, id, moves=None, strict=True):
  92. if id in self.sessions:
  93. self.end(id)
  94. if not self.engine:
  95. _, self.engine = await chess.engine.popen_uci(self.engine_path)
  96. self.sessions[id] = ChessSession(self.engine)
  97. if moves is not None:
  98. self.sessions[id].from_moves(moves, strict=strict)
  99. def end(self, id):
  100. session = self.sessions.get(id)
  101. if not session:
  102. return False
  103. del self.sessions[id]
  104. return True
  105. async def move(self, id, move=None):
  106. session = self.sessions.get(id)
  107. if not session:
  108. raise KeyError(id)
  109. await session.move(move)
  110. def undo(self, id):
  111. session = self.sessions.get(id)
  112. if not session:
  113. raise KeyError(id)
  114. session.board.pop()
  115. def is_check(self, id):
  116. session = self.sessions.get(id)
  117. if not session:
  118. raise KeyError(id)
  119. return session.board.is_check()
  120. async def skip(self, id):
  121. session = self.sessions.get(id)
  122. if not session:
  123. raise KeyError(id)
  124. session.skip()
  125. def svg(self, id, size=256, shallow=False):
  126. session = self.sessions.get(id)
  127. if not session:
  128. raise KeyError(id)
  129. return board2svg(session.board, size=size, shallow=shallow)
  130. def ascii(self, id):
  131. session = self.sessions.get(id)
  132. if not session:
  133. raise KeyError(id)
  134. return str(session.board)
  135. def has_moves(self, id):
  136. session = self.sessions.get(id)
  137. if not session:
  138. raise KeyError(id)
  139. return bool(session.board.move_stack)
  140. def moves_count(self, id):
  141. session = self.sessions.get(id)
  142. if not session:
  143. raise KeyError(id)
  144. return len(session.board.move_stack)
  145. def moves(self, id, offset=None):
  146. session = self.sessions.get(id)
  147. if not session:
  148. raise KeyError(id)
  149. if offset is not None:
  150. moves = session.board.move_stack[-offset:]
  151. else:
  152. moves = session.board.move_stack
  153. return " ".join(map(str, moves))
  154. def turn(self, id):
  155. session = self.sessions.get(id)
  156. if not session:
  157. raise KeyError(id)
  158. return session.board.turn
  159. def animate(self, id, count=None, size=256, shallow=False):
  160. session = self.sessions.get(id)
  161. if not session:
  162. raise KeyError(id)
  163. board = chess.Board()
  164. frames = []
  165. frames.append(board2svg(board, size=size, shallow=shallow, orientation=chess.WHITE))
  166. for move in session.board.move_stack:
  167. board.push(move)
  168. frames.append(board2svg(board, size=size, shallow=shallow, orientation=chess.WHITE))
  169. if count is not None and len(frames) >= count:
  170. break
  171. return frames