Implement Standalone Nexus Operations #1461
…n defaults for id_reuse_policy and id_conflict_policy. Update integration tests with better typing and new assertions for the newly interceptable get_nexus_operation_result
…with a NexusOperationFailureError
ce1bade to
325d8bc
…ch start/execute operation. Fix hardcoded retryable=True in fallback error serialization. Add type tests and rename test/nexus/test_type_errors.py to ensure that the type test file properly executes nexus tests.
…ion of sdk-core. Reference cli prerelease in tests. Add SANO required dynamic config values to test server. Unskip additional failure details test.
…s links out. Shield task completion to core so we don't drop the rust future that _must_ complete for shutdown to succeed. Add flake finder as dev dependency.
THardy98 left a comment
Did a quick passthrough
    _metadata_decoded: bool = field(
        kw_only=True, default=False, compare=False, repr=False
    )
I'm not sure if we need a field for this. I know we did this for workflow description, but I think it's reasonable to just lazy decode on demand without mutating _static_details or _static_summary.
nbd either way, thought I'd mention it
Happy to remove it if we feel it's an over-optimization.
…ests that validate that the arg is required
    def __init__(self, *, cause: BaseException) -> None:
        """Create nexus operation failure error."""
        super().__init__("Nexus operation execution failed")
Should you add "cause" to this to make a more helpful error? And I think we usually capitalize Nexus.
This matches the existing patterns here where the message is passed to the super constructor but the cause is set via self.__cause__.
Will fix the capitalization though!
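The pattern described in this reply can be sketched as follows. This is an illustrative stand-in class, not the SDK's implementation: the fixed message goes to the base constructor, and the underlying error is attached through `__cause__` so it still appears in tracebacks.

```python
class NexusOperationFailureError(Exception):
    """Illustrative stand-in for the error type discussed above."""

    def __init__(self, *, cause: BaseException) -> None:
        # The message stays fixed; the detail lives on the chained cause.
        super().__init__("Nexus operation execution failed")
        self.__cause__ = cause


def run() -> None:
    """Hypothetical caller that wraps a lower-level failure."""
    try:
        raise ValueError("handler rejected input")
    except ValueError as err:
        raise NexusOperationFailureError(cause=err)
```

Callers that need the detail can inspect `err.__cause__`, and the default traceback prints the cause under "The above exception was the direct cause of the following exception".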
    ) -> list[temporalio.api.common.v1.Link.WorkflowEvent]:
        event_links = []
        for inbound_link in self.nexus_context.inbound_links:
            if inbound_link.type != _WORKFLOW_EVENT_LINK_TYPE:
Why are we only accepting _WORKFLOW_EVENT_LINK_TYPE?
This doesn't change the underlying behavior, but it suppresses a warning log when a link doesn't match the expected regex in nexus_link_to_workflow_event. In that function, invalid URLs return None and are filtered out here.
This was unintentionally added to this PR from a standalone nexus operation -> standalone activity prototype I was experimenting with. I've removed this change here so we can address link support when we cross that bridge.
    # Task completion should never be dropped in case of cancellation.
    # The Rust future in core must complete for shutdown to happen without
    # hanging.
    async def _complete_task(
Is this needed for standalone Nexus operations, or is this fixing a different bug?
This is a different bug, but implementing standalone nexus operations surfaced the race condition. Specifically, it occurs when an operation is started and then terminated before the completion has been sent to Core. Previously, this wasn't caught because the cancellation propagation from a caller workflow was slow enough not to trigger the race.
Happy to move it to a separate PR to limit the scope if you'd like.
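The race described above can be reproduced in miniature. This is a sketch under the assumption that `asyncio.shield` is the mechanism used; `complete_task`, `handle`, and `demo` are hypothetical names, and the sleep stands in for sending the completion to Core.

```python
import asyncio

# Keep strong references so shielded tasks are not garbage collected.
_pending: set[asyncio.Task] = set()


async def complete_task(completed: list[str], name: str) -> None:
    """Stand-in for sending a task completion to Core."""
    await asyncio.sleep(0.05)
    completed.append(name)


async def handle(completed: list[str], name: str, *, shielded: bool) -> None:
    if shielded:
        inner = asyncio.create_task(complete_task(completed, name))
        _pending.add(inner)
        inner.add_done_callback(_pending.discard)
        # Cancelling handle() cancels only the outer await, not inner.
        await asyncio.shield(inner)
    else:
        # Cancelling handle() propagates into complete_task() and drops it.
        await complete_task(completed, name)


async def demo(shielded: bool) -> list[str]:
    completed: list[str] = []
    task = asyncio.create_task(handle(completed, "op", shielded=shielded))
    await asyncio.sleep(0.01)  # let the handler start its completion
    task.cancel()              # simulate termination arriving mid-completion
    try:
        await task
    except asyncio.CancelledError:
        pass
    await asyncio.sleep(0.1)   # give a shielded completion time to finish
    return completed
```

Without the shield the completion is lost when the handler is cancelled; with it, the completion still runs to the end even though the handler itself observes the cancellation.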
Client impl and tests for SANO LGTM, though you may want to get additional approval from the Nexus folks and @tconley1428. I wouldn't merge with only my approval.
    assert desc.blocked_reason is None or isinstance(desc.blocked_reason, str)
    assert desc.last_attempt_failure is None or isinstance(
        desc.last_attempt_failure, BaseException
    )
Is there a reason why we are asserting on multiple conditions?
i.e. blocked_reason is None or str
I don't see why it would ever be blocked or why last_attempt_failure would be None
No reason, I've narrowed these assertions now.
Both blocked_reason and last_attempt_failure are expected to be None since nothing should go wrong here.
Add Standalone Nexus Operation Support
API Changes
- Client.create_nexus_client() - Create a typed NexusClient from an endpoint & nexusrpc.Service
- Client.list_nexus_operations() - List operations via visibility query
- Client.count_nexus_operations() - Count operations via visibility query
- Client.get_nexus_operation_handle() - Create a typed NexusOperationHandle to an existing operation
- NexusClient.start_operation() - Start a Nexus Operation and return a NexusOperationHandle.
- NexusClient.execute_operation() - Same as start_operation, but poll the handle for the final result.
- NexusOperationHandle.result() / .describe() / .cancel() / .terminate() / property accessors - interact with an existing operation.
Interceptor Support
The following methods have been added to OutboundInterceptor:
- start_nexus_operation()
- describe_nexus_operation()
- cancel_nexus_operation()
- terminate_nexus_operation()
- list_nexus_operations()
- count_nexus_operations()
Testing
Added integration test suite tests/nexus/test_standalone_operations.py (858 lines) covering start, get result, describe, cancel, terminate, list, count, ID conflict/reuse policies, and interceptor integration
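As a rough illustration of what interceptor integration coverage can look like, the sketch below chains a recording interceptor in front of a fake client and checks which methods were called. Every class and input type here is a hypothetical stand-in written for this example; only the method names `start_nexus_operation` and `cancel_nexus_operation` come from the description, and their real signatures are not shown in this PR text.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class OperationInput:
    """Hypothetical input type; the real interceptor inputs are not shown here."""

    operation_id: str


class OutboundInterceptor:
    """Hypothetical base class that forwards every call to the next link."""

    def __init__(self, next_interceptor: "OutboundInterceptor | None" = None) -> None:
        self.next = next_interceptor

    async def start_nexus_operation(self, input: OperationInput) -> str:
        assert self.next is not None
        return await self.next.start_nexus_operation(input)

    async def cancel_nexus_operation(self, input: OperationInput) -> None:
        assert self.next is not None
        await self.next.cancel_nexus_operation(input)


class FakeClientImpl(OutboundInterceptor):
    """End of the chain: pretends to talk to the server."""

    async def start_nexus_operation(self, input: OperationInput) -> str:
        return f"started:{input.operation_id}"

    async def cancel_nexus_operation(self, input: OperationInput) -> None:
        return None


class RecordingInterceptor(OutboundInterceptor):
    """Records each intercepted method name, then delegates."""

    def __init__(self, next_interceptor: OutboundInterceptor, calls: list[str]) -> None:
        super().__init__(next_interceptor)
        self.calls = calls

    async def start_nexus_operation(self, input: OperationInput) -> str:
        self.calls.append("start_nexus_operation")
        return await super().start_nexus_operation(input)

    async def cancel_nexus_operation(self, input: OperationInput) -> None:
        self.calls.append("cancel_nexus_operation")
        await super().cancel_nexus_operation(input)


async def exercise() -> tuple[str, list[str]]:
    calls: list[str] = []
    chain = RecordingInterceptor(FakeClientImpl(), calls)
    result = await chain.start_nexus_operation(OperationInput("op-1"))
    await chain.cancel_nexus_operation(OperationInput("op-1"))
    return result, calls
```

Asserting on the recorded call list confirms both that the interceptor saw each call and that the call still reached the end of the chain.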
Misc Changes
ruff target version to py310 to match the minimum supported Python version.