Skip to content

Commit f678833

Browse files
authored
fix: checkpoint start action when retry a failed step (#381)
1 parent 0738c36 commit f678833

4 files changed

Lines changed: 79 additions & 2 deletions

File tree

src/aws_durable_execution_sdk_python/operation/step.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,8 @@ def check_result_status(self) -> CheckResult[T]:
152152
):
153153
return CheckResult.create_is_ready_to_execute(checkpointed_result)
154154

155-
# Create START checkpoint if not exists
156-
if not checkpointed_result.is_existent():
155+
# Create START checkpoint if nonexistent or READY
156+
if not checkpointed_result.is_existent() or checkpointed_result.is_ready():
157157
start_operation: OperationUpdate = OperationUpdate.create_step_start(
158158
identifier=self.operation_identifier,
159159
)

src/aws_durable_execution_sdk_python/state.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,13 @@ def is_pending(self) -> bool:
175175
return False
176176
return op.status is OperationStatus.PENDING
177177

178+
def is_ready(self) -> bool:
179+
"""Return True if the checkpointed operation is READY."""
180+
op = self.operation
181+
if not op:
182+
return False
183+
return op.status is OperationStatus.READY
184+
178185
def is_timed_out(self) -> bool:
179186
"""Return True if the checkpointed operation is TIMED_OUT."""
180187
op = self.operation

tests/operation/step_test.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -894,3 +894,58 @@ def test_step_executes_function_when_second_check_returns_started():
894894
mock_state.get_checkpoint_result.call_count == 1
895895
) # Only one check for AT_LEAST_ONCE
896896
assert mock_state.create_checkpoint.call_count == 2 # START + SUCCEED checkpoints
897+
898+
899+
def test_step_creates_start_checkpoint_when_status_is_ready():
900+
"""Test that create_checkpoint is called with START action when the step is in READY status."""
901+
mock_state = Mock(spec=ExecutionState)
902+
mock_state.durable_execution_arn = "test_arn"
903+
904+
# Simulate a step that is in READY status (e.g., returned from a previous checkpoint)
905+
ready_op = Operation(
906+
operation_id="step_ready_1",
907+
operation_type=OperationType.STEP,
908+
status=OperationStatus.READY,
909+
step_details=StepDetails(attempt=0),
910+
)
911+
ready_result = CheckpointedResult.create_from_operation(ready_op)
912+
913+
# After creating the sync START checkpoint, the refreshed result returns STARTED
914+
started_op = Operation(
915+
operation_id="step_ready_1",
916+
operation_type=OperationType.STEP,
917+
status=OperationStatus.STARTED,
918+
step_details=StepDetails(attempt=0),
919+
)
920+
started_result = CheckpointedResult.create_from_operation(started_op)
921+
mock_state.get_checkpoint_result.side_effect = [ready_result, started_result]
922+
923+
config = StepConfig(step_semantics=StepSemantics.AT_MOST_ONCE_PER_RETRY)
924+
mock_callable = Mock(return_value="ready_step_result")
925+
mock_logger = Mock(spec=Logger)
926+
mock_logger.with_log_info.return_value = mock_logger
927+
928+
result = step_handler(
929+
mock_callable,
930+
mock_state,
931+
OperationIdentifier("step_ready_1", None, "test_step"),
932+
config,
933+
mock_logger,
934+
)
935+
936+
assert result == "ready_step_result"
937+
mock_callable.assert_called_once()
938+
939+
# Verify START checkpoint was created
940+
start_call = mock_state.create_checkpoint.call_args_list[0]
941+
start_operation = start_call[1]["operation_update"]
942+
assert start_operation.operation_id == "step_ready_1"
943+
assert start_operation.operation_type is OperationType.STEP
944+
assert start_operation.sub_type is OperationSubType.STEP
945+
assert start_operation.action is OperationAction.START
946+
947+
# Verify SUCCEED checkpoint was also created after execution
948+
assert mock_state.create_checkpoint.call_count == 2
949+
success_call = mock_state.create_checkpoint.call_args_list[1]
950+
success_operation = success_call[1]["operation_update"]
951+
assert success_operation.action is OperationAction.SUCCEED

tests/state_test.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,21 @@ def test_checkpointerd_result_is_pending():
332332
assert result_no_op.is_pending() is False
333333

334334

335+
def test_checkpointerd_result_is_ready():
336+
"""Test CheckpointedResult.is_ready method."""
337+
operation = Operation(
338+
operation_id="op1",
339+
operation_type=OperationType.STEP,
340+
status=OperationStatus.READY,
341+
)
342+
result = CheckpointedResult.create_from_operation(operation)
343+
assert result.is_ready() is True
344+
345+
# Test with no operation
346+
result_no_op = CheckpointedResult.create_not_found()
347+
assert result_no_op.is_ready() is False
348+
349+
335350
def test_checkpointed_result_is_started():
336351
"""Test CheckpointedResult.is_started method."""
337352
operation = Operation(

0 commit comments

Comments
 (0)