# chess0.py
  1. import time
  2. import chess
  3. import chess.svg
  4. import chess.engine
  5. class GameOver(Exception):
  6. pass
  7. class IllegalMove(Exception):
  8. pass
  9. def outcome_to_str(outcome):
  10. return f"{outcome.result()}: {outcome.termination.name}"
  11. class ChessSession:
  12. def __init__(self, engine):
  13. self.engine = engine
  14. self.board = chess.Board()
  15. self.ts = time.time()
  16. self.move_ts = time.time()
  17. def check_game_over(self):
  18. if self.board.is_game_over():
  19. raise GameOver(f"{outcome_to_str(self.board.outcome())} [{len(self.board.move_stack)}]")
  20. elif self.board.status() & chess.STATUS_NO_WHITE_KING:
  21. raise GameOver(f"0-1 CHECKMATE [{len(self.board.move_stack)}]")
  22. elif self.board.status() & chess.STATUS_NO_BLACK_KING:
  23. raise GameOver(f"1-0 CHECKMATE [{len(self.board.move_stack)}]")
  24. elif not self.board.is_valid():
  25. raise GameOver(f"Некорректное состояние доски.")
  26. async def move(self, move=None):
  27. if move is not None:
  28. try:
  29. move = chess.Move.from_uci(move)
  30. except chess.InvalidMoveError:
  31. raise IllegalMove(move)
  32. if move not in self.board.legal_moves:
  33. raise IllegalMove(move)
  34. else:
  35. move = await self.engine.play(self.board, chess.engine.Limit(nodes=1))
  36. move = move.move
  37. self.board.push(move)
  38. self.move_ts = time.time()
  39. self.check_game_over()
  40. def skip(self):
  41. self.board.push(chess.Move.null())
  42. self.move_ts = time.time()
  43. class ChessManager:
  44. def __init__(self, engine_path):
  45. self.engine_path = engine_path
  46. self.engine = None
  47. self.sessions = {}
  48. async def cleanup(self):
  49. for id in list(self.sessions.keys()):
  50. session = self.sessions[id]
  51. if (
  52. time.time() - session.move_ts >= 60 * 60 * 12
  53. or time.time() - session.ts >= 60 * 60 * 24 * 7
  54. ):
  55. await self.end(id)
  56. async def begin(self, id):
  57. if id in self.sessions:
  58. await self.end(id)
  59. if not self.engine:
  60. _, self.engine = await chess.engine.popen_uci(self.engine_path)
  61. self.sessions[id] = ChessSession(self.engine)
  62. async def end(self, id):
  63. session = self.sessions.get(id)
  64. if not session:
  65. return False
  66. del self.sessions[id]
  67. return True
  68. async def move(self, id, move=None):
  69. session = self.sessions.get(id)
  70. if not session:
  71. raise KeyError(id)
  72. await session.move(move)
  73. def undo(self, id):
  74. session = self.sessions.get(id)
  75. if not session:
  76. raise KeyError(id)
  77. session.board.pop()
  78. def is_check(self, id):
  79. session = self.sessions.get(id)
  80. if not session:
  81. raise KeyError(id)
  82. return session.board.is_check()
  83. async def skip(self, id):
  84. session = self.sessions.get(id)
  85. if not session:
  86. raise KeyError(id)
  87. session.skip()
  88. def svg(self, id, size=256):
  89. session = self.sessions.get(id)
  90. if not session:
  91. raise KeyError(id)
  92. arrows = []
  93. if session.board.move_stack:
  94. arrows.append((session.board.move_stack[-1].from_square, session.board.move_stack[-1].to_square))
  95. if len(session.board.move_stack) > 1 and session.board.move_stack[-2] != chess.Move.null():
  96. arrows.append((session.board.move_stack[-2].from_square, session.board.move_stack[-2].to_square))
  97. if session.board.is_check():
  98. return chess.svg.board(session.board, size=size, arrows=arrows, fill={checker: "#cc0000cc" for checker in session.board.checkers()}, check=session.board.king(session.board.turn))
  99. return chess.svg.board(session.board, size=size, arrows=arrows)
  100. def ascii(self, id):
  101. session = self.sessions.get(id)
  102. if not session:
  103. raise KeyError(id)
  104. return str(session.board)
  105. def has_moves(self, id):
  106. session = self.sessions.get(id)
  107. if not session:
  108. raise KeyError(id)
  109. return bool(session.board.move_stack)
  110. def moves_count(self, id):
  111. session = self.sessions.get(id)
  112. if not session:
  113. raise KeyError(id)
  114. return len(session.board.move_stack)
  115. def moves(self, id, offset=None):
  116. session = self.sessions.get(id)
  117. if not session:
  118. raise KeyError(id)
  119. if offset is not None:
  120. moves = session.board.move_stack[-offset:]
  121. else:
  122. moves = session.board.move_stack
  123. return " ".join(map(str, moves))