Skip to content

Commit b1685fb

Browse files
ArtDuclaude
andcommitted
feat(tracing): add request/response handlers
Add Handlers class with 4 lifecycle callbacks: - onBeforeSend: called before sending request - onSuccess: called on successful response - onTimeout: called when request times out - onIgnoredResponse: called when response arrives after timeout Handlers are configured at client level via withHandlers() builder method and propagated through: Builder -> ClientImpl -> Pool -> PoolEntry -> IProtoClient Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent c57f6a2 commit b1685fb

File tree

15 files changed

+372
-11
lines changed

15 files changed

+372
-11
lines changed

tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolBoxClientBuilder.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import io.tarantool.client.box.TarantoolBoxClient;
3535
import io.tarantool.core.ManagedResource;
3636
import io.tarantool.core.WatcherOptions;
37+
import io.tarantool.core.protocol.Handlers;
3738
import io.tarantool.core.protocol.IProtoResponse;
3839
import io.tarantool.pool.HeartbeatOpts;
3940
import io.tarantool.pool.InstanceConnectionGroup;
@@ -159,6 +160,9 @@ public class TarantoolBoxClientBuilder {
159160
/** Optional listener for pool events. */
160161
private PoolEventListener poolEventListener;
161162

163+
/** Custom handlers for protocol operations. */
164+
private Handlers handlers;
165+
162166
/**
163167
* Getter for {@link #options}.
164168
*
@@ -819,6 +823,17 @@ public TarantoolBoxClientBuilder withPoolEventListener(PoolEventListener listene
819823
return this;
820824
}
821825

826+
/**
827+
* Sets custom handlers for protocol operations.
828+
*
829+
* @param handlers handlers instance
830+
* @return {@link TarantoolBoxClientBuilder} object.
831+
*/
832+
public TarantoolBoxClientBuilder withHandlers(Handlers handlers) {
833+
this.handlers = handlers;
834+
return this;
835+
}
836+
822837
/**
823838
* Builds specific {@link TarantoolBoxClient} class instance with parameters.
824839
*
@@ -859,6 +874,7 @@ public TarantoolBoxClient build() throws Exception {
859874
reconnectAfter,
860875
metricsRegistry,
861876
ignoredPacketsHandler,
877+
handlers,
862878
sslContext,
863879
poolEventListener);
864880
}

tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolBoxClientImpl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.tarantool.core.ManagedResource;
2424
import io.tarantool.core.WatcherOptions;
2525
import io.tarantool.core.exceptions.ClientException;
26+
import io.tarantool.core.protocol.Handlers;
2627
import io.tarantool.core.protocol.IProtoResponse;
2728
import io.tarantool.pool.HeartbeatOpts;
2829
import io.tarantool.pool.InstanceConnectionGroup;
@@ -119,6 +120,7 @@ final class TarantoolBoxClientImpl extends TarantoolClientImpl implements Tarant
119120
long reconnectAfter,
120121
MeterRegistry metricsRegistry,
121122
TripleConsumer<String, Integer, IProtoResponse> ignoredPacketsHandler,
123+
Handlers handlers,
122124
SslContext sslContext,
123125
PoolEventListener poolEventListener)
124126
throws InvocationTargetException,
@@ -138,6 +140,7 @@ final class TarantoolBoxClientImpl extends TarantoolClientImpl implements Tarant
138140
reconnectAfter,
139141
metricsRegistry,
140142
ignoredPacketsHandler,
143+
handlers,
141144
sslContext,
142145
!fetchSchema,
143146
poolEventListener);

tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolClientImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.tarantool.core.WatcherOptions;
3434
import io.tarantool.core.connection.ConnectionFactory;
3535
import io.tarantool.core.exceptions.ServerException;
36+
import io.tarantool.core.protocol.Handlers;
3637
import io.tarantool.core.protocol.IProtoRequestOpts;
3738
import io.tarantool.core.protocol.IProtoResponse;
3839
import io.tarantool.mapping.TarantoolJacksonMapping;
@@ -94,6 +95,7 @@ abstract class TarantoolClientImpl implements TarantoolClient {
9495
* @param reconnectAfter time after which reconnect occurs
9596
* @param metricsRegistry micrometer {@link TarantoolClientImpl#metricsRegistry}
9697
* @param ignoredPacketsHandler handler for ignored IProto-packets.
98+
* @param handlers handlers for request/response lifecycle events.
9799
* @param sslContext SslContext with settings for establishing SSL/TLS connection between
98100
* Tarantool.
99101
* @param useTupleExtension Use TUPLE_EXT feature if true.
@@ -123,6 +125,7 @@ protected TarantoolClientImpl(
123125
long reconnectAfter,
124126
MeterRegistry metricsRegistry,
125127
TripleConsumer<String, Integer, IProtoResponse> ignoredPacketsHandler,
128+
Handlers handlers,
126129
SslContext sslContext,
127130
boolean useTupleExtension,
128131
PoolEventListener poolEventListener)
@@ -144,6 +147,7 @@ protected TarantoolClientImpl(
144147
watcherOpts,
145148
this.metricsRegistry,
146149
ignoredPacketsHandler,
150+
handlers,
147151
useTupleExtension,
148152
poolEventListener);
149153
pool.setGroups(groups);

tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolCrudClientBuilder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.tarantool.client.crud.TarantoolCrudClient;
3434
import io.tarantool.core.ManagedResource;
3535
import io.tarantool.core.WatcherOptions;
36+
import io.tarantool.core.protocol.Handlers;
3637
import io.tarantool.core.protocol.IProtoResponse;
3738
import io.tarantool.pool.HeartbeatOpts;
3839
import io.tarantool.pool.InstanceConnectionGroup;
@@ -148,6 +149,8 @@ public class TarantoolCrudClientBuilder {
148149
/** Optional listener for pool lifecycle events. */
149150
private PoolEventListener poolEventListener;
150151

152+
private Handlers handlers;
153+
151154
public Map<ChannelOption<?>, Object> getOptions() {
152155
return options;
153156
}
@@ -762,6 +765,11 @@ public TarantoolCrudClientBuilder withPoolEventListener(PoolEventListener listen
762765
return this;
763766
}
764767

768+
public TarantoolCrudClientBuilder withHandlers(Handlers handlers) {
769+
this.handlers = handlers;
770+
return this;
771+
}
772+
765773
/**
766774
* Builds specific {@link TarantoolCrudClient} class instance with parameters.
767775
*
@@ -800,6 +808,7 @@ public TarantoolCrudClient build() throws Exception {
800808
reconnectAfter,
801809
metricsRegistry,
802810
ignoredPacketsHandler,
811+
handlers,
803812
sslContext,
804813
useTupleExtension,
805814
poolEventListener);

tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolCrudClientImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.tarantool.client.crud.options.CrudOptions;
2121
import io.tarantool.core.ManagedResource;
2222
import io.tarantool.core.WatcherOptions;
23+
import io.tarantool.core.protocol.Handlers;
2324
import io.tarantool.core.protocol.IProtoResponse;
2425
import io.tarantool.pool.HeartbeatOpts;
2526
import io.tarantool.pool.InstanceConnectionGroup;
@@ -70,6 +71,7 @@ final class TarantoolCrudClientImpl extends TarantoolClientImpl implements Taran
7071
* @param reconnectAfter see reconnectAfter in{@link TarantoolCrudClientBuilder}.
7172
* @param metricsRegistry see metricsRegistry in{@link TarantoolCrudClientBuilder}.
7273
* @param ignoredPacketsHandler see ignoredPacketsHandler in{@link TarantoolCrudClientBuilder}.
74+
* @param handlers see handlers in{@link TarantoolCrudClientBuilder}.
7375
* @param sslContext see sslContext in{@link TarantoolCrudClientBuilder}.
7476
* @param useTupleExtension see useTupleExtension in{@link TarantoolCrudClientBuilder}.
7577
* @throws NoSuchMethodException if a matching method is not found.
@@ -96,6 +98,7 @@ final class TarantoolCrudClientImpl extends TarantoolClientImpl implements Taran
9698
long reconnectAfter,
9799
MeterRegistry metricsRegistry,
98100
TripleConsumer<String, Integer, IProtoResponse> ignoredPacketsHandler,
101+
Handlers handlers,
99102
SslContext sslContext,
100103
boolean useTupleExtension,
101104
PoolEventListener poolEventListener)
@@ -116,6 +119,7 @@ final class TarantoolCrudClientImpl extends TarantoolClientImpl implements Taran
116119
reconnectAfter,
117120
metricsRegistry,
118121
ignoredPacketsHandler,
122+
handlers,
119123
sslContext,
120124
useTupleExtension,
121125
poolEventListener);

tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolDataGridClientBuilder.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.tarantool.client.tdg.TarantoolDataGridClient;
3434
import io.tarantool.core.ManagedResource;
3535
import io.tarantool.core.WatcherOptions;
36+
import io.tarantool.core.protocol.Handlers;
3637
import io.tarantool.core.protocol.IProtoResponse;
3738
import io.tarantool.pool.HeartbeatOpts;
3839
import io.tarantool.pool.InstanceConnectionGroup;
@@ -158,6 +159,9 @@ public class TarantoolDataGridClientBuilder {
158159
/** Optional listener for pool lifecycle events. */
159160
private PoolEventListener poolEventListener;
160161

162+
/** Handlers for protocol messages. */
163+
private Handlers handlers;
164+
161165
public Map<ChannelOption<?>, Object> getOptions() {
162166
return options;
163167
}
@@ -840,6 +844,17 @@ public TarantoolDataGridClientBuilder withCredentials(Map<String, Object> creden
840844
return this;
841845
}
842846

847+
/**
848+
* Sets the {@link #handlers} parameter when constructing an instance of a builder class.
849+
*
850+
* @param handlers see {@link #handlers} field.
851+
* @return {@link TarantoolDataGridClientBuilder} object.
852+
*/
853+
public TarantoolDataGridClientBuilder withHandlers(Handlers handlers) {
854+
this.handlers = handlers;
855+
return this;
856+
}
857+
843858
/**
844859
* Builds specific {@link TarantoolDataGridClient} class instance with parameters.
845860
*
@@ -878,6 +893,7 @@ public TarantoolDataGridClient build() throws Exception {
878893
reconnectAfter,
879894
metricsRegistry,
880895
ignoredPacketsHandler,
896+
handlers,
881897
sslContext,
882898
useTupleExtension,
883899
useTdg1Context,

tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolDataGridClientImpl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.tarantool.client.tdg.TarantoolDataGridSpace;
2020
import io.tarantool.core.ManagedResource;
2121
import io.tarantool.core.WatcherOptions;
22+
import io.tarantool.core.protocol.Handlers;
2223
import io.tarantool.core.protocol.IProtoResponse;
2324
import io.tarantool.pool.HeartbeatOpts;
2425
import io.tarantool.pool.InstanceConnectionGroup;
@@ -70,6 +71,7 @@ protected TarantoolDataGridClientImpl(
7071
long reconnectAfter,
7172
MeterRegistry metricsRegistry,
7273
TripleConsumer<String, Integer, IProtoResponse> ignoredPacketsHandler,
74+
Handlers handlers,
7375
SslContext sslContext,
7476
boolean useTupleExtension,
7577
boolean useTdg1Context,
@@ -92,6 +94,7 @@ protected TarantoolDataGridClientImpl(
9294
reconnectAfter,
9395
metricsRegistry,
9496
ignoredPacketsHandler,
97+
handlers,
9598
sslContext,
9699
useTupleExtension,
97100
poolEventListener);

tarantool-client/src/test/java/io/tarantool/client/integration/TarantoolBoxClientTest.java

Lines changed: 103 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,20 @@
4242
import org.junit.jupiter.params.ParameterizedTest;
4343
import org.junit.jupiter.params.provider.Arguments;
4444
import org.junit.jupiter.params.provider.MethodSource;
45+
import org.msgpack.core.MessageBufferPacker;
46+
import org.msgpack.core.MessagePack;
47+
import org.msgpack.core.MessageUnpacker;
48+
import org.msgpack.value.Value;
49+
import org.msgpack.value.ValueFactory;
4550
import org.testcontainers.containers.tarantool.TarantoolContainer;
4651
import org.testcontainers.containers.utils.TarantoolContainerClientHelper;
4752
import org.testcontainers.shaded.com.google.common.base.CaseFormat;
4853

4954
import static io.tarantool.client.box.TarantoolBoxSpace.WITHOUT_ENABLED_FETCH_SCHEMA_OPTION_FOR_TARANTOOL_LESS_3_0_0;
55+
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_DATA;
56+
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_SYNC_ID;
57+
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_CALL;
58+
import static io.tarantool.mapping.BaseTarantoolJacksonMapping.objectMapper;
5059
import io.tarantool.client.BaseOptions;
5160
import io.tarantool.client.ClientType;
5261
import io.tarantool.client.Options;
@@ -61,7 +70,11 @@
6170
import io.tarantool.client.operation.Operations;
6271
import io.tarantool.core.IProtoClient;
6372
import io.tarantool.core.protocol.BoxIterator;
73+
import io.tarantool.core.protocol.ByteBodyValueWrapper;
74+
import io.tarantool.core.protocol.Handlers;
75+
import io.tarantool.core.protocol.IProtoRequest;
6476
import io.tarantool.core.protocol.IProtoResponse;
77+
import io.tarantool.mapping.BaseTarantoolJacksonMapping;
6578
import io.tarantool.mapping.NilErrorResponse;
6679
import io.tarantool.mapping.SelectResponse;
6780
import io.tarantool.mapping.TarantoolResponse;
@@ -248,25 +261,113 @@ public void testCallAndEval() {
248261

249262
@Test
250263
public void testCallTimeoutWithIgnoredPacketsHandler() throws Exception {
264+
List<IProtoRequest> timedOutRequests = new ArrayList<>();
265+
List<List<Object>> localTriplets = new ArrayList<>();
266+
267+
TarantoolBoxClient testClient =
268+
TarantoolFactory.box()
269+
.withUser(API_USER)
270+
.withPassword(CREDS.get(API_USER))
271+
.withHost(tt.getHost())
272+
.withPort(tt.getFirstMappedPort())
273+
.withHandlers(
274+
Handlers.builder()
275+
.onTimeout(
276+
request -> {
277+
synchronized (timedOutRequests) {
278+
timedOutRequests.add(request);
279+
}
280+
})
281+
.build())
282+
.withIgnoredPacketsHandler(
283+
(tag, index, packet) -> {
284+
synchronized (localTriplets) {
285+
localTriplets.add(Arrays.asList(tag, index, packet));
286+
}
287+
})
288+
.build();
289+
251290
Options options = BaseOptions.builder().withTimeout(1_000L).build();
252291
Exception ex =
253292
assertThrows(
254293
CompletionException.class,
255-
() -> client.call("slow_echo", Arrays.asList(1, true), options).join());
294+
() -> testClient.call("slow_echo", Arrays.asList(1, true), options).join());
256295
Throwable cause = ex.getCause();
257296
assertEquals(TimeoutException.class, cause.getClass());
258297
Thread.sleep(600);
259-
assertEquals(1, triplets.size());
298+
assertEquals(1, localTriplets.size());
299+
300+
// Verify timeout handler and ignored packets have matching syncId
301+
long requestSyncId = assertTimeoutHandler(timedOutRequests);
302+
long responseSyncId = assertIgnoredPackets(localTriplets);
303+
assertEquals(requestSyncId, responseSyncId, "Request and response syncId should match");
260304

305+
testClient.close();
306+
}
307+
308+
private long assertTimeoutHandler(List<IProtoRequest> timedOutRequests) throws Exception {
309+
assertEquals(1, timedOutRequests.size());
310+
IProtoRequest timedOutRequest = timedOutRequests.get(0);
311+
assertEquals(IPROTO_TYPE_CALL, timedOutRequest.getRequestType());
312+
313+
byte[] packetBytes = timedOutRequest.getPacket(MessagePack.newDefaultBufferPacker());
314+
MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(packetBytes);
315+
unpacker.unpackInt(); // Skip size prefix
316+
317+
Value headerValue = unpacker.unpackValue();
318+
Value bodyValue = unpacker.unpackValue();
319+
320+
// Extract syncId from header (key 0x01)
321+
long syncId =
322+
headerValue
323+
.asMapValue()
324+
.map()
325+
.get(ValueFactory.newInteger(IPROTO_SYNC_ID))
326+
.asIntegerValue()
327+
.asLong();
328+
329+
// Convert body to bytes and parse with objectMapper
330+
MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
331+
packer.packValue(bodyValue);
332+
byte[] bodyBytes = packer.toByteArray();
333+
Map<Integer, Object> body =
334+
objectMapper.readValue(bodyBytes, new TypeReference<Map<Integer, Object>>() {});
335+
336+
assertEquals("slow_echo", body.get(0x22)); // IPROTO_FUNCTION_NAME
337+
assertEquals(Arrays.asList(1, true), body.get(0x21)); // IPROTO_TUPLE
338+
339+
return syncId;
340+
}
341+
342+
private long assertIgnoredPackets(List<List<Object>> triplets) throws Exception {
261343
Set<String> tags = new HashSet<>();
262344
Set<Integer> indexes = new HashSet<>();
345+
long syncId = -1;
346+
263347
for (List<Object> item : triplets) {
264348
tags.add((String) item.get(0));
265349
indexes.add((int) item.get(1));
266350
assertInstanceOf(IProtoResponse.class, item.get(2));
351+
IProtoResponse response = (IProtoResponse) item.get(2);
352+
assertFalse(response.isError());
353+
assertTrue(response.hasSyncId());
354+
syncId = response.getSyncId();
355+
assertTrue(syncId > 0);
356+
357+
Map<Integer, Object> bodyAsObjects = new HashMap<>();
358+
Map<Integer, ByteBodyValueWrapper> byteBodyValues = response.getByteBodyValues();
359+
for (Map.Entry<Integer, ByteBodyValueWrapper> entry : byteBodyValues.entrySet()) {
360+
bodyAsObjects.put(
361+
entry.getKey(), BaseTarantoolJacksonMapping.readValue(entry.getValue(), Object.class));
362+
}
363+
assertEquals(byteBodyValues.size(), bodyAsObjects.size());
364+
assertEquals(Arrays.asList(1, true), bodyAsObjects.get(IPROTO_DATA));
267365
}
366+
268367
assertEquals(new HashSet<>(Collections.singletonList("default")), tags);
269368
assertEquals(Collections.singleton(0), indexes);
369+
370+
return syncId;
270371
}
271372

272373
@Test

0 commit comments

Comments
 (0)