Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions packages/dd-trace/src/datastreams/checkpointer.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ class DataStreamsCheckpointer {
if (!this.config.dsmEnabled) return

const ctx = this.dsmProcessor.setCheckpoint(
['type:' + type, 'topic:' + target, 'direction:out', 'manual_checkpoint:true'],
['direction:out', 'type:' + type, 'topic:' + target, 'manual_checkpoint:true'],
null,
DataStreamsContext.getDataStreamsContext(),
null
DataStreamsContext.getDataStreamsContext()
)
DataStreamsContext.setDataStreamsContext(ctx)

Expand All @@ -45,17 +44,12 @@ class DataStreamsCheckpointer {
const parentCtx = this.tracer.extract('text_map_dsm', carrier)
DataStreamsContext.setDataStreamsContext(parentCtx)

const tags = ['type:' + type, 'topic:' + source, 'direction:in']
const tags = ['direction:in', 'type:' + type, 'topic:' + source]
if (manualCheckpoint) {
tags.push('manual_checkpoint:true')
}

const ctx = this.dsmProcessor.setCheckpoint(
tags,
null,
parentCtx,
null
)
const ctx = this.dsmProcessor.setCheckpoint(tags, null, parentCtx)
DataStreamsContext.setDataStreamsContext(ctx)

return ctx
Expand Down
67 changes: 39 additions & 28 deletions packages/dd-trace/src/datastreams/encoding.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
'use strict'

const maxVarLen64 = 9

/**
 * Encodes positive and negative numbers, using zig zag encoding to reduce the size of the variable length encoding.
 * Uses high and low part to ensure those parts are under the limit for byte operations in javascript (32 bits)
 * @param {number} v
 * @returns {Uint8Array|undefined} encoded bytes, or undefined when |v| * 2 exceeds Number.MAX_SAFE_INTEGER
 */
function encodeVarint (v) {
  const result = new Uint8Array(maxVarLen64)
  const written = encodeVarintInto(result, 0, v)
  // encodeVarintInto returns the start offset (0 here) unchanged on overflow.
  if (written === 0) {
    return
  }
  return result.slice(0, written)
}

/**
 * Writes a zig-zag varint at `target[offset..]` and returns the offset just past the last
 * byte written. Returns `offset` unchanged when the value exceeds MAX_SAFE_INTEGER/2, mirroring
 * the `encodeVarint` overflow contract. Used on the DSM checkpoint hot path to avoid
 * per-call Uint8Array / Buffer allocations.
 * @param {Uint8Array | Buffer} target
 * @param {number} offset
 * @param {number} value
 * @returns {number}
 */
function encodeVarintInto (target, offset, value) {
  const sign = value >= 0 ? 0 : 1
  // We leave the least significant bit for the sign.
  const double = Math.abs(value) * 2
  if (double > Number.MAX_SAFE_INTEGER) {
    return offset
  }
  let high = Math.floor(double / 0x1_00_00_00_00)
  let low = (double & 0xFF_FF_FF_FF) | sign
  let i = offset
  const limit = offset + maxVarLen64 - 1
  // if first byte is 1, the number is negative in javascript, but we want to interpret it as positive.
  // A continuation byte is needed while 7 or more significant bits remain, i.e. low >= 0x80;
  // the previous `low > 0x80` test exited one iteration early when the residue was exactly
  // 0x80 (e.g. value 64), emitting a lone 0x00 byte and dropping the value.
  while ((high !== 0 || low < 0 || low >= 0x80) && i < limit) {
    target[i] = (low & 0x7F) | 0x80
    low >>>= 7
    low |= (high & 0x7F) << 25
    high >>>= 7
    i++
  }
  target[i] = low & 0x7F
  return i + 1
}

/**
Expand All @@ -35,28 +67,6 @@ function decodeVarint (b) {
return [positive ? abs : -abs, bytes]
}

const maxVarLen64 = 9

/**
* @param {number} low
* @param {number} high
* @returns {Uint8Array}
*/
function encodeUvarint64 (low, high) {
const result = new Uint8Array(maxVarLen64)
let i = 0
// if first byte is 1, the number is negative in javascript, but we want to interpret it as positive
while ((high !== 0 || low < 0 || low > 0x80) && i < maxVarLen64 - 1) {
result[i] = (low & 0x7F) | 0x80
low >>>= 7
low |= (high & 0x7F) << 25
high >>>= 7
i++
}
result[i] = low & 0x7F
return result.slice(0, i + 1)
}

/**
* @param {Uint8Array} bytes
* @returns {[number|undefined, number|undefined, Uint8Array]}
Expand Down Expand Up @@ -95,5 +105,6 @@ function decodeUvarint64 (

module.exports = {
encodeVarint,
encodeVarintInto,
decodeVarint,
}
55 changes: 29 additions & 26 deletions packages/dd-trace/src/datastreams/pathway.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,24 @@ const crypto = require('crypto')
const { LRUCache } = require('../../../../vendor/dist/lru-cache')
const log = require('../log')
const pick = require('../../../datadog-core/src/utils/src/pick')
const { encodeVarint, decodeVarint } = require('./encoding')
const { encodeVarintInto, decodeVarint } = require('./encoding')

const cache = new LRUCache({ max: 500 })

const CONTEXT_PROPAGATION_KEY = 'dd-pathway-ctx'
const CONTEXT_PROPAGATION_KEY_BASE64 = 'dd-pathway-ctx-base64'

const PATHWAY_CONTEXT_BYTES = 20

// Reused across `encodePathwayContext` calls; the buffer is fully rewritten before each
// `Buffer.from(...)` copy-out so callers never observe mutation between checkpoints.
const pathwayScratch = Buffer.allocUnsafe(PATHWAY_CONTEXT_BYTES)

const logKeys = [CONTEXT_PROPAGATION_KEY, CONTEXT_PROPAGATION_KEY_BASE64]

/**
 * Hashes a checkpoint identity string down to the 8-byte (64-bit) hash DSM uses.
 * @param {string} checkpointString
 * @returns {Buffer} an 8-byte Buffer (first half of the sha256 digest)
 */
function shaHash (checkpointString) {
  // Copy out of the 32-byte digest so the LRU cache doesn't retain it.
  return Buffer.from(crypto.createHash('sha256').update(checkpointString).digest().subarray(0, 8))
}

/**
Expand All @@ -30,30 +36,25 @@ function shaHash (checkpointString) {
*/
function computeHash (service, env, edgeTags, parentHash, propagationHashBigInt = null) {
edgeTags.sort()
const hashableEdgeTags = edgeTags.filter(item => item !== 'manual_checkpoint:true')

// Cache key includes parentHash to handle fan-in/fan-out scenarios where the same
// service+env+tags+propagationHash can have different parents. This ensures we cache
// the complete pathway context, not just the current node's identity.
const propagationPart = propagationHashBigInt ? `:${propagationHashBigInt.toString(16)}` : ''
const key = `${service}${env}${hashableEdgeTags.join('')}${parentHash}${propagationPart}`
const hashableEdgeTags = edgeTags.includes('manual_checkpoint:true')
? edgeTags.filter(item => item !== 'manual_checkpoint:true')
: edgeTags

// The cache key includes parentHash so a fan-in node with different parents
// gets distinct cache entries; the hash input below excludes parentHash and
// gets combined with it via a second sha pass to produce the final hash.
const joinedEdgeTags = hashableEdgeTags.join('')
const propagationHex = propagationHashBigInt ? propagationHashBigInt.toString(16) : ''
const propagationPart = propagationHex ? `:${propagationHex}` : ''
const key = `${service}${env}${joinedEdgeTags}${parentHash}${propagationPart}`

let value = cache.get(key)
if (value) {
return value
}

// Key vs hashInput distinction:
// - 'key' (above) is used for caching and includes parentHash to differentiate pathways
// with the same node but different parents (e.g., multiple queues feeding one consumer)
// - 'hashInput' (below) excludes parentHash to compute only the current node's identity hash,
// which is then XORed with parentHash (line 54) to build the complete pathway hash
// This two-step approach (hash current node independently, then combine with parent) is
// required for proper pathway construction in the DSM protocol.
const baseString = `${service}${env}` + hashableEdgeTags.join('')
const hashInput = propagationHashBigInt
? `${baseString}:${propagationHashBigInt.toString(16)}`
: baseString
const baseString = `${service}${env}${joinedEdgeTags}`
const hashInput = propagationHex ? `${baseString}:${propagationHex}` : baseString

const currentHash = shaHash(hashInput)
const buf = Buffer.concat([currentHash, parentHash], 16)
Expand All @@ -70,11 +71,12 @@ function computeHash (service, env, edgeTags, parentHash, propagationHashBigInt
* @returns {Buffer}
*/
/**
 * Serializes a data streams context into the 20-byte binary pathway format:
 * the 8-byte pathway hash followed by two varint-encoded millisecond timestamps.
 * @param {Object} dataStreamsContext - holds `hash` (8-byte Buffer) plus
 *   `pathwayStartNs` and `edgeStartNs` nanosecond timestamps
 * @returns {Buffer} a fresh 20-byte Buffer (copied out of the shared scratch, so callers may retain it)
 */
function encodePathwayContext (dataStreamsContext) {
  let offset = dataStreamsContext.hash.copy(pathwayScratch, 0)
  offset = encodeVarintInto(pathwayScratch, offset, Math.round(dataStreamsContext.pathwayStartNs / 1e6))
  offset = encodeVarintInto(pathwayScratch, offset, Math.round(dataStreamsContext.edgeStartNs / 1e6))
  // No-op when offset >= PATHWAY_CONTEXT_BYTES; otherwise pads stale bytes from a previous call.
  pathwayScratch.fill(0, offset, PATHWAY_CONTEXT_BYTES)
  return Buffer.from(pathwayScratch.subarray(0, PATHWAY_CONTEXT_BYTES))
}

/**
Expand Down Expand Up @@ -178,6 +180,7 @@ const DsmPathwayCodec = {
}

module.exports = {
CONTEXT_PROPAGATION_KEY_BASE64,
computePathwayHash: computeHash,
encodePathwayContext,
decodePathwayContext,
Expand Down
32 changes: 17 additions & 15 deletions packages/dd-trace/src/datastreams/processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,21 @@ const { PATHWAY_HASH, DSM_TRANSACTION_ID, DSM_TRANSACTION_CHECKPOINT } = require
const log = require('../log')
const processTags = require('../process-tags')
const propagationHash = require('../propagation-hash')
const { DsmPathwayCodec } = require('./pathway')
const { CONTEXT_PROPAGATION_KEY_BASE64, computePathwayHash } = require('./pathway')
const { DataStreamsWriter } = require('./writer')
const { computePathwayHash } = require('./pathway')
const { getAmqpMessageSize, getHeadersSize, getMessageSize, getSizeOrZero } = require('./size')
const { SchemaBuilder } = require('./schemas/schema_builder')
const { SchemaSampler } = require('./schemas/schema_sampler')

const ENTRY_PARENT_HASH = Buffer.from('0000000000000000', 'hex')

// A direction:out checkpoint estimates the size cost of the header the
// producer plugin will inject. The pathway context is always 20 binary
// bytes, encoded as 28 base64 chars; together with the header key and
// JSON framing (matching the prior `JSON.stringify({key: value})` byte
// count minus 1), this is a fixed value.
const PATHWAY_HEADER_BYTES = CONTEXT_PROPAGATION_KEY_BASE64.length + 28 + 6

class StatsPoint {
constructor (hash, parentHash, edgeTags) {
this.hash = hash.readBigUInt64LE()
Expand Down Expand Up @@ -271,19 +277,19 @@ class DataStreamsProcessor {

recordCheckpoint (checkpoint, span = null) {
if (!this.enabled) return
this.bucketFromTimestamp(checkpoint.currentTimestamp)
.forCheckpoint(checkpoint)
.addLatencies(checkpoint)
// set DSM pathway hash on span to enable related traces feature on DSM tab, convert from buffer to uint64
const statsPoint = this.bucketFromTimestamp(checkpoint.currentTimestamp).forCheckpoint(checkpoint)
statsPoint.addLatencies(checkpoint)
if (span) {
span.setTag(PATHWAY_HASH, checkpoint.hash.readBigUInt64LE(0).toString())
// StatsPoint already converted the 8-byte Buffer hash to a uint64 BigInt.
span.setTag(PATHWAY_HASH, statsPoint.hash.toString())
}
}

setCheckpoint (edgeTags, span, ctx = null, payloadSize = 0) {
if (!this.enabled) return null
setCheckpoint (edgeTags, span, ctx, payloadSize = 0) {
if (!this.enabled) return
const nowNs = Date.now() * 1e6
const direction = edgeTags.find(t => t.startsWith('direction:'))
// Callers must place the direction tag at index 0.
const direction = edgeTags[0]
let pathwayStartNs = nowNs
let edgeStartNs = nowNs
let parentHash = ENTRY_PARENT_HASH
Expand Down Expand Up @@ -334,11 +340,7 @@ class DataStreamsProcessor {
closestOppositeDirectionEdgeStart,
}
if (direction === 'direction:out') {
// Add the header for this now, as the callee doesn't have access to context when producing
// - 1 to account for extra byte for {
const ddInfoContinued = {}
DsmPathwayCodec.encode(dataStreamsContext, ddInfoContinued)
payloadSize += getSizeOrZero(JSON.stringify(ddInfoContinued)) - 1
payloadSize += PATHWAY_HEADER_BYTES
}
const checkpoint = {
currentTimestamp: nowNs,
Expand Down
8 changes: 6 additions & 2 deletions packages/dd-trace/src/datastreams/size.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const { types } = require('util')

function getSizeOrZero (obj) {
if (typeof obj === 'string') {
return Buffer.from(obj, 'utf8').length
return Buffer.byteLength(obj, 'utf8')
}
if (types.isArrayBuffer(obj)) {
return obj.byteLength
Expand Down Expand Up @@ -32,7 +32,11 @@ function getSizeOrZero (obj) {

/**
 * Sums the UTF-8 byte size of every header key plus the size of its value.
 * @param {Object|undefined|null} headers
 * @returns {number} total size in bytes, 0 when headers are absent
 */
function getHeadersSize (headers) {
  // `== null` covers both undefined and null — Object.keys(null) would throw.
  if (headers == null) return 0
  let size = 0
  for (const key of Object.keys(headers)) {
    size += Buffer.byteLength(key, 'utf8') + getSizeOrZero(headers[key])
  }
  return size
}

function getMessageSize (message) {
Expand Down
Loading
Loading