Skip to content

Commit 42cf62f

Browse files
Bunch of changes
1 parent cd1df5e commit 42cf62f

5 files changed

Lines changed: 23 additions & 50 deletions

File tree

sdk-core/src/main/java/dev/restate/sdk/core/HandlerContextImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -362,8 +362,7 @@ public CompletableFuture<Void> writeOutput(TerminalException throwable) {
362362

363363
@Override
364364
public void pollAsyncResult(AsyncResultInternal<?> asyncResult) {
365-
// We use the separate function for the recursion,
366-
// as there's no need to jump back and forth between threads again.
365+
this.stateMachine.pumpOutput();
367366
this.pollAsyncResultInner(asyncResult);
368367
}
369368

@@ -400,10 +399,12 @@ private void pollAsyncResultInner(AsyncResultInternal<?> asyncResult) {
400399
if (response instanceof StateMachine.DoProgressResponse.AnyCompleted) {
401400
// Let it loop now
402401
} else if (response instanceof StateMachine.DoProgressResponse.WaitExternalProgress) {
402+
this.stateMachine.pumpOutput();
403403
this.stateMachine.onExternalProgress(() -> this.pollAsyncResultInner(asyncResult));
404404
return;
405405
} else if (response instanceof StateMachine.DoProgressResponse.CancelSignalReceived) {
406406
asyncResult.tryCancel();
407+
return;
407408
} else if (response instanceof StateMachine.DoProgressResponse.ExecuteRun) {
408409
triggerScheduledRun(((StateMachine.DoProgressResponse.ExecuteRun) response).handle());
409410
// Let it loop now

sdk-core/src/main/java/dev/restate/sdk/core/StateMachine.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ static StateMachine init(
4545

4646
void onExternalProgress(Runnable runnable);
4747

48+
// --- Output pump (mirrors TS OutputPump.awaitNextProgress — drains ONE chunk)
49+
50+
void pumpOutput();
51+
4852
// --- Async results
4953

5054
sealed interface DoProgressResponse {

sdk-core/src/main/java/dev/restate/sdk/core/WasmStateMachineImpl.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -108,17 +108,16 @@ public void cancel() {
108108
});
109109
}
110110

111-
private void pumpOutput() {
111+
@Override
112+
public void pumpOutput() {
112113
if (outputSubscriber == null) {
113114
return;
114115
}
115-
while (true) {
116-
Optional<byte[]> chunk = vm.takeOutput();
117-
if (chunk.isEmpty()) {
118-
return;
119-
}
120-
outputSubscriber.onNext(Slice.wrap(chunk.get()));
116+
byte[] chunk = vm.takeOutput();
117+
if (chunk.length == 0) {
118+
return;
121119
}
120+
outputSubscriber.onNext(Slice.wrap(chunk));
122121
}
123122

124123
private void checkReadyToExecute() {
@@ -171,14 +170,12 @@ public DoProgressResponse doProgress(List<Integer> handles) {
171170
if (result instanceof SharedCoreVM.DoProgressResult.AnyCompleted) {
172171
return DoProgressResponse.AnyCompleted.INSTANCE;
173172
} else if (result instanceof SharedCoreVM.DoProgressResult.WaitExternalProgress) {
174-
pumpOutput();
175173
return DoProgressResponse.WaitExternalProgress.INSTANCE;
176174
} else if (result instanceof SharedCoreVM.DoProgressResult.CancelSignalReceived) {
177175
return DoProgressResponse.CancelSignalReceived.INSTANCE;
178176
} else if (result instanceof SharedCoreVM.DoProgressResult.ExecuteRun r) {
179177
return new DoProgressResponse.ExecuteRun(r.handle());
180178
} else if (result instanceof SharedCoreVM.DoProgressResult.Suspended) {
181-
pumpOutput();
182179
ExceptionUtils.sneakyThrow(AbortedExecutionException.INSTANCE);
183180
}
184181
throw new IllegalStateException("Unknown DoProgressResult: " + result);
@@ -419,7 +416,7 @@ public void writeOutput(TerminalException exception) {
419416
public void end() {
420417
try {
421418
vm.sysEnd();
422-
pumpOutput();
419+
this.pumpOutput();
423420
} finally {
424421
if (outputSubscriber != null) {
425422
outputSubscriber.onComplete();

sdk-core/src/main/java/dev/restate/sdk/core/sharedcore/SharedCoreVM.java

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -114,16 +114,14 @@ public void notifyError(
114114
new VmNotifyError(message, stacktrace, delayOverrideMillis));
115115
}
116116

117-
public Optional<byte[]> takeOutput() {
117+
public byte[] takeOutput() {
118118
if (closed) {
119-
return Optional.empty();
119+
return new byte[0];
120120
}
121121
LOG.trace("[vm=0x{}] takeOutput()", Integer.toHexString(vmPtr));
122122

123-
var ret =
124-
instance.callCborVmFunction(exports -> exports.vmTakeOutput(vmPtr), TakeOutputReturn.class);
125-
if (ret instanceof TakeOutputReturn.Eof) return Optional.empty();
126-
return Optional.of(((TakeOutputReturn.Buffer) ret).bytes());
123+
var ptr = instance.getExports().vmTakeOutput(vmPtr);
124+
return instance.readAndFree(ptr);
127125
}
128126

129127
public String getResponseContentType() {
@@ -583,17 +581,6 @@ record Ok(int handle) implements HandleReturn {}
583581
record Failure(int code, String message) implements HandleReturn {}
584582
}
585583

586-
@JsonTypeInfo(use = Id.NAME, include = As.PROPERTY, property = "type")
587-
@JsonSubTypes({
588-
@JsonSubTypes.Type(value = TakeOutputReturn.Buffer.class, name = "buffer"),
589-
@JsonSubTypes.Type(value = TakeOutputReturn.Eof.class, name = "eof"),
590-
})
591-
public sealed interface TakeOutputReturn {
592-
record Buffer(byte[] bytes) implements TakeOutputReturn {}
593-
594-
record Eof() implements TakeOutputReturn {}
595-
}
596-
597584
@JsonTypeInfo(use = Id.NAME, include = As.PROPERTY, property = "type")
598585
@JsonSubTypes({
599586
@JsonSubTypes.Type(value = IsReadyReturn.Ok.class, name = "ok"),

sdk-core/src/main/rust/src/lib.rs

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -252,8 +252,11 @@ fn vm_notify_error(rc_vm: &Rc<RefCell<WasmVM>>, input: VmNotifyError) {
252252
#[export_name = "vm_take_output"]
253253
pub unsafe extern "C" fn _vm_take_output(vm_pointer: *const RefCell<WasmVM>) -> u64 {
254254
let rc_vm = vm_ptr_to_rc(vm_pointer);
255-
let res: TakeOutputReturn = VM::take_output(&mut rc_vm.borrow_mut().vm).into();
256-
output_to_ptr(res)
255+
let res: Vec<u8> = match VM::take_output(&mut rc_vm.borrow_mut().vm) {
256+
TakeOutputResult::Buffer(b) => b.to_vec(),
257+
TakeOutputResult::EOF => Vec::default(),
258+
};
259+
vec_to_ptr(res)
257260
}
258261

259262
#[export_name = "vm_is_ready_to_execute"]
@@ -1224,16 +1227,6 @@ enum HandleReturn {
12241227
Failure { code: u32, message: String },
12251228
}
12261229

1227-
#[derive(Serialize)]
1228-
#[serde(tag = "type", rename_all = "camelCase", rename_all_fields = "camelCase")]
1229-
enum TakeOutputReturn {
1230-
Buffer {
1231-
#[serde(with = "serde_bytes")]
1232-
bytes: Vec<u8>,
1233-
},
1234-
Eof,
1235-
}
1236-
12371230
#[derive(Serialize)]
12381231
#[serde(tag = "type", rename_all = "camelCase", rename_all_fields = "camelCase")]
12391232
enum IsReadyReturn {
@@ -1418,15 +1411,6 @@ impl From<Result<NotificationHandle, Error>> for HandleReturn {
14181411
}
14191412
}
14201413

1421-
impl From<TakeOutputResult> for TakeOutputReturn {
1422-
fn from(value: TakeOutputResult) -> Self {
1423-
match value {
1424-
TakeOutputResult::Buffer(b) => TakeOutputReturn::Buffer { bytes: b.to_vec() },
1425-
TakeOutputResult::EOF => TakeOutputReturn::Eof,
1426-
}
1427-
}
1428-
}
1429-
14301414
impl From<ResponseHead> for ResponseHeadReturn {
14311415
fn from(head: ResponseHead) -> Self {
14321416
ResponseHeadReturn {

0 commit comments

Comments
 (0)