import time import chess import chess.svg import chess.engine class GameOver(Exception): pass class IllegalMove(Exception): pass def outcome_to_str(outcome): return f"{outcome.result()}: {outcome.termination.name}" class ChessSession: def __init__(self, engine): self.engine = engine self.board = chess.Board() self.ts = time.time() self.move_ts = time.time() def check_game_over(self): if self.board.is_game_over(): raise GameOver(f"{outcome_to_str(self.board.outcome())} [{len(self.board.move_stack)}]") elif self.board.status() & chess.STATUS_NO_WHITE_KING: raise GameOver(f"0-1 CHECKMATE [{len(self.board.move_stack)}]") elif self.board.status() & chess.STATUS_NO_BLACK_KING: raise GameOver(f"1-0 CHECKMATE [{len(self.board.move_stack)}]") elif not self.board.is_valid(): raise GameOver(f"Некорректное состояние доски.") async def move(self, move=None): if move is not None: try: move = chess.Move.from_uci(move) except chess.InvalidMoveError: raise IllegalMove(move) if move not in self.board.legal_moves: raise IllegalMove(move) else: move = await self.engine.play(self.board, chess.engine.Limit(nodes=1)) move = move.move self.board.push(move) self.move_ts = time.time() self.check_game_over() def skip(self): self.board.push(chess.Move.null()) self.move_ts = time.time() class ChessManager: def __init__(self, engine_path): self.engine_path = engine_path self.engine = None self.sessions = {} async def cleanup(self): for id in list(self.sessions.keys()): session = self.sessions[id] if ( time.time() - session.move_ts >= 60 * 60 * 12 or time.time() - session.ts >= 60 * 60 * 24 * 7 ): await self.end(id) async def begin(self, id): if id in self.sessions: await self.end(id) if not self.engine: _, self.engine = await chess.engine.popen_uci(self.engine_path) self.sessions[id] = ChessSession(self.engine) async def end(self, id): session = self.sessions.get(id) if not session: return False del self.sessions[id] return True async def move(self, id, move=None): session = self.sessions.get(id) if not session: raise KeyError(id) await session.move(move) def undo(self, id): session = self.sessions.get(id) if not session: raise KeyError(id) session.board.pop() def is_check(self, id): session = self.sessions.get(id) if not session: raise KeyError(id) return session.board.is_check() async def skip(self, id): session = self.sessions.get(id) if not session: raise KeyError(id) session.skip() def svg(self, id, size=256): session = self.sessions.get(id) if not session: raise KeyError(id) arrows = [] if session.board.move_stack: arrows.append((session.board.move_stack[-1].from_square, session.board.move_stack[-1].to_square)) if len(session.board.move_stack) > 1 and session.board.move_stack[-2] != chess.Move.null(): arrows.append((session.board.move_stack[-2].from_square, session.board.move_stack[-2].to_square)) if session.board.is_check(): return chess.svg.board(session.board, size=size, arrows=arrows, fill={checker: "#cc0000cc" for checker in session.board.checkers()}, check=session.board.king(session.board.turn)) return chess.svg.board(session.board, size=size, arrows=arrows) def ascii(self, id): session = self.sessions.get(id) if not session: raise KeyError(id) return str(session.board) def has_moves(self, id): session = self.sessions.get(id) if not session: raise KeyError(id) return bool(session.board.move_stack) def moves_count(self, id): session = self.sessions.get(id) if not session: raise KeyError(id) return len(session.board.move_stack) def moves(self, id, offset=None): session = self.sessions.get(id) if not session: raise KeyError(id) if offset is not None: moves = session.board.move_stack[-offset:] else: moves = session.board.move_stack return " ".join(map(str, moves))