import time import chess import chess.svg import chess.engine class GameOver(Exception): pass class IllegalMove(Exception): pass def outcome_to_str(outcome): return f"{outcome.result()} {outcome.termination.name}" class ChessSession: def __init__(self, engine): self.engine = engine self.board = chess.Board() self.ts = time.time() self.move_ts = time.time() def check_game_over(self): if self.board.is_game_over(): raise GameOver(outcome_to_str(self.board.outcome())) async def move(self, move=None): if move is not None: try: move = chess.Move.from_uci(move) except chess.InvalidMoveError: raise IllegalMove(move) if move not in self.board.legal_moves: raise IllegalMove(move) else: move = await self.engine.play(self.board, chess.engine.Limit(time=10)) move = move.move self.board.push(move) self.move_ts = time.time() self.check_game_over() def skip(self): self.board.push(chess.Move.null()) self.move_ts = time.time() self.check_game_over() async def end(self): await self.engine.quit() class ChessManager: def __init__(self, engine_path): self.engine_path = engine_path self.sessions = {} async def cleanup(self): for id in list(self.sessions.keys()): session = self.sessions[id] if ( time.time() - session.move_ts >= 60 * 60 * 12 or time.time() - session.ts >= 60 * 60 * 24 * 7 ): await self.end(id) async def begin(self, id): if id in self.sessions: await self.stop(id) _, engine = await chess.engine.popen_uci(self.engine_path) self.sessions[id] = ChessSession(engine) async def end(self, id): session = self.sessions.get(id) if not session: return False del self.sessions[id] await session.end() return True async def move(self, id, move=None): session = self.sessions.get(id) if not session: raise KeyError(id) await session.move(move) def undo(self, id): session = self.sessions.get(id) if not session: raise KeyError(id) session.board.pop() async def skip(self, id): await self.move(id) async def svg(self, id): session = self.sessions.get(id) if not session: raise KeyError(id) return chess.svg.board(session.board) async def ascii(self, id): session = self.sessions.get(id) if not session: raise KeyError(id) return str(session.board)