Skip to content

Commit 338e2ad

Browse files
committed
Fix async issue
1 parent c67b15a commit 338e2ad

6 files changed

Lines changed: 111 additions & 15 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/vespera_inprocess/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ tokio = { version = "1", features = ["rt"] }
2020
[dev-dependencies]
2121
criterion = { version = "0.8", features = ["html_reports"] }
2222
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
23+
# `FutureExt::catch_unwind` for the `async_spawn_pattern` bench, which
24+
# A/Bs the vespera_jni `dispatchAsync` spawn-mechanism change (inner
25+
# `tokio::spawn` vs in-place `catch_unwind`).
26+
futures-util = { version = "0.3", default-features = false, features = ["std"] }
2327

2428
[[bench]]
2529
name = "dispatch"

crates/vespera_inprocess/benches/dispatch.rs

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
2424
use std::collections::HashMap;
2525
use std::ops::ControlFlow;
26+
use std::panic::AssertUnwindSafe;
2627
use std::sync::Mutex;
2728

2829
use axum::{
@@ -32,6 +33,7 @@ use axum::{
3233
routing::{get, post},
3334
};
3435
use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
36+
use futures_util::FutureExt;
3537
use serde::{Deserialize, Serialize};
3638
use tokio::runtime::Runtime;
3739
use vespera_inprocess::{
@@ -485,6 +487,61 @@ fn bench_streaming_path(c: &mut Criterion) {
485487
drop(runtime);
486488
}
487489

490+
/// #2 isolation: the `vespera_jni::dispatchAsync` spawn mechanism.
491+
///
492+
/// Both variants run the dispatch task on a shared multi-thread runtime
493+
/// (the outer `tokio::spawn`, common to both) and differ only in how a
494+
/// panic in the dispatch future is isolated:
495+
///
496+
/// - `double_spawn_pre`: a **second** `tokio::spawn` (panic → `JoinError`),
497+
/// the pre-#2 shape — one extra task allocation + scheduler hop.
498+
/// - `single_spawn_catch_unwind_post`: `FutureExt::catch_unwind` in place,
499+
/// the post-#2 shape — same panic → fallback, no second task.
500+
///
501+
/// The inner future is trivial so the spawn/catch_unwind overhead is the
502+
/// dominant cost and the delta isolates exactly what #2 removes per async
503+
/// dispatch (independent of the dispatch payload size).
504+
fn bench_async_spawn_pattern(c: &mut Criterion) {
505+
let runtime = tokio::runtime::Builder::new_multi_thread()
506+
.worker_threads(4)
507+
.enable_all()
508+
.build()
509+
.expect("multi-thread runtime");
510+
let mut group = c.benchmark_group("async_spawn_pattern");
511+
512+
group.bench_function("double_spawn_pre", |b| {
513+
b.iter(|| {
514+
runtime.block_on(async {
515+
tokio::spawn(async move {
516+
tokio::spawn(async { vec![0u8; 64] })
517+
.await
518+
.unwrap_or_else(|_| vec![1u8; 16])
519+
})
520+
.await
521+
.unwrap()
522+
})
523+
});
524+
});
525+
526+
group.bench_function("single_spawn_catch_unwind_post", |b| {
527+
b.iter(|| {
528+
runtime.block_on(async {
529+
tokio::spawn(async move {
530+
AssertUnwindSafe(async { vec![0u8; 64] })
531+
.catch_unwind()
532+
.await
533+
.unwrap_or_else(|_| vec![1u8; 16])
534+
})
535+
.await
536+
.unwrap()
537+
})
538+
});
539+
});
540+
541+
group.finish();
542+
drop(runtime);
543+
}
544+
488545
criterion_group!(
489546
benches,
490547
bench_router_path,
@@ -493,6 +550,7 @@ criterion_group!(
493550
bench_resolve_path,
494551
bench_contended_path,
495552
bench_headers_path,
496-
bench_streaming_path
553+
bench_streaming_path,
554+
bench_async_spawn_pattern
497555
);
498556
criterion_main!(benches);

crates/vespera_inprocess/src/wire.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -247,14 +247,29 @@ struct WireHeaders<'a>(&'a http::HeaderMap);
247247
impl Serialize for WireHeaders<'_> {
248248
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
249249
use serde::ser::SerializeMap;
250-
// `HeaderMap::keys` yields each distinct name exactly once;
251-
// pre-size to the exact distinct-key count so the collect never
252-
// reallocates.
253-
let mut names: Vec<&str> = Vec::with_capacity(self.0.keys_len());
254-
names.extend(self.0.keys().map(http::HeaderName::as_str));
250+
// `HeaderMap::keys` yields each distinct name exactly once. The
251+
// overwhelmingly common response carries only a handful of header
252+
// names, so sort them in a stack buffer and skip the per-response
253+
// heap `Vec`; header sets larger than the stack cap fall back to a
254+
// heap `Vec`. Output is byte-identical either way (same sorted
255+
// order over the same names), as locked by tests/wire_contract.rs.
256+
const STACK_CAP: usize = 32;
257+
let key_count = self.0.keys_len();
258+
let mut stack_names: [&str; STACK_CAP] = [""; STACK_CAP];
259+
let mut heap_names: Vec<&str>;
260+
let names: &mut [&str] = if key_count <= STACK_CAP {
261+
for (slot, name) in stack_names.iter_mut().zip(self.0.keys()) {
262+
*slot = name.as_str();
263+
}
264+
&mut stack_names[..key_count]
265+
} else {
266+
heap_names = Vec::with_capacity(key_count);
267+
heap_names.extend(self.0.keys().map(http::HeaderName::as_str));
268+
&mut heap_names[..]
269+
};
255270
names.sort_unstable();
256271
let mut map = serializer.serialize_map(Some(names.len()))?;
257-
for name in names {
272+
for &name in names.iter() {
258273
let mut values = self.0.get_all(name).iter();
259274
let first = values
260275
.next()

crates/vespera_jni/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@ repository.workspace = true
1010
vespera_inprocess = { workspace = true }
1111
jni = "0.22"
1212
tokio = { version = "1", features = ["rt-multi-thread"] }
13+
# `FutureExt::catch_unwind` for the async dispatch panic-isolation path
14+
# (replaces a redundant second `tokio::spawn`). Already in the workspace
15+
# dependency tree via tokio/axum/tower, so this adds no new crate to the
16+
# build — only `std` is needed for the `catch_unwind` combinator.
17+
futures-util = { version = "0.3", default-features = false, features = ["std"] }
1318
# Optional high-performance global allocator for the final cdylib.
1419
# Opt-in because #[global_allocator] is process-wide and must be the
1520
# embedding crate's decision.

crates/vespera_jni/src/jni_impl.rs

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::{cell::RefCell, future::Future, sync::LazyLock};
22

3+
use futures_util::FutureExt;
34
use jni::EnvUnowned;
45
use jni::errors::ThrowRuntimeExAndDefault;
56
use jni::objects::{Global, JByteArray, JByteBuffer, JClass, JObject};
@@ -568,16 +569,27 @@ pub extern "system" fn Java_com_devfive_vespera_bridge_VesperaBridge_dispatchAsy
568569
}
569570
};
570571

571-
// The inner task converts Rust panics into JoinError, preserving
572-
// always-complete semantics for the Java future. Scheduling
573-
// itself is wrapped in `catch_unwind` so a failure to build or
574-
// schedule on the shared runtime completes the future (with a
575-
// 500) instead of leaving the Java caller hanging.
572+
// A panic in the dispatch future is caught **in place** with
573+
// `FutureExt::catch_unwind` instead of isolating it in a second
574+
// `tokio::spawn` task — same panic → 500 wire fallback (preserving
575+
// always-complete semantics for the Java future), but one fewer
576+
// task allocation + scheduler hop per async dispatch. The inner
577+
// spawn never bought parallelism here (the outer task awaited it
578+
// immediately), so it was pure overhead. `AssertUnwindSafe` is
579+
// sound: a panic drops the half-run dispatch and we return a fresh
580+
// `error_wire`; the registered `Router` is `Arc`-shared and is not
581+
// left observably inconsistent. The outer `catch_unwind` still
582+
// guards `RUNTIME.spawn` itself so a scheduling failure completes
583+
// the future (with a 500) instead of leaving the Java caller
584+
// hanging.
576585
let scheduled = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
577586
RUNTIME.spawn(async move {
578-
let response = tokio::spawn(vespera_inprocess::dispatch_from_bytes_async(input))
579-
.await
580-
.unwrap_or_else(|_| vespera_inprocess::error_wire(500, "panic in Rust engine"));
587+
let response = std::panic::AssertUnwindSafe(
588+
vespera_inprocess::dispatch_from_bytes_async(input),
589+
)
590+
.catch_unwind()
591+
.await
592+
.unwrap_or_else(|_| vespera_inprocess::error_wire(500, "panic in Rust engine"));
581593

582594
let _ = with_cached_daemon_env(&jvm, |env| -> jni::errors::Result<()> {
583595
complete_future(env, &future_for_task, &response)

0 commit comments

Comments
 (0)