Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion python/lib/sift_client/_tests/_internal/test_transport.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""Tests for URL normalization in GrpcConfig and RestConfig."""

import threading

import pytest

from sift_client.transport.grpc_transport import GrpcConfig
from sift_client.transport.grpc_transport import GrpcClient, GrpcConfig
from sift_client.transport.rest_transport import RestConfig


Expand Down Expand Up @@ -48,6 +50,49 @@ def test_raise_on_missing_url(self):
GrpcConfig(url="", api_key="api")


class TestGrpcClientClose:
"""Lifecycle of GrpcClient.close_sync().

Constructing a GrpcClient builds a real (lazy, undialed) channel against an
unresolvable host and spins up the background event-loop thread; no RPC is
made, so these run offline.
"""

@staticmethod
def _client() -> GrpcClient:
return GrpcClient(GrpcConfig(url="disabled.invalid:0", api_key="api", use_ssl=False))

def test_close_sync_releases_channels(self):
# The channel maps must be cleared so the gRPC C-core can destroy the
# channels before its own exit-time shutdown, avoiding the
# "grpc_wait_for_shutdown_with_timeout() timed out" message.
client = self._client()
assert client._channels_async # channel created on the default loop
client.close_sync()
assert client._closed is True
assert client._channels_async == {}
assert client._stubs_async_map == {}
assert not client._default_loop_thread.is_alive()

def test_close_sync_is_idempotent(self):
# The atexit handler always fires after an explicit close (or the
# context manager's __exit__). The second call must be a no-op, not hang
# submitting a coroutine to the already-stopped loop.
client = self._client()
client.close_sync()

finished = threading.Event()

def _second_close():
client.close_sync()
finished.set()

thread = threading.Thread(target=_second_close)
thread.start()
thread.join(timeout=5.0)
assert finished.is_set(), "second close_sync() hung on the stopped loop"


class TestRestConfigUrl:
def test_adds_https_when_missing(self):
config = RestConfig(base_url="rest.sift.com", api_key="api")
Expand Down
42 changes: 36 additions & 6 deletions python/lib/sift_client/transport/grpc_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import asyncio
import atexit
import concurrent.futures
import logging
import threading
from typing import Any
Expand Down Expand Up @@ -102,6 +103,9 @@ def __init__(self, config: GrpcConfig):
# map each asyncio loop to its async channel and stub dict
self._channels_async: dict[asyncio.AbstractEventLoop, Any] = {}
self._stubs_async_map: dict[asyncio.AbstractEventLoop, dict[type[Any], Any]] = {}
# Guards close() / close_sync() against running twice. The atexit
# handler always fires, so an explicit close must leave it a no-op.
self._closed = False
# default loop for sync API
self._default_loop = asyncio.new_event_loop()
atexit.register(self.close_sync)
Expand Down Expand Up @@ -160,21 +164,47 @@ def get_stub(self, stub_class: type[Any]) -> Any:
return stubs[stub_class]

def close_sync(self):
"""Close the sync channel and all async channels."""
"""Close the sync channel and all async channels. Idempotent."""
if self._closed:
return
self._closed = True
try:
for ch in self._channels_async.values():
asyncio.run_coroutine_threadsafe(ch.close(), self._default_loop).result()
self._default_loop.call_soon_threadsafe(self._default_loop.stop)
# Only drive the loop if it's still running; submitting a coroutine
# to a stopped loop never resolves and would hang on .result().
if self._default_loop.is_running():
for ch in self._channels_async.values():
asyncio.run_coroutine_threadsafe(ch.close(), self._default_loop).result(
timeout=5.0
)
self._default_loop.call_soon_threadsafe(self._default_loop.stop)
self._default_loop_thread.join(timeout=1.0)
except ValueError:
except (ValueError, RuntimeError, concurrent.futures.TimeoutError):
...
finally:
self._release_channels()

async def close(self):
"""Close sync and async channels and stop the default loop."""
"""Close sync and async channels and stop the default loop. Idempotent."""
if self._closed:
return
self._closed = True
for ch in self._channels_async.values():
await ch.close()
self._default_loop.call_soon_threadsafe(self._default_loop.stop)
self._default_loop_thread.join(timeout=1.0)
self._release_channels()

def _release_channels(self):
"""Drop references to the closed channels and stubs.

The gRPC C-core defers a channel's resource release until the Python
object is destroyed, not merely closed. Holding the channels in these
maps keeps them alive until interpreter finalization, which races the
C-core's own exit-time shutdown ("grpc_wait_for_shutdown_with_timeout()
timed out"). Clearing the maps lets the channels be collected promptly.
"""
self._channels_async.clear()
self._stubs_async_map.clear()

async def __aenter__(self):
return self
Expand Down
Loading