Skip to content
This repository was archived by the owner on Jun 10, 2025. It is now read-only.

Commit 97915f2

Browse files
author
Mathieu Gascon-Lefebvre
committed
Batching: Keep ids and tags in BatchingTopic and BatchingInventory.
1 parent 28bd246 commit 97915f2

4 files changed

Lines changed: 61 additions & 2 deletions

File tree

src/saturn_engine/worker/inventories/batching.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,8 @@ async def iterate(self, after: Optional[str] = None) -> AsyncIterator[Item]:
4141
return
4242

4343
after = batch[-1].id
44-
yield Item(id=after, args={"batch": [item.args for item in batch]})
44+
yield Item(
45+
id=after,
46+
args={"batch": [item.args for item in batch]},
47+
tags={"batched_ids": ", ".join([item.id for item in batch])},
48+
)

src/saturn_engine/worker/topics/batching.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import asyncio
55
import contextlib
66
import dataclasses
7+
import json
78
from collections.abc import AsyncGenerator
89
from contextlib import asynccontextmanager
910
from datetime import datetime
@@ -109,6 +110,7 @@ async def message_context(
109110
) -> AsyncIterator[TopicMessage]:
110111
context = contextlib.AsyncExitStack()
111112
message_args: list[dict] = []
113+
tags: dict[str, str] = {}
112114

113115
for message_context in batch:
114116
message: TopicMessage
@@ -117,6 +119,7 @@ async def message_context(
117119
else:
118120
message = message_context
119121
message_args.append(message.args)
122+
tags[f"{message.id}_tags"] = json.dumps(message.tags)
120123

121124
async with context:
122-
yield TopicMessage(args={"batch": message_args})
125+
yield TopicMessage(args={"batch": message_args}, tags=tags)

tests/worker/inventories/test_batching_inventory.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,21 @@ async def test_batching_inventory() -> None:
3232
},
3333
),
3434
]
35+
assert [i.tags for i in items] == [
36+
{"batched_ids": "0, 1, 2"},
37+
{"batched_ids": "3, 4, 5"},
38+
{"batched_ids": "6, 7, 8"},
39+
{"batched_ids": "9"},
40+
]
3541

3642
items = await alib.list(inventory.iterate(after="4"))
3743

3844
assert [(i.id, i.args) for i in items] == [
3945
("7", {"batch": [{"a": "5"}, {"a": "6"}, {"a": "7"}]}),
4046
("9", {"batch": [{"a": "8"}, {"a": "9"}]}),
4147
]
48+
49+
assert [i.tags for i in items] == [
50+
{"batched_ids": "5, 6, 7"},
51+
{"batched_ids": "8, 9"},
52+
]

tests/worker/topics/test_batching_topic.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,3 +149,44 @@ async def test_batching_topic_context_manager(
149149

150150
assert batch_number == 2
151151
await topic.close()
152+
153+
154+
@pytest.mark.asyncio
155+
async def test_batching_topic_tags() -> None:
156+
BATCH_SIZE = 5
157+
158+
topic = BatchingTopic(
159+
options=BatchingTopic.Options(
160+
topic=TopicItem(
161+
name="static-topic-with-tags",
162+
type="StaticTopic",
163+
options={
164+
"messages": [
165+
{"id": 1, "args": {}, "tags": {"hello": "1"}},
166+
{"id": 2, "args": {}, "tags": {"hello": "2"}},
167+
{"id": 3, "args": {}, "tags": {"hello": "3"}},
168+
{"id": 4, "args": {}, "tags": {"hello": "4"}},
169+
{"id": 5, "args": {}, "tags": {"hello": "5"}},
170+
],
171+
},
172+
),
173+
batch_size=BATCH_SIZE,
174+
),
175+
services=ServicesNamespace(strict=False),
176+
)
177+
178+
async with alib.scoped_iter(topic.run()) as scoped_topic_iter:
179+
context = await scoped_topic_iter.__anext__()
180+
assert isinstance(context, AsyncContextManager)
181+
async with context as message:
182+
...
183+
184+
await topic.close()
185+
186+
assert message.tags == {
187+
"1_tags": '{"hello": "1"}',
188+
"2_tags": '{"hello": "2"}',
189+
"3_tags": '{"hello": "3"}',
190+
"4_tags": '{"hello": "4"}',
191+
"5_tags": '{"hello": "5"}',
192+
}

0 commit comments

Comments
 (0)