Skip to content

Commit 388e94f

Browse files
committed
update ingestion to use ChannelConfig instead of Channel
1 parent e2feb35 commit 388e94f

5 files changed

Lines changed: 115 additions & 31 deletions

File tree

python/lib/sift_client/resources/rules.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,9 @@ async def update(
156156
if isinstance(update, dict):
157157
update = RuleUpdate.model_validate(update)
158158

159-
updated_rule = await self._low_level_client.update_rule(rule=rule_obj, update=update, version_notes=version_notes)
159+
updated_rule = await self._low_level_client.update_rule(
160+
rule=rule_obj, update=update, version_notes=version_notes
161+
)
160162
return self._apply_client_to_instance(updated_rule)
161163

162164
async def archive(self, rule: str | Rule) -> Rule:

python/lib/sift_client/sift_types/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
ChannelDataType,
1010
ChannelReference,
1111
)
12-
from sift_client.sift_types.ingestion import IngestionConfig
12+
from sift_client.sift_types.ingestion import ChannelConfig, Flow, IngestionConfig
1313
from sift_client.sift_types.rule import (
1414
Rule,
1515
RuleAction,
@@ -27,8 +27,10 @@
2727
"CalculatedChannelUpdate",
2828
"Channel",
2929
"ChannelBitFieldElement",
30+
"ChannelConfig",
3031
"ChannelDataType",
3132
"ChannelReference",
33+
"Flow",
3234
"IngestionConfig",
3335
"Rule",
3436
"RuleAction",

python/lib/sift_client/sift_types/_base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class BaseType(BaseModel, Generic[ProtoT, SelfT], ABC):
1818
model_config = ConfigDict(frozen=True)
1919

2020
id_: str | None = None
21-
proto: Any | None = Field(default=None, exclude=True) # For user reference only
21+
proto: Any | None = Field(default=None, exclude=True) # For user reference only
2222
_client: SiftClient | None = None
2323

2424
@property

python/lib/sift_client/sift_types/channel.py

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
Uint32Values,
2525
Uint64Values,
2626
)
27-
from sift.ingestion_configs.v2.ingestion_configs_pb2 import ChannelConfig
2827

2928
from sift_client.sift_types._base import BaseType
3029

@@ -235,9 +234,7 @@ def _enum_types_from_proto_list(enum_types: list[ChannelEnumTypePb]) -> dict[str
235234
return {enum.name: enum.key for enum in enum_types}
236235

237236
@classmethod
238-
def _from_proto(
239-
cls, proto: ChannelProto, sift_client: SiftClient | None = None
240-
) -> Channel:
237+
def _from_proto(cls, proto: ChannelProto, sift_client: SiftClient | None = None) -> Channel:
241238
return cls(
242239
proto=proto,
243240
id_=proto.channel_id,
@@ -257,19 +254,6 @@ def _from_proto(
257254
_client=sift_client,
258255
)
259256

260-
261-
def _to_config_proto(self) -> ChannelConfig:
262-
return ChannelConfig(
263-
name=self.name,
264-
data_type=self.data_type.value,
265-
description=self.description, # type: ignore
266-
unit=self.unit, # type: ignore
267-
bit_field_elements=[el._to_proto() for el in self.bit_field_elements]
268-
if self.bit_field_elements
269-
else None,
270-
enum_types=self._enum_types_to_proto_list(self.enum_types),
271-
)
272-
273257
def data(
274258
self,
275259
*,

python/lib/sift_client/sift_types/ingestion.py

Lines changed: 107 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44
from typing import TYPE_CHECKING, Any
55

66
from google.protobuf.empty_pb2 import Empty
7-
from pydantic import ConfigDict
7+
from pydantic import ConfigDict, model_validator
88
from sift.ingest.v1.ingest_pb2 import IngestWithConfigDataChannelValue
9+
from sift.ingestion_configs.v2.ingestion_configs_pb2 import (
10+
ChannelConfig as ChannelConfigProto,
11+
)
912
from sift.ingestion_configs.v2.ingestion_configs_pb2 import (
1013
FlowConfig,
1114
)
@@ -22,12 +25,13 @@
2225
)
2326

2427
from sift_client.sift_types._base import BaseType
25-
from sift_client.sift_types.channel import Channel, ChannelDataType
28+
from sift_client.sift_types.channel import ChannelBitFieldElement, ChannelDataType
2629

2730
if TYPE_CHECKING:
2831
from datetime import datetime
2932

3033
from sift_client.client import SiftClient
34+
from sift_client.sift_types.channel import Channel
3135

3236

3337
class IngestionConfig(BaseType[IngestionConfigProto, "IngestionConfig"]):
@@ -49,6 +53,98 @@ def _from_proto(
4953
)
5054

5155

56+
class ChannelConfig(BaseType[ChannelConfigProto, "ChannelConfig"]):
57+
"""Channel configuration model for ingestion purposes.
58+
59+
This model contains only the fields needed for ingestion configuration,
60+
without the full metadata from the Channels API.
61+
"""
62+
63+
model_config = ConfigDict(frozen=False)
64+
name: str
65+
data_type: ChannelDataType
66+
description: str | None = None
67+
unit: str | None = None
68+
bit_field_elements: list[ChannelBitFieldElement] | None = None
69+
enum_types: dict[str, int] | None = None
70+
71+
@model_validator(mode="after")
72+
def _validate_enum_types(self):
73+
"""Validate that enum_types is provided when data_type is ENUM."""
74+
if self.data_type == ChannelDataType.ENUM and not self.enum_types:
75+
raise ValueError(
76+
f"Channel '{self.name}' has data_type ENUM but enum_types is not provided"
77+
)
78+
return self
79+
80+
@classmethod
81+
def _from_proto(
82+
cls, proto: ChannelConfigProto, sift_client: SiftClient | None = None
83+
) -> ChannelConfig:
84+
"""Create ChannelConfig from ChannelConfigProto."""
85+
return cls(
86+
proto=proto,
87+
name=proto.name,
88+
data_type=ChannelDataType(proto.data_type),
89+
description=proto.description if proto.description else None,
90+
unit=proto.unit if proto.unit else None,
91+
bit_field_elements=[
92+
ChannelBitFieldElement._from_proto(el) for el in proto.bit_field_elements
93+
]
94+
if proto.bit_field_elements
95+
else None,
96+
enum_types={enum.name: enum.key for enum in proto.enum_types}
97+
if proto.enum_types
98+
else None,
99+
_client=sift_client,
100+
)
101+
102+
@classmethod
103+
def from_channel(cls, channel: Channel) -> ChannelConfig:
104+
"""Create ChannelConfig from a Channel.
105+
106+
Args:
107+
channel: The Channel to convert.
108+
109+
Returns:
110+
A ChannelConfig with the channel's configuration data.
111+
"""
112+
return cls(
113+
name=channel.name,
114+
data_type=channel.data_type,
115+
description=channel.description,
116+
unit=channel.unit,
117+
bit_field_elements=channel.bit_field_elements if channel.bit_field_elements else None,
118+
enum_types=channel.enum_types,
119+
)
120+
121+
def _to_config_proto(self) -> ChannelConfigProto:
122+
"""Convert to ChannelConfigProto for ingestion."""
123+
from sift.common.type.v1.channel_bit_field_element_pb2 import (
124+
ChannelBitFieldElement as ChannelBitFieldElementPb,
125+
)
126+
from sift.common.type.v1.channel_enum_type_pb2 import ChannelEnumType as ChannelEnumTypePb
127+
128+
return ChannelConfigProto(
129+
name=self.name,
130+
data_type=self.data_type.value,
131+
description=self.description or "",
132+
unit=self.unit or "",
133+
bit_field_elements=[
134+
ChannelBitFieldElementPb(
135+
name=bfe.name,
136+
index=bfe.index,
137+
bit_count=bfe.bit_count,
138+
)
139+
for bfe in self.bit_field_elements or []
140+
],
141+
enum_types=[
142+
ChannelEnumTypePb(name=name, key=key)
143+
for name, key in (self.enum_types or {}).items()
144+
],
145+
)
146+
147+
52148
class Flow(BaseType[FlowConfig, "Flow"]):
53149
"""Model representing a data flow for ingestion.
54150
@@ -57,7 +153,7 @@ class Flow(BaseType[FlowConfig, "Flow"]):
57153

58154
model_config = ConfigDict(frozen=False)
59155
name: str
60-
channels: list[Channel]
156+
channels: list[ChannelConfig]
61157
ingestion_config_id: str | None = None
62158
run_id: str | None = None
63159

@@ -66,7 +162,7 @@ def _from_proto(cls, proto: FlowConfig, sift_client: SiftClient | None = None) -
66162
return cls(
67163
proto=proto,
68164
name=proto.name,
69-
channels=[Channel._from_proto(channel) for channel in proto.channels],
165+
channels=[ChannelConfig._from_proto(channel) for channel in proto.channels],
70166
_client=sift_client,
71167
)
72168

@@ -82,11 +178,11 @@ def _to_rust_config(self) -> FlowConfigPy:
82178
channels=[_channel_to_rust_config(channel) for channel in self.channels],
83179
)
84180

85-
def add_channel(self, channel: Channel):
86-
"""Add a Channel to this Flow.
181+
def add_channel(self, channel: ChannelConfig):
182+
"""Add a ChannelConfig to this Flow.
87183
88184
Args:
89-
channel: The Channel to add.
185+
channel: The ChannelConfig to add.
90186
91187
Raises:
92188
ValueError: If the flow has already been created with an ingestion config.
@@ -115,7 +211,7 @@ def ingest(self, *, timestamp: datetime, channel_values: dict[str, Any]):
115211

116212

117213
# Converter functions.
118-
def _channel_to_rust_config(channel: Channel) -> ChannelConfigPy:
214+
def _channel_to_rust_config(channel: ChannelConfig) -> ChannelConfigPy:
119215
return ChannelConfigPy(
120216
name=channel.name,
121217
data_type=_to_rust_type(channel.data_type),
@@ -135,7 +231,7 @@ def _channel_to_rust_config(channel: Channel) -> ChannelConfigPy:
135231

136232

137233
def _rust_channel_value_from_bitfield(
138-
channel: Channel, value: Any
234+
channel: ChannelConfig, value: Any
139235
) -> IngestWithConfigDataChannelValuePy:
140236
"""Helper function to convert a bitfield value to a ChannelValuePy object.
141237
@@ -171,10 +267,10 @@ def _rust_channel_value_from_bitfield(
171267
return IngestWithConfigDataChannelValuePy.bitfield(byte_array)
172268

173269

174-
def _to_rust_value(channel: Channel, value: Any) -> IngestWithConfigDataChannelValuePy:
270+
def _to_rust_value(channel: ChannelConfig, value: Any) -> IngestWithConfigDataChannelValuePy:
175271
if value is None:
176272
return IngestWithConfigDataChannelValuePy.empty()
177-
if channel.data_type == ChannelDataType.ENUM:
273+
if channel.data_type == ChannelDataType.ENUM and channel.enum_types is not None:
178274
enum_name = value
179275
enum_val = channel.enum_types.get(enum_name)
180276
if enum_val is None:

0 commit comments

Comments
 (0)