# chess0.py (9.3 KB)
  1. import re
  2. import time
  3. import chess
  4. import chess.svg
  5. import chess.engine
  6. class GameOver(Exception):
  7. pass
  8. class IllegalMove(Exception):
  9. pass
  10. def board2svg(board, size=256, shallow=False, orientation=None):
  11. if orientation is None:
  12. orientation = board.turn
  13. arrows = []
  14. if board.move_stack and board.move_stack[-1] != chess.Move.null():
  15. arrows.append((board.move_stack[-1].from_square, board.move_stack[-1].to_square))
  16. if not shallow and len(board.move_stack) > 1 and board.move_stack[-2] != chess.Move.null():
  17. arrows.append((board.move_stack[-2].from_square, board.move_stack[-2].to_square))
  18. if board.is_check():
  19. return chess.svg.board(board, orientation=orientation, size=size, arrows=arrows, fill={checker: "#cc0000cc" for checker in board.checkers()}, check=board.king(board.turn))
  20. return chess.svg.board(board, orientation=orientation, size=size, arrows=arrows)
  21. class ChessSession:
  22. def __init__(self, engine):
  23. self.engine = engine
  24. self.board = chess.Board()
  25. self.ts = time.time()
  26. self.move_ts = time.time()
  27. self.game_over = None
  28. self.checkmate = False
  29. self.draw = None
  30. def parse_move(self, move):
  31. move = move.strip()
  32. try:
  33. move = self.board.parse_san(move)
  34. except (chess.InvalidMoveError, chess.IllegalMoveError, chess.AmbiguousMoveError):
  35. raise IllegalMove(move)
  36. return move
  37. def check_game_over(self):
  38. if self.game_over is not None:
  39. raise self.game_over
  40. outcome = self.board.outcome(claim_draw=True)
  41. if outcome is not None:
  42. self.game_over = GameOver(f"{outcome.result()} {outcome.termination.name}")
  43. if outcome.termination == chess.Termination.CHECKMATE:
  44. self.checkmate = True
  45. elif outcome.termination in (chess.Termination.THREEFOLD_REPETITION, chess.Termination.FIFTY_MOVES):
  46. self.draw = self.game_over
  47. self.game_over = None
  48. return
  49. raise self.game_over
  50. elif self.board.status() & chess.STATUS_NO_WHITE_KING:
  51. self.game_over = GameOver(f"0-1 CHECKMATE")
  52. self.checkmate = True
  53. raise self.game_over
  54. elif self.board.status() & chess.STATUS_NO_BLACK_KING:
  55. self.game_over = GameOver(f"1-0 CHECKMATE")
  56. self.checkmate = True
  57. raise self.game_over
  58. elif not self.board.is_valid():
  59. self.game_over = GameOver(f"1/2-1/2 STALEMATE")
  60. raise self.game_over
  61. async def move(self, move=None):
  62. self.check_game_over()
  63. if move is not None:
  64. move = self.parse_move(move)
  65. else:
  66. move = await self.engine.play(self.board, chess.engine.Limit(nodes=1))
  67. move = move.move
  68. self.board.push(move)
  69. self.move_ts = time.time()
  70. self.check_game_over()
  71. def from_moves(self, moves, strict=True, reset=True):
  72. if reset:
  73. self.board.reset()
  74. moves = moves.strip()
  75. if re.match(r"^[0-9]{1,3}$", moves):
  76. try:
  77. moves = int(moves)
  78. except ValueError:
  79. raise IllegalMove(moves)
  80. if moves < 0 or moves > 959:
  81. raise IllegalMove(moves)
  82. self.board.set_chess960_pos(moves)
  83. return
  84. if "/" in moves:
  85. try:
  86. self.board.set_fen(moves)
  87. except ValueError:
  88. raise IllegalMove
  89. if not self.board.is_valid():
  90. raise IllegalMove
  91. self.check_game_over()
  92. return
  93. if moves.startswith("1."):
  94. text = moves
  95. moves = []
  96. while text and len(moves) < 600:
  97. match = re.match(r"^(?:\d+\. ?([^.\s]+) ([^.\s]+))", text)
  98. if not match:
  99. raise IllegalMove
  100. moves.extend((match.group(1), match.group(2)))
  101. text = text[match.end() - match.start():].strip()
  102. else:
  103. moves = moves.split(" ")
  104. if len(moves) > 600 or (strict and len(moves) % 2 != 0):
  105. raise IllegalMove
  106. for i, move in zip(range(len(moves)), moves):
  107. move = self.parse_move(move)
  108. if strict and move == chess.Move.null() and i % 2 != 0:
  109. raise IllegalMove(move)
  110. self.board.push(move)
  111. self.check_game_over()
  112. def skip(self):
  113. self.check_game_over()
  114. self.board.push(chess.Move.null())
  115. self.move_ts = time.time()
  116. class ChessManager:
  117. def __init__(self, engine_path):
  118. self.engine_path = engine_path
  119. self.engine = None
  120. self.sessions = {}
  121. async def cleanup(self):
  122. for id in list(self.sessions.keys()):
  123. session = self.sessions[id]
  124. if (
  125. time.time() - session.move_ts >= 60 * 60 * 12
  126. or time.time() - session.ts >= 60 * 60 * 24 * 7
  127. ):
  128. await self.end(id)
  129. async def begin(self, id, moves=None, strict=True):
  130. if id in self.sessions:
  131. self.end(id)
  132. if not self.engine:
  133. _, self.engine = await chess.engine.popen_uci(self.engine_path)
  134. self.sessions[id] = ChessSession(self.engine)
  135. if moves is not None:
  136. right = None
  137. if "$$" in moves:
  138. parts = moves.split("$$")
  139. if len(parts) == 2:
  140. moves, right = parts
  141. self.sessions[id].from_moves(moves, strict=strict)
  142. if right:
  143. self.sessions[id].from_moves(right, strict=strict, reset=False)
  144. def end(self, id):
  145. session = self.sessions.get(id)
  146. if not session:
  147. return False
  148. del self.sessions[id]
  149. return True
  150. def set(self, id, moves, strict=True, reset=False):
  151. session = self.sessions.get(id)
  152. if not session:
  153. raise KeyError(id)
  154. session.from_moves(moves, strict=strict, reset=reset)
  155. async def move(self, id, move=None):
  156. session = self.sessions.get(id)
  157. if not session:
  158. raise KeyError(id)
  159. await session.move(move)
  160. def clear(self, id):
  161. session = self.sessions.get(id)
  162. if not session:
  163. raise KeyError(id)
  164. session.game_over = None
  165. session.checkmate = False
  166. session.draw = None
  167. def undo(self, id):
  168. session = self.sessions.get(id)
  169. if not session:
  170. raise KeyError(id)
  171. session.board.pop()
  172. session.game_over = None
  173. session.checkmate = False
  174. session.draw = None
  175. session.check_game_over()
  176. def is_check(self, id):
  177. session = self.sessions.get(id)
  178. if not session:
  179. raise KeyError(id)
  180. return session.board.is_check()
  181. def is_checkmate(self, id):
  182. session = self.sessions.get(id)
  183. if not session:
  184. raise KeyError(id)
  185. return session.checkmate
  186. def draw(self, id):
  187. session = self.sessions.get(id)
  188. if not session:
  189. raise KeyError(id)
  190. draw = session.draw
  191. session.draw = None
  192. return draw
  193. async def skip(self, id):
  194. session = self.sessions.get(id)
  195. if not session:
  196. raise KeyError(id)
  197. session.skip()
  198. def svg(self, id, size=256, shallow=False):
  199. session = self.sessions.get(id)
  200. if not session:
  201. raise KeyError(id)
  202. return board2svg(session.board, size=size, shallow=shallow)
  203. def ascii(self, id):
  204. session = self.sessions.get(id)
  205. if not session:
  206. raise KeyError(id)
  207. return str(session.board)
  208. def has_moves(self, id):
  209. session = self.sessions.get(id)
  210. if not session:
  211. raise KeyError(id)
  212. return bool(session.board.move_stack)
  213. def moves_count(self, id):
  214. session = self.sessions.get(id)
  215. if not session:
  216. raise KeyError(id)
  217. return len(session.board.move_stack)
  218. def moves(self, id, offset=None):
  219. session = self.sessions.get(id)
  220. if not session:
  221. raise KeyError(id)
  222. if offset is not None:
  223. moves = session.board.move_stack[-offset:]
  224. else:
  225. moves = session.board.move_stack
  226. return " ".join(map(str, moves))
  227. def turn(self, id):
  228. session = self.sessions.get(id)
  229. if not session:
  230. raise KeyError(id)
  231. return session.board.turn
  232. def game_over(self, id):
  233. session = self.sessions.get(id)
  234. if not session:
  235. raise KeyError(id)
  236. return session.game_over
  237. def animate(self, id, count=None, size=256, shallow=False):
  238. session = self.sessions.get(id)
  239. if not session:
  240. raise KeyError(id)
  241. board = chess.Board()
  242. frames = []
  243. frames.append(board2svg(board, size=size, shallow=shallow, orientation=chess.WHITE))
  244. for move in session.board.move_stack:
  245. board.push(move)
  246. frames.append(board2svg(board, size=size, shallow=shallow, orientation=chess.WHITE))
  247. if count is not None and len(frames) >= count:
  248. break
  249. return frames