Skip to content

Commit 376cdbe

Browse files
committed
on_reply functions moved
1 parent 089770e commit 376cdbe

4 files changed

Lines changed: 155 additions & 158 deletions

File tree

zenoh-jni/src/liveliness.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@ use crate::{
2929
errors::ZResult,
3030
key_expr::process_kotlin_key_expr,
3131
owned_object::OwnedObject,
32-
sample_callback::process_kotlin_sample_callback,
33-
session::{on_reply_error, on_reply_success},
32+
sample_callback::{on_reply_error, on_reply_success, process_kotlin_sample_callback},
3433
throw_exception,
3534
utils::{get_callback_global_ref, get_java_vm, load_on_close},
3635
};

zenoh-jni/src/querier.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::{
2525
errors::ZResult,
2626
key_expr::process_kotlin_key_expr,
2727
owned_object::OwnedObject,
28-
session::{on_reply_error, on_reply_success},
28+
sample_callback::{on_reply_error, on_reply_success},
2929
throw_exception,
3030
utils::{
3131
decode_byte_array, decode_encoding, decode_string, get_callback_global_ref, get_java_vm,

zenoh-jni/src/sample_callback.rs

Lines changed: 150 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,157 @@ use jni::{
1919
sys::{jint, jlong},
2020
JNIEnv,
2121
};
22-
use zenoh::{query::{Query, Reply, ReplyKeyExpr}, sample::Sample};
22+
use zenoh::{query::{Query, Reply, ReplyError, ReplyKeyExpr}, sample::Sample, session::EntityGlobalId};
2323

2424
use crate::{errors::ZResult, utils::*};
2525

26+
pub(crate) fn on_reply_success(
27+
env: &mut JNIEnv,
28+
replier_id: Option<EntityGlobalId>,
29+
sample: &Sample,
30+
callback_global_ref: &GlobalRef,
31+
) -> ZResult<()> {
32+
let zenoh_id = replier_id
33+
.map_or_else(
34+
|| Ok(JByteArray::default()),
35+
|replier_id| {
36+
env.byte_array_from_slice(&replier_id.zid().to_le_bytes())
37+
.map_err(|err| zerror!(err))
38+
},
39+
)
40+
.map(|value| env.auto_local(value))?;
41+
let eid = replier_id.map_or_else(|| 0, |replier_id| replier_id.eid() as jint);
42+
43+
let byte_array =
44+
bytes_to_java_array(env, sample.payload()).map(|value| env.auto_local(value))?;
45+
let encoding: jint = sample.encoding().id() as jint;
46+
let encoding_schema = sample
47+
.encoding()
48+
.schema()
49+
.map_or_else(
50+
|| Ok(JString::default()),
51+
|schema| slice_to_java_string(env, schema),
52+
)
53+
.map(|value| env.auto_local(value))?;
54+
let kind = sample.kind() as jint;
55+
56+
let (timestamp, is_valid) = sample
57+
.timestamp()
58+
.map(|timestamp| (timestamp.get_time().as_u64(), true))
59+
.unwrap_or((0, false));
60+
61+
let attachment_bytes = sample
62+
.attachment()
63+
.map_or_else(
64+
|| Ok(JByteArray::default()),
65+
|attachment| bytes_to_java_array(env, attachment),
66+
)
67+
.map(|value| env.auto_local(value))
68+
.map_err(|err| zerror!("Error processing attachment of reply: {}.", err))?;
69+
70+
let key_expr_str = env
71+
.new_string(sample.key_expr().to_string())
72+
.map(|value| env.auto_local(value))
73+
.map_err(|err| {
74+
zerror!(
75+
"Could not create a JString through JNI for the Sample key expression. {}",
76+
err
77+
)
78+
})?;
79+
80+
let express = sample.express();
81+
let priority = sample.priority() as jint;
82+
let cc = sample.congestion_control() as jint;
83+
84+
let result = match env.call_method(
85+
callback_global_ref,
86+
"run",
87+
"([BIZLjava/lang/String;[BILjava/lang/String;IJZ[BZII)V",
88+
&[
89+
JValue::from(&zenoh_id),
90+
JValue::from(eid),
91+
JValue::from(true),
92+
JValue::from(&key_expr_str),
93+
JValue::from(&byte_array),
94+
JValue::from(encoding),
95+
JValue::from(&encoding_schema),
96+
JValue::from(kind),
97+
JValue::from(timestamp as i64),
98+
JValue::from(is_valid),
99+
JValue::from(&attachment_bytes),
100+
JValue::from(express),
101+
JValue::from(priority),
102+
JValue::from(cc),
103+
],
104+
) {
105+
Ok(_) => Ok(()),
106+
Err(err) => {
107+
_ = env.exception_describe();
108+
Err(zerror!("On GET callback error: {}", err))
109+
}
110+
};
111+
result
112+
}
113+
114+
pub(crate) fn on_reply_error(
115+
env: &mut JNIEnv,
116+
replier_id: Option<EntityGlobalId>,
117+
reply_error: &ReplyError,
118+
callback_global_ref: &GlobalRef,
119+
) -> ZResult<()> {
120+
let zenoh_id = replier_id
121+
.map_or_else(
122+
|| Ok(JByteArray::default()),
123+
|replier_id| {
124+
env.byte_array_from_slice(&replier_id.zid().to_le_bytes())
125+
.map_err(|err| zerror!(err))
126+
},
127+
)
128+
.map(|value| env.auto_local(value))?;
129+
let eid = replier_id.map_or_else(|| 0, |replier_id| replier_id.eid() as jint);
130+
131+
let payload =
132+
bytes_to_java_array(env, reply_error.payload()).map(|value| env.auto_local(value))?;
133+
let encoding_id: jint = reply_error.encoding().id() as jint;
134+
let encoding_schema = reply_error
135+
.encoding()
136+
.schema()
137+
.map_or_else(
138+
|| Ok(JString::default()),
139+
|schema| slice_to_java_string(env, schema),
140+
)
141+
.map(|value| env.auto_local(value))?;
142+
let result = match env.call_method(
143+
callback_global_ref,
144+
"run",
145+
"([BIZLjava/lang/String;[BILjava/lang/String;IJZ[BZII)V",
146+
&[
147+
JValue::from(&zenoh_id),
148+
JValue::from(eid),
149+
JValue::from(false),
150+
JValue::from(&JString::default()),
151+
JValue::from(&payload),
152+
JValue::from(encoding_id),
153+
JValue::from(&encoding_schema),
154+
// The remaining parameters aren't used in case of replying error, so we set them to default.
155+
JValue::from(0 as jint),
156+
JValue::from(0_i64),
157+
JValue::from(false),
158+
JValue::from(&JByteArray::default()),
159+
JValue::from(false),
160+
JValue::from(0 as jint),
161+
JValue::from(0 as jint),
162+
],
163+
) {
164+
Ok(_) => Ok(()),
165+
Err(err) => {
166+
_ = env.exception_describe();
167+
Err(zerror!("On GET callback error: {}", err))
168+
}
169+
};
170+
result
171+
}
172+
26173
pub(crate) unsafe fn process_kotlin_sample_callback(
27174
env: &mut JNIEnv,
28175
callback: JObject,
@@ -114,13 +261,13 @@ pub(crate) unsafe fn process_kotlin_reply_callback(
114261
.attach_current_thread_as_daemon()
115262
.map_err(|err| zerror!("Unable to attach thread for GET query callback: {}", err))?;
116263
match reply.result() {
117-
Ok(sample) => crate::session::on_reply_success(
264+
Ok(sample) => crate::sample_callback::on_reply_success(
118265
&mut env,
119266
reply.replier_id(),
120267
sample,
121268
&callback_global_ref,
122269
),
123-
Err(error) => crate::session::on_reply_error(
270+
Err(error) => crate::sample_callback::on_reply_error(
124271
&mut env,
125272
reply.replier_id(),
126273
error,

zenoh-jni/src/session.rs

Lines changed: 3 additions & 152 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,16 @@
1515
use std::{ptr::null, sync::Arc, time::Duration};
1616

1717
use jni::{
18-
objects::{GlobalRef, JByteArray, JClass, JObject, JString, JValue},
18+
objects::{JClass, JObject, JString},
1919
sys::{jboolean, jint, jlong},
2020
JNIEnv,
2121
};
2222
use zenoh::{
2323
config::Config,
2424
key_expr::KeyExpr,
2525
pubsub::{Publisher, Subscriber},
26-
query::{Querier, Queryable, ReplyError},
27-
sample::Sample,
28-
session::{EntityGlobalId, Session},
26+
query::{Querier, Queryable},
27+
session::Session,
2928
Wait,
3029
};
3130

@@ -54,154 +53,6 @@ include!(concat!(env!("OUT_DIR"), "/zenoh_flat_jni.rs"));
5453

5554

5655

57-
pub(crate) fn on_reply_success(
58-
env: &mut JNIEnv,
59-
replier_id: Option<EntityGlobalId>,
60-
sample: &Sample,
61-
callback_global_ref: &GlobalRef,
62-
) -> ZResult<()> {
63-
let zenoh_id = replier_id
64-
.map_or_else(
65-
|| Ok(JByteArray::default()),
66-
|replier_id| {
67-
env.byte_array_from_slice(&replier_id.zid().to_le_bytes())
68-
.map_err(|err| zerror!(err))
69-
},
70-
)
71-
.map(|value| env.auto_local(value))?;
72-
let eid = replier_id.map_or_else(|| 0, |replier_id| replier_id.eid() as jint);
73-
74-
let byte_array =
75-
bytes_to_java_array(env, sample.payload()).map(|value| env.auto_local(value))?;
76-
let encoding: jint = sample.encoding().id() as jint;
77-
let encoding_schema = sample
78-
.encoding()
79-
.schema()
80-
.map_or_else(
81-
|| Ok(JString::default()),
82-
|schema| slice_to_java_string(env, schema),
83-
)
84-
.map(|value| env.auto_local(value))?;
85-
let kind = sample.kind() as jint;
86-
87-
let (timestamp, is_valid) = sample
88-
.timestamp()
89-
.map(|timestamp| (timestamp.get_time().as_u64(), true))
90-
.unwrap_or((0, false));
91-
92-
let attachment_bytes = sample
93-
.attachment()
94-
.map_or_else(
95-
|| Ok(JByteArray::default()),
96-
|attachment| bytes_to_java_array(env, attachment),
97-
)
98-
.map(|value| env.auto_local(value))
99-
.map_err(|err| zerror!("Error processing attachment of reply: {}.", err))?;
100-
101-
let key_expr_str = env
102-
.new_string(sample.key_expr().to_string())
103-
.map(|value| env.auto_local(value))
104-
.map_err(|err| {
105-
zerror!(
106-
"Could not create a JString through JNI for the Sample key expression. {}",
107-
err
108-
)
109-
})?;
110-
111-
let express = sample.express();
112-
let priority = sample.priority() as jint;
113-
let cc = sample.congestion_control() as jint;
114-
115-
let result = match env.call_method(
116-
callback_global_ref,
117-
"run",
118-
"([BIZLjava/lang/String;[BILjava/lang/String;IJZ[BZII)V",
119-
&[
120-
JValue::from(&zenoh_id),
121-
JValue::from(eid),
122-
JValue::from(true),
123-
JValue::from(&key_expr_str),
124-
JValue::from(&byte_array),
125-
JValue::from(encoding),
126-
JValue::from(&encoding_schema),
127-
JValue::from(kind),
128-
JValue::from(timestamp as i64),
129-
JValue::from(is_valid),
130-
JValue::from(&attachment_bytes),
131-
JValue::from(express),
132-
JValue::from(priority),
133-
JValue::from(cc),
134-
],
135-
) {
136-
Ok(_) => Ok(()),
137-
Err(err) => {
138-
_ = env.exception_describe();
139-
Err(zerror!("On GET callback error: {}", err))
140-
}
141-
};
142-
result
143-
}
144-
145-
pub(crate) fn on_reply_error(
146-
env: &mut JNIEnv,
147-
replier_id: Option<EntityGlobalId>,
148-
reply_error: &ReplyError,
149-
callback_global_ref: &GlobalRef,
150-
) -> ZResult<()> {
151-
let zenoh_id = replier_id
152-
.map_or_else(
153-
|| Ok(JByteArray::default()),
154-
|replier_id| {
155-
env.byte_array_from_slice(&replier_id.zid().to_le_bytes())
156-
.map_err(|err| zerror!(err))
157-
},
158-
)
159-
.map(|value| env.auto_local(value))?;
160-
let eid = replier_id.map_or_else(|| 0, |replier_id| replier_id.eid() as jint);
161-
162-
let payload =
163-
bytes_to_java_array(env, reply_error.payload()).map(|value| env.auto_local(value))?;
164-
let encoding_id: jint = reply_error.encoding().id() as jint;
165-
let encoding_schema = reply_error
166-
.encoding()
167-
.schema()
168-
.map_or_else(
169-
|| Ok(JString::default()),
170-
|schema| slice_to_java_string(env, schema),
171-
)
172-
.map(|value| env.auto_local(value))?;
173-
let result = match env.call_method(
174-
callback_global_ref,
175-
"run",
176-
"([BIZLjava/lang/String;[BILjava/lang/String;IJZ[BZII)V",
177-
&[
178-
JValue::from(&zenoh_id),
179-
JValue::from(eid),
180-
JValue::from(false),
181-
JValue::from(&JString::default()),
182-
JValue::from(&payload),
183-
JValue::from(encoding_id),
184-
JValue::from(&encoding_schema),
185-
// The remaining parameters aren't used in case of replying error, so we set them to default.
186-
JValue::from(0 as jint),
187-
JValue::from(0_i64),
188-
JValue::from(false),
189-
JValue::from(&JByteArray::default()),
190-
JValue::from(false),
191-
JValue::from(0 as jint),
192-
JValue::from(0 as jint),
193-
],
194-
) {
195-
Ok(_) => Ok(()),
196-
Err(err) => {
197-
_ = env.exception_describe();
198-
Err(zerror!("On GET callback error: {}", err))
199-
}
200-
};
201-
result
202-
}
203-
204-
20556
/// Declare an advanced Zenoh subscriber via JNI.
20657
///
20758
/// # Parameters:

0 commit comments

Comments
 (0)