Skip to content

Commit 5cc33ff

Browse files
ArtDuclaude
andcommitted
test(tracing): add OpenTelemetry tracing example with Jaeger
Add testOpenTelemetryTracingWithJaeger() demonstrating distributed tracing using OpenTelemetry API with Handlers: - Create spans for request lifecycle (onBeforeSend, onSuccess, onTimeout) - Parse request/response data using Jackson mapping - Export traces to Jaeger via TestContainer - Show flamegraph with timeout and late response handling The test is @disabled by default as it requires Docker and keeps Jaeger running for manual inspection. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 6a39f33 commit 5cc33ff

File tree

3 files changed

+377
-1
lines changed

3 files changed

+377
-1
lines changed

tarantool-client/pom.xml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,31 @@
6666
<version>2.20.0</version>
6767
<scope>test</scope>
6868
</dependency>
69+
<dependency>
70+
<groupId>io.opentelemetry</groupId>
71+
<artifactId>opentelemetry-api</artifactId>
72+
<scope>test</scope>
73+
</dependency>
74+
<dependency>
75+
<groupId>io.opentelemetry</groupId>
76+
<artifactId>opentelemetry-sdk</artifactId>
77+
<scope>test</scope>
78+
</dependency>
79+
<dependency>
80+
<groupId>io.opentelemetry</groupId>
81+
<artifactId>opentelemetry-sdk-testing</artifactId>
82+
<scope>test</scope>
83+
</dependency>
84+
<dependency>
85+
<groupId>io.opentelemetry</groupId>
86+
<artifactId>opentelemetry-exporter-otlp</artifactId>
87+
<scope>test</scope>
88+
</dependency>
89+
<dependency>
90+
<groupId>io.opentelemetry</groupId>
91+
<artifactId>opentelemetry-exporter-logging</artifactId>
92+
<scope>test</scope>
93+
</dependency>
6994
<!-- End test -->
7095
</dependencies>
7196
</project>

tarantool-client/src/test/java/io/tarantool/client/integration/TarantoolBoxClientTest.java

Lines changed: 319 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,17 @@
3232
import static org.junit.jupiter.api.Assertions.assertThrows;
3333
import static org.junit.jupiter.api.Assertions.assertTrue;
3434
import com.fasterxml.jackson.core.type.TypeReference;
35+
import io.opentelemetry.api.trace.Span;
36+
import io.opentelemetry.api.trace.StatusCode;
37+
import io.opentelemetry.api.trace.Tracer;
38+
import io.opentelemetry.sdk.OpenTelemetrySdk;
39+
import io.opentelemetry.sdk.trace.SdkTracerProvider;
40+
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
3541
import org.junit.jupiter.api.AfterAll;
3642
import org.junit.jupiter.api.Assertions;
3743
import org.junit.jupiter.api.BeforeAll;
3844
import org.junit.jupiter.api.BeforeEach;
45+
import org.junit.jupiter.api.Disabled;
3946
import org.junit.jupiter.api.Test;
4047
import org.junit.jupiter.api.Timeout;
4148
import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable;
@@ -55,7 +62,6 @@
5562
import static io.tarantool.client.box.TarantoolBoxSpace.WITHOUT_ENABLED_FETCH_SCHEMA_OPTION_FOR_TARANTOOL_LESS_3_0_0;
5663
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_DATA;
5764
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_SYNC_ID;
58-
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_CALL;
5965
import static io.tarantool.mapping.BaseTarantoolJacksonMapping.objectMapper;
6066
import io.tarantool.client.BaseOptions;
6167
import io.tarantool.client.ClientType;
@@ -1681,4 +1687,316 @@ void testGetServerVersion() throws Exception {
16811687
TarantoolVersion version = client.getServerVersion().join();
16821688
assertEquals(tarantoolMajorVersion, version.getMajor());
16831689
}
1690+
1691+
/**
1692+
* Demonstrates distributed tracing using OpenTelemetry API with Handlers.
1693+
*
1694+
* <p>This test shows how to integrate Tarantool client with OpenTelemetry to track request
1695+
* lifecycle:
1696+
*
1697+
* <ul>
1698+
* <li>onBeforeSend - creates a span and stores it by syncId
1699+
* <li>onSuccess - retrieves span, sets attributes and ends it
1700+
* <li>onTimeout - retrieves span, records exception and ends it
1701+
* <li>onIgnoredResponse - adds event for late response arrival
1702+
* </ul>
1703+
*
1704+
* <p>Jaeger is started via TestContainer. To view traces, open the URL printed in console
1705+
* (http://localhost:16686) after test starts running.
1706+
*/
1707+
@Test
1708+
@Timeout(5000)
1709+
@Disabled
1710+
public void testOpenTelemetryTracingWithJaeger() throws Exception {
1711+
// Start Jaeger via TestContainer
1712+
org.testcontainers.containers.GenericContainer<?> jaeger =
1713+
new org.testcontainers.containers.GenericContainer<>("jaegertracing/all-in-one:1.50")
1714+
.withExposedPorts(16686, 4317)
1715+
.withEnv("COLLECTOR_OTLP_ENABLED", "true");
1716+
jaeger.start();
1717+
1718+
String jaegerUrl = "http://" + jaeger.getHost() + ":" + jaeger.getMappedPort(16686);
1719+
String otlpEndpoint = "http://" + jaeger.getHost() + ":" + jaeger.getMappedPort(4317);
1720+
1721+
System.out.println("========================================");
1722+
System.out.println("Jaeger UI available at: " + jaegerUrl);
1723+
System.out.println("OTLP endpoint: " + otlpEndpoint);
1724+
System.out.println("========================================");
1725+
1726+
// Set up OTLP exporter to send traces to Jaeger
1727+
io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter spanExporter =
1728+
io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter.builder()
1729+
.setEndpoint(otlpEndpoint)
1730+
.build();
1731+
1732+
io.opentelemetry.sdk.resources.Resource resource =
1733+
io.opentelemetry.sdk.resources.Resource.builder()
1734+
.put(
1735+
io.opentelemetry.api.common.AttributeKey.stringKey("service.name"),
1736+
"tarantool-java-sdk")
1737+
.put(io.opentelemetry.api.common.AttributeKey.stringKey("service.version"), "2.0.0")
1738+
.build();
1739+
1740+
SdkTracerProvider tracerProvider =
1741+
SdkTracerProvider.builder()
1742+
.setResource(resource)
1743+
.addSpanProcessor(SimpleSpanProcessor.create(spanExporter))
1744+
.build();
1745+
1746+
OpenTelemetrySdk openTelemetry =
1747+
OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build();
1748+
1749+
Tracer tracer = openTelemetry.getTracer("tarantool-test");
1750+
1751+
// Maps to store active spans by syncId
1752+
java.util.Map<Long, Span> requestSpans = new java.util.concurrent.ConcurrentHashMap<>();
1753+
java.util.Map<Long, Span> timeoutSpans = new java.util.concurrent.ConcurrentHashMap<>();
1754+
1755+
TarantoolBoxClient testClient =
1756+
TarantoolFactory.box()
1757+
.withUser(API_USER)
1758+
.withPassword(CREDS.get(API_USER))
1759+
.withHost(tt.getHost())
1760+
.withPort(tt.getFirstMappedPort())
1761+
.withHandlers(
1762+
Handlers.builder()
1763+
.onBeforeSend(
1764+
request -> {
1765+
// Extract function name and arguments from request body
1766+
String functionName = extractFunctionName(request);
1767+
String args = extractArguments(request);
1768+
1769+
String requestTypeName = getRequestTypeName(request.getRequestType());
1770+
1771+
// 1. Main request span - lives until final result (success or late
1772+
// response)
1773+
Span requestSpan =
1774+
tracer
1775+
.spanBuilder(
1776+
"tarantool." + requestTypeName + ":" + request.getSyncId())
1777+
.setAttribute("request.type", request.getRequestType())
1778+
.setAttribute("request.type.name", requestTypeName)
1779+
.setAttribute("sync.id", request.getSyncId())
1780+
.setAttribute("tarantool.function", functionName)
1781+
.setAttribute("tarantool.arguments", args)
1782+
.startSpan();
1783+
requestSpans.put(request.getSyncId(), requestSpan);
1784+
1785+
// 2. Timeout span - created now, ended on timeout, removed on success
1786+
Span timeoutSpan =
1787+
tracer
1788+
.spanBuilder(
1789+
"tarantool."
1790+
+ requestTypeName
1791+
+ ".timeout:"
1792+
+ request.getSyncId())
1793+
.setParent(
1794+
io.opentelemetry.context.Context.current().with(requestSpan))
1795+
.setAttribute("sync.id", request.getSyncId())
1796+
.startSpan();
1797+
timeoutSpans.put(request.getSyncId(), timeoutSpan);
1798+
})
1799+
.onSuccess(
1800+
response -> {
1801+
// Remove timeout span (success happened, no timeout)
1802+
Span timeoutSpan = timeoutSpans.remove(response.getSyncId());
1803+
if (timeoutSpan != null) {
1804+
// Don't end it, just discard - timeout didn't happen
1805+
}
1806+
1807+
// End main request span
1808+
Span requestSpan = requestSpans.remove(response.getSyncId());
1809+
if (requestSpan != null) {
1810+
requestSpan.setAttribute("response.sync_id", response.getSyncId());
1811+
requestSpan.setStatus(StatusCode.OK);
1812+
1813+
// Parse and log response data
1814+
String responseData = extractResponseData(response);
1815+
requestSpan.setAttribute("response.data", responseData);
1816+
1817+
requestSpan.end();
1818+
}
1819+
})
1820+
.onTimeout(
1821+
request -> {
1822+
// End timeout span
1823+
Span timeoutSpan = timeoutSpans.remove(request.getSyncId());
1824+
if (timeoutSpan != null) {
1825+
timeoutSpan.addEvent("Request timed out");
1826+
timeoutSpan.setStatus(StatusCode.ERROR, "Request timed out");
1827+
timeoutSpan.setAttribute("error.type", "timeout");
1828+
timeoutSpan.recordException(new TimeoutException("Request timed out"));
1829+
timeoutSpan.end();
1830+
}
1831+
})
1832+
.onIgnoredResponse(
1833+
response -> {
1834+
// End main request span (late response arrived)
1835+
Span requestSpan = requestSpans.remove(response.getSyncId());
1836+
if (requestSpan != null) {
1837+
requestSpan.addEvent("Late response arrived");
1838+
requestSpan.setAttribute("response.sync_id", response.getSyncId());
1839+
requestSpan.setAttribute("late.response", true);
1840+
1841+
// Parse and log response data
1842+
String responseData = extractResponseData(response);
1843+
requestSpan.setAttribute("response.data", responseData);
1844+
1845+
requestSpan.end();
1846+
}
1847+
})
1848+
.build())
1849+
.build();
1850+
1851+
// Test successful request
1852+
List<?> result = testClient.call("echo", Arrays.asList(42, "test")).join().get();
1853+
assertEquals(Arrays.asList(42, "test"), result);
1854+
1855+
// Wait for async callbacks and span export
1856+
Thread.sleep(1000);
1857+
1858+
System.out.println("Sent successful request trace to Jaeger");
1859+
1860+
// Test timeout scenario
1861+
Options timeoutOptions = BaseOptions.builder().withTimeout(100L).build();
1862+
1863+
Exception ex =
1864+
assertThrows(
1865+
CompletionException.class,
1866+
() -> testClient.call("slow_echo", Arrays.asList(1, true), timeoutOptions).join());
1867+
assertEquals(TimeoutException.class, ex.getCause().getClass());
1868+
1869+
// Wait for timeout handler and span export
1870+
Thread.sleep(1000);
1871+
1872+
System.out.println("Sent timeout error trace to Jaeger");
1873+
1874+
// Print Jaeger URL again so user can view traces after test completes
1875+
System.out.println("========================================");
1876+
System.out.println("Test completed. View traces at: " + jaegerUrl);
1877+
System.out.println("========================================");
1878+
1879+
// Keep Jaeger running for a moment to allow viewing traces
1880+
Thread.sleep(3000000);
1881+
jaeger.stop();
1882+
openTelemetry.close();
1883+
testClient.close();
1884+
}
1885+
1886+
/** Helper method to get human-readable request type name. */
1887+
private String getRequestTypeName(int requestType) {
1888+
switch (requestType) {
1889+
case IPROTO_TYPE_SELECT:
1890+
return "SELECT";
1891+
case IPROTO_TYPE_INSERT:
1892+
return "INSERT";
1893+
case IPROTO_TYPE_REPLACE:
1894+
return "REPLACE";
1895+
case IPROTO_TYPE_UPDATE:
1896+
return "UPDATE";
1897+
case IPROTO_TYPE_DELETE:
1898+
return "DELETE";
1899+
case IPROTO_TYPE_AUTH:
1900+
return "AUTH";
1901+
case IPROTO_TYPE_EVAL:
1902+
return "EVAL";
1903+
case IPROTO_TYPE_UPSERT:
1904+
return "UPSERT";
1905+
case IPROTO_TYPE_CALL:
1906+
return "CALL";
1907+
case IPROTO_TYPE_PING:
1908+
return "PING";
1909+
default:
1910+
return "UNKNOWN(" + requestType + ")";
1911+
}
1912+
}
1913+
1914+
/** Helper method to extract function name from CALL request. */
1915+
private String extractFunctionName(IProtoRequest request) {
1916+
try {
1917+
byte[] packetBytes = request.getPacket(org.msgpack.core.MessagePack.newDefaultBufferPacker());
1918+
org.msgpack.core.MessageUnpacker unpacker =
1919+
org.msgpack.core.MessagePack.newDefaultUnpacker(packetBytes);
1920+
unpacker.unpackInt(); // Skip size prefix
1921+
unpacker.unpackValue(); // Skip header
1922+
org.msgpack.value.Value bodyValue = unpacker.unpackValue();
1923+
1924+
if (bodyValue.isMapValue()) {
1925+
org.msgpack.value.Value functionNameValue =
1926+
bodyValue.asMapValue().map().get(MP_IPROTO_FUNCTION_NAME);
1927+
if (functionNameValue != null && functionNameValue.isStringValue()) {
1928+
return functionNameValue.asStringValue().asString();
1929+
}
1930+
}
1931+
} catch (Exception e) {
1932+
// Ignore parsing errors
1933+
}
1934+
return "unknown";
1935+
}
1936+
1937+
/** Helper method to extract arguments from CALL request. */
1938+
private String extractArguments(IProtoRequest request) {
1939+
try {
1940+
byte[] packetBytes = request.getPacket(org.msgpack.core.MessagePack.newDefaultBufferPacker());
1941+
org.msgpack.core.MessageUnpacker unpacker =
1942+
org.msgpack.core.MessagePack.newDefaultUnpacker(packetBytes);
1943+
unpacker.unpackInt(); // Skip size prefix
1944+
unpacker.unpackValue(); // Skip header
1945+
org.msgpack.value.Value bodyValue = unpacker.unpackValue();
1946+
1947+
if (bodyValue.isMapValue()) {
1948+
org.msgpack.value.Value tupleValue = bodyValue.asMapValue().map().get(MP_IPROTO_TUPLE);
1949+
if (tupleValue != null) {
1950+
return tupleValue.toString();
1951+
}
1952+
}
1953+
} catch (Exception e) {
1954+
// Ignore parsing errors
1955+
}
1956+
return "[]";
1957+
}
1958+
1959+
/** Helper method to extract response data from IProtoResponse. */
1960+
private String extractResponseData(IProtoResponse response) {
1961+
try {
1962+
StringBuilder sb = new StringBuilder();
1963+
sb.append("{");
1964+
1965+
// Extract data from response body using Jackson mapping
1966+
java.util.Map<Integer, io.tarantool.core.protocol.ByteBodyValueWrapper> byteBodyValues =
1967+
response.getByteBodyValues();
1968+
1969+
boolean first = true;
1970+
for (java.util.Map.Entry<Integer, io.tarantool.core.protocol.ByteBodyValueWrapper> entry :
1971+
byteBodyValues.entrySet()) {
1972+
if (!first) {
1973+
sb.append(", ");
1974+
}
1975+
first = false;
1976+
1977+
String keyName = getResponseFieldName(entry.getKey());
1978+
Object value =
1979+
io.tarantool.mapping.BaseTarantoolJacksonMapping.readValue(
1980+
entry.getValue(), Object.class);
1981+
sb.append(keyName).append("=").append(value);
1982+
}
1983+
1984+
sb.append("}");
1985+
return sb.toString();
1986+
} catch (Exception e) {
1987+
return "{error=" + e.getMessage() + "}";
1988+
}
1989+
}
1990+
1991+
/** Helper method to get human-readable response field name. */
1992+
private String getResponseFieldName(int fieldCode) {
1993+
switch (fieldCode) {
1994+
case IPROTO_DATA:
1995+
return "data";
1996+
case IPROTO_ERROR:
1997+
return "error";
1998+
default:
1999+
return "field_" + fieldCode;
2000+
}
2001+
}
16842002
}

0 commit comments

Comments
 (0)