Skip to content

Commit eebf37a

Browse files
milyinclaude
andauthored
stabilize accept replies (#432)
* chore: add branch placeholder zbobr_fix-38-fix-issue-431-tls-test * feat: add accept_replies support with ReplyKeyExpr enum Add stable accept_replies API to zenoh-java, matching the stabilized ReplyKeyExpr type from upstream zenoh PR #2443. Changes: - Create ReplyKeyExpr enum (MATCHING_QUERY, ANY) in Kotlin - Add acceptReplies field to GetOptions and QuerierOptions - Pass acceptReplies ordinal through JNI bridge (JNISession.kt) - Add decode_reply_key_expr in Rust utils.rs - Wire accept_replies into both getViaJNI and declareQuerierViaJNI Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * feat: expose acceptsReplies on Query received by queryables Pass accepts_replies from Rust Query through JNI to the Kotlin Query class, allowing queryable handlers to inspect what reply key expressions the querier accepts. Adds ReplyKeyExpr property to Query constructor, updates JNI callback signature, and adds test coverage. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * Auto-commit by worker agent * chore: auto-commit uncommitted changes * fix: stabilize accepts_replies API and clean up unrelated files - Remove gradle wrapper files (gradle-wrapper.jar, gradle-wrapper.properties, gradlew, gradlew.bat) that were added unrelated to task - Delete Cargo.lock to use latest zenoh from main branch - Fix accepts_replies() call in session.rs: API now returns ReplyKeyExpr directly instead of Result<ReplyKeyExpr>, so remove .map_err(...)? chain Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * cargo.lock restored * chore: auto-commit uncommitted changes * test fix after deprecate priority / congestion --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 0c2052e commit eebf37a

11 files changed

Lines changed: 206 additions & 113 deletions

File tree

zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -138,20 +138,22 @@ internal class JNISession(val sessionPtr: Long) {
138138
keyExpr: KeyExpr, callback: Callback<Query>, config: QueryableOptions
139139
): CallbackQueryable {
140140
val queryCallback =
141-
JNIQueryableCallback { keyExpr1: String, selectorParams: String, payload: ByteArray?, encodingId: Int, encodingSchema: String?, attachmentBytes: ByteArray?, queryPtr: Long ->
141+
JNIQueryableCallback { keyExpr1: String, selectorParams: String, payload: ByteArray?, encodingId: Int, encodingSchema: String?, attachmentBytes: ByteArray?, queryPtr: Long, acceptReplies: Int ->
142142
val jniQuery = JNIQuery(queryPtr)
143143
val keyExpr2 = KeyExpr(keyExpr1, null)
144144
val selector = if (selectorParams.isEmpty()) {
145145
Selector(keyExpr2)
146146
} else {
147147
Selector(keyExpr2, Parameters.from(selectorParams))
148148
}
149+
val replyKeyExpr = ReplyKeyExpr.entries[acceptReplies]
149150
val query = Query(
150151
keyExpr2,
151152
selector,
152153
payload?.into(),
153154
payload?.let { Encoding(encodingId, schema = encodingSchema) },
154155
attachmentBytes?.into(),
156+
replyKeyExpr,
155157
jniQuery
156158
)
157159
callback.run(query)
@@ -172,20 +174,22 @@ internal class JNISession(val sessionPtr: Long) {
172174
keyExpr: KeyExpr, handler: Handler<Query, R>, config: QueryableOptions
173175
): HandlerQueryable<R> {
174176
val queryCallback =
175-
JNIQueryableCallback { keyExpr1: String, selectorParams: String, payload: ByteArray?, encodingId: Int, encodingSchema: String?, attachmentBytes: ByteArray?, queryPtr: Long ->
177+
JNIQueryableCallback { keyExpr1: String, selectorParams: String, payload: ByteArray?, encodingId: Int, encodingSchema: String?, attachmentBytes: ByteArray?, queryPtr: Long, acceptReplies: Int ->
176178
val jniQuery = JNIQuery(queryPtr)
177179
val keyExpr2 = KeyExpr(keyExpr1, null)
178180
val selector = if (selectorParams.isEmpty()) {
179181
Selector(keyExpr2)
180182
} else {
181183
Selector(keyExpr2, Parameters.from(selectorParams))
182184
}
185+
val replyKeyExpr = ReplyKeyExpr.entries[acceptReplies]
183186
val query = Query(
184187
keyExpr2,
185188
selector,
186189
payload?.into(),
187190
payload?.let { Encoding(encodingId, schema = encodingSchema) },
188191
attachmentBytes?.into(),
192+
replyKeyExpr,
189193
jniQuery
190194
)
191195
handler.handle(query)
@@ -215,7 +219,8 @@ internal class JNISession(val sessionPtr: Long) {
215219
options.congestionControl.ordinal,
216220
options.priority.ordinal,
217221
options.express,
218-
options.timeout.toMillis()
222+
options.timeout.toMillis(),
223+
options.acceptReplies.ordinal
219224
)
220225
return Querier(
221226
keyExpr,
@@ -290,7 +295,8 @@ internal class JNISession(val sessionPtr: Long) {
290295
options.encoding?.schema,
291296
options.qos.congestionControl.value,
292297
options.qos.priority.value,
293-
options.qos.express
298+
options.qos.express,
299+
options.acceptReplies.ordinal
294300
)
295301
}
296302

@@ -356,7 +362,8 @@ internal class JNISession(val sessionPtr: Long) {
356362
options.encoding?.schema,
357363
options.qos.congestionControl.value,
358364
options.qos.priority.value,
359-
options.qos.express
365+
options.qos.express,
366+
options.acceptReplies.ordinal
360367
)
361368
return handler.receiver()
362369
}
@@ -481,7 +488,8 @@ internal class JNISession(val sessionPtr: Long) {
481488
congestionControl: Int,
482489
priority: Int,
483490
express: Boolean,
484-
timeoutMs: Long
491+
timeoutMs: Long,
492+
acceptReplies: Int
485493
): Long
486494

487495
@Throws(ZError::class)
@@ -508,6 +516,7 @@ internal class JNISession(val sessionPtr: Long) {
508516
congestionControl: Int,
509517
priority: Int,
510518
express: Boolean,
519+
acceptReplies: Int,
511520
)
512521

513522
@Throws(ZError::class)

zenoh-java/src/commonMain/kotlin/io/zenoh/jni/callbacks/JNIQueryableCallback.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,6 @@ internal fun interface JNIQueryableCallback {
2121
encodingId: Int,
2222
encodingSchema: String?,
2323
attachmentBytes: ByteArray?,
24-
queryPtr: Long)
24+
queryPtr: Long,
25+
acceptReplies: Int)
2526
}

zenoh-java/src/commonMain/kotlin/io/zenoh/query/Get.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import java.time.Duration
3030
* @param encoding Encoding of the payload.
3131
* @param attachment Optional attachment.
3232
* @param qos The intended [QoS] for the query.
33+
* @param acceptReplies The [ReplyKeyExpr] accepted by the query.
3334
*/
3435
data class GetOptions(
3536
var timeout: Duration = Duration.ofMillis(10000),
@@ -38,7 +39,8 @@ data class GetOptions(
3839
var payload: IntoZBytes? = null,
3940
var encoding: Encoding? = null,
4041
var attachment: IntoZBytes? = null,
41-
var qos: QoS = QoS.defaultRequest
42+
var qos: QoS = QoS.defaultRequest,
43+
var acceptReplies: ReplyKeyExpr = ReplyKeyExpr.MATCHING_QUERY
4244
) {
4345
fun setPayload(payload: String) = apply { this.payload = ZBytes.from(payload) }
4446
fun setAttachment(attachment: String) = apply { this.attachment = ZBytes.from(attachment) }

zenoh-java/src/commonMain/kotlin/io/zenoh/query/Querier.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,5 +160,6 @@ data class QuerierOptions(
160160
var timeout: Duration = Duration.ofMillis(10000),
161161
var express: Boolean = QoS.defaultRequest.express,
162162
var congestionControl: CongestionControl = QoS.defaultRequest.congestionControl,
163-
var priority: Priority = QoS.defaultRequest.priority
163+
var priority: Priority = QoS.defaultRequest.priority,
164+
var acceptReplies: ReplyKeyExpr = ReplyKeyExpr.MATCHING_QUERY
164165
)

zenoh-java/src/commonMain/kotlin/io/zenoh/query/Query.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,15 @@ import io.zenoh.sample.SampleKind
3535
* @property payload Optional payload in case the received query was declared using "with query".
3636
* @property encoding Encoding of the [payload].
3737
* @property attachment Optional attachment.
38+
* @property acceptsReplies The [ReplyKeyExpr] indicating what key expressions are accepted in replies.
3839
*/
3940
class Query internal constructor(
4041
val keyExpr: KeyExpr,
4142
val selector: Selector,
4243
val payload: ZBytes?,
4344
val encoding: Encoding?,
4445
val attachment: ZBytes?,
46+
val acceptsReplies: ReplyKeyExpr,
4547
private var jniQuery: JNIQuery?
4648
) : AutoCloseable, ZenohType {
4749

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
//
2+
// Copyright (c) 2026 ZettaScale Technology
3+
//
4+
// This program and the accompanying materials are made available under the
5+
// terms of the Eclipse Public License 2.0 which is available at
6+
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
//
9+
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
//
11+
// Contributors:
12+
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13+
//
14+
15+
package io.zenoh.query
16+
17+
/**
18+
* The key expressions accepted by a query reply.
19+
*
20+
* Controls whether a GET query accepts replies whose key expressions
21+
* don't match the query's key expression.
22+
*/
23+
enum class ReplyKeyExpr {
24+
25+
/**
26+
* Accept replies whose key expressions match the query key expression.
27+
*/
28+
MATCHING_QUERY,
29+
30+
/**
31+
* Accept replies whose key expressions may not match the query key expression.
32+
*/
33+
ANY;
34+
}

zenoh-java/src/jvmTest/java/io/zenoh/QuerierTest.java

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,12 @@
1414

1515
package io.zenoh;
1616

17-
import io.zenoh.bytes.Encoding;
1817
import io.zenoh.bytes.ZBytes;
1918
import io.zenoh.exceptions.ZError;
2019
import io.zenoh.keyexpr.KeyExpr;
21-
import io.zenoh.qos.CongestionControl;
22-
import io.zenoh.qos.Priority;
23-
import io.zenoh.qos.QoS;
2420
import io.zenoh.query.Querier;
2521
import io.zenoh.query.Reply;
2622
import io.zenoh.query.ReplyOptions;
27-
import io.zenoh.sample.Sample;
2823
import io.zenoh.sample.SampleKind;
2924
import org.apache.commons.net.ntp.TimeStamp;
3025
import org.junit.Test;
@@ -52,15 +47,7 @@ public class QuerierTest {
5247
*/
5348
@Test
5449
public void querier_runsWithCallbackTest() throws ZError, InterruptedException {
55-
var sample = new Sample(
56-
testKeyExpr,
57-
testPayload,
58-
Encoding.defaultEncoding(),
59-
SampleKind.PUT,
60-
new TimeStamp(Date.from(Instant.now())),
61-
new QoS(CongestionControl.BLOCK, Priority.DATA, false),
62-
null
63-
);
50+
var timestamp = new TimeStamp(Date.from(Instant.now()));
6451
var examplePayload = ZBytes.from("Example payload");
6552
var exampleAttachment = ZBytes.from("Example attachment");
6653
var session = Zenoh.open(Config.loadDefault());
@@ -70,9 +57,9 @@ public void querier_runsWithCallbackTest() throws ZError, InterruptedException {
7057
assertEquals(examplePayload, query.getPayload());
7158

7259
var replyOptions = new ReplyOptions();
73-
replyOptions.setTimeStamp(sample.getTimestamp());
60+
replyOptions.setTimeStamp(timestamp);
7461
try {
75-
query.reply(testKeyExpr, sample.getPayload(), replyOptions);
62+
query.reply(testKeyExpr, testPayload, replyOptions);
7663
} catch (ZError e) {
7764
throw new RuntimeException(e);
7865
}
@@ -93,7 +80,11 @@ public void querier_runsWithCallbackTest() throws ZError, InterruptedException {
9380

9481
Thread.sleep(1000);
9582
assertNotNull(receivedReply[0]);
96-
assertEquals(sample, ((Reply.Success) receivedReply[0]).getSample());
83+
var receivedSample = ((Reply.Success) receivedReply[0]).getSample();
84+
assertEquals(testKeyExpr, receivedSample.getKeyExpr());
85+
assertEquals(testPayload, receivedSample.getPayload());
86+
assertEquals(SampleKind.PUT, receivedSample.getKind());
87+
assertEquals(timestamp, receivedSample.getTimestamp());
9788

9889
queryable.close();
9990
querier.close();

zenoh-java/src/jvmTest/java/io/zenoh/QueryableTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ public void queryTest() throws ZError, InterruptedException {
9797
assertNull(query.getPayload());
9898
assertNull(query.getEncoding());
9999
assertNull(query.getAttachment());
100+
assertEquals(ReplyKeyExpr.MATCHING_QUERY, query.getAcceptsReplies());
100101

101102
receivedQuery[0] = null;
102103
var payload = ZBytes.from("Test value");
@@ -120,6 +121,24 @@ public void queryTest() throws ZError, InterruptedException {
120121
queryable.close();
121122
}
122123

124+
@Test
125+
public void queryAcceptsRepliesAnyTest() throws ZError, InterruptedException {
126+
Query[] receivedQuery = new Query[1];
127+
var queryable = session.declareQueryable(testKeyExpr, query -> receivedQuery[0] = query);
128+
129+
var getOptions = new GetOptions();
130+
getOptions.setAcceptReplies(ReplyKeyExpr.ANY);
131+
session.get(testKeyExpr, getOptions);
132+
133+
Thread.sleep(100);
134+
135+
Query query = receivedQuery[0];
136+
assertNotNull(query);
137+
assertEquals(ReplyKeyExpr.ANY, query.getAcceptsReplies());
138+
139+
queryable.close();
140+
}
141+
123142
@Test
124143
public void queryReplySuccessTest() throws ZError, InterruptedException {
125144
var message = ZBytes.from("Test message");

0 commit comments

Comments
 (0)