Skip to content

Commit 8b58b00

Browse files
Progress
1 parent 1e5875f commit 8b58b00

16 files changed

Lines changed: 710 additions & 1124 deletions

File tree

sdk-common/src/main/java/dev/restate/sdk/endpoint/definition/HandlerContext.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,10 @@ public interface HandlerContext {
4040
// ----- IO
4141
// Note: These are not supposed to be exposed in the user's facing Context API.
4242

43+
@Deprecated(forRemoval = true)
4344
CompletableFuture<Void> writeOutput(Slice value);
4445

46+
@Deprecated(forRemoval = true)
4547
CompletableFuture<Void> writeOutput(TerminalException exception);
4648

4749
// ----- State

sdk-core/src/main/java/dev/restate/sdk/core/AsyncResults.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import dev.restate.common.function.ThrowingFunction;
1212
import dev.restate.sdk.common.AbortedExecutionException;
1313
import dev.restate.sdk.common.TerminalException;
14+
import dev.restate.sdk.core.sharedcore.StateMachine;
1415
import dev.restate.sdk.endpoint.definition.AsyncResult;
1516
import java.util.*;
1617
import java.util.concurrent.CompletableFuture;
@@ -21,7 +22,12 @@ abstract class AsyncResults {
2122

2223
@FunctionalInterface
2324
interface Completer<T> {
24-
void complete(NotificationValue value, CompletableFuture<T> future);
25+
void complete(StateMachine.NotificationValue value, CompletableFuture<T> future);
26+
}
27+
28+
@FunctionalInterface
29+
interface NotificationReader {
30+
java.util.Optional<StateMachine.NotificationValue> take(int handle);
2531
}
2632

2733
private AsyncResults() {}
@@ -46,7 +52,7 @@ interface AsyncResultInternal<T> extends AsyncResult<T> {
4652

4753
void tryCancel();
4854

49-
void tryComplete(StateMachine stateMachine);
55+
void tryComplete(NotificationReader reader);
5056

5157
CompletableFuture<T> publicFuture();
5258

@@ -109,9 +115,9 @@ public void tryCancel() {
109115
}
110116

111117
@Override
112-
public void tryComplete(StateMachine stateMachine) {
113-
stateMachine
114-
.takeNotification(handle)
118+
public void tryComplete(NotificationReader reader) {
119+
reader
120+
.take(handle)
115121
.ifPresent(
116122
value -> {
117123
try {
@@ -159,8 +165,8 @@ public void tryCancel() {
159165
}
160166

161167
@Override
162-
public void tryComplete(StateMachine stateMachine) {
163-
asyncResult.tryComplete(stateMachine);
168+
public void tryComplete(NotificationReader reader) {
169+
asyncResult.tryComplete(reader);
164170
}
165171

166172
@Override
@@ -273,8 +279,8 @@ public void tryCancel() {
273279
}
274280

275281
@Override
276-
public void tryComplete(StateMachine stateMachine) {
277-
asyncResults.forEach(ar -> ar.tryComplete(stateMachine));
282+
public void tryComplete(NotificationReader reader) {
283+
asyncResults.forEach(ar -> ar.tryComplete(reader));
278284
for (int i = 0; i < asyncResults.size(); i++) {
279285
if (asyncResults.get(i).isDone()) {
280286
publicFuture.complete(i);
@@ -320,8 +326,8 @@ public void tryCancel() {
320326
}
321327

322328
@Override
323-
public void tryComplete(StateMachine stateMachine) {
324-
asyncResults.forEach(ar -> ar.tryComplete(stateMachine));
329+
public void tryComplete(NotificationReader reader) {
330+
asyncResults.forEach(ar -> ar.tryComplete(reader));
325331
asyncResults.stream()
326332
.filter(ar -> ar.publicFuture().isCompletedExceptionally())
327333
.findFirst()

sdk-core/src/main/java/dev/restate/sdk/core/EndpointRequestHandler.java

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import dev.restate.common.Slice;
1212
import dev.restate.sdk.core.generated.manifest.EndpointManifestSchema;
1313
import dev.restate.sdk.core.generated.manifest.Service;
14+
import dev.restate.sdk.core.sharedcore.StateMachine;
1415
import dev.restate.sdk.endpoint.Endpoint;
1516
import dev.restate.sdk.endpoint.HeadersAccessor;
1617
import dev.restate.sdk.endpoint.definition.HandlerDefinition;
@@ -177,9 +178,6 @@ public RequestProcessor processorForRequest(
177178
loggingContextSetter.set(LoggingContextSetter.INVOCATION_ID_KEY, invocationIdHeader);
178179
}
179180

180-
// Instantiate state machine
181-
StateMachine stateMachine = StateMachine.init(headersAccessor, loggingContextSetter);
182-
183181
// Resolve the service method definition
184182
ServiceDefinition svc = this.endpoint.resolveService(serviceName);
185183
if (svc == null) {
@@ -190,7 +188,7 @@ public RequestProcessor processorForRequest(
190188
throw ProtocolException.methodNotFound(serviceName, handlerName);
191189
}
192190

193-
// Parse OTEL context and generate span
191+
// Parse OTEL context
194192
final io.opentelemetry.context.Context otelContext =
195193
this.endpoint
196194
.getOpenTelemetry()
@@ -199,25 +197,16 @@ public RequestProcessor processorForRequest(
199197
.extract(
200198
io.opentelemetry.context.Context.current(), headersAccessor, OTEL_HEADERS_GETTER);
201199

202-
// Generate the span
203-
// Span span =
204-
// tracer
205-
// .spanBuilder("Invoke handler")
206-
// .setSpanKind(SpanKind.SERVER)
207-
// .setParent(otelContext)
208-
// .startSpan();
209-
210200
// Setup logging context
211201
loggingContextSetter.set(
212202
LoggingContextSetter.INVOCATION_TARGET_KEY, fullyQualifiedServiceMethod);
213203

214204
return new RequestProcessorImpl(
205+
StateMachine.create(headersAccessor),
215206
fullyQualifiedServiceMethod,
216-
stateMachine,
217207
svc.getServiceType(),
218208
handler,
219209
otelContext,
220-
loggingContextSetter,
221210
coreExecutor);
222211
}
223212

sdk-core/src/main/java/dev/restate/sdk/core/ExceptionUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public static Optional<ProtocolException> findProtocolException(Throwable throwa
5353
return findCause(throwable, t -> t instanceof ProtocolException);
5454
}
5555

56-
public static boolean containsSuspendedException(Throwable throwable) {
56+
public static boolean containsAbortedExecutionException(Throwable throwable) {
5757
return findCause(throwable, t -> t == AbortedExecutionException.INSTANCE).isPresent();
5858
}
5959

sdk-core/src/main/java/dev/restate/sdk/core/ExecutorSwitchingHandlerContextImpl.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import dev.restate.common.Slice;
1313
import dev.restate.common.Target;
1414
import dev.restate.sdk.common.*;
15+
import dev.restate.sdk.core.sharedcore.StateMachine;
1516
import dev.restate.sdk.endpoint.definition.AsyncResult;
1617
import dev.restate.sdk.endpoint.definition.HandlerType;
1718
import dev.restate.sdk.endpoint.definition.ServiceType;
@@ -31,14 +32,24 @@ final class ExecutorSwitchingHandlerContextImpl extends HandlerContextImpl {
3132
private final Executor coreExecutor;
3233

3334
ExecutorSwitchingHandlerContextImpl(
35+
StateMachine vm,
36+
ExternalProgressChannel externalProgressChannel,
37+
Consumer<Slice> outputSink,
3438
String fullyQualifiedHandlerName,
3539
ServiceType serviceType,
3640
@Nullable HandlerType handlerType,
37-
StateMachine stateMachine,
3841
Context otelContext,
3942
StateMachine.Input input,
4043
Executor coreExecutor) {
41-
super(fullyQualifiedHandlerName, serviceType, handlerType, stateMachine, otelContext, input);
44+
super(
45+
vm,
46+
externalProgressChannel,
47+
outputSink,
48+
fullyQualifiedHandlerName,
49+
serviceType,
50+
handlerType,
51+
otelContext,
52+
input);
4253
this.coreExecutor = coreExecutor;
4354
}
4455

@@ -170,23 +181,20 @@ public void pollAsyncResult(AsyncResults.AsyncResultInternal<?> asyncResult) {
170181
coreExecutor.execute(() -> super.pollAsyncResult(asyncResult));
171182
}
172183

184+
@Deprecated
173185
@Override
174186
public CompletableFuture<Void> writeOutput(Slice value) {
175187
return CompletableFuture.supplyAsync(() -> super.writeOutput(value), coreExecutor)
176188
.thenCompose(Function.identity());
177189
}
178190

191+
@Deprecated
179192
@Override
180193
public CompletableFuture<Void> writeOutput(TerminalException throwable) {
181194
return CompletableFuture.supplyAsync(() -> super.writeOutput(throwable), coreExecutor)
182195
.thenCompose(Function.identity());
183196
}
184197

185-
@Override
186-
public void close() {
187-
coreExecutor.execute(super::close);
188-
}
189-
190198
@Override
191199
public void fail(Throwable cause) {
192200
coreExecutor.execute(() -> super.fail(cause));
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
2+
//
3+
// This file is part of the Restate Java SDK,
4+
// which is released under the MIT license.
5+
//
6+
// You can find a copy of the license in file LICENSE in the root
7+
// directory of this repository or package, or at
8+
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
9+
package dev.restate.sdk.core;
10+
11+
import org.jspecify.annotations.Nullable;
12+
13+
final class ExternalProgressChannel {
14+
15+
private int pending = 0;
16+
private @Nullable Runnable waiter;
17+
18+
void signal() {
19+
if (waiter != null) {
20+
Runnable w = waiter;
21+
waiter = null;
22+
w.run();
23+
} else {
24+
pending++;
25+
}
26+
}
27+
28+
void awaitNext(Runnable callback) {
29+
if (waiter != null) {
30+
throw new IllegalStateException("awaitNext already pending");
31+
}
32+
if (pending > 0) {
33+
pending--;
34+
callback.run();
35+
return;
36+
}
37+
waiter = callback;
38+
}
39+
}

0 commit comments

Comments
 (0)