Skip to content

Commit 4be9e09

Browse files
committed
- fix tests
- Properly handle optional dependency of sift-stream
1 parent 6a5aca3 commit 4be9e09

8 files changed

Lines changed: 80 additions & 37 deletions

File tree

python/lib/sift_client/_internal/low_level_wrappers/ingestion.py

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,14 @@
1818
from queue import Queue
1919
from typing import TYPE_CHECKING, Any, cast
2020

21-
import sift_stream_bindings
2221
from sift.ingestion_configs.v2.ingestion_configs_pb2 import (
2322
GetIngestionConfigRequest,
2423
ListIngestionConfigFlowsResponse,
2524
ListIngestionConfigsRequest,
2625
ListIngestionConfigsResponse,
2726
)
28-
from sift.ingestion_configs.v2.ingestion_configs_pb2_grpc import IngestionConfigServiceStub
29-
from sift_stream_bindings import (
30-
IngestionConfigFormPy,
31-
IngestWithConfigDataStreamRequestPy,
27+
from sift.ingestion_configs.v2.ingestion_configs_pb2_grpc import (
28+
IngestionConfigServiceStub,
3229
)
3330

3431
from sift_client._internal.low_level_wrappers.base import (
@@ -44,6 +41,12 @@
4441
if TYPE_CHECKING:
4542
from datetime import datetime
4643

44+
from sift_stream_bindings import (
45+
IngestionConfigFormPy,
46+
IngestWithConfigDataStreamRequestPy,
47+
SiftStreamBuilderPy,
48+
)
49+
4750

4851
class IngestionThread(threading.Thread):
4952
"""Manages ingestion for a single ingestion config."""
@@ -54,7 +57,7 @@ class IngestionThread(threading.Thread):
5457

5558
def __init__(
5659
self,
57-
sift_stream_builder: sift_stream_bindings.SiftStreamBuilderPy,
60+
sift_stream_builder: SiftStreamBuilderPy,
5861
data_queue: Queue,
5962
ingestion_config: IngestionConfigFormPy,
6063
no_data_timeout: int = 1,
@@ -154,7 +157,7 @@ class IngestionLowLevelClient(LowLevelClientBase, WithGrpcClient):
154157

155158
CacheEntry = namedtuple("CacheEntry", ["data_queue", "ingestion_config", "thread"])
156159

157-
sift_stream_builder: sift_stream_bindings.SiftStreamBuilderPy
160+
sift_stream_builder: SiftStreamBuilderPy
158161
stream_cache: dict[str, CacheEntry]
159162

160163
def __init__(self, grpc_client: GrpcClient):
@@ -163,21 +166,25 @@ def __init__(self, grpc_client: GrpcClient):
163166
Args:
164167
grpc_client: The gRPC client to use for making API calls.
165168
"""
169+
from sift_stream_bindings import (
170+
RecoveryStrategyPy,
171+
RetryPolicyPy,
172+
SiftStreamBuilderPy,
173+
)
174+
166175
super().__init__(grpc_client=grpc_client)
167176
# Rust GRPC client expects URI to have http(s):// prefix.
168177
uri = grpc_client._config.uri
169178
if not uri.startswith("http"):
170179
uri = f"https://{uri}" if grpc_client._config.use_ssl else f"http://{uri}"
171-
self.sift_stream_builder = sift_stream_bindings.SiftStreamBuilderPy(
180+
self.sift_stream_builder = SiftStreamBuilderPy(
172181
uri=uri,
173182
apikey=grpc_client._config.api_key,
174183
)
175184
self.sift_stream_builder.enable_tls = grpc_client._config.use_ssl
176185
# FD-177: Expose configuration for recovery strategy.
177-
self.sift_stream_builder.recovery_strategy = (
178-
sift_stream_bindings.RecoveryStrategyPy.retry_only(
179-
sift_stream_bindings.RetryPolicyPy.default()
180-
)
186+
self.sift_stream_builder.recovery_strategy = RecoveryStrategyPy.retry_only(
187+
RetryPolicyPy.default()
181188
)
182189
self.stream_cache = {}
183190

@@ -229,7 +236,9 @@ async def get_ingestion_config_id_from_client_key(self, client_key: str) -> str
229236
return ingestion_configs[0].id_
230237

231238
def _new_ingestion_thread(
232-
self, ingestion_config_id: str, ingestion_config: IngestionConfigFormPy | None = None
239+
self,
240+
ingestion_config_id: str,
241+
ingestion_config: IngestionConfigFormPy | None = None,
233242
):
234243
"""Start a new ingestion thread.
235244
This allows ingestion to happen in the background regardless of if the user is using the sync or async client

python/lib/sift_client/_tests/sift_types/test_base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class NestedCreateModel(ModelCreate[CreateCalculatedChannelRequest]):
4141
expression: str | None = None
4242
all_assets: bool | None = None
4343

44-
_to_proto_helpers: ClassVar = {
44+
_to_proto_helpers: ClassVar[dict[str, MappingHelper]] = {
4545
"expression": MappingHelper(
4646
proto_attr_path="calculated_channel_configuration.query_configuration.sel.expression",
4747
update_field="query_configuration",
@@ -293,7 +293,7 @@ def test_validation_error_on_invalid_helper_field(self):
293293
class InvalidModel(ModelCreate[CreateCalculatedChannelRequest]):
294294
name: str
295295

296-
_to_proto_helpers: ClassVar = {
296+
_to_proto_helpers: ClassVar[dict[str, MappingHelper]] = {
297297
"nonexistent_field": MappingHelper(proto_attr_path="some.path"),
298298
}
299299

python/lib/sift_client/sift_types/_base.py

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,14 @@
22

33
from abc import ABC, abstractmethod
44
from datetime import datetime
5-
from typing import TYPE_CHECKING, Any, Callable, ClassVar, Generic, TypeVar
5+
from typing import (
6+
TYPE_CHECKING,
7+
Any,
8+
Callable,
9+
ClassVar,
10+
Generic,
11+
TypeVar,
12+
)
613

714
from google.protobuf import field_mask_pb2, message
815
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, model_validator
@@ -84,17 +91,31 @@ class ModelCreateUpdateBase(BaseModel, ABC):
8491
"""Base class for Pydantic models that generate proto messages."""
8592

8693
model_config = ConfigDict(frozen=False)
87-
_to_proto_helpers: ClassVar[dict[str, MappingHelper]] = PrivateAttr(default={})
94+
_to_proto_helpers: ClassVar[dict[str, MappingHelper]] = {}
8895

8996
def __init__(self, **data: Any):
9097
super().__init__(**data)
98+
99+
@model_validator(mode="after")
100+
def _check_mapping_helpers(self):
91101
if self._to_proto_helpers:
92102
data = self.model_dump()
93103
for expected_field in self._to_proto_helpers.keys():
94104
if expected_field not in data:
95105
raise ValueError(
96106
f"MappingHelper created for {expected_field} but {self.__class__.__name__} has no matching variable names."
97107
)
108+
return self
109+
110+
def __init_subclass__(cls, **kwargs):
111+
super().__init_subclass__(**kwargs)
112+
required_annotation = "ClassVar[dict[str, MappingHelper]]"
113+
annotation = cls.__annotations__.get("_to_proto_helpers")
114+
# Check for correct annotation otherwise pydantic will not populate this properly
115+
if annotation and annotation != required_annotation:
116+
raise TypeError(
117+
f"{cls.__name__} must define _to_proto_helpers type as: {required_annotation}"
118+
)
98119

99120
def _build_proto_and_paths(
100121
self, proto_msg, data, prefix="", already_setting_path_override=False
@@ -123,13 +144,13 @@ def _build_proto_and_paths(
123144
if mapping_helper.update_field:
124145
paths.append(mapping_helper.update_field)
125146
elif isinstance(value, dict):
126-
if field_name in self._to_proto_helpers:
127-
assert self._to_proto_helpers[field_name].converter, (
147+
if field_name in self.__class__._to_proto_helpers:
148+
assert self.__class__._to_proto_helpers[field_name].converter, (
128149
f"Expecting to run a coverter given a helper was defined for: {field_name}"
129150
)
130151
sub_paths = self._build_proto_and_paths(
131152
proto_msg,
132-
{field_name: self._to_proto_helpers[field_name].converter(value)}, # type: ignore[misc]
153+
{field_name: self.__class__._to_proto_helpers[field_name].converter(value)}, # type: ignore[misc]
133154
"",
134155
already_setting_path_override=True,
135156
)
@@ -151,13 +172,13 @@ def _build_proto_and_paths(
151172
try:
152173
repeated_field.extend(value) # Add all new values
153174
except TypeError as e:
154-
if field_name in self._to_proto_helpers:
155-
assert self._to_proto_helpers[field_name].converter, (
175+
if field_name in self.__class__._to_proto_helpers:
176+
assert self.__class__._to_proto_helpers[field_name].converter, (
156177
f"Expecting to run a coverter given a helper was defined for: {field_name}"
157178
)
158179
for item in value:
159180
repeated_field.append(
160-
self._to_proto_helpers[field_name].converter(**item) # type: ignore
181+
self.__class__._to_proto_helpers[field_name].converter(**item) # type: ignore
161182
)
162183
else:
163184
raise e

python/lib/sift_client/sift_types/asset.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ class AssetUpdate(ModelUpdate[AssetProto]):
113113
metadata: dict[str, str | float | bool] | None = None
114114
is_archived: bool | None = None
115115

116-
_to_proto_helpers: ClassVar = {
116+
_to_proto_helpers: ClassVar[dict[str, MappingHelper]] = {
117117
"metadata": MappingHelper(
118118
proto_attr_path="metadata",
119119
update_field="metadata",

python/lib/sift_client/sift_types/calculated_channel.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ class CalculatedChannelBase(ModelCreateUpdateBase):
149149

150150
metadata: dict[str, str | float | bool] | None = None
151151

152-
_to_proto_helpers: ClassVar = {
152+
_to_proto_helpers: ClassVar[dict[str, MappingHelper]] = {
153153
"expression": MappingHelper(
154154
proto_attr_path="calculated_channel_configuration.query_configuration.sel.expression",
155155
update_field="query_configuration",

python/lib/sift_client/sift_types/ingestion.py

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,20 @@
1515
from sift.ingestion_configs.v2.ingestion_configs_pb2 import (
1616
IngestionConfig as IngestionConfigProto,
1717
)
18-
from sift_stream_bindings import (
19-
ChannelBitFieldElementPy,
20-
ChannelConfigPy,
21-
ChannelDataTypePy,
22-
ChannelEnumTypePy,
23-
FlowConfigPy,
24-
IngestWithConfigDataChannelValuePy,
25-
)
2618

2719
from sift_client.sift_types._base import BaseType
2820
from sift_client.sift_types.channel import ChannelBitFieldElement, ChannelDataType
2921

3022
if TYPE_CHECKING:
3123
from datetime import datetime
3224

25+
from sift_stream_bindings import (
26+
ChannelConfigPy,
27+
ChannelDataTypePy,
28+
FlowConfigPy,
29+
IngestWithConfigDataChannelValuePy,
30+
)
31+
3332
from sift_client.client import SiftClient
3433
from sift_client.sift_types.channel import Channel
3534

@@ -179,6 +178,8 @@ def _to_proto(self) -> FlowConfig:
179178
)
180179

181180
def _to_rust_config(self) -> FlowConfigPy:
181+
from sift_stream_bindings import FlowConfigPy
182+
182183
return FlowConfigPy(
183184
name=self.name,
184185
channels=[_channel_to_rust_config(channel) for channel in self.channels],
@@ -218,6 +219,12 @@ def ingest(self, *, timestamp: datetime, channel_values: dict[str, Any]):
218219

219220
# Converter functions.
220221
def _channel_to_rust_config(channel: ChannelConfig) -> ChannelConfigPy:
222+
from sift_stream_bindings import (
223+
ChannelBitFieldElementPy,
224+
ChannelConfigPy,
225+
ChannelEnumTypePy,
226+
)
227+
221228
return ChannelConfigPy(
222229
name=channel.name,
223230
data_type=_to_rust_type(channel.data_type),
@@ -252,6 +259,8 @@ def _rust_channel_value_from_bitfield(
252259
Returns:
253260
A ChannelValuePy object.
254261
"""
262+
from sift_stream_bindings import IngestWithConfigDataChannelValuePy
263+
255264
assert channel.bit_field_elements is not None
256265
# We expect individual ints or bytes to represent full bitfield values.
257266
if isinstance(value, bytes) or isinstance(value, int):
@@ -276,6 +285,8 @@ def _rust_channel_value_from_bitfield(
276285

277286

278287
def _to_rust_value(channel: ChannelConfig, value: Any) -> IngestWithConfigDataChannelValuePy:
288+
from sift_stream_bindings import IngestWithConfigDataChannelValuePy
289+
279290
if value is None:
280291
return IngestWithConfigDataChannelValuePy.empty()
281292
if channel.data_type == ChannelDataType.ENUM and channel.enum_types is not None:
@@ -314,6 +325,8 @@ def _to_rust_value(channel: ChannelConfig, value: Any) -> IngestWithConfigDataCh
314325

315326

316327
def _to_rust_type(data_type: ChannelDataType) -> ChannelDataTypePy:
328+
from sift_stream_bindings import ChannelDataTypePy
329+
317330
if data_type == ChannelDataType.DOUBLE:
318331
return ChannelDataTypePy.Double
319332
elif data_type == ChannelDataType.FLOAT:

python/lib/sift_client/sift_types/run.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ class RunBase(ModelCreateUpdateBase):
126126
tags: list[str] | None = None
127127
metadata: dict[str, str | float | bool] | None = None
128128

129-
_to_proto_helpers: ClassVar = {
129+
_to_proto_helpers: ClassVar[dict[str, MappingHelper]] = {
130130
"metadata": MappingHelper(
131131
proto_attr_path="metadata",
132132
update_field="metadata",
@@ -158,7 +158,7 @@ class RunCreate(RunBase, ModelCreate[CreateRunRequestProto]):
158158
stop_time: datetime | None = None
159159
organization_id: str | None = None
160160

161-
_to_proto_helpers: ClassVar = {
161+
_to_proto_helpers: ClassVar[dict[str, MappingHelper]] = {
162162
"metadata": MappingHelper(
163163
proto_attr_path="metadata",
164164
update_field="metadata",
@@ -180,7 +180,7 @@ class RunUpdate(RunBase, ModelUpdate[RunProto]):
180180
stop_time: datetime | None = None
181181
is_archived: bool | None = None
182182

183-
_to_proto_helpers: ClassVar = {
183+
_to_proto_helpers: ClassVar[dict[str, MappingHelper]] = {
184184
"metadata": MappingHelper(
185185
proto_attr_path="metadata",
186186
update_field="metadata",

python/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ dependencies = [
2222
"PyYAML~=6.0",
2323
"pandas~=2.0",
2424
"protobuf>=4.0",
25-
"pydantic~=2.0",
25+
"pydantic~=2.10",
2626
# Support python 3.9+ typing in older versons of python.
2727
"eval-type-backport~=0.2",
2828
"pydantic_core~=2.3",

0 commit comments

Comments
 (0)