Skip to content

Commit a80083f

Browse files
ArtDuclaude
andcommitted
feat(tracing): add request/response lifecycle handlers
Add Handlers class with 4 lifecycle callbacks: - onBeforeSend: called before sending request - onSuccess: called on successful response - onTimeout: called when request times out - onIgnoredResponse: called when response arrives after timeout Handlers are configured at client level via withHandlers() builder method and propagated through: Builder -> ClientImpl -> Pool -> PoolEntry -> IProtoClient Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent ed7c898 commit a80083f

File tree

15 files changed

+371
-11
lines changed

15 files changed

+371
-11
lines changed

tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolBoxClientBuilder.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import io.tarantool.client.box.TarantoolBoxClient;
3535
import io.tarantool.core.ManagedResource;
3636
import io.tarantool.core.WatcherOptions;
37+
import io.tarantool.core.protocol.Handlers;
3738
import io.tarantool.core.protocol.IProtoResponse;
3839
import io.tarantool.pool.HeartbeatOpts;
3940
import io.tarantool.pool.InstanceConnectionGroup;
@@ -159,6 +160,9 @@ public class TarantoolBoxClientBuilder {
159160
/** Optional listener for pool events. */
160161
private PoolEventListener poolEventListener;
161162

163+
/** Custom handlers for protocol operations. */
164+
private Handlers handlers;
165+
162166
/**
163167
* Getter for {@link #options}.
164168
*
@@ -819,6 +823,17 @@ public TarantoolBoxClientBuilder withPoolEventListener(PoolEventListener listene
819823
return this;
820824
}
821825

826+
/**
827+
* Sets custom handlers for protocol operations.
828+
*
829+
* @param handlers handlers instance
830+
* @return {@link TarantoolBoxClientBuilder} object.
831+
*/
832+
public TarantoolBoxClientBuilder withHandlers(Handlers handlers) {
833+
this.handlers = handlers;
834+
return this;
835+
}
836+
822837
/**
823838
* Builds specific {@link TarantoolBoxClient} class instance with parameters.
824839
*
@@ -859,6 +874,7 @@ public TarantoolBoxClient build() throws Exception {
859874
reconnectAfter,
860875
metricsRegistry,
861876
ignoredPacketsHandler,
877+
handlers,
862878
sslContext,
863879
poolEventListener);
864880
}

tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolBoxClientImpl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.tarantool.core.ManagedResource;
2424
import io.tarantool.core.WatcherOptions;
2525
import io.tarantool.core.exceptions.ClientException;
26+
import io.tarantool.core.protocol.Handlers;
2627
import io.tarantool.core.protocol.IProtoResponse;
2728
import io.tarantool.pool.HeartbeatOpts;
2829
import io.tarantool.pool.InstanceConnectionGroup;
@@ -119,6 +120,7 @@ final class TarantoolBoxClientImpl extends TarantoolClientImpl implements Tarant
119120
long reconnectAfter,
120121
MeterRegistry metricsRegistry,
121122
TripleConsumer<String, Integer, IProtoResponse> ignoredPacketsHandler,
123+
Handlers handlers,
122124
SslContext sslContext,
123125
PoolEventListener poolEventListener)
124126
throws InvocationTargetException,
@@ -138,6 +140,7 @@ final class TarantoolBoxClientImpl extends TarantoolClientImpl implements Tarant
138140
reconnectAfter,
139141
metricsRegistry,
140142
ignoredPacketsHandler,
143+
handlers,
141144
sslContext,
142145
!fetchSchema,
143146
poolEventListener);

tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolClientImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.tarantool.core.WatcherOptions;
3434
import io.tarantool.core.connection.ConnectionFactory;
3535
import io.tarantool.core.exceptions.ServerException;
36+
import io.tarantool.core.protocol.Handlers;
3637
import io.tarantool.core.protocol.IProtoRequestOpts;
3738
import io.tarantool.core.protocol.IProtoResponse;
3839
import io.tarantool.mapping.TarantoolJacksonMapping;
@@ -94,6 +95,7 @@ abstract class TarantoolClientImpl implements TarantoolClient {
9495
* @param reconnectAfter time after which reconnect occurs
9596
* @param metricsRegistry micrometer {@link TarantoolClientImpl#metricsRegistry}
9697
* @param ignoredPacketsHandler handler for ignored IProto-packets.
98+
* @param handlers handlers for request/response lifecycle events.
9799
* @param sslContext SslContext with settings for establishing SSL/TLS connection between
98100
* Tarantool.
99101
* @param useTupleExtension Use TUPLE_EXT feature if true.
@@ -123,6 +125,7 @@ protected TarantoolClientImpl(
123125
long reconnectAfter,
124126
MeterRegistry metricsRegistry,
125127
TripleConsumer<String, Integer, IProtoResponse> ignoredPacketsHandler,
128+
Handlers handlers,
126129
SslContext sslContext,
127130
boolean useTupleExtension,
128131
PoolEventListener poolEventListener)
@@ -144,6 +147,7 @@ protected TarantoolClientImpl(
144147
watcherOpts,
145148
this.metricsRegistry,
146149
ignoredPacketsHandler,
150+
handlers,
147151
useTupleExtension,
148152
poolEventListener);
149153
pool.setGroups(groups);

tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolCrudClientBuilder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.tarantool.client.crud.TarantoolCrudClient;
3434
import io.tarantool.core.ManagedResource;
3535
import io.tarantool.core.WatcherOptions;
36+
import io.tarantool.core.protocol.Handlers;
3637
import io.tarantool.core.protocol.IProtoResponse;
3738
import io.tarantool.pool.HeartbeatOpts;
3839
import io.tarantool.pool.InstanceConnectionGroup;
@@ -148,6 +149,8 @@ public class TarantoolCrudClientBuilder {
148149
/** Optional listener for pool lifecycle events. */
149150
private PoolEventListener poolEventListener;
150151

152+
private Handlers handlers;
153+
151154
public Map<ChannelOption<?>, Object> getOptions() {
152155
return options;
153156
}
@@ -762,6 +765,11 @@ public TarantoolCrudClientBuilder withPoolEventListener(PoolEventListener listen
762765
return this;
763766
}
764767

768+
public TarantoolCrudClientBuilder withHandlers(Handlers handlers) {
769+
this.handlers = handlers;
770+
return this;
771+
}
772+
765773
/**
766774
* Builds specific {@link TarantoolCrudClient} class instance with parameters.
767775
*
@@ -800,6 +808,7 @@ public TarantoolCrudClient build() throws Exception {
800808
reconnectAfter,
801809
metricsRegistry,
802810
ignoredPacketsHandler,
811+
handlers,
803812
sslContext,
804813
useTupleExtension,
805814
poolEventListener);

tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolCrudClientImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.tarantool.client.crud.options.CrudOptions;
2121
import io.tarantool.core.ManagedResource;
2222
import io.tarantool.core.WatcherOptions;
23+
import io.tarantool.core.protocol.Handlers;
2324
import io.tarantool.core.protocol.IProtoResponse;
2425
import io.tarantool.pool.HeartbeatOpts;
2526
import io.tarantool.pool.InstanceConnectionGroup;
@@ -70,6 +71,7 @@ final class TarantoolCrudClientImpl extends TarantoolClientImpl implements Taran
7071
* @param reconnectAfter see reconnectAfter in{@link TarantoolCrudClientBuilder}.
7172
* @param metricsRegistry see metricsRegistry in{@link TarantoolCrudClientBuilder}.
7273
* @param ignoredPacketsHandler see ignoredPacketsHandler in{@link TarantoolCrudClientBuilder}.
74+
* @param handlers see handlers in{@link TarantoolCrudClientBuilder}.
7375
* @param sslContext see sslContext in{@link TarantoolCrudClientBuilder}.
7476
* @param useTupleExtension see useTupleExtension in{@link TarantoolCrudClientBuilder}.
7577
* @throws NoSuchMethodException if a matching method is not found.
@@ -96,6 +98,7 @@ final class TarantoolCrudClientImpl extends TarantoolClientImpl implements Taran
9698
long reconnectAfter,
9799
MeterRegistry metricsRegistry,
98100
TripleConsumer<String, Integer, IProtoResponse> ignoredPacketsHandler,
101+
Handlers handlers,
99102
SslContext sslContext,
100103
boolean useTupleExtension,
101104
PoolEventListener poolEventListener)
@@ -116,6 +119,7 @@ final class TarantoolCrudClientImpl extends TarantoolClientImpl implements Taran
116119
reconnectAfter,
117120
metricsRegistry,
118121
ignoredPacketsHandler,
122+
handlers,
119123
sslContext,
120124
useTupleExtension,
121125
poolEventListener);

tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolDataGridClientBuilder.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.tarantool.client.tdg.TarantoolDataGridClient;
3434
import io.tarantool.core.ManagedResource;
3535
import io.tarantool.core.WatcherOptions;
36+
import io.tarantool.core.protocol.Handlers;
3637
import io.tarantool.core.protocol.IProtoResponse;
3738
import io.tarantool.pool.HeartbeatOpts;
3839
import io.tarantool.pool.InstanceConnectionGroup;
@@ -158,6 +159,9 @@ public class TarantoolDataGridClientBuilder {
158159
/** Optional listener for pool lifecycle events. */
159160
private PoolEventListener poolEventListener;
160161

162+
/** Handlers for protocol messages. */
163+
private Handlers handlers;
164+
161165
public Map<ChannelOption<?>, Object> getOptions() {
162166
return options;
163167
}
@@ -840,6 +844,17 @@ public TarantoolDataGridClientBuilder withCredentials(Map<String, Object> creden
840844
return this;
841845
}
842846

847+
/**
848+
* Sets the {@link #handlers} parameter when constructing an instance of a builder class.
849+
*
850+
* @param handlers see {@link #handlers} field.
851+
* @return {@link TarantoolDataGridClientBuilder} object.
852+
*/
853+
public TarantoolDataGridClientBuilder withHandlers(Handlers handlers) {
854+
this.handlers = handlers;
855+
return this;
856+
}
857+
843858
/**
844859
* Builds specific {@link TarantoolDataGridClient} class instance with parameters.
845860
*
@@ -878,6 +893,7 @@ public TarantoolDataGridClient build() throws Exception {
878893
reconnectAfter,
879894
metricsRegistry,
880895
ignoredPacketsHandler,
896+
handlers,
881897
sslContext,
882898
useTupleExtension,
883899
useTdg1Context,

tarantool-client/src/main/java/io/tarantool/client/factory/TarantoolDataGridClientImpl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.tarantool.client.tdg.TarantoolDataGridSpace;
2020
import io.tarantool.core.ManagedResource;
2121
import io.tarantool.core.WatcherOptions;
22+
import io.tarantool.core.protocol.Handlers;
2223
import io.tarantool.core.protocol.IProtoResponse;
2324
import io.tarantool.pool.HeartbeatOpts;
2425
import io.tarantool.pool.InstanceConnectionGroup;
@@ -70,6 +71,7 @@ protected TarantoolDataGridClientImpl(
7071
long reconnectAfter,
7172
MeterRegistry metricsRegistry,
7273
TripleConsumer<String, Integer, IProtoResponse> ignoredPacketsHandler,
74+
Handlers handlers,
7375
SslContext sslContext,
7476
boolean useTupleExtension,
7577
boolean useTdg1Context,
@@ -92,6 +94,7 @@ protected TarantoolDataGridClientImpl(
9294
reconnectAfter,
9395
metricsRegistry,
9496
ignoredPacketsHandler,
97+
handlers,
9598
sslContext,
9699
useTupleExtension,
97100
poolEventListener);

tarantool-client/src/test/java/io/tarantool/client/integration/TarantoolBoxClientTest.java

Lines changed: 102 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,19 @@
4444
import org.junit.jupiter.params.ParameterizedTest;
4545
import org.junit.jupiter.params.provider.Arguments;
4646
import org.junit.jupiter.params.provider.MethodSource;
47+
import org.msgpack.core.MessageBufferPacker;
48+
import org.msgpack.core.MessagePack;
49+
import org.msgpack.core.MessageUnpacker;
50+
import org.msgpack.value.Value;
51+
import org.msgpack.value.ValueFactory;
4752
import org.testcontainers.containers.tarantool.TarantoolContainer;
4853
import org.testcontainers.containers.utils.TarantoolContainerClientHelper;
4954
import org.testcontainers.shaded.com.google.common.base.CaseFormat;
5055

5156
import static io.tarantool.client.box.TarantoolBoxSpace.WITHOUT_ENABLED_FETCH_SCHEMA_OPTION_FOR_TARANTOOL_LESS_3_0_0;
57+
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_DATA;
58+
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_SYNC_ID;
59+
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_CALL;
5260
import static io.tarantool.mapping.BaseTarantoolJacksonMapping.objectMapper;
5361
import io.tarantool.client.BaseOptions;
5462
import io.tarantool.client.ClientType;
@@ -64,7 +72,11 @@
6472
import io.tarantool.client.operation.Operations;
6573
import io.tarantool.core.IProtoClient;
6674
import io.tarantool.core.protocol.BoxIterator;
75+
import io.tarantool.core.protocol.ByteBodyValueWrapper;
76+
import io.tarantool.core.protocol.Handlers;
77+
import io.tarantool.core.protocol.IProtoRequest;
6778
import io.tarantool.core.protocol.IProtoResponse;
79+
import io.tarantool.mapping.BaseTarantoolJacksonMapping;
6880
import io.tarantool.mapping.NilErrorResponse;
6981
import io.tarantool.mapping.SelectResponse;
7082
import io.tarantool.mapping.TarantoolResponse;
@@ -250,25 +262,113 @@ public void testCallAndEval() {
250262

251263
@Test
252264
public void testCallTimeoutWithIgnoredPacketsHandler() throws Exception {
265+
List<IProtoRequest> timedOutRequests = new ArrayList<>();
266+
List<List<Object>> localTriplets = new ArrayList<>();
267+
268+
TarantoolBoxClient testClient =
269+
TarantoolFactory.box()
270+
.withUser(API_USER)
271+
.withPassword(CREDS.get(API_USER))
272+
.withHost(tt.getHost())
273+
.withPort(tt.getFirstMappedPort())
274+
.withHandlers(
275+
Handlers.builder()
276+
.onTimeout(
277+
request -> {
278+
synchronized (timedOutRequests) {
279+
timedOutRequests.add(request);
280+
}
281+
})
282+
.build())
283+
.withIgnoredPacketsHandler(
284+
(tag, index, packet) -> {
285+
synchronized (localTriplets) {
286+
localTriplets.add(Arrays.asList(tag, index, packet));
287+
}
288+
})
289+
.build();
290+
253291
Options options = BaseOptions.builder().withTimeout(1_000L).build();
254292
Exception ex =
255293
assertThrows(
256294
CompletionException.class,
257-
() -> client.call("slow_echo", Arrays.asList(1, true), options).join());
295+
() -> testClient.call("slow_echo", Arrays.asList(1, true), options).join());
258296
Throwable cause = ex.getCause();
259297
assertEquals(TimeoutException.class, cause.getClass());
260298
Thread.sleep(600);
261-
assertEquals(1, triplets.size());
299+
assertEquals(1, localTriplets.size());
300+
301+
// Verify timeout handler and ignored packets have matching syncId
302+
long requestSyncId = assertTimeoutHandler(timedOutRequests);
303+
long responseSyncId = assertIgnoredPackets(localTriplets);
304+
assertEquals(requestSyncId, responseSyncId, "Request and response syncId should match");
262305

306+
testClient.close();
307+
}
308+
309+
private long assertTimeoutHandler(List<IProtoRequest> timedOutRequests) throws Exception {
310+
assertEquals(1, timedOutRequests.size());
311+
IProtoRequest timedOutRequest = timedOutRequests.get(0);
312+
assertEquals(IPROTO_TYPE_CALL, timedOutRequest.getRequestType());
313+
314+
byte[] packetBytes = timedOutRequest.getPacket(MessagePack.newDefaultBufferPacker());
315+
MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(packetBytes);
316+
unpacker.unpackInt(); // Skip size prefix
317+
318+
Value headerValue = unpacker.unpackValue();
319+
Value bodyValue = unpacker.unpackValue();
320+
321+
// Extract syncId from header (key 0x01)
322+
long syncId =
323+
headerValue
324+
.asMapValue()
325+
.map()
326+
.get(ValueFactory.newInteger(IPROTO_SYNC_ID))
327+
.asIntegerValue()
328+
.asLong();
329+
330+
// Convert body to bytes and parse with objectMapper
331+
MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
332+
packer.packValue(bodyValue);
333+
byte[] bodyBytes = packer.toByteArray();
334+
Map<Integer, Object> body =
335+
objectMapper.readValue(bodyBytes, new TypeReference<Map<Integer, Object>>() {});
336+
337+
assertEquals("slow_echo", body.get(0x22)); // IPROTO_FUNCTION_NAME
338+
assertEquals(Arrays.asList(1, true), body.get(0x21)); // IPROTO_TUPLE
339+
340+
return syncId;
341+
}
342+
343+
private long assertIgnoredPackets(List<List<Object>> triplets) throws Exception {
263344
Set<String> tags = new HashSet<>();
264345
Set<Integer> indexes = new HashSet<>();
346+
long syncId = -1;
347+
265348
for (List<Object> item : triplets) {
266349
tags.add((String) item.get(0));
267350
indexes.add((int) item.get(1));
268351
assertInstanceOf(IProtoResponse.class, item.get(2));
352+
IProtoResponse response = (IProtoResponse) item.get(2);
353+
assertFalse(response.isError());
354+
assertTrue(response.hasSyncId());
355+
syncId = response.getSyncId();
356+
assertTrue(syncId > 0);
357+
358+
Map<Integer, Object> bodyAsObjects = new HashMap<>();
359+
Map<Integer, ByteBodyValueWrapper> byteBodyValues = response.getByteBodyValues();
360+
for (Map.Entry<Integer, ByteBodyValueWrapper> entry : byteBodyValues.entrySet()) {
361+
bodyAsObjects.put(
362+
entry.getKey(), BaseTarantoolJacksonMapping.readValue(entry.getValue(), Object.class));
363+
}
364+
assertEquals(byteBodyValues.size(), bodyAsObjects.size());
365+
assertEquals(Arrays.asList(1, true), bodyAsObjects.get(IPROTO_DATA));
269366
}
367+
270368
assertEquals(new HashSet<>(Collections.singletonList("default")), tags);
271369
assertEquals(Collections.singleton(0), indexes);
370+
371+
return syncId;
272372
}
273373

274374
@Test

0 commit comments

Comments
 (0)