Skip to content

Commit e80e390

Browse files
fix: replay status after paginated state (#377)
* fix: replay status after paginated state * fix: move replay status check into state --------- Co-authored-by: Frank Chen <65260095+zhongkechen@users.noreply.github.com>
1 parent 5f4b23e commit e80e390

3 files changed

Lines changed: 46 additions & 5 deletions

File tree

src/aws_durable_execution_sdk_python/execution.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -281,11 +281,7 @@ def execute(self):
281281
initial_checkpoint_token=self.invocation_input.checkpoint_token,
282282
operations={},
283283
service_client=self.service_client,
284-
# If there are operations other than the initial EXECUTION one, current state is in replay mode
285-
# todo: replay status will be wrong if initial_execution_state contains only one operation and more in next pages
286-
replay_status=ReplayStatus.REPLAY
287-
if len(self.invocation_input.initial_execution_state.operations) > 1
288-
else ReplayStatus.NEW,
284+
replay_status=ReplayStatus.NEW,
289285
)
290286

291287
try:
@@ -307,6 +303,8 @@ def execute(self):
307303
exception=e, retryable=e.is_retryable()
308304
)
309305

306+
execution_state.mark_replaying_if_prior_operations_exist()
307+
310308
raw_input_payload: str | None = execution_state.get_input_payload()
311309

312310
# Python RIC LambdaMarshaller just uses standard json deserialization for event

src/aws_durable_execution_sdk_python/state.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,18 @@ def is_replaying(self) -> bool:
379379
with self._replay_status_lock:
380380
return self._replay_status is ReplayStatus.REPLAY
381381

382+
def mark_replaying_if_prior_operations_exist(self) -> None:
383+
"""Mark execution state as replaying when non-execution operations exist."""
384+
with self._operations_lock:
385+
has_prior_operations: bool = any(
386+
op.operation_type is not OperationType.EXECUTION
387+
for op in self.operations.values()
388+
)
389+
390+
if has_prior_operations:
391+
with self._replay_status_lock:
392+
self._replay_status = ReplayStatus.REPLAY
393+
382394
def get_checkpoint_result(self, checkpoint_id: str) -> CheckpointedResult:
383395
"""Get checkpoint result.
384396

tests/execution_test.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
OperationStatus,
4444
OperationType,
4545
OperationUpdate,
46+
StateOutput,
4647
StepDetails,
4748
WaitDetails,
4849
)
@@ -2686,6 +2687,36 @@ def _make_lambda_context():
26862687
return ctx
26872688

26882689

2690+
def test_durable_execution_replays_when_paginated_state_has_prior_operations():
2691+
"""Test paginated execution state starts in replay mode when prior operations exist."""
2692+
mock_client = Mock(spec=DurableServiceClient)
2693+
step_operation = Operation(
2694+
operation_id="step1",
2695+
operation_type=OperationType.STEP,
2696+
status=OperationStatus.SUCCEEDED,
2697+
)
2698+
mock_client.get_execution_state.return_value = StateOutput(
2699+
operations=[step_operation],
2700+
next_marker=None,
2701+
)
2702+
2703+
invocation_input = _make_invocation_input(mock_client, next_marker="page2")
2704+
2705+
@durable_execution
2706+
def test_handler(event: Any, context: DurableContext) -> dict:
2707+
return {"is_replaying": context.state.is_replaying()}
2708+
2709+
result = test_handler(invocation_input, _make_lambda_context())
2710+
2711+
assert result["Status"] == InvocationStatus.SUCCEEDED.value
2712+
assert json.loads(result["Result"]) == {"is_replaying": True}
2713+
mock_client.get_execution_state.assert_called_once_with(
2714+
durable_execution_arn="arn:test:execution",
2715+
checkpoint_token="token123",
2716+
next_marker="page2",
2717+
)
2718+
2719+
26892720
def test_durable_execution_non_retryable_invocation_error_returns_failed():
26902721
"""Test that non-retryable InvocationError returns FAILED instead of retrying."""
26912722
mock_client = Mock(spec=DurableServiceClient)

0 commit comments

Comments
 (0)