Skip to content

Commit f93123a

Browse files
committed
feat: add plugin interface
1 parent 85f2d24 commit f93123a

14 files changed

Lines changed: 2527 additions & 63 deletions

File tree

.github/hooks/pre-commit

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#!/bin/sh
2+
3+
if hatch fmt --check; then
4+
echo "Hatch fmt check passed!"
5+
else
6+
hatch fmt
7+
echo "Error: hatch fmt modified your files. Please re-stage and commit again."
8+
exit 1
9+
fi

examples/examples-catalog.json

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,17 @@
580580
"ApplicationLogLevel": "DEBUG",
581581
"LogFormat": "JSON"
582582
}
583-
}
583+
},
584+
{
585+
"name": "Plugin",
586+
"description": "Test plugin",
587+
"handler": "execution_with_plugin.handler",
588+
"integration": true,
589+
"durableConfig": {
590+
"RetentionPeriodInDays": 7,
591+
"ExecutionTimeout": 300
592+
},
593+
"path": "./src/plugin/execution_with_plugin.py"
594+
}
584595
]
585596
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
"""Demonstrates handler execution without any durable operations."""
2+
3+
import logging
4+
from typing import Any
5+
6+
from aws_durable_execution_sdk_python import StepContext
7+
from aws_durable_execution_sdk_python.context import (
8+
DurableContext,
9+
durable_step,
10+
durable_with_child_context,
11+
)
12+
from aws_durable_execution_sdk_python.execution import durable_execution
13+
from aws_durable_execution_sdk_python.plugin import (
14+
DurableExecutionPlugin,
15+
AttemptStartInfo,
16+
)
17+
18+
19+
class MyPlugin(DurableExecutionPlugin):
20+
logger = logging.getLogger("MyPlugin")
21+
22+
def on_execution_start(self, info):
23+
self.logger.info(f"Execution started: {info}")
24+
25+
def on_execution_end(self, info):
26+
self.logger.info(f"Execution ended: {info}")
27+
28+
def on_operation_start(self, info):
29+
self.logger.info(f"Operation started: {info}")
30+
31+
def on_operation_end(self, info):
32+
self.logger.info(f"Operation ended: {info}")
33+
34+
def on_invocation_start(self, info):
35+
self.logger.info(f"Invocation started: {info}")
36+
37+
def on_invocation_end(self, info):
38+
self.logger.info(f"Invocation ended: {info}")
39+
40+
def on_operation_attempt_start(self, info: AttemptStartInfo) -> None:
41+
self.logger.info(f"Attempt started: {info}")
42+
43+
def on_operation_attempt_end(self, info) -> None:
44+
self.logger.info(f"Attempt ended: {info}")
45+
46+
47+
@durable_step
48+
def add_numbers(_step_context: StepContext, a: int, b: int) -> int:
49+
return a + b
50+
51+
52+
@durable_with_child_context
53+
def add_numbers_in_child(child_context: DurableContext, a: int, b: int):
54+
result: int = child_context.step(
55+
add_numbers(a, b),
56+
name="add-a-and-b",
57+
)
58+
return result
59+
60+
61+
@durable_execution(plugins=[MyPlugin()])
62+
def handler(_event: Any, context: DurableContext) -> int:
63+
result: int = context.run_in_child_context(
64+
add_numbers_in_child(6, 4),
65+
name="add-6-and-4",
66+
)
67+
return context.step(
68+
add_numbers(result, 2),
69+
name="add-result-to-2",
70+
)

examples/template.yaml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -941,6 +941,24 @@
941941
"ExecutionTimeout": 300
942942
}
943943
}
944+
},
945+
"ExecutionWithPlugin": {
946+
"Type": "AWS::Serverless::Function",
947+
"Properties": {
948+
"CodeUri": "build/",
949+
"Handler": "execution_with_plugin.handler",
950+
"Description": "Test plugin",
951+
"Role": {
952+
"Fn::GetAtt": [
953+
"DurableFunctionRole",
954+
"Arn"
955+
]
956+
},
957+
"DurableConfig": {
958+
"RetentionPeriodInDays": 7,
959+
"ExecutionTimeout": 300
960+
}
961+
}
944962
}
945963
}
946964
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
"""Tests for step example."""
2+
3+
import pytest
4+
from aws_durable_execution_sdk_python.execution import InvocationStatus
5+
6+
from src.step import step
7+
from test.conftest import deserialize_operation_payload
8+
9+
10+
@pytest.mark.example
11+
@pytest.mark.durable_execution(
12+
handler=step.handler,
13+
lambda_function_name="Plugin",
14+
)
15+
def test_step(durable_runner):
16+
"""Test basic step example."""
17+
with durable_runner:
18+
result = durable_runner.run(input="{}", timeout=10)
19+
20+
assert result.status is InvocationStatus.SUCCEEDED
21+
assert deserialize_operation_payload(result.result) == 8
22+
23+
step_result = result.get_step("add_numbers")
24+
assert deserialize_operation_payload(step_result.result) == 8

src/aws_durable_execution_sdk_python/execution.py

Lines changed: 31 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import logging
77
from concurrent.futures import ThreadPoolExecutor
88
from dataclasses import dataclass
9-
from enum import Enum
109
from typing import TYPE_CHECKING, Any
1110

1211
from aws_durable_execution_sdk_python.context import DurableContext
@@ -26,6 +25,13 @@
2625
Operation,
2726
OperationType,
2827
OperationUpdate,
28+
InvocationStatus,
29+
DurableExecutionInvocationOutput,
30+
)
31+
from aws_durable_execution_sdk_python.plugin import (
32+
DurableExecutionPlugin,
33+
PluginExecutor,
34+
handle_plugins,
2935
)
3036
from aws_durable_execution_sdk_python.state import ExecutionState, ReplayStatus
3137

@@ -149,77 +155,36 @@ def from_durable_execution_invocation_input(
149155
)
150156

151157

152-
class InvocationStatus(Enum):
153-
SUCCEEDED = "SUCCEEDED"
154-
FAILED = "FAILED"
155-
PENDING = "PENDING"
156-
157-
158-
@dataclass(frozen=True)
159-
class DurableExecutionInvocationOutput:
160-
"""Representation the DurableExecutionInvocationOutput. This is what the Durable lambda handler returns.
161-
162-
If the execution has been already completed via an update to the EXECUTION operation via CheckpointDurableExecution,
163-
payload must be empty for SUCCEEDED/FAILED status.
164-
"""
165-
166-
status: InvocationStatus
167-
result: str | None = None
168-
error: ErrorObject | None = None
169-
170-
@classmethod
171-
def from_dict(
172-
cls, data: MutableMapping[str, Any]
173-
) -> DurableExecutionInvocationOutput:
174-
"""Create an instance from a dictionary.
175-
176-
Args:
177-
data: Dictionary with camelCase keys matching the original structure
178-
179-
Returns:
180-
A DurableExecutionInvocationOutput instance
181-
"""
182-
status = InvocationStatus(data.get("Status"))
183-
error = ErrorObject.from_dict(data["Error"]) if data.get("Error") else None
184-
return cls(status=status, result=data.get("Result"), error=error)
185-
186-
def to_dict(self) -> MutableMapping[str, Any]:
187-
"""Convert to a dictionary with the original field names.
188-
189-
Returns:
190-
Dictionary with the original camelCase keys
191-
"""
192-
result: MutableMapping[str, Any] = {"Status": self.status.value}
193-
194-
if self.result is not None:
195-
# large payloads return "", because checkpointed already
196-
result["Result"] = self.result
197-
if self.error:
198-
result["Error"] = self.error.to_dict()
199-
200-
return result
201-
202-
@classmethod
203-
def create_succeeded(cls, result: str) -> DurableExecutionInvocationOutput:
204-
"""Create a succeeded invocation output."""
205-
return cls(status=InvocationStatus.SUCCEEDED, result=result)
206-
207-
208158
# endregion Invocation models
209159

210160

211161
def durable_execution(
212162
func: Callable[[Any, DurableContext], Any] | None = None,
213163
*,
214164
boto3_client: Boto3LambdaClient | None = None,
165+
plugins: list[DurableExecutionPlugin] | None = None,
215166
) -> Callable[[Any, LambdaContext], Any]:
167+
"""
168+
Decorator to create a durable execution handler.
169+
170+
Args:
171+
func: The user function to decorate
172+
boto3_client: Optional boto3 Lambda client to use
173+
plugins: Optional list of plugins to use (EXPERIMENTAL: This
174+
parameter may change or be removed.)
175+
"""
216176
# Decorator called with parameters
217177
if func is None:
218178
logger.debug("Decorator called with parameters")
219-
return functools.partial(durable_execution, boto3_client=boto3_client)
179+
return functools.partial(
180+
durable_execution, boto3_client=boto3_client, plugins=plugins
181+
)
220182

221183
logger.debug("Starting durable execution handler...")
222184

185+
plugin_executor = PluginExecutor(plugins)
186+
187+
@handle_plugins(plugin_executor)
223188
def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:
224189
invocation_input: DurableExecutionInvocationInput
225190
service_client: DurableServiceClient
@@ -255,6 +220,7 @@ def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:
255220
operations={},
256221
service_client=service_client,
257222
replay_status=ReplayStatus.NEW,
223+
plugin_executor=plugin_executor,
258224
)
259225

260226
try:
@@ -306,6 +272,13 @@ def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:
306272
) as executor,
307273
contextlib.closing(execution_state) as execution_state,
308274
):
275+
# execute the plugins
276+
plugin_executor.on_invocation_start(
277+
durable_execution_arn=invocation_input.durable_execution_arn,
278+
context=context,
279+
execution_operation=execution_state.get_execution_operation(),
280+
is_replaying=execution_state.is_replaying(),
281+
)
309282
# Thread 1: Run background checkpoint processing
310283
executor.submit(execution_state.checkpoint_batches_forever)
311284

src/aws_durable_execution_sdk_python/lambda_service.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,70 @@ class OperationSubType(Enum):
105105
CHAINED_INVOKE = "ChainedInvoke"
106106

107107

108+
class InvocationStatus(Enum):
109+
SUCCEEDED = "SUCCEEDED"
110+
FAILED = "FAILED"
111+
PENDING = "PENDING"
112+
113+
# Used internally only: the invocation failed and the backend will retry
114+
RETRY = "RETRY"
115+
116+
117+
@dataclass(frozen=True)
118+
class DurableExecutionInvocationOutput:
119+
"""Representation the DurableExecutionInvocationOutput. This is what the Durable lambda handler returns.
120+
121+
If the execution has been already completed via an update to the EXECUTION operation via CheckpointDurableExecution,
122+
payload must be empty for SUCCEEDED/FAILED status.
123+
"""
124+
125+
status: InvocationStatus
126+
result: str | None = None
127+
error: ErrorObject | None = None
128+
129+
@classmethod
130+
def from_dict(
131+
cls, data: MutableMapping[str, Any]
132+
) -> DurableExecutionInvocationOutput:
133+
"""Create an instance from a dictionary.
134+
135+
Args:
136+
data: Dictionary with camelCase keys matching the original structure
137+
138+
Returns:
139+
A DurableExecutionInvocationOutput instance
140+
"""
141+
status = InvocationStatus(data.get("Status"))
142+
error = ErrorObject.from_dict(data["Error"]) if data.get("Error") else None
143+
return cls(status=status, result=data.get("Result"), error=error)
144+
145+
def to_dict(self) -> MutableMapping[str, Any]:
146+
"""Convert to a dictionary with the original field names.
147+
148+
Returns:
149+
Dictionary with the original camelCase keys
150+
"""
151+
result: MutableMapping[str, Any] = {"Status": self.status.value}
152+
153+
if self.result is not None:
154+
# large payloads return "", because checkpointed already
155+
result["Result"] = self.result
156+
if self.error:
157+
result["Error"] = self.error.to_dict()
158+
159+
return result
160+
161+
@classmethod
162+
def create_succeeded(cls, result: str) -> DurableExecutionInvocationOutput:
163+
"""Create a succeeded invocation output."""
164+
return cls(status=InvocationStatus.SUCCEEDED, result=result)
165+
166+
@classmethod
167+
def create_retry(cls, error: ErrorObject) -> DurableExecutionInvocationOutput:
168+
"""Create a failed invocation output."""
169+
return cls(status=InvocationStatus.RETRY, error=error)
170+
171+
108172
@dataclass(frozen=True)
109173
class ExecutionDetails:
110174
input_payload: str | None = None

0 commit comments

Comments
 (0)