Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions tarantool-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,31 @@
<version>2.20.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-otlp</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-logging</artifactId>
<scope>test</scope>
</dependency>
<!-- End test -->
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,17 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.core.type.TypeReference;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable;
Expand All @@ -54,8 +61,20 @@

import static io.tarantool.client.box.TarantoolBoxSpace.WITHOUT_ENABLED_FETCH_SCHEMA_OPTION_FOR_TARANTOOL_LESS_3_0_0;
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_DATA;
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_ERROR;
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_SYNC_ID;
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_AUTH;
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_CALL;
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_DELETE;
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_EVAL;
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_INSERT;
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_PING;
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_REPLACE;
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_SELECT;
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_UPDATE;
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_UPSERT;
import static io.tarantool.core.protocol.requests.IProtoConstant.MP_IPROTO_FUNCTION_NAME;
import static io.tarantool.core.protocol.requests.IProtoConstant.MP_IPROTO_TUPLE;
import static io.tarantool.mapping.BaseTarantoolJacksonMapping.objectMapper;
import io.tarantool.client.BaseOptions;
import io.tarantool.client.ClientType;
Expand Down Expand Up @@ -1681,4 +1700,316 @@ void testGetServerVersion() throws Exception {
TarantoolVersion version = client.getServerVersion().join();
assertEquals(tarantoolMajorVersion, version.getMajor());
}

/**
* Demonstrates distributed tracing using OpenTelemetry API with Handlers.
*
* <p>This test shows how to integrate Tarantool client with OpenTelemetry to track request
* lifecycle:
*
* <ul>
* <li>onBeforeSend - creates a span and stores it by syncId
* <li>onSuccess - retrieves span, sets attributes and ends it
* <li>onTimeout - retrieves span, records exception and ends it
* <li>onIgnoredResponse - adds event for late response arrival
* </ul>
*
* <p>Jaeger is started via TestContainer. To view traces, open the URL printed in console
* (http://localhost:16686) after test starts running.
*/
@Test
@Timeout(5000)
@Disabled
public void testOpenTelemetryTracingWithJaeger() throws Exception {
// Start Jaeger via TestContainer
org.testcontainers.containers.GenericContainer<?> jaeger =
new org.testcontainers.containers.GenericContainer<>("jaegertracing/all-in-one:1.50")
.withExposedPorts(16686, 4317)
.withEnv("COLLECTOR_OTLP_ENABLED", "true");
jaeger.start();

String jaegerUrl = "http://" + jaeger.getHost() + ":" + jaeger.getMappedPort(16686);
String otlpEndpoint = "http://" + jaeger.getHost() + ":" + jaeger.getMappedPort(4317);

System.out.println("========================================");
System.out.println("Jaeger UI available at: " + jaegerUrl);
System.out.println("OTLP endpoint: " + otlpEndpoint);
System.out.println("========================================");

// Set up OTLP exporter to send traces to Jaeger
io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter spanExporter =
io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter.builder()
.setEndpoint(otlpEndpoint)
.build();

io.opentelemetry.sdk.resources.Resource resource =
io.opentelemetry.sdk.resources.Resource.builder()
.put(
io.opentelemetry.api.common.AttributeKey.stringKey("service.name"),
"tarantool-java-sdk")
.put(io.opentelemetry.api.common.AttributeKey.stringKey("service.version"), "2.0.0")
.build();

SdkTracerProvider tracerProvider =
SdkTracerProvider.builder()
.setResource(resource)
.addSpanProcessor(SimpleSpanProcessor.create(spanExporter))
.build();

OpenTelemetrySdk openTelemetry =
OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build();

Tracer tracer = openTelemetry.getTracer("tarantool-test");

// Maps to store active spans by syncId
java.util.Map<Long, Span> requestSpans = new java.util.concurrent.ConcurrentHashMap<>();
java.util.Map<Long, Span> timeoutSpans = new java.util.concurrent.ConcurrentHashMap<>();

TarantoolBoxClient testClient =
TarantoolFactory.box()
.withUser(API_USER)
.withPassword(CREDS.get(API_USER))
.withHost(tt.getHost())
.withPort(tt.getFirstMappedPort())
.withHandlers(
Handlers.builder()
.onBeforeSend(
request -> {
// Extract function name and arguments from request body
String functionName = extractFunctionName(request);
String args = extractArguments(request);

String requestTypeName = getRequestTypeName(request.getRequestType());

// 1. Main request span - lives until final result (success or late
// response)
Span requestSpan =
tracer
.spanBuilder(
"tarantool." + requestTypeName + ":" + request.getSyncId())
.setAttribute("request.type", request.getRequestType())
.setAttribute("request.type.name", requestTypeName)
.setAttribute("sync.id", request.getSyncId())
.setAttribute("tarantool.function", functionName)
.setAttribute("tarantool.arguments", args)
.startSpan();
requestSpans.put(request.getSyncId(), requestSpan);

// 2. Timeout span - created now, ended on timeout, removed on success
Span timeoutSpan =
tracer
.spanBuilder(
"tarantool."
+ requestTypeName
+ ".timeout:"
+ request.getSyncId())
.setParent(
io.opentelemetry.context.Context.current().with(requestSpan))
.setAttribute("sync.id", request.getSyncId())
.startSpan();
timeoutSpans.put(request.getSyncId(), timeoutSpan);
})
.onSuccess(
response -> {
// Remove timeout span (success happened, no timeout)
Span timeoutSpan = timeoutSpans.remove(response.getSyncId());
if (timeoutSpan != null) {
// Don't end it, just discard - timeout didn't happen
}

// End main request span
Span requestSpan = requestSpans.remove(response.getSyncId());
if (requestSpan != null) {
requestSpan.setAttribute("response.sync_id", response.getSyncId());
requestSpan.setStatus(StatusCode.OK);

// Parse and log response data
String responseData = extractResponseData(response);
requestSpan.setAttribute("response.data", responseData);

requestSpan.end();
}
})
.onTimeout(
request -> {
// End timeout span
Span timeoutSpan = timeoutSpans.remove(request.getSyncId());
if (timeoutSpan != null) {
timeoutSpan.addEvent("Request timed out");
timeoutSpan.setStatus(StatusCode.ERROR, "Request timed out");
timeoutSpan.setAttribute("error.type", "timeout");
timeoutSpan.recordException(new TimeoutException("Request timed out"));
timeoutSpan.end();
}
})
.onIgnoredResponse(
response -> {
// End main request span (late response arrived)
Span requestSpan = requestSpans.remove(response.getSyncId());
if (requestSpan != null) {
requestSpan.addEvent("Late response arrived");
requestSpan.setAttribute("response.sync_id", response.getSyncId());
requestSpan.setAttribute("late.response", true);

// Parse and log response data
String responseData = extractResponseData(response);
requestSpan.setAttribute("response.data", responseData);

requestSpan.end();
}
})
.build())
.build();

// Test successful request
List<?> result = testClient.call("echo", Arrays.asList(42, "test")).join().get();
assertEquals(Arrays.asList(42, "test"), result);

// Wait for async callbacks and span export
Thread.sleep(1000);

System.out.println("Sent successful request trace to Jaeger");

// Test timeout scenario
Options timeoutOptions = BaseOptions.builder().withTimeout(100L).build();

Exception ex =
assertThrows(
CompletionException.class,
() -> testClient.call("slow_echo", Arrays.asList(1, true), timeoutOptions).join());
assertEquals(TimeoutException.class, ex.getCause().getClass());

// Wait for timeout handler and span export
Thread.sleep(1000);

System.out.println("Sent timeout error trace to Jaeger");

// Print Jaeger URL again so user can view traces after test completes
System.out.println("========================================");
System.out.println("Test completed. View traces at: " + jaegerUrl);
System.out.println("========================================");

// Keep Jaeger running for a moment to allow viewing traces
Thread.sleep(3000000);
jaeger.stop();
openTelemetry.close();
testClient.close();
}

/** Helper method to get human-readable request type name. */
private String getRequestTypeName(int requestType) {
switch (requestType) {
case IPROTO_TYPE_SELECT:
return "SELECT";
case IPROTO_TYPE_INSERT:
return "INSERT";
case IPROTO_TYPE_REPLACE:
return "REPLACE";
case IPROTO_TYPE_UPDATE:
return "UPDATE";
case IPROTO_TYPE_DELETE:
return "DELETE";
case IPROTO_TYPE_AUTH:
return "AUTH";
case IPROTO_TYPE_EVAL:
return "EVAL";
case IPROTO_TYPE_UPSERT:
return "UPSERT";
case IPROTO_TYPE_CALL:
return "CALL";
case IPROTO_TYPE_PING:
return "PING";
default:
return "UNKNOWN(" + requestType + ")";
}
}

/** Helper method to extract function name from CALL request. */
private String extractFunctionName(IProtoRequest request) {
try {
byte[] packetBytes = request.getPacket(org.msgpack.core.MessagePack.newDefaultBufferPacker());
org.msgpack.core.MessageUnpacker unpacker =
org.msgpack.core.MessagePack.newDefaultUnpacker(packetBytes);
unpacker.unpackInt(); // Skip size prefix
unpacker.unpackValue(); // Skip header
org.msgpack.value.Value bodyValue = unpacker.unpackValue();

if (bodyValue.isMapValue()) {
org.msgpack.value.Value functionNameValue =
bodyValue.asMapValue().map().get(MP_IPROTO_FUNCTION_NAME);
if (functionNameValue != null && functionNameValue.isStringValue()) {
return functionNameValue.asStringValue().asString();
}
}
} catch (Exception e) {
// Ignore parsing errors
}
return "unknown";
}

/** Helper method to extract arguments from CALL request. */
private String extractArguments(IProtoRequest request) {
try {
byte[] packetBytes = request.getPacket(org.msgpack.core.MessagePack.newDefaultBufferPacker());
org.msgpack.core.MessageUnpacker unpacker =
org.msgpack.core.MessagePack.newDefaultUnpacker(packetBytes);
unpacker.unpackInt(); // Skip size prefix
unpacker.unpackValue(); // Skip header
org.msgpack.value.Value bodyValue = unpacker.unpackValue();

if (bodyValue.isMapValue()) {
org.msgpack.value.Value tupleValue = bodyValue.asMapValue().map().get(MP_IPROTO_TUPLE);
if (tupleValue != null) {
return tupleValue.toString();
}
}
} catch (Exception e) {
// Ignore parsing errors
}
return "[]";
}

/** Helper method to extract response data from IProtoResponse. */
private String extractResponseData(IProtoResponse response) {
try {
StringBuilder sb = new StringBuilder();
sb.append("{");

// Extract data from response body using Jackson mapping
java.util.Map<Integer, io.tarantool.core.protocol.ByteBodyValueWrapper> byteBodyValues =
response.getByteBodyValues();

boolean first = true;
for (java.util.Map.Entry<Integer, io.tarantool.core.protocol.ByteBodyValueWrapper> entry :
byteBodyValues.entrySet()) {
if (!first) {
sb.append(", ");
}
first = false;

String keyName = getResponseFieldName(entry.getKey());
Object value =
io.tarantool.mapping.BaseTarantoolJacksonMapping.readValue(
entry.getValue(), Object.class);
sb.append(keyName).append("=").append(value);
}

sb.append("}");
return sb.toString();
} catch (Exception e) {
return "{error=" + e.getMessage() + "}";
}
}

/** Helper method to get human-readable response field name. */
private String getResponseFieldName(int fieldCode) {
switch (fieldCode) {
case IPROTO_DATA:
return "data";
case IPROTO_ERROR:
return "error";
default:
return "field_" + fieldCode;
}
}
}
Loading