Skip to content

Commit 9d92d4d

Browse files
[cross-repo from server#312] Conformance blocker: expand replay coverage beyond current smoke (#111)
1 parent a8fe418 commit 9d92d4d

5 files changed

Lines changed: 554 additions & 16 deletions

File tree

src/durable_workflow/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
DurableWorkflowError,
5959
InvalidArgument,
6060
NamespaceNotFound,
61+
NonDeterministicReplayError,
6162
NonRetryableError,
6263
QueryFailed,
6364
ScheduleAlreadyExists,
@@ -270,6 +271,7 @@
270271
"LocalFilesystemExternalStorage",
271272
"MetricsRecorder",
272273
"NamespaceNotFound",
274+
"NonDeterministicReplayError",
273275
"NoopMetrics",
274276
"QueryFailed",
275277
"PrometheusMetrics",

src/durable_workflow/errors.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,33 @@ def __init__(
198198
self.exception_type = exception_type
199199

200200

201+
class NonDeterministicReplayError(DurableWorkflowError):
202+
"""Current workflow code yielded a command shape that does not match history."""
203+
204+
def __init__(
205+
self,
206+
workflow_sequence: int,
207+
expected_shape: str,
208+
recorded_event_types: list[str],
209+
*,
210+
detail: str | None = None,
211+
) -> None:
212+
message = (
213+
f"workflow history at workflow sequence {workflow_sequence} recorded "
214+
f"{recorded_event_types!r}, but the current workflow yielded {expected_shape}."
215+
)
216+
if detail:
217+
message = f"{message} {detail}"
218+
message = (
219+
f"{message} Keep yielded workflow steps stable across deployments or "
220+
"run this workflow on a compatible build."
221+
)
222+
super().__init__(message)
223+
self.workflow_sequence = workflow_sequence
224+
self.expected_shape = expected_shape
225+
self.recorded_event_types = list(recorded_event_types)
226+
227+
201228
class UpdateRejected(DurableWorkflowError):
202229
"""A workflow update was rejected by the workflow's validator."""
203230

src/durable_workflow/replay_verify.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
from pathlib import Path
3535
from typing import Any, Callable
3636

37-
from .errors import ChildWorkflowFailed, WorkflowFailed
37+
from .errors import ChildWorkflowFailed, NonDeterministicReplayError, WorkflowFailed
3838
from .workflow import (
3939
CompleteWorkflow,
4040
ReplayOutcome,
@@ -172,6 +172,7 @@ class CaseReport:
172172
workflow_type: str | None
173173
family: str | None
174174
source: Mapping[str, Any] | None = None
175+
divergence: Mapping[str, Any] | None = None
175176
expected: Mapping[str, Any] | None = None
176177
observed: Mapping[str, Any] | None = None
177178
error: Mapping[str, Any] | None = None
@@ -196,6 +197,7 @@ def to_dict(self) -> dict[str, Any]:
196197
"workflow_type": self.workflow_type,
197198
"family": self.family,
198199
"source": dict(self.source) if self.source is not None else None,
200+
"divergence": dict(self.divergence) if self.divergence is not None else None,
199201
"expected": dict(self.expected) if self.expected is not None else None,
200202
"observed": dict(self.observed) if self.observed is not None else None,
201203
"error": dict(self.error) if self.error is not None else None,
@@ -550,6 +552,30 @@ def _replay_case(replayer: Replayer, case: Mapping[str, Any]) -> CaseReport:
550552
case.get("start_input") or [],
551553
workflow_type=workflow_type,
552554
)
555+
except NonDeterministicReplayError as exc:
556+
divergence = {
557+
"workflow_sequence": exc.workflow_sequence,
558+
"expected_shape": exc.expected_shape,
559+
"recorded_event_types": list(exc.recorded_event_types),
560+
"message": str(exc),
561+
}
562+
return CaseReport(
563+
id=case_id,
564+
status=STATUS_DRIFTED,
565+
reason=REASON_SHAPE_MISMATCH,
566+
workflow_type=workflow_type,
567+
family=family,
568+
source=source,
569+
divergence=divergence,
570+
expected=case.get("expected"),
571+
error={
572+
"class": type(exc).__name__,
573+
"message": str(exc),
574+
"workflow_sequence": exc.workflow_sequence,
575+
"expected_shape": exc.expected_shape,
576+
"recorded_event_types": list(exc.recorded_event_types),
577+
},
578+
)
553579
except (TypeError, ValueError, ChildWorkflowFailed, WorkflowFailed) as exc:
554580
return CaseReport(
555581
id=case_id,

0 commit comments

Comments
 (0)