Skip to content

Commit 14f0172

Browse files
Interceptor API + Tracing integration (#612)
* Add HandlerContext#attemptHeaders and HandlerRequest serviceName(), handlerName(), serviceType(), handlerType() * Add interceptor API in java api, modify a bit the usage of the HandlerRunner.Options * Add interceptor API in Kotlin API, modify a bit the usage of the HandlerRunner.Options * Add sdk-interceptor-micrometer module * Add sdk-interceptor-opentelemetry module * Add support in sdk-spring-boot to autoload micrometer integration * Other changes * Always use the W3C Trace Context Propagator for extracting context in OpenTelemetryInterceptorFactory * Align the two otel interceptor factories to use the same code for creating spans and extracting context. * Align the two micrometer interceptor factories to use the same code for creating spans and extracting context. Plus align the created spans to the same spans created by otel. * Remove service/handler type from HandlerRequest, not needed. * Add RunContextPropagator interface for the java api * Add auto propagation of Micrometer's JDK HttpClient instrumentation for the Restate's Client * Align deps * Fix deprecations * Mark new interfaces as experimental * Remove stuff we dont need from deps
1 parent ec4e244 commit 14f0172

56 files changed

Lines changed: 2358 additions & 189 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

gradle/libs.versions.toml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,36 @@
117117
[libraries.opentelemetry-kotlin.version]
118118
ref = 'opentelemetry'
119119

120+
[libraries.opentelemetry-sdk-testing]
121+
module = 'io.opentelemetry:opentelemetry-sdk-testing'
122+
123+
[libraries.opentelemetry-sdk-testing.version]
124+
ref = 'opentelemetry'
125+
126+
[libraries.micrometer-java11]
127+
module = 'io.micrometer:micrometer-java11'
128+
129+
[libraries.micrometer-java11.version]
130+
ref = 'micrometer'
131+
132+
[libraries.micrometer-observation]
133+
module = 'io.micrometer:micrometer-observation'
134+
135+
[libraries.micrometer-observation.version]
136+
ref = 'micrometer'
137+
138+
[libraries.micrometer-observation-test]
139+
module = 'io.micrometer:micrometer-observation-test'
140+
141+
[libraries.micrometer-observation-test.version]
142+
ref = 'micrometer'
143+
144+
[libraries.micrometer-context-propagation]
145+
module = 'io.micrometer:context-propagation'
146+
147+
[libraries.micrometer-context-propagation.version]
148+
ref = 'micrometer-context-propagation'
149+
120150
[libraries.protobuf-java]
121151
module = 'com.google.protobuf:protobuf-java'
122152

@@ -239,6 +269,8 @@
239269
kotlinx-serialization = '1.9.0'
240270
ksp = '2.2.10-2.0.2'
241271
log4j = '2.24.3'
272+
micrometer = '1.14.14'
273+
micrometer-context-propagation = '1.1.3'
242274
opentelemetry = '1.58.0'
243275
protobuf = '4.29.3'
244276
restate = '2.8.0-SNAPSHOT'

sdk-api-gen/src/main/resources/templates/ServiceDefinitionFactory.hbs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ public class {{generatedClassSimpleName}} implements dev.restate.sdk.endpoint.de
55

66
@java.lang.Override
77
public dev.restate.sdk.endpoint.definition.ServiceDefinition create({{originalClassFqcn}} bindableService, dev.restate.sdk.endpoint.definition.HandlerRunner.Options overrideHandlerOptions) {
8-
dev.restate.sdk.HandlerRunner.Options handlerRunnerOptions = dev.restate.sdk.HandlerRunner.Options.DEFAULT;
8+
dev.restate.sdk.HandlerRunner.Options handlerRunnerOptions = new dev.restate.sdk.HandlerRunner.Options();
99
if (overrideHandlerOptions != null) {
1010
if (overrideHandlerOptions instanceof dev.restate.sdk.HandlerRunner.Options) {
1111
handlerRunnerOptions = (dev.restate.sdk.HandlerRunner.Options)overrideHandlerOptions;

sdk-api-kotlin-gen/src/main/resources/templates/ServiceDefinitionFactory.hbs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ class {{generatedClassSimpleName}}: dev.restate.sdk.endpoint.definition.ServiceD
77
check(overrideHandlerOptions is dev.restate.sdk.kotlin.HandlerRunner.Options)
88
overrideHandlerOptions as dev.restate.sdk.kotlin.HandlerRunner.Options
99
} else {
10-
dev.restate.sdk.kotlin.HandlerRunner.Options.DEFAULT
10+
dev.restate.sdk.kotlin.HandlerRunner.Options()
1111
}
1212

1313
return dev.restate.sdk.endpoint.definition.ServiceDefinition.of(

sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ContextImpl.kt

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import dev.restate.sdk.common.HandlerRequest
1616
import dev.restate.sdk.common.StateKey
1717
import dev.restate.sdk.common.TerminalException
1818
import dev.restate.sdk.endpoint.definition.HandlerContext
19+
import dev.restate.sdk.kotlin.interceptor.RunInterceptor
1920
import dev.restate.sdk.kotlin.internal.InsideRunElement
2021
import dev.restate.sdk.kotlin.internal.InsideRunElement.Key.checkNotInsideRun
2122
import dev.restate.serde.Serde
@@ -32,6 +33,7 @@ internal class ContextImpl
3233
internal constructor(
3334
internal val handlerContext: HandlerContext,
3435
internal val contextSerdeFactory: SerdeFactory,
36+
internal val runInterceptor: RunInterceptor = RunInterceptor { _, next -> next() },
3537
) : WorkflowContext {
3638

3739
internal val random: RestateRandom =
@@ -168,14 +170,19 @@ internal constructor(
168170
handlerContext
169171
.submitRun(name) { completer ->
170172
scope.launch {
171-
val result: Slice?
173+
val resultHolder = java.util.concurrent.atomic.AtomicReference<Slice>()
174+
175+
val closure: suspend () -> Unit = { resultHolder.set(serde.serialize(block())) }
176+
172177
try {
173-
result = serde.serialize(block())
174-
} catch (e: Throwable) {
175-
completer.proposeFailure(e, javaRetryPolicy)
176-
return@launch
178+
runInterceptor.aroundRun(
179+
RunInterceptor.Context(handlerContext.request(), name),
180+
closure,
181+
)
182+
completer.proposeSuccess(resultHolder.get())
183+
} catch (t: Throwable) {
184+
completer.proposeFailure(t, javaRetryPolicy)
177185
}
178-
completer.proposeSuccess(result)
179186
}
180187
}
181188
.await()

sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/HandlerRunner.kt

Lines changed: 64 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,14 @@ package dev.restate.sdk.kotlin
1111
import dev.restate.common.Slice
1212
import dev.restate.sdk.common.TerminalException
1313
import dev.restate.sdk.endpoint.definition.HandlerContext
14+
import dev.restate.sdk.kotlin.HandlerRunner.Options.Companion.DEFAULT
15+
import dev.restate.sdk.kotlin.interceptor.HandlerInterceptor
16+
import dev.restate.sdk.kotlin.interceptor.RunInterceptor
1417
import dev.restate.sdk.kotlin.internal.RestateContextElement
1518
import dev.restate.serde.Serde
1619
import dev.restate.serde.SerdeFactory
1720
import io.opentelemetry.extension.kotlin.asContextElement
21+
import java.util.*
1822
import java.util.concurrent.CompletableFuture
1923
import java.util.concurrent.atomic.AtomicReference
2024
import kotlin.coroutines.CoroutineContext
@@ -29,8 +33,12 @@ class HandlerRunner<REQ, RES, CTX : Context>
2933
internal constructor(
3034
private val runner: suspend (CTX, REQ) -> RES,
3135
private val contextSerdeFactory: SerdeFactory,
32-
private val options: Options,
36+
options: Options,
3337
) : dev.restate.sdk.endpoint.definition.HandlerRunner<REQ, RES> {
38+
private val coroutineContext = options.coroutineContext
39+
private val handlerInterceptor =
40+
HandlerInterceptor.Factory.combine(options.handlerInterceptorFactories)
41+
private val runInterceptor = RunInterceptor.Factory.combine(options.runInterceptorFactories)
3442

3543
companion object {
3644
private val LOG = LogManager.getLogger(HandlerRunner::class.java)
@@ -41,7 +49,7 @@ internal constructor(
4149
*/
4250
fun <REQ, RES, CTX : Context> of(
4351
contextSerdeFactory: SerdeFactory,
44-
options: Options = Options.DEFAULT,
52+
options: Options = Options(),
4553
runner: suspend (CTX, REQ) -> RES,
4654
): HandlerRunner<REQ, RES, CTX> {
4755
return HandlerRunner(runner, contextSerdeFactory, options)
@@ -53,7 +61,7 @@ internal constructor(
5361
*/
5462
fun <RES, CTX : Context> of(
5563
contextSerdeFactory: SerdeFactory,
56-
options: Options = Options.DEFAULT,
64+
options: Options = Options(),
5765
runner: suspend (CTX) -> RES,
5866
): HandlerRunner<Unit, RES, CTX> {
5967
return HandlerRunner({ ctx: CTX, _: Unit -> runner(ctx) }, contextSerdeFactory, options)
@@ -65,7 +73,7 @@ internal constructor(
6573
*/
6674
fun <REQ, CTX : Context> ofEmptyReturn(
6775
contextSerdeFactory: SerdeFactory,
68-
options: Options = Options.DEFAULT,
76+
options: Options = Options(),
6977
runner: suspend (CTX, REQ) -> Unit,
7078
): HandlerRunner<REQ, Unit, CTX> {
7179
return HandlerRunner(
@@ -84,7 +92,7 @@ internal constructor(
8492
*/
8593
fun <CTX : Context> ofEmptyReturn(
8694
contextSerdeFactory: SerdeFactory,
87-
options: Options = Options.DEFAULT,
95+
options: Options = Options(),
8896
runner: suspend (CTX) -> Unit,
8997
): HandlerRunner<Unit, Unit, CTX> {
9098
return HandlerRunner(
@@ -104,56 +112,62 @@ internal constructor(
104112
responseSerde: Serde<RES>,
105113
onClosedInvocationStreamHook: AtomicReference<Runnable>,
106114
): CompletableFuture<Slice> {
107-
val ctx: Context = ContextImpl(handlerContext, contextSerdeFactory)
115+
// Interceptor chains were combined once when Options was constructed; reuse them here.
116+
val ctx: Context = ContextImpl(handlerContext, contextSerdeFactory, runInterceptor)
108117

109118
val scope =
110119
CoroutineScope(
111-
options.coroutineContext +
120+
coroutineContext +
112121
RestateContextElement(ctx) +
113122
dev.restate.sdk.endpoint.definition.HandlerRunner.HANDLER_CONTEXT_THREAD_LOCAL
114123
.asContextElement(handlerContext) +
124+
// TODO(tracing-plumbing): deprecate, superseded by sdk-interceptor-opentelemetry
115125
handlerContext.request().openTelemetryContext()!!.asContextElement()
116126
)
117127

118128
val completableFuture = CompletableFuture<Slice>()
119129
val job =
120130
scope.launch {
121-
val serializedResult: Slice
131+
val resultHolder = AtomicReference(Slice.EMPTY)
122132

123-
try {
133+
val userBlock: suspend () -> Unit = {
124134
// Parse input
125-
val req: REQ
126-
try {
127-
req = requestSerde.deserialize(handlerContext.request().body())
128-
} catch (e: Throwable) {
129-
LOG.warn("Error deserializing request", e)
130-
completableFuture.completeExceptionally(
135+
val req: REQ =
136+
try {
137+
requestSerde.deserialize(handlerContext.request().body())
138+
} catch (e: Exception) {
139+
LOG.warn("Error deserializing request", e)
131140
throw TerminalException(
132141
TerminalException.BAD_REQUEST_CODE,
133142
"Cannot deserialize request: " + e.message,
134143
)
135-
)
136-
return@launch
137-
}
144+
}
138145

139-
// Execute user code
146+
// Execute user code. AbortedExecutionException (Throwable, not Exception)
147+
// propagates naturally through this suspend frame.
140148
@Suppress("UNCHECKED_CAST") val res: RES = runner(ctx as CTX, req)
141149

142150
// Serialize output
143151
try {
144-
serializedResult = responseSerde.serialize(res)
145-
} catch (e: Throwable) {
152+
resultHolder.set(responseSerde.serialize(res))
153+
} catch (e: Exception) {
146154
LOG.warn("Error when serializing response", e)
147-
completableFuture.completeExceptionally(e)
148-
return@launch
155+
throw e
149156
}
150-
} catch (e: Throwable) {
151-
completableFuture.completeExceptionally(e)
152-
return@launch
153157
}
154158

155-
// Complete callback
156-
completableFuture.complete(serializedResult)
159+
try {
160+
handlerInterceptor.aroundHandler(
161+
HandlerInterceptor.Context(
162+
handlerContext.request(),
163+
handlerContext.attemptHeaders(),
164+
),
165+
userBlock,
166+
)
167+
completableFuture.complete(resultHolder.get())
168+
} catch (t: Throwable) {
169+
completableFuture.completeExceptionally(t)
170+
}
157171
}
158172
onClosedInvocationStreamHook.set { job.cancel() }
159173

@@ -162,12 +176,30 @@ internal constructor(
162176

163177
/**
164178
* [dev.restate.sdk.kotlin.HandlerRunner] options. You can override the default options to
165-
* configure the [CoroutineContext] to run the handler.
179+
* configure the [CoroutineContext] to run the handler, and to register interceptor factories.
180+
*
181+
* [DEFAULT] picks up any [HandlerInterceptor.Factory] and [RunInterceptor.Factory] registered via
182+
* [java.util.ServiceLoader] on the classpath.
166183
*/
167-
data class Options(val coroutineContext: CoroutineContext) :
168-
dev.restate.sdk.endpoint.definition.HandlerRunner.Options {
184+
class Options(
185+
var coroutineContext: CoroutineContext = Dispatchers.Default,
186+
var handlerInterceptorFactories: MutableList<HandlerInterceptor.Factory> =
187+
SPI_HANDLER_FACTORIES.toMutableList(),
188+
var runInterceptorFactories: MutableList<RunInterceptor.Factory> =
189+
SPI_RUN_FACTORIES.toMutableList(),
190+
) : dev.restate.sdk.endpoint.definition.HandlerRunner.Options {
191+
169192
companion object {
170-
val DEFAULT: Options = Options(Dispatchers.Default)
193+
private val SPI_HANDLER_FACTORIES: List<HandlerInterceptor.Factory> =
194+
ServiceLoader.load(HandlerInterceptor.Factory::class.java).toList()
195+
private val SPI_RUN_FACTORIES: List<RunInterceptor.Factory> =
196+
ServiceLoader.load(RunInterceptor.Factory::class.java).toList()
197+
198+
@kotlin.Deprecated(
199+
message = "Replace it with constructing Options() instead.",
200+
replaceWith = ReplaceWith("Options()"),
201+
)
202+
val DEFAULT: Options = Options()
171203
}
172204
}
173205
}

0 commit comments

Comments
 (0)