Skip to content

Commit b0b934f

Browse files
authored
Expose continue-as-new backoff start interval (#1613)
* Expose continue-as-new backoff start interval * Add changelog note
1 parent c37fa7e commit b0b934f

10 files changed

Lines changed: 77 additions & 35 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ to include examples, links to docs, or any other relevant information.
2020

2121
### Added
2222

23+
- Exposed `backoff_start_interval` for continue-as-new, to allow the new workflow to start after a delay.
24+
2325
### Changed
2426

2527
- AWS Lambda worker `configure` parameter supports sync, async, and async

temporalio/bridge/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

temporalio/bridge/proto/workflow_commands/workflow_commands_pb2.py

Lines changed: 34 additions & 34 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

temporalio/bridge/proto/workflow_commands/workflow_commands_pb2.pyi

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -942,6 +942,7 @@ class ContinueAsNewWorkflowExecution(google.protobuf.message.Message):
942942
RETRY_POLICY_FIELD_NUMBER: builtins.int
943943
VERSIONING_INTENT_FIELD_NUMBER: builtins.int
944944
INITIAL_VERSIONING_BEHAVIOR_FIELD_NUMBER: builtins.int
945+
BACKOFF_START_INTERVAL_FIELD_NUMBER: builtins.int
945946
workflow_type: builtins.str
946947
"""The identifier the lang-specific sdk uses to execute workflow code"""
947948
task_queue: builtins.str
@@ -1000,6 +1001,9 @@ class ContinueAsNewWorkflowExecution(google.protobuf.message.Message):
10001001
For example, choose to AutoUpgrade on continue-as-new instead of inheriting the pinned version
10011002
of the previous run.
10021003
"""
1004+
@property
1005+
def backoff_start_interval(self) -> google.protobuf.duration_pb2.Duration:
1006+
"""Delay before the first workflow task of the continued run is scheduled."""
10031007
def __init__(
10041008
self,
10051009
*,
@@ -1024,10 +1028,13 @@ class ContinueAsNewWorkflowExecution(google.protobuf.message.Message):
10241028
retry_policy: temporalio.api.common.v1.message_pb2.RetryPolicy | None = ...,
10251029
versioning_intent: temporalio.bridge.proto.common.common_pb2.VersioningIntent.ValueType = ...,
10261030
initial_versioning_behavior: temporalio.api.enums.v1.workflow_pb2.ContinueAsNewVersioningBehavior.ValueType = ...,
1031+
backoff_start_interval: google.protobuf.duration_pb2.Duration | None = ...,
10271032
) -> None: ...
10281033
def HasField(
10291034
self,
10301035
field_name: typing_extensions.Literal[
1036+
"backoff_start_interval",
1037+
b"backoff_start_interval",
10311038
"retry_policy",
10321039
b"retry_policy",
10331040
"search_attributes",
@@ -1043,6 +1050,8 @@ class ContinueAsNewWorkflowExecution(google.protobuf.message.Message):
10431050
field_name: typing_extensions.Literal[
10441051
"arguments",
10451052
b"arguments",
1053+
"backoff_start_interval",
1054+
b"backoff_start_interval",
10461055
"headers",
10471056
b"headers",
10481057
"initial_versioning_behavior",

temporalio/bridge/sdk-core

Submodule sdk-core updated 37 files

temporalio/worker/_interceptor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ class ContinueAsNewInput:
165165
task_queue: str | None
166166
run_timeout: timedelta | None
167167
task_timeout: timedelta | None
168+
backoff_start_interval: timedelta | None
168169
retry_policy: temporalio.common.RetryPolicy | None
169170
memo: Mapping[str, Any] | None
170171
search_attributes: None | (

temporalio/worker/_workflow_instance.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1148,6 +1148,7 @@ def workflow_continue_as_new(
11481148
task_queue: str | None,
11491149
run_timeout: timedelta | None,
11501150
task_timeout: timedelta | None,
1151+
backoff_start_interval: timedelta | None,
11511152
retry_policy: temporalio.common.RetryPolicy | None,
11521153
memo: Mapping[str, Any] | None,
11531154
search_attributes: None
@@ -1178,6 +1179,7 @@ def workflow_continue_as_new(
11781179
task_queue=task_queue,
11791180
run_timeout=run_timeout,
11801181
task_timeout=task_timeout,
1182+
backoff_start_interval=backoff_start_interval,
11811183
retry_policy=retry_policy,
11821184
memo=memo,
11831185
search_attributes=search_attributes,
@@ -3587,6 +3589,8 @@ def _apply_command(self) -> None:
35873589
v.workflow_run_timeout.FromTimedelta(self._input.run_timeout)
35883590
if self._input.task_timeout:
35893591
v.workflow_task_timeout.FromTimedelta(self._input.task_timeout)
3592+
if self._input.backoff_start_interval:
3593+
v.backoff_start_interval.FromTimedelta(self._input.backoff_start_interval)
35903594
if self._input.headers:
35913595
temporalio.common._apply_headers(self._input.headers, v.headers)
35923596
if self._input.retry_policy:

temporalio/workflow/_context.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,7 @@ def workflow_continue_as_new(
286286
task_queue: str | None,
287287
run_timeout: timedelta | None,
288288
task_timeout: timedelta | None,
289+
backoff_start_interval: timedelta | None,
289290
retry_policy: temporalio.common.RetryPolicy | None,
290291
memo: Mapping[str, Any] | None,
291292
search_attributes: None

temporalio/workflow/_workflow_ops.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,7 @@ def continue_as_new(
683683
task_queue: str | None = None,
684684
run_timeout: timedelta | None = None,
685685
task_timeout: timedelta | None = None,
686+
backoff_start_interval: timedelta | None = None,
686687
retry_policy: temporalio.common.RetryPolicy | None = None,
687688
memo: Mapping[str, Any] | None = None,
688689
search_attributes: None
@@ -702,6 +703,7 @@ def continue_as_new(
702703
task_queue: str | None = None,
703704
run_timeout: timedelta | None = None,
704705
task_timeout: timedelta | None = None,
706+
backoff_start_interval: timedelta | None = None,
705707
retry_policy: temporalio.common.RetryPolicy | None = None,
706708
memo: Mapping[str, Any] | None = None,
707709
search_attributes: None
@@ -722,6 +724,7 @@ def continue_as_new(
722724
task_queue: str | None = None,
723725
run_timeout: timedelta | None = None,
724726
task_timeout: timedelta | None = None,
727+
backoff_start_interval: timedelta | None = None,
725728
retry_policy: temporalio.common.RetryPolicy | None = None,
726729
memo: Mapping[str, Any] | None = None,
727730
search_attributes: None
@@ -742,6 +745,7 @@ def continue_as_new(
742745
task_queue: str | None = None,
743746
run_timeout: timedelta | None = None,
744747
task_timeout: timedelta | None = None,
748+
backoff_start_interval: timedelta | None = None,
745749
retry_policy: temporalio.common.RetryPolicy | None = None,
746750
memo: Mapping[str, Any] | None = None,
747751
search_attributes: None
@@ -762,6 +766,7 @@ def continue_as_new(
762766
task_queue: str | None = None,
763767
run_timeout: timedelta | None = None,
764768
task_timeout: timedelta | None = None,
769+
backoff_start_interval: timedelta | None = None,
765770
retry_policy: temporalio.common.RetryPolicy | None = None,
766771
memo: Mapping[str, Any] | None = None,
767772
search_attributes: None
@@ -781,6 +786,7 @@ def continue_as_new(
781786
task_queue: str | None = None,
782787
run_timeout: timedelta | None = None,
783788
task_timeout: timedelta | None = None,
789+
backoff_start_interval: timedelta | None = None,
784790
retry_policy: temporalio.common.RetryPolicy | None = None,
785791
memo: Mapping[str, Any] | None = None,
786792
search_attributes: None
@@ -804,6 +810,8 @@ def continue_as_new(
804810
workflow's run timeout.
805811
task_timeout: Timeout of a single workflow task. Defaults to the current
806812
workflow's task timeout.
813+
backoff_start_interval: Delay before the first workflow task of the
814+
continued run is scheduled.
807815
memo: Memo for the workflow. Defaults to the current workflow's memo.
808816
search_attributes: Search attributes for the workflow. Defaults to the
809817
current workflow's search attributes. The dictionary form of this is
@@ -826,6 +834,7 @@ def continue_as_new(
826834
task_queue=task_queue,
827835
run_timeout=run_timeout,
828836
task_timeout=task_timeout,
837+
backoff_start_interval=backoff_start_interval,
829838
retry_policy=retry_policy,
830839
memo=memo,
831840
search_attributes=search_attributes,

tests/worker/test_workflow.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1893,6 +1893,7 @@ async def run(self, past_run_ids: list[str]) -> list[str]:
18931893
# Add memo and retry policy to check
18941894
memo={"past_run_id_count": len(past_run_ids)},
18951895
retry_policy=RetryPolicy(maximum_attempts=1000 + len(past_run_ids)),
1896+
backoff_start_interval=timedelta(milliseconds=1),
18961897
)
18971898

18981899

@@ -1914,6 +1915,19 @@ async def test_workflow_continue_as_new(client: Client, env: WorkflowEnvironment
19141915
result = await handle.result()
19151916
assert len(result) == 5
19161917
assert result[0] == handle.first_execution_run_id
1918+
first_run_handle = client.get_workflow_handle(
1919+
handle.id, run_id=handle.first_execution_run_id
1920+
)
1921+
history = await first_run_handle.fetch_history()
1922+
continued = [
1923+
event.workflow_execution_continued_as_new_event_attributes
1924+
for event in history.events
1925+
if event.HasField("workflow_execution_continued_as_new_event_attributes")
1926+
]
1927+
assert continued
1928+
assert continued[0].backoff_start_interval.ToTimedelta() == timedelta(
1929+
milliseconds=1
1930+
)
19171931

19181932

19191933
sa_prefix = "python_test_"

0 commit comments

Comments
 (0)