chess0.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. import re
  2. import time
  3. import chess
  4. import chess.svg
  5. import chess.engine
  6. class GameOver(Exception):
  7. pass
  8. class IllegalMove(Exception):
  9. pass
  10. class MyBoard(chess.Board):
  11. def __init__(self):
  12. chess.Board.__init__(self)
  13. self.captured_black = []
  14. self.captured_white = []
  15. def _push_capture(self, move, capture_square, piece_type, was_promoted):
  16. if self.turn == chess.WHITE:
  17. self.captured_black.push(piece_type)
  18. else:
  19. self.captured_white.push(piece_type)
  20. class ChessSession:
  21. def __init__(self, engine):
  22. self.engine = engine
  23. self.board = MyBoard()
  24. self.ts = time.time()
  25. self.move_ts = time.time()
  26. def parse_move(self, move):
  27. move = move.strip()
  28. try:
  29. if move.startswith(":"):
  30. move = self.board.parse_san(move[1:])
  31. else:
  32. move = self.board.parse_uci(move)
  33. except (chess.InvalidMoveError, chess.IllegalMoveError):
  34. raise IllegalMove(move)
  35. if move != chess.Move.null() and move not in self.board.legal_moves:
  36. raise IllegalMove(move)
  37. return move
  38. def check_game_over(self):
  39. outcome = self.board.outcome(claim_draw=True)
  40. if outcome is not None:
  41. raise GameOver(f"{outcome.result()} {outcome.termination.name}")
  42. elif self.board.status() & chess.STATUS_NO_WHITE_KING:
  43. raise GameOver(f"0-1 CHECKMATE")
  44. elif self.board.status() & chess.STATUS_NO_BLACK_KING:
  45. raise GameOver(f"1-0 CHECKMATE")
  46. elif not self.board.is_valid():
  47. raise GameOver(f"Некорректное состояние доски.")
  48. async def move(self, move=None):
  49. if move is not None:
  50. move = self.parse_move(move)
  51. else:
  52. move = await self.engine.play(self.board, chess.engine.Limit(nodes=1))
  53. move = move.move
  54. self.board.push(move)
  55. self.move_ts = time.time()
  56. self.check_game_over()
  57. def from_moves(self, moves, san=False):
  58. self.board.reset()
  59. moves = moves.strip().split(" ")
  60. if len(moves) > 2048 or len(moves) % 2 != 0:
  61. raise IllegalMove
  62. for i, move in zip(range(len(moves)), moves):
  63. move = self.parse_move(move)
  64. if move == chess.Move.null() and i % 2 != 0:
  65. raise IllegalMove(move)
  66. self.board.push(move)
  67. self.check_game_over()
  68. def skip(self):
  69. self.board.push(chess.Move.null())
  70. self.move_ts = time.time()
  71. class ChessManager:
  72. def __init__(self, engine_path):
  73. self.engine_path = engine_path
  74. self.engine = None
  75. self.sessions = {}
  76. async def cleanup(self):
  77. for id in list(self.sessions.keys()):
  78. session = self.sessions[id]
  79. if (
  80. time.time() - session.move_ts >= 60 * 60 * 12
  81. or time.time() - session.ts >= 60 * 60 * 24 * 7
  82. ):
  83. await self.end(id)
  84. async def begin(self, id, moves=None):
  85. if id in self.sessions:
  86. await self.end(id)
  87. if not self.engine:
  88. _, self.engine = await chess.engine.popen_uci(self.engine_path)
  89. self.sessions[id] = ChessSession(self.engine)
  90. if moves is not None:
  91. self.sessions[id].from_moves(moves)
  92. async def end(self, id):
  93. session = self.sessions.get(id)
  94. if not session:
  95. return False
  96. del self.sessions[id]
  97. return True
  98. async def move(self, id, move=None):
  99. session = self.sessions.get(id)
  100. if not session:
  101. raise KeyError(id)
  102. await session.move(move)
  103. def undo(self, id):
  104. session = self.sessions.get(id)
  105. if not session:
  106. raise KeyError(id)
  107. session.board.pop()
  108. def is_check(self, id):
  109. session = self.sessions.get(id)
  110. if not session:
  111. raise KeyError(id)
  112. return session.board.is_check()
  113. async def skip(self, id):
  114. session = self.sessions.get(id)
  115. if not session:
  116. raise KeyError(id)
  117. session.skip()
  118. def svg(self, id, size=256):
  119. session = self.sessions.get(id)
  120. if not session:
  121. raise KeyError(id)
  122. arrows = []
  123. if session.board.move_stack:
  124. arrows.append((session.board.move_stack[-1].from_square, session.board.move_stack[-1].to_square))
  125. if len(session.board.move_stack) > 1 and session.board.move_stack[-2] != chess.Move.null():
  126. arrows.append((session.board.move_stack[-2].from_square, session.board.move_stack[-2].to_square))
  127. move = None
  128. if session.board.attackers_mask(chess.WHITE, session.board.king(chess.BLACK)) != 0:
  129. move = session.board.king(chess.BLACK)
  130. if session.board.is_check():
  131. return chess.svg.board(session.board, size=size, lastmove=move, arrows=arrows, fill={checker: "#cc0000cc" for checker in session.board.checkers()}, check=session.board.king(session.board.turn))
  132. return chess.svg.board(session.board, size=size, lastmove=move, arrows=arrows)
  133. def ascii(self, id):
  134. session = self.sessions.get(id)
  135. if not session:
  136. raise KeyError(id)
  137. return str(session.board)
  138. def has_moves(self, id):
  139. session = self.sessions.get(id)
  140. if not session:
  141. raise KeyError(id)
  142. return bool(session.board.move_stack)
  143. def moves_count(self, id):
  144. session = self.sessions.get(id)
  145. if not session:
  146. raise KeyError(id)
  147. return len(session.board.move_stack)
  148. def moves(self, id, offset=None):
  149. session = self.sessions.get(id)
  150. if not session:
  151. raise KeyError(id)
  152. if offset is not None:
  153. moves = session.board.move_stack[-offset:]
  154. else:
  155. moves = session.board.move_stack
  156. return " ".join(map(str, moves))
  157. def captured(self, id):
  158. session = self.sessions.get(id)
  159. if not session:
  160. raise KeyError(id)
  161. return (session.board.captured_black, session.board.captured_white)