chess0.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  import time

  import chess
  import chess.engine
  import chess.svg
  class GameOver(Exception):
      """Signal that the game on a board has finished.

      Raised by ``ChessSession.check_game_over``; the message holds the
      formatted outcome followed by the number of moves on the move stack.
      """
  class IllegalMove(Exception):
      """Signal that a requested move cannot be played.

      Raised by ``ChessSession.move`` with the offending move as its single
      argument: the raw string when it could not be parsed as UCI, or the
      parsed move when it is not legal in the current position.
      """
  def outcome_to_str(outcome):
      """Format a game outcome as ``"<result> <TERMINATION>"``.

      Args:
          outcome: An object exposing ``result()`` and a ``termination``
              enum member (as returned by ``board.outcome()`` once the game
              is over).

      Returns:
          The result string and the termination's ``name`` separated by a
          single space, e.g. ``"1-0 CHECKMATE"``.
      """
      return f"{outcome.result()} {outcome.termination.name}"
  class ChessSession:
      """One chess game: a board, the engine that answers on it, and timestamps.

      Attributes are plain and public:
        * ``engine``  - engine protocol object used to compute replies.
        * ``board``   - the ``chess.Board`` holding the game state.
        * ``ts``      - creation time (``time.time()`` seconds).
        * ``move_ts`` - time of the most recent move or skip.
      """

      def __init__(self, engine):
          """Start a new game from the initial position using *engine*."""
          self.engine = engine
          self.board = chess.Board()
          now = time.time()
          self.ts = now
          self.move_ts = now

      def check_game_over(self):
          """Raise ``GameOver`` if the board is in a finished state.

          The exception message is the formatted outcome followed by the
          number of moves played.
          """
          if self.board.is_game_over():
              summary = outcome_to_str(self.board.outcome())
              raise GameOver(f"{summary} {len(self.board.move_stack)}")

      async def move(self, move=None):
          """Play one move and then check whether the game has ended.

          Args:
              move: A move in UCI notation (e.g. ``"e2e4"``), or ``None`` to
                  let the engine choose the move.

          Raises:
              IllegalMove: The string is not valid UCI, the move is not legal
                  in the current position, or the engine returned no move.
              GameOver: The game is over, either before the engine was asked
                  to move or as a result of the move just played.
          """
          if move is not None:
              try:
                  chosen = chess.Move.from_uci(move)
              except ValueError as err:
                  # InvalidMoveError subclasses ValueError and only exists in
                  # newer python-chess releases; catching the base class works
                  # with both old and new versions.
                  raise IllegalMove(move) from err
              if chosen not in self.board.legal_moves:
                  raise IllegalMove(chosen)
          else:
              # An engine has nothing to play on a finished game; report that
              # instead of pushing a missing move onto the board.
              self.check_game_over()
              result = await self.engine.play(self.board, chess.engine.Limit(nodes=1))
              chosen = result.move
              if chosen is None:
                  raise IllegalMove(chosen)

          self.board.push(chosen)
          self.move_ts = time.time()
          self.check_game_over()

      def skip(self):
          """Pass the turn by pushing a null move and refresh ``move_ts``."""
          self.board.push(chess.Move.null())
          self.move_ts = time.time()
  class ChessManager:
      """Keep chess sessions keyed by an id, all sharing one UCI engine.

      The engine process is started lazily on the first ``begin`` call.
      Every per-session method raises ``KeyError(id)`` when no session is
      registered under *id*, except ``end`` which returns ``False``.
      """

      # Expiry thresholds used by cleanup(), in seconds.
      IDLE_TIMEOUT = 60 * 60 * 12  # no move for 12 hours
      MAX_AGE = 60 * 60 * 24 * 7  # session older than 7 days

      def __init__(self, engine_path):
          """Remember the engine executable path; nothing is started yet."""
          self.engine_path = engine_path
          self.engine = None
          self.sessions = {}

      def _session(self, id):
          """Return the session stored under *id* or raise ``KeyError(id)``."""
          return self.sessions[id]

      async def cleanup(self):
          """End every session that is idle too long or too old."""
          now = time.time()
          # Collect first: end() mutates self.sessions.
          expired = [
              key
              for key, session in self.sessions.items()
              if now - session.move_ts >= self.IDLE_TIMEOUT
              or now - session.ts >= self.MAX_AGE
          ]
          for key in expired:
              await self.end(key)

      async def begin(self, id):
          """Start a fresh session under *id*, replacing any existing one."""
          await self.end(id)  # no-op when there is nothing to replace
          if not self.engine:
              _, self.engine = await chess.engine.popen_uci(self.engine_path)
          self.sessions[id] = ChessSession(self.engine)

      async def end(self, id):
          """Drop the session under *id*; return whether one existed."""
          return self.sessions.pop(id, None) is not None

      async def move(self, id, move=None):
          """Play *move* (UCI string), or an engine move when ``None``.

          Propagates ``IllegalMove`` / ``GameOver`` from the session.
          """
          await self._session(id).move(move)

      def undo(self, id):
          """Take back the last move on the session's board.

          NOTE(review): python-chess documents ``Board.pop()`` as raising
          ``IndexError`` on an empty move stack; callers can guard with
          ``has_moves``.
          """
          self._session(id).board.pop()

      def is_check(self, id):
          """Return whether the side to move is in check."""
          return self._session(id).board.is_check()

      async def skip(self, id):
          """Pass the turn in the session (pushes a null move)."""
          self._session(id).skip()

      def svg(self, id, size=256):
          """Render the board as SVG.

          Arrows mark the last two moves (most recent first). When the side
          to move is in check, the checking pieces' squares are filled red
          and the king's square is passed as ``check``.
          """
          board = self._session(id).board
          # Null moves (from skip) are falsy and have no real squares, so
          # they are left out instead of being drawn as a marker on a1.
          arrows = [
              (played.from_square, played.to_square)
              for played in reversed(board.move_stack[-2:])
              if played
          ]
          extras = {}
          if board.is_check():
              extras["fill"] = {square: "#cc0000cc" for square in board.checkers()}
              extras["check"] = board.king(board.turn)
          return chess.svg.board(board, size=size, arrows=arrows, **extras)

      def ascii(self, id):
          """Return the board's plain-text rendering (``str(board)``)."""
          return str(self._session(id).board)

      def has_moves(self, id):
          """Return ``True`` when at least one move is on the move stack."""
          return bool(self._session(id).board.move_stack)

      def moves_count(self, id):
          """Return the number of moves on the move stack."""
          return len(self._session(id).board.move_stack)

      def moves(self, id, offset=None):
          """Return played moves as a space-separated string.

          Args:
              id: Session key.
              offset: When given, only the last *offset* moves are returned;
                  ``0`` yields an empty string. ``None`` returns all moves.
          """
          stack = self._session(id).board.move_stack
          if offset is None:
              selected = stack
          elif offset == 0:
              # stack[-0:] would be the whole list, not "the last zero moves".
              selected = []
          else:
              selected = stack[-offset:]
          return " ".join(map(str, selected))