Skip to content

Commit 15b7d35

Browse files
authored
python(feat): Updates for new sift-stream-bindings API (#554)
1 parent c8bfc4c commit 15b7d35

5 files changed

Lines changed: 350 additions & 174 deletions

File tree

python/CHANGELOG.md

Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,248 @@ All notable changes to this project will be documented in this file.
33

44
This project adheres to [Semantic Versioning](http://semver.org/).
55

6+
## [v0.15.0] - May 7, 2026
7+
8+
### What's New
9+
10+
v0.15.0 updates the streaming ingestion client to match the new `sift-stream-bindings` 0.3.0
11+
API. The `RecoveryStrategyConfig` class and `recovery_strategy` parameter have been replaced
12+
with an explicit `StreamingMode` enum and discrete per-mode configuration kwargs. The
13+
non-blocking send method has been renamed for consistency with the Rust library, and a new
14+
`try_send` method is available for single-flow non-blocking sends. This release contains
15+
**breaking changes** to the ingestion API — see below for details and a migration prompt.
16+
17+
#### Breaking Changes
18+
19+
##### 1. `RecoveryStrategyConfig` Removed — Replaced by `StreamingMode` + Per-Mode Kwargs
20+
21+
`RecoveryStrategyConfig` and the `recovery_strategy` parameter on `IngestionConfigStreamingClient`
22+
and `IngestionAPIAsync` have been removed. Transport mode is now selected via a `StreamingMode`
23+
enum, with separate `retry_policy` and `disk_backup_policy` kwargs for per-mode configuration.
24+
25+
The default mode is `StreamingMode.LIVE_WITH_BACKUPS`, which matches the previous default
26+
behavior of `RecoveryStrategyConfig.retry_with_backups()`.
27+
28+
**Removed from the public API:**
29+
- `RecoveryStrategyConfig` class (and its `retry_only()` / `retry_with_backups()` factory methods)
30+
- `recovery_strategy` parameter on `IngestionConfigStreamingClient` and `IngestionAPIAsync`
31+
32+
**Added:**
33+
- `StreamingMode` enum — `LIVE_ONLY`, `LIVE_WITH_BACKUPS`, `FILE_BACKUP`
34+
- `streaming_mode` parameter (default: `StreamingMode.LIVE_WITH_BACKUPS`)
35+
- `retry_policy` parameter — applies to `LIVE_WITH_BACKUPS` mode
36+
- `disk_backup_policy` parameter — applies to `LIVE_WITH_BACKUPS` and `FILE_BACKUP` modes
37+
- `checkpoint_interval_seconds` parameter — applies to `LIVE_WITH_BACKUPS` mode
38+
39+
**Before:**
40+
```python
41+
from sift_client.resources.ingestion import (
42+
IngestionConfigStreamingClient,
43+
RecoveryStrategyConfig,
44+
)
45+
46+
# Default: live streaming with backups
47+
client = await IngestionConfigStreamingClient.create(
48+
ingestion_config=my_config,
49+
recovery_strategy=RecoveryStrategyConfig.retry_with_backups(),
50+
)
51+
52+
# Retry only (no disk backups)
53+
client = await IngestionConfigStreamingClient.create(
54+
ingestion_config=my_config,
55+
recovery_strategy=RecoveryStrategyConfig.retry_only(),
56+
)
57+
```
58+
59+
**After:**
60+
```python
61+
from sift_client.resources.ingestion import (
62+
IngestionConfigStreamingClient,
63+
StreamingMode,
64+
)
65+
66+
# Default: live streaming with backups (no change needed if you were using the default)
67+
client = await IngestionConfigStreamingClient.create(
68+
ingestion_config=my_config,
69+
streaming_mode=StreamingMode.LIVE_WITH_BACKUPS,
70+
)
71+
72+
# Live only — no disk backups, lowest overhead
73+
client = await IngestionConfigStreamingClient.create(
74+
ingestion_config=my_config,
75+
streaming_mode=StreamingMode.LIVE_ONLY,
76+
)
77+
78+
# File backup only
79+
client = await IngestionConfigStreamingClient.create(
80+
ingestion_config=my_config,
81+
streaming_mode=StreamingMode.FILE_BACKUP,
82+
)
83+
```
84+
85+
To pass a custom retry or disk backup policy:
86+
```python
87+
from sift_stream_bindings import DiskBackupPolicyPy, RetryPolicyPy
88+
89+
client = await IngestionConfigStreamingClient.create(
90+
ingestion_config=my_config,
91+
streaming_mode=StreamingMode.LIVE_WITH_BACKUPS,
92+
retry_policy=RetryPolicyPy.default(),
93+
disk_backup_policy=DiskBackupPolicyPy.default(),
94+
checkpoint_interval_seconds=30,
95+
)
96+
```
97+
98+
##### 2. `send_requests_nonblocking` Renamed to `try_send_requests`
99+
100+
`IngestionConfigStreamingClient.send_requests_nonblocking` has been renamed to `try_send_requests`
101+
to align with the Rust `sift-stream` naming convention where `try_` methods return immediately
102+
without awaiting channel capacity.
103+
104+
**Before:**
105+
```python
106+
client.send_requests_nonblocking(requests)
107+
```
108+
109+
**After:**
110+
```python
111+
client.try_send_requests(requests)
112+
```
113+
114+
#### New Features
115+
116+
##### `try_send` — Non-Blocking Single-Flow Send
117+
118+
A new `try_send(flow)` method is available on `IngestionConfigStreamingClient` for non-blocking
119+
single-flow sends. It accepts either a `Flow` or a raw `FlowPy` object.
120+
121+
```python
122+
client.try_send(flow)
123+
```
124+
125+
Use `try_send` in real-time loops where blocking on channel capacity is unacceptable. For
126+
most use cases, the async `send(flow)` method (which applies backpressure) is preferred.
127+
128+
##### `sift-stream-bindings` 0.3.0
129+
130+
The `sift-stream-bindings` dependency has been bumped to 0.3.0, which reflects the
131+
`sift-stream` 0.9.0 breaking API changes (stepped builder, send rename, removed types).
132+
133+
#### AI-Assisted Migration Prompt (v0.14.x → v0.15.0)
134+
135+
Copy and paste the following prompt to an AI coding agent to automate the upgrade:
136+
137+
```
138+
You are upgrading a Python project from sift_client v0.14.x to v0.15.0. The streaming ingestion
139+
API has breaking changes. Apply ALL of the following changes precisely. Do not make any other
140+
modifications.
141+
142+
---
143+
144+
## 1. Update `pyproject.toml`
145+
146+
Find every occurrence of `sift-stream-bindings==0.2.2` and replace it with
147+
`sift-stream-bindings==0.3.0`. This may appear under multiple dependency groups (e.g. `all`,
148+
`dev-all`, `sift-stream`, `sift-stream-bindings`).
149+
150+
---
151+
152+
## 2. Remove all imports of `RecoveryStrategyConfig`
153+
154+
Delete any line that imports `RecoveryStrategyConfig`, for example:
155+
156+
from sift_client.resources.ingestion import RecoveryStrategyConfig
157+
from sift_client.resources.ingestion import IngestionConfigStreamingClient, RecoveryStrategyConfig
158+
159+
Remove only `RecoveryStrategyConfig` from those imports; keep any other names on the same line.
160+
161+
---
162+
163+
## 3. Add `StreamingMode` to imports where needed
164+
165+
Wherever `IngestionConfigStreamingClient` or `IngestionAPIAsync` is imported and a streaming
166+
mode needs to be specified, add `StreamingMode` to the import:
167+
168+
from sift_client.resources.ingestion import IngestionConfigStreamingClient, StreamingMode
169+
170+
---
171+
172+
## 4. Replace all `recovery_strategy` call sites
173+
174+
Search for every call to `IngestionConfigStreamingClient.create(...)` and
175+
`IngestionAPIAsync.create(...)` that contains a `recovery_strategy` keyword argument.
176+
177+
### Case A — `RecoveryStrategyConfig.retry_with_backups()` (or no recovery_strategy at all)
178+
179+
This was (and remains) the default. Replace the kwarg:
180+
181+
# BEFORE
182+
recovery_strategy=RecoveryStrategyConfig.retry_with_backups()
183+
184+
# AFTER
185+
streaming_mode=StreamingMode.LIVE_WITH_BACKUPS
186+
187+
If the call had no `recovery_strategy` argument, no change is needed — `LIVE_WITH_BACKUPS`
188+
is the default.
189+
190+
If the old call passed explicit `retry_policy` or `disk_backup_policy` arguments inside
191+
`RecoveryStrategyConfig.retry_with_backups(...)`, move them to top-level kwargs:
192+
193+
# BEFORE
194+
recovery_strategy=RecoveryStrategyConfig.retry_with_backups(
195+
retry_policy=my_retry_policy,
196+
disk_backup_policy=my_disk_policy,
197+
)
198+
199+
# AFTER
200+
streaming_mode=StreamingMode.LIVE_WITH_BACKUPS,
201+
retry_policy=my_retry_policy,
202+
disk_backup_policy=my_disk_policy,
203+
204+
### Case B — `RecoveryStrategyConfig.retry_only()`
205+
206+
Replace with `streaming_mode=StreamingMode.LIVE_ONLY`. If a `retry_policy` was passed,
207+
keep it as a top-level kwarg (it is ignored for `LIVE_ONLY` in this version, but preserving
208+
it avoids a TypeError):
209+
210+
# BEFORE
211+
recovery_strategy=RecoveryStrategyConfig.retry_only(retry_policy=my_policy)
212+
213+
# AFTER
214+
streaming_mode=StreamingMode.LIVE_ONLY
215+
216+
### Case C — Raw `RecoveryStrategyPy` object passed directly
217+
218+
If any call passes a raw `RecoveryStrategyPy` instance as `recovery_strategy`, determine
219+
which mode it was configured for and replace accordingly:
220+
- `RecoveryStrategyPy.retry_only(...)` → `streaming_mode=StreamingMode.LIVE_ONLY`
221+
- `RecoveryStrategyPy.retry_with_backups(...)` → `streaming_mode=StreamingMode.LIVE_WITH_BACKUPS`
222+
with `retry_policy` and `disk_backup_policy` promoted to top-level kwargs.
223+
224+
---
225+
226+
## 5. Rename `send_requests_nonblocking` → `try_send_requests`
227+
228+
Find every call to `.send_requests_nonblocking(...)` on any ingestion client instance and
229+
rename it to `.try_send_requests(...)`. The signature is unchanged.
230+
231+
# BEFORE
232+
client.send_requests_nonblocking(requests)
233+
234+
# AFTER
235+
client.try_send_requests(requests)
236+
237+
---
238+
239+
## 6. Verify
240+
241+
After applying the above changes:
242+
1. Run `grep -r "RecoveryStrategyConfig" .` — expect zero results.
243+
2. Run `grep -r "send_requests_nonblocking" .` — expect zero results.
244+
3. Run `grep -r "recovery_strategy" .` — expect zero results.
245+
4. Run your test suite to confirm no remaining references.
246+
```
247+
6248
## [v0.14.1] - April 30, 2026
7249

8250
### Bugfixes

python/lib/sift_client/_internal/low_level_wrappers/ingestion.py

Lines changed: 49 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,15 @@
3939
from datetime import datetime
4040

4141
from sift_stream_bindings import (
42-
DurationPy,
42+
DiskBackupPolicyPy,
4343
FlowConfigPy,
4444
FlowDescriptorPy,
4545
FlowPy,
4646
IngestionConfigFormPy,
4747
IngestWithConfigDataStreamRequestPy,
4848
IngestWithConfigDataStreamRequestWrapperPy,
4949
MetadataPy,
50-
RecoveryStrategyPy,
50+
RetryPolicyPy,
5151
RunFormPy,
5252
RunSelectorPy,
5353
SiftStreamBuilderPy,
@@ -56,7 +56,7 @@
5656
TimeValuePy,
5757
)
5858

59-
from sift_client.resources.ingestion import TracingConfig
59+
from sift_client.resources.ingestion import StreamingMode, TracingConfig
6060

6161

6262
def _to_rust_py_timestamp(time: datetime) -> TimeValuePy:
@@ -250,26 +250,32 @@ async def create_sift_stream_instance(
250250
cls,
251251
api_key: str,
252252
grpc_uri: str,
253-
ingestion_config: IngestionConfigFormPy,
253+
ingestion_config_form: IngestionConfigFormPy,
254254
run_form: RunFormPy | None = None,
255255
run_id: str | None = None,
256256
asset_tags: list[str] | None = None,
257257
asset_metadata: list[MetadataPy] | None = None,
258-
recovery_strategy: RecoveryStrategyPy | None = None,
259-
checkpoint_interval: DurationPy | None = None,
258+
streaming_mode: StreamingMode = ..., # type: ignore[assignment]
259+
retry_policy: RetryPolicyPy | None = None,
260+
disk_backup_policy: DiskBackupPolicyPy | None = None,
261+
checkpoint_interval_seconds: int | None = None,
260262
enable_tls: bool = True,
261263
tracing_config: TracingConfig | None = None,
262264
) -> IngestionConfigStreamingLowLevelClient:
263265
# Importing here to allow sift_stream_bindings to be an optional dependancy for non-ingestion users
264266
# TODO(nathan): Fix bindings to fix mypy issues with tracing functions
265267
from sift_stream_bindings import ( # type: ignore[attr-defined]
268+
DurationPy,
266269
SiftStreamBuilderPy,
267270
init_tracing, # type: ignore[attr-defined]
268271
init_tracing_with_file, # type: ignore[attr-defined]
269272
is_tracing_initialized, # type: ignore[attr-defined]
270-
) # type: ignore[attr-defined]
273+
)
274+
275+
from sift_client.resources.ingestion import StreamingMode, TracingConfig
271276

272-
from sift_client.resources.ingestion import TracingConfig
277+
if streaming_mode is ...: # type: ignore[comparison-overlap]
278+
streaming_mode = StreamingMode.LIVE_WITH_BACKUPS
273279

274280
if not is_tracing_initialized():
275281
if tracing_config is None:
@@ -287,21 +293,35 @@ async def create_sift_stream_instance(
287293
# Use stdout/stderr only
288294
init_tracing(tracing_config.level)
289295

290-
builder = SiftStreamBuilderPy(
291-
uri=grpc_uri,
292-
apikey=api_key,
293-
)
294-
295-
builder.enable_tls = enable_tls
296-
builder.ingestion_config = ingestion_config
297-
builder.recovery_strategy = recovery_strategy
298-
builder.checkpoint_interval = checkpoint_interval
299-
builder.asset_tags = asset_tags
300-
builder.metadata = asset_metadata
301-
builder.run = run_form
302-
builder.run_id = run_id
303-
304-
sift_stream_instance = await builder.build()
296+
sift_builder = SiftStreamBuilderPy(uri=grpc_uri, apikey=api_key)
297+
sift_builder.enable_tls = enable_tls
298+
299+
config_builder = sift_builder.ingestion_config(ingestion_config_form)
300+
config_builder.run = run_form
301+
config_builder.run_id = run_id
302+
config_builder.asset_tags = asset_tags
303+
config_builder.metadata = asset_metadata
304+
305+
if streaming_mode == StreamingMode.LIVE_ONLY:
306+
sift_stream_instance = await config_builder.live_only().build()
307+
308+
elif streaming_mode == StreamingMode.FILE_BACKUP:
309+
fb_builder = config_builder.file_backup()
310+
if disk_backup_policy is not None:
311+
fb_builder.disk_backup_policy = disk_backup_policy
312+
sift_stream_instance = await fb_builder.build()
313+
314+
else: # LIVE_WITH_BACKUPS (default)
315+
lwb_builder = config_builder.live_with_backups()
316+
if retry_policy is not None:
317+
lwb_builder.retry_policy = retry_policy
318+
if disk_backup_policy is not None:
319+
lwb_builder.disk_backup_policy = disk_backup_policy
320+
if checkpoint_interval_seconds is not None:
321+
lwb_builder.checkpoint_interval = DurationPy(
322+
secs=checkpoint_interval_seconds, nanos=0
323+
)
324+
sift_stream_instance = await lwb_builder.build()
305325

306326
return cls(sift_stream_instance)
307327

@@ -314,10 +334,13 @@ async def batch_send(self, flows: Iterable[FlowPy]):
314334
async def send_requests(self, requests: list[IngestWithConfigDataStreamRequestPy]):
315335
await self._sift_stream_instance.send_requests(requests)
316336

317-
def send_requests_nonblocking(
337+
def try_send_requests(
318338
self, requests: Iterable[IngestWithConfigDataStreamRequestWrapperPy]
319-
):
320-
self._sift_stream_instance.send_requests_nonblocking(requests)
339+
) -> None:
340+
self._sift_stream_instance.try_send_requests(requests)
341+
342+
def try_send(self, flow: FlowPy) -> None:
343+
self._sift_stream_instance.try_send(flow)
321344

322345
def get_flow_descriptor(self, flow_name: str) -> FlowDescriptorPy:
323346
return self._sift_stream_instance.get_flow_descriptor(flow_name)

python/lib/sift_client/resources/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ async def main():
154154
from sift_client.resources.calculated_channels import CalculatedChannelsAPIAsync
155155
from sift_client.resources.channels import ChannelsAPIAsync
156156
from sift_client.resources.file_attachments import FileAttachmentsAPIAsync
157-
from sift_client.resources.ingestion import IngestionAPIAsync, TracingConfig
157+
from sift_client.resources.ingestion import IngestionAPIAsync, StreamingMode, TracingConfig
158158
from sift_client.resources.jobs import JobsAPIAsync
159159
from sift_client.resources.ping import PingAPIAsync
160160
from sift_client.resources.reports import ReportsAPIAsync
@@ -200,6 +200,7 @@ async def main():
200200
"FileAttachmentsAPI",
201201
"FileAttachmentsAPIAsync",
202202
"IngestionAPIAsync",
203+
"StreamingMode",
203204
"JobsAPI",
204205
"JobsAPIAsync",
205206
"PingAPI",

0 commit comments

Comments
 (0)