chess0.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
  1. import re
  2. import time
  3. import random
  4. import chess
  5. import chess.svg
  6. import chess.engine
  7. class GameOver(Exception):
  8. pass
  9. class IllegalMove(Exception):
  10. pass
  11. def board2svg(board, size=256, shallow=False, orientation=None):
  12. if orientation is None:
  13. orientation = board.turn
  14. arrows = []
  15. if board.move_stack and board.move_stack[-1] != chess.Move.null():
  16. arrows.append((board.move_stack[-1].from_square, board.move_stack[-1].to_square))
  17. if not shallow and len(board.move_stack) > 1 and board.move_stack[-2] != chess.Move.null():
  18. arrows.append((board.move_stack[-2].from_square, board.move_stack[-2].to_square))
  19. if board.is_check():
  20. return chess.svg.board(board, orientation=orientation, size=size, arrows=arrows, fill={checker: "#cc0000cc" for checker in board.checkers()}, check=board.king(board.turn))
  21. return chess.svg.board(board, orientation=orientation, size=size, arrows=arrows)
  22. class ChessSession:
  23. def __init__(self, engine):
  24. self.engine = engine
  25. self.board = chess.Board()
  26. self.ts = time.time()
  27. self.move_ts = time.time()
  28. self.game_over = None
  29. self.checkmate = False
  30. self.draw = None
  31. def parse_move(self, move):
  32. move = move.strip()
  33. try:
  34. move = self.board.parse_san(move)
  35. except (chess.InvalidMoveError, chess.IllegalMoveError, chess.AmbiguousMoveError):
  36. raise IllegalMove(move)
  37. return move
  38. def check_game_over(self):
  39. if self.game_over is not None:
  40. raise self.game_over
  41. outcome = self.board.outcome(claim_draw=True)
  42. if outcome is not None:
  43. self.game_over = GameOver(f"{outcome.result()} {outcome.termination.name}")
  44. if outcome.termination == chess.Termination.CHECKMATE:
  45. self.checkmate = True
  46. elif outcome.termination in (chess.Termination.THREEFOLD_REPETITION, chess.Termination.FIFTY_MOVES):
  47. self.draw = self.game_over
  48. self.game_over = None
  49. return
  50. raise self.game_over
  51. elif self.board.status() & chess.STATUS_NO_WHITE_KING:
  52. self.game_over = GameOver(f"0-1 CHECKMATE")
  53. self.checkmate = True
  54. raise self.game_over
  55. elif self.board.status() & chess.STATUS_NO_BLACK_KING:
  56. self.game_over = GameOver(f"1-0 CHECKMATE")
  57. self.checkmate = True
  58. raise self.game_over
  59. elif not self.board.is_valid():
  60. self.game_over = GameOver(f"1/2-1/2 STALEMATE")
  61. raise self.game_over
  62. async def move(self, move=None):
  63. self.check_game_over()
  64. if move is not None:
  65. move = self.parse_move(move)
  66. else:
  67. move = await self.engine.play(self.board, chess.engine.Limit(nodes=1))
  68. move = move.move
  69. self.board.push(move)
  70. self.move_ts = time.time()
  71. self.check_game_over()
  72. def from_moves(self, moves, strict=True, reset=True):
  73. if reset:
  74. self.board.reset()
  75. moves = moves.strip()
  76. if re.match(r"^[0-9]{1,3}$", moves):
  77. if moves == "960":
  78. moves = random.randint(0, 959)
  79. else:
  80. try:
  81. moves = int(moves)
  82. except ValueError:
  83. raise IllegalMove(moves)
  84. if moves < 0 or moves > 959:
  85. raise IllegalMove(moves)
  86. self.board.set_chess960_pos(moves)
  87. return
  88. if "/" in moves:
  89. try:
  90. self.board.set_fen(moves)
  91. except ValueError:
  92. raise IllegalMove
  93. if not self.board.is_valid():
  94. raise IllegalMove
  95. self.check_game_over()
  96. return
  97. if moves.startswith("1."):
  98. text = moves
  99. moves = []
  100. while text and len(moves) < 600:
  101. match = re.match(r"^(?:\d+\. ?([^.\s]+) ([^.\s]+))", text)
  102. if not match:
  103. raise IllegalMove
  104. moves.extend((match.group(1), match.group(2)))
  105. text = text[match.end() - match.start():].strip()
  106. else:
  107. moves = moves.split(" ")
  108. if len(moves) > 600 or (strict and len(moves) % 2 != 0):
  109. raise IllegalMove
  110. for i, move in zip(range(len(moves)), moves):
  111. move = self.parse_move(move)
  112. if strict and move == chess.Move.null() and i % 2 != 0:
  113. raise IllegalMove(move)
  114. self.board.push(move)
  115. self.check_game_over()
  116. def skip(self):
  117. self.check_game_over()
  118. self.board.push(chess.Move.null())
  119. self.move_ts = time.time()
  120. class ChessManager:
  121. def __init__(self, engine_path):
  122. self.engine_path = engine_path
  123. self.engine = None
  124. self.sessions = {}
  125. async def cleanup(self):
  126. for id in list(self.sessions.keys()):
  127. session = self.sessions[id]
  128. if (
  129. time.time() - session.move_ts >= 60 * 60 * 12
  130. or time.time() - session.ts >= 60 * 60 * 24 * 7
  131. ):
  132. await self.end(id)
  133. async def begin(self, id, moves=None, strict=True):
  134. if id in self.sessions:
  135. self.end(id)
  136. if not self.engine:
  137. _, self.engine = await chess.engine.popen_uci(self.engine_path)
  138. self.sessions[id] = ChessSession(self.engine)
  139. if moves is not None:
  140. right = None
  141. if "$$" in moves:
  142. parts = moves.split("$$")
  143. if len(parts) == 2:
  144. moves, right = parts
  145. self.sessions[id].from_moves(moves, strict=strict)
  146. if right:
  147. self.sessions[id].from_moves(right, strict=strict, reset=False)
  148. def end(self, id):
  149. session = self.sessions.get(id)
  150. if not session:
  151. return False
  152. del self.sessions[id]
  153. return True
  154. def set(self, id, moves, strict=True, reset=False):
  155. session = self.sessions.get(id)
  156. if not session:
  157. raise KeyError(id)
  158. session.from_moves(moves, strict=strict, reset=reset)
  159. async def move(self, id, move=None):
  160. session = self.sessions.get(id)
  161. if not session:
  162. raise KeyError(id)
  163. await session.move(move)
  164. def clear(self, id):
  165. session = self.sessions.get(id)
  166. if not session:
  167. raise KeyError(id)
  168. session.game_over = None
  169. session.checkmate = False
  170. session.draw = None
  171. def undo(self, id):
  172. session = self.sessions.get(id)
  173. if not session:
  174. raise KeyError(id)
  175. session.board.pop()
  176. session.game_over = None
  177. session.checkmate = False
  178. session.draw = None
  179. session.check_game_over()
  180. def is_check(self, id):
  181. session = self.sessions.get(id)
  182. if not session:
  183. raise KeyError(id)
  184. return session.board.is_check()
  185. def is_checkmate(self, id):
  186. session = self.sessions.get(id)
  187. if not session:
  188. raise KeyError(id)
  189. return session.checkmate
  190. def draw(self, id):
  191. session = self.sessions.get(id)
  192. if not session:
  193. raise KeyError(id)
  194. draw = session.draw
  195. session.draw = None
  196. return draw
  197. async def skip(self, id):
  198. session = self.sessions.get(id)
  199. if not session:
  200. raise KeyError(id)
  201. session.skip()
  202. def svg(self, id, size=256, shallow=False):
  203. session = self.sessions.get(id)
  204. if not session:
  205. raise KeyError(id)
  206. return board2svg(session.board, size=size, shallow=shallow)
  207. def ascii(self, id):
  208. session = self.sessions.get(id)
  209. if not session:
  210. raise KeyError(id)
  211. return str(session.board)
  212. def has_moves(self, id):
  213. session = self.sessions.get(id)
  214. if not session:
  215. raise KeyError(id)
  216. return bool(session.board.move_stack)
  217. def moves_count(self, id):
  218. session = self.sessions.get(id)
  219. if not session:
  220. raise KeyError(id)
  221. return len(session.board.move_stack)
  222. def moves(self, id, offset=None):
  223. session = self.sessions.get(id)
  224. if not session:
  225. raise KeyError(id)
  226. if offset is not None:
  227. moves = session.board.move_stack[-offset:]
  228. else:
  229. moves = session.board.move_stack
  230. return " ".join(map(str, moves))
  231. def turn(self, id):
  232. session = self.sessions.get(id)
  233. if not session:
  234. raise KeyError(id)
  235. return session.board.turn
  236. def game_over(self, id):
  237. session = self.sessions.get(id)
  238. if not session:
  239. raise KeyError(id)
  240. return session.game_over
  241. def fen(self, id):
  242. session = self.sessions.get(id)
  243. if not session:
  244. raise KeyError(id)
  245. return session.board.fen()
  246. def animate(self, id, count=None, size=256, shallow=False):
  247. session = self.sessions.get(id)
  248. if not session:
  249. raise KeyError(id)
  250. board = chess.Board()
  251. frames = []
  252. frames.append(board2svg(board, size=size, shallow=shallow, orientation=chess.WHITE))
  253. for move in session.board.move_stack:
  254. board.push(move)
  255. frames.append(board2svg(board, size=size, shallow=shallow, orientation=chess.WHITE))
  256. if count is not None and len(frames) >= count:
  257. break
  258. return frames