chess0.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. import time
  2. import chess
  3. import chess.svg
  4. import chess.engine
  class GameOver(Exception):
      """Raised when a game cannot continue.

      The message carries the result and termination reason, or a note that
      the board state is invalid.
      """
  class IllegalMove(Exception):
      """Raised when a move cannot be parsed or is not legal on the board.

      When arguments are given, the first one is the offending move.
      """
  class ChessSession:
      """One game in progress: a board plus the engine used to pick replies.

      Attributes:
          engine: object whose ``play`` coroutine chooses a move for the board.
          board: ``chess.Board`` holding the current position and move stack.
          ts: ``time.time()`` value taken when the session was created.
          move_ts: ``time.time()`` value of the latest ``move`` or ``skip``.
      """

      def __init__(self, engine):
          """Start a fresh game on the initial position using ``engine``."""
          self.engine = engine
          self.board = chess.Board()
          self.ts = self.move_ts = time.time()

      def _parse_legal(self, uci):
          """Return ``uci`` parsed into a move that is legal on the board.

          Raises:
              IllegalMove: if ``uci`` is not valid UCI text or the parsed move
                  is not among the board's legal moves.
          """
          try:
              parsed = chess.Move.from_uci(uci)
          except chess.InvalidMoveError as exc:
              raise IllegalMove(uci) from exc
          if parsed not in self.board.legal_moves:
              raise IllegalMove(parsed)
          return parsed

      def check_game_over(self):
          """Raise ``GameOver`` if the position is terminal or invalid.

          Raises:
              GameOver: with ``"<result> <TERMINATION>"`` when the board
                  reports the game as over, with a checkmate message when a
                  king is missing from the board, or with an invalid-state
                  message when the board fails its validity check.
          """
          board = self.board
          if board.is_game_over():
              outcome = board.outcome()
              raise GameOver(f"{outcome.result()} {outcome.termination.name}")

          # A missing king is reported as a win for the other side.
          status = board.status()
          if status & chess.STATUS_NO_WHITE_KING:
              raise GameOver("0-1 CHECKMATE")
          if status & chess.STATUS_NO_BLACK_KING:
              raise GameOver("1-0 CHECKMATE")
          if not board.is_valid():
              # Message text is Russian for "Invalid board state."
              raise GameOver("Некорректное состояние доски.")

      async def move(self, move=None):
          """Play one move and then check whether the game has ended.

          Args:
              move: UCI text of the move to play, or ``None`` to ask the
                  engine for a move.

          Raises:
              IllegalMove: if ``move`` is given but is malformed or not legal.
              GameOver: if the position after the move is terminal or invalid.
          """
          if move is not None:
              chosen = self._parse_legal(move)
          else:
              reply = await self.engine.play(self.board, chess.engine.Limit(nodes=1))
              # NOTE(review): the engine's move is pushed without a legality
              # check and is assumed to be present - confirm the engine always
              # returns a move here.
              chosen = reply.move
          self.board.push(chosen)
          self.move_ts = time.time()
          self.check_game_over()

      def from_moves(self, moves):
          """Reset the board and replay a whitespace-separated UCI move list.

          Moves may be separated by any run of whitespace. The list must be
          non-empty, hold at most 2048 moves, and contain an even number of
          moves.

          Args:
              moves: string of UCI moves, e.g. ``"e2e4 e7e5"``.

          Raises:
              IllegalMove: if the list is empty, too long, odd-sized, or
                  contains a malformed or illegal move. The board has already
                  been reset and keeps the moves applied before the bad one.
              GameOver: if the replayed position is terminal or invalid.
          """
          self.board.reset()
          # split() without a separator ignores repeated/leading/trailing
          # whitespace, so stray spaces do not turn into empty "moves".
          tokens = moves.split()
          if not tokens or len(tokens) > 2048 or len(tokens) % 2 != 0:
              raise IllegalMove
          for token in tokens:
              self.board.push(self._parse_legal(token))
          self.check_game_over()

      def skip(self):
          """Pass the turn by pushing a null move and refresh ``move_ts``."""
          self.board.push(chess.Move.null())
          self.move_ts = time.time()
  class ChessManager:
      """Registry of chess sessions keyed by a caller-supplied id.

      All sessions share a single engine, which is started on the first call
      to ``begin``.

      Attributes:
          engine_path: command passed to ``chess.engine.popen_uci``.
          engine: the shared engine, or ``None`` until first needed.
          sessions: mapping of id to ``ChessSession``.
      """

      # A session is dropped after this many seconds without a move or skip...
      _IDLE_LIMIT = 60 * 60 * 12
      # ...or after this many seconds since it was created.
      _AGE_LIMIT = 60 * 60 * 24 * 7

      def __init__(self, engine_path):
          """Remember ``engine_path``; no engine is started yet."""
          self.engine_path = engine_path
          self.engine = None
          self.sessions = {}

      def _require_session(self, id):
          """Return the session stored under ``id`` or raise ``KeyError(id)``."""
          session = self.sessions.get(id)
          if not session:
              raise KeyError(id)
          return session

      async def cleanup(self):
          """End every session that has been idle or alive for too long."""
          now = time.time()
          # Iterate over a snapshot because end() removes entries.
          for id, session in list(self.sessions.items()):
              idle = now - session.move_ts
              age = now - session.ts
              if idle >= self._IDLE_LIMIT or age >= self._AGE_LIMIT:
                  await self.end(id)

      async def begin(self, id, moves=None):
          """Create a new session under ``id``, replacing any existing one.

          Args:
              id: key for the session.
              moves: optional whitespace-separated UCI move list to replay.

          Raises:
              IllegalMove, GameOver: propagated from replaying ``moves``. The
                  session is registered before the replay, so it stays in
                  ``sessions`` when either is raised.
          """
          if id in self.sessions:
              await self.end(id)
          if not self.engine:
              _, self.engine = await chess.engine.popen_uci(self.engine_path)
          session = ChessSession(self.engine)
          self.sessions[id] = session
          if moves is not None:
              session.from_moves(moves)

      async def end(self, id):
          """Remove the session for ``id``; return whether one existed."""
          return self.sessions.pop(id, None) is not None

      async def move(self, id, move=None):
          """Play ``move`` (or an engine move when ``None``) in session ``id``.

          Raises:
              KeyError: if there is no session for ``id``.
              IllegalMove, GameOver: propagated from the session.
          """
          await self._require_session(id).move(move)

      def undo(self, id):
          """Take back the last move of session ``id``.

          Raises:
              KeyError: if there is no session for ``id``.

          NOTE(review): with an empty move stack ``board.pop()`` presumably
          raises ``IndexError`` - confirm callers check ``has_moves`` first.
          """
          self._require_session(id).board.pop()

      def is_check(self, id):
          """Return whether the side to move in session ``id`` is in check.

          Raises:
              KeyError: if there is no session for ``id``.
          """
          return self._require_session(id).board.is_check()

      async def skip(self, id):
          """Pass the turn in session ``id``.

          Raises:
              KeyError: if there is no session for ``id``.
          """
          self._require_session(id).skip()

      def svg(self, id, size=256):
          """Render the board of session ``id`` as an SVG string.

          Arrows are drawn for the last two moves (null moves are skipped),
          the black king's square is highlighted while White attacks it, and
          when the side to move is in check the checking pieces are filled
          red and the checked king is marked.

          Args:
              id: key of the session to render.
              size: size passed through to ``chess.svg.board``.

          Raises:
              KeyError: if there is no session for ``id``.
          """
          board = self._require_session(id).board
          null_move = chess.Move.null()

          # Most recent move first; a null move (from skip) has no real
          # squares, so it gets no arrow.
          arrows = [
              (played.from_square, played.to_square)
              for played in reversed(board.move_stack[-2:])
              if played != null_move
          ]

          # chess.svg.board documents ``lastmove`` as a Move, so the attacked
          # king's square is wrapped in a same-square move to highlight it.
          # The king can be absent (check_game_over handles that state), in
          # which case there is nothing to highlight.
          highlight = None
          black_king = board.king(chess.BLACK)
          if black_king is not None and board.attackers_mask(chess.WHITE, black_king) != 0:
              highlight = chess.Move(black_king, black_king)

          options = {"size": size, "lastmove": highlight, "arrows": arrows}
          if board.is_check():
              options["fill"] = {checker: "#cc0000cc" for checker in board.checkers()}
              options["check"] = board.king(board.turn)
          return chess.svg.board(board, **options)

      def ascii(self, id):
          """Return the text diagram of the board of session ``id``.

          Raises:
              KeyError: if there is no session for ``id``.
          """
          return str(self._require_session(id).board)

      def has_moves(self, id):
          """Return whether any move has been played in session ``id``.

          Raises:
              KeyError: if there is no session for ``id``.
          """
          return bool(self._require_session(id).board.move_stack)

      def moves_count(self, id):
          """Return the number of moves on the stack of session ``id``.

          Raises:
              KeyError: if there is no session for ``id``.
          """
          return len(self._require_session(id).board.move_stack)

      def moves(self, id, offset=None):
          """Return the played moves of session ``id`` joined by spaces.

          Args:
              id: key of the session.
              offset: when given, only the last ``offset`` moves are returned;
                  ``0`` yields an empty string. ``None`` returns every move.

          Raises:
              KeyError: if there is no session for ``id``.
          """
          stack = self._require_session(id).board.move_stack
          if offset is None:
              selected = stack
          elif offset:
              selected = stack[-offset:]
          else:
              # stack[-0:] would be the whole list, not the last zero moves.
              selected = []
          return " ".join(map(str, selected))