Skip to content

Commit 6038732

Browse files
authored
Close game stream (#1185)
* Always close game stream connection * Always close lichess event stream * Add context methods to test EventStream * Add context methods for test game stream * Fix types for older versions of python
1 parent 960bcad commit 6038732

3 files changed

Lines changed: 122 additions & 106 deletions

File tree

lib/lichess_bot.py

Lines changed: 106 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -117,14 +117,14 @@ def watch_control_stream(control_queue: CONTROL_QUEUE_TYPE, li: lichess.Lichess)
117117
error = None
118118
while not stop.terminated:
119119
try:
120-
response = li.get_event_stream()
121-
lines = response.iter_lines()
122-
for line in lines:
123-
if line:
124-
event = json.loads(line.decode("utf-8"))
125-
control_queue.put_nowait(event)
126-
else:
127-
control_queue.put_nowait({"type": "ping"})
120+
with li.get_event_stream() as response:
121+
lines = response.iter_lines()
122+
for line in lines:
123+
if line:
124+
event = json.loads(line.decode("utf-8"))
125+
control_queue.put_nowait(event)
126+
else:
127+
control_queue.put_nowait({"type": "ping"})
128128
except Exception:
129129
error = traceback.format_exc()
130130
break
@@ -668,104 +668,104 @@ def play_game(li: lichess.Lichess,
668668
thread_logging_configurer(logging_queue)
669669
logger = logging.getLogger(__name__)
670670

671-
response = li.get_game_stream(game_id)
672-
lines = response.iter_lines()
673-
674-
# Initial response of stream will be the full game info. Store it.
675-
initial_state = json.loads(next(lines).decode("utf-8"))
676-
logger.debug(f"Initial state: {initial_state}")
677-
abort_time = seconds(config.abort_time)
678-
game = model.Game(initial_state, user_profile["username"], li.baseUrl, abort_time)
679-
680-
with engine_wrapper.create_engine(config, game) as engine:
681-
engine.get_opponent_info(game)
682-
logger.debug(f"The engine for game {game_id} has pid={engine.get_pid()}")
683-
conversation = Conversation(game, engine, li, __version__, challenge_queue)
684-
685-
logger.info(f"+++ {game}")
686-
687-
is_correspondence = game.speed == "correspondence"
688-
correspondence_cfg = config.correspondence
689-
correspondence_move_time = seconds(correspondence_cfg.move_time)
690-
correspondence_disconnect_time = seconds(correspondence_cfg.disconnect_time)
691-
692-
engine_cfg = config.engine
693-
ponder_cfg = correspondence_cfg if is_correspondence else engine_cfg
694-
can_ponder = ponder_cfg.uci_ponder or ponder_cfg.ponder
695-
move_overhead = msec(config.move_overhead)
696-
delay = msec(config.rate_limiting_delay)
697-
698-
takebacks_accepted = read_takeback_record(game)
699-
max_takebacks_accepted = config.max_takebacks_accepted
700-
701-
keyword_map: defaultdict[str, str] = defaultdict(str, me=game.me.name, opponent=game.opponent.name)
702-
hello = get_greeting("hello", config.greeting, keyword_map)
703-
goodbye = get_greeting("goodbye", config.greeting, keyword_map)
704-
hello_spectators = get_greeting("hello_spectators", config.greeting, keyword_map)
705-
goodbye_spectators = get_greeting("goodbye_spectators", config.greeting, keyword_map)
706-
707-
disconnect_time = correspondence_disconnect_time if not game.state.get("moves") else seconds(0)
708-
prior_game = None
709-
board = chess.Board()
710-
game_stream = itertools.chain([json.dumps(game.state).encode("utf-8")], lines)
711-
quit_after_all_games_finish = config.quit_after_all_games_finish
712-
stay_in_game = True
713-
while stay_in_game and (not stop.terminated or quit_after_all_games_finish) and not stop.force_quit:
714-
move_attempted = False
715-
try:
716-
upd = next_update(game_stream)
717-
u_type = upd["type"] if upd else "ping"
718-
if u_type == "chatLine":
719-
conversation.react(ChatLine(upd))
720-
elif u_type == "gameState":
721-
game.state = upd
722-
board = setup_board(game)
723-
takeback_field = game.state.get("btakeback") if game.is_white else game.state.get("wtakeback")
724-
725-
if not is_game_over(game) and is_engine_move(game, prior_game, board):
726-
disconnect_time = correspondence_disconnect_time
727-
say_hello(conversation, hello, hello_spectators, board)
728-
setup_timer = Timer()
729-
print_move_number(board)
730-
move_attempted = True
731-
engine.play_move(board,
732-
game,
733-
li,
734-
setup_timer,
735-
move_overhead,
736-
can_ponder,
737-
is_correspondence,
738-
correspondence_move_time,
739-
engine_cfg,
740-
fake_think_time(config, board, game))
741-
time.sleep(to_seconds(delay))
742-
elif is_game_over(game):
743-
tell_user_game_result(game, board)
744-
engine.send_game_result(game, board)
745-
conversation.send_message("player", goodbye)
746-
conversation.send_message("spectator", goodbye_spectators)
747-
elif (takeback_field
748-
and not bot_to_move(game, board)
749-
and li.accept_takeback(game.id, takebacks_accepted < max_takebacks_accepted)):
750-
takebacks_accepted += 1
751-
record_takeback(game, takebacks_accepted)
752-
engine.discard_last_move_commentary()
753-
754-
wbtime = upd[engine_wrapper.wbtime(board)]
755-
wbinc = upd[engine_wrapper.wbinc(board)]
756-
terminate_time = msec(wbtime) + msec(wbinc) + seconds(60)
757-
game.ping(abort_time, terminate_time, disconnect_time)
758-
prior_game = copy.deepcopy(game)
759-
elif u_type == "ping" and should_exit_game(board, game, prior_game, li, is_correspondence):
760-
stay_in_game = False
761-
except (HTTPError, ReadTimeout, RemoteDisconnected, ChunkedEncodingError, RequestsConnectionError,
762-
StopIteration) as e:
763-
stopped = isinstance(e, StopIteration)
764-
stay_in_game = not stopped and (move_attempted or game_is_active(li, game.id))
765-
766-
pgn_record = try_get_pgn_game_record(li, config, game, board, engine)
767-
final_queue_entries(control_queue, correspondence_queue, game, is_correspondence, pgn_record, pgn_queue)
768-
delete_takeback_record(game)
671+
with li.get_game_stream(game_id) as response:
672+
lines = response.iter_lines()
673+
674+
# Initial response of stream will be the full game info. Store it.
675+
initial_state = json.loads(next(lines).decode("utf-8"))
676+
logger.debug(f"Initial state: {initial_state}")
677+
abort_time = seconds(config.abort_time)
678+
game = model.Game(initial_state, user_profile["username"], li.baseUrl, abort_time)
679+
680+
with engine_wrapper.create_engine(config, game) as engine:
681+
engine.get_opponent_info(game)
682+
logger.debug(f"The engine for game {game_id} has pid={engine.get_pid()}")
683+
conversation = Conversation(game, engine, li, __version__, challenge_queue)
684+
685+
logger.info(f"+++ {game}")
686+
687+
is_correspondence = game.speed == "correspondence"
688+
correspondence_cfg = config.correspondence
689+
correspondence_move_time = seconds(correspondence_cfg.move_time)
690+
correspondence_disconnect_time = seconds(correspondence_cfg.disconnect_time)
691+
692+
engine_cfg = config.engine
693+
ponder_cfg = correspondence_cfg if is_correspondence else engine_cfg
694+
can_ponder = ponder_cfg.uci_ponder or ponder_cfg.ponder
695+
move_overhead = msec(config.move_overhead)
696+
delay = msec(config.rate_limiting_delay)
697+
698+
takebacks_accepted = read_takeback_record(game)
699+
max_takebacks_accepted = config.max_takebacks_accepted
700+
701+
keyword_map: defaultdict[str, str] = defaultdict(str, me=game.me.name, opponent=game.opponent.name)
702+
hello = get_greeting("hello", config.greeting, keyword_map)
703+
goodbye = get_greeting("goodbye", config.greeting, keyword_map)
704+
hello_spectators = get_greeting("hello_spectators", config.greeting, keyword_map)
705+
goodbye_spectators = get_greeting("goodbye_spectators", config.greeting, keyword_map)
706+
707+
disconnect_time = correspondence_disconnect_time if not game.state.get("moves") else seconds(0)
708+
prior_game = None
709+
board = chess.Board()
710+
game_stream = itertools.chain([json.dumps(game.state).encode("utf-8")], lines)
711+
quit_after_all_games_finish = config.quit_after_all_games_finish
712+
stay_in_game = True
713+
while stay_in_game and (not stop.terminated or quit_after_all_games_finish) and not stop.force_quit:
714+
move_attempted = False
715+
try:
716+
upd = next_update(game_stream)
717+
u_type = upd["type"] if upd else "ping"
718+
if u_type == "chatLine":
719+
conversation.react(ChatLine(upd))
720+
elif u_type == "gameState":
721+
game.state = upd
722+
board = setup_board(game)
723+
takeback_field = game.state.get("btakeback") if game.is_white else game.state.get("wtakeback")
724+
725+
if not is_game_over(game) and is_engine_move(game, prior_game, board):
726+
disconnect_time = correspondence_disconnect_time
727+
say_hello(conversation, hello, hello_spectators, board)
728+
setup_timer = Timer()
729+
print_move_number(board)
730+
move_attempted = True
731+
engine.play_move(board,
732+
game,
733+
li,
734+
setup_timer,
735+
move_overhead,
736+
can_ponder,
737+
is_correspondence,
738+
correspondence_move_time,
739+
engine_cfg,
740+
fake_think_time(config, board, game))
741+
time.sleep(to_seconds(delay))
742+
elif is_game_over(game):
743+
tell_user_game_result(game, board)
744+
engine.send_game_result(game, board)
745+
conversation.send_message("player", goodbye)
746+
conversation.send_message("spectator", goodbye_spectators)
747+
elif (takeback_field
748+
and not bot_to_move(game, board)
749+
and li.accept_takeback(game.id, takebacks_accepted < max_takebacks_accepted)):
750+
takebacks_accepted += 1
751+
record_takeback(game, takebacks_accepted)
752+
engine.discard_last_move_commentary()
753+
754+
wbtime = upd[engine_wrapper.wbtime(board)]
755+
wbinc = upd[engine_wrapper.wbinc(board)]
756+
terminate_time = msec(wbtime) + msec(wbinc) + seconds(60)
757+
game.ping(abort_time, terminate_time, disconnect_time)
758+
prior_game = copy.deepcopy(game)
759+
elif u_type == "ping" and should_exit_game(board, game, prior_game, li, is_correspondence):
760+
stay_in_game = False
761+
except (HTTPError, ReadTimeout, RemoteDisconnected, ChunkedEncodingError, RequestsConnectionError,
762+
StopIteration) as e:
763+
stopped = isinstance(e, StopIteration)
764+
stay_in_game = not stopped and (move_attempted or game_is_active(li, game.id))
765+
766+
pgn_record = try_get_pgn_game_record(li, config, game, board, engine)
767+
final_queue_entries(control_queue, correspondence_queue, game, is_correspondence, pgn_record, pgn_queue)
768+
delete_takeback_record(game)
769769

770770

771771
def read_takeback_record(game: model.Game) -> int:

test_bot/lichess.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""Imitate `lichess.py`. Used in tests."""
2+
from __future__ import annotations
23
import time
34
import chess.engine
45
import json
@@ -103,6 +104,13 @@ def iter_lines(self, chunk_size: int | None = 512, decode_unicode: bool = False,
103104
new_game_state["status"] = "started"
104105
yield json.dumps(new_game_state).encode("utf-8")
105106

107+
def __enter__(self) -> GameStream: # noqa: PYI034
108+
"""Enter game stream context."""
109+
return self
110+
111+
def __exit__(self, *args: object) -> None:
112+
"""Exit game stream context."""
113+
106114

107115
class EventStream(Response):
108116
"""Imitate lichess.org's EventStream. Used in tests."""
@@ -129,6 +137,13 @@ def iter_lines(self, chunk_size: int | None = 512, decode_unicode: bool = False,
129137
"compat": {"bot": True,
130138
"board": True}}}).encode("utf-8")
131139

140+
def __enter__(self) -> EventStream: # noqa: PYI034
141+
"""Enter context block."""
142+
return self
143+
144+
def __exit__(self, *args: object) -> None:
145+
"""Exit context block."""
146+
132147

133148
# Docs: https://lichess.org/api.
134149
class Lichess(OriginalLichess):

test_bot/ruff.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ ignore = [
3636
"S311", # Standard pseudo-random generators are not suitable for cryptographic purposes
3737
"SIM108", # Use ternary operator instead of `if`-`else`-block
3838
"TC001", # Move application import into a type-checking block
39+
"TC002", # Move third-party import into a type-checking block
3940
"TC003", # Move standard library import into a type-checking block
4041
"TC006", # Add quotes to type expression in `typing.cast()`
4142
"TRY", # Try-except suggestions

0 commit comments

Comments
 (0)