Skip to content
This repository was archived by the owner on Apr 7, 2026. It is now read-only.

Commit 762c350

Browse files
authored
refactor!: replace retention_age with retention_policy (#104)
1 parent ef981e7 commit 762c350

7 files changed

Lines changed: 88 additions & 54 deletions

File tree

protos

src/streamstore/_lib/s2/v1alpha/s2_pb2.py

Lines changed: 36 additions & 34 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/streamstore/_lib/s2/v1alpha/s2_pb2.pyi

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -637,7 +637,7 @@ class ReadSessionResponse(_message.Message):
637637
) -> None: ...
638638

639639
class StreamConfig(_message.Message):
640-
__slots__ = ("storage_class", "age", "timestamping", "delete_on_empty")
640+
__slots__ = ("storage_class", "age", "infinite", "timestamping", "delete_on_empty")
641641
class Timestamping(_message.Message):
642642
__slots__ = ("mode", "uncapped")
643643
MODE_FIELD_NUMBER: _ClassVar[int]
@@ -656,18 +656,25 @@ class StreamConfig(_message.Message):
656656
min_age_secs: int
657657
def __init__(self, min_age_secs: _Optional[int] = ...) -> None: ...
658658

659+
class InfiniteRetention(_message.Message):
660+
__slots__ = ()
661+
def __init__(self) -> None: ...
662+
659663
STORAGE_CLASS_FIELD_NUMBER: _ClassVar[int]
660664
AGE_FIELD_NUMBER: _ClassVar[int]
665+
INFINITE_FIELD_NUMBER: _ClassVar[int]
661666
TIMESTAMPING_FIELD_NUMBER: _ClassVar[int]
662667
DELETE_ON_EMPTY_FIELD_NUMBER: _ClassVar[int]
663668
storage_class: StorageClass
664669
age: int
670+
infinite: StreamConfig.InfiniteRetention
665671
timestamping: StreamConfig.Timestamping
666672
delete_on_empty: StreamConfig.DeleteOnEmpty
667673
def __init__(
668674
self,
669675
storage_class: _Optional[_Union[StorageClass, str]] = ...,
670676
age: _Optional[int] = ...,
677+
infinite: _Optional[_Union[StreamConfig.InfiniteRetention, _Mapping]] = ...,
671678
timestamping: _Optional[_Union[StreamConfig.Timestamping, _Mapping]] = ...,
672679
delete_on_empty: _Optional[_Union[StreamConfig.DeleteOnEmpty, _Mapping]] = ...,
673680
) -> None: ...

src/streamstore/_mappers.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from datetime import datetime
2-
from typing import cast
2+
from typing import Literal, cast
33

44
from google.protobuf.internal.containers import RepeatedCompositeFieldContainer
55

@@ -127,15 +127,18 @@ def stream_config_message(
127127
stream_config = msgs.StreamConfig()
128128
if config:
129129
storage_class = config.storage_class
130-
retention_age = config.retention_age
130+
retention_policy = config.retention_policy
131131
timestamping = config.timestamping
132132
delete_on_empty_min_age = config.delete_on_empty_min_age
133133
if storage_class is not None:
134134
paths.append(f"{mask_path_prefix}storage_class")
135135
stream_config.storage_class = storage_class.value
136-
if retention_age is not None:
136+
if retention_policy is not None:
137137
paths.append(f"{mask_path_prefix}retention_policy")
138-
stream_config.age = retention_age
138+
if retention_policy == "infinite":
139+
stream_config.infinite.CopyFrom(msgs.StreamConfig.InfiniteRetention())
140+
else:
141+
stream_config.age = retention_policy
139142
if timestamping is not None:
140143
paths.append(f"{mask_path_prefix}timestamping")
141144
if timestamping.mode is not None:
@@ -183,9 +186,19 @@ def basin_config_message(
183186

184187

185188
def stream_config_schema(config: msgs.StreamConfig) -> StreamConfig:
189+
retention_policy: int | Literal["infinite"]
190+
match config.WhichOneof("retention_policy"):
191+
case "age":
192+
retention_policy = config.age
193+
case "infinite":
194+
retention_policy = "infinite"
195+
case _:
196+
raise RuntimeError(
197+
"StreamConfig retention_policy doesn't match any of the expected values"
198+
)
186199
return StreamConfig(
187200
StorageClass(config.storage_class),
188-
config.age,
201+
retention_policy,
189202
Timestamping(
190203
mode=TimestampingMode(config.timestamping.mode),
191204
uncapped=config.timestamping.uncapped,

src/streamstore/schemas.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
from dataclasses import dataclass, field
3636
from datetime import datetime
3737
from enum import Enum
38-
from typing import Generic, TypeVar
38+
from typing import Generic, Literal, TypeVar
3939

4040
from streamstore._exceptions import fallible
4141

@@ -294,13 +294,15 @@ class StreamConfig:
294294
#:
295295
#: If not specified, the default is :attr:`.StorageClass.EXPRESS`.
296296
storage_class: StorageClass | None = None
297-
#: Age in seconds for automatic trimming of records older than this threshold.
297+
#: Retention policy for records in this stream.
298298
#:
299-
#: If not specified, the default is to retain records for 7 days.
299+
#: Retention duration in seconds to automatically trim records older than this duration.
300300
#:
301-
#: If set to ``0``, the stream will have infinite retention.
301+
#: ``'infinite'`` to retain records indefinitely.
302302
#: (While S2 is in public preview, this is capped at 28 days. Let us know if you'd like the cap removed.)
303-
retention_age: int | None = None
303+
#:
304+
#: If not specified, the default is to retain records for 7 days.
305+
retention_policy: int | Literal["infinite"] | None = None
304306
#: Timestamping behavior for appends to this stream, which influences how timestamps are handled.
305307
timestamping: Timestamping | None = None
306308
#: Minimum age in seconds before this stream can be automatically deleted if empty.

tests/test_account_ops.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ async def test_create_basin_with_config(self, s2: S2, basin_name: str):
3636
config = BasinConfig(
3737
default_stream_config=StreamConfig(
3838
storage_class=StorageClass.STANDARD,
39-
retention_age=86400 * 7,
39+
retention_policy=86400 * 7,
4040
timestamping=Timestamping(
4141
mode=TimestampingMode.CLIENT_REQUIRE,
4242
uncapped=True,
@@ -60,7 +60,7 @@ async def test_reconfigure_basin(self, s2: S2, basin: Basin):
6060
config = BasinConfig(
6161
default_stream_config=StreamConfig(
6262
storage_class=StorageClass.STANDARD,
63-
retention_age=3600,
63+
retention_policy=3600,
6464
),
6565
create_stream_on_append=True,
6666
)
@@ -73,8 +73,8 @@ async def test_reconfigure_basin(self, s2: S2, basin: Basin):
7373
== config.default_stream_config.storage_class
7474
)
7575
assert (
76-
updated_config.default_stream_config.retention_age
77-
== config.default_stream_config.retention_age
76+
updated_config.default_stream_config.retention_policy
77+
== config.default_stream_config.retention_policy
7878
)
7979
assert updated_config.create_stream_on_append == config.create_stream_on_append
8080

tests/test_basin_ops.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ async def test_create_stream_with_config(
3030

3131
config = StreamConfig(
3232
storage_class=StorageClass.STANDARD,
33-
retention_age=86400 * 3,
33+
retention_policy=86400 * 3,
3434
timestamping=Timestamping(
3535
mode=TimestampingMode.ARRIVAL,
3636
uncapped=False,
@@ -53,14 +53,13 @@ async def test_default_stream_config(self, shared_basin: Basin, stream: Stream):
5353

5454
config = await basin.get_stream_config(stream.name)
5555
assert config.storage_class == StorageClass.EXPRESS
56-
assert config.retention_age == 86400 * 7
56+
assert config.retention_policy == 86400 * 7
5757

5858
async def test_reconfigure_stream(self, shared_basin: Basin, stream: Stream):
5959
basin = shared_basin
60-
6160
config = StreamConfig(
6261
storage_class=StorageClass.STANDARD,
63-
retention_age=86400 * 21,
62+
retention_policy="infinite",
6463
timestamping=Timestamping(
6564
mode=TimestampingMode.CLIENT_REQUIRE, uncapped=True
6665
),
@@ -70,6 +69,17 @@ async def test_reconfigure_stream(self, shared_basin: Basin, stream: Stream):
7069
updated_config = await basin.reconfigure_stream(stream.name, config)
7170
assert updated_config == config
7271

72+
config = StreamConfig(
73+
storage_class=StorageClass.EXPRESS,
74+
retention_policy=86400 * 90,
75+
timestamping=Timestamping(
76+
mode=TimestampingMode.CLIENT_PREFER, uncapped=False
77+
),
78+
delete_on_empty_min_age=3600,
79+
)
80+
updated_config = await basin.reconfigure_stream(stream.name, config)
81+
assert updated_config == config
82+
7383
async def test_list_streams(self, shared_basin: Basin, stream_names: list[str]):
7484
basin = shared_basin
7585

0 commit comments

Comments
 (0)