Skip to content
This repository was archived by the owner on Jun 10, 2025. It is now read-only.

Commit 7b7bc16

Browse files
committed
Allow override the job cursors namespace
1 parent ed52418 commit 7b7bc16

5 files changed

Lines changed: 39 additions & 20 deletions

File tree

src/saturn_engine/stores/jobs_store.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -143,16 +143,16 @@ def sync_jobs_states(
143143
"enabled": False,
144144
}
145145
)
146+
147+
namespace = job_definition_by_name.get(job_id) or job_id
146148
for cursor, cursor_state in job_state.cursors_states.items():
147-
job_definition_name = job_definition_by_name.get(job_id)
148-
if job_definition_name:
149-
job_cursors.append(
150-
{
151-
"job_definition_name": job_definition_name,
152-
"cursor": cursor,
153-
"state": cursor_state,
154-
}
155-
)
149+
job_cursors.append(
150+
{
151+
"job_definition_name": namespace,
152+
"cursor": cursor,
153+
"state": cursor_state,
154+
}
155+
)
156156

157157
if job_values:
158158
job_values["name"] = job_id

src/saturn_engine/worker/services/job_state/service.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,13 @@ async def on_items_batched(self, batch: ItemsBatch) -> None:
9797
cursors = [
9898
c for i in batch.items if (c := i.metadata["job_state"]["state_cursor"])
9999
]
100+
namespace = (
101+
batch.job.config.get("job_state", {}).get("cursors_states_namespace")
102+
or batch.job.name
103+
)
104+
100105
cursors_states: dict = await self.fetch_cursors_states(
101-
batch.job.name, cursors=cursors
106+
namespace, cursors=cursors
102107
)
103108
for item in batch.items:
104109
metadata = item.metadata.setdefault("job_state", {})
@@ -114,12 +119,20 @@ async def on_pipeline_events_emitted(self, pevents: PipelineEventsEmitted) -> No
114119
for event in pevents.events:
115120
if not isinstance(event, CursorStateUpdated):
116121
continue
117-
job = pevents.xmsg.queue.definition.name
122+
118123
message = pevents.xmsg.message.message
119124
cursor = message.metadata.get("job_state", {}).get("state_cursor")
120125
if not cursor:
121126
continue
122-
self.set_job_cursor_state(job, cursor=cursor, cursor_state=event.state)
127+
128+
queue = pevents.xmsg.queue.definition
129+
namespace = (
130+
queue.config.get("job_state", {}).get("cursors_states_namespace")
131+
or queue.name
132+
)
133+
self.set_job_cursor_state(
134+
namespace, cursor=cursor, cursor_state=event.state
135+
)
123136

124137
def set_job_cursor(self, job_name: JobId, *, cursor: Cursor) -> None:
125138
self._store.set_job_cursor(job_name, cursor)

tests/worker/conftest.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,9 @@ async def fetch(
446446

447447

448448
@pytest.fixture
449-
async def inmemory_cursors_fetcher(services_manager: ServicesManager) -> None:
449+
async def inmemory_cursors_fetcher(
450+
services_manager: ServicesManager,
451+
) -> None:
450452
job_state_service = services_manager.services.cast_service(JobStateService)
451453
job_state_service._cursors_fetcher = InMemoryCursorsFetcher(
452454
job_state=job_state_service

tests/worker/services/job_state/test_service.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -201,17 +201,20 @@ async def test_job_state_set_message_cursor_state(
201201
job_state_service = services_manager.services.cast_service(JobStateService)
202202
fetch_cursors = mocker.spy(job_state_service, "fetch_cursors_states")
203203
job_state_service.set_job_cursor_state(
204-
fake_queue_item.name, cursor=Cursor("1"), cursor_state={"x": 1}
204+
JobId("test-job"), cursor=Cursor("1"), cursor_state={"x": 1}
205205
)
206206
job_state_service.set_job_cursor_state(
207-
fake_queue_item.name, cursor=Cursor("2"), cursor_state={"x": 2}
207+
JobId("test-job"), cursor=Cursor("2"), cursor_state={"x": 2}
208208
)
209209
fake_queue_item.config["job"] = {
210210
"buffer_flush_after": 7,
211211
"buffer_size": 2,
212212
}
213213
fake_queue_item.pipeline.info = PipelineInfo.from_pipeline(pipeline)
214-
fake_queue_item.config["job_state"] = {"cursors_states_enabled": True}
214+
fake_queue_item.config["job_state"] = {
215+
"cursors_states_enabled": True,
216+
"cursors_states_namespace": "test-job",
217+
}
215218

216219
@services_manager.services.s.hooks.work_queue_built.emit
217220
async def fake_work_builder(queue: QueueItemWithState) -> t.Any:
@@ -257,13 +260,13 @@ async def fake_work_builder(queue: QueueItemWithState) -> t.Any:
257260
]
258261

259262
assert fetch_cursors.call_args_list == [
260-
call(fake_queue_item.name, cursors=[Cursor("0")]),
261-
call(fake_queue_item.name, cursors=[Cursor("1"), Cursor("2")]),
262-
call(fake_queue_item.name, cursors=[Cursor("3"), Cursor("a")]),
263+
call("test-job", cursors=[Cursor("0")]),
264+
call("test-job", cursors=[Cursor("1"), Cursor("2")]),
265+
call("test-job", cursors=[Cursor("3"), Cursor("a")]),
263266
]
264267

265268
state = job_state_service._store._current_state
266-
assert len(state.jobs) == 1
269+
assert len(state.jobs) == 2
267270
job_state = state.jobs[fake_queue_item.name]
268271
assert job_state.cursor == "4"
269272
assert job_state.completion

tests/worker_manager/api/test_jobs.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,7 @@ def test_sync_states(
665665
{"job": r.job_definition_name, "cursor": r.cursor, "state": r.state}
666666
for r in database_state
667667
] == [
668+
{"job": "orphan-test", "cursor": "a", "state": {"x": 1}},
668669
{"job": "test", "cursor": "a", "state": {"x": 1}},
669670
{"job": "test", "cursor": "b", "state": {"x": 3}},
670671
]

0 commit comments

Comments
 (0)