123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285 |
- import re
- import time
- import chess
- import chess.svg
- import chess.engine
- class GameOver(Exception):
- pass
- class IllegalMove(Exception):
- pass
- def board2svg(board, size=256, shallow=False, orientation=None):
- if orientation is None:
- orientation = board.turn
- arrows = []
- if board.move_stack and board.move_stack[-1] != chess.Move.null():
- arrows.append((board.move_stack[-1].from_square, board.move_stack[-1].to_square))
- if not shallow and len(board.move_stack) > 1 and board.move_stack[-2] != chess.Move.null():
- arrows.append((board.move_stack[-2].from_square, board.move_stack[-2].to_square))
-
- if board.is_check():
- return chess.svg.board(board, orientation=orientation, size=size, arrows=arrows, fill={checker: "#cc0000cc" for checker in board.checkers()}, check=board.king(board.turn))
- return chess.svg.board(board, orientation=orientation, size=size, arrows=arrows)
- class ChessSession:
- def __init__(self, engine):
- self.engine = engine
- self.board = chess.Board()
- self.ts = time.time()
- self.move_ts = time.time()
- self.game_over = None
- self.checkmate = False
- def parse_move(self, move):
- move = move.strip()
- try:
- move = self.board.parse_san(move)
- except (chess.InvalidMoveError, chess.IllegalMoveError, chess.AmbiguousMoveError):
- raise IllegalMove(move)
- return move
-
- def check_game_over(self):
- if self.game_over is not None:
- raise self.game_over
- outcome = self.board.outcome(claim_draw=True)
- if outcome is not None:
- self.game_over = GameOver(f"{outcome.result()} {outcome.termination.name}")
- if outcome.termination == chess.Termination.CHECKMATE:
- self.checkmate = True
- raise self.game_over
- elif self.board.status() & chess.STATUS_NO_WHITE_KING:
- self.game_over = GameOver(f"0-1 CHECKMATE")
- self.checkmate = True
- raise self.game_over
- elif self.board.status() & chess.STATUS_NO_BLACK_KING:
- self.game_over = GameOver(f"1-0 CHECKMATE")
- self.checkmate = True
- raise self.game_over
- elif not self.board.is_valid():
- self.game_over = GameOver(f"1/2-1/2 STALEMATE")
- raise self.game_over
- async def move(self, move=None):
- self.check_game_over()
- if move is not None:
- move = self.parse_move(move)
- else:
- move = await self.engine.play(self.board, chess.engine.Limit(nodes=1))
- move = move.move
- self.board.push(move)
- self.move_ts = time.time()
- self.check_game_over()
- def from_moves(self, moves, strict=True):
- self.board.reset()
- moves = moves.strip()
- if moves.startswith("1."):
- text = moves
- moves = []
- while text and len(moves) < 600:
- match = re.match(r"^(?:\d+\. ?([^.\s]+) ([^.\s]+))", text)
- if not match:
- raise IllegalMove
- moves.extend((match.group(1), match.group(2)))
- text = text[match.end() - match.start():].strip()
- else:
- moves = moves.split(" ")
- if len(moves) > 600 or (strict and len(moves) % 2 != 0):
- raise IllegalMove
- for i, move in zip(range(len(moves)), moves):
- move = self.parse_move(move)
-
- if strict and move == chess.Move.null() and i % 2 != 0:
- raise IllegalMove(move)
- self.board.push(move)
- self.check_game_over()
- def skip(self):
- self.check_game_over()
- self.board.push(chess.Move.null())
- self.move_ts = time.time()
- class ChessManager:
- def __init__(self, engine_path):
- self.engine_path = engine_path
- self.engine = None
- self.sessions = {}
- async def cleanup(self):
- for id in list(self.sessions.keys()):
- session = self.sessions[id]
- if (
- time.time() - session.move_ts >= 60 * 60 * 12
- or time.time() - session.ts >= 60 * 60 * 24 * 7
- ):
- await self.end(id)
- async def begin(self, id, moves=None, strict=True):
- if id in self.sessions:
- self.end(id)
- if not self.engine:
- _, self.engine = await chess.engine.popen_uci(self.engine_path)
- self.sessions[id] = ChessSession(self.engine)
- if moves is not None:
- self.sessions[id].from_moves(moves, strict=strict)
- def end(self, id):
- session = self.sessions.get(id)
- if not session:
- return False
- del self.sessions[id]
- return True
- async def move(self, id, move=None):
- session = self.sessions.get(id)
- if not session:
- raise KeyError(id)
- await session.move(move)
- def undo(self, id):
- session = self.sessions.get(id)
- if not session:
- raise KeyError(id)
- session.board.pop()
- session.game_over = None
- session.checkmate = False
- session.check_game_over()
- def is_check(self, id):
- session = self.sessions.get(id)
- if not session:
- raise KeyError(id)
- return session.board.is_check()
- def is_checkmate(self, id):
- session = self.sessions.get(id)
- if not session:
- raise KeyError(id)
- return session.checkmate
- async def skip(self, id):
- session = self.sessions.get(id)
- if not session:
- raise KeyError(id)
- session.skip()
- def svg(self, id, size=256, shallow=False):
- session = self.sessions.get(id)
- if not session:
- raise KeyError(id)
- return board2svg(session.board, size=size, shallow=shallow)
- def ascii(self, id):
- session = self.sessions.get(id)
- if not session:
- raise KeyError(id)
- return str(session.board)
- def has_moves(self, id):
- session = self.sessions.get(id)
- if not session:
- raise KeyError(id)
- return bool(session.board.move_stack)
- def moves_count(self, id):
- session = self.sessions.get(id)
- if not session:
- raise KeyError(id)
- return len(session.board.move_stack)
- def moves(self, id, offset=None):
- session = self.sessions.get(id)
- if not session:
- raise KeyError(id)
- if offset is not None:
- moves = session.board.move_stack[-offset:]
- else:
- moves = session.board.move_stack
- return " ".join(map(str, moves))
- def turn(self, id):
- session = self.sessions.get(id)
- if not session:
- raise KeyError(id)
- return session.board.turn
- def game_over(self, id):
- session = self.sessions.get(id)
- if not session:
- raise KeyError(id)
- return session.game_over
- def animate(self, id, count=None, size=256, shallow=False):
- session = self.sessions.get(id)
- if not session:
- raise KeyError(id)
- board = chess.Board()
- frames = []
- frames.append(board2svg(board, size=size, shallow=shallow, orientation=chess.WHITE))
- for move in session.board.move_stack:
- board.push(move)
- frames.append(board2svg(board, size=size, shallow=shallow, orientation=chess.WHITE))
- if count is not None and len(frames) >= count:
- break
- return frames
|