import time

import chess
import chess.engine
import chess.svg


class GameOver(Exception):
    """Raised when the game has ended; the message describes the outcome."""


class IllegalMove(Exception):
    """Raised when a move is malformed or not playable in the position."""


class ChessSession:
    """A single game: a board, the engine used for replies, and timestamps.

    Attributes:
        engine: UCI engine protocol object used to generate replies.
        board: The ``chess.Board`` holding the current position and history.
        ts: Wall-clock time (``time.time()``) at which the session was created.
        move_ts: Wall-clock time of the most recent move or skip.
    """

    # Upper bound on the number of moves accepted by ``from_moves``.
    MAX_MOVES = 2048

    def __init__(self, engine):
        self.engine = engine
        self.board = chess.Board()
        now = time.time()
        self.ts = now
        self.move_ts = now

    def check_game_over(self):
        """Raise ``GameOver`` if the current position ends the game.

        Besides the regular outcomes reported by the board, a missing king
        (reachable because null moves may be pushed while in check) counts as
        a win for the other side, and any other invalid board status also
        ends the game.

        Raises:
            GameOver: With a message describing the result.
        """
        board = self.board
        if board.is_game_over():
            outcome = board.outcome()
            raise GameOver(f"{outcome.result()} {outcome.termination.name}")

        status = board.status()
        if status & chess.STATUS_NO_WHITE_KING:
            raise GameOver("0-1 CHECKMATE")
        if status & chess.STATUS_NO_BLACK_KING:
            raise GameOver("1-0 CHECKMATE")
        if status != chess.STATUS_VALID:
            raise GameOver("Некорректное состояние доски.")

    async def move(self, move=None):
        """Play one move on the board.

        Args:
            move: UCI string of the move to play, or ``None`` to let the
                engine choose the move.

        Raises:
            IllegalMove: If ``move`` cannot be parsed or is not legal.
            GameOver: If the position after the move ends the game.
        """
        if move is not None:
            try:
                chosen = chess.Move.from_uci(move)
            except chess.InvalidMoveError as err:
                raise IllegalMove(move) from err
            if chosen not in self.board.legal_moves:
                raise IllegalMove(chosen)
        else:
            result = await self.engine.play(
                self.board, chess.engine.Limit(nodes=1)
            )
            chosen = result.move

        self.board.push(chosen)
        self.move_ts = time.time()
        self.check_game_over()

    def from_moves(self, moves):
        """Reset the board and replay a space-separated list of UCI moves.

        The list must contain an even number of moves and at most
        ``MAX_MOVES`` entries. A null move (``0000``) is accepted only at
        even (0-based) positions.

        Args:
            moves: Space-separated UCI moves.

        Raises:
            IllegalMove: If the list has a bad length, or a move is malformed
                or not playable.
            GameOver: If the final position ends the game.
        """
        self.board.reset()
        tokens = moves.strip().split(" ")
        if len(tokens) > self.MAX_MOVES or len(tokens) % 2 != 0:
            raise IllegalMove

        null_move = chess.Move.null()
        for index, token in enumerate(tokens):
            try:
                parsed = chess.Move.from_uci(token)
            except chess.InvalidMoveError as err:
                raise IllegalMove(token) from err

            allowed_skip = parsed == null_move and index % 2 == 0
            if not allowed_skip and parsed not in self.board.legal_moves:
                raise IllegalMove(parsed)
            self.board.push(parsed)

        self.check_game_over()

    def skip(self):
        """Pass the turn by pushing a null move and refresh ``move_ts``."""
        self.board.push(chess.Move.null())
        self.move_ts = time.time()


class ChessManager:
    """Registry of ``ChessSession`` objects keyed by a caller-supplied id.

    The UCI engine is started lazily on the first ``begin`` call and shared
    by every session.
    """

    # A session is dropped after this many seconds without a move...
    IDLE_TIMEOUT = 60 * 60 * 12
    # ...or once it is older than this many seconds, whichever comes first.
    MAX_AGE = 60 * 60 * 24 * 7

    def __init__(self, engine_path):
        self.engine_path = engine_path
        self.engine = None
        self.sessions = {}

    def _session(self, id):
        """Return the session registered under ``id`` or raise ``KeyError``."""
        session = self.sessions.get(id)
        if not session:
            raise KeyError(id)
        return session

    async def cleanup(self):
        """End every session that is idle too long or older than ``MAX_AGE``."""
        now = time.time()
        # Iterate over a snapshot: ``end`` removes entries from the dict.
        for session_id, session in list(self.sessions.items()):
            idle = now - session.move_ts >= self.IDLE_TIMEOUT
            expired = now - session.ts >= self.MAX_AGE
            if idle or expired:
                await self.end(session_id)

    async def begin(self, id, moves=None):
        """Start a new session under ``id``, replacing any existing one.

        Args:
            id: Key identifying the session.
            moves: Optional space-separated UCI moves to replay first.

        Raises:
            IllegalMove: If ``moves`` is rejected by the session.
            GameOver: If the replayed position already ends the game.
        """
        if id in self.sessions:
            await self.end(id)
        if not self.engine:
            _, self.engine = await chess.engine.popen_uci(self.engine_path)

        session = ChessSession(self.engine)
        # The session is registered before the replay, so it stays available
        # even if the replay raises.
        self.sessions[id] = session
        if moves is not None:
            session.from_moves(moves)

    async def end(self, id):
        """Remove the session for ``id``; return whether one existed."""
        session = self.sessions.get(id)
        if not session:
            return False
        del self.sessions[id]
        return True

    async def move(self, id, move=None):
        """Play ``move`` (or an engine move when ``None``) in session ``id``.

        Raises:
            KeyError: If no session is registered under ``id``.
            IllegalMove: If the move is rejected.
            GameOver: If the move ends the game.
        """
        await self._session(id).move(move)

    def undo(self, id):
        """Take back the last move of session ``id``.

        Raises:
            KeyError: If no session is registered under ``id``.
            IndexError: If the session has no moves to take back.
        """
        self._session(id).board.pop()

    def is_check(self, id):
        """Return whether the side to move in session ``id`` is in check."""
        return self._session(id).board.is_check()

    async def skip(self, id):
        """Pass the turn in session ``id`` by pushing a null move."""
        self._session(id).skip()

    def svg(self, id, size=256):
        """Render the board of session ``id`` as an SVG string.

        Arrows mark the last two moves (null moves are not drawn). If the
        side to move is in check, its king is marked and the checking pieces
        are filled red; otherwise the black king is marked when White
        attacks it.

        Args:
            id: Key identifying the session.
            size: Size of the rendered image, passed to ``chess.svg.board``.

        Raises:
            KeyError: If no session is registered under ``id``.
        """
        board = self._session(id).board

        # Latest move first. A null move has from_square == to_square == a1
        # and would be drawn as a stray marker, so it is skipped.
        null_move = chess.Move.null()
        arrows = [
            (played.from_square, played.to_square)
            for played in reversed(board.move_stack[-2:])
            if played != null_move
        ]

        fill = {}
        check_square = None
        if board.is_check():
            check_square = board.king(board.turn)
            fill = {checker: "#cc0000cc" for checker in board.checkers()}
        else:
            # The king can be missing once null moves are involved, so guard
            # against None before asking for attackers. The square goes to
            # ``check=``: ``lastmove=`` expects a chess.Move, not a square.
            black_king = board.king(chess.BLACK)
            if (
                black_king is not None
                and board.attackers_mask(chess.WHITE, black_king) != 0
            ):
                check_square = black_king

        return chess.svg.board(
            board,
            size=size,
            check=check_square,
            arrows=arrows,
            fill=fill,
        )

    def ascii(self, id):
        """Return the text rendering of the board of session ``id``."""
        return str(self._session(id).board)

    def has_moves(self, id):
        """Return whether any move has been pushed in session ``id``."""
        return bool(self._session(id).board.move_stack)

    def moves_count(self, id):
        """Return the number of moves pushed in session ``id``."""
        return len(self._session(id).board.move_stack)

    def moves(self, id, offset=None):
        """Return the move history of session ``id`` as space-separated UCI.

        Args:
            id: Key identifying the session.
            offset: If given, only the last ``offset`` moves are returned;
                ``0`` yields an empty string. ``None`` returns everything.

        Raises:
            KeyError: If no session is registered under ``id``.
        """
        stack = self._session(id).board.move_stack
        if offset is None:
            selected = stack
        elif offset == 0:
            # ``stack[-0:]`` would be the whole history, not "the last zero".
            selected = []
        else:
            selected = stack[-offset:]
        return " ".join(map(str, selected))