Skip to content

Commit 8eac0cd

Browse files
authored
feat(telemetry): add stable session identifier headers (#7821)
* feat(telemetry): add stable session identifier headers adds DD-Session-ID and DD-Root-Session-ID headers to telemetry requests so backend can correlate telemetry across parent/child processes without runtime_id fragmentation
1 parent a788ea6 commit 8eac0cd

8 files changed

Lines changed: 480 additions & 10 deletions

File tree

packages/datadog-instrumentations/src/child_process.js

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ const {
1212
const childProcessChannel = dc.tracingChannel('datadog:child_process:execution')
1313

1414
// ignored exec method because it calls to execFile directly
15-
const execAsyncMethods = ['execFile', 'spawn']
15+
const execAsyncMethods = ['execFile', 'spawn', 'fork']
1616

1717
const names = ['child_process', 'node:child_process']
1818

@@ -97,8 +97,10 @@ function wrapChildProcessSyncMethod (returnError, shell = false) {
9797
return childProcessMethod.apply(this, arguments)
9898
}
9999

100-
const childProcessInfo = normalizeArgs(arguments, shell)
100+
const callArgs = [...arguments]
101+
const childProcessInfo = normalizeArgs(callArgs, shell)
101102
const context = createContextFromChildProcessInfo(childProcessInfo)
103+
context.callArgs = callArgs
102104

103105
return childProcessChannel.start.runStores(context, () => {
104106
try {
@@ -108,7 +110,7 @@ function wrapChildProcessSyncMethod (returnError, shell = false) {
108110
return returnError(error, context)
109111
}
110112

111-
const result = childProcessMethod.apply(this, arguments)
113+
const result = childProcessMethod.apply(this, context.callArgs)
112114
context.result = result
113115

114116
return result
@@ -131,9 +133,11 @@ function wrapChildProcessCustomPromisifyMethod (customPromisifyMethod, shell) {
131133
return customPromisifyMethod.apply(this, arguments)
132134
}
133135

134-
const childProcessInfo = normalizeArgs(arguments, shell)
136+
const callArgs = [...arguments]
137+
const childProcessInfo = normalizeArgs(callArgs, shell)
135138

136139
const context = createContextFromChildProcessInfo(childProcessInfo)
140+
context.callArgs = callArgs
137141

138142
const { start, end, asyncStart, asyncEnd, error } = childProcessChannel
139143
start.publish(context)
@@ -143,7 +147,7 @@ function wrapChildProcessCustomPromisifyMethod (customPromisifyMethod, shell) {
143147
result = Promise.reject(context.abortController.signal.reason || new Error('Aborted'))
144148
} else {
145149
try {
146-
result = customPromisifyMethod.apply(this, arguments)
150+
result = customPromisifyMethod.apply(this, context.callArgs)
147151
} catch (error) {
148152
context.error = error
149153
error.publish(context)
@@ -181,9 +185,11 @@ function wrapChildProcessAsyncMethod (ChildProcess, shell = false) {
181185
return childProcessMethod.apply(this, arguments)
182186
}
183187

184-
const childProcessInfo = normalizeArgs(arguments, shell)
188+
const callArgs = [...arguments]
189+
const childProcessInfo = normalizeArgs(callArgs, shell)
185190

186191
const context = createContextFromChildProcessInfo(childProcessInfo)
192+
context.callArgs = callArgs
187193
return childProcessChannel.start.runStores(context, () => {
188194
let childProcess
189195
if (context.abortController.signal.aborted) {
@@ -194,15 +200,15 @@ function wrapChildProcessAsyncMethod (ChildProcess, shell = false) {
194200
const error = context.abortController.signal.reason || new Error('Aborted')
195201
childProcess.emit('error', error)
196202

197-
const cb = arguments[arguments.length - 1]
203+
const cb = context.callArgs[context.callArgs.length - 1]
198204
if (typeof cb === 'function') {
199205
cb(error)
200206
}
201207

202208
childProcess.emit('close')
203209
})
204210
} else {
205-
childProcess = childProcessMethod.apply(this, arguments)
211+
childProcess = childProcessMethod.apply(this, context.callArgs)
206212
}
207213

208214
if (childProcess) {

packages/datadog-instrumentations/test/child_process.spec.js

Lines changed: 127 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
'use strict'
22

33
const assert = require('node:assert/strict')
4+
const fs = require('node:fs')
5+
const os = require('node:os')
6+
const path = require('node:path')
47
const { promisify } = require('node:util')
58

69
const dc = require('dc-polyfill')
@@ -326,7 +329,6 @@ describe('child process', () => {
326329
abortController: sinon.match.instanceOf(AbortController),
327330
shell: true,
328331
})
329-
sinon.assert.calledOnce(start)
330332
sinon.assert.calledWithMatch(asyncFinish, {
331333
command: 'echo',
332334
file: 'echo',
@@ -701,6 +703,130 @@ describe('child process', () => {
701703
})
702704
})
703705
})
706+
707+
describe('fork', () => {
708+
let tmpScript
709+
710+
before(() => {
711+
tmpScript = path.join(os.tmpdir(), `dd-trace-test-fork-${Date.now()}.js`)
712+
fs.writeFileSync(tmpScript, 'process.exit(0)')
713+
})
714+
715+
after(() => {
716+
try { fs.unlinkSync(tmpScript) } catch (e) { /* ignore */ }
717+
})
718+
719+
it('should execute success callbacks', (done) => {
720+
const child = childProcess.fork(tmpScript)
721+
722+
child.once('close', () => {
723+
sinon.assert.calledOnce(start)
724+
sinon.assert.calledWithMatch(start, {
725+
command: tmpScript,
726+
file: tmpScript,
727+
shell: false,
728+
abortController: sinon.match.instanceOf(AbortController),
729+
})
730+
sinon.assert.calledOnce(asyncFinish)
731+
sinon.assert.calledWithMatch(asyncFinish, {
732+
command: tmpScript,
733+
file: tmpScript,
734+
shell: false,
735+
result: 0,
736+
})
737+
sinon.assert.notCalled(error)
738+
done()
739+
})
740+
})
741+
742+
it('should publish arguments', (done) => {
743+
const child = childProcess.fork(tmpScript, ['--flag'])
744+
745+
child.once('close', () => {
746+
sinon.assert.calledOnce(start)
747+
sinon.assert.calledWithMatch(start, {
748+
command: `${tmpScript} --flag`,
749+
file: tmpScript,
750+
fileArgs: ['--flag'],
751+
shell: false,
752+
abortController: sinon.match.instanceOf(AbortController),
753+
})
754+
done()
755+
})
756+
})
757+
758+
it('should execute error callback for non-existent module', (done) => {
759+
const child = childProcess.fork('non_existent_module_test.js')
760+
761+
child.once('error', () => {})
762+
763+
child.once('close', () => {
764+
sinon.assert.calledOnce(start)
765+
sinon.assert.calledWithMatch(start, {
766+
command: 'non_existent_module_test.js',
767+
file: 'non_existent_module_test.js',
768+
shell: false,
769+
})
770+
sinon.assert.calledOnce(error)
771+
done()
772+
})
773+
})
774+
})
775+
776+
describe('callArgs on context', () => {
777+
function injectTestEnv (context) {
778+
if (!context.callArgs) return
779+
const args = context.callArgs
780+
const opts = args[2] != null && typeof args[2] === 'object' ? args[2] : {}
781+
args[2] = { ...opts, env: { ...process.env, DD_TEST_VAR: 'injected' } }
782+
}
783+
784+
it('should include callArgs for async methods', (done) => {
785+
const child = childProcess.spawn('echo', ['hello'])
786+
787+
child.once('close', () => {
788+
sinon.assert.calledOnce(start)
789+
const context = start.firstCall.firstArg
790+
assert.ok(Array.isArray(context.callArgs))
791+
assert.strictEqual(context.callArgs[0], 'echo')
792+
assert.deepStrictEqual(context.callArgs[1], ['hello'])
793+
done()
794+
})
795+
})
796+
797+
it('should include callArgs for sync methods', () => {
798+
childProcess.spawnSync('echo', ['hello'])
799+
800+
sinon.assert.calledOnce(start)
801+
const context = start.firstCall.firstArg
802+
assert.ok(Array.isArray(context.callArgs))
803+
assert.strictEqual(context.callArgs[0], 'echo')
804+
assert.deepStrictEqual(context.callArgs[1], ['hello'])
805+
})
806+
807+
it('should allow subscribers to mutate callArgs for async methods', (done) => {
808+
childProcessChannel.subscribe({ start: injectTestEnv })
809+
810+
const script = 'process.exit(process.env.DD_TEST_VAR === "injected" ? 0 : 1)'
811+
const child = childProcess.spawn('node', ['-e', script])
812+
813+
child.once('close', (code) => {
814+
childProcessChannel.unsubscribe({ start: injectTestEnv })
815+
assert.strictEqual(code, 0)
816+
done()
817+
})
818+
})
819+
820+
it('should allow subscribers to mutate callArgs for sync methods', () => {
821+
childProcessChannel.subscribe({ start: injectTestEnv })
822+
823+
const script = 'process.exit(process.env.DD_TEST_VAR === "injected" ? 0 : 1)'
824+
const result = childProcess.spawnSync('node', ['-e', script])
825+
826+
childProcessChannel.unsubscribe({ start: injectTestEnv })
827+
assert.strictEqual(result.status, 0)
828+
})
829+
})
704830
})
705831
})
706832
})

packages/dd-trace/src/config/index.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ const VALID_PROPAGATION_BEHAVIOR_EXTRACT = new Set(['continue', 'restart', 'igno
4949
const VALID_LOG_LEVELS = new Set(['debug', 'info', 'warn', 'error'])
5050
const DEFAULT_OTLP_PORT = 4318
5151
const RUNTIME_ID = uuid()
52+
// eslint-disable-next-line eslint-rules/eslint-process-env -- internal propagation, not user config
53+
const ROOT_SESSION_ID = process.env.DD_ROOT_JS_SESSION_ID || RUNTIME_ID
5254
const NAMING_VERSIONS = new Set(['v0', 'v1'])
5355
const DEFAULT_NAMING_VERSION = 'v0'
5456

@@ -145,6 +147,8 @@ class Config {
145147
'runtime-id': RUNTIME_ID,
146148
})
147149

150+
this.rootSessionId = ROOT_SESSION_ID
151+
148152
if (this.isCiVisibility) {
149153
tagger.add(this.tags, {
150154
[ORIGIN_KEY]: 'ciapp-test',

packages/dd-trace/src/telemetry/send-data.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,17 @@ let agentTelemetry = true
9191
* @returns {Record<string, string>}
9292
*/
9393
function getHeaders (config, application, reqType) {
94+
const sessionId = config.tags['runtime-id']
9495
const headers = {
9596
'content-type': 'application/json',
9697
'dd-telemetry-api-version': 'v2',
9798
'dd-telemetry-request-type': reqType,
9899
'dd-client-library-language': application.language_name,
99100
'dd-client-library-version': application.tracer_version,
101+
'dd-session-id': sessionId,
102+
}
103+
if (config.rootSessionId && config.rootSessionId !== sessionId) {
104+
headers['dd-root-session-id'] = config.rootSessionId
100105
}
101106
const debug = config.telemetry && config.telemetry.debug
102107
if (debug) {
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
'use strict'
2+
3+
const dc = require('dc-polyfill')
4+
5+
const childProcessChannel = dc.tracingChannel('datadog:child_process:execution')
6+
7+
let subscribed = false
8+
let rootSessionId
9+
let runtimeId
10+
11+
function injectSessionEnv (existingEnv) {
12+
// eslint-disable-next-line eslint-rules/eslint-process-env -- not in supported-configurations.json
13+
const base = existingEnv == null ? process.env : existingEnv
14+
return {
15+
...base,
16+
DD_ROOT_JS_SESSION_ID: rootSessionId,
17+
DD_PARENT_JS_SESSION_ID: runtimeId,
18+
}
19+
}
20+
21+
function findOptionsIndex (args, shell) {
22+
if (Array.isArray(args[1])) {
23+
return { index: 2, exists: args[2] != null && typeof args[2] === 'object' }
24+
}
25+
if (args[1] != null && typeof args[1] === 'object') {
26+
return { index: 1, exists: true }
27+
}
28+
if (!shell && args[2] != null && typeof args[2] === 'object') {
29+
return { index: 2, exists: true }
30+
}
31+
return { index: shell ? 1 : 2, exists: false }
32+
}
33+
34+
function onChildProcessStart (context) {
35+
if (!context.callArgs) return
36+
37+
const args = context.callArgs
38+
const { index, exists } = findOptionsIndex(args, context.shell)
39+
40+
if (exists) {
41+
args[index] = { ...args[index], env: injectSessionEnv(args[index].env) }
42+
return
43+
}
44+
45+
const opts = { env: injectSessionEnv(null) }
46+
47+
if (!context.shell && !Array.isArray(args[1])) {
48+
args.splice(1, 0, [])
49+
}
50+
51+
if (typeof args[index] === 'function') {
52+
args.splice(index, 0, opts)
53+
} else {
54+
args[index] = opts
55+
}
56+
}
57+
58+
const handler = { start: onChildProcessStart }
59+
60+
function start (config) {
61+
if (!config.telemetry?.enabled || subscribed) return
62+
subscribed = true
63+
64+
rootSessionId = config.rootSessionId
65+
runtimeId = config.tags['runtime-id']
66+
67+
childProcessChannel.subscribe(handler)
68+
}
69+
70+
function stop () {
71+
if (!subscribed) return
72+
childProcessChannel.unsubscribe(handler)
73+
subscribed = false
74+
rootSessionId = undefined
75+
runtimeId = undefined
76+
}
77+
78+
module.exports = { start, stop, _onChildProcessStart: onChildProcessStart }

packages/dd-trace/src/telemetry/telemetry.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ const endpoints = require('./endpoints')
1212
const { sendData } = require('./send-data')
1313
const { manager: metricsManager } = require('./metrics')
1414
const telemetryLogger = require('./logs')
15+
const sessionPropagation = require('./session-propagation')
1516

1617
/**
1718
* @typedef {Record<string, unknown>} TelemetryPayloadObject
@@ -370,6 +371,7 @@ function start (aConfig, thePluginManager) {
370371
dependencies.start(config, application, host, getRetryData, updateRetryData)
371372
telemetryLogger.start(config)
372373
endpoints.start(config, application, host, getRetryData, updateRetryData)
374+
sessionPropagation.start(config)
373375

374376
sendData(config, application, host, 'app-started', appStarted(config))
375377

@@ -397,6 +399,7 @@ function stop () {
397399
telemetryStopChannel.publish(getTelemetryData())
398400

399401
endpoints.stop()
402+
sessionPropagation.stop()
400403
config = undefined
401404
}
402405

0 commit comments

Comments
 (0)