Skip to content

Commit 58d65cd

Browse files
committed
feat: add plugin interface
1 parent 85f2d24 commit 58d65cd

9 files changed

Lines changed: 2394 additions & 61 deletions

File tree

src/aws_durable_execution_sdk_python/execution.py

Lines changed: 31 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,13 @@
2626
Operation,
2727
OperationType,
2828
OperationUpdate,
29+
InvocationStatus,
30+
DurableExecutionInvocationOutput,
31+
)
32+
from aws_durable_execution_sdk_python.plugin import (
33+
DurableExecutionPlugin,
34+
PluginExecutor,
35+
handle_plugins,
2936
)
3037
from aws_durable_execution_sdk_python.state import ExecutionState, ReplayStatus
3138

@@ -149,77 +156,36 @@ def from_durable_execution_invocation_input(
149156
)
150157

151158

152-
class InvocationStatus(Enum):
153-
SUCCEEDED = "SUCCEEDED"
154-
FAILED = "FAILED"
155-
PENDING = "PENDING"
156-
157-
158-
@dataclass(frozen=True)
159-
class DurableExecutionInvocationOutput:
160-
"""Representation the DurableExecutionInvocationOutput. This is what the Durable lambda handler returns.
161-
162-
If the execution has been already completed via an update to the EXECUTION operation via CheckpointDurableExecution,
163-
payload must be empty for SUCCEEDED/FAILED status.
164-
"""
165-
166-
status: InvocationStatus
167-
result: str | None = None
168-
error: ErrorObject | None = None
169-
170-
@classmethod
171-
def from_dict(
172-
cls, data: MutableMapping[str, Any]
173-
) -> DurableExecutionInvocationOutput:
174-
"""Create an instance from a dictionary.
175-
176-
Args:
177-
data: Dictionary with camelCase keys matching the original structure
178-
179-
Returns:
180-
A DurableExecutionInvocationOutput instance
181-
"""
182-
status = InvocationStatus(data.get("Status"))
183-
error = ErrorObject.from_dict(data["Error"]) if data.get("Error") else None
184-
return cls(status=status, result=data.get("Result"), error=error)
185-
186-
def to_dict(self) -> MutableMapping[str, Any]:
187-
"""Convert to a dictionary with the original field names.
188-
189-
Returns:
190-
Dictionary with the original camelCase keys
191-
"""
192-
result: MutableMapping[str, Any] = {"Status": self.status.value}
193-
194-
if self.result is not None:
195-
# large payloads return "", because checkpointed already
196-
result["Result"] = self.result
197-
if self.error:
198-
result["Error"] = self.error.to_dict()
199-
200-
return result
201-
202-
@classmethod
203-
def create_succeeded(cls, result: str) -> DurableExecutionInvocationOutput:
204-
"""Create a succeeded invocation output."""
205-
return cls(status=InvocationStatus.SUCCEEDED, result=result)
206-
207-
208159
# endregion Invocation models
209160

210161

211162
def durable_execution(
212163
func: Callable[[Any, DurableContext], Any] | None = None,
213164
*,
214165
boto3_client: Boto3LambdaClient | None = None,
166+
plugins: list[DurableExecutionPlugin] | None = None,
215167
) -> Callable[[Any, LambdaContext], Any]:
168+
"""
169+
Decorator to create a durable execution handler.
170+
171+
Args:
172+
func: The user function to decorate
173+
boto3_client: Optional boto3 Lambda client to use
174+
plugins: Optional list of plugins to use (EXPERIMENTAL: This
175+
parameter may change or be removed.)
176+
"""
216177
# Decorator called with parameters
217178
if func is None:
218179
logger.debug("Decorator called with parameters")
219-
return functools.partial(durable_execution, boto3_client=boto3_client)
180+
return functools.partial(
181+
durable_execution, boto3_client=boto3_client, plugins=plugins
182+
)
220183

221184
logger.debug("Starting durable execution handler...")
222185

186+
plugin_executor = PluginExecutor(plugins)
187+
188+
@handle_plugins(plugin_executor)
223189
def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:
224190
invocation_input: DurableExecutionInvocationInput
225191
service_client: DurableServiceClient
@@ -255,6 +221,7 @@ def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:
255221
operations={},
256222
service_client=service_client,
257223
replay_status=ReplayStatus.NEW,
224+
plugin_executor=plugin_executor,
258225
)
259226

260227
try:
@@ -306,6 +273,13 @@ def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:
306273
) as executor,
307274
contextlib.closing(execution_state) as execution_state,
308275
):
276+
# execute the plugins
277+
plugin_executor.on_invocation_start(
278+
durable_execution_arn=invocation_input.durable_execution_arn,
279+
context=context,
280+
execution_operation=execution_state.get_execution_operation(),
281+
is_replaying=execution_state.is_replaying(),
282+
)
309283
# Thread 1: Run background checkpoint processing
310284
executor.submit(execution_state.checkpoint_batches_forever)
311285

src/aws_durable_execution_sdk_python/lambda_service.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,70 @@ class OperationSubType(Enum):
105105
CHAINED_INVOKE = "ChainedInvoke"
106106

107107

108+
class InvocationStatus(Enum):
109+
SUCCEEDED = "SUCCEEDED"
110+
FAILED = "FAILED"
111+
PENDING = "PENDING"
112+
113+
# Used internally only: the invocation failed and the backend will retry
114+
RETRY = "RETRY"
115+
116+
117+
@dataclass(frozen=True)
118+
class DurableExecutionInvocationOutput:
119+
"""Representation the DurableExecutionInvocationOutput. This is what the Durable lambda handler returns.
120+
121+
If the execution has been already completed via an update to the EXECUTION operation via CheckpointDurableExecution,
122+
payload must be empty for SUCCEEDED/FAILED status.
123+
"""
124+
125+
status: InvocationStatus
126+
result: str | None = None
127+
error: ErrorObject | None = None
128+
129+
@classmethod
130+
def from_dict(
131+
cls, data: MutableMapping[str, Any]
132+
) -> DurableExecutionInvocationOutput:
133+
"""Create an instance from a dictionary.
134+
135+
Args:
136+
data: Dictionary with camelCase keys matching the original structure
137+
138+
Returns:
139+
A DurableExecutionInvocationOutput instance
140+
"""
141+
status = InvocationStatus(data.get("Status"))
142+
error = ErrorObject.from_dict(data["Error"]) if data.get("Error") else None
143+
return cls(status=status, result=data.get("Result"), error=error)
144+
145+
def to_dict(self) -> MutableMapping[str, Any]:
146+
"""Convert to a dictionary with the original field names.
147+
148+
Returns:
149+
Dictionary with the original camelCase keys
150+
"""
151+
result: MutableMapping[str, Any] = {"Status": self.status.value}
152+
153+
if self.result is not None:
154+
# large payloads return "", because checkpointed already
155+
result["Result"] = self.result
156+
if self.error:
157+
result["Error"] = self.error.to_dict()
158+
159+
return result
160+
161+
@classmethod
162+
def create_succeeded(cls, result: str) -> DurableExecutionInvocationOutput:
163+
"""Create a succeeded invocation output."""
164+
return cls(status=InvocationStatus.SUCCEEDED, result=result)
165+
166+
@classmethod
167+
def create_retry(cls, error: ErrorObject) -> DurableExecutionInvocationOutput:
168+
"""Create a failed invocation output."""
169+
return cls(status=InvocationStatus.RETRY, error=error)
170+
171+
108172
@dataclass(frozen=True)
109173
class ExecutionDetails:
110174
input_payload: str | None = None

0 commit comments

Comments
 (0)