Skip to content

refactor: move replay status init into ExecutionState#378

Closed
zhongkechen wants to merge 1 commit intomainfrom
replay_status
Closed

refactor: move replay status init into ExecutionState#378
zhongkechen wants to merge 1 commit intomainfrom
replay_status

Conversation

@zhongkechen
Copy link
Copy Markdown
Contributor

@zhongkechen zhongkechen commented May 5, 2026

Issue #, if available:

Description of changes:

  • Move replay status initialization into ExecutionState
  • Move input parsing into ExecutionState
  • Split giant execute() into smaller pieces

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@zhongkechen zhongkechen force-pushed the replay_status branch 2 times, most recently from 26076fa to 19e5c7b Compare May 5, 2026 18:31
@zhongkechen zhongkechen changed the title refactor: move execution state initialization into ExecutionState refactor: move replay status init into ExecutionState May 5, 2026
@zhongkechen zhongkechen force-pushed the replay_status branch 6 times, most recently from 654bb4a to 2c31c52 Compare May 5, 2026 19:36
@zhongkechen zhongkechen marked this pull request as ready for review May 5, 2026 20:52
@zhongkechen zhongkechen self-assigned this May 5, 2026
@zhongkechen zhongkechen requested a review from a team May 5, 2026 20:52
if TYPE_CHECKING:
from collections.abc import Callable, Sequence

from aws_durable_execution_sdk_python.state import ExecutionState # noqa: TCH001
Copy link
Copy Markdown
Contributor

@yaythomas yaythomas May 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why's the noqa here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No idea. Noqa above

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://docs.astral.sh/ruff/rules/typing-only-first-party-import/

yes, above it was relevant, because outside of a type checking block.

Checks for first-party imports that are only used for type annotations, but aren't defined in a type-checking block.

OperationUpdate,
)
from aws_durable_execution_sdk_python.state import ExecutionState, ReplayStatus
from aws_durable_execution_sdk_python.state import ExecutionState
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is creating a circular reference.

please see style guide in contributing: runtime environment variables

https://github.com/aws/aws-durable-execution-sdk-python/blob/main/CONTRIBUTING.md#organization

Do not allow circular references, even if you can get away with it by using if TYPE_CHECKING.

Maybe move the # region Invocation models either to lambda_service , or maybe a new module invocation.

# add a redundant raise to make type checker happy
raise ExecutionError(msg)

logger.debug("durableExecutionArn: %s", invocation_input.durable_execution_arn)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why move this into init? The log reads as a diagnostic of the parsed invocation input, which is here.

{op.operation_id: op for op in all_operations}
)

def get_input_event(self) -> Any:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this on ExecutionState? this is json-deserialzation and a concern of the DurableExecutionInvocationInput, this is the object that owns the payload.

   invocation_input.get_input_event()

suggestion: put on DurableExecutionInvocationInput pls.

output.checkpoint_token,
output.new_execution_state.next_marker,
)
self.fetch_paginated_operations(output.new_execution_state)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isn't using the new current_checkpoint_token? won't it end up using old checkpoint token?

output: StateOutput = self._service_client.get_execution_state(
durable_execution_arn=self.durable_execution_arn,
checkpoint_token=checkpoint_token,
checkpoint_token=self._current_checkpoint_token,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this going to be stale?

if fetch_paginated, as it used to, takes

  initial_operations: list[Operation],
  checkpoint_token: str,

then the circular reference can also go away, I think.

Comment thread src/aws_durable_execution_sdk_python/state.py
Comment thread src/aws_durable_execution_sdk_python/state.py
Comment thread tests/state_test.py
assert "0" in state.operations
assert "1" in state.operations
assert len(state.operations) == 2
# Initial operation + page 1 should be stored despite page 2 failing
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these asserts are dead code now, inside the with?

) -> None:
"""Add initial operations and fetch all paginated operations from the Durable Functions API. This method is thread_safe.

The checkpoint_token is passed explicitly as a parameter rather than using the instance variable to ensure thread safety.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not true anymore. but if change back as I suggest further down, then still relevant :-)

if TYPE_CHECKING:
from collections.abc import Callable, Sequence

from aws_durable_execution_sdk_python.state import ExecutionState # noqa: TCH001
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://docs.astral.sh/ruff/rules/typing-only-first-party-import/

yes, above it was relevant, because outside of a type checking block.

Checks for first-party imports that are only used for type annotations, but aren't defined in a type-checking block.

self._current_checkpoint_token: str = initial_checkpoint_token
self.operations: MutableMapping[str, Operation] = operations
self.durable_execution_arn: str = invocation_input.durable_execution_arn
self._current_checkpoint_token: str = invocation_input.checkpoint_token
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pre-existing issue, but current_checkpoint_token is not really accurate, it's actually _initial_checkpoint_token.

I'm actually thinking to drop it entirely, and make the _start_checkpointing take the current_token explicitly. Reduce the surface area for confusion arising.

# start checkpointing
executor.submit(self.checkpoint_batches_forever)
try:
yield self
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice idea :-)

minor nit. the yield self is redundant, it only needs yield.

suggestion, make it take the checkpoint token:

   @contextmanager
   def start_checkpointing(
       self,
       executor: Executor,
       initial_checkpoint_token: str,
   ) -> Iterator[None]:
       executor.submit(self._checkpoint_batches_forever,
   initial_checkpoint_token)
       try:
           yield
       finally:
           self.stop_checkpointing()

@zhongkechen
Copy link
Copy Markdown
Contributor Author

Abandoning this PR as most changes won't be required for the plugin feature. Required changes will go into the plugin PR if any.

@zhongkechen zhongkechen closed this May 6, 2026
@zhongkechen zhongkechen deleted the replay_status branch May 6, 2026 01:32
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants