123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- import time
- import chess
- import chess.svg
- import chess.engine
class GameOver(Exception):
    """Signals that a chess game has finished; the message describes the result."""
class IllegalMove(Exception):
    """Signals a move that could not be parsed or is not legal in the position."""
def outcome_to_str(outcome):
    """Format a game outcome as ``"<result>: <TERMINATION>"``.

    *outcome* must expose a ``result()`` method and a ``termination``
    attribute with a ``name`` (presumably a python-chess ``Outcome``).
    """
    score = outcome.result()
    reason = outcome.termination.name
    return f"{score}: {reason}"
class ChessSession:
    """One chess game: a board plus the engine used for computer moves.

    Instance fields:
      engine  -- object providing an awaitable ``play(board, limit)``.
      board   -- ``chess.Board`` with the current position and move stack.
      ts      -- ``time.time()`` stamp taken when the session was created.
      move_ts -- ``time.time()`` stamp of the most recent move or skip.
    """

    def __init__(self, engine):
        now = time.time()
        self.engine = engine
        self.board = chess.Board()
        self.ts = now
        self.move_ts = now

    def check_game_over(self):
        """Raise ``GameOver`` if the game has ended or the board is unusable.

        The message carries the result and, in square brackets, the number
        of moves on the board's move stack. Returns ``None`` otherwise.
        """
        ply = len(self.board.move_stack)
        if self.board.is_game_over():
            raise GameOver(f"{outcome_to_str(self.board.outcome())} [{ply}]")

        # Compute the status flags once and reuse them for every check below.
        status = self.board.status()
        # A missing king is scored as a win for the other side
        # (presumably reachable after skip() lets a checked king be taken).
        if status & chess.STATUS_NO_WHITE_KING:
            raise GameOver(f"0-1 CHECKMATE [{ply}]")
        if status & chess.STATUS_NO_BLACK_KING:
            raise GameOver(f"1-0 CHECKMATE [{ply}]")
        if status != chess.STATUS_VALID:
            raise GameOver("Некорректное состояние доски.")

    async def move(self, move=None):
        """Play one move and then check whether the game is over.

        Args:
            move: UCI string of the move to play, or ``None`` to let the
                engine choose.

        Raises:
            IllegalMove: the string is not valid UCI, the move is not legal
                in the current position, or the engine returned no move.
            GameOver: the game ended (see ``check_game_over``).
        """
        if move is not None:
            try:
                chosen = chess.Move.from_uci(move)
            except ValueError as err:
                # InvalidMoveError subclasses ValueError; catching the base
                # class also covers python-chess versions that predate it.
                raise IllegalMove(move) from err
            if chosen not in self.board.legal_moves:
                raise IllegalMove(chosen)
        else:
            # nodes=1 keeps the engine search minimal.
            result = await self.engine.play(self.board, chess.engine.Limit(nodes=1))
            chosen = result.move
            if chosen is None:
                # The engine had nothing to play: report the game result if
                # there is one instead of crashing inside board.push().
                self.check_game_over()
                raise IllegalMove(chosen)

        self.board.push(chosen)
        self.move_ts = time.time()
        self.check_game_over()

    def skip(self):
        """Pass the turn by pushing a null move and refresh ``move_ts``."""
        self.board.push(chess.Move.null())
        self.move_ts = time.time()
class ChessManager:
    """Registry of chess sessions keyed by an arbitrary hashable id.

    A single UCI engine process is started lazily on the first ``begin``
    and shared by every session created afterwards.
    """

    # Sessions with no move for this many seconds are dropped by cleanup().
    IDLE_TIMEOUT = 60 * 60 * 12
    # Sessions older than this many seconds are dropped by cleanup().
    MAX_AGE = 60 * 60 * 24 * 7

    def __init__(self, engine_path):
        self.engine_path = engine_path
        self.engine = None
        self.sessions = {}

    def _session(self, id):
        """Return the session registered under *id* or raise ``KeyError(id)``."""
        session = self.sessions.get(id)
        if session is None:
            raise KeyError(id)
        return session

    async def cleanup(self):
        """End every session that is idle too long or has outlived MAX_AGE."""
        now = time.time()
        # Iterate over a snapshot because end() removes entries from the dict.
        for session_id, session in list(self.sessions.items()):
            idle = now - session.move_ts >= self.IDLE_TIMEOUT
            expired = now - session.ts >= self.MAX_AGE
            if idle or expired:
                await self.end(session_id)

    async def begin(self, id):
        """Start a fresh session for *id*, replacing any existing one."""
        if id in self.sessions:
            await self.end(id)
        if not self.engine:
            _, self.engine = await chess.engine.popen_uci(self.engine_path)
        self.sessions[id] = ChessSession(self.engine)

    async def end(self, id):
        """Remove the session for *id*; return whether one existed."""
        return self.sessions.pop(id, None) is not None

    async def move(self, id, move=None):
        """Play *move* (UCI string) in session *id*, or an engine move if ``None``.

        Raises ``KeyError`` for an unknown id; other exceptions come from
        the session's own ``move``.
        """
        await self._session(id).move(move)

    def undo(self, id):
        """Take back the last move of session *id* (``KeyError`` if unknown)."""
        self._session(id).board.pop()

    def is_check(self, id):
        """Return whether the side to move in session *id* is in check."""
        return self._session(id).board.is_check()

    async def skip(self, id):
        """Pass the turn in session *id* (``KeyError`` if unknown)."""
        self._session(id).skip()

    def svg(self, id, size=256):
        """Render the board of session *id* as SVG.

        Arrows mark up to the two most recent moves, newest first. Null
        moves (pushed by ``skip``) have no real squares and are left out.
        When the side to move is in check, the checking pieces are tinted
        and the checked king is highlighted.
        """
        board = self._session(id).board
        null_move = chess.Move.null()
        arrows = [
            (played.from_square, played.to_square)
            for played in reversed(board.move_stack[-2:])
            if played != null_move
        ]

        extras = {}
        if board.is_check():
            extras["fill"] = {checker: "#cc0000cc" for checker in board.checkers()}
            extras["check"] = board.king(board.turn)
        return chess.svg.board(board, size=size, arrows=arrows, **extras)

    def ascii(self, id):
        """Return the text rendering (``str``) of the board of session *id*."""
        return str(self._session(id).board)

    def has_moves(self, id):
        """Return ``True`` if at least one move was made in session *id*."""
        return bool(self._session(id).board.move_stack)

    def moves_count(self, id):
        """Return the number of moves on the move stack of session *id*."""
        return len(self._session(id).board.move_stack)

    def moves(self, id, offset=None):
        """Return the moves of session *id* as a space-separated string.

        Args:
            offset: ``None`` for the full history; a positive number for
                the last *offset* moves; ``0`` for no moves. Negative
                values keep the slice semantics ``move_stack[-offset:]``.
        """
        stack = self._session(id).board.move_stack
        if offset is None:
            selected = stack
        elif offset == 0:
            # stack[-0:] would be the whole list, not "the last zero moves".
            selected = []
        else:
            selected = stack[-offset:]
        return " ".join(map(str, selected))
|