Skip to content

Commit 75dd80a

Browse files
authored
Integration test for Standalone Activities delayed-start (#1520)
* Integration test for Standalone Activities delayed-start * Bump server version, and de-flake delay assertion. * CI debugging * Remove debugging and warm server download in CI * Fix `test_workflow_caller_cancellation_types_when_cancel_handler_fails` flaking * Remove server download warming
1 parent 91da1ed commit 75dd80a

6 files changed

Lines changed: 108 additions & 19 deletions

File tree

.github/workflows/ci.yml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ jobs:
4646
components: "clippy"
4747
- uses: Swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 # v2
4848
with:
49-
cache-bin: false
5049
workspaces: temporalio/bridge -> target
5150
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5
5251
with:
@@ -112,7 +111,6 @@ jobs:
112111
- uses: dtolnay/rust-toolchain@29eef336d9b2848a0b548edc03f92a220660cdb8 # stable
113112
- uses: Swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 # v2
114113
with:
115-
cache-bin: false
116114
workspaces: temporalio/bridge -> target
117115
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5
118116
with:
@@ -149,7 +147,6 @@ jobs:
149147
components: "clippy"
150148
- uses: Swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 # v2
151149
with:
152-
cache-bin: false
153150
workspaces: temporalio/bridge -> target
154151
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5
155152
with:
@@ -188,7 +185,6 @@ jobs:
188185
- uses: dtolnay/rust-toolchain@29eef336d9b2848a0b548edc03f92a220660cdb8 # stable
189186
- uses: Swatinem/rust-cache@e18b497796c12c097a38f9edb9d0641fb99eee32 # v2
190187
with:
191-
cache-bin: false
192188
workspaces: temporalio/bridge -> target
193189
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5
194190
with:

tests/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
DEV_SERVER_DOWNLOAD_VERSION = "v1.7.0"
1+
DEV_SERVER_DOWNLOAD_VERSION = "v1.7.1-standalone-nexus-operations"

tests/conftest.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ async def env(env_type: str) -> AsyncGenerator[WorkflowEnvironment, None]:
123123
"--dynamic-config-value",
124124
"activity.enableStandalone=true",
125125
"--dynamic-config-value",
126+
"activity.startDelayEnabled=true",
127+
"--dynamic-config-value",
126128
"history.enableChasm=true",
127129
"--dynamic-config-value",
128130
"history.enableTransitionHistory=true",

tests/nexus/test_workflow_caller_cancellation_types.py

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from temporalio.common import WorkflowIDConflictPolicy
2121
from temporalio.testing import WorkflowEnvironment
2222
from temporalio.worker import Worker
23+
from tests.helpers import assert_eventually
2324
from tests.helpers.nexus import make_nexus_endpoint_name
2425

2526

@@ -40,25 +41,41 @@ class TestContext:
4041
class HandlerWorkflow:
4142
def __init__(self):
4243
self.caller_op_future_resolved = asyncio.Event()
44+
self.cancel_requested = False
45+
self.release_cancellation = asyncio.Event()
4346

4447
@workflow.run
4548
async def run(self) -> None:
4649
try:
4750
await asyncio.Future()
4851
except asyncio.CancelledError:
52+
self.cancel_requested = True
4953
if test_context.cancellation_type in [
5054
workflow.NexusOperationCancellationType.TRY_CANCEL,
5155
workflow.NexusOperationCancellationType.WAIT_REQUESTED,
5256
]:
5357
# We want to prove that the caller op future can be resolved before the operation
5458
# (i.e. its backing workflow) is cancelled.
5559
await self.caller_op_future_resolved.wait()
60+
elif (
61+
test_context.cancellation_type
62+
== workflow.NexusOperationCancellationType.WAIT_COMPLETED
63+
):
64+
await self.release_cancellation.wait()
5665
raise
5766

5867
@workflow.signal
5968
def set_caller_op_future_resolved(self) -> None:
6069
self.caller_op_future_resolved.set()
6170

71+
@workflow.signal
72+
def set_release_cancellation(self) -> None:
73+
self.release_cancellation.set()
74+
75+
@workflow.query
76+
def has_cancel_requested(self) -> bool:
77+
return self.cancel_requested
78+
6279

6380
@nexusrpc.service
6481
class Service:
@@ -151,6 +168,10 @@ async def get_operation_token(self) -> str:
151168
async def wait_caller_op_future_resolved(self) -> None:
152169
await self.caller_op_future_resolved
153170

171+
@workflow.query
172+
def has_caller_op_future_resolved(self) -> bool:
173+
return self.caller_op_future_resolved.done()
174+
154175
@workflow.run
155176
async def run(self, input: Input) -> CancellationResult:
156177
op_handle = await (
@@ -408,6 +429,17 @@ async def check_behavior_for_wait_cancellation_completed(
408429
Check that a cancellation request is sent and the caller workflow nexus operation future is
409430
unblocked after the operation is canceled.
410431
"""
432+
433+
async def assert_handler_cancel_requested() -> None:
434+
assert await handler_wf.query(HandlerWorkflow.has_cancel_requested)
435+
436+
await assert_eventually(assert_handler_cancel_requested)
437+
438+
handler_status = (await handler_wf.describe()).status
439+
assert handler_status == WorkflowExecutionStatus.RUNNING
440+
assert not await caller_wf.query(CallerWorkflow.has_caller_op_future_resolved)
441+
442+
await handler_wf.signal(HandlerWorkflow.set_release_cancellation)
411443
try:
412444
await handler_wf.result()
413445
except WorkflowFailureError as err:
@@ -418,8 +450,13 @@ async def check_behavior_for_wait_cancellation_completed(
418450
handler_status = (await handler_wf.describe()).status
419451
assert handler_status == WorkflowExecutionStatus.CANCELED
420452

453+
async def assert_caller_op_future_resolved() -> None:
454+
assert await caller_wf.query(CallerWorkflow.has_caller_op_future_resolved)
455+
456+
await assert_eventually(assert_caller_op_future_resolved)
457+
421458
await caller_wf.signal(CallerWorkflow.release)
422-
result = await caller_wf.result()
459+
await caller_wf.result()
423460

424461
await assert_event_subsequence(
425462
caller_wf,
@@ -430,14 +467,6 @@ async def check_behavior_for_wait_cancellation_completed(
430467
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED,
431468
],
432469
)
433-
handler_wf_canceled_event = await get_event_time(
434-
handler_wf,
435-
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED,
436-
)
437-
assert handler_wf_canceled_event <= result.caller_op_future_resolved, (
438-
"expected caller op future resolved after handler workflow canceled, but got "
439-
f"{result.caller_op_future_resolved} before {handler_wf_canceled_event}"
440-
)
441470

442471

443472
async def has_event(wf_handle: WorkflowHandle, event_type: EventType.ValueType):

tests/nexus/test_workflow_caller_cancellation_types_when_cancel_handler_fails.py

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from temporalio.common import WorkflowIDConflictPolicy
2424
from temporalio.testing import WorkflowEnvironment
2525
from temporalio.worker import Worker
26+
from tests.helpers import assert_eventually
2627
from tests.helpers.nexus import make_nexus_endpoint_name
2728
from tests.nexus.test_workflow_caller_cancellation_types import (
2829
assert_event_subsequence,
@@ -48,6 +49,7 @@ class HandlerWorkflow:
4849
def __init__(self):
4950
self.cancel_handler_released = asyncio.Event()
5051
self.caller_op_future_resolved = asyncio.Event()
52+
self.release_completion = asyncio.Event()
5153

5254
@workflow.run
5355
async def run(self) -> None:
@@ -61,6 +63,11 @@ async def run(self) -> None:
6163
# For WAIT_REQUESTED, we want to prove that the future can be unblocked before the
6264
# handler workflow completes.
6365
await self.caller_op_future_resolved.wait()
66+
elif (
67+
test_context.cancellation_type
68+
== workflow.NexusOperationCancellationType.WAIT_COMPLETED
69+
):
70+
await self.release_completion.wait()
6471

6572
@workflow.signal
6673
def set_cancel_handler_released(self) -> None:
@@ -70,6 +77,14 @@ def set_cancel_handler_released(self) -> None:
7077
def set_caller_op_future_resolved(self) -> None:
7178
self.caller_op_future_resolved.set()
7279

80+
@workflow.signal
81+
def set_release_completion(self) -> None:
82+
self.release_completion.set()
83+
84+
@workflow.query
85+
def has_cancel_handler_released(self) -> bool:
86+
return self.cancel_handler_released.is_set()
87+
7388

7489
@nexusrpc.service
7590
class Service:
@@ -153,6 +168,10 @@ async def get_operation_token(self) -> str:
153168
assert self.operation_token
154169
return self.operation_token
155170

171+
@workflow.query
172+
def has_caller_op_future_resolved(self) -> bool:
173+
return self.caller_op_future_resolved.done()
174+
156175
@workflow.run
157176
async def run(self, input: Input) -> CancellationResult:
158177
op_handle = await (
@@ -370,7 +389,23 @@ async def check_behavior_for_wait_cancellation_completed(
370389
caller_wf: WorkflowHandle[Any, CancellationResult],
371390
handler_wf: WorkflowHandle,
372391
) -> None:
392+
async def assert_cancel_handler_released() -> None:
393+
assert await handler_wf.query(HandlerWorkflow.has_cancel_handler_released)
394+
395+
await assert_eventually(assert_cancel_handler_released)
396+
397+
handler_status = (await handler_wf.describe()).status
398+
assert handler_status == WorkflowExecutionStatus.RUNNING
399+
assert not await caller_wf.query(CallerWorkflow.has_caller_op_future_resolved)
400+
401+
await handler_wf.signal(HandlerWorkflow.set_release_completion)
373402
await handler_wf.result()
403+
404+
async def assert_caller_op_future_resolved() -> None:
405+
assert await caller_wf.query(CallerWorkflow.has_caller_op_future_resolved)
406+
407+
await assert_eventually(assert_caller_op_future_resolved)
408+
374409
await caller_wf.signal(CallerWorkflow.release)
375410
result = await caller_wf.result()
376411
assert not result.error_type
@@ -386,8 +421,3 @@ async def check_behavior_for_wait_cancellation_completed(
386421
EventType.EVENT_TYPE_NEXUS_OPERATION_COMPLETED,
387422
],
388423
)
389-
handler_wf_completed = await get_event_time(
390-
handler_wf,
391-
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED,
392-
)
393-
assert handler_wf_completed <= result.caller_op_future_resolved

tests/test_activity.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,38 @@ async def test_get_result(client: Client, env: WorkflowEnvironment):
461461
assert await result_via_execute_activity == 2
462462

463463

464+
async def test_start_activity_start_delay(client: Client, env: WorkflowEnvironment):
465+
if env.supports_time_skipping:
466+
pytest.skip(
467+
"Java test server: https://github.com/temporalio/sdk-java/issues/2741"
468+
)
469+
470+
activity_id = str(uuid.uuid4())
471+
task_queue = str(uuid.uuid4())
472+
start_delay = timedelta(seconds=2)
473+
474+
async with Worker(
475+
client,
476+
task_queue=task_queue,
477+
activities=[increment],
478+
):
479+
activity_handle = await client.start_activity(
480+
increment,
481+
args=(1,),
482+
id=activity_id,
483+
task_queue=task_queue,
484+
start_to_close_timeout=timedelta(seconds=5),
485+
start_delay=start_delay,
486+
)
487+
488+
assert await activity_handle.result() == 2
489+
desc = await activity_handle.describe()
490+
assert desc.last_started_time is not None
491+
assert (
492+
desc.last_started_time - desc.scheduled_time
493+
).total_seconds() >= start_delay.total_seconds() - 0.5
494+
495+
464496
async def test_get_activity_handle(client: Client, env: WorkflowEnvironment):
465497
if env.supports_time_skipping:
466498
pytest.skip(

0 commit comments

Comments
 (0)