Skip to content

Commit cd1df5e

Browse files
IMO this fixes suspensions, but let's see
1 parent 146843d commit cd1df5e

1 file changed

Lines changed: 7 additions & 9 deletions

File tree

sdk-core/src/main/java/dev/restate/sdk/core/WasmStateMachineImpl.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -112,11 +112,13 @@ private void pumpOutput() {
112112
if (outputSubscriber == null) {
113113
return;
114114
}
115-
Optional<byte[]> chunk = vm.takeOutput();
116-
if (chunk.isEmpty()) {
117-
return;
115+
while (true) {
116+
Optional<byte[]> chunk = vm.takeOutput();
117+
if (chunk.isEmpty()) {
118+
return;
119+
}
120+
outputSubscriber.onNext(Slice.wrap(chunk.get()));
118121
}
119-
outputSubscriber.onNext(Slice.wrap(chunk.get()));
120122
}
121123

122124
private void checkReadyToExecute() {
@@ -162,10 +164,6 @@ private void cancelInputSubscription() {
162164
}
163165
}
164166

165-
// ---------------------------------------------------------------------------
166-
// StateMachine — async results
167-
// ---------------------------------------------------------------------------
168-
169167
@Override
170168
public DoProgressResponse doProgress(List<Integer> handles) {
171169
int[] arr = handles.stream().mapToInt(Integer::intValue).toArray();
@@ -181,7 +179,7 @@ public DoProgressResponse doProgress(List<Integer> handles) {
181179
return new DoProgressResponse.ExecuteRun(r.handle());
182180
} else if (result instanceof SharedCoreVM.DoProgressResult.Suspended) {
183181
pumpOutput();
184-
return DoProgressResponse.WaitExternalProgress.INSTANCE;
182+
ExceptionUtils.sneakyThrow(AbortedExecutionException.INSTANCE);
185183
}
186184
throw new IllegalStateException("Unknown DoProgressResult: " + result);
187185
}

0 commit comments

Comments
 (0)