import re import time import chess import chess.svg import chess.engine class GameOver(Exception): pass class IllegalMove(Exception): pass def board2svg(board, size=256, shallow=False, orientation=None): if orientation is None: orientation = board.turn arrows = [] if board.move_stack and board.move_stack[-1] != chess.Move.null(): arrows.append((board.move_stack[-1].from_square, board.move_stack[-1].to_square)) if not shallow and len(board.move_stack) > 1 and board.move_stack[-2] != chess.Move.null(): arrows.append((board.move_stack[-2].from_square, board.move_stack[-2].to_square)) if board.is_check(): return chess.svg.board(board, orientation=orientation, size=size, arrows=arrows, fill={checker: "#cc0000cc" for checker in board.checkers()}, check=board.king(board.turn)) return chess.svg.board(board, orientation=orientation, size=size, arrows=arrows) class ChessSession: def __init__(self, engine): self.engine = engine self.board = chess.Board() self.ts = time.time() self.move_ts = time.time() def parse_move(self, move): move = move.strip() try: move = self.board.parse_san(move) except (chess.InvalidMoveError, chess.IllegalMoveError, chess.AmbiguousMoveError): raise IllegalMove(move) return move def check_game_over(self): outcome = self.board.outcome(claim_draw=True) if outcome is not None: raise GameOver(f"{outcome.result()} {outcome.termination.name}") elif self.board.status() & chess.STATUS_NO_WHITE_KING: raise GameOver(f"0-1 CHECKMATE") elif self.board.status() & chess.STATUS_NO_BLACK_KING: raise GameOver(f"1-0 CHECKMATE") elif not self.board.is_valid(): raise GameOver(f"Некорректное состояние доски.") async def move(self, move=None): if move is not None: move = self.parse_move(move) else: move = await self.engine.play(self.board, chess.engine.Limit(nodes=1)) move = move.move self.board.push(move) self.move_ts = time.time() self.check_game_over() def from_moves(self, moves, strict=True): self.board.reset() moves = moves.strip() if moves.startswith("1."): text = moves moves = [] while text and len(moves) < 600: match = re.match(r"^(?:\d+\. ?([^.\s]+) ([^.\s]+))", text) if not match: raise IllegalMove moves.extend((match.group(1), match.group(2))) text = text[match.end() - match.start():].strip() else: moves = moves.split(" ") if len(moves) > 600 or (strict and len(moves) % 2 != 0): raise IllegalMove for i, move in zip(range(len(moves)), moves): move = self.parse_move(move) if strict and move == chess.Move.null() and i % 2 != 0: raise IllegalMove(move) self.board.push(move) self.check_game_over() def skip(self): self.board.push(chess.Move.null()) self.move_ts = time.time() class ChessManager: def __init__(self, engine_path): self.engine_path = engine_path self.engine = None self.sessions = {} async def cleanup(self): for id in list(self.sessions.keys()): session = self.sessions[id] if ( time.time() - session.move_ts >= 60 * 60 * 12 or time.time() - session.ts >= 60 * 60 * 24 * 7 ): await self.end(id) async def begin(self, id, moves=None, strict=True): if id in self.sessions: self.end(id) if not self.engine: _, self.engine = await chess.engine.popen_uci(self.engine_path) self.sessions[id] = ChessSession(self.engine) if moves is not None: self.sessions[id].from_moves(moves, strict=strict) def end(self, id): session = self.sessions.get(id) if not session: return False del self.sessions[id] return True async def move(self, id, move=None): session = self.sessions.get(id) if not session: raise KeyError(id) await session.move(move) def undo(self, id): session = self.sessions.get(id) if not session: raise KeyError(id) session.board.pop() def is_check(self, id): session = self.sessions.get(id) if not session: raise KeyError(id) return session.board.is_check() async def skip(self, id): session = self.sessions.get(id) if not session: raise KeyError(id) session.skip() def svg(self, id, size=256, shallow=False): session = self.sessions.get(id) if not session: raise KeyError(id) return board2svg(session.board, size=size, shallow=shallow) def ascii(self, id): session = self.sessions.get(id) if not session: raise KeyError(id) return str(session.board) def has_moves(self, id): session = self.sessions.get(id) if not session: raise KeyError(id) return bool(session.board.move_stack) def moves_count(self, id): session = self.sessions.get(id) if not session: raise KeyError(id) return len(session.board.move_stack) def moves(self, id, offset=None): session = self.sessions.get(id) if not session: raise KeyError(id) if offset is not None: moves = session.board.move_stack[-offset:] else: moves = session.board.move_stack return " ".join(map(str, moves)) def turn(self, id): session = self.sessions.get(id) if not session: raise KeyError(id) return session.board.turn def animate(self, id, count=None, size=256, shallow=False): session = self.sessions.get(id) if not session: raise KeyError(id) board = chess.Board() frames = [] frames.append(board2svg(board, size=size, shallow=shallow, orientation=chess.WHITE)) for move in session.board.move_stack: board.push(move) frames.append(board2svg(board, size=size, shallow=shallow, orientation=chess.WHITE)) if count is not None and len(frames) >= count: break return frames