Commit 00d0ec6

perf(dsm): trim per-checkpoint and per-message allocations
DSM observes every Kafka, SQS, SNS, Kinesis, Pub/Sub, and AMQP message when enabled, so the per-checkpoint hot path compounds. Several allocations are removed without changing the wire format:

1. `getSizeOrZero` stopped allocating a fresh Buffer copy of every string just to read its UTF-8 byte length; `Buffer.byteLength` returns the same value with no allocation. `getHeadersSize`'s `Object.entries(...).reduce(...)` becomes a `for (const key of Object.keys(headers))` loop, dropping the per-header `[k, v]` tuple and the reducer closure.

2. `pathway.js#shaHash` extracted the first 8 bytes of SHA-256 by round-tripping through a 64-char hex string, a 16-char slice, and a hex-decoded Buffer; `digest().subarray(0, 8)` produces the same bytes directly. `computeHash` now also caches `hashableEdgeTags.join('')` and `propagationHashBigInt.toString(16)` once per call (each was computed twice), gates the `manual_checkpoint:true` filter on `includes(...)` so the common path skips the allocation, and reuses a module-scope 20-byte scratch buffer to assemble `encodePathwayContext` with a single `Buffer.from(subarray)` copy-out instead of seven nested allocations.

3. `setCheckpoint` precomputes `PATHWAY_HEADER_BYTES` from the static header overhead instead of allocating a temp object, encoding it, and JSON-stringifying it just to read its length. It now reads the direction from `edgeTags[0]` directly: every in-tree caller places it there, the `DataStreamsCheckpointer` call sites are updated to match, and the test fixture pinning that argument order is updated in the same commit.

Drive-by fixes:

* `recordCheckpoint` reuses the `BigInt` already computed by the `StatsPoint` returned from `forCheckpoint(...)` instead of running `readBigUInt64LE` a second time, and `setCheckpoint` returns `undefined` (rather than `null`) on the disabled fast path so the function shape matches the rest of the file.
* `processor.js` drops the `DsmPathwayCodec` import that the inlined byte count made unreachable; `pathway.js` exports `CONTEXT_PROPAGATION_KEY_BASE64` so the constant calculation is anchored to the actual header key.
* `encoding.js` adds an `encodeVarintInto(target, offset, value)` helper so the pathway encoder can write directly into the scratch buffer instead of allocating a per-varint `Uint8Array` and copying.
1 parent 24a486f commit 00d0ec6
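As a standalone sanity check on the `shaHash` change in item 2 — 16 hex chars decode to exactly the first 8 digest bytes, so the old round-trip and the direct `subarray` read must agree — a minimal Node.js sketch (not part of the diff; the input string is illustrative):

```js
'use strict'

const crypto = require('crypto')

const input = 'service-env-direction:out' // made-up checkpoint string

// Old path: digest -> 64-char hex -> 16-char slice -> hex-decoded Buffer.
const viaHex = Buffer.from(
  crypto.createHash('sha256').update(input).digest('hex').slice(0, 16),
  'hex'
)
// New path: read the first 8 bytes of the digest directly.
const direct = crypto.createHash('sha256').update(input).digest().subarray(0, 8)

console.log(viaHex.equals(direct)) // true
```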

7 files changed: 106 additions & 82 deletions


packages/dd-trace/src/datastreams/checkpointer.js

Lines changed: 4 additions & 10 deletions
@@ -19,10 +19,9 @@ class DataStreamsCheckpointer {
     if (!this.config.dsmEnabled) return
 
     const ctx = this.dsmProcessor.setCheckpoint(
-      ['type:' + type, 'topic:' + target, 'direction:out', 'manual_checkpoint:true'],
+      ['direction:out', 'type:' + type, 'topic:' + target, 'manual_checkpoint:true'],
       null,
-      DataStreamsContext.getDataStreamsContext(),
-      null
+      DataStreamsContext.getDataStreamsContext()
     )
     DataStreamsContext.setDataStreamsContext(ctx)
 
@@ -45,17 +44,12 @@ class DataStreamsCheckpointer {
     const parentCtx = this.tracer.extract('text_map_dsm', carrier)
     DataStreamsContext.setDataStreamsContext(parentCtx)
 
-    const tags = ['type:' + type, 'topic:' + source, 'direction:in']
+    const tags = ['direction:in', 'type:' + type, 'topic:' + source]
     if (manualCheckpoint) {
       tags.push('manual_checkpoint:true')
     }
 
-    const ctx = this.dsmProcessor.setCheckpoint(
-      tags,
-      null,
-      parentCtx,
-      null
-    )
+    const ctx = this.dsmProcessor.setCheckpoint(tags, null, parentCtx)
     DataStreamsContext.setDataStreamsContext(ctx)
 
     return ctx

packages/dd-trace/src/datastreams/encoding.js

Lines changed: 39 additions & 28 deletions
@@ -1,5 +1,7 @@
 'use strict'
 
+const maxVarLen64 = 9
+
 /**
  * Encodes positive and negative numbers, using zig zag encoding to reduce the size of the variable length encoding.
  * Uses high and low part to ensure those parts are under the limit for byte operations in javascript (32 bits)
@@ -8,15 +10,45 @@
  * @returns {Uint8Array|undefined}
  */
 function encodeVarint (v) {
-  const sign = v >= 0 ? 0 : 1
+  const result = new Uint8Array(maxVarLen64)
+  const written = encodeVarintInto(result, 0, v)
+  if (written === 0) {
+    return
+  }
+  return result.slice(0, written)
+}
+
+/**
+ * Writes a zig-zag varint at `target[offset..]` and returns the offset just past the last
+ * byte written. Returns `offset` unchanged when the value exceeds MAX_SAFE_INTEGER/2, mirroring
+ * the `encodeVarint` overflow contract. Used on the DSM checkpoint hot path to avoid
+ * per-call Uint8Array / Buffer allocations.
+ * @param {Uint8Array | Buffer} target
+ * @param {number} offset
+ * @param {number} value
+ * @returns {number}
+ */
+function encodeVarintInto (target, offset, value) {
+  const sign = value >= 0 ? 0 : 1
   // We leave the least significant bit for the sign.
-  const double = Math.abs(v) * 2
+  const double = Math.abs(value) * 2
   if (double > Number.MAX_SAFE_INTEGER) {
-    return
+    return offset
   }
-  const high = Math.floor(double / 0x1_00_00_00_00)
-  const low = (double & 0xFF_FF_FF_FF) | sign
-  return encodeUvarint64(low, high)
+  let high = Math.floor(double / 0x1_00_00_00_00)
+  let low = (double & 0xFF_FF_FF_FF) | sign
+  let i = offset
+  const limit = offset + maxVarLen64 - 1
+  // if first byte is 1, the number is negative in javascript, but we want to interpret it as positive
+  while ((high !== 0 || low < 0 || low > 0x80) && i < limit) {
+    target[i] = (low & 0x7F) | 0x80
+    low >>>= 7
+    low |= (high & 0x7F) << 25
+    high >>>= 7
+    i++
+  }
+  target[i] = low & 0x7F
+  return i + 1
 }
 
 /**
@@ -35,28 +67,6 @@ function decodeVarint (b) {
   return [positive ? abs : -abs, bytes]
 }
 
-const maxVarLen64 = 9
-
-/**
- * @param {number} low
- * @param {number} high
- * @returns {Uint8Array}
- */
-function encodeUvarint64 (low, high) {
-  const result = new Uint8Array(maxVarLen64)
-  let i = 0
-  // if first byte is 1, the number is negative in javascript, but we want to interpret it as positive
-  while ((high !== 0 || low < 0 || low > 0x80) && i < maxVarLen64 - 1) {
-    result[i] = (low & 0x7F) | 0x80
-    low >>>= 7
-    low |= (high & 0x7F) << 25
-    high >>>= 7
-    i++
-  }
-  result[i] = low & 0x7F
-  return result.slice(0, i + 1)
-}
-
 /**
  * @param {Uint8Array} bytes
  * @returns {[number|undefined, number|undefined, Uint8Array]}
@@ -95,5 +105,6 @@ function decodeUvarint64 (
 
 module.exports = {
   encodeVarint,
+  encodeVarintInto,
   decodeVarint,
 }
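A usage sketch for the new helper (standalone; the `require` path is an assumption, adjust to your checkout). Because `encodeVarint` now delegates to `encodeVarintInto`, the two produce identical bytes:

```js
'use strict'

// Assumed module path, for illustration only.
const { encodeVarint, encodeVarintInto, decodeVarint } =
  require('./packages/dd-trace/src/datastreams/encoding')

const scratch = Buffer.allocUnsafe(9) // maxVarLen64
const end = encodeVarintInto(scratch, 0, 1234567890) // offset past the last byte
const oneShot = encodeVarint(1234567890) // old API: allocates per call

console.log(scratch.subarray(0, end).equals(Buffer.from(oneShot))) // true
console.log(decodeVarint(scratch.subarray(0, end))[0]) // 1234567890
```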

packages/dd-trace/src/datastreams/pathway.js

Lines changed: 29 additions & 26 deletions
@@ -7,18 +7,24 @@ const crypto = require('crypto')
 const { LRUCache } = require('../../../../vendor/dist/lru-cache')
 const log = require('../log')
 const pick = require('../../../datadog-core/src/utils/src/pick')
-const { encodeVarint, decodeVarint } = require('./encoding')
+const { encodeVarintInto, decodeVarint } = require('./encoding')
 
 const cache = new LRUCache({ max: 500 })
 
 const CONTEXT_PROPAGATION_KEY = 'dd-pathway-ctx'
 const CONTEXT_PROPAGATION_KEY_BASE64 = 'dd-pathway-ctx-base64'
 
+const PATHWAY_CONTEXT_BYTES = 20
+
+// Reused across `encodePathwayContext` calls; the buffer is fully rewritten before each
+// `Buffer.from(...)` copy-out so callers never observe mutation between checkpoints.
+const pathwayScratch = Buffer.allocUnsafe(PATHWAY_CONTEXT_BYTES)
+
 const logKeys = [CONTEXT_PROPAGATION_KEY, CONTEXT_PROPAGATION_KEY_BASE64]
 
 function shaHash (checkpointString) {
-  const hash = crypto.createHash('sha256').update(checkpointString).digest('hex').slice(0, 16)
-  return Buffer.from(hash, 'hex')
+  // Copy out of the 32-byte digest so the LRU cache doesn't retain it.
+  return Buffer.from(crypto.createHash('sha256').update(checkpointString).digest().subarray(0, 8))
 }
 
 /**
@@ -30,30 +36,25 @@ function shaHash (checkpointString) {
  */
 function computeHash (service, env, edgeTags, parentHash, propagationHashBigInt = null) {
   edgeTags.sort()
-  const hashableEdgeTags = edgeTags.filter(item => item !== 'manual_checkpoint:true')
-
-  // Cache key includes parentHash to handle fan-in/fan-out scenarios where the same
-  // service+env+tags+propagationHash can have different parents. This ensures we cache
-  // the complete pathway context, not just the current node's identity.
-  const propagationPart = propagationHashBigInt ? `:${propagationHashBigInt.toString(16)}` : ''
-  const key = `${service}${env}${hashableEdgeTags.join('')}${parentHash}${propagationPart}`
+  const hashableEdgeTags = edgeTags.includes('manual_checkpoint:true')
+    ? edgeTags.filter(item => item !== 'manual_checkpoint:true')
+    : edgeTags
+
+  // The cache key includes parentHash so a fan-in node with different parents
+  // gets distinct cache entries; the hash input below excludes parentHash and
+  // gets combined with it via a second sha pass to produce the final hash.
+  const joinedEdgeTags = hashableEdgeTags.join('')
+  const propagationHex = propagationHashBigInt ? propagationHashBigInt.toString(16) : ''
+  const propagationPart = propagationHex ? `:${propagationHex}` : ''
+  const key = `${service}${env}${joinedEdgeTags}${parentHash}${propagationPart}`
 
   let value = cache.get(key)
   if (value) {
     return value
   }
 
-  // Key vs hashInput distinction:
-  // - 'key' (above) is used for caching and includes parentHash to differentiate pathways
-  //   with the same node but different parents (e.g., multiple queues feeding one consumer)
-  // - 'hashInput' (below) excludes parentHash to compute only the current node's identity hash,
-  //   which is then XORed with parentHash (line 54) to build the complete pathway hash
-  // This two-step approach (hash current node independently, then combine with parent) is
-  //   required for proper pathway construction in the DSM protocol.
-  const baseString = `${service}${env}` + hashableEdgeTags.join('')
-  const hashInput = propagationHashBigInt
-    ? `${baseString}:${propagationHashBigInt.toString(16)}`
-    : baseString
+  const baseString = `${service}${env}${joinedEdgeTags}`
+  const hashInput = propagationHex ? `${baseString}:${propagationHex}` : baseString
 
   const currentHash = shaHash(hashInput)
   const buf = Buffer.concat([currentHash, parentHash], 16)
@@ -70,11 +71,12 @@ function computeHash (service, env, edgeTags, parentHash, propagationHashBigInt
  * @returns {Buffer}
  */
 function encodePathwayContext (dataStreamsContext) {
-  return Buffer.concat([
-    dataStreamsContext.hash,
-    Buffer.from(encodeVarint(Math.round(dataStreamsContext.pathwayStartNs / 1e6))),
-    Buffer.from(encodeVarint(Math.round(dataStreamsContext.edgeStartNs / 1e6))),
-  ], 20)
+  let offset = dataStreamsContext.hash.copy(pathwayScratch, 0)
+  offset = encodeVarintInto(pathwayScratch, offset, Math.round(dataStreamsContext.pathwayStartNs / 1e6))
+  offset = encodeVarintInto(pathwayScratch, offset, Math.round(dataStreamsContext.edgeStartNs / 1e6))
+  // No-op when offset >= PATHWAY_CONTEXT_BYTES; otherwise pads stale bytes from a previous call.
+  pathwayScratch.fill(0, offset, PATHWAY_CONTEXT_BYTES)
+  return Buffer.from(pathwayScratch.subarray(0, PATHWAY_CONTEXT_BYTES))
 }
 
 /**
@@ -178,6 +180,7 @@ const DsmPathwayCodec = {
 }
 
 module.exports = {
+  CONTEXT_PROPAGATION_KEY_BASE64,
   computePathwayHash: computeHash,
   encodePathwayContext,
   decodePathwayContext,
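Why `PATHWAY_CONTEXT_BYTES = 20` always suffices, as a standalone check of the arithmetic: the context is an 8-byte hash plus two zig-zag varints of millisecond timestamps, and a present-day epoch-ms value needs 42 bits after the sign-bit doubling, i.e. 6 varint bytes each (shorter encodings are zero-padded by the `fill` above):

```js
const ms = Date.now()                            // ~1.7e12 for current dates
const zigzagged = ms * 2                         // LSB reserved for the sign
const bits = Math.ceil(Math.log2(zigzagged + 1)) // 42 bits today
const bytesPerVarint = Math.ceil(bits / 7)       // 7 payload bits per byte -> 6

console.log(8 + 2 * bytesPerVarint) // 20
```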

packages/dd-trace/src/datastreams/processor.js

Lines changed: 17 additions & 15 deletions
@@ -8,15 +8,21 @@ const { PATHWAY_HASH, DSM_TRANSACTION_ID, DSM_TRANSACTION_CHECKPOINT } = require
 const log = require('../log')
 const processTags = require('../process-tags')
 const propagationHash = require('../propagation-hash')
-const { DsmPathwayCodec } = require('./pathway')
+const { CONTEXT_PROPAGATION_KEY_BASE64, computePathwayHash } = require('./pathway')
 const { DataStreamsWriter } = require('./writer')
-const { computePathwayHash } = require('./pathway')
 const { getAmqpMessageSize, getHeadersSize, getMessageSize, getSizeOrZero } = require('./size')
 const { SchemaBuilder } = require('./schemas/schema_builder')
 const { SchemaSampler } = require('./schemas/schema_sampler')
 
 const ENTRY_PARENT_HASH = Buffer.from('0000000000000000', 'hex')
 
+// A direction:out checkpoint estimates the size cost of the header the
+// producer plugin will inject. The pathway context is always 20 binary
+// bytes, encoded as 28 base64 chars; together with the header key and
+// JSON framing (matching the prior `JSON.stringify({key: value})` byte
+// count minus 1), this is a fixed value.
+const PATHWAY_HEADER_BYTES = CONTEXT_PROPAGATION_KEY_BASE64.length + 28 + 6
+
 class StatsPoint {
   constructor (hash, parentHash, edgeTags) {
     this.hash = hash.readBigUInt64LE()
@@ -271,19 +277,19 @@ class DataStreamsProcessor {
 
   recordCheckpoint (checkpoint, span = null) {
     if (!this.enabled) return
-    this.bucketFromTimestamp(checkpoint.currentTimestamp)
-      .forCheckpoint(checkpoint)
-      .addLatencies(checkpoint)
-    // set DSM pathway hash on span to enable related traces feature on DSM tab, convert from buffer to uint64
+    const statsPoint = this.bucketFromTimestamp(checkpoint.currentTimestamp).forCheckpoint(checkpoint)
+    statsPoint.addLatencies(checkpoint)
     if (span) {
-      span.setTag(PATHWAY_HASH, checkpoint.hash.readBigUInt64LE(0).toString())
+      // StatsPoint already converted the 8-byte Buffer hash to a uint64 BigInt.
+      span.setTag(PATHWAY_HASH, statsPoint.hash.toString())
     }
   }
 
-  setCheckpoint (edgeTags, span, ctx = null, payloadSize = 0) {
-    if (!this.enabled) return null
+  setCheckpoint (edgeTags, span, ctx, payloadSize = 0) {
+    if (!this.enabled) return
     const nowNs = Date.now() * 1e6
-    const direction = edgeTags.find(t => t.startsWith('direction:'))
+    // Callers must place the direction tag at index 0.
+    const direction = edgeTags[0]
     let pathwayStartNs = nowNs
     let edgeStartNs = nowNs
     let parentHash = ENTRY_PARENT_HASH
@@ -334,11 +340,7 @@ class DataStreamsProcessor {
       closestOppositeDirectionEdgeStart,
     }
    if (direction === 'direction:out') {
-      // Add the header for this now, as the callee doesn't have access to context when producing
-      // - 1 to account for extra byte for {
-      const ddInfoContinued = {}
-      DsmPathwayCodec.encode(dataStreamsContext, ddInfoContinued)
-      payloadSize += getSizeOrZero(JSON.stringify(ddInfoContinued)) - 1
+      payloadSize += PATHWAY_HEADER_BYTES
    }
    const checkpoint = {
      currentTimestamp: nowNs,
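The fixed value can be cross-checked against the byte count the old code measured at runtime. A standalone sketch (the key literal mirrors `CONTEXT_PROPAGATION_KEY_BASE64`; any 20-byte context encodes to 28 base64 chars):

```js
const key = 'dd-pathway-ctx-base64'
const encoded = Buffer.alloc(20).toString('base64') // 28 chars, padding included

// Old measurement: serialize a one-entry header object, minus 1 for the '{'.
const measured = JSON.stringify({ [key]: encoded }).length - 1

console.log(encoded.length)                   // 28
console.log(measured === key.length + 28 + 6) // true: 21 + 28 + 6 === 55
```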

packages/dd-trace/src/datastreams/size.js

Lines changed: 6 additions & 2 deletions
@@ -4,7 +4,7 @@ const { types } = require('util')
 
 function getSizeOrZero (obj) {
   if (typeof obj === 'string') {
-    return Buffer.from(obj, 'utf8').length
+    return Buffer.byteLength(obj, 'utf8')
   }
   if (types.isArrayBuffer(obj)) {
     return obj.byteLength
@@ -32,7 +32,11 @@ function getSizeOrZero (obj) {
 
 function getHeadersSize (headers) {
   if (headers === undefined) return 0
-  return Object.entries(headers).reduce((prev, [key, val]) => getSizeOrZero(key) + getSizeOrZero(val) + prev, 0)
+  let size = 0
+  for (const key of Object.keys(headers)) {
+    size += Buffer.byteLength(key, 'utf8') + getSizeOrZero(headers[key])
+  }
+  return size
 }
 
 function getMessageSize (message) {
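The `getSizeOrZero` swap is a pure equivalence: `Buffer.byteLength` computes the UTF-8 size without materializing the bytes. A two-line check:

```js
const s = 'héllo' // 'é' encodes to 2 bytes in UTF-8
console.log(Buffer.byteLength(s, 'utf8'))  // 6, no Buffer allocated
console.log(Buffer.from(s, 'utf8').length) // 6, allocates a 6-byte copy first
```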

packages/dd-trace/test/datastreams/data_streams_checkpointer.spec.js

Lines changed: 1 addition & 1 deletion
@@ -117,7 +117,7 @@ describe('data streams checkpointer manual api', () => {
 
     tracer.dataStreamsCheckpointer.setConsumeCheckpoint('kinesis', 'stream-123', headers)
     const calledTags = mockSetCheckpoint.getCall(0).args[0]
-    assert.deepStrictEqual(calledTags, ['type:kinesis', 'topic:stream-123', 'direction:in', 'manual_checkpoint:true'])
+    assert.deepStrictEqual(calledTags, ['direction:in', 'type:kinesis', 'topic:stream-123', 'manual_checkpoint:true'])
   })
 
   it('should set an automatic checkpoint when setConsumeCheckpoint is called with manualCheckpoint:false', function () {

packages/dd-trace/test/datastreams/processor.spec.js

Lines changed: 10 additions & 0 deletions
@@ -290,6 +290,16 @@ describe('DataStreamsProcessor', () => {
     assert.deepStrictEqual(encoded.PayloadSize, payloadSize.toProto())
   })
 
+  it('should tag the provided span with the pathway hash on recordCheckpoint', () => {
+    const span = { setTag: sinon.stub() }
+    processor.recordCheckpoint(mockCheckpoint, span)
+    sinon.assert.calledOnceWithExactly(
+      span.setTag,
+      'pathway.hash',
+      DEFAULT_CURRENT_HASH.readBigUInt64LE().toString()
+    )
+  })
+
   it('should export on interval', () => {
     processor.recordCheckpoint(mockCheckpoint)
     processor.onInterval()
