Skip to content

Commit a16c005

Browse files
committed
lifecycle cleanup improvements for sift client for use with pytest
1 parent edda647 commit a16c005

2 files changed

Lines changed: 82 additions & 7 deletions

File tree

python/lib/sift_client/_tests/_internal/test_transport.py

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
"""Tests for URL normalization in GrpcConfig and RestConfig."""
22

3+
import threading
4+
35
import pytest
46

5-
from sift_client.transport.grpc_transport import GrpcConfig
7+
from sift_client.transport.grpc_transport import GrpcClient, GrpcConfig
68
from sift_client.transport.rest_transport import RestConfig
79

810

@@ -48,6 +50,49 @@ def test_raise_on_missing_url(self):
4850
GrpcConfig(url="", api_key="api")
4951

5052

53+
class TestGrpcClientClose:
54+
"""Lifecycle of GrpcClient.close_sync().
55+
56+
Constructing a GrpcClient builds a real (lazy, undialed) channel against an
57+
unresolvable host and spins up the background event-loop thread; no RPC is
58+
made, so these run offline.
59+
"""
60+
61+
@staticmethod
62+
def _client() -> GrpcClient:
63+
return GrpcClient(GrpcConfig(url="disabled.invalid:0", api_key="api", use_ssl=False))
64+
65+
def test_close_sync_releases_channels(self):
66+
# The channel maps must be cleared so the gRPC C-core can destroy the
67+
# channels before its own exit-time shutdown, avoiding the
68+
# "grpc_wait_for_shutdown_with_timeout() timed out" message.
69+
client = self._client()
70+
assert client._channels_async # channel created on the default loop
71+
client.close_sync()
72+
assert client._closed is True
73+
assert client._channels_async == {}
74+
assert client._stubs_async_map == {}
75+
assert not client._default_loop_thread.is_alive()
76+
77+
def test_close_sync_is_idempotent(self):
78+
# The atexit handler always fires after an explicit close (or the
79+
# context manager's __exit__). The second call must be a no-op, not hang
80+
# submitting a coroutine to the already-stopped loop.
81+
client = self._client()
82+
client.close_sync()
83+
84+
finished = threading.Event()
85+
86+
def _second_close():
87+
client.close_sync()
88+
finished.set()
89+
90+
thread = threading.Thread(target=_second_close)
91+
thread.start()
92+
thread.join(timeout=5.0)
93+
assert finished.is_set(), "second close_sync() hung on the stopped loop"
94+
95+
5196
class TestRestConfigUrl:
5297
def test_adds_https_when_missing(self):
5398
config = RestConfig(base_url="rest.sift.com", api_key="api")

python/lib/sift_client/transport/grpc_transport.py

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import asyncio
1010
import atexit
11+
import concurrent.futures
1112
import logging
1213
import threading
1314
from typing import Any
@@ -102,6 +103,9 @@ def __init__(self, config: GrpcConfig):
102103
# map each asyncio loop to its async channel and stub dict
103104
self._channels_async: dict[asyncio.AbstractEventLoop, Any] = {}
104105
self._stubs_async_map: dict[asyncio.AbstractEventLoop, dict[type[Any], Any]] = {}
106+
# Guards close() / close_sync() against running twice. The atexit
107+
# handler always fires, so an explicit close must leave it a no-op.
108+
self._closed = False
105109
# default loop for sync API
106110
self._default_loop = asyncio.new_event_loop()
107111
atexit.register(self.close_sync)
@@ -160,21 +164,47 @@ def get_stub(self, stub_class: type[Any]) -> Any:
160164
return stubs[stub_class]
161165

162166
def close_sync(self):
163-
"""Close the sync channel and all async channels."""
167+
"""Close the sync channel and all async channels. Idempotent."""
168+
if self._closed:
169+
return
170+
self._closed = True
164171
try:
165-
for ch in self._channels_async.values():
166-
asyncio.run_coroutine_threadsafe(ch.close(), self._default_loop).result()
167-
self._default_loop.call_soon_threadsafe(self._default_loop.stop)
172+
# Only drive the loop if it's still running; submitting a coroutine
173+
# to a stopped loop never resolves and would hang on .result().
174+
if self._default_loop.is_running():
175+
for ch in self._channels_async.values():
176+
asyncio.run_coroutine_threadsafe(ch.close(), self._default_loop).result(
177+
timeout=5.0
178+
)
179+
self._default_loop.call_soon_threadsafe(self._default_loop.stop)
168180
self._default_loop_thread.join(timeout=1.0)
169-
except ValueError:
181+
except (ValueError, RuntimeError, concurrent.futures.TimeoutError):
170182
...
183+
finally:
184+
self._release_channels()
171185

172186
async def close(self):
173-
"""Close sync and async channels and stop the default loop."""
187+
"""Close sync and async channels and stop the default loop. Idempotent."""
188+
if self._closed:
189+
return
190+
self._closed = True
174191
for ch in self._channels_async.values():
175192
await ch.close()
176193
self._default_loop.call_soon_threadsafe(self._default_loop.stop)
177194
self._default_loop_thread.join(timeout=1.0)
195+
self._release_channels()
196+
197+
def _release_channels(self):
198+
"""Drop references to the closed channels and stubs.
199+
200+
The gRPC C-core defers a channel's resource release until the Python
201+
object is destroyed, not merely closed. Holding the channels in these
202+
maps keeps them alive until interpreter finalization, which races the
203+
C-core's own exit-time shutdown ("grpc_wait_for_shutdown_with_timeout()
204+
timed out"). Clearing the maps lets the channels be collected promptly.
205+
"""
206+
self._channels_async.clear()
207+
self._stubs_async_map.clear()
178208

179209
async def __aenter__(self):
180210
return self

0 commit comments

Comments
 (0)