-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathping.py
More file actions
56 lines (42 loc) · 1.74 KB
/
ping.py
File metadata and controls
56 lines (42 loc) · 1.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
"""Low-level wrapper for the PingAPI.
This module provides thin wrappers around the autogenerated bindings for the PingAPI.
It handles common concerns like error handling and retries.
It provides an asynchronous client for the PingAPI.
"""
from __future__ import annotations
import logging
from typing import cast
from sift.ping.v1.ping_pb2 import (
PingRequest,
PingResponse,
)
from sift.ping.v1.ping_pb2_grpc import PingServiceStub
from sift_client._internal.low_level_wrappers.base import (
LowLevelClientBase,
)
from sift_client.transport import GrpcClient, WithGrpcClient
# Module-level logger named after this module (no handlers or levels are configured here).
logger = logging.getLogger(__name__)
class PingLowLevelClient(LowLevelClientBase, WithGrpcClient):
    """Thin asynchronous wrapper over the autogenerated PingAPI gRPC bindings.

    Calls are routed through ``_call_with_cache``, a helper this class does not
    define itself (presumably inherited from one of its bases), so that ping
    results can optionally be cached.
    """

    _cache_results: bool
    """Toggle for caching the result of a ping request. Used for testing."""

    def __init__(self, grpc_client: GrpcClient):
        """Create the client on top of an existing gRPC transport.

        Args:
            grpc_client: The gRPC client used to make API calls.
        """
        super().__init__(grpc_client=grpc_client)
        # Caching starts out disabled.
        self._cache_results = False

    async def ping(self, _force_refresh: bool = False) -> str:
        """Issue a Ping RPC and return the ``response`` field of the reply.

        Args:
            _force_refresh: Forwarded to the cache helper as ``force_refresh``.

        Returns:
            The ``response`` attribute of the ``PingResponse`` that came back.
        """
        # Look the stub up on every call so it is the one bound to the current event loop.
        ping_stub = self._grpc_client.get_stub(PingServiceStub)
        empty_request = PingRequest()

        raw_reply = await self._call_with_cache(
            ping_stub.Ping,
            empty_request,
            use_cache=self._cache_results,
            force_refresh=_force_refresh,
            ttl=1,
        )

        # The cache helper's return is cast to the concrete response type for type checkers only.
        reply = cast("PingResponse", raw_reply)
        return reply.response