Skip to content

Commit f569625

Browse files
committed
Add wire-level conformance tests for native trace transport
Verify that span data survives the full native transport path: Ruby Span -> C extension -> Rust serialization -> msgpack -> HTTP -> mock agent A `CapturingMockAgent` runs in a forked process with threaded request handling, captures trace payloads via a pipe, and the test deserializes the msgpack to verify field values. Tests cover: - Scalar fields (name, service, resource, type, span_id, parent_id, error) - String tags (meta) round-trip - Numeric metrics round-trip - 64-bit and 128-bit trace IDs - Multiple spans in one trace chunk - Multiple trace chunks in one payload
1 parent ce9096c commit f569625

1 file changed

Lines changed: 285 additions & 0 deletions

File tree

Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
# frozen_string_literal: true
2+
3+
require 'datadog/tracing/transport/native'
4+
require 'datadog/tracing/span'
5+
require 'datadog/tracing/trace_segment'
6+
require 'datadog/tracing/transport/trace_formatter'
7+
require 'socket'
8+
require 'msgpack'
9+
10+
# Verifies that span data put into traces arrives on the wire (at the
11+
# mock agent) with the correct field values after going through the
12+
# full native transport path:
13+
#
14+
# Ruby Span -> C extension -> Rust serialization -> msgpack -> HTTP -> mock agent
15+
#
16+
RSpec.describe 'Native transport wire-level conformance' do
17+
before do
18+
skip_if_libdatadog_not_supported
19+
end
20+
21+
# ---------------------------------------------------------------------------
22+
# Capturing mock agent: runs in a fork, writes captured request bodies
23+
# to a pipe so the parent can read and deserialize them.
24+
# ---------------------------------------------------------------------------
25+
26+
class CapturingMockAgent
27+
attr_reader :port
28+
29+
def initialize
30+
@read_io, @write_io = IO.pipe
31+
server = TCPServer.new('127.0.0.1', 0)
32+
@port = server.addr[1]
33+
34+
@pid = fork do
35+
@read_io.close
36+
37+
body = '{"rate_by_service":{"service:,env:":1.0}}'
38+
http_response = "HTTP/1.1 200 OK\r\nContent-Length: #{body.bytesize}\r\n" \
39+
"Content-Type: application/json\r\n\r\n#{body}"
40+
pipe_mutex = Mutex.new
41+
42+
loop do
43+
client = server.accept rescue break
44+
Thread.new(client) do |c|
45+
begin
46+
request_line = c.gets
47+
next c.close if request_line.nil?
48+
49+
# Read headers
50+
content_length = 0
51+
path = request_line.split(' ')[1]
52+
while (line = c.gets) && line != "\r\n"
53+
content_length = line.split(': ', 2).last.to_i if line.downcase.start_with?('content-length')
54+
end
55+
56+
# Read body
57+
request_body = content_length > 0 ? c.read(content_length) : ''
58+
c.print http_response
59+
60+
# Write captured trace payloads (skip /info requests)
61+
if path&.include?('/traces') && !request_body.empty?
62+
payload = Marshal.dump(request_body)
63+
pipe_mutex.synchronize do
64+
@write_io.write([payload.bytesize].pack('N'))
65+
@write_io.write(payload)
66+
@write_io.flush
67+
end
68+
end
69+
rescue # rubocop:disable Lint/SuppressedException
70+
ensure
71+
c.close rescue nil
72+
end
73+
end
74+
end
75+
end
76+
77+
server.close
78+
@write_io.close
79+
end
80+
81+
# Read one captured trace payload (blocking, with timeout).
82+
# Returns the raw msgpack bytes.
83+
def read_payload(timeout: 5)
84+
ready = IO.select([@read_io], nil, nil, timeout)
85+
raise 'Timeout waiting for agent to receive a trace payload' unless ready
86+
87+
len_bytes = @read_io.read(4)
88+
raise 'Agent pipe closed' if len_bytes.nil? || len_bytes.bytesize < 4
89+
90+
len = len_bytes.unpack1('N')
91+
Marshal.load(@read_io.read(len)) # rubocop:disable Security/MarshalLoad
92+
end
93+
94+
def stop
95+
Process.kill('TERM', @pid) rescue nil
96+
Process.wait(@pid) rescue nil
97+
@read_io.close rescue nil
98+
end
99+
end
100+
101+
# ---------------------------------------------------------------------------
102+
# Helpers
103+
# ---------------------------------------------------------------------------
104+
105+
# A single mock agent and transport are shared across all examples.
106+
#
107+
# The Rust TraceExporter spawns background workers (e.g. /info fetcher)
108+
# that keep connections alive. Creating one per test leaves orphaned
109+
# workers that interfere with subsequent tests. Using before/after(:all)
110+
# keeps everything scoped to this describe block.
111+
before(:all) do
112+
@mock_agent = CapturingMockAgent.new
113+
agent_settings = Struct.new(:url).new("http://127.0.0.1:#{@mock_agent.port}")
114+
@transport = Datadog::Tracing::Transport::Native::Transport.new(
115+
agent_settings: agent_settings,
116+
logger: Logger.new('/dev/null')
117+
)
118+
end
119+
120+
after(:all) do
121+
# Release the transport reference and force GC so that
122+
# ddog_trace_exporter_free runs, shutting down the Rust
123+
# TraceExporter and its background workers (e.g. /info fetcher)
124+
# before we kill the mock agent process they connect to.
125+
@transport = nil
126+
GC.start
127+
@mock_agent&.stop
128+
end
129+
130+
let(:mock_agent) { @mock_agent }
131+
let(:native_module) { Datadog::Tracing::Transport::Native }
132+
let(:transport) { @transport }
133+
134+
def make_trace(spans_attrs)
135+
trace_id = rand(1 << 62)
136+
spans = spans_attrs.map do |attrs|
137+
Datadog::Tracing::Span.new(
138+
attrs[:name],
139+
service: attrs[:service] || 'conformance-svc',
140+
resource: attrs[:resource] || attrs[:name],
141+
type: attrs[:type],
142+
id: attrs[:id] || rand(1 << 62),
143+
parent_id: attrs[:parent_id] || 0,
144+
trace_id: attrs[:trace_id] || trace_id,
145+
status: attrs[:error] || 0,
146+
).tap do |span|
147+
(attrs[:meta] || {}).each { |k, v| span.set_tag(k, v) }
148+
(attrs[:metrics] || {}).each { |k, v| span.set_metric(k, v) }
149+
end
150+
end
151+
Datadog::Tracing::TraceSegment.new(spans, id: trace_id, root_span_id: spans.first.id)
152+
end
153+
154+
def send_and_decode(traces)
155+
responses = transport.send_traces(traces)
156+
expect(responses.first.ok?).to be(true), "send failed: #{responses.first.inspect}"
157+
158+
raw = mock_agent.read_payload
159+
MessagePack.unpack(raw)
160+
end
161+
162+
# ---------------------------------------------------------------------------
163+
# Tests
164+
# ---------------------------------------------------------------------------
165+
166+
describe 'single span' do
167+
it 'preserves scalar fields on the wire' do
168+
trace = make_trace([{
169+
name: 'web.request',
170+
service: 'my-service',
171+
resource: 'GET /users',
172+
type: 'web',
173+
id: 12345,
174+
parent_id: 67890,
175+
error: 1,
176+
}])
177+
178+
decoded = send_and_decode([trace])
179+
180+
# v0.4 format: Array<Array<SpanHash>>
181+
expect(decoded).to be_an(Array)
182+
expect(decoded.length).to eq(1) # one trace chunk
183+
184+
chunk = decoded.first
185+
expect(chunk.length).to eq(1) # one span
186+
187+
span = chunk.first
188+
expect(span['name']).to eq('web.request')
189+
expect(span['service']).to eq('my-service')
190+
expect(span['resource']).to eq('GET /users')
191+
expect(span['type']).to eq('web')
192+
expect(span['span_id']).to eq(12345)
193+
expect(span['parent_id']).to eq(67890)
194+
expect(span['error']).to eq(1)
195+
end
196+
end
197+
198+
describe 'meta and metrics' do
199+
it 'preserves string tags on the wire' do
200+
trace = make_trace([{
201+
name: 'op',
202+
meta: {
203+
'http.method' => 'POST',
204+
'http.url' => '/api/v1/traces',
205+
'component' => 'rack',
206+
},
207+
}])
208+
209+
decoded = send_and_decode([trace])
210+
meta = decoded.first.first['meta']
211+
212+
expect(meta['http.method']).to eq('POST')
213+
expect(meta['http.url']).to eq('/api/v1/traces')
214+
expect(meta['component']).to eq('rack')
215+
end
216+
217+
it 'preserves numeric metrics on the wire' do
218+
trace = make_trace([{
219+
name: 'op',
220+
metrics: {
221+
'_dd.measured' => 1.0,
222+
'_sampling_priority_v1' => 2.0,
223+
'custom.metric' => 42.5,
224+
},
225+
}])
226+
227+
decoded = send_and_decode([trace])
228+
metrics = decoded.first.first['metrics']
229+
230+
expect(metrics['_dd.measured']).to eq(1.0)
231+
expect(metrics['_sampling_priority_v1']).to eq(2.0)
232+
expect(metrics['custom.metric']).to eq(42.5)
233+
end
234+
end
235+
236+
describe 'trace ID' do
237+
it 'preserves 64-bit trace IDs' do
238+
tid = 0x00000000deadbeef
239+
trace = make_trace([{ name: 'op', trace_id: tid }])
240+
decoded = send_and_decode([trace])
241+
expect(decoded.first.first['trace_id']).to eq(tid)
242+
end
243+
244+
it 'preserves the low 64 bits of 128-bit trace IDs' do
245+
low = 0xdeadbeef12345678
246+
high = 0x00000001
247+
tid = (high << 64) | low
248+
trace = make_trace([{ name: 'op', trace_id: tid }])
249+
decoded = send_and_decode([trace])
250+
251+
# The wire format trace_id field is 64-bit (low half only);
252+
# high bits go into meta as _dd.p.tid
253+
expect(decoded.first.first['trace_id']).to eq(low)
254+
end
255+
end
256+
257+
describe 'multiple spans in one trace' do
258+
it 'preserves all spans in a single chunk' do
259+
trace = make_trace([
260+
{ name: 'parent.op', id: 100, parent_id: 0 },
261+
{ name: 'child.op', id: 200, parent_id: 100 },
262+
{ name: 'sibling.op', id: 300, parent_id: 100 },
263+
])
264+
265+
decoded = send_and_decode([trace])
266+
267+
expect(decoded.length).to eq(1)
268+
names = decoded.first.map { |s| s['name'] }.sort
269+
expect(names).to eq(['child.op', 'parent.op', 'sibling.op'])
270+
end
271+
end
272+
273+
describe 'multiple trace chunks' do
274+
it 'sends all chunks in one payload' do
275+
trace1 = make_trace([{ name: 'trace1.op' }])
276+
trace2 = make_trace([{ name: 'trace2.op' }])
277+
278+
decoded = send_and_decode([trace1, trace2])
279+
280+
expect(decoded.length).to eq(2)
281+
names = decoded.map { |chunk| chunk.first['name'] }.sort
282+
expect(names).to eq(['trace1.op', 'trace2.op'])
283+
end
284+
end
285+
end

0 commit comments

Comments
 (0)