Skip to content

Commit 488fc1a

Browse files
Fix problems with errors
1 parent f95de18 commit 488fc1a

4 files changed

Lines changed: 135 additions & 34 deletions

File tree

sdk-core/build.gradle.kts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,9 @@ tasks {
221221
withType<org.gradle.jvm.tasks.Jar>().configureEach {
222222
dependsOn(generateJsonSchema2Pojo, generateWasmMarker)
223223
}
224-
withType<AbstractDokkaTask>().configureEach { dependsOn(generateJsonSchema2Pojo, generateWasmMarker) }
224+
withType<AbstractDokkaTask>().configureEach {
225+
dependsOn(generateJsonSchema2Pojo, generateWasmMarker)
226+
}
225227
}
226228

227229
ksp {

sdk-core/src/main/java/dev/restate/sdk/core/WasmStateMachineImpl.java

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import dev.restate.sdk.common.*;
1414
import dev.restate.sdk.core.sharedcore.SharedCoreVM;
1515
import dev.restate.sdk.endpoint.HeadersAccessor;
16+
import java.io.PrintWriter;
17+
import java.io.StringWriter;
1618
import java.time.Duration;
1719
import java.util.*;
1820
import java.util.concurrent.CompletableFuture;
@@ -59,6 +61,14 @@ public void onNext(Slice slice) {
5961

6062
@Override
6163
public void onError(Throwable throwable) {
64+
try {
65+
String message =
66+
throwable.getMessage() != null ? throwable.getMessage() : throwable.getClass().getName();
67+
vm.notifyError(message, stacktraceToString(throwable), null);
68+
pumpOutput();
69+
} catch (Throwable ignored) {
70+
// ignore errors from notifyError — we're already in error handling
71+
}
6272
if (!waitForReadyFuture.isDone()) {
6373
waitForReadyFuture.completeExceptionally(throwable);
6474
}
@@ -194,7 +204,11 @@ private static NotificationValue mapNotificationValue(SharedCoreVM.NotificationV
194204
} else if (raw instanceof SharedCoreVM.NotificationValue.Success s) {
195205
return new NotificationValue.Success(Slice.wrap(s.value()));
196206
} else if (raw instanceof SharedCoreVM.NotificationValue.Failure f) {
197-
return new NotificationValue.Failure(new TerminalException(f.code(), f.message()));
207+
Map<String, String> meta = new LinkedHashMap<>();
208+
if (f.metadata() != null) {
209+
for (String[] pair : f.metadata()) meta.put(pair[0], pair[1]);
210+
}
211+
return new NotificationValue.Failure(new TerminalException(f.code(), f.message(), meta));
198212
} else if (raw instanceof SharedCoreVM.NotificationValue.StateKeys sk) {
199213
return new NotificationValue.StateKeys(sk.keys());
200214
} else if (raw instanceof SharedCoreVM.NotificationValue.InvocationId id) {
@@ -300,7 +314,8 @@ public void completeAwakeable(String awakeableId, Slice value) {
300314

301315
@Override
302316
public void completeAwakeable(String awakeableId, TerminalException exception) {
303-
vm.sysCompleteAwakeableFailure(awakeableId, exception.getCode(), exception.getMessage());
317+
vm.sysCompleteAwakeableFailure(
318+
awakeableId, exception.getCode(), exception.getMessage(), toMetaList(exception));
304319
}
305320

306321
@Override
@@ -317,7 +332,11 @@ public void completeSignal(String targetInvocationId, String signalName, Slice v
317332
public void completeSignal(
318333
String targetInvocationId, String signalName, TerminalException exception) {
319334
vm.sysCompleteSignalFailure(
320-
targetInvocationId, signalName, exception.getCode(), exception.getMessage());
335+
targetInvocationId,
336+
signalName,
337+
exception.getCode(),
338+
exception.getMessage(),
339+
toMetaList(exception));
321340
}
322341

323342
@Override
@@ -337,7 +356,8 @@ public int promiseComplete(String key, Slice value) {
337356

338357
@Override
339358
public int promiseComplete(String key, TerminalException exception) {
340-
return vm.sysPromiseCompleteFailure(key, exception.getCode(), exception.getMessage());
359+
return vm.sysPromiseCompleteFailure(
360+
key, exception.getCode(), exception.getMessage(), toMetaList(exception));
341361
}
342362

343363
@Override
@@ -359,12 +379,13 @@ public void proposeRunCompletion(
359379
@Nullable RetryPolicy retryPolicy) {
360380
SharedCoreVM.WasmRetryPolicy rp = toWasmRetryPolicy(retryPolicy);
361381
if (exception instanceof TerminalException te) {
362-
vm.proposeRunCompletionTerminalFailure(handle, te.getCode(), te.getMessage());
382+
vm.proposeRunCompletionTerminalFailure(handle, te.getCode(), te.getMessage(), toMetaList(te));
363383
} else {
364384
vm.proposeRunCompletionRetryableFailure(
365385
handle,
366386
500,
367387
exception.getMessage() != null ? exception.getMessage() : exception.getClass().getName(),
388+
stacktraceToString(exception),
368389
attemptDuration.toMillis(),
369390
rp);
370391
}
@@ -393,7 +414,7 @@ public void writeOutput(Slice value) {
393414

394415
@Override
395416
public void writeOutput(TerminalException exception) {
396-
vm.sysWriteOutputFailure(exception.getCode(), exception.getMessage());
417+
vm.sysWriteOutputFailure(exception.getCode(), exception.getMessage(), toMetaList(exception));
397418
}
398419

399420
@Override
@@ -435,4 +456,19 @@ public InvocationState state() {
435456
retryPolicy.getMaxAttempts(),
436457
retryPolicy.getMaxDuration() != null ? retryPolicy.getMaxDuration().toMillis() : null);
437458
}
459+
460+
private static @Nullable List<String[]> toMetaList(TerminalException exception) {
461+
Map<String, String> meta = exception.getMetadata();
462+
if (meta == null || meta.isEmpty()) return null;
463+
List<String[]> r = new ArrayList<>(meta.size());
464+
for (Map.Entry<String, String> e : meta.entrySet())
465+
r.add(new String[] {e.getKey(), e.getValue()});
466+
return r;
467+
}
468+
469+
private static String stacktraceToString(Throwable t) {
470+
StringWriter sw = new StringWriter();
471+
t.printStackTrace(new PrintWriter(sw));
472+
return sw.toString();
473+
}
438474
}

sdk-core/src/main/java/dev/restate/sdk/core/sharedcore/SharedCoreVM.java

Lines changed: 54 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,17 @@ public void notifyInputClosed() {
103103
instance.getExports().vmNotifyInputClosed(vmPtr);
104104
}
105105

106+
public void notifyError(
107+
String message, @Nullable String stacktrace, @Nullable Long delayOverrideMillis) {
108+
if (closed) {
109+
return;
110+
}
111+
LOG.trace("[vm=0x{}] notifyError()", Integer.toHexString(vmPtr));
112+
instance.callCborVmFunction(
113+
(exports, ptr, len) -> exports.vmNotifyError(vmPtr, ptr, len),
114+
new VmNotifyError(message, stacktrace, delayOverrideMillis));
115+
}
116+
106117
public Optional<byte[]> takeOutput() {
107118
if (closed) {
108119
return Optional.empty();
@@ -334,7 +345,12 @@ public void sysCompleteAwakeableSuccess(String id, byte[] value) {
334345
}
335346

336347
public void sysCompleteAwakeableFailure(String id, int code, String message) {
337-
sysCompleteAwakeable(id, new NonEmptyValueParam.Failure(code, message));
348+
sysCompleteAwakeable(id, new NonEmptyValueParam.Failure(code, message, null));
349+
}
350+
351+
public void sysCompleteAwakeableFailure(
352+
String id, int code, String message, @Nullable List<String[]> metadata) {
353+
sysCompleteAwakeable(id, new NonEmptyValueParam.Failure(code, message, metadata));
338354
}
339355

340356
public int sysCreateSignalHandle(String signalName) {
@@ -360,7 +376,16 @@ public void sysCompleteSignalSuccess(String target, String signalName, byte[] va
360376
}
361377

362378
public void sysCompleteSignalFailure(String target, String signalName, int code, String message) {
363-
sysCompleteSignal(target, signalName, new NonEmptyValueParam.Failure(code, message));
379+
sysCompleteSignal(target, signalName, new NonEmptyValueParam.Failure(code, message, null));
380+
}
381+
382+
public void sysCompleteSignalFailure(
383+
String target,
384+
String signalName,
385+
int code,
386+
String message,
387+
@Nullable List<String[]> metadata) {
388+
sysCompleteSignal(target, signalName, new NonEmptyValueParam.Failure(code, message, metadata));
364389
}
365390

366391
public int sysPromiseGet(String key) {
@@ -391,7 +416,12 @@ public int sysPromiseCompleteSuccess(String key, byte[] value) {
391416
}
392417

393418
public int sysPromiseCompleteFailure(String key, int code, String message) {
394-
return sysPromiseComplete(key, new NonEmptyValueParam.Failure(code, message));
419+
return sysPromiseComplete(key, new NonEmptyValueParam.Failure(code, message, null));
420+
}
421+
422+
public int sysPromiseCompleteFailure(
423+
String key, int code, String message, @Nullable List<String[]> metadata) {
424+
return sysPromiseComplete(key, new NonEmptyValueParam.Failure(code, message, metadata));
395425
}
396426

397427
public int sysRun(String name) {
@@ -411,20 +441,22 @@ public void proposeRunCompletionSuccess(int handle, byte[] value) {
411441
new VmProposeRunCompletionParameters(handle, new RunResult.Success(value), 0L, null));
412442
}
413443

414-
public void proposeRunCompletionTerminalFailure(int handle, int code, String message) {
444+
public void proposeRunCompletionTerminalFailure(
445+
int handle, int code, String message, @Nullable List<String[]> metadata) {
415446
LOG.trace("[vm=0x{}] proposeRunCompletionTerminalFailure()", Integer.toHexString(vmPtr));
416447
verifyNotClosed();
417448

418449
callWithEmptyReturn(
419450
SharedCoreWasm_ModuleExports::vmProposeRunCompletion,
420451
new VmProposeRunCompletionParameters(
421-
handle, new RunResult.TerminalFailure(code, message), 0L, null));
452+
handle, new RunResult.TerminalFailure(code, message, metadata), 0L, null));
422453
}
423454

424455
public void proposeRunCompletionRetryableFailure(
425456
int handle,
426457
int code,
427458
String message,
459+
@Nullable String stacktrace,
428460
long attemptDurationMillis,
429461
@Nullable WasmRetryPolicy retryPolicy) {
430462
LOG.trace("[vm=0x{}] proposeRunCompletionRetryableFailure()", Integer.toHexString(vmPtr));
@@ -434,7 +466,7 @@ public void proposeRunCompletionRetryableFailure(
434466
SharedCoreWasm_ModuleExports::vmProposeRunCompletion,
435467
new VmProposeRunCompletionParameters(
436468
handle,
437-
new RunResult.RetryableFailure(code, message),
469+
new RunResult.RetryableFailure(code, message, stacktrace),
438470
attemptDurationMillis,
439471
retryPolicy));
440472
}
@@ -479,7 +511,11 @@ public void sysWriteOutputSuccess(byte[] value) {
479511
}
480512

481513
public void sysWriteOutputFailure(int code, String message) {
482-
sysWriteOutput(new NonEmptyValueParam.Failure(code, message));
514+
sysWriteOutput(new NonEmptyValueParam.Failure(code, message, null));
515+
}
516+
517+
public void sysWriteOutputFailure(int code, String message, @Nullable List<String[]> metadata) {
518+
sysWriteOutput(new NonEmptyValueParam.Failure(code, message, metadata));
483519
}
484520

485521
public void sysEnd() {
@@ -615,7 +651,8 @@ record Void() implements NotificationValue {}
615651

616652
record Success(byte[] value) implements NotificationValue {}
617653

618-
record Failure(int code, String message) implements NotificationValue {}
654+
record Failure(int code, String message, @Nullable List<String[]> metadata)
655+
implements NotificationValue {}
619656

620657
record StateKeys(List<String> keys) implements NotificationValue {}
621658

@@ -685,6 +722,9 @@ record Failure(int code, String message) implements SysCallReturn {}
685722
// Field names match Rust struct field names after camelCase renaming.
686723
// =========================================================================
687724

725+
public record VmNotifyError(
726+
String message, @Nullable String stacktrace, @Nullable Long delayOverrideMillis) {}
727+
688728
public record VmNewParameters(List<String[]> headers) {}
689729

690730
public record VmDoProgressParameters(int[] handles) {}
@@ -707,7 +747,8 @@ public record VmSysSleepParameters(
707747
public sealed interface NonEmptyValueParam {
708748
record Success(byte[] value) implements NonEmptyValueParam {}
709749

710-
record Failure(int code, String message) implements NonEmptyValueParam {}
750+
record Failure(int code, String message, @Nullable List<String[]> metadata)
751+
implements NonEmptyValueParam {}
711752
}
712753

713754
public record VmSysCompleteAwakeableParameters(String id, NonEmptyValueParam result) {}
@@ -752,9 +793,11 @@ public record VmSysRunParameters(String name) {}
752793
public sealed interface RunResult {
753794
record Success(byte[] value) implements RunResult {}
754795

755-
record TerminalFailure(int code, String message) implements RunResult {}
796+
record TerminalFailure(int code, String message, @Nullable List<String[]> metadata)
797+
implements RunResult {}
756798

757-
record RetryableFailure(int code, String message) implements RunResult {}
799+
record RetryableFailure(int code, String message, @Nullable String stacktrace)
800+
implements RunResult {}
758801
}
759802

760803
public record VmProposeRunCompletionParameters(

0 commit comments

Comments
 (0)