Skip to content

Commit 94f49a0

Browse files
ArtDuclaude
andcommitted
test(tracing): add OpenTelemetry tracing example with Jaeger
Add testOpenTelemetryTracingWithJaeger() demonstrating distributed tracing using OpenTelemetry API with Handlers: - Create spans for request lifecycle (onBeforeSend, onSuccess, onTimeout) - Parse request/response data using Jackson mapping - Export traces to Jaeger via TestContainer - Show flamegraph with timeout and late response handling The test is @disabled by default as it requires Docker and keeps Jaeger running for manual inspection. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 65b4e94 commit 94f49a0

File tree

3 files changed

+389
-0
lines changed

3 files changed

+389
-0
lines changed

tarantool-client/pom.xml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,31 @@
6666
<version>2.20.0</version>
6767
<scope>test</scope>
6868
</dependency>
69+
<dependency>
70+
<groupId>io.opentelemetry</groupId>
71+
<artifactId>opentelemetry-api</artifactId>
72+
<scope>test</scope>
73+
</dependency>
74+
<dependency>
75+
<groupId>io.opentelemetry</groupId>
76+
<artifactId>opentelemetry-sdk</artifactId>
77+
<scope>test</scope>
78+
</dependency>
79+
<dependency>
80+
<groupId>io.opentelemetry</groupId>
81+
<artifactId>opentelemetry-sdk-testing</artifactId>
82+
<scope>test</scope>
83+
</dependency>
84+
<dependency>
85+
<groupId>io.opentelemetry</groupId>
86+
<artifactId>opentelemetry-exporter-otlp</artifactId>
87+
<scope>test</scope>
88+
</dependency>
89+
<dependency>
90+
<groupId>io.opentelemetry</groupId>
91+
<artifactId>opentelemetry-exporter-logging</artifactId>
92+
<scope>test</scope>
93+
</dependency>
6994
<!-- End test -->
7095
</dependencies>
7196
</project>

tarantool-client/src/test/java/io/tarantool/client/integration/TarantoolBoxClientTest.java

Lines changed: 331 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,17 @@
3232
import static org.junit.jupiter.api.Assertions.assertThrows;
3333
import static org.junit.jupiter.api.Assertions.assertTrue;
3434
import com.fasterxml.jackson.core.type.TypeReference;
35+
import io.opentelemetry.api.trace.Span;
36+
import io.opentelemetry.api.trace.StatusCode;
37+
import io.opentelemetry.api.trace.Tracer;
38+
import io.opentelemetry.sdk.OpenTelemetrySdk;
39+
import io.opentelemetry.sdk.trace.SdkTracerProvider;
40+
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
3541
import org.junit.jupiter.api.AfterAll;
3642
import org.junit.jupiter.api.Assertions;
3743
import org.junit.jupiter.api.BeforeAll;
3844
import org.junit.jupiter.api.BeforeEach;
45+
import org.junit.jupiter.api.Disabled;
3946
import org.junit.jupiter.api.Test;
4047
import org.junit.jupiter.api.Timeout;
4148
import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable;
@@ -54,8 +61,20 @@
5461

5562
import static io.tarantool.client.box.TarantoolBoxSpace.WITHOUT_ENABLED_FETCH_SCHEMA_OPTION_FOR_TARANTOOL_LESS_3_0_0;
5663
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_DATA;
64+
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_ERROR;
5765
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_SYNC_ID;
66+
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_AUTH;
5867
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_CALL;
68+
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_DELETE;
69+
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_EVAL;
70+
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_INSERT;
71+
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_PING;
72+
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_REPLACE;
73+
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_SELECT;
74+
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_UPDATE;
75+
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_UPSERT;
76+
import static io.tarantool.core.protocol.requests.IProtoConstant.MP_IPROTO_FUNCTION_NAME;
77+
import static io.tarantool.core.protocol.requests.IProtoConstant.MP_IPROTO_TUPLE;
5978
import static io.tarantool.mapping.BaseTarantoolJacksonMapping.objectMapper;
6079
import io.tarantool.client.BaseOptions;
6180
import io.tarantool.client.ClientType;
@@ -1681,4 +1700,316 @@ void testGetServerVersion() throws Exception {
16811700
TarantoolVersion version = client.getServerVersion().join();
16821701
assertEquals(tarantoolMajorVersion, version.getMajor());
16831702
}
1703+
1704+
/**
1705+
* Demonstrates distributed tracing using OpenTelemetry API with Handlers.
1706+
*
1707+
* <p>This test shows how to integrate Tarantool client with OpenTelemetry to track request
1708+
* lifecycle:
1709+
*
1710+
* <ul>
1711+
* <li>onBeforeSend - creates a span and stores it by syncId
1712+
* <li>onSuccess - retrieves span, sets attributes and ends it
1713+
* <li>onTimeout - retrieves span, records exception and ends it
1714+
* <li>onIgnoredResponse - adds event for late response arrival
1715+
* </ul>
1716+
*
1717+
* <p>Jaeger is started via TestContainer. To view traces, open the URL printed in console
1718+
* (http://localhost:16686) after test starts running.
1719+
*/
1720+
@Test
1721+
@Timeout(5000)
1722+
@Disabled
1723+
public void testOpenTelemetryTracingWithJaeger() throws Exception {
1724+
// Start Jaeger via TestContainer
1725+
org.testcontainers.containers.GenericContainer<?> jaeger =
1726+
new org.testcontainers.containers.GenericContainer<>("jaegertracing/all-in-one:1.50")
1727+
.withExposedPorts(16686, 4317)
1728+
.withEnv("COLLECTOR_OTLP_ENABLED", "true");
1729+
jaeger.start();
1730+
1731+
String jaegerUrl = "http://" + jaeger.getHost() + ":" + jaeger.getMappedPort(16686);
1732+
String otlpEndpoint = "http://" + jaeger.getHost() + ":" + jaeger.getMappedPort(4317);
1733+
1734+
System.out.println("========================================");
1735+
System.out.println("Jaeger UI available at: " + jaegerUrl);
1736+
System.out.println("OTLP endpoint: " + otlpEndpoint);
1737+
System.out.println("========================================");
1738+
1739+
// Set up OTLP exporter to send traces to Jaeger
1740+
io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter spanExporter =
1741+
io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter.builder()
1742+
.setEndpoint(otlpEndpoint)
1743+
.build();
1744+
1745+
io.opentelemetry.sdk.resources.Resource resource =
1746+
io.opentelemetry.sdk.resources.Resource.builder()
1747+
.put(
1748+
io.opentelemetry.api.common.AttributeKey.stringKey("service.name"),
1749+
"tarantool-java-sdk")
1750+
.put(io.opentelemetry.api.common.AttributeKey.stringKey("service.version"), "2.0.0")
1751+
.build();
1752+
1753+
SdkTracerProvider tracerProvider =
1754+
SdkTracerProvider.builder()
1755+
.setResource(resource)
1756+
.addSpanProcessor(SimpleSpanProcessor.create(spanExporter))
1757+
.build();
1758+
1759+
OpenTelemetrySdk openTelemetry =
1760+
OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build();
1761+
1762+
Tracer tracer = openTelemetry.getTracer("tarantool-test");
1763+
1764+
// Maps to store active spans by syncId
1765+
java.util.Map<Long, Span> requestSpans = new java.util.concurrent.ConcurrentHashMap<>();
1766+
java.util.Map<Long, Span> timeoutSpans = new java.util.concurrent.ConcurrentHashMap<>();
1767+
1768+
TarantoolBoxClient testClient =
1769+
TarantoolFactory.box()
1770+
.withUser(API_USER)
1771+
.withPassword(CREDS.get(API_USER))
1772+
.withHost(tt.getHost())
1773+
.withPort(tt.getFirstMappedPort())
1774+
.withHandlers(
1775+
Handlers.builder()
1776+
.onBeforeSend(
1777+
request -> {
1778+
// Extract function name and arguments from request body
1779+
String functionName = extractFunctionName(request);
1780+
String args = extractArguments(request);
1781+
1782+
String requestTypeName = getRequestTypeName(request.getRequestType());
1783+
1784+
// 1. Main request span - lives until final result (success or late
1785+
// response)
1786+
Span requestSpan =
1787+
tracer
1788+
.spanBuilder(
1789+
"tarantool." + requestTypeName + ":" + request.getSyncId())
1790+
.setAttribute("request.type", request.getRequestType())
1791+
.setAttribute("request.type.name", requestTypeName)
1792+
.setAttribute("sync.id", request.getSyncId())
1793+
.setAttribute("tarantool.function", functionName)
1794+
.setAttribute("tarantool.arguments", args)
1795+
.startSpan();
1796+
requestSpans.put(request.getSyncId(), requestSpan);
1797+
1798+
// 2. Timeout span - created now, ended on timeout, removed on success
1799+
Span timeoutSpan =
1800+
tracer
1801+
.spanBuilder(
1802+
"tarantool."
1803+
+ requestTypeName
1804+
+ ".timeout:"
1805+
+ request.getSyncId())
1806+
.setParent(
1807+
io.opentelemetry.context.Context.current().with(requestSpan))
1808+
.setAttribute("sync.id", request.getSyncId())
1809+
.startSpan();
1810+
timeoutSpans.put(request.getSyncId(), timeoutSpan);
1811+
})
1812+
.onSuccess(
1813+
response -> {
1814+
// Remove timeout span (success happened, no timeout)
1815+
Span timeoutSpan = timeoutSpans.remove(response.getSyncId());
1816+
if (timeoutSpan != null) {
1817+
// Don't end it, just discard - timeout didn't happen
1818+
}
1819+
1820+
// End main request span
1821+
Span requestSpan = requestSpans.remove(response.getSyncId());
1822+
if (requestSpan != null) {
1823+
requestSpan.setAttribute("response.sync_id", response.getSyncId());
1824+
requestSpan.setStatus(StatusCode.OK);
1825+
1826+
// Parse and log response data
1827+
String responseData = extractResponseData(response);
1828+
requestSpan.setAttribute("response.data", responseData);
1829+
1830+
requestSpan.end();
1831+
}
1832+
})
1833+
.onTimeout(
1834+
request -> {
1835+
// End timeout span
1836+
Span timeoutSpan = timeoutSpans.remove(request.getSyncId());
1837+
if (timeoutSpan != null) {
1838+
timeoutSpan.addEvent("Request timed out");
1839+
timeoutSpan.setStatus(StatusCode.ERROR, "Request timed out");
1840+
timeoutSpan.setAttribute("error.type", "timeout");
1841+
timeoutSpan.recordException(new TimeoutException("Request timed out"));
1842+
timeoutSpan.end();
1843+
}
1844+
})
1845+
.onIgnoredResponse(
1846+
response -> {
1847+
// End main request span (late response arrived)
1848+
Span requestSpan = requestSpans.remove(response.getSyncId());
1849+
if (requestSpan != null) {
1850+
requestSpan.addEvent("Late response arrived");
1851+
requestSpan.setAttribute("response.sync_id", response.getSyncId());
1852+
requestSpan.setAttribute("late.response", true);
1853+
1854+
// Parse and log response data
1855+
String responseData = extractResponseData(response);
1856+
requestSpan.setAttribute("response.data", responseData);
1857+
1858+
requestSpan.end();
1859+
}
1860+
})
1861+
.build())
1862+
.build();
1863+
1864+
// Test successful request
1865+
List<?> result = testClient.call("echo", Arrays.asList(42, "test")).join().get();
1866+
assertEquals(Arrays.asList(42, "test"), result);
1867+
1868+
// Wait for async callbacks and span export
1869+
Thread.sleep(1000);
1870+
1871+
System.out.println("Sent successful request trace to Jaeger");
1872+
1873+
// Test timeout scenario
1874+
Options timeoutOptions = BaseOptions.builder().withTimeout(100L).build();
1875+
1876+
Exception ex =
1877+
assertThrows(
1878+
CompletionException.class,
1879+
() -> testClient.call("slow_echo", Arrays.asList(1, true), timeoutOptions).join());
1880+
assertEquals(TimeoutException.class, ex.getCause().getClass());
1881+
1882+
// Wait for timeout handler and span export
1883+
Thread.sleep(1000);
1884+
1885+
System.out.println("Sent timeout error trace to Jaeger");
1886+
1887+
// Print Jaeger URL again so user can view traces after test completes
1888+
System.out.println("========================================");
1889+
System.out.println("Test completed. View traces at: " + jaegerUrl);
1890+
System.out.println("========================================");
1891+
1892+
// Keep Jaeger running for a moment to allow viewing traces
1893+
Thread.sleep(3000000);
1894+
jaeger.stop();
1895+
openTelemetry.close();
1896+
testClient.close();
1897+
}
1898+
1899+
/** Helper method to get human-readable request type name. */
1900+
private String getRequestTypeName(int requestType) {
1901+
switch (requestType) {
1902+
case IPROTO_TYPE_SELECT:
1903+
return "SELECT";
1904+
case IPROTO_TYPE_INSERT:
1905+
return "INSERT";
1906+
case IPROTO_TYPE_REPLACE:
1907+
return "REPLACE";
1908+
case IPROTO_TYPE_UPDATE:
1909+
return "UPDATE";
1910+
case IPROTO_TYPE_DELETE:
1911+
return "DELETE";
1912+
case IPROTO_TYPE_AUTH:
1913+
return "AUTH";
1914+
case IPROTO_TYPE_EVAL:
1915+
return "EVAL";
1916+
case IPROTO_TYPE_UPSERT:
1917+
return "UPSERT";
1918+
case IPROTO_TYPE_CALL:
1919+
return "CALL";
1920+
case IPROTO_TYPE_PING:
1921+
return "PING";
1922+
default:
1923+
return "UNKNOWN(" + requestType + ")";
1924+
}
1925+
}
1926+
1927+
/** Helper method to extract function name from CALL request. */
1928+
private String extractFunctionName(IProtoRequest request) {
1929+
try {
1930+
byte[] packetBytes = request.getPacket(org.msgpack.core.MessagePack.newDefaultBufferPacker());
1931+
org.msgpack.core.MessageUnpacker unpacker =
1932+
org.msgpack.core.MessagePack.newDefaultUnpacker(packetBytes);
1933+
unpacker.unpackInt(); // Skip size prefix
1934+
unpacker.unpackValue(); // Skip header
1935+
org.msgpack.value.Value bodyValue = unpacker.unpackValue();
1936+
1937+
if (bodyValue.isMapValue()) {
1938+
org.msgpack.value.Value functionNameValue =
1939+
bodyValue.asMapValue().map().get(MP_IPROTO_FUNCTION_NAME);
1940+
if (functionNameValue != null && functionNameValue.isStringValue()) {
1941+
return functionNameValue.asStringValue().asString();
1942+
}
1943+
}
1944+
} catch (Exception e) {
1945+
// Ignore parsing errors
1946+
}
1947+
return "unknown";
1948+
}
1949+
1950+
/** Helper method to extract arguments from CALL request. */
1951+
private String extractArguments(IProtoRequest request) {
1952+
try {
1953+
byte[] packetBytes = request.getPacket(org.msgpack.core.MessagePack.newDefaultBufferPacker());
1954+
org.msgpack.core.MessageUnpacker unpacker =
1955+
org.msgpack.core.MessagePack.newDefaultUnpacker(packetBytes);
1956+
unpacker.unpackInt(); // Skip size prefix
1957+
unpacker.unpackValue(); // Skip header
1958+
org.msgpack.value.Value bodyValue = unpacker.unpackValue();
1959+
1960+
if (bodyValue.isMapValue()) {
1961+
org.msgpack.value.Value tupleValue = bodyValue.asMapValue().map().get(MP_IPROTO_TUPLE);
1962+
if (tupleValue != null) {
1963+
return tupleValue.toString();
1964+
}
1965+
}
1966+
} catch (Exception e) {
1967+
// Ignore parsing errors
1968+
}
1969+
return "[]";
1970+
}
1971+
1972+
/** Helper method to extract response data from IProtoResponse. */
1973+
private String extractResponseData(IProtoResponse response) {
1974+
try {
1975+
StringBuilder sb = new StringBuilder();
1976+
sb.append("{");
1977+
1978+
// Extract data from response body using Jackson mapping
1979+
java.util.Map<Integer, io.tarantool.core.protocol.ByteBodyValueWrapper> byteBodyValues =
1980+
response.getByteBodyValues();
1981+
1982+
boolean first = true;
1983+
for (java.util.Map.Entry<Integer, io.tarantool.core.protocol.ByteBodyValueWrapper> entry :
1984+
byteBodyValues.entrySet()) {
1985+
if (!first) {
1986+
sb.append(", ");
1987+
}
1988+
first = false;
1989+
1990+
String keyName = getResponseFieldName(entry.getKey());
1991+
Object value =
1992+
io.tarantool.mapping.BaseTarantoolJacksonMapping.readValue(
1993+
entry.getValue(), Object.class);
1994+
sb.append(keyName).append("=").append(value);
1995+
}
1996+
1997+
sb.append("}");
1998+
return sb.toString();
1999+
} catch (Exception e) {
2000+
return "{error=" + e.getMessage() + "}";
2001+
}
2002+
}
2003+
2004+
/** Helper method to get human-readable response field name. */
2005+
private String getResponseFieldName(int fieldCode) {
2006+
switch (fieldCode) {
2007+
case IPROTO_DATA:
2008+
return "data";
2009+
case IPROTO_ERROR:
2010+
return "error";
2011+
default:
2012+
return "field_" + fieldCode;
2013+
}
2014+
}
16842015
}

0 commit comments

Comments
 (0)