Skip to content

Commit a3083c9

Browse files
dhensbyclaude
andcommitted
feat: extend TracingChannel coverage to the callback API
The callback branches of query/batch/execute/bulk on Request, prepare and execute on PreparedStatement, and connect on ConnectionPool were previously invisible to TracingChannel subscribers, forcing consumers to migrate to the promise API just to get observability. Add a traceCallback helper alongside tracePromise in lib/diagnostics that wraps Node's TracingChannel#traceCallback with the same hasSubscribers fast path. Route each callback branch through it so subscribers see the same start / end / asyncStart / asyncEnd / error sub-events regardless of which API style the caller chose — no branching required on the subscriber side. Internal requests (sp_prepare, sp_execute, sp_unprepare) continue to be suppressed via _internal; Request exposes a _tracedCallback mirror of _tracedPromise to keep that gate symmetric. Stream-mode PreparedStatement.execute without a user callback falls through untraced because there is no async boundary for traceCallback to hook — streaming consumers get lifecycle via Request events instead. Update the README note that said the promise API was the only traced path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 2dcf2a6 commit a3083c9

6 files changed

Lines changed: 205 additions & 15 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2265,7 +2265,7 @@ TracingChannel contexts include identifiers (`requestId`, `poolId`), operation d
22652265
22662266
> **Note on identifiers:** `connectionId`, `poolId`, `requestId`, `transactionId`, and `preparedStatementId` are monotonically increasing integers scoped to the current node process. They are not stable across restarts and cannot be used to correlate activity across processes.
22672267
2268-
> **Note:** TracingChannel instrumentation is active on the **promise** API. If you use the callback API, TracingChannel events will not fire for those calls. Point-event channels (connection, transaction, pool lifecycle) fire regardless of API style.
2268+
> **Note:** TracingChannel instrumentation fires for both the promise and callback APIs. The callback API is traced via Node's `TracingChannel#traceCallback`, which emits the same `start` / `end` / `asyncStart` / `asyncEnd` / `error` sub-events as the promise path, so subscribers do not need to branch by API style. Point-event channels (connection, transaction, pool lifecycle) likewise fire regardless of API style.
22692269
22702270
### Point-event channels
22712271

lib/base/connection-pool.js

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ const { IDS } = require('../utils')
88
const ConnectionError = require('../error/connection-error')
99
const shared = require('../shared')
1010
const { MSSQLError } = require('../error')
11-
const { CHANNELS, tracePromise, publish } = require('../diagnostics')
11+
const { CHANNELS, tracePromise, traceCallback, publish } = require('../diagnostics')
1212

1313
/**
1414
* Class ConnectionPool.
@@ -440,7 +440,16 @@ class ConnectionPool extends EventEmitter {
440440

441441
connect (callback) {
442442
if (typeof callback === 'function') {
443-
this._connect(callback)
443+
traceCallback(CHANNELS.TRACE_CONNECT, this._connect, 0, () => ({
444+
server: this.config.server,
445+
port: this.config.port,
446+
database: this.config.database,
447+
poolId: IDS.get(this),
448+
poolConfig: {
449+
min: (this.config.pool && this.config.pool.min) || 0,
450+
max: (this.config.pool && this.config.pool.max) || 10
451+
}
452+
}), this, [callback])
444453
return this
445454
}
446455

lib/base/prepared-statement.js

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ const globalConnection = require('../global-connection')
77
const { TransactionError, PreparedStatementError } = require('../error')
88
const shared = require('../shared')
99
const { TYPES, declare } = require('../datatypes')
10-
const { CHANNELS, tracePromise, publish } = require('../diagnostics')
10+
const { CHANNELS, tracePromise, traceCallback, publish } = require('../diagnostics')
1111

1212
/**
1313
* Class PreparedStatement.
@@ -195,7 +195,12 @@ class PreparedStatement extends EventEmitter {
195195

196196
prepare (statement, callback) {
197197
if (typeof callback === 'function') {
198-
this._prepare(statement, callback)
198+
traceCallback(CHANNELS.TRACE_PREPARED_STATEMENT_PREPARE, this._prepare, 1, () => ({
199+
statement: statement || this.statement,
200+
parameters: Object.keys(this.parameters),
201+
preparedStatementId: IDS.get(this),
202+
poolId: getPoolId(this)
203+
}), this, [statement, callback])
199204
return this
200205
}
201206

@@ -285,7 +290,19 @@ class PreparedStatement extends EventEmitter {
285290

286291
execute (values, callback) {
287292
if (this.stream || (typeof callback === 'function')) {
288-
return this._execute(values, callback)
293+
if (typeof callback !== 'function') {
294+
// Stream mode without a callback: no async boundary for traceCallback
295+
// to hook — fall through to the untraced call. Subscribers interested
296+
// in streaming completion should listen to Request events.
297+
return this._execute(values, callback)
298+
}
299+
return traceCallback(CHANNELS.TRACE_PREPARED_STATEMENT_EXECUTE, this._execute, 1, () => ({
300+
statement: this.statement,
301+
parameters: Object.keys(this.parameters),
302+
handle: this._handle,
303+
preparedStatementId: IDS.get(this),
304+
poolId: getPoolId(this)
305+
}), this, [values, callback])
289306
}
290307

291308
return tracePromise(CHANNELS.TRACE_PREPARED_STATEMENT_EXECUTE, () => {

lib/base/request.js

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ const globalConnection = require('../global-connection')
88
const { RequestError, ConnectionError } = require('../error')
99
const { TYPES } = require('../datatypes')
1010
const shared = require('../shared')
11-
const { CHANNELS, tracePromise, publish } = require('../diagnostics')
11+
const { CHANNELS, tracePromise, traceCallback, publish } = require('../diagnostics')
1212

1313
/**
1414
* Class Request.
@@ -231,6 +231,13 @@ class Request extends EventEmitter {
231231
return tracePromise(channel, fn, contextFactory)
232232
}
233233

234+
_tracedCallback (channel, contextFactory, fn, position, args) {
235+
if (this._internal) {
236+
return fn.apply(this, args)
237+
}
238+
return traceCallback(channel, fn, position, contextFactory, this, args)
239+
}
240+
234241
/**
235242
* Execute the SQL batch.
236243
*
@@ -245,7 +252,11 @@ class Request extends EventEmitter {
245252
this.rowsAffected = 0
246253

247254
if (typeof callback === 'function') {
248-
this._batch(batch, (err, recordsets, output, rowsAffected) => {
255+
this._tracedCallback(CHANNELS.TRACE_BATCH, () => ({
256+
command: batch,
257+
requestId: IDS.get(this),
258+
poolId: getPoolId(this)
259+
}), this._batch, 1, [batch, (err, recordsets, output, rowsAffected) => {
249260
if (this.stream) {
250261
if (err) this.emit('error', err)
251262
err = null
@@ -263,7 +274,7 @@ class Request extends EventEmitter {
263274
output,
264275
rowsAffected
265276
})
266-
})
277+
}])
267278
return this
268279
}
269280

@@ -344,7 +355,12 @@ class Request extends EventEmitter {
344355
if (this.arrayRowMode === null && this.parent) this.arrayRowMode = this.parent.config.arrayRowMode
345356

346357
if (this.stream || typeof callback === 'function') {
347-
this._bulk(table, options, (err, rowsAffected) => {
358+
this._tracedCallback(CHANNELS.TRACE_BULK, () => ({
359+
table: table.path || table.name,
360+
rowCount: table.rows ? table.rows.length : 0,
361+
requestId: IDS.get(this),
362+
poolId: getPoolId(this)
363+
}), this._bulk, 2, [table, options, (err, rowsAffected) => {
348364
if (this.stream) {
349365
if (err) this.emit('error', err)
350366
return this.emit('done', {
@@ -356,7 +372,7 @@ class Request extends EventEmitter {
356372
callback(null, {
357373
rowsAffected
358374
})
359-
})
375+
}])
360376
return this
361377
}
362378

@@ -456,7 +472,12 @@ class Request extends EventEmitter {
456472
this.rowsAffected = 0
457473

458474
if (typeof callback === 'function') {
459-
this._query(command, (err, recordsets, output, rowsAffected, columns) => {
475+
this._tracedCallback(CHANNELS.TRACE_QUERY, () => ({
476+
command,
477+
parameters: this._getParameterNames(),
478+
requestId: IDS.get(this),
479+
poolId: getPoolId(this)
480+
}), this._query, 1, [command, (err, recordsets, output, rowsAffected, columns) => {
460481
if (this.stream) {
461482
if (err) this.emit('error', err)
462483
err = null
@@ -476,7 +497,7 @@ class Request extends EventEmitter {
476497
}
477498
if (this.arrayRowMode) result.columns = columns
478499
callback(null, result)
479-
})
500+
}])
480501
return this
481502
}
482503

@@ -552,7 +573,12 @@ class Request extends EventEmitter {
552573
this.rowsAffected = 0
553574

554575
if (typeof callback === 'function') {
555-
this._execute(command, (err, recordsets, output, returnValue, rowsAffected, columns) => {
576+
this._tracedCallback(CHANNELS.TRACE_EXECUTE, () => ({
577+
procedure: command,
578+
parameters: this._getParameterNames(),
579+
requestId: IDS.get(this),
580+
poolId: getPoolId(this)
581+
}), this._execute, 1, [command, (err, recordsets, output, returnValue, rowsAffected, columns) => {
556582
if (this.stream) {
557583
if (err) this.emit('error', err)
558584
err = null
@@ -574,7 +600,7 @@ class Request extends EventEmitter {
574600
}
575601
if (this.arrayRowMode) result.columns = columns
576602
callback(null, result)
577-
})
603+
}])
578604
return this
579605
}
580606

lib/diagnostics.js

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,30 @@ function tracePromise (name, fn, contextFactory) {
105105
return fn()
106106
}
107107

108+
/**
109+
* Trace a callback-style async operation using a TracingChannel.
110+
*
111+
* When subscribers are active, delegates to TracingChannel.traceCallback,
112+
* which replaces the callback at `position` in `args` with a wrapped
113+
* version that publishes to start/end/asyncStart/asyncEnd/error. When
114+
* no subscribers are active, calls `fn` directly with zero overhead.
115+
*
116+
* @param {string} name - TracingChannel name (one of CHANNELS.TRACE_*)
117+
* @param {Function} fn - The function to call (receives callback at `position`)
118+
* @param {number} position - Index of the callback within `args`
119+
* @param {Function} contextFactory - Factory function returning the context object
120+
* @param {*} thisArg - `this` binding for fn
121+
* @param {Array} args - Arguments to pass to fn (includes the callback at `position`)
122+
* @returns {*} The return value of fn
123+
*/
124+
function traceCallback (name, fn, position, contextFactory, thisArg, args) {
125+
const channel = tracingChannels[name]
126+
if (tracingChannelHasSubscribers(channel)) {
127+
return channel.traceCallback(fn, position, contextFactory(), thisArg, ...args)
128+
}
129+
return fn.apply(thisArg, args)
130+
}
131+
108132
/**
109133
* Publish a point event on a named channel.
110134
*
@@ -124,5 +148,6 @@ module.exports = {
124148
CHANNELS,
125149
tracingChannels,
126150
tracePromise,
151+
traceCallback,
127152
publish
128153
}

test/common/diagnostics.js

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,4 +371,117 @@ describe('Diagnostics Channel', () => {
371371
}
372372
})
373373
})
374+
375+
// Exercise the callback API through TracingChannel#traceCallback, which
376+
// shares the sub-event channels (start/asyncEnd/error/...) with
377+
// tracePromise — subscribers should not need to branch by API style.
378+
describe('Callback API tracing', () => {
379+
const sql = require('../../')
380+
381+
function collectTraces (tracingChannel) {
382+
const events = []
383+
const handlers = {
384+
start: (ctx) => events.push({ event: 'start', ctx }),
385+
end: (ctx) => events.push({ event: 'end', ctx }),
386+
asyncStart: (ctx) => events.push({ event: 'asyncStart', ctx }),
387+
asyncEnd: (ctx) => events.push({ event: 'asyncEnd', ctx }),
388+
error: (ctx) => events.push({ event: 'error', ctx })
389+
}
390+
tracingChannel.subscribe(handlers)
391+
return {
392+
events,
393+
stop () { tracingChannel.unsubscribe(handlers) }
394+
}
395+
}
396+
397+
it('request.query(cb) emits TRACE_QUERY start/asyncEnd with context', (done) => {
398+
const req = new sql.Request()
399+
req._query = (cmd, cb) => setImmediate(cb, null, [[{ x: 1 }]], {}, 1)
400+
req.input('id', sql.Int, 42)
401+
402+
const tc = tracingChannels[CHANNELS.TRACE_QUERY]
403+
const { events, stop } = collectTraces(tc)
404+
req.query('SELECT @id', (err, result) => {
405+
assert.ifError(err)
406+
assert.ok(result)
407+
// asyncEnd fires after this callback returns, so defer the check.
408+
setImmediate(() => {
409+
try {
410+
const starts = events.filter(e => e.event === 'start')
411+
const asyncEnds = events.filter(e => e.event === 'asyncEnd')
412+
assert.strictEqual(starts.length, 1)
413+
assert.strictEqual(asyncEnds.length, 1)
414+
assert.strictEqual(starts[0].ctx.command, 'SELECT @id')
415+
assert.deepStrictEqual(starts[0].ctx.parameters, ['id'])
416+
done()
417+
} catch (e) {
418+
done(e)
419+
} finally {
420+
stop()
421+
}
422+
})
423+
})
424+
})
425+
426+
it('request.query(cb) emits TRACE_QUERY error on driver failure', (done) => {
427+
const req = new sql.Request()
428+
const boom = new Error('boom')
429+
req._query = (cmd, cb) => setImmediate(cb, boom)
430+
431+
const tc = tracingChannels[CHANNELS.TRACE_QUERY]
432+
const { events, stop } = collectTraces(tc)
433+
req.query('SELECT 1', (err) => {
434+
try {
435+
assert.strictEqual(err, boom)
436+
const errors = events.filter(e => e.event === 'error')
437+
assert.strictEqual(errors.length, 1)
438+
assert.strictEqual(errors[0].ctx.error, boom)
439+
done()
440+
} catch (e) {
441+
done(e)
442+
} finally {
443+
stop()
444+
}
445+
})
446+
})
447+
448+
it('callback path honours the _internal flag', (done) => {
449+
const req = new sql.Request()
450+
req._internal = true
451+
req._query = (cmd, cb) => setImmediate(cb, null, [[]], {}, 0)
452+
453+
const tc = tracingChannels[CHANNELS.TRACE_QUERY]
454+
const { events, stop } = collectTraces(tc)
455+
req.query('SELECT 1', () => {
456+
try {
457+
assert.strictEqual(events.length, 0)
458+
done()
459+
} catch (e) {
460+
done(e)
461+
} finally {
462+
stop()
463+
}
464+
})
465+
})
466+
467+
it('request.execute(cb) emits TRACE_EXECUTE', (done) => {
468+
const req = new sql.Request()
469+
req._execute = (cmd, cb) => setImmediate(cb, null, [[]], {}, 0, 0)
470+
471+
const tc = tracingChannels[CHANNELS.TRACE_EXECUTE]
472+
const { events, stop } = collectTraces(tc)
473+
req.execute('sp_test', () => {
474+
try {
475+
const starts = events.filter(e => e.event === 'start')
476+
assert.strictEqual(starts.length, 1)
477+
assert.strictEqual(starts[0].ctx.procedure, 'sp_test')
478+
done()
479+
} catch (e) {
480+
done(e)
481+
} finally {
482+
stop()
483+
}
484+
})
485+
})
486+
})
374487
})

0 commit comments

Comments
 (0)