# chess0.py (6.0 KB)
  1. import re
  2. import time
  3. import chess
  4. import chess.svg
  5. import chess.engine
  6. class GameOver(Exception):
  7. pass
  8. class IllegalMove(Exception):
  9. pass
  10. class ChessSession:
  11. def __init__(self, engine):
  12. self.engine = engine
  13. self.board = chess.Board()
  14. self.ts = time.time()
  15. self.move_ts = time.time()
  16. def parse_move(self, move):
  17. move = move.strip()
  18. try:
  19. move = self.board.parse_san(move)
  20. except (chess.InvalidMoveError, chess.IllegalMoveError, chess.AmbiguousMoveError):
  21. raise IllegalMove(move)
  22. return move
  23. def check_game_over(self):
  24. outcome = self.board.outcome(claim_draw=True)
  25. if outcome is not None:
  26. raise GameOver(f"{outcome.result()} {outcome.termination.name}")
  27. elif self.board.status() & chess.STATUS_NO_WHITE_KING:
  28. raise GameOver(f"0-1 CHECKMATE")
  29. elif self.board.status() & chess.STATUS_NO_BLACK_KING:
  30. raise GameOver(f"1-0 CHECKMATE")
  31. elif not self.board.is_valid():
  32. raise GameOver(f"Некорректное состояние доски.")
  33. async def move(self, move=None):
  34. if move is not None:
  35. move = self.parse_move(move)
  36. else:
  37. move = await self.engine.play(self.board, chess.engine.Limit(nodes=1))
  38. move = move.move
  39. self.board.push(move)
  40. self.move_ts = time.time()
  41. self.check_game_over()
  42. def from_moves(self, moves, strict=True):
  43. self.board.reset()
  44. moves = moves.strip()
  45. if moves.startswith("1."):
  46. text = moves
  47. moves = []
  48. while text and len(moves) < 600:
  49. match = re.match(r"^(?:\d+\. ?([^.\s]+) ([^.\s]+))", text)
  50. if not match:
  51. raise IllegalMove
  52. moves.extend((match.group(1), match.group(2)))
  53. text = text[match.end() - match.start():].strip()
  54. else:
  55. moves = moves.split(" ")
  56. if len(moves) > 600 or (strict and len(moves) % 2 != 0):
  57. raise IllegalMove
  58. for i, move in zip(range(len(moves)), moves):
  59. move = self.parse_move(move)
  60. if strict and move == chess.Move.null() and i % 2 != 0:
  61. raise IllegalMove(move)
  62. self.board.push(move)
  63. self.check_game_over()
  64. def skip(self):
  65. self.board.push(chess.Move.null())
  66. self.move_ts = time.time()
  67. class ChessManager:
  68. def __init__(self, engine_path):
  69. self.engine_path = engine_path
  70. self.engine = None
  71. self.sessions = {}
  72. async def cleanup(self):
  73. for id in list(self.sessions.keys()):
  74. session = self.sessions[id]
  75. if (
  76. time.time() - session.move_ts >= 60 * 60 * 12
  77. or time.time() - session.ts >= 60 * 60 * 24 * 7
  78. ):
  79. await self.end(id)
  80. async def begin(self, id, moves=None, strict=True):
  81. if id in self.sessions:
  82. self.end(id)
  83. if not self.engine:
  84. _, self.engine = await chess.engine.popen_uci(self.engine_path)
  85. self.sessions[id] = ChessSession(self.engine)
  86. if moves is not None:
  87. self.sessions[id].from_moves(moves, strict=strict)
  88. def end(self, id):
  89. session = self.sessions.get(id)
  90. if not session:
  91. return False
  92. del self.sessions[id]
  93. return True
  94. async def move(self, id, move=None):
  95. session = self.sessions.get(id)
  96. if not session:
  97. raise KeyError(id)
  98. await session.move(move)
  99. def undo(self, id):
  100. session = self.sessions.get(id)
  101. if not session:
  102. raise KeyError(id)
  103. session.board.pop()
  104. def is_check(self, id):
  105. session = self.sessions.get(id)
  106. if not session:
  107. raise KeyError(id)
  108. return session.board.is_check()
  109. async def skip(self, id):
  110. session = self.sessions.get(id)
  111. if not session:
  112. raise KeyError(id)
  113. session.skip()
  114. def svg(self, id, size=256, shallow=False):
  115. session = self.sessions.get(id)
  116. if not session:
  117. raise KeyError(id)
  118. arrows = []
  119. if session.board.move_stack and session.board.move_stack[-1] != chess.Move.null():
  120. arrows.append((session.board.move_stack[-1].from_square, session.board.move_stack[-1].to_square))
  121. if not shallow and len(session.board.move_stack) > 1 and session.board.move_stack[-2] != chess.Move.null():
  122. arrows.append((session.board.move_stack[-2].from_square, session.board.move_stack[-2].to_square))
  123. if session.board.is_check():
  124. return chess.svg.board(session.board, orientation=session.board.turn, size=size, arrows=arrows, fill={checker: "#cc0000cc" for checker in session.board.checkers()}, check=session.board.king(session.board.turn))
  125. return chess.svg.board(session.board, orientation=session.board.turn, size=size, arrows=arrows)
  126. def ascii(self, id):
  127. session = self.sessions.get(id)
  128. if not session:
  129. raise KeyError(id)
  130. return str(session.board)
  131. def has_moves(self, id):
  132. session = self.sessions.get(id)
  133. if not session:
  134. raise KeyError(id)
  135. return bool(session.board.move_stack)
  136. def moves_count(self, id):
  137. session = self.sessions.get(id)
  138. if not session:
  139. raise KeyError(id)
  140. return len(session.board.move_stack)
  141. def moves(self, id, offset=None):
  142. session = self.sessions.get(id)
  143. if not session:
  144. raise KeyError(id)
  145. if offset is not None:
  146. moves = session.board.move_stack[-offset:]
  147. else:
  148. moves = session.board.move_stack
  149. return " ".join(map(str, moves))
  150. def turn(self, id):
  151. session = self.sessions.get(id)
  152. if not session:
  153. raise KeyError(id)
  154. return session.board.turn