chess0.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. import time
  2. import chess
  3. import chess.svg
  4. import chess.engine
  5. class GameOver(Exception):
  6. pass
  7. class IllegalMove(Exception):
  8. pass
  9. def outcome_to_str(outcome):
  10. return f"{outcome.result()} {outcome.termination.name}"
  11. class ChessSession:
  12. def __init__(self, engine):
  13. self.engine = engine
  14. self.board = chess.Board()
  15. self.ts = time.time()
  16. self.move_ts = time.time()
  17. def check_game_over(self):
  18. if self.board.is_game_over():
  19. raise GameOver(outcome_to_str(self.board.outcome()))
  20. async def move(self, move=None):
  21. if move is not None:
  22. try:
  23. move = chess.Move.from_uci(move)
  24. except chess.InvalidMoveError:
  25. raise IllegalMove(move)
  26. if move not in self.board.legal_moves:
  27. raise IllegalMove(move)
  28. else:
  29. move = await self.engine.play(self.board, chess.engine.Limit(time=10))
  30. move = move.move
  31. self.board.push(move)
  32. self.move_ts = time.time()
  33. self.check_game_over()
  34. def skip(self):
  35. self.board.push(chess.Move.null())
  36. self.move_ts = time.time()
  37. self.check_game_over()
  38. async def end(self):
  39. await self.engine.quit()
  40. class ChessManager:
  41. def __init__(self, engine_path):
  42. self.engine_path = engine_path
  43. self.sessions = {}
  44. async def cleanup(self):
  45. for id in list(self.sessions.keys()):
  46. session = self.sessions[id]
  47. if (
  48. time.time() - session.move_ts >= 60 * 60 * 12
  49. or time.time() - session.ts >= 60 * 60 * 24 * 7
  50. ):
  51. await self.end(id)
  52. def get(self, id):
  53. return self.sessions.get(id)
  54. async def begin(self, id):
  55. if id in self.sessions:
  56. await self.stop(id)
  57. _, engine = await chess.engine.popen_uci(self.engine_path)
  58. self.sessions[id] = ChessSession(engine)
  59. async def end(self, id):
  60. session = self.sessions.get(id)
  61. if not session:
  62. return False
  63. del self.sessions[id]
  64. await session.end()
  65. return True
  66. async def move(self, id, move=None):
  67. session = self.get(id)
  68. if not session:
  69. raise KeyError(id)
  70. await session.move(move)
  71. def undo(self, id):
  72. session = self.get(id)
  73. if not session:
  74. raise KeyError(id)
  75. session.board.pop()
  76. async def skip(self, id):
  77. await self.move(id)
  78. async def svg(self, id):
  79. session = self.get(id)
  80. if not session:
  81. raise KeyError(id)
  82. return chess.svg.board(session.board)
  83. async def ascii(self, id):
  84. session = self.get(id)
  85. if not session:
  86. raise KeyError(id)
  87. return str(session.board)