Skip to content

Commit 6545832

Browse files
committed
on_drop as parameter
1 parent 3667af3 commit 6545832

7 files changed

Lines changed: 121 additions & 57 deletions

File tree

zenoh-flat/src/jni_converter.rs

Lines changed: 14 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,6 @@ pub(crate) struct KotlinConfig {
9393
/// FQN of a singleton referenced inside the generated `init { ... }` block
9494
/// to force native-library loading — `None` disables the `init`.
9595
init_load_fqn: Option<String>,
96-
/// FQN of the on-close callback type (typically
97-
/// `io.zenoh.jni.callbacks.JNIOnCloseCallback`). Used for the synthetic
98-
/// `<name>OnClose` parameter injected for every callback arg.
99-
on_close_callback_fqn: String,
10096
/// Per-source-type Kotlin names (FQN or bare) for struct-decoded args.
10197
struct_kotlin_types: HashMap<String, String>,
10298
/// Per-element-type Kotlin names for callback args (e.g. `Sample` →
@@ -381,13 +377,6 @@ impl Builder {
381377
self
382378
}
383379

384-
/// FQN of the Kotlin on-close callback type used for the synthetic
385-
/// `<name>OnClose` parameter injected for each callback argument.
386-
pub fn kotlin_on_close(mut self, fqn: impl Into<String>) -> Self {
387-
self.kotlin.get_or_insert_with(KotlinConfig::default).on_close_callback_fqn = fqn.into();
388-
self
389-
}
390-
391380
pub fn build(self) -> JniConverter {
392381
JniConverter {
393382
cfg: self,
@@ -408,7 +397,6 @@ impl Default for KotlinConfig {
408397
class_name: String::new(),
409398
throws_class_fqn: None,
410399
init_load_fqn: None,
411-
on_close_callback_fqn: String::new(),
412400
struct_kotlin_types: HashMap::new(),
413401
callback_kotlin_types: HashMap::new(),
414402
return_kotlin_types: HashMap::new(),
@@ -778,11 +766,9 @@ impl JniConverter {
778766
}
779767
}
780768
ArgKind::Callback { decoder, element_type_name } => {
781-
let on_close_ident = format_ident!("{}_on_close", name);
782769
jni_params.push(quote! { #name: jni::objects::JObject });
783-
jni_params.push(quote! { #on_close_ident: jni::objects::JObject });
784770
prelude.push(quote! {
785-
let #name = #decoder(&mut env, #name, #on_close_ident)?;
771+
let #name = #decoder(&mut env, #name)?;
786772
});
787773
call_args.push(quote! { #name });
788774
if let Some(kt) = kt_cfg {
@@ -792,17 +778,11 @@ impl JniConverter {
792778
element_type_name
793779
));
794780
let cb_short = kotlin_register_fqn(&cb_fqn, &mut local_kotlin_fqns);
795-
let oc_short = kotlin_register_fqn(&kt.on_close_callback_fqn, &mut local_kotlin_fqns);
796781
kotlin_params.push(format!(
797782
"{}: {}",
798783
kotlin_param_name(&name.to_string(), false),
799784
cb_short
800785
));
801-
kotlin_params.push(format!(
802-
"{}OnClose: {}",
803-
kotlin_param_name(&name.to_string(), false),
804-
oc_short
805-
));
806786
}
807787
}
808788
ArgKind::OptionString => {
@@ -1209,7 +1189,7 @@ impl JniConverter {
12091189
}
12101190
}
12111191
syn::Type::ImplTrait(it) => {
1212-
if let Some(elem) = extract_fn_single_arg_type_name(&it.bounds) {
1192+
if let Some(elem) = extract_fn_arg_type_name(&it.bounds) {
12131193
if let Some(decoder) = self.cfg.callback_decoders.get(&elem) {
12141194
return ArgKind::Callback {
12151195
decoder: decoder.clone(),
@@ -1308,8 +1288,9 @@ enum ArgKind {
13081288
decoder: syn::Path,
13091289
type_name: String,
13101290
},
1311-
/// `impl Fn(T) + Send + Sync + 'static` → `(JObject callback, JObject on_close)`
1312-
/// pair decoded via a callback decoder registered for `T`.
1291+
/// `impl Fn(T) + Send + Sync + 'static` → single `JObject` decoded via the
1292+
/// callback decoder registered for `T`. Zero-arg `impl Fn() + Send + Sync`
1293+
/// callbacks are looked up under the key `"()"`.
13131294
Callback {
13141295
decoder: syn::Path,
13151296
element_type_name: String,
@@ -1335,9 +1316,11 @@ fn type_last_segment(ty: &syn::Type) -> Option<String> {
13351316
tp.path.segments.last().map(|s| s.ident.to_string())
13361317
}
13371318

1338-
/// Look through the trait bounds of an `impl Fn(T) + ...` for a `Fn`-family
1339-
/// trait and return the last-segment name of its single argument type `T`.
1340-
fn extract_fn_single_arg_type_name(
1319+
/// Look through the trait bounds of an `impl Fn(...) + ...` for a `Fn`-family
1320+
/// trait and return the lookup key for its argument type:
1321+
/// - `impl Fn(T)` → `Some("T")`
1322+
/// - `impl Fn()` → `Some("()")` (zero-arg callbacks, e.g. on-close handlers)
1323+
fn extract_fn_arg_type_name(
13411324
bounds: &syn::punctuated::Punctuated<syn::TypeParamBound, syn::Token![+]>,
13421325
) -> Option<String> {
13431326
for bound in bounds {
@@ -1347,8 +1330,10 @@ fn extract_fn_single_arg_type_name(
13471330
continue;
13481331
}
13491332
let syn::PathArguments::Parenthesized(p) = &seg.arguments else { continue };
1350-
let first = p.inputs.first()?;
1351-
return type_last_segment(first);
1333+
return match p.inputs.first() {
1334+
Some(first) => type_last_segment(first),
1335+
None => Some("()".to_string()),
1336+
};
13521337
}
13531338
None
13541339
}

zenoh-flat/src/session.rs

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,23 @@ use zenoh_ext::{
3535
AdvancedSubscriberBuilderExt,
3636
};
3737

38+
/// Fires `f` exactly once when dropped. Used to bind an `on_close` callback
39+
/// to the lifetime of a data callback closure: when zenoh drops the data
40+
/// closure (subscription/queryable/get teardown), the guard's `Drop` runs
41+
/// `on_close`.
42+
struct CallOnDrop<F: FnOnce()>(core::mem::MaybeUninit<F>);
43+
impl<F: FnOnce()> CallOnDrop<F> {
44+
fn new(f: F) -> Self {
45+
Self(core::mem::MaybeUninit::new(f))
46+
}
47+
}
48+
impl<F: FnOnce()> Drop for CallOnDrop<F> {
49+
fn drop(&mut self) {
50+
let f = unsafe { self.0.assume_init_read() };
51+
f();
52+
}
53+
}
54+
3855
/// Open a Zenoh session using a borrowed configuration.
3956
#[prebindgen_proc_macro::prebindgen("jni")]
4057
pub fn open_session(config: &Config) -> ZResult<Session> {
@@ -134,11 +151,16 @@ pub fn declare_subscriber(
134151
session: &Session,
135152
key_expr: KeyExpr<'static>,
136153
callback: impl Fn(Sample) + Send + Sync + 'static,
154+
on_close: impl Fn() + Send + Sync + 'static,
137155
) -> ZResult<Subscriber<()>> {
138156
let key_expr_string = key_expr.to_string();
157+
let guard = CallOnDrop::new(on_close);
139158
session
140159
.declare_subscriber(key_expr)
141-
.callback(callback)
160+
.callback(move |sample| {
161+
let _ = &guard;
162+
callback(sample);
163+
})
142164
.wait()
143165
.map(|subscriber| {
144166
trace!("Declared subscriber on '{}'.", key_expr_string);
@@ -190,12 +212,17 @@ pub fn declare_queryable(
190212
session: &Session,
191213
key_expr: KeyExpr<'static>,
192214
callback: impl Fn(Query) + Send + Sync + 'static,
215+
on_close: impl Fn() + Send + Sync + 'static,
193216
complete: bool,
194217
) -> ZResult<Queryable<()>> {
195218
let key_expr_string = key_expr.to_string();
219+
let guard = CallOnDrop::new(on_close);
196220
session
197221
.declare_queryable(key_expr)
198-
.callback(callback)
222+
.callback(move |query| {
223+
let _ = &guard;
224+
callback(query);
225+
})
199226
.complete(complete)
200227
.wait()
201228
.map(|queryable| {
@@ -224,6 +251,7 @@ pub fn get(
224251
key_expr: KeyExpr<'static>,
225252
selector_params: Option<String>,
226253
callback: impl Fn(Reply) + Send + Sync + 'static,
254+
on_close: impl Fn() + Send + Sync + 'static,
227255
timeout: Duration,
228256
query_target: QueryTarget,
229257
consolidation: ConsolidationMode,
@@ -237,9 +265,13 @@ pub fn get(
237265
) -> ZResult<()> {
238266
let key_expr_string = key_expr.to_string();
239267
let selector = Selector::owned(&key_expr, selector_params.unwrap_or_default());
268+
let guard = CallOnDrop::new(on_close);
240269
let mut get_builder = session
241270
.get(selector)
242-
.callback(callback)
271+
.callback(move |reply| {
272+
let _ = &guard;
273+
callback(reply);
274+
})
243275
.target(query_target)
244276
.consolidation(consolidation)
245277
.congestion_control(congestion_control)
@@ -386,14 +418,19 @@ pub fn declare_advanced_subscriber(
386418
session: &Session,
387419
key_expr: KeyExpr<'static>,
388420
callback: impl Fn(Sample) + Send + Sync + 'static,
421+
on_close: impl Fn() + Send + Sync + 'static,
389422
history: Option<HistoryConfig>,
390423
recovery: Option<RecoveryConfig>,
391424
subscriber_detection: bool,
392425
) -> ZResult<AdvancedSubscriber<()>> {
393426
let key_expr_string = key_expr.to_string();
427+
let guard = CallOnDrop::new(on_close);
394428
let mut builder = session
395429
.declare_subscriber(key_expr)
396-
.callback(callback)
430+
.callback(move |sample| {
431+
let _ = &guard;
432+
callback(sample);
433+
})
397434
.advanced();
398435
if let Some(history) = history {
399436
builder = builder.history(history.try_into()?);

zenoh-jni/build.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ fn main() {
1818
.kotlin_class("JNISessionNative")
1919
.kotlin_throws("io.zenoh.exceptions.ZError")
2020
.kotlin_init("io.zenoh.ZenohLoad")
21-
.kotlin_on_close("io.zenoh.jni.callbacks.JNIOnCloseCallback")
2221
.enum_decoder(
2322
"CongestionControl",
2423
"crate::utils::decode_congestion_control",
@@ -43,6 +42,11 @@ fn main() {
4342
"crate::sample_callback::process_kotlin_reply_callback",
4443
"io.zenoh.jni.callbacks.JNIGetCallback",
4544
)
45+
.callback_decoder(
46+
"()",
47+
"crate::sample_callback::process_kotlin_on_close_callback",
48+
"io.zenoh.jni.callbacks.JNIOnCloseCallback",
49+
)
4650
.struct_decoder(
4751
"KeyExpr",
4852
"crate::key_expr::decode_jni_key_expr",

zenoh-jni/src/ext/advanced_subscriber.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use zenoh::Wait;
3030

3131
use crate::owned_object::OwnedObject;
3232

33-
use crate::utils::{get_callback_global_ref, get_java_vm, load_on_close};
33+
use crate::utils::{get_callback_global_ref, get_java_vm, load_on_close, wrap_with_on_close};
3434
use std::ptr::null;
3535

3636
use crate::throw_exception;
@@ -138,12 +138,12 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNIAdvancedSubscriber_declareDetectPu
138138
advanced_subscriber.key_expr()
139139
);
140140

141+
let cb = process_kotlin_sample_callback(&mut env, callback)?;
142+
let cb = wrap_with_on_close(&mut env, on_close, cb)?;
141143
let detect_publishers_subscriber = advanced_subscriber
142144
.detect_publishers()
143145
.history(history != 0)
144-
.callback(process_kotlin_sample_callback(
145-
&mut env, callback, on_close,
146-
)?)
146+
.callback(cb)
147147
.wait()
148148
.map_err(|err| zerror!("Unable to declare detect publishers subscriber: {}", err))?;
149149

@@ -196,12 +196,12 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNIAdvancedSubscriber_declareBackgrou
196196
advanced_subscriber.key_expr()
197197
);
198198

199+
let cb = process_kotlin_sample_callback(&mut env, callback)?;
200+
let cb = wrap_with_on_close(&mut env, on_close, cb)?;
199201
advanced_subscriber
200202
.detect_publishers()
201203
.history(history != 0)
202-
.callback(process_kotlin_sample_callback(
203-
&mut env, callback, on_close,
204-
)?)
204+
.callback(cb)
205205
.background()
206206
.wait()
207207
.map_err(|err| {

zenoh-jni/src/liveliness.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use crate::{
3030
owned_object::OwnedObject,
3131
sample_callback::{on_reply_error, on_reply_success, process_kotlin_sample_callback},
3232
throw_exception,
33-
utils::{get_callback_global_ref, get_java_vm, load_on_close},
33+
utils::{get_callback_global_ref, get_java_vm, load_on_close, wrap_with_on_close},
3434
};
3535

3636
#[no_mangle]
@@ -146,13 +146,13 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_declareLivelinessSubscribe
146146
let key_expr = decode_jni_key_expr(&mut env, &key_expr)?;
147147
tracing::debug!("Declaring liveliness subscriber on '{}'...", key_expr);
148148

149+
let cb = process_kotlin_sample_callback(&mut env, callback)?;
150+
let cb = wrap_with_on_close(&mut env, on_close, cb)?;
149151
let subscriber = session
150152
.liveliness()
151153
.declare_subscriber(key_expr.to_owned())
152154
.history(history != 0)
153-
.callback(process_kotlin_sample_callback(
154-
&mut env, callback, on_close,
155-
)?)
155+
.callback(cb)
156156
.wait()
157157
.map_err(|err| zerror!("Unable to declare liveliness subscriber: {}", err))?;
158158

zenoh-jni/src/sample_callback.rs

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -177,15 +177,11 @@ pub(crate) fn on_reply_error(
177177
pub(crate) unsafe fn process_kotlin_sample_callback(
178178
env: &mut JNIEnv,
179179
callback: JObject,
180-
on_close: JObject,
181180
) -> ZResult<impl Fn(Sample) + Send + Sync + 'static> {
182181
let java_vm = Arc::new(get_java_vm(env)?);
183182
let callback_global_ref = get_callback_global_ref(env, callback)?;
184-
let on_close_global_ref = get_callback_global_ref(env, on_close)?;
185-
let on_close = load_on_close(&java_vm, on_close_global_ref);
186183

187184
Ok(move |sample: Sample| {
188-
on_close.noop();
189185
let _ = || -> ZResult<()> {
190186
let mut env = java_vm
191187
.attach_current_thread_as_daemon()
@@ -250,16 +246,12 @@ pub(crate) unsafe fn process_kotlin_sample_callback(
250246
pub(crate) unsafe fn process_kotlin_reply_callback(
251247
env: &mut JNIEnv,
252248
callback: JObject,
253-
on_close: JObject,
254249
) -> ZResult<impl Fn(Reply) + Send + Sync + 'static> {
255250
let java_vm = Arc::new(get_java_vm(env)?);
256251
let callback_global_ref = get_callback_global_ref(env, callback)?;
257-
let on_close_global_ref = get_callback_global_ref(env, on_close)?;
258-
let on_close = load_on_close(&java_vm, on_close_global_ref);
259252

260253
Ok(move |reply: Reply| {
261254
|| -> ZResult<()> {
262-
on_close.noop();
263255
tracing::debug!("Receiving reply through JNI: {:?}", reply);
264256
let mut env = java_vm.attach_current_thread_as_daemon().map_err(|err| {
265257
zerror!("Unable to attach thread for GET query callback: {}", err)
@@ -286,15 +278,11 @@ pub(crate) unsafe fn process_kotlin_reply_callback(
286278
pub(crate) unsafe fn process_kotlin_query_callback(
287279
env: &mut JNIEnv,
288280
callback: JObject,
289-
on_close: JObject,
290281
) -> ZResult<impl Fn(Query) + Send + Sync + 'static> {
291282
let java_vm = Arc::new(get_java_vm(env)?);
292283
let callback_global_ref = get_callback_global_ref(env, callback)?;
293-
let on_close_global_ref = get_callback_global_ref(env, on_close)?;
294-
let on_close = load_on_close(&java_vm, on_close_global_ref);
295284

296285
Ok(move |query: Query| {
297-
on_close.noop();
298286
let env = match java_vm.attach_current_thread_as_daemon() {
299287
Ok(env) => env,
300288
Err(err) => {
@@ -311,6 +299,33 @@ pub(crate) unsafe fn process_kotlin_query_callback(
311299
})
312300
}
313301

302+
/// Decoder for the zero-arg `impl Fn() + Send + Sync + 'static` callback used
303+
/// by zenoh-flat for `on_close` parameters. The returned closure attaches the
304+
/// JVM and invokes `Runnable.run()` on the Kotlin object. zenoh-flat is
305+
/// responsible for binding it to the data closure's lifetime via its own
306+
/// `CallOnDrop` so the on-close fires when the subscription/query is dropped.
307+
pub(crate) unsafe fn process_kotlin_on_close_callback(
308+
env: &mut JNIEnv,
309+
on_close: JObject,
310+
) -> ZResult<impl Fn() + Send + Sync + 'static> {
311+
let java_vm = Arc::new(get_java_vm(env)?);
312+
let on_close_global_ref = get_callback_global_ref(env, on_close)?;
313+
314+
Ok(move || {
315+
let mut env = match java_vm.attach_current_thread_as_daemon() {
316+
Ok(env) => env,
317+
Err(err) => {
318+
tracing::error!("Unable to attach thread for 'onClose' callback: {}", err);
319+
return;
320+
}
321+
};
322+
if let Err(err) = env.call_method(&on_close_global_ref, "run", "()V", &[]) {
323+
_ = env.exception_describe();
324+
tracing::error!("Error while running 'onClose' callback: {}", err);
325+
}
326+
})
327+
}
328+
314329
fn on_query(mut env: JNIEnv, query: Query, callback_global_ref: &GlobalRef) -> ZResult<()> {
315330
let selector_params_jstr = env
316331
.new_string(query.parameters().to_string())

0 commit comments

Comments
 (0)