refactor: move replay status init into ExecutionState#378
refactor: move replay status init into ExecutionState#378zhongkechen wants to merge 1 commit intomainfrom
Conversation
26076fa to
19e5c7b
Compare
654bb4a to
2c31c52
Compare
| if TYPE_CHECKING: | ||
| from collections.abc import Callable, Sequence | ||
|
|
||
| from aws_durable_execution_sdk_python.state import ExecutionState # noqa: TCH001 |
There was a problem hiding this comment.
No idea. Noqa above
There was a problem hiding this comment.
https://docs.astral.sh/ruff/rules/typing-only-first-party-import/
yes, above it was relevant, because outside of a type checking block.
Checks for first-party imports that are only used for type annotations, but aren't defined in a type-checking block.
| OperationUpdate, | ||
| ) | ||
| from aws_durable_execution_sdk_python.state import ExecutionState, ReplayStatus | ||
| from aws_durable_execution_sdk_python.state import ExecutionState |
There was a problem hiding this comment.
this is creating a circular reference.
please see style guide in contributing: runtime environment variables
https://github.com/aws/aws-durable-execution-sdk-python/blob/main/CONTRIBUTING.md#organization
Do not allow circular references, even if you can get away with it by using if TYPE_CHECKING.
Maybe move the # region Invocation models either to lambda_service , or maybe a new module invocation.
| # add a redundant raise to make type checker happy | ||
| raise ExecutionError(msg) | ||
|
|
||
| logger.debug("durableExecutionArn: %s", invocation_input.durable_execution_arn) |
There was a problem hiding this comment.
why move this into init? The log reads as a diagnostic of the parsed invocation input, which is here.
| {op.operation_id: op for op in all_operations} | ||
| ) | ||
|
|
||
| def get_input_event(self) -> Any: |
There was a problem hiding this comment.
why is this on ExecutionState? this is json-deserialzation and a concern of the DurableExecutionInvocationInput, this is the object that owns the payload.
invocation_input.get_input_event()
suggestion: put on DurableExecutionInvocationInput pls.
| output.checkpoint_token, | ||
| output.new_execution_state.next_marker, | ||
| ) | ||
| self.fetch_paginated_operations(output.new_execution_state) |
There was a problem hiding this comment.
this isn't using the new current_checkpoint_token? won't it end up using old checkpoint token?
| output: StateOutput = self._service_client.get_execution_state( | ||
| durable_execution_arn=self.durable_execution_arn, | ||
| checkpoint_token=checkpoint_token, | ||
| checkpoint_token=self._current_checkpoint_token, |
There was a problem hiding this comment.
isn't this going to be stale?
if fetch_paginated, as it used to, takes
initial_operations: list[Operation],
checkpoint_token: str,
then the circular reference can also go away, I think.
| assert "0" in state.operations | ||
| assert "1" in state.operations | ||
| assert len(state.operations) == 2 | ||
| # Initial operation + page 1 should be stored despite page 2 failing |
There was a problem hiding this comment.
these asserts are dead code now, inside the with?
| ) -> None: | ||
| """Add initial operations and fetch all paginated operations from the Durable Functions API. This method is thread_safe. | ||
|
|
||
| The checkpoint_token is passed explicitly as a parameter rather than using the instance variable to ensure thread safety. |
There was a problem hiding this comment.
not true anymore. but if change back as I suggest further down, then still relevant :-)
| if TYPE_CHECKING: | ||
| from collections.abc import Callable, Sequence | ||
|
|
||
| from aws_durable_execution_sdk_python.state import ExecutionState # noqa: TCH001 |
There was a problem hiding this comment.
https://docs.astral.sh/ruff/rules/typing-only-first-party-import/
yes, above it was relevant, because outside of a type checking block.
Checks for first-party imports that are only used for type annotations, but aren't defined in a type-checking block.
| self._current_checkpoint_token: str = initial_checkpoint_token | ||
| self.operations: MutableMapping[str, Operation] = operations | ||
| self.durable_execution_arn: str = invocation_input.durable_execution_arn | ||
| self._current_checkpoint_token: str = invocation_input.checkpoint_token |
There was a problem hiding this comment.
pre-existing issue, but current_checkpoint_token is not really accurate, it's actually _initial_checkpoint_token.
I'm actually thinking to drop it entirely, and make the _start_checkpointing take the current_token explicitly. Reduce the surface area for confusion arising.
| # start checkpointing | ||
| executor.submit(self.checkpoint_batches_forever) | ||
| try: | ||
| yield self |
There was a problem hiding this comment.
nice idea :-)
minor nit. the yield self is redundant, it only needs yield.
suggestion, make it take the checkpoint token:
@contextmanager
def start_checkpointing(
self,
executor: Executor,
initial_checkpoint_token: str,
) -> Iterator[None]:
executor.submit(self._checkpoint_batches_forever,
initial_checkpoint_token)
try:
yield
finally:
self.stop_checkpointing()
|
Abandoning this PR as most changes won't be required for the plugin feature. Required changes will go into the plugin PR if any. |
Issue #, if available:
Description of changes:
execute()into smaller piecesBy submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.