chess0.py 6.4 KB


  1. import re
  2. import time
  3. import chess
  4. import chess.svg
  5. import chess.engine
  6. class GameOver(Exception):
  7. pass
  8. class IllegalMove(Exception):
  9. pass
  10. def board2svg(board, size=256, shallow=False):
  11. arrows = []
  12. if board.move_stack and board.move_stack[-1] != chess.Move.null():
  13. arrows.append((board.move_stack[-1].from_square, board.move_stack[-1].to_square))
  14. if not shallow and len(board.move_stack) > 1 and board.move_stack[-2] != chess.Move.null():
  15. arrows.append((board.move_stack[-2].from_square, board.move_stack[-2].to_square))
  16. if board.is_check():
  17. return chess.svg.board(board, orientation=board.turn, size=size, arrows=arrows, fill={checker: "#cc0000cc" for checker in board.checkers()}, check=board.king(board.turn))
  18. return chess.svg.board(board, orientation=board.turn, size=size, arrows=arrows)
  19. class ChessSession:
  20. def __init__(self, engine):
  21. self.engine = engine
  22. self.board = chess.Board()
  23. self.ts = time.time()
  24. self.move_ts = time.time()
  25. def parse_move(self, move):
  26. move = move.strip()
  27. try:
  28. move = self.board.parse_san(move)
  29. except (chess.InvalidMoveError, chess.IllegalMoveError, chess.AmbiguousMoveError):
  30. raise IllegalMove(move)
  31. return move
  32. def check_game_over(self):
  33. outcome = self.board.outcome(claim_draw=True)
  34. if outcome is not None:
  35. raise GameOver(f"{outcome.result()} {outcome.termination.name}")
  36. elif self.board.status() & chess.STATUS_NO_WHITE_KING:
  37. raise GameOver(f"0-1 CHECKMATE")
  38. elif self.board.status() & chess.STATUS_NO_BLACK_KING:
  39. raise GameOver(f"1-0 CHECKMATE")
  40. elif not self.board.is_valid():
  41. raise GameOver(f"Некорректное состояние доски.")
  42. async def move(self, move=None):
  43. if move is not None:
  44. move = self.parse_move(move)
  45. else:
  46. move = await self.engine.play(self.board, chess.engine.Limit(nodes=1))
  47. move = move.move
  48. self.board.push(move)
  49. self.move_ts = time.time()
  50. self.check_game_over()
  51. def from_moves(self, moves, strict=True):
  52. self.board.reset()
  53. moves = moves.strip()
  54. if moves.startswith("1."):
  55. text = moves
  56. moves = []
  57. while text and len(moves) < 600:
  58. match = re.match(r"^(?:\d+\. ?([^.\s]+) ([^.\s]+))", text)
  59. if not match:
  60. raise IllegalMove
  61. moves.extend((match.group(1), match.group(2)))
  62. text = text[match.end() - match.start():].strip()
  63. else:
  64. moves = moves.split(" ")
  65. if len(moves) > 600 or (strict and len(moves) % 2 != 0):
  66. raise IllegalMove
  67. for i, move in zip(range(len(moves)), moves):
  68. move = self.parse_move(move)
  69. if strict and move == chess.Move.null() and i % 2 != 0:
  70. raise IllegalMove(move)
  71. self.board.push(move)
  72. self.check_game_over()
  73. def skip(self):
  74. self.board.push(chess.Move.null())
  75. self.move_ts = time.time()
  76. class ChessManager:
  77. def __init__(self, engine_path):
  78. self.engine_path = engine_path
  79. self.engine = None
  80. self.sessions = {}
  81. async def cleanup(self):
  82. for id in list(self.sessions.keys()):
  83. session = self.sessions[id]
  84. if (
  85. time.time() - session.move_ts >= 60 * 60 * 12
  86. or time.time() - session.ts >= 60 * 60 * 24 * 7
  87. ):
  88. await self.end(id)
  89. async def begin(self, id, moves=None, strict=True):
  90. if id in self.sessions:
  91. self.end(id)
  92. if not self.engine:
  93. _, self.engine = await chess.engine.popen_uci(self.engine_path)
  94. self.sessions[id] = ChessSession(self.engine)
  95. if moves is not None:
  96. self.sessions[id].from_moves(moves, strict=strict)
  97. def end(self, id):
  98. session = self.sessions.get(id)
  99. if not session:
  100. return False
  101. del self.sessions[id]
  102. return True
  103. async def move(self, id, move=None):
  104. session = self.sessions.get(id)
  105. if not session:
  106. raise KeyError(id)
  107. await session.move(move)
  108. def undo(self, id):
  109. session = self.sessions.get(id)
  110. if not session:
  111. raise KeyError(id)
  112. session.board.pop()
  113. def is_check(self, id):
  114. session = self.sessions.get(id)
  115. if not session:
  116. raise KeyError(id)
  117. return session.board.is_check()
  118. async def skip(self, id):
  119. session = self.sessions.get(id)
  120. if not session:
  121. raise KeyError(id)
  122. session.skip()
  123. def svg(self, id, size=256, shallow=False):
  124. session = self.sessions.get(id)
  125. if not session:
  126. raise KeyError(id)
  127. return board2svg(session.board, size=size, shallow=shallow)
  128. def ascii(self, id):
  129. session = self.sessions.get(id)
  130. if not session:
  131. raise KeyError(id)
  132. return str(session.board)
  133. def has_moves(self, id):
  134. session = self.sessions.get(id)
  135. if not session:
  136. raise KeyError(id)
  137. return bool(session.board.move_stack)
  138. def moves_count(self, id):
  139. session = self.sessions.get(id)
  140. if not session:
  141. raise KeyError(id)
  142. return len(session.board.move_stack)
  143. def moves(self, id, offset=None):
  144. session = self.sessions.get(id)
  145. if not session:
  146. raise KeyError(id)
  147. if offset is not None:
  148. moves = session.board.move_stack[-offset:]
  149. else:
  150. moves = session.board.move_stack
  151. return " ".join(map(str, moves))
  152. def turn(self, id):
  153. session = self.sessions.get(id)
  154. if not session:
  155. raise KeyError(id)
  156. return session.board.turn
  157. def animate(self, id, count=2, size=256, shallow=False):
  158. session = self.sessions.get(id)
  159. if not session:
  160. raise KeyError(id)
  161. board = chess.Board()
  162. frames = []
  163. frames.append(board2svg(board, size=size, shallow=shallow))
  164. for move in session.board.move_stack:
  165. board.push(move)
  166. frames.append(board2svg(board, size=size, shallow=shallow))
  167. return frames