diff --git a/packages/dd-trace/src/datastreams/checkpointer.js b/packages/dd-trace/src/datastreams/checkpointer.js index abc03ec522..318bac034a 100644 --- a/packages/dd-trace/src/datastreams/checkpointer.js +++ b/packages/dd-trace/src/datastreams/checkpointer.js @@ -19,10 +19,9 @@ class DataStreamsCheckpointer { if (!this.config.dsmEnabled) return const ctx = this.dsmProcessor.setCheckpoint( - ['type:' + type, 'topic:' + target, 'direction:out', 'manual_checkpoint:true'], + ['direction:out', 'type:' + type, 'topic:' + target, 'manual_checkpoint:true'], null, - DataStreamsContext.getDataStreamsContext(), - null + DataStreamsContext.getDataStreamsContext() ) DataStreamsContext.setDataStreamsContext(ctx) @@ -45,17 +44,12 @@ class DataStreamsCheckpointer { const parentCtx = this.tracer.extract('text_map_dsm', carrier) DataStreamsContext.setDataStreamsContext(parentCtx) - const tags = ['type:' + type, 'topic:' + source, 'direction:in'] + const tags = ['direction:in', 'type:' + type, 'topic:' + source] if (manualCheckpoint) { tags.push('manual_checkpoint:true') } - const ctx = this.dsmProcessor.setCheckpoint( - tags, - null, - parentCtx, - null - ) + const ctx = this.dsmProcessor.setCheckpoint(tags, null, parentCtx) DataStreamsContext.setDataStreamsContext(ctx) return ctx diff --git a/packages/dd-trace/src/datastreams/encoding.js b/packages/dd-trace/src/datastreams/encoding.js index fcee66749e..e91b184dd4 100644 --- a/packages/dd-trace/src/datastreams/encoding.js +++ b/packages/dd-trace/src/datastreams/encoding.js @@ -1,5 +1,7 @@ 'use strict' +const maxVarLen64 = 9 + /** * Encodes positive and negative numbers, using zig zag encoding to reduce the size of the variable length encoding. * Uses high and low part to ensure those parts are under the limit for byte operations in javascript (32 bits) @@ -8,15 +10,45 @@ * @returns {Uint8Array|undefined} */ function encodeVarint (v) { - const sign = v >= 0 ? 0 : 1 + const result = new Uint8Array(maxVarLen64) + const written = encodeVarintInto(result, 0, v) + if (written === 0) { + return + } + return result.slice(0, written) +} + +/** + * Writes a zig-zag varint at `target[offset..]` and returns the offset just past the last + * byte written. Returns `offset` unchanged when the value exceeds MAX_SAFE_INTEGER/2, mirroring + * the `encodeVarint` overflow contract. Used on the DSM checkpoint hot path to avoid + * per-call Uint8Array / Buffer allocations. + * @param {Uint8Array | Buffer} target + * @param {number} offset + * @param {number} value + * @returns {number} + */ +function encodeVarintInto (target, offset, value) { + const sign = value >= 0 ? 0 : 1 // We leave the least significant bit for the sign. - const double = Math.abs(v) * 2 + const double = Math.abs(value) * 2 if (double > Number.MAX_SAFE_INTEGER) { - return + return offset } - const high = Math.floor(double / 0x1_00_00_00_00) - const low = (double & 0xFF_FF_FF_FF) | sign - return encodeUvarint64(low, high) + let high = Math.floor(double / 0x1_00_00_00_00) + let low = (double & 0xFF_FF_FF_FF) | sign + let i = offset + const limit = offset + maxVarLen64 - 1 + // if first byte is 1, the number is negative in javascript, but we want to interpret it as positive + while ((high !== 0 || low < 0 || low > 0x80) && i < limit) { + target[i] = (low & 0x7F) | 0x80 + low >>>= 7 + low |= (high & 0x7F) << 25 + high >>>= 7 + i++ + } + target[i] = low & 0x7F + return i + 1 } /** @@ -35,28 +67,6 @@ function decodeVarint (b) { return [positive ? abs : -abs, bytes] } -const maxVarLen64 = 9 - -/** - * @param {number} low - * @param {number} high - * @returns {Uint8Array} - */ -function encodeUvarint64 (low, high) { - const result = new Uint8Array(maxVarLen64) - let i = 0 - // if first byte is 1, the number is negative in javascript, but we want to interpret it as positive - while ((high !== 0 || low < 0 || low > 0x80) && i < maxVarLen64 - 1) { - result[i] = (low & 0x7F) | 0x80 - low >>>= 7 - low |= (high & 0x7F) << 25 - high >>>= 7 - i++ - } - result[i] = low & 0x7F - return result.slice(0, i + 1) -} - /** * @param {Uint8Array} bytes * @returns {[number|undefined, number|undefined, Uint8Array]} @@ -95,5 +105,6 @@ function decodeUvarint64 ( module.exports = { encodeVarint, + encodeVarintInto, decodeVarint, } diff --git a/packages/dd-trace/src/datastreams/pathway.js b/packages/dd-trace/src/datastreams/pathway.js index 2a60b932f4..dcf5badb7d 100644 --- a/packages/dd-trace/src/datastreams/pathway.js +++ b/packages/dd-trace/src/datastreams/pathway.js @@ -7,18 +7,24 @@ const crypto = require('crypto') const { LRUCache } = require('../../../../vendor/dist/lru-cache') const log = require('../log') const pick = require('../../../datadog-core/src/utils/src/pick') -const { encodeVarint, decodeVarint } = require('./encoding') +const { encodeVarintInto, decodeVarint } = require('./encoding') const cache = new LRUCache({ max: 500 }) const CONTEXT_PROPAGATION_KEY = 'dd-pathway-ctx' const CONTEXT_PROPAGATION_KEY_BASE64 = 'dd-pathway-ctx-base64' +const PATHWAY_CONTEXT_BYTES = 20 + +// Reused across `encodePathwayContext` calls; the buffer is fully rewritten before each +// `Buffer.from(...)` copy-out so callers never observe mutation between checkpoints. +const pathwayScratch = Buffer.allocUnsafe(PATHWAY_CONTEXT_BYTES) + const logKeys = [CONTEXT_PROPAGATION_KEY, CONTEXT_PROPAGATION_KEY_BASE64] function shaHash (checkpointString) { - const hash = crypto.createHash('sha256').update(checkpointString).digest('hex').slice(0, 16) - return Buffer.from(hash, 'hex') + // Copy out of the 32-byte digest so the LRU cache doesn't retain it. + return Buffer.from(crypto.createHash('sha256').update(checkpointString).digest().subarray(0, 8)) } /** @@ -30,30 +36,25 @@ function shaHash (checkpointString) { */ function computeHash (service, env, edgeTags, parentHash, propagationHashBigInt = null) { edgeTags.sort() - const hashableEdgeTags = edgeTags.filter(item => item !== 'manual_checkpoint:true') - - // Cache key includes parentHash to handle fan-in/fan-out scenarios where the same - // service+env+tags+propagationHash can have different parents. This ensures we cache - // the complete pathway context, not just the current node's identity. - const propagationPart = propagationHashBigInt ? `:${propagationHashBigInt.toString(16)}` : '' - const key = `${service}${env}${hashableEdgeTags.join('')}${parentHash}${propagationPart}` + const hashableEdgeTags = edgeTags.includes('manual_checkpoint:true') + ? edgeTags.filter(item => item !== 'manual_checkpoint:true') + : edgeTags + + // The cache key includes parentHash so a fan-in node with different parents + // gets distinct cache entries; the hash input below excludes parentHash and + // gets combined with it via a second sha pass to produce the final hash. + const joinedEdgeTags = hashableEdgeTags.join('') + const propagationHex = propagationHashBigInt ? propagationHashBigInt.toString(16) : '' + const propagationPart = propagationHex ? `:${propagationHex}` : '' + const key = `${service}${env}${joinedEdgeTags}${parentHash}${propagationPart}` let value = cache.get(key) if (value) { return value } - // Key vs hashInput distinction: - // - 'key' (above) is used for caching and includes parentHash to differentiate pathways - // with the same node but different parents (e.g., multiple queues feeding one consumer) - // - 'hashInput' (below) excludes parentHash to compute only the current node's identity hash, - // which is then XORed with parentHash (line 54) to build the complete pathway hash - // This two-step approach (hash current node independently, then combine with parent) is - // required for proper pathway construction in the DSM protocol. - const baseString = `${service}${env}` + hashableEdgeTags.join('') - const hashInput = propagationHashBigInt - ? `${baseString}:${propagationHashBigInt.toString(16)}` - : baseString + const baseString = `${service}${env}${joinedEdgeTags}` + const hashInput = propagationHex ? `${baseString}:${propagationHex}` : baseString const currentHash = shaHash(hashInput) const buf = Buffer.concat([currentHash, parentHash], 16) @@ -70,11 +71,12 @@ function computeHash (service, env, edgeTags, parentHash, propagationHashBigInt * @returns {Buffer} */ function encodePathwayContext (dataStreamsContext) { - return Buffer.concat([ - dataStreamsContext.hash, - Buffer.from(encodeVarint(Math.round(dataStreamsContext.pathwayStartNs / 1e6))), - Buffer.from(encodeVarint(Math.round(dataStreamsContext.edgeStartNs / 1e6))), - ], 20) + let offset = dataStreamsContext.hash.copy(pathwayScratch, 0) + offset = encodeVarintInto(pathwayScratch, offset, Math.round(dataStreamsContext.pathwayStartNs / 1e6)) + offset = encodeVarintInto(pathwayScratch, offset, Math.round(dataStreamsContext.edgeStartNs / 1e6)) + // No-op when offset >= PATHWAY_CONTEXT_BYTES; otherwise pads stale bytes from a previous call. + pathwayScratch.fill(0, offset, PATHWAY_CONTEXT_BYTES) + return Buffer.from(pathwayScratch.subarray(0, PATHWAY_CONTEXT_BYTES)) } /** @@ -178,6 +180,7 @@ const DsmPathwayCodec = { } module.exports = { + CONTEXT_PROPAGATION_KEY_BASE64, computePathwayHash: computeHash, encodePathwayContext, decodePathwayContext, diff --git a/packages/dd-trace/src/datastreams/processor.js b/packages/dd-trace/src/datastreams/processor.js index a8483111d4..e30d7d7c4a 100644 --- a/packages/dd-trace/src/datastreams/processor.js +++ b/packages/dd-trace/src/datastreams/processor.js @@ -8,15 +8,21 @@ const { PATHWAY_HASH, DSM_TRANSACTION_ID, DSM_TRANSACTION_CHECKPOINT } = require const log = require('../log') const processTags = require('../process-tags') const propagationHash = require('../propagation-hash') -const { DsmPathwayCodec } = require('./pathway') +const { CONTEXT_PROPAGATION_KEY_BASE64, computePathwayHash } = require('./pathway') const { DataStreamsWriter } = require('./writer') -const { computePathwayHash } = require('./pathway') const { getAmqpMessageSize, getHeadersSize, getMessageSize, getSizeOrZero } = require('./size') const { SchemaBuilder } = require('./schemas/schema_builder') const { SchemaSampler } = require('./schemas/schema_sampler') const ENTRY_PARENT_HASH = Buffer.from('0000000000000000', 'hex') +// A direction:out checkpoint estimates the size cost of the header the +// producer plugin will inject. The pathway context is always 20 binary +// bytes, encoded as 28 base64 chars; together with the header key and +// JSON framing (matching the prior `JSON.stringify({key: value})` byte +// count minus 1), this is a fixed value. +const PATHWAY_HEADER_BYTES = CONTEXT_PROPAGATION_KEY_BASE64.length + 28 + 6 + class StatsPoint { constructor (hash, parentHash, edgeTags) { this.hash = hash.readBigUInt64LE() @@ -271,19 +277,19 @@ class DataStreamsProcessor { recordCheckpoint (checkpoint, span = null) { if (!this.enabled) return - this.bucketFromTimestamp(checkpoint.currentTimestamp) - .forCheckpoint(checkpoint) - .addLatencies(checkpoint) - // set DSM pathway hash on span to enable related traces feature on DSM tab, convert from buffer to uint64 + const statsPoint = this.bucketFromTimestamp(checkpoint.currentTimestamp).forCheckpoint(checkpoint) + statsPoint.addLatencies(checkpoint) if (span) { - span.setTag(PATHWAY_HASH, checkpoint.hash.readBigUInt64LE(0).toString()) + // StatsPoint already converted the 8-byte Buffer hash to a uint64 BigInt. + span.setTag(PATHWAY_HASH, statsPoint.hash.toString()) } } - setCheckpoint (edgeTags, span, ctx = null, payloadSize = 0) { - if (!this.enabled) return null + setCheckpoint (edgeTags, span, ctx, payloadSize = 0) { + if (!this.enabled) return const nowNs = Date.now() * 1e6 - const direction = edgeTags.find(t => t.startsWith('direction:')) + // Callers must place the direction tag at index 0. + const direction = edgeTags[0] let pathwayStartNs = nowNs let edgeStartNs = nowNs let parentHash = ENTRY_PARENT_HASH @@ -334,11 +340,7 @@ class DataStreamsProcessor { closestOppositeDirectionEdgeStart, } if (direction === 'direction:out') { - // Add the header for this now, as the callee doesn't have access to context when producing - // - 1 to account for extra byte for { - const ddInfoContinued = {} - DsmPathwayCodec.encode(dataStreamsContext, ddInfoContinued) - payloadSize += getSizeOrZero(JSON.stringify(ddInfoContinued)) - 1 + payloadSize += PATHWAY_HEADER_BYTES } const checkpoint = { currentTimestamp: nowNs, diff --git a/packages/dd-trace/src/datastreams/size.js b/packages/dd-trace/src/datastreams/size.js index ccf0699b4c..c4f28456b7 100644 --- a/packages/dd-trace/src/datastreams/size.js +++ b/packages/dd-trace/src/datastreams/size.js @@ -4,7 +4,7 @@ const { types } = require('util') function getSizeOrZero (obj) { if (typeof obj === 'string') { - return Buffer.from(obj, 'utf8').length + return Buffer.byteLength(obj, 'utf8') } if (types.isArrayBuffer(obj)) { return obj.byteLength @@ -32,7 +32,11 @@ function getSizeOrZero (obj) { function getHeadersSize (headers) { if (headers === undefined) return 0 - return Object.entries(headers).reduce((prev, [key, val]) => getSizeOrZero(key) + getSizeOrZero(val) + prev, 0) + let size = 0 + for (const key of Object.keys(headers)) { + size += Buffer.byteLength(key, 'utf8') + getSizeOrZero(headers[key]) + } + return size } function getMessageSize (message) { diff --git a/packages/dd-trace/src/opentracing/propagation/text_map.js b/packages/dd-trace/src/opentracing/propagation/text_map.js index 062f99bcc7..07ba204dee 100644 --- a/packages/dd-trace/src/opentracing/propagation/text_map.js +++ b/packages/dd-trace/src/opentracing/propagation/text_map.js @@ -69,6 +69,12 @@ const percentByte = /%([0-9A-Fa-f]{2})/g class TextMapPropagator { #extractB3Context + /** @type {Set | undefined} Cached `Set` view of `_config.baggageTagKeys`. */ + #baggageTagKeysSet + + /** @type {string[] | undefined} Source array that `#baggageTagKeysSet` was built from. */ + #baggageTagKeysSetSource + constructor (config) { this._config = config @@ -78,6 +84,23 @@ class TextMapPropagator { this.#extractB3Context = envName === 'OTEL_PROPAGATORS' ? this._extractB3SingleContext : this._extractB3MultiContext } + /** + * Returns a `Set` view of `_config.baggageTagKeys` that is rebuilt only + * when the source array reference changes. Avoids an `O(n)` `Set` alloc + * per baggage extract (which is per-request when baggage propagation is + * enabled). + * + * @returns {Set} + */ + #getBaggageTagKeysSet () { + const source = this._config.baggageTagKeys + if (this.#baggageTagKeysSetSource !== source) { + this.#baggageTagKeysSet = new Set(source) + this.#baggageTagKeysSetSource = source + } + return this.#baggageTagKeysSet + } + inject (spanContext, carrier) { if (!carrier) return this._injectBaggageItems(spanContext, carrier) @@ -174,14 +197,14 @@ class TextMapPropagator { const baggageItems = getAllBaggageItems() if (!baggageItems) return - for (const [key, value] of Object.entries(baggageItems)) { + for (const key of Object.keys(baggageItems)) { const baggageKey = key.trim() if (!baggageTokenExpr.test(baggageKey)) continue // Do not trim values. If callers include leading/trailing whitespace, it must be percent-encoded. // W3C list-member allows optional properties after ';'. // https://www.w3.org/TR/baggage/#header-content - const item = `${baggageKey}=${encodeURIComponent(value)},` + const item = `${baggageKey}=${encodeURIComponent(baggageItems[key])},` itemCounter += 1 byteCounter += item.length @@ -218,14 +241,15 @@ class TextMapPropagator { const tags = [] - for (const key in trace.tags) { - if (!trace.tags[key] || !key.startsWith('_dd.p.')) continue - if (!this._validateTagKey(key) || !this._validateTagValue(trace.tags[key])) { + for (const key of Object.keys(trace.tags)) { + const value = trace.tags[key] + if (!value || !key.startsWith('_dd.p.')) continue + if (!this._validateTagKey(key) || !this._validateTagValue(value)) { log.error('Trace tags from span are invalid, skipping injection.') return } - tags.push(`${key}=${trace.tags[key]}`) + tags.push(`${key}=${value}`) } const header = tags.join(',') @@ -297,21 +321,22 @@ class TextMapPropagator { if (typeof origin === 'string') { const originValue = origin .replaceAll(tracestateOriginFilter, '_') - .replaceAll(/[\x3D]/g, '~') + .replaceAll('=', '~') state.set('o', originValue) } - for (const key in tags) { - if (!tags[key] || !key.startsWith('_dd.p.')) continue + for (const key of Object.keys(tags)) { + const tagValueRaw = tags[key] + if (!tagValueRaw || !key.startsWith('_dd.p.')) continue const tagKey = 't.' + key.slice(6) .replaceAll(tracestateTagKeyFilter, '_') - const tagValue = tags[key] + const tagValue = tagValueRaw .toString() .replaceAll(tracestateTagValueFilter, '_') - .replaceAll(/[\x3D]/g, '~') + .replaceAll('=', '~') state.set(tagKey, tagValue) } @@ -563,7 +588,7 @@ class TextMapPropagator { default: { if (!key.startsWith('t.')) continue const subKey = key.slice(2) // e.g. t.tid -> tid - const transformedValue = value.replaceAll(/[\x7E]/gm, '=') + const transformedValue = value.replaceAll('~', '=') // If subkey is tid then do nothing because trace header tid should always be preserved if (subKey === 'tid') { @@ -681,7 +706,7 @@ class TextMapPropagator { if (!header) return const baggages = header.split(',') - const baggageTagKeys = new Set(this._config.baggageTagKeys) + const baggageTagKeys = this.#getBaggageTagKeysSet() const tagAllKeys = baggageTagKeys.has('*') /** @type {Record | undefined} */ let items @@ -721,11 +746,17 @@ class TextMapPropagator { tracerMetrics.count('context_header_style.malformed', ['header_style:baggage']).inc() return } - try { - value = decodeURIComponent(value) - } catch { - const bytes = value.replaceAll(percentByte, (_, hex) => String.fromCharCode(Number.parseInt(hex, 16))) - value = Buffer.from(bytes, 'binary').toString('utf8') + // `decodeURIComponent` only does work when the value contains a + // percent-encoded sequence; everything else passes through unchanged. + // Skipping the call (and the surrounding `try` frame) shaves an alloc + // per baggage entry on the dominant ASCII case. + if (value.includes('%')) { + try { + value = decodeURIComponent(value) + } catch { + const bytes = value.replaceAll(percentByte, (_, hex) => String.fromCharCode(Number.parseInt(hex, 16))) + value = Buffer.from(bytes, 'binary').toString('utf8') + } } items ??= {} items[key] = value diff --git a/packages/dd-trace/src/opentracing/propagation/tracestate.js b/packages/dd-trace/src/opentracing/propagation/tracestate.js index 76600ab440..ba460b9fbb 100644 --- a/packages/dd-trace/src/opentracing/propagation/tracestate.js +++ b/packages/dd-trace/src/opentracing/propagation/tracestate.js @@ -112,7 +112,7 @@ class TraceState extends Map { if (state.changed) { const value = state.toString() if (value) { - this.set(vendor, state.toString()) + this.set(vendor, value) } else { this.delete(vendor) } diff --git a/packages/dd-trace/test/datastreams/data_streams_checkpointer.spec.js b/packages/dd-trace/test/datastreams/data_streams_checkpointer.spec.js index 762cf8cc2f..0607b753aa 100644 --- a/packages/dd-trace/test/datastreams/data_streams_checkpointer.spec.js +++ b/packages/dd-trace/test/datastreams/data_streams_checkpointer.spec.js @@ -117,7 +117,7 @@ describe('data streams checkpointer manual api', () => { tracer.dataStreamsCheckpointer.setConsumeCheckpoint('kinesis', 'stream-123', headers) const calledTags = mockSetCheckpoint.getCall(0).args[0] - assert.deepStrictEqual(calledTags, ['type:kinesis', 'topic:stream-123', 'direction:in', 'manual_checkpoint:true']) + assert.deepStrictEqual(calledTags, ['direction:in', 'type:kinesis', 'topic:stream-123', 'manual_checkpoint:true']) }) it('should set an automatic checkpoint when setConsumeCheckpoint is called with manualCheckpoint:false', function () { diff --git a/packages/dd-trace/test/datastreams/processor.spec.js b/packages/dd-trace/test/datastreams/processor.spec.js index 8a2c18f0a6..4f85b0253c 100644 --- a/packages/dd-trace/test/datastreams/processor.spec.js +++ b/packages/dd-trace/test/datastreams/processor.spec.js @@ -290,6 +290,16 @@ describe('DataStreamsProcessor', () => { assert.deepStrictEqual(encoded.PayloadSize, payloadSize.toProto()) }) + it('should tag the provided span with the pathway hash on recordCheckpoint', () => { + const span = { setTag: sinon.stub() } + processor.recordCheckpoint(mockCheckpoint, span) + sinon.assert.calledOnceWithExactly( + span.setTag, + 'pathway.hash', + DEFAULT_CURRENT_HASH.readBigUInt64LE().toString() + ) + }) + it('should export on interval', () => { processor.recordCheckpoint(mockCheckpoint) processor.onInterval()