|
36 | 36 | import org.junit.jupiter.api.Assertions; |
37 | 37 | import org.junit.jupiter.api.BeforeAll; |
38 | 38 | import org.junit.jupiter.api.BeforeEach; |
| 39 | +import org.junit.jupiter.api.Disabled; |
39 | 40 | import org.junit.jupiter.api.Test; |
40 | 41 | import org.junit.jupiter.api.Timeout; |
41 | 42 | import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable; |
|
55 | 56 | import static io.tarantool.client.box.TarantoolBoxSpace.WITHOUT_ENABLED_FETCH_SCHEMA_OPTION_FOR_TARANTOOL_LESS_3_0_0; |
56 | 57 | import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_DATA; |
57 | 58 | import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_SYNC_ID; |
58 | | -import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_TYPE_CALL; |
| 59 | +import static io.tarantool.core.protocol.requests.IProtoConstant.*; |
59 | 60 | import static io.tarantool.mapping.BaseTarantoolJacksonMapping.objectMapper; |
60 | 61 | import io.tarantool.client.BaseOptions; |
61 | 62 | import io.tarantool.client.ClientType; |
62 | 63 | import io.tarantool.client.Options; |
63 | 64 | import io.tarantool.client.TarantoolSpace; |
64 | 65 | import io.tarantool.client.TarantoolVersion; |
65 | 66 | import io.tarantool.client.box.TarantoolBoxClient; |
| 67 | +import io.opentelemetry.api.common.AttributeKey; |
| 68 | +import io.opentelemetry.api.trace.Span; |
| 69 | +import io.opentelemetry.api.trace.StatusCode; |
| 70 | +import io.opentelemetry.api.trace.Tracer; |
| 71 | +import io.opentelemetry.sdk.OpenTelemetrySdk; |
| 72 | +import io.opentelemetry.sdk.trace.SdkTracerProvider; |
| 73 | +import io.opentelemetry.sdk.trace.data.SpanData; |
| 74 | +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; |
| 75 | +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; |
66 | 76 | import io.tarantool.client.box.TarantoolBoxSpace; |
67 | 77 | import io.tarantool.client.box.options.DeleteOptions; |
68 | 78 | import io.tarantool.client.box.options.SelectOptions; |
@@ -1681,4 +1691,304 @@ void testGetServerVersion() throws Exception { |
1681 | 1691 | TarantoolVersion version = client.getServerVersion().join(); |
1682 | 1692 | assertEquals(tarantoolMajorVersion, version.getMajor()); |
1683 | 1693 | } |
| 1694 | + |
| 1695 | + /** |
| 1696 | + * Demonstrates distributed tracing using OpenTelemetry API with Handlers. |
| 1697 | + * |
| 1698 | + * <p>This test shows how to integrate Tarantool client with OpenTelemetry to track request |
| 1699 | + * lifecycle: |
| 1700 | + * |
| 1701 | + * <ul> |
| 1702 | + * <li>onBeforeSend - creates a span and stores it by syncId |
| 1703 | + * <li>onSuccess - retrieves span, sets attributes and ends it |
| 1704 | + * <li>onTimeout - retrieves span, records exception and ends it |
| 1705 | + * <li>onIgnoredResponse - adds event for late response arrival |
| 1706 | + * </ul> |
| 1707 | + * |
| 1708 | + * <p>Jaeger is started via TestContainer. To view traces, open the URL printed in console |
| 1709 | + * (http://localhost:16686) after test starts running. |
| 1710 | + */ |
| 1711 | + @Test |
| 1712 | + @Timeout(5000) |
| 1713 | + @Disabled |
| 1714 | + public void testOpenTelemetryTracingWithJaeger() throws Exception { |
| 1715 | + // Start Jaeger via TestContainer |
| 1716 | + org.testcontainers.containers.GenericContainer<?> jaeger = |
| 1717 | + new org.testcontainers.containers.GenericContainer<>("jaegertracing/all-in-one:1.50") |
| 1718 | + .withExposedPorts(16686, 4317) |
| 1719 | + .withEnv("COLLECTOR_OTLP_ENABLED", "true"); |
| 1720 | + jaeger.start(); |
| 1721 | + |
| 1722 | + String jaegerUrl = "http://" + jaeger.getHost() + ":" + jaeger.getMappedPort(16686); |
| 1723 | + String otlpEndpoint = |
| 1724 | + "http://" + jaeger.getHost() + ":" + jaeger.getMappedPort(4317); |
| 1725 | + |
| 1726 | + System.out.println("========================================"); |
| 1727 | + System.out.println("Jaeger UI available at: " + jaegerUrl); |
| 1728 | + System.out.println("OTLP endpoint: " + otlpEndpoint); |
| 1729 | + System.out.println("========================================"); |
| 1730 | + |
| 1731 | + // Set up OTLP exporter to send traces to Jaeger |
| 1732 | + io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter spanExporter = |
| 1733 | + io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter.builder() |
| 1734 | + .setEndpoint(otlpEndpoint) |
| 1735 | + .build(); |
| 1736 | + |
| 1737 | + io.opentelemetry.sdk.resources.Resource resource = |
| 1738 | + io.opentelemetry.sdk.resources.Resource.builder() |
| 1739 | + .put(io.opentelemetry.api.common.AttributeKey.stringKey("service.name"), "tarantool-java-sdk") |
| 1740 | + .put(io.opentelemetry.api.common.AttributeKey.stringKey("service.version"), "2.0.0") |
| 1741 | + .build(); |
| 1742 | + |
| 1743 | + SdkTracerProvider tracerProvider = |
| 1744 | + SdkTracerProvider.builder() |
| 1745 | + .setResource(resource) |
| 1746 | + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) |
| 1747 | + .build(); |
| 1748 | + |
| 1749 | + OpenTelemetrySdk openTelemetry = |
| 1750 | + OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build(); |
| 1751 | + |
| 1752 | + Tracer tracer = openTelemetry.getTracer("tarantool-test"); |
| 1753 | + |
| 1754 | + // Maps to store active spans by syncId |
| 1755 | + java.util.Map<Long, Span> requestSpans = new java.util.concurrent.ConcurrentHashMap<>(); |
| 1756 | + java.util.Map<Long, Span> timeoutSpans = new java.util.concurrent.ConcurrentHashMap<>(); |
| 1757 | + |
| 1758 | + TarantoolBoxClient testClient = |
| 1759 | + TarantoolFactory.box() |
| 1760 | + .withUser(API_USER) |
| 1761 | + .withPassword(CREDS.get(API_USER)) |
| 1762 | + .withHost(tt.getHost()) |
| 1763 | + .withPort(tt.getFirstMappedPort()) |
| 1764 | + .withHandlers( |
| 1765 | + Handlers.builder() |
| 1766 | + .onBeforeSend( |
| 1767 | + request -> { |
| 1768 | + // Extract function name and arguments from request body |
| 1769 | + String functionName = extractFunctionName(request); |
| 1770 | + String args = extractArguments(request); |
| 1771 | + |
| 1772 | + String requestTypeName = getRequestTypeName(request.getRequestType()); |
| 1773 | + |
| 1774 | + // 1. Main request span - lives until final result (success or late response) |
| 1775 | + Span requestSpan = |
| 1776 | + tracer |
| 1777 | + .spanBuilder("tarantool." + requestTypeName + ":" + request.getSyncId()) |
| 1778 | + .setAttribute("request.type", request.getRequestType()) |
| 1779 | + .setAttribute("request.type.name", requestTypeName) |
| 1780 | + .setAttribute("sync.id", request.getSyncId()) |
| 1781 | + .setAttribute("tarantool.function", functionName) |
| 1782 | + .setAttribute("tarantool.arguments", args) |
| 1783 | + .startSpan(); |
| 1784 | + requestSpans.put(request.getSyncId(), requestSpan); |
| 1785 | + |
| 1786 | + // 2. Timeout span - created now, ended on timeout, removed on success |
| 1787 | + Span timeoutSpan = |
| 1788 | + tracer |
| 1789 | + .spanBuilder("tarantool." + requestTypeName + ".timeout:" + request.getSyncId()) |
| 1790 | + .setParent(io.opentelemetry.context.Context.current().with(requestSpan)) |
| 1791 | + .setAttribute("sync.id", request.getSyncId()) |
| 1792 | + .startSpan(); |
| 1793 | + timeoutSpans.put(request.getSyncId(), timeoutSpan); |
| 1794 | + }) |
| 1795 | + .onSuccess( |
| 1796 | + response -> { |
| 1797 | + // Remove timeout span (success happened, no timeout) |
| 1798 | + Span timeoutSpan = timeoutSpans.remove(response.getSyncId()); |
| 1799 | + if (timeoutSpan != null) { |
| 1800 | + // Don't end it, just discard - timeout didn't happen |
| 1801 | + } |
| 1802 | + |
| 1803 | + // End main request span |
| 1804 | + Span requestSpan = requestSpans.remove(response.getSyncId()); |
| 1805 | + if (requestSpan != null) { |
| 1806 | + requestSpan.setAttribute("response.sync_id", response.getSyncId()); |
| 1807 | + requestSpan.setStatus(StatusCode.OK); |
| 1808 | + |
| 1809 | + // Parse and log response data |
| 1810 | + String responseData = extractResponseData(response); |
| 1811 | + requestSpan.setAttribute("response.data", responseData); |
| 1812 | + |
| 1813 | + requestSpan.end(); |
| 1814 | + } |
| 1815 | + }) |
| 1816 | + .onTimeout( |
| 1817 | + request -> { |
| 1818 | + // End timeout span |
| 1819 | + Span timeoutSpan = timeoutSpans.remove(request.getSyncId()); |
| 1820 | + if (timeoutSpan != null) { |
| 1821 | + timeoutSpan.addEvent("Request timed out"); |
| 1822 | + timeoutSpan.setStatus(StatusCode.ERROR, "Request timed out"); |
| 1823 | + timeoutSpan.setAttribute("error.type", "timeout"); |
| 1824 | + timeoutSpan.recordException(new TimeoutException("Request timed out")); |
| 1825 | + timeoutSpan.end(); |
| 1826 | + } |
| 1827 | + }) |
| 1828 | + .onIgnoredResponse( |
| 1829 | + response -> { |
| 1830 | + // End main request span (late response arrived) |
| 1831 | + Span requestSpan = requestSpans.remove(response.getSyncId()); |
| 1832 | + if (requestSpan != null) { |
| 1833 | + requestSpan.addEvent("Late response arrived"); |
| 1834 | + requestSpan.setAttribute("response.sync_id", response.getSyncId()); |
| 1835 | + requestSpan.setAttribute("late.response", true); |
| 1836 | + |
| 1837 | + // Parse and log response data |
| 1838 | + String responseData = extractResponseData(response); |
| 1839 | + requestSpan.setAttribute("response.data", responseData); |
| 1840 | + |
| 1841 | + requestSpan.end(); |
| 1842 | + } |
| 1843 | + }) |
| 1844 | + .build()) |
| 1845 | + .build(); |
| 1846 | + |
| 1847 | + // Test successful request |
| 1848 | + List<?> result = testClient.call("echo", Arrays.asList(42, "test")).join().get(); |
| 1849 | + assertEquals(Arrays.asList(42, "test"), result); |
| 1850 | + |
| 1851 | + // Wait for async callbacks and span export |
| 1852 | + Thread.sleep(1000); |
| 1853 | + |
| 1854 | + System.out.println("Sent successful request trace to Jaeger"); |
| 1855 | + |
| 1856 | + // Test timeout scenario |
| 1857 | + Options timeoutOptions = BaseOptions.builder().withTimeout(100L).build(); |
| 1858 | + |
| 1859 | + Exception ex = |
| 1860 | + assertThrows( |
| 1861 | + CompletionException.class, |
| 1862 | + () -> testClient.call("slow_echo", Arrays.asList(1, true), timeoutOptions).join()); |
| 1863 | + assertEquals(TimeoutException.class, ex.getCause().getClass()); |
| 1864 | + |
| 1865 | + // Wait for timeout handler and span export |
| 1866 | + Thread.sleep(1000); |
| 1867 | + |
| 1868 | + System.out.println("Sent timeout error trace to Jaeger"); |
| 1869 | + |
| 1870 | + // Print Jaeger URL again so user can view traces after test completes |
| 1871 | + System.out.println("========================================"); |
| 1872 | + System.out.println("Test completed. View traces at: " + jaegerUrl); |
| 1873 | + System.out.println("========================================"); |
| 1874 | + |
| 1875 | + // Keep Jaeger running for a moment to allow viewing traces |
| 1876 | + Thread.sleep(3000000); |
| 1877 | + jaeger.stop(); |
| 1878 | + openTelemetry.close(); |
| 1879 | + testClient.close(); |
| 1880 | + } |
| 1881 | + |
| 1882 | + /** Helper method to get human-readable request type name. */ |
| 1883 | + private String getRequestTypeName(int requestType) { |
| 1884 | + switch (requestType) { |
| 1885 | + case IPROTO_TYPE_SELECT: |
| 1886 | + return "SELECT"; |
| 1887 | + case IPROTO_TYPE_INSERT: |
| 1888 | + return "INSERT"; |
| 1889 | + case IPROTO_TYPE_REPLACE: |
| 1890 | + return "REPLACE"; |
| 1891 | + case IPROTO_TYPE_UPDATE: |
| 1892 | + return "UPDATE"; |
| 1893 | + case IPROTO_TYPE_DELETE: |
| 1894 | + return "DELETE"; |
| 1895 | + case IPROTO_TYPE_AUTH: |
| 1896 | + return "AUTH"; |
| 1897 | + case IPROTO_TYPE_EVAL: |
| 1898 | + return "EVAL"; |
| 1899 | + case IPROTO_TYPE_UPSERT: |
| 1900 | + return "UPSERT"; |
| 1901 | + case IPROTO_TYPE_CALL: |
| 1902 | + return "CALL"; |
| 1903 | + case IPROTO_TYPE_PING: |
| 1904 | + return "PING"; |
| 1905 | + default: |
| 1906 | + return "UNKNOWN(" + requestType + ")"; |
| 1907 | + } |
| 1908 | + } |
| 1909 | + |
| 1910 | + /** Helper method to extract function name from CALL request. */ |
| 1911 | + private String extractFunctionName(IProtoRequest request) { |
| 1912 | + try { |
| 1913 | + byte[] packetBytes = request.getPacket(org.msgpack.core.MessagePack.newDefaultBufferPacker()); |
| 1914 | + org.msgpack.core.MessageUnpacker unpacker = org.msgpack.core.MessagePack.newDefaultUnpacker(packetBytes); |
| 1915 | + unpacker.unpackInt(); // Skip size prefix |
| 1916 | + unpacker.unpackValue(); // Skip header |
| 1917 | + org.msgpack.value.Value bodyValue = unpacker.unpackValue(); |
| 1918 | + |
| 1919 | + if (bodyValue.isMapValue()) { |
| 1920 | + org.msgpack.value.Value functionNameValue = bodyValue.asMapValue().map().get(MP_IPROTO_FUNCTION_NAME); |
| 1921 | + if (functionNameValue != null && functionNameValue.isStringValue()) { |
| 1922 | + return functionNameValue.asStringValue().asString(); |
| 1923 | + } |
| 1924 | + } |
| 1925 | + } catch (Exception e) { |
| 1926 | + // Ignore parsing errors |
| 1927 | + } |
| 1928 | + return "unknown"; |
| 1929 | + } |
| 1930 | + |
| 1931 | + /** Helper method to extract arguments from CALL request. */ |
| 1932 | + private String extractArguments(IProtoRequest request) { |
| 1933 | + try { |
| 1934 | + byte[] packetBytes = request.getPacket(org.msgpack.core.MessagePack.newDefaultBufferPacker()); |
| 1935 | + org.msgpack.core.MessageUnpacker unpacker = org.msgpack.core.MessagePack.newDefaultUnpacker(packetBytes); |
| 1936 | + unpacker.unpackInt(); // Skip size prefix |
| 1937 | + unpacker.unpackValue(); // Skip header |
| 1938 | + org.msgpack.value.Value bodyValue = unpacker.unpackValue(); |
| 1939 | + |
| 1940 | + if (bodyValue.isMapValue()) { |
| 1941 | + org.msgpack.value.Value tupleValue = bodyValue.asMapValue().map().get(MP_IPROTO_TUPLE); |
| 1942 | + if (tupleValue != null) { |
| 1943 | + return tupleValue.toString(); |
| 1944 | + } |
| 1945 | + } |
| 1946 | + } catch (Exception e) { |
| 1947 | + // Ignore parsing errors |
| 1948 | + } |
| 1949 | + return "[]"; |
| 1950 | + } |
| 1951 | + |
| 1952 | + /** Helper method to extract response data from IProtoResponse. */ |
| 1953 | + private String extractResponseData(IProtoResponse response) { |
| 1954 | + try { |
| 1955 | + StringBuilder sb = new StringBuilder(); |
| 1956 | + sb.append("{"); |
| 1957 | + |
| 1958 | + // Extract data from response body using Jackson mapping |
| 1959 | + java.util.Map<Integer, io.tarantool.core.protocol.ByteBodyValueWrapper> byteBodyValues = |
| 1960 | + response.getByteBodyValues(); |
| 1961 | + |
| 1962 | + boolean first = true; |
| 1963 | + for (java.util.Map.Entry<Integer, io.tarantool.core.protocol.ByteBodyValueWrapper> entry : |
| 1964 | + byteBodyValues.entrySet()) { |
| 1965 | + if (!first) { |
| 1966 | + sb.append(", "); |
| 1967 | + } |
| 1968 | + first = false; |
| 1969 | + |
| 1970 | + String keyName = getResponseFieldName(entry.getKey()); |
| 1971 | + Object value = |
| 1972 | + io.tarantool.mapping.BaseTarantoolJacksonMapping.readValue(entry.getValue(), Object.class); |
| 1973 | + sb.append(keyName).append("=").append(value); |
| 1974 | + } |
| 1975 | + |
| 1976 | + sb.append("}"); |
| 1977 | + return sb.toString(); |
| 1978 | + } catch (Exception e) { |
| 1979 | + return "{error=" + e.getMessage() + "}"; |
| 1980 | + } |
| 1981 | + } |
| 1982 | + |
| 1983 | + /** Helper method to get human-readable response field name. */ |
| 1984 | + private String getResponseFieldName(int fieldCode) { |
| 1985 | + switch (fieldCode) { |
| 1986 | + case IPROTO_DATA: |
| 1987 | + return "data"; |
| 1988 | + case IPROTO_ERROR: |
| 1989 | + return "error"; |
| 1990 | + default: |
| 1991 | + return "field_" + fieldCode; |
| 1992 | + } |
| 1993 | + } |
1684 | 1994 | } |
0 commit comments