Skip to content

Commit cce7833

Browse files
committed
feat: add plugin interface
1 parent 85f2d24 commit cce7833

11 files changed

Lines changed: 2423 additions & 42 deletions

src/aws_durable_execution_sdk_python/execution.py

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
Operation,
2727
OperationType,
2828
OperationUpdate,
29+
InvocationStatus,
2930
)
31+
from aws_durable_execution_sdk_python.plugin import DurableExecutionPlugin, PluginExecutor
3032
from aws_durable_execution_sdk_python.state import ExecutionState, ReplayStatus
3133

3234
if TYPE_CHECKING:
@@ -149,12 +151,6 @@ def from_durable_execution_invocation_input(
149151
)
150152

151153

152-
class InvocationStatus(Enum):
153-
SUCCEEDED = "SUCCEEDED"
154-
FAILED = "FAILED"
155-
PENDING = "PENDING"
156-
157-
158154
@dataclass(frozen=True)
159155
class DurableExecutionInvocationOutput:
160156
"""Representation the DurableExecutionInvocationOutput. This is what the Durable lambda handler returns.
@@ -204,22 +200,63 @@ def create_succeeded(cls, result: str) -> DurableExecutionInvocationOutput:
204200
"""Create a succeeded invocation output."""
205201
return cls(status=InvocationStatus.SUCCEEDED, result=result)
206202

203+
@classmethod
204+
def create_retry(cls, error: ErrorObject) -> DurableExecutionInvocationOutput:
205+
"""Create a failed invocation output."""
206+
return cls(status=InvocationStatus.RETRY, error=error)
207207

208208
# endregion Invocation models
209209

210210

211+
def handle_plugins(plugin_executor: PluginExecutor):
212+
def decorator(func):
213+
@functools.wraps(func)
214+
def wrapper(event: Any, context: LambdaContext):
215+
with plugin_executor.start():
216+
durable_execution_arn = event.get("DurableExecutionArn")
217+
try:
218+
output = func(event, context)
219+
plugin_executor.on_invocation_end(
220+
durable_execution_arn=durable_execution_arn,
221+
context=context,
222+
output=output,
223+
)
224+
except Exception as e:
225+
plugin_executor.on_invocation_end(
226+
durable_execution_arn=durable_execution_arn,
227+
context=context,
228+
output=DurableExecutionInvocationOutput.create_retry(ErrorObject.from_exception(e)),
229+
)
230+
231+
return wrapper
232+
return decorator
233+
234+
211235
def durable_execution(
212236
func: Callable[[Any, DurableContext], Any] | None = None,
213237
*,
214238
boto3_client: Boto3LambdaClient | None = None,
239+
plugins: list[DurableExecutionPlugin] | None = None,
215240
) -> Callable[[Any, LambdaContext], Any]:
241+
"""
242+
Decorator to create a durable execution handler.
243+
244+
Args:
245+
func: The user function to decorate
246+
boto3_client: Optional boto3 Lambda client to use
247+
plugins: Optional list of plugins to use (EXPERIMENTAL: This
248+
parameter may change or be removed.)
249+
"""
216250
# Decorator called with parameters
217251
if func is None:
218252
logger.debug("Decorator called with parameters")
219-
return functools.partial(durable_execution, boto3_client=boto3_client)
253+
return functools.partial(durable_execution, boto3_client=boto3_client, plugins=plugins)
220254

221255
logger.debug("Starting durable execution handler...")
222256

257+
plugin_executor = PluginExecutor(plugins)
258+
259+
@handle_plugins(plugin_executor)
223260
def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:
224261
invocation_input: DurableExecutionInvocationInput
225262
service_client: DurableServiceClient
@@ -255,6 +292,7 @@ def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:
255292
operations={},
256293
service_client=service_client,
257294
replay_status=ReplayStatus.NEW,
295+
plugin_executor=plugin_executor,
258296
)
259297

260298
try:
@@ -306,6 +344,13 @@ def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:
306344
) as executor,
307345
contextlib.closing(execution_state) as execution_state,
308346
):
347+
# execute the plugins
348+
plugin_executor.on_invocation_start(
349+
durable_execution_arn=invocation_input.durable_execution_arn,
350+
context=context,
351+
execution_operation=execution_state.get_execution_operation(),
352+
is_replaying=execution_state.is_replaying(),
353+
)
309354
# Thread 1: Run background checkpoint processing
310355
executor.submit(execution_state.checkpoint_batches_forever)
311356

src/aws_durable_execution_sdk_python/lambda_service.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,15 @@ class OperationSubType(Enum):
105105
CHAINED_INVOKE = "ChainedInvoke"
106106

107107

108+
class InvocationStatus(Enum):
109+
SUCCEEDED = "SUCCEEDED"
110+
FAILED = "FAILED"
111+
PENDING = "PENDING"
112+
113+
# Used internally only: the invocation failed and the backend will retry
114+
RETRY = "RETRY"
115+
116+
108117
@dataclass(frozen=True)
109118
class ExecutionDetails:
110119
input_payload: str | None = None

0 commit comments

Comments
 (0)