Skip to content

Commit dff8536

Browse files
committed
Schema back to 1.0, drop PENDING state
1 parent 5a5f1a3 commit dff8536

9 files changed

Lines changed: 121 additions & 174 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,7 @@ ADDED
2727
history, and re-fetches entity state at the top of every page loop so
2828
external delete or mark-failed signals stop the orchestrator cleanly.
2929
Job state lives in a durable entity with an explicit state-transition
30-
matrix (PENDING / ACTIVE / COMPLETED / FAILED); invalid transitions raise
30+
matrix (ACTIVE / COMPLETED / FAILED); invalid transitions raise
3131
`ExportJobInvalidTransitionError`. Persisted entity state uses a
3232
versioned, schema-stable JSON shape (`STATE_SCHEMA_VERSION`) with no
3333
embedded Python type metadata. Each export job's driving orchestrator

durabletask/extensions/history_export/client.py

Lines changed: 9 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -136,11 +136,11 @@ def create_job(
136136
) -> ExportJobDescription:
137137
"""Create a new export job and start its driving orchestrator.
138138
139-
The entity is created in :attr:`ExportJobStatus.PENDING` and
140-
immediately signalled with ``run``, which schedules the
139+
The entity processes ``create`` by validating the transition,
140+
persisting :attr:`ExportJobStatus.ACTIVE`, and scheduling the
141141
driving orchestrator from inside the entity using a
142142
deterministic instance ID (``export-job-{job_id}``). This
143-
matches the .NET ``ExportJob.Run`` pattern: callers can
143+
matches the .NET ``ExportJob.Create`` pattern: callers can
144144
correlate a job with its orchestrator by ID alone and may
145145
safely re-create a previously-terminated job.
146146
"""
@@ -150,10 +150,10 @@ def create_job(
150150
created_at = datetime.now(timezone.utc)
151151
config_dict = config.to_dict()
152152

153-
# Signal create first; the entity will validate the transition
154-
# and persist PENDING. Then signal run; the entity will
155-
# schedule the orchestrator and transition to ACTIVE. Both
156-
# signals are processed in FIFO order by the entity dispatcher.
153+
# A single ``create`` signal is enough: the entity validates the
154+
# transition, schedules the orchestrator inline, and persists
155+
# ACTIVE (or FAILED with ``last_error`` if scheduling raises).
156+
# Mirrors the .NET ``ExportJob.Create`` flow.
157157
self._client.signal_entity(
158158
entity_id,
159159
ExportJobEntity.OP_CREATE,
@@ -162,14 +162,13 @@ def create_job(
162162
"created_at": created_at.isoformat(),
163163
},
164164
)
165-
self._client.signal_entity(entity_id, ExportJobEntity.OP_RUN)
166165
logger.info(
167166
"Submitted export job %r; orchestrator instance ID will be %s",
168167
resolved_job_id, orchestrator_instance_id_for(resolved_job_id),
169168
)
170169
return ExportJobDescription(
171170
job_id=resolved_job_id,
172-
status=ExportJobStatus.PENDING,
171+
status=ExportJobStatus.ACTIVE,
173172
created_at=created_at,
174173
last_modified_at=created_at,
175174
config=config,
@@ -285,7 +284,7 @@ def wait_for_job(
285284
"""Poll until the job reaches a terminal status or *timeout* elapses.
286285
287286
Raises:
288-
TimeoutError: If the job is still pending/active after
287+
TimeoutError: If the job is still active after
289288
*timeout* seconds.
290289
ExportJobNotFoundError: If the job cannot be found at all.
291290
"""

durabletask/extensions/history_export/entity.py

Lines changed: 43 additions & 63 deletions
Original file line number | Diff line number | Diff line change
@@ -14,17 +14,16 @@
1414
Operations
1515
----------
1616
``create``
17-
Initialise a fresh export job, or reset a terminal job back to
18-
:attr:`ExportJobStatus.PENDING`. Refuses to overwrite an active
19-
job (raises :class:`ExportJobInvalidTransitionError`).
17+
Initialise a fresh export job, or revive a terminal job, by
18+
persisting :attr:`ExportJobStatus.ACTIVE` and scheduling the
19+
driving orchestrator inline (with a deterministic instance ID
20+
derived from the job ID). Refuses to overwrite an active job
21+
(raises :class:`ExportJobInvalidTransitionError`). Mirrors the
22+
.NET ``ExportJob.Create`` flow, so a single signal is enough to
23+
launch a job.
2024
``get``
2125
Returns the persisted state dict, or ``None`` if the entity has
2226
not been created (or has been deleted).
23-
``run``
24-
Schedules the driving orchestrator (with a deterministic instance
25-
ID derived from the job ID) and transitions the job to
26-
:attr:`ExportJobStatus.ACTIVE`. Idempotent so the client may
27-
safely signal it more than once.
2827
``commit_checkpoint``
2928
Applies an incremental update after a single export page. When
3029
``mark_failed_on_batch`` is true *and* ``failures`` is non-empty,
@@ -101,7 +100,6 @@ class ExportJobEntity(entities.DurableEntity):
101100

102101
OP_CREATE = "create"
103102
OP_GET = "get"
104-
OP_RUN = "run"
105103
OP_COMMIT_CHECKPOINT = "commit_checkpoint"
106104
OP_MARK_COMPLETED = "mark_completed"
107105
OP_MARK_FAILED = "mark_failed"
@@ -138,7 +136,7 @@ def create(self, payload: Mapping[str, Any]) -> dict[str, Any]:
138136
job_id = self._job_id()
139137
current = self._current_status()
140138
assert_valid_transition(
141-
self.OP_CREATE, current, ExportJobStatus.PENDING, job_id=job_id,
139+
self.OP_CREATE, current, ExportJobStatus.ACTIVE, job_id=job_id,
142140
)
143141

144142
config_dict = payload.get("config")
@@ -165,69 +163,51 @@ def create(self, payload: Mapping[str, Any]) -> dict[str, Any]:
165163
# Matches the .NET ``ExportJob.Create`` revive semantics so a
166164
# re-created job starts from a clean slate.
167165
state = ExportJobState(
168-
status=ExportJobStatus.PENDING,
166+
status=ExportJobStatus.ACTIVE,
169167
config=config,
170168
created_at=created_at,
171169
last_modified_at=created_at,
172170
)
173-
logger.info(
174-
"Created export job %r in status %s", job_id, state.status.value,
175-
extra={"job_id": job_id, "operation": "create"},
176-
)
171+
172+
# The entity itself schedules the driving orchestrator inline,
173+
# so a single ``create`` signal is enough to launch a job.
174+
# Mirrors the .NET ``ExportJob.Create`` -> ``StartExportOrchestration``
175+
# flow and avoids the client having to send a second ``run``
176+
# signal (and the failure modes that come with it).
177+
instance_id = orchestrator_instance_id_for(job_id)
178+
try:
179+
self.entity_context.schedule_new_orchestration(
180+
ORCHESTRATOR_NAME,
181+
input={"job_id": job_id, "config": state.config.to_dict()},
182+
instance_id=instance_id,
183+
)
184+
state.orchestrator_instance_id = instance_id
185+
logger.info(
186+
"Created export job %r and scheduled orchestrator %s with "
187+
"instance ID %s",
188+
job_id, ORCHESTRATOR_NAME, instance_id,
189+
extra={"job_id": job_id, "operation": "create"},
190+
)
191+
except Exception as ex: # noqa: BLE001
192+
# Mirror the .NET pattern: record the failure on persisted
193+
# state and return, rather than re-raising. Re-raising
194+
# inside an entity operation can cause some entity
195+
# backends to discard the in-flight state mutations,
196+
# leaving the job with no error recorded.
197+
state.status = ExportJobStatus.FAILED
198+
state.last_error = (
199+
f"Failed to schedule orchestrator: {type(ex).__name__}: {ex}"
200+
)
201+
logger.exception(
202+
"Failed to schedule orchestrator for export job %r", job_id,
203+
extra={"job_id": job_id, "operation": "create"},
204+
)
177205
return self._save(state)
178206

179207
def get(self, _: Any = None) -> dict[str, Any] | None:
180208
state = self._load()
181209
return state.to_dict() if state is not None else None
182210

183-
def run(self, _: Any = None) -> dict[str, Any] | None:
184-
state = self._load()
185-
if state is None:
186-
raise ValueError("Cannot run uninitialized export job")
187-
job_id = self._job_id()
188-
assert_valid_transition(
189-
self.OP_RUN, state.status, ExportJobStatus.ACTIVE, job_id=job_id,
190-
)
191-
192-
# The entity itself schedules the driving orchestrator. The
193-
# client is therefore decoupled from the orchestrator's name
194-
# and input shape.
195-
if state.status is ExportJobStatus.PENDING:
196-
instance_id = orchestrator_instance_id_for(job_id)
197-
try:
198-
self.entity_context.schedule_new_orchestration(
199-
ORCHESTRATOR_NAME,
200-
input={"job_id": job_id, "config": state.config.to_dict()},
201-
instance_id=instance_id,
202-
)
203-
state.orchestrator_instance_id = instance_id
204-
logger.info(
205-
"Scheduled orchestrator %s for job %r with instance ID %s",
206-
ORCHESTRATOR_NAME, job_id, instance_id,
207-
extra={"job_id": job_id, "operation": "run"},
208-
)
209-
except Exception as ex: # noqa: BLE001
210-
# Mirror the .NET ExportJob.StartExportOrchestration pattern:
211-
# record the failure on persisted state and return, rather
212-
# than re-raising. Re-raising inside an entity operation
213-
# can cause some entity backends to discard the in-flight
214-
# state mutations, leaving the job stuck in PENDING with no
215-
# error recorded. Returning ensures FAILED + last_error
216-
# actually persist.
217-
state.status = ExportJobStatus.FAILED
218-
state.last_error = (
219-
f"Failed to schedule orchestrator: {type(ex).__name__}: {ex}"
220-
)
221-
logger.exception(
222-
"Failed to schedule orchestrator for export job %r", job_id,
223-
extra={"job_id": job_id, "operation": "run"},
224-
)
225-
return self._save(state)
226-
227-
state.status = ExportJobStatus.ACTIVE
228-
state.last_error = None
229-
return self._save(state)
230-
231211
def commit_checkpoint(self, payload: Mapping[str, Any]) -> dict[str, Any] | None:
232212
state = self._load()
233213
if state is None:

durabletask/extensions/history_export/models.py

Lines changed: 9 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -63,24 +63,20 @@ class ExportJobStatus(Enum):
6363
6464
Status meanings
6565
---------------
66-
``PENDING``
67-
The job has been created and persisted but the entity has not
68-
yet kicked off its driving orchestrator. Jobs sit in this
69-
state briefly between the ``create`` and ``run`` signals
70-
(the public client sends both in immediate succession), or
71-
for longer if ``run`` is never invoked or if a caller revives
72-
a previously terminal job via ``create``.
7366
``ACTIVE``
74-
The job is running and the driving orchestrator is making
75-
progress through pages of terminal instances.
67+
The job has been created and the driving orchestrator is
68+
making progress through pages of terminal instances. This is
69+
the initial status after :meth:`ExportHistoryClient.create_job`
70+
because the entity schedules the orchestrator inline as part
71+
of its ``create`` operation (mirroring the .NET
72+
``ExportJob.Create`` flow).
7673
``COMPLETED``
7774
The orchestrator finished a batch successfully.
7875
``FAILED``
7976
The orchestrator threw, or a page of exports exhausted its
8077
retries.
8178
"""
8279

83-
PENDING = "Pending"
8480
ACTIVE = "Active"
8581
COMPLETED = "Completed"
8682
FAILED = "Failed"
@@ -434,22 +430,11 @@ def to_configuration(self) -> ExportJobConfiguration:
434430
# replaced with a registry keyed by ``(entity_name, schema_version)`` without
435431
# changing the on-disk shape.
436432

437-
STATE_SCHEMA_VERSION = "1.1"
433+
STATE_SCHEMA_VERSION = "1.0"
438434
"""The schema version emitted by :meth:`ExportJobState.to_dict`.
439435
440436
Increment this when the persisted shape changes in a non-backward-compatible
441437
way and add a new branch in :meth:`ExportJobState.from_dict`.
442-
443-
Version history:
444-
445-
``"1.0"``
446-
Initial shape. ``runtime_status`` filter values were persisted as
447-
enum *names* (e.g. ``"COMPLETED"``), which broke if the core SDK
448-
renamed an enum constant. Read support retained.
449-
``"1.1"``
450-
``runtime_status`` filter values are persisted as the protobuf
451-
enum *integer* (e.g. ``2`` for ``COMPLETED``). Reads still accept
452-
the legacy 1.0 string form for backward compatibility.
453438
"""
454439

455440

@@ -502,10 +487,10 @@ def to_dict(self) -> dict[str, Any]:
502487
@classmethod
503488
def from_dict(cls, data: Mapping[str, Any]) -> "ExportJobState":
504489
version = data.get("schema_version", "1.0")
505-
if version not in {"1.0", "1.1"}:
490+
if version != STATE_SCHEMA_VERSION:
506491
raise ValueError(
507492
f"Unsupported export job state schema_version={version!r}; "
508-
f"expected one of: '1.0', '1.1' (current: {STATE_SCHEMA_VERSION!r})"
493+
f"expected {STATE_SCHEMA_VERSION!r}"
509494
)
510495

511496
config_data = data.get("config")

durabletask/extensions/history_export/transitions.py

Lines changed: 6 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -28,14 +28,12 @@
2828
# Maps (operation_name, current_status_or_None) -> {valid target statuses}.
2929
TRANSITIONS: Mapping[tuple[str, ExportJobStatus | None], frozenset[ExportJobStatus]] = {
3030
# ``create`` initialises a fresh job and revives terminal jobs.
31-
("create", None): frozenset({ExportJobStatus.PENDING}),
32-
("create", ExportJobStatus.FAILED): frozenset({ExportJobStatus.PENDING}),
33-
("create", ExportJobStatus.COMPLETED): frozenset({ExportJobStatus.PENDING}),
34-
35-
# ``run`` flips the job from PENDING to ACTIVE. Idempotent so the
36-
# client may signal it more than once without crashing the entity.
37-
("run", ExportJobStatus.PENDING): frozenset({ExportJobStatus.ACTIVE}),
38-
("run", ExportJobStatus.ACTIVE): frozenset({ExportJobStatus.ACTIVE}),
31+
# The entity schedules the driving orchestrator inline, so the job
32+
# goes straight to ACTIVE without a separate ``run`` signal.
33+
# Matches the .NET ``ExportJob.Create`` flow.
34+
("create", None): frozenset({ExportJobStatus.ACTIVE}),
35+
("create", ExportJobStatus.FAILED): frozenset({ExportJobStatus.ACTIVE}),
36+
("create", ExportJobStatus.COMPLETED): frozenset({ExportJobStatus.ACTIVE}),
3937

4038
# ``commit_checkpoint`` is a no-op transition during normal runs.
4139
# When the orchestrator signals ``mark_failed_on_batch`` the entity
@@ -47,9 +45,6 @@
4745

4846
("mark_completed", ExportJobStatus.ACTIVE): frozenset({ExportJobStatus.COMPLETED}),
4947

50-
# ``mark_failed`` from PENDING covers the rare case of a failure
51-
# happening between create and run.
52-
("mark_failed", ExportJobStatus.PENDING): frozenset({ExportJobStatus.FAILED}),
5348
("mark_failed", ExportJobStatus.ACTIVE): frozenset({ExportJobStatus.FAILED}),
5449
}
5550

tests/durabletask/extensions/history_export/test_client.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -127,7 +127,7 @@ def test_create_get_and_wait_for_job_end_to_end(
127127
)
128128

129129
assert desc.job_id
130-
assert desc.status == ExportJobStatus.PENDING
130+
assert desc.status == ExportJobStatus.ACTIVE
131131
assert desc.config is not None
132132
assert desc.orchestrator_instance_id == f"export-job-{desc.job_id}"
133133

0 commit comments

Comments
 (0)