Skip to content

Commit 331ecbc

Browse files
committed
feat(plugin): Wire plugin hooks into SDK execution lifecycle
1 parent 56534df commit 331ecbc

8 files changed

Lines changed: 754 additions & 7 deletions

File tree

sdk-integration-tests/src/test/java/software/amazon/lambda/durable/PluginIntegrationTest.java

Lines changed: 434 additions & 0 deletions
Large diffs are not rendered by default.

sdk-testing/src/main/java/software/amazon/lambda/durable/testing/LocalDurableTestRunner.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ private LocalDurableTestRunner(
6161
.withPollingStrategy(customerConfig.getPollingStrategy())
6262
.withCheckpointDelay(customerConfig.getCheckpointDelay())
6363
.withLoggerConfig(customerConfig.getLoggerConfig())
64+
.withPluginRunner(customerConfig.getPluginRunner())
6465
.build();
6566
} else {
6667
// Fallback to default config with in-memory client

sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import java.io.IOException;
66
import java.io.InputStream;
77
import java.time.Duration;
8+
import java.util.ArrayList;
9+
import java.util.List;
810
import java.util.Objects;
911
import java.util.Properties;
1012
import java.util.concurrent.ExecutorService;
@@ -21,6 +23,7 @@
2123
import software.amazon.lambda.durable.client.DurableExecutionClient;
2224
import software.amazon.lambda.durable.client.LambdaDurableFunctionsClient;
2325
import software.amazon.lambda.durable.logging.LoggerConfig;
26+
import software.amazon.lambda.durable.plugin.DurableExecutionPlugin;
2427
import software.amazon.lambda.durable.plugin.PluginRunner;
2528
import software.amazon.lambda.durable.retry.PollingStrategies;
2629
import software.amazon.lambda.durable.retry.PollingStrategy;
@@ -106,7 +109,9 @@ private DurableConfig(Builder builder) {
106109
this.loggerConfig = Objects.requireNonNullElseGet(builder.loggerConfig, LoggerConfig::defaults);
107110
this.pollingStrategy = Objects.requireNonNullElse(builder.pollingStrategy, PollingStrategies.Presets.DEFAULT);
108111
this.checkpointDelay = Objects.requireNonNullElseGet(builder.checkpointDelay, () -> Duration.ofSeconds(0));
109-
this.pluginRunner = PluginRunner.noOp();
112+
this.pluginRunner = builder.pluginRunner != null
113+
? builder.pluginRunner
114+
: (builder.plugins.isEmpty() ? PluginRunner.noOp() : new PluginRunner(builder.plugins));
110115

111116
validateConfiguration();
112117
}
@@ -186,10 +191,9 @@ public Duration getCheckpointDelay() {
186191
/**
187192
* Gets the plugin runner that dispatches lifecycle events to registered plugins.
188193
*
189-
* <p>Currently returns a no-op runner. Plugin registration via config will be added when the plugin system is fully
190-
* wired.
194+
* <p>Returns a no-op runner if no plugins were registered via the builder.
191195
*
192-
* @return PluginRunner instance (always no-op until plugin wiring is complete)
196+
* @return PluginRunner instance (never null)
193197
* @deprecated This is a preview API that is experimental and may be changed or removed in future releases.
194198
*/
195199
@Deprecated
@@ -291,6 +295,8 @@ public static final class Builder {
291295
private LoggerConfig loggerConfig;
292296
private PollingStrategy pollingStrategy;
293297
private Duration checkpointDelay;
298+
private final List<DurableExecutionPlugin> plugins = new ArrayList<>();
299+
private PluginRunner pluginRunner;
294300

295301
public Builder() {}
296302

@@ -400,6 +406,45 @@ public Builder withCheckpointDelay(Duration duration) {
400406
return this;
401407
}
402408

409+
/**
410+
* Registers one or more plugins for lifecycle event instrumentation.
411+
*
412+
* <p>Plugins receive hooks at invocation, operation, and user function boundaries. Errors thrown by plugins are
413+
* isolated and never disrupt SDK execution.
414+
*
415+
* <p>Multiple plugins can be registered by calling this method multiple times. Plugins are called in
416+
* registration order.
417+
*
418+
* @param plugins the plugins to register
419+
* @return This builder
420+
* @throws NullPointerException if any plugin is null
421+
* @deprecated This is a preview API that is experimental and may be changed or removed in future releases.
422+
*/
423+
@Deprecated
424+
public Builder withPlugins(DurableExecutionPlugin... plugins) {
425+
Objects.requireNonNull(plugins, "Plugins array cannot be null");
426+
for (var plugin : plugins) {
427+
this.plugins.add(Objects.requireNonNull(plugin, "Plugin cannot be null"));
428+
}
429+
return this;
430+
}
431+
432+
/**
433+
* Sets a pre-built PluginRunner directly, bypassing individual plugin registration.
434+
*
435+
* <p>This is primarily used internally (e.g., by the test runner) to transfer an existing PluginRunner to a
436+
* rebuilt DurableConfig without needing access to the original plugin list.
437+
*
438+
* @param pluginRunner the pre-built PluginRunner
439+
* @return This builder
440+
* @deprecated This is a preview API that is experimental and may be changed or removed in future releases.
441+
*/
442+
@Deprecated
443+
public Builder withPluginRunner(PluginRunner pluginRunner) {
444+
this.pluginRunner = pluginRunner;
445+
return this;
446+
}
447+
403448
/**
404449
* Builds the DurableConfig instance.
405450
*

sdk/src/main/java/software/amazon/lambda/durable/execution/DurableExecutor.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@
2424
import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException;
2525
import software.amazon.lambda.durable.model.DurableExecutionInput;
2626
import software.amazon.lambda.durable.model.DurableExecutionOutput;
27+
import software.amazon.lambda.durable.plugin.InvocationEndInfo;
28+
import software.amazon.lambda.durable.plugin.InvocationInfo;
29+
import software.amazon.lambda.durable.plugin.InvocationStatus;
30+
import software.amazon.lambda.durable.plugin.PluginRunner;
2731
import software.amazon.lambda.durable.serde.SerDes;
2832
import software.amazon.lambda.durable.util.ExceptionHelper;
2933

@@ -48,7 +52,15 @@ public static <I, O> DurableExecutionOutput execute(
4852
TypeToken<I> inputType,
4953
BiFunction<I, DurableContext, O> handler,
5054
DurableConfig config) {
55+
var pluginRunner = config.getPluginRunner();
5156
try (var executionManager = new ExecutionManager(input, config)) {
57+
var isFirstInvocation = !executionManager.isReplaying();
58+
var requestId = lambdaContext != null ? lambdaContext.getAwsRequestId() : null;
59+
var executionArn = input.durableExecutionArn();
60+
61+
// Fire onInvocationStart plugin hook
62+
pluginRunner.onInvocationStart(new InvocationInfo(requestId, executionArn, isFirstInvocation));
63+
5264
executionManager.registerActiveThread(null);
5365
var handlerFuture = CompletableFuture.supplyAsync(
5466
() -> {
@@ -76,6 +88,13 @@ public static <I, O> DurableExecutionOutput execute(
7688

7789
// return PENDING if it's SuspendExecutionException
7890
if (cause instanceof SuspendExecutionException) {
91+
fireOnInvocationEnd(
92+
pluginRunner,
93+
requestId,
94+
executionArn,
95+
isFirstInvocation,
96+
InvocationStatus.PENDING,
97+
null);
7998
return DurableExecutionOutput.pending();
8099
}
81100

@@ -85,17 +104,40 @@ public static <I, O> DurableExecutionOutput execute(
85104
UnrecoverableDurableExecutionException
86105
unrecoverableDurableExecutionException
87106
&& unrecoverableDurableExecutionException.isRetryable()) {
107+
fireOnInvocationEnd(
108+
pluginRunner,
109+
requestId,
110+
executionArn,
111+
isFirstInvocation,
112+
InvocationStatus.FAILED,
113+
cause);
88114
throw unrecoverableDurableExecutionException;
89115
}
90116

91117
// fail the execution otherwise
92118
logger.debug("Execution failed: {}", cause.getMessage());
119+
fireOnInvocationEnd(
120+
pluginRunner,
121+
requestId,
122+
executionArn,
123+
isFirstInvocation,
124+
InvocationStatus.FAILED,
125+
cause);
93126
return DurableExecutionOutput.failure(buildErrorObject(cause, config.getSerDes()));
94127
}
95128
// user handler complete successfully
96129
logger.debug("Execution completed");
97130
var outputPayload = config.getSerDes().serialize(result);
98-
return DurableExecutionOutput.success(handleLargePayload(executionManager, outputPayload));
131+
var output =
132+
DurableExecutionOutput.success(handleLargePayload(executionManager, outputPayload));
133+
fireOnInvocationEnd(
134+
pluginRunner,
135+
requestId,
136+
executionArn,
137+
isFirstInvocation,
138+
InvocationStatus.SUCCEEDED,
139+
null);
140+
return output;
99141
})
100142
.join();
101143
} catch (CompletionException e) {
@@ -106,6 +148,16 @@ public static <I, O> DurableExecutionOutput execute(
106148
}
107149
}
108150

151+
private static void fireOnInvocationEnd(
152+
PluginRunner pluginRunner,
153+
String requestId,
154+
String executionArn,
155+
boolean isFirstInvocation,
156+
InvocationStatus status,
157+
Throwable error) {
158+
pluginRunner.onInvocationEnd(new InvocationEndInfo(requestId, executionArn, isFirstInvocation, status, error));
159+
}
160+
109161
private static String handleLargePayload(ExecutionManager executionManager, String outputPayload) {
110162
// Check if the serialized payload exceeds Lambda response size limit
111163
var payloadSize = outputPayload != null ? outputPayload.getBytes(StandardCharsets.UTF_8).length : 0;

sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import software.amazon.lambda.durable.execution.ThreadType;
2424
import software.amazon.lambda.durable.model.OperationIdentifier;
2525
import software.amazon.lambda.durable.model.OperationSubType;
26+
import software.amazon.lambda.durable.plugin.PluginInfoConverter;
27+
import software.amazon.lambda.durable.plugin.PluginRunner;
2628
import software.amazon.lambda.durable.util.ExceptionHelper;
2729

2830
/**
@@ -132,11 +134,15 @@ public void execute() {
132134
if (ExecutionManager.isTerminalStatus(existing.status())) {
133135
replayCompletedOperation.set(true);
134136
}
137+
// Fire onOperationStart plugin hook (including replay)
138+
fireOnOperationStart(existing);
135139
replay(existing);
136140
} else {
137141
if (durableContext.isReplaying()) {
138142
this.durableContext.setExecutionMode();
139143
}
144+
// Fire onOperationStart plugin hook (first execution, no existing operation)
145+
fireOnOperationStart(null);
140146
start();
141147
}
142148
}
@@ -245,13 +251,52 @@ protected Operation waitForOperationCompletion() {
245251
}
246252

247253
protected void runUserHandler(Runnable runnable, ThreadType threadType) {
254+
runUserHandler(runnable, threadType, null);
255+
}
256+
257+
/**
258+
* Runs user code in a separate thread with plugin hook instrumentation.
259+
*
260+
* @param runnable the user code to run
261+
* @param threadType the thread type (STEP or CONTEXT)
262+
* @param attempt the 1-based attempt number for steps/waitForCondition, null for context operations
263+
*/
264+
protected void runUserHandler(Runnable runnable, ThreadType threadType, Integer attempt) {
248265
String operationId = getOperationId();
249266
logger.debug("Starting user handler for operation {} ({})", operationId, threadType);
267+
var pluginRunner = getPluginRunner();
250268
Runnable wrapped = () -> {
251269
executionManager.setCurrentThreadContext(new ThreadContext(operationId, threadType));
270+
271+
// Fire onUserFunctionStart on the user code thread
272+
var userFunctionStartInfo = PluginInfoConverter.toUserFunctionStartInfo(
273+
getOperationId(),
274+
null,
275+
getName(),
276+
getType(),
277+
getSubType(),
278+
durableContext.getParentId(),
279+
null,
280+
durableContext.isReplaying(),
281+
attempt);
282+
pluginRunner.onUserFunctionStart(userFunctionStartInfo);
283+
284+
boolean userFunctionEndFired = false;
252285
try {
253286
runnable.run();
287+
// Fire onUserFunctionEnd on success (on the user code thread)
288+
pluginRunner.onUserFunctionEnd(
289+
PluginInfoConverter.toUserFunctionEndInfo(userFunctionStartInfo, true, null));
290+
userFunctionEndFired = true;
254291
} catch (Throwable throwable) {
292+
// Only fire onUserFunctionEnd for actual user function failures,
293+
// not for SDK control flow signals (SuspendExecutionException)
294+
if (!userFunctionEndFired && !(throwable instanceof SuspendExecutionException)) {
295+
pluginRunner.onUserFunctionEnd(
296+
PluginInfoConverter.toUserFunctionEndInfo(userFunctionStartInfo, false, throwable));
297+
userFunctionEndFired = true;
298+
}
299+
255300
// Operations always wrap the user's function and handles all possible exceptions except for
256301
// SuspendExecutionException.
257302
if (!executionManager.isExecutionCompletedExceptionally()
@@ -310,6 +355,11 @@ public void onCheckpointComplete(Operation operation) {
310355
// handle other updates.
311356
logger.trace("In onCheckpointComplete, completing operation {} ({})", getOperationId(), completionFuture);
312357

358+
// Fire onOperationEnd plugin hook — operation reached terminal status for the first time (not replay)
359+
if (!replayCompletedOperation.get()) {
360+
fireOnOperationEnd(operation, null);
361+
}
362+
313363
markCompletionFutureCompleted();
314364
}
315365
}
@@ -454,4 +504,41 @@ protected void validateReplay(Operation checkpointed) {
454504
public CompletableFuture<Void> getRunningUserHandler() {
455505
return runningUserHandler.get();
456506
}
507+
508+
// ─── Plugin hook helpers ─────────────────────────────────────────────
509+
510+
/** Returns the plugin runner from config, or no-op if config is unavailable. */
511+
private PluginRunner getPluginRunner() {
512+
var config = getContext().getDurableConfig();
513+
return config != null ? config.getPluginRunner() : PluginRunner.noOp();
514+
}
515+
516+
/** Fires onOperationStart plugin hook. */
517+
private void fireOnOperationStart(Operation existing) {
518+
var info = PluginInfoConverter.toOperationInfo(
519+
existing,
520+
getOperationId(),
521+
null,
522+
getName(),
523+
getType(),
524+
getSubType(),
525+
durableContext.getParentId(),
526+
null);
527+
getPluginRunner().onOperationStart(info);
528+
}
529+
530+
/** Fires onOperationEnd plugin hook when an operation reaches terminal status for the first time. */
531+
private void fireOnOperationEnd(Operation operation, Throwable error) {
532+
var info = PluginInfoConverter.toOperationEndInfo(
533+
operation,
534+
getOperationId(),
535+
null,
536+
getName(),
537+
getType(),
538+
getSubType(),
539+
durableContext.getParentId(),
540+
null,
541+
error);
542+
getPluginRunner().onOperationEnd(info);
543+
}
457544
}

sdk/src/main/java/software/amazon/lambda/durable/operation/StepOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ private void executeStepLogic(int attempt) {
119119
};
120120

121121
// Execute user provided step code in user-configured executor
122-
runUserHandler(userHandler, ThreadType.STEP);
122+
runUserHandler(userHandler, ThreadType.STEP, attempt);
123123
}
124124

125125
private void checkpointStarted() {

sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ private void executeCheckLogic(T currentState, int attempt) {
160160
}
161161
};
162162

163-
runUserHandler(userHandler, ThreadType.STEP);
163+
runUserHandler(userHandler, ThreadType.STEP, attempt);
164164
}
165165

166166
private void handleCheckFailure(Throwable exception) {

0 commit comments

Comments
 (0)