Skip to content

Commit c33efc2

Browse files
committed
feat(sdk): Add DurableInstrumentationPlugin interface and plugin runner
1 parent 89d0b6d commit c33efc2

12 files changed

Lines changed: 560 additions & 0 deletions

sdk/src/main/java/software/amazon/lambda/durable/DurableConfig.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.io.IOException;
66
import java.io.InputStream;
77
import java.time.Duration;
8+
import java.util.List;
89
import java.util.Objects;
910
import java.util.Properties;
1011
import java.util.concurrent.ExecutorService;
@@ -21,6 +22,8 @@
2122
import software.amazon.lambda.durable.client.DurableExecutionClient;
2223
import software.amazon.lambda.durable.client.LambdaDurableFunctionsClient;
2324
import software.amazon.lambda.durable.logging.LoggerConfig;
25+
import software.amazon.lambda.durable.plugin.DurableInstrumentationPlugin;
26+
import software.amazon.lambda.durable.plugin.PluginRunner;
2427
import software.amazon.lambda.durable.retry.PollingStrategies;
2528
import software.amazon.lambda.durable.retry.PollingStrategy;
2629
import software.amazon.lambda.durable.serde.JacksonSerDes;
@@ -94,6 +97,7 @@ public final class DurableConfig {
9497
private final LoggerConfig loggerConfig;
9598
private final PollingStrategy pollingStrategy;
9699
private final Duration checkpointDelay;
100+
private final PluginRunner pluginRunner;
97101

98102
private DurableConfig(Builder builder) {
99103
this.durableExecutionClient = Objects.requireNonNullElseGet(
@@ -104,6 +108,7 @@ private DurableConfig(Builder builder) {
104108
this.loggerConfig = Objects.requireNonNullElseGet(builder.loggerConfig, LoggerConfig::defaults);
105109
this.pollingStrategy = Objects.requireNonNullElse(builder.pollingStrategy, PollingStrategies.Presets.DEFAULT);
106110
this.checkpointDelay = Objects.requireNonNullElseGet(builder.checkpointDelay, () -> Duration.ofSeconds(0));
111+
this.pluginRunner = builder.plugins != null ? new PluginRunner(builder.plugins) : PluginRunner.noOp();
107112

108113
validateConfiguration();
109114
}
@@ -180,6 +185,16 @@ public Duration getCheckpointDelay() {
180185
return checkpointDelay;
181186
}
182187

188+
/**
189+
* Gets the plugin runner that dispatches lifecycle events to registered plugins.
190+
*
191+
* @return PluginRunner instance (never null — returns no-op runner if no plugins configured)
192+
* @experimental This method is experimental and may be changed or removed in future releases.
193+
*/
194+
public PluginRunner getPluginRunner() {
195+
return pluginRunner;
196+
}
197+
183198
public void validateConfiguration() {
184199
if (getDurableExecutionClient() == null) {
185200
throw new IllegalStateException("DurableExecutionClient configuration failed");
@@ -274,6 +289,7 @@ public static final class Builder {
274289
private LoggerConfig loggerConfig;
275290
private PollingStrategy pollingStrategy;
276291
private Duration checkpointDelay;
292+
private List<DurableInstrumentationPlugin> plugins;
277293

278294
public Builder() {}
279295

@@ -383,6 +399,32 @@ public Builder withCheckpointDelay(Duration duration) {
383399
return this;
384400
}
385401

402+
/**
403+
* Sets instrumentation plugins for observability and tracing.
404+
*
405+
* <p>Plugins receive lifecycle callbacks at key points during durable execution, enabling integration with
406+
* tracing systems (e.g., OpenTelemetry, X-Ray), custom metrics, and logging enrichment.
407+
*
408+
* <p>Multiple plugins can be provided and will be called in order. Plugin errors are swallowed to prevent
409+
* instrumentation from affecting execution correctness.
410+
*
411+
* <p>Example:
412+
*
413+
* <pre>{@code
414+
* DurableConfig.builder()
415+
* .withPlugins(List.of(new OpenTelemetryDurablePlugin()))
416+
* .build();
417+
* }</pre>
418+
*
419+
* @param plugins list of instrumentation plugins
420+
* @return This builder
421+
* @experimental This method is experimental and may be changed or removed in future releases.
422+
*/
423+
public Builder withPlugins(List<DurableInstrumentationPlugin> plugins) {
424+
this.plugins = plugins;
425+
return this;
426+
}
427+
386428
/**
387429
* Builds the DurableConfig instance.
388430
*
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.plugin;
4+
5+
import java.time.Instant;
6+
7+
/**
8+
* Information provided when an operation attempt ends.
9+
*
10+
* @param id operation ID
11+
* @param name human-readable operation name (may be null)
12+
* @param type operation type
13+
* @param subType operation sub-type (may be null)
14+
* @param parentId parent operation ID (null for root-level operations)
15+
* @param startTimestamp when the attempt started
16+
* @param endTimestamp when the attempt ended
17+
* @param attempt 1-based attempt number
18+
* @param outcome the attempt outcome (SUCCEEDED, FAILED, or RETRYING)
19+
* @param error non-null if the attempt failed
20+
* @param nextAttemptDelaySeconds non-null if outcome is RETRYING
21+
* @experimental This record is experimental and may be changed or removed in future releases.
22+
*/
23+
public record AttemptEndInfo(
24+
String id,
25+
String name,
26+
String type,
27+
String subType,
28+
String parentId,
29+
Instant startTimestamp,
30+
Instant endTimestamp,
31+
int attempt,
32+
AttemptOutcome outcome,
33+
Throwable error,
34+
Integer nextAttemptDelaySeconds) {}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.plugin;
4+
5+
import java.time.Instant;
6+
7+
/**
 * Immutable description of a single operation attempt, passed to the attempt-level plugin hooks.
 *
 * @param id operation ID
 * @param name human-readable operation name (may be null)
 * @param type operation type
 * @param subType operation sub-type (may be null)
 * @param parentId ID of the parent operation (null for root-level operations)
 * @param startTimestamp instant at which the attempt started
 * @param attempt attempt number, counted from 1
 * @experimental This record is experimental and may be changed or removed in future releases.
 */
public record AttemptInfo(
        // Operation identity
        String id,
        String name,
        String type,
        String subType,
        String parentId,
        // Attempt timing
        Instant startTimestamp,
        int attempt) {}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.plugin;
4+
5+
/**
 * The ways a single operation attempt can end.
 *
 * @experimental This enum is experimental and may be changed or removed in future releases.
 */
public enum AttemptOutcome {
    /** The attempt succeeded. */
    SUCCEEDED,

    /** The attempt failed. */
    FAILED,

    /** The operation is being retried; {@code AttemptEndInfo} documents a non-null retry delay for this outcome. */
    RETRYING;
}
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.plugin;
4+
5+
import java.util.Collections;
6+
import java.util.Map;
7+
import java.util.function.Supplier;
8+
9+
/**
10+
* Plugin interface for instrumenting durable execution lifecycle events.
11+
*
12+
* <p>Implement this interface to integrate observability tools (OpenTelemetry, Datadog, etc.) with the durable
13+
* execution SDK. The SDK calls these hooks at key lifecycle points without requiring modifications to core SDK code.
14+
*
15+
* <p>All methods have default no-op implementations, allowing plugins to override only the hooks they need.
16+
*
17+
* <p>Plugin errors are isolated — exceptions thrown by plugin methods are caught and logged but never disrupt SDK
18+
* execution.
19+
*
20+
* @experimental This interface is experimental and may be changed or removed in future releases.
21+
*/
22+
public interface DurableInstrumentationPlugin {
23+
24+
// ─── Execution-level hooks ───────────────────────────────────────────
25+
26+
/**
27+
* Called once when an execution first starts (not on replay invocations). Use for sampling decisions or
28+
* execution-level span creation.
29+
*/
30+
default void onExecutionStart(InvocationInfo info) {}
31+
32+
/**
33+
* Called when an execution reaches a terminal state (succeeded or failed). Use for writing summary records or
34+
* flushing final data.
35+
*
36+
* <p>This hook is awaited — the SDK blocks until it returns before sending the response.
37+
*/
38+
default void onExecutionEnd(ExecutionEndInfo info) {}
39+
40+
// ─── Invocation-level hooks ──────────────────────────────────────────
41+
42+
/**
43+
* Called at the start of each Lambda invocation. Use to set up per-invocation state (trace ID, invocation span).
44+
*/
45+
default void onInvocationStart(InvocationInfo info) {}
46+
47+
/**
48+
* Wraps the entire invocation execution. Use this to set the active OTel context for the invocation scope.
49+
*
50+
* <p>This hook runs on the same thread as the user handler, making it suitable for {@code Context.makeCurrent()}
51+
* which uses ThreadLocal.
52+
*
53+
* @param info invocation metadata
54+
* @param fn the SDK invocation logic to execute
55+
* @return the result of executing fn
56+
*/
57+
default <T> T wrapInvocation(InvocationInfo info, Supplier<T> fn) {
58+
return fn.get();
59+
}
60+
61+
/**
62+
* Called at the end of each Lambda invocation. Use to flush spans/metrics before Lambda freezes.
63+
*
64+
* <p>This hook is awaited — the SDK blocks until it returns. This is the only safe flush point before Lambda
65+
* freezes the execution environment.
66+
*/
67+
default void onInvocationEnd(InvocationInfo info) {}
68+
69+
// ─── Operation-level hooks ───────────────────────────────────────────
70+
71+
/**
72+
* Called when an operation starts for the first time (not on replay). Use to record when an operation genuinely
73+
* began.
74+
*/
75+
default void onOperationFirstStart(OperationInfo info) {}
76+
77+
/** Called when an operation starts (including replay). Use for logging/metrics that want replay visibility. */
78+
default void onOperationStart(OperationInfo info) {}
79+
80+
/**
81+
* Wraps child context execution (runInChildContext, map iterations, parallel branches). Use this to set the active
82+
* OTel context for child context scopes.
83+
*
84+
* <p>This hook runs on the child context thread, making it suitable for {@code Context.makeCurrent()}.
85+
*
86+
* @param info operation metadata
87+
* @param fn the child context logic to execute
88+
* @return the result of executing fn
89+
*/
90+
default <T> T wrapChildContextFn(OperationInfo info, Supplier<T> fn) {
91+
return fn.get();
92+
}
93+
94+
/**
95+
* Called when an operation completes for the first time (not on replay). The OTel plugin creates the operation span
96+
* here with backfilled start/end timestamps.
97+
*/
98+
default void onOperationFirstEnd(OperationEndInfo info) {}
99+
100+
// ─── Operation Attempt-level hooks ───────────────────────────────────
101+
102+
/** Called before each attempt of a retryable operation (step, waitForCondition). Use to start an attempt span. */
103+
default void onOperationAttemptStart(AttemptInfo info) {}
104+
105+
/**
106+
* Wraps a single operation attempt execution. Use this to set the active OTel context so auto-instrumented API
107+
* calls within the attempt are correctly parented under the attempt span.
108+
*
109+
* <p>This hook runs on the step thread, making it suitable for {@code Context.makeCurrent()}.
110+
*
111+
* @param info attempt metadata
112+
* @param fn the attempt logic to execute
113+
* @return the result of executing fn
114+
*/
115+
default <T> T wrapOperationAttemptFn(AttemptInfo info, Supplier<T> fn) {
116+
return fn.get();
117+
}
118+
119+
/**
120+
* Called after each attempt completes (succeeded, failed, or retrying). Use to end the attempt span and record
121+
* outcome/errors.
122+
*/
123+
default void onOperationAttemptEnd(AttemptEndInfo info) {}
124+
125+
// ─── Utility hooks ───────────────────────────────────────────────────
126+
127+
/**
128+
* Called when a checkpoint response contains operation status changes. Use for real-time dashboards or metrics on
129+
* operation state transitions.
130+
*/
131+
default void onOperationChange(OperationChangeInfo info) {}
132+
133+
/**
134+
* Returns additional key-value pairs to include in SDK log context via MDC. Called on each log emission. Return
135+
* empty map for no enrichment.
136+
*
137+
* <p>Implementations must be thread-safe as this may be called from any thread.
138+
*/
139+
default Map<String, String> enrichLogContext() {
140+
return Collections.emptyMap();
141+
}
142+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.plugin;
4+
5+
import java.util.Map;
6+
import software.amazon.awssdk.services.lambda.model.Operation;
7+
8+
/**
9+
* Information provided when a durable execution ends.
10+
*
11+
* @param requestId the Lambda request ID
12+
* @param executionArn the durable execution ARN
13+
* @param status the execution outcome (SUCCEEDED or FAILED)
14+
* @param executionResult the result if succeeded (may be null)
15+
* @param executionError the error if failed (may be null)
16+
* @param executionInput the original execution input
17+
* @param operations all operations in the execution (final state)
18+
* @experimental This record is experimental and may be changed or removed in future releases.
19+
*/
20+
public record ExecutionEndInfo(
21+
String requestId,
22+
String executionArn,
23+
ExecutionStatus status,
24+
Object executionResult,
25+
Throwable executionError,
26+
Object executionInput,
27+
Map<String, Operation> operations) {}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.plugin;
4+
5+
/**
 * The terminal states a durable execution can end in.
 *
 * @experimental This enum is experimental and may be changed or removed in future releases.
 */
public enum ExecutionStatus {
    /** The execution succeeded. */
    SUCCEEDED,

    /** The execution failed. */
    FAILED;
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.plugin;
4+
5+
/**
 * Immutable invocation-level metadata made available to plugin hooks.
 *
 * @param requestId Lambda request ID of this invocation
 * @param executionArn ARN of the durable execution
 * @experimental This record is experimental and may be changed or removed in future releases.
 */
public record InvocationInfo(
        String requestId,
        String executionArn) {}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
package software.amazon.lambda.durable.plugin;
4+
5+
import java.util.Map;
6+
import software.amazon.awssdk.services.lambda.model.Operation;
7+
8+
/**
9+
* Information about operation state changes from a checkpoint response.
10+
*
11+
* @param requestId the Lambda request ID
12+
* @param executionArn the durable execution ARN
13+
* @param updatedOperations operations whose status changed in this checkpoint response
14+
* @param operations all operations in the execution (current state)
15+
* @experimental This record is experimental and may be changed or removed in future releases.
16+
*/
17+
public record OperationChangeInfo(
18+
String requestId,
19+
String executionArn,
20+
Map<String, Operation> updatedOperations,
21+
Map<String, Operation> operations) {}

0 commit comments

Comments
 (0)