Skip to content

Commit 256fbf9

Browse files
Support UnresolvedFuture
Support signals Bump test suite to 2.1
1 parent 09ea8d0 commit 256fbf9

19 files changed

Lines changed: 510 additions & 71 deletions

File tree

.github/workflows/integration.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ jobs:
141141

142142
- name: Run test tool
143143
continue-on-error: ${{ inputs.continueOnError == 'true' }}
144-
uses: restatedev/e2e/sdk-tests@v1.0
144+
uses: restatedev/e2e/sdk-tests@v2.1
145145
with:
146146
envVars: ${{ inputs.envVars }}
147147
testArtifactOutput: ${{ inputs.testArtifactOutput != '' && inputs.testArtifactOutput || 'sdk-java-integration-test-report' }}

sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ContextImpl.kt

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ internal constructor(
123123
)
124124
.await()
125125

126-
object : BaseInvocationHandle<Res>(handlerContext, responseSerde) {
126+
object : BaseInvocationHandle<Res>(this, responseSerde) {
127127
override suspend fun invocationId(): String = invocationIdAsyncResult.poll().await()
128128
}
129129
}
@@ -134,7 +134,7 @@ internal constructor(
134134
responseTypeTag: TypeTag<Res>,
135135
): InvocationHandle<Res> =
136136
resolveSerde<Res>(responseTypeTag).let { responseSerde ->
137-
object : BaseInvocationHandle<Res>(handlerContext, responseSerde) {
137+
object : BaseInvocationHandle<Res>(this, responseSerde) {
138138
override suspend fun invocationId(): String = invocationId
139139
}
140140
}
@@ -193,6 +193,14 @@ internal constructor(
193193
return AwakeableHandleImpl(this, id)
194194
}
195195

196+
override suspend fun <T : Any> signal(name: String, typeTag: TypeTag<T>): DurableFuture<T> {
197+
checkNotInsideRun()
198+
val serde: Serde<T> = resolveSerde(typeTag)
199+
return SingleDurableFutureImpl(handlerContext.signal(name).await()).simpleMap {
200+
serde.deserialize(it)
201+
}
202+
}
203+
196204
override fun random(): RestateRandom {
197205
return this.random
198206
}

sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,21 @@ sealed interface Context {
201201
*/
202202
fun awakeableHandle(id: String): AwakeableHandle
203203

204+
/**
205+
* Create a [DurableFuture] waiting on a named signal targeting the current invocation.
206+
*
207+
* Signals are identified by `(invocationId, name)`. The resolution can arrive before or after the
208+
* handler starts waiting on the signal — there's no need to pre-register.
209+
*
210+
* Another invocation can resolve or reject the signal using [signalHandle].
211+
*
212+
* @param name the signal name.
213+
* @param typeTag the response type tag to use for deserializing the signal result.
214+
* @return a [DurableFuture] that resolves to the signal value (or rejects with a
215+
* [dev.restate.sdk.common.TerminalException]).
216+
*/
217+
suspend fun <T : Any> signal(name: String, typeTag: TypeTag<T>): DurableFuture<T>
218+
204219
/**
205220
* Create a [RestateRandom] instance inherently predictable, seeded on the
206221
* [dev.restate.sdk.common.InvocationId], which is not secret.
@@ -336,6 +351,15 @@ suspend inline fun <reified T : Any> Context.awakeable(): Awakeable<T> {
336351
return this.awakeable(typeTag<T>())
337352
}
338353

354+
/**
355+
* Create a [DurableFuture] waiting on a named signal targeting the current invocation.
356+
*
357+
* @see Context.signal
358+
*/
359+
suspend inline fun <reified T : Any> Context.signal(name: String): DurableFuture<T> {
360+
return this.signal(name, typeTag<T>())
361+
}
362+
339363
/**
340364
* This interface can be used only within shared handlers of virtual objects. It extends [Context]
341365
* adding access to the virtual object instance key-value state storage.
@@ -629,6 +653,14 @@ sealed interface InvocationHandle<Res : Any?> {
629653

630654
/** @return the output of this invocation, if present. */
631655
suspend fun output(): Output<Res>
656+
657+
/**
658+
* Get a [SignalHandle] for resolving or rejecting a named signal on this invocation. The
659+
* receiving handler can await on the signal using [Context.signal].
660+
*
661+
* @param name the signal name.
662+
*/
663+
suspend fun signal(name: String): SignalHandle
632664
}
633665

634666
/**
@@ -677,6 +709,35 @@ suspend inline fun <reified T : Any> AwakeableHandle.resolve(payload: T) {
677709
return this.resolve(typeTag<T>(), payload)
678710
}
679711

712+
/**
713+
* Handle to resolve or reject a named signal on a target invocation.
714+
*
715+
* Unlike awakeables, signals are identified by `(invocationId, name)` and do not need to be
716+
* pre-registered: the resolution can arrive before or after the handler starts waiting.
717+
*/
718+
sealed interface SignalHandle {
719+
/**
720+
* Resolve the signal with the given value.
721+
*
722+
* @param typeTag used to serialize the result payload.
723+
* @param payload the result payload.
724+
*/
725+
suspend fun <T : Any> resolve(typeTag: TypeTag<T>, payload: T)
726+
727+
/**
728+
* Reject the signal with the given reason. The handler awaiting the signal will receive a
729+
* terminal error with [reason] as the message.
730+
*
731+
* @param reason the rejection reason.
732+
*/
733+
suspend fun reject(reason: String)
734+
}
735+
736+
/** Resolve the signal with the given value. */
737+
suspend inline fun <reified T : Any> SignalHandle.resolve(payload: T) {
738+
return this.resolve(typeTag<T>(), payload)
739+
}
740+
680741
/**
681742
* A [DurablePromise] is a durable, distributed version of a Kotlin's Deferred, or more commonly of
682743
* a future/promise. Restate keeps track of the [DurablePromise] across restarts/failures.
@@ -965,6 +1026,17 @@ suspend fun awakeableHandle(id: String): AwakeableHandle {
9651026
return context().awakeableHandle(id)
9661027
}
9671028

1029+
/**
1030+
* Create a [DurableFuture] waiting on a named signal targeting the current invocation.
1031+
*
1032+
* @throws IllegalStateException if called outside of a Restate handler
1033+
* @see Context.signal
1034+
*/
1035+
@org.jetbrains.annotations.ApiStatus.Experimental
1036+
suspend inline fun <reified T : Any> signal(name: String): DurableFuture<T> {
1037+
return context().signal(name, typeTag<T>())
1038+
}
1039+
9681040
/**
9691041
* Get an [InvocationHandle] for an already existing invocation.
9701042
*

sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/futures.kt

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,9 +191,12 @@ internal constructor(
191191

192192
internal abstract class BaseInvocationHandle<Res>
193193
internal constructor(
194-
private val handlerContext: HandlerContext,
194+
private val contextImpl: ContextImpl,
195195
private val responseSerde: Serde<Res>,
196196
) : InvocationHandle<Res> {
197+
private val handlerContext: HandlerContext
198+
get() = contextImpl.handlerContext
199+
197200
override suspend fun cancel() {
198201
checkNotInsideRun()
199202
val ignored = handlerContext.cancelInvocation(invocationId()).await()
@@ -214,6 +217,11 @@ internal constructor(
214217
.simpleMap { it.map { responseSerde.deserialize(it) } }
215218
.await()
216219
}
220+
221+
override suspend fun signal(name: String): SignalHandle {
222+
val resolvedId = invocationId()
223+
return SignalHandleImpl(contextImpl, resolvedId, name)
224+
}
217225
}
218226

219227
internal class AwakeableImpl<T : Any?>
@@ -237,6 +245,24 @@ internal class AwakeableHandleImpl(val contextImpl: ContextImpl, val id: String)
237245
}
238246
}
239247

248+
internal class SignalHandleImpl(
249+
val contextImpl: ContextImpl,
250+
val invocationId: String,
251+
val name: String,
252+
) : SignalHandle {
253+
override suspend fun <T : Any> resolve(typeTag: TypeTag<T>, payload: T) {
254+
checkNotInsideRun()
255+
contextImpl.handlerContext
256+
.resolveSignal(invocationId, name, contextImpl.resolveAndSerialize(typeTag, payload))
257+
.await()
258+
}
259+
260+
override suspend fun reject(reason: String) {
261+
checkNotInsideRun()
262+
contextImpl.handlerContext.rejectSignal(invocationId, name, TerminalException(reason)).await()
263+
}
264+
}
265+
240266
internal class SelectClauseImpl<T>(override val durableFuture: DurableFuture<T>) : SelectClause<T>
241267

242268
@PublishedApi

sdk-api/src/main/java/dev/restate/sdk/Context.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,36 @@ default <T> Awakeable<T> awakeable(Class<T> clazz) {
478478
*/
479479
AwakeableHandle awakeableHandle(String id);
480480

481+
/**
482+
* Create a {@link DurableFuture} waiting on a named signal targeting the current invocation.
483+
*
484+
* <p>Signals are identified by {@code (invocationId, name)}. The resolution can arrive before or
485+
* after the handler starts waiting on the signal — there's no need to pre-register.
486+
*
487+
* <p>Another invocation can resolve or reject the signal using {@link
488+
* SignalHandle#resolve(TypeTag, Object)} / {@link SignalHandle#reject(String)}.
489+
*
490+
* @param name the signal name.
491+
* @param clazz the response type to use for deserializing the signal result. When using generic
492+
* types, use {@link #signal(String, TypeTag)} instead.
493+
* @return a {@link DurableFuture} that resolves to the signal value (or rejects with a {@link
494+
* TerminalException}).
495+
*/
496+
default <T> DurableFuture<T> signal(String name, Class<T> clazz) {
497+
return signal(name, TypeTag.of(clazz));
498+
}
499+
500+
/**
501+
* Create a {@link DurableFuture} waiting on a named signal targeting the current invocation.
502+
*
503+
* @param name the signal name.
504+
* @param typeTag the response type tag to use for deserializing the signal result.
505+
* @return a {@link DurableFuture} that resolves to the signal value (or rejects with a {@link
506+
* TerminalException}).
507+
* @see #signal(String, Class)
508+
*/
509+
<T> DurableFuture<T> signal(String name, TypeTag<T> typeTag);
510+
481511
/**
482512
* Returns a deterministic random.
483513
*

sdk-api/src/main/java/dev/restate/sdk/ContextImpl.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,30 @@ public Output<R> getOutput() {
214214
serviceExecutor)
215215
.await();
216216
}
217+
218+
@Override
219+
public SignalHandle signal(String name) {
220+
String invocationId = invocationId();
221+
return new SignalHandle() {
222+
@Override
223+
public <T> void resolve(TypeTag<T> typeTag, T payload) {
224+
checkNotInsideRun();
225+
Util.awaitCompletableFuture(
226+
handlerContext.resolveSignal(
227+
invocationId,
228+
name,
229+
Util.executeOrFail(
230+
handlerContext, serdeFactory.create(typeTag)::serialize, payload)));
231+
}
232+
233+
@Override
234+
public void reject(String reason) {
235+
checkNotInsideRun();
236+
Util.awaitCompletableFuture(
237+
handlerContext.rejectSignal(invocationId, name, new TerminalException(reason)));
238+
}
239+
};
240+
}
217241
}
218242

219243
@Override
@@ -275,6 +299,15 @@ public void reject(String reason) {
275299
};
276300
}
277301

302+
@Override
303+
public <T> DurableFuture<T> signal(String name, TypeTag<T> typeTag) throws TerminalException {
304+
checkNotInsideRun();
305+
Serde<T> serde = serdeFactory.create(typeTag);
306+
AsyncResult<Slice> result = Util.awaitCompletableFuture(handlerContext.signal(name));
307+
return DurableFuture.fromAsyncResult(result, serviceExecutor)
308+
.mapWithoutExecutor(serde::deserialize);
309+
}
310+
278311
@Override
279312
public RestateRandom random() {
280313
return this.random;

sdk-api/src/main/java/dev/restate/sdk/InvocationHandle.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,12 @@ public interface InvocationHandle<Res> {
3131
* @return the output of this invocation, if present.
3232
*/
3333
Output<Res> getOutput();
34+
35+
/**
36+
* Get a {@link SignalHandle} for resolving or rejecting a named signal on this invocation. The
37+
* receiving handler can await on the signal using {@link Context#signal(String, Class)}.
38+
*
39+
* @param name the signal name.
40+
*/
41+
SignalHandle signal(String name);
3442
}

sdk-common/src/main/java/dev/restate/sdk/endpoint/definition/HandlerContext.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,17 @@ record Awakeable(String id, AsyncResult<Slice> asyncResult) {}
101101

102102
CompletableFuture<AsyncResult<Void>> rejectPromise(String key, TerminalException reason);
103103

104+
// ----- Named signals
105+
//
106+
// Signals are identified by (invocationId, name). Unlike awakeables, signals do not need to be
107+
// pre-registered: the resolution can arrive before or after the handler starts waiting.
108+
109+
CompletableFuture<AsyncResult<Slice>> signal(String name);
110+
111+
CompletableFuture<Void> resolveSignal(String invocationId, String name, Slice payload);
112+
113+
CompletableFuture<Void> rejectSignal(String invocationId, String name, TerminalException reason);
114+
104115
CompletableFuture<Void> cancelInvocation(String invocationId);
105116

106117
CompletableFuture<AsyncResult<Slice>> attachInvocation(String invocationId);

0 commit comments

Comments
 (0)