Skip to content

Commit 9cdb9a7

Browse files
committed
fix(tracing): upgrade to otel 2.0
1 parent 9950eb8 commit 9cdb9a7

15 files changed

Lines changed: 2163 additions & 634 deletions

File tree

js/core/package.json

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@
2626
"author": "genkit",
2727
"license": "Apache-2.0",
2828
"dependencies": {
29-
"@opentelemetry/api": "^1.9.0",
30-
"@opentelemetry/context-async-hooks": "~1.25.0",
31-
"@opentelemetry/core": "~1.25.0",
32-
"@opentelemetry/sdk-metrics": "~1.25.0",
33-
"@opentelemetry/sdk-node": "^0.52.0",
34-
"@opentelemetry/sdk-trace-base": "~1.25.0",
29+
"@opentelemetry/api": "^1.9.1",
30+
"@opentelemetry/context-async-hooks": "^2.6.1",
31+
"@opentelemetry/core": "^2.6.1",
32+
"@opentelemetry/sdk-metrics": "^2.6.1",
33+
"@opentelemetry/sdk-node": "^0.214.0",
34+
"@opentelemetry/sdk-trace-base": "^2.6.1",
3535
"@types/json-schema": "^7.0.15",
3636
"ajv": "^8.12.0",
3737
"ajv-formats": "^3.0.1",

js/core/src/tracing.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,10 @@ export async function enableTelemetry(
100100
if (isOTelInitializationDisabled()) {
101101
return;
102102
}
103-
global[instrumentationKey] =
104-
telemetryConfig instanceof Promise ? telemetryConfig : Promise.resolve();
105-
return getTelemetryProvider().enableTelemetry(telemetryConfig);
103+
const instrumentationPromise =
104+
getTelemetryProvider().enableTelemetry(telemetryConfig);
105+
global[instrumentationKey] = instrumentationPromise;
106+
return instrumentationPromise;
106107
}
107108

108109
/**

js/core/src/tracing/exporter.ts

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ export class TraceServerExporter implements SpanExporter {
7373
displayName: span.name,
7474
links: span.links,
7575
spanKind: SpanKind[span.kind],
76-
parentSpanId: span.parentSpanId,
76+
parentSpanId: span.parentSpanContext?.spanId,
7777
sameProcessAsParentSpan: { value: !span.spanContext().isRemote },
7878
status: span.status,
7979
timeEvents: {
@@ -86,17 +86,17 @@ export class TraceServerExporter implements SpanExporter {
8686
})),
8787
},
8888
};
89-
if (span.instrumentationLibrary !== undefined) {
89+
if (span.instrumentationScope !== undefined) {
9090
spanData.instrumentationLibrary = {
91-
name: span.instrumentationLibrary.name,
91+
name: span.instrumentationScope.name,
9292
};
93-
if (span.instrumentationLibrary.schemaUrl !== undefined) {
93+
if (span.instrumentationScope.schemaUrl !== undefined) {
9494
spanData.instrumentationLibrary.schemaUrl =
95-
span.instrumentationLibrary.schemaUrl;
95+
span.instrumentationScope.schemaUrl;
9696
}
97-
if (span.instrumentationLibrary.version !== undefined) {
97+
if (span.instrumentationScope.version !== undefined) {
9898
spanData.instrumentationLibrary.version =
99-
span.instrumentationLibrary.version;
99+
span.instrumentationScope.version;
100100
}
101101
}
102102
deleteUndefinedProps(spanData);
@@ -127,19 +127,19 @@ export class TraceServerExporter implements SpanExporter {
127127
await this.save(traceId, traces[traceId]);
128128
} catch (e) {
129129
error = true;
130-
logger.error(`Failed to save trace ${traceId}`, e);
131-
}
132-
if (done) {
133-
return done({
134-
code: error ? ExportResultCode.FAILED : ExportResultCode.SUCCESS,
135-
});
130+
logger.defaultLogger.error(`Failed to save trace ${traceId}`, e);
136131
}
137132
}
133+
if (done) {
134+
return done({
135+
code: error ? ExportResultCode.FAILED : ExportResultCode.SUCCESS,
136+
});
137+
}
138138
}
139139

140-
private async save(traceId, spans: ReadableSpan[]): Promise<void> {
140+
private async save(traceId: string, spans: ReadableSpan[]): Promise<void> {
141141
if (!telemetryServerUrl) {
142-
logger.debug(
142+
logger.defaultLogger.debug(
143143
`Telemetry server is not configured, trace ${traceId} not saved!`
144144
);
145145
return;

js/core/src/tracing/node-telemetry-provider.ts

Lines changed: 131 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,14 @@
1414
* limitations under the License.
1515
*/
1616

17+
import { Context } from '@opentelemetry/api';
18+
import { MetricReader } from '@opentelemetry/sdk-metrics';
1719
import { NodeSDK } from '@opentelemetry/sdk-node';
1820
import {
1921
BatchSpanProcessor,
2022
SimpleSpanProcessor,
23+
type ReadableSpan,
24+
type Span,
2125
type SpanProcessor,
2226
} from '@opentelemetry/sdk-trace-base';
2327
import { logger } from '../logging.js';
@@ -29,6 +33,7 @@ import { RealtimeSpanProcessor } from './realtime-span-processor.js';
2933

3034
let telemetrySDK: NodeSDK | null = null;
3135
let nodeOtelConfig: TelemetryConfig | null = null;
36+
let isSigtermHandlerRegistered = false;
3237

3338
export function initNodeTelemetryProvider() {
3439
setTelemetryProvider({
@@ -37,6 +42,72 @@ export function initNodeTelemetryProvider() {
3742
});
3843
}
3944

45+
/**
46+
* MultiSpanProcessor is a multiplexer that allows Genkit to register multiple
47+
* span processors reliably.
48+
*
49+
* We provide it as the sole entry in the `spanProcessors` array to the
50+
* OpenTelemetry NodeSDK because the SDK's internal logic for merging
51+
* `traceExporter`, `spanProcessor` (singular), and `spanProcessors` (plural)
52+
* varies across versions and can lead to exporters being silently overwritten
53+
* or ignored.
54+
*
55+
* By wrapping all processors into this single delegate, we guarantee that
56+
* Genkit's internal telemetry and any user-provided exporters both receive
57+
* every start and end event.
58+
*/
59+
export class MultiSpanProcessor implements SpanProcessor {
60+
constructor(private processors: SpanProcessor[]) {}
61+
onStart(span: Span, parentContext: Context) {
62+
this.processors.forEach((p) => {
63+
try {
64+
p.onStart(span, parentContext);
65+
} catch (e) {
66+
logger.defaultLogger.error(
67+
`Error in span processor (${p.constructor.name}) onStart: ${e}`
68+
);
69+
}
70+
});
71+
}
72+
onEnd(span: ReadableSpan) {
73+
this.processors.forEach((p) => {
74+
try {
75+
p.onEnd(span);
76+
} catch (e) {
77+
logger.defaultLogger.error(
78+
`Error in span processor (${p.constructor.name}) onEnd: ${e}`
79+
);
80+
}
81+
});
82+
}
83+
async forceFlush() {
84+
await Promise.all(
85+
this.processors.map(async (p) => {
86+
try {
87+
await p.forceFlush();
88+
} catch (e) {
89+
logger.defaultLogger.error(
90+
`Error in span processor (${p.constructor.name}) forceFlush: ${e}`
91+
);
92+
}
93+
})
94+
);
95+
}
96+
async shutdown() {
97+
await Promise.all(
98+
this.processors.map(async (p) => {
99+
try {
100+
await p.shutdown();
101+
} catch (e) {
102+
logger.defaultLogger.error(
103+
`Error in span processor (${p.constructor.name}) shutdown: ${e}`
104+
);
105+
}
106+
})
107+
);
108+
}
109+
}
110+
40111
/**
41112
* Enables tracing and metrics open telemetry configuration.
42113
*/
@@ -52,23 +123,47 @@ async function enableTelemetry(
52123
? await telemetryConfig
53124
: telemetryConfig;
54125

55-
nodeOtelConfig = telemetryConfig || {};
126+
if (telemetrySDK) {
127+
await cleanUpTracing();
128+
}
129+
130+
nodeOtelConfig = { ...telemetryConfig };
56131

57132
const processors: SpanProcessor[] = [createTelemetryServerProcessor()];
58-
if (nodeOtelConfig.traceExporter) {
59-
throw new Error('Please specify spanProcessors instead.');
60-
}
61133
if (nodeOtelConfig.spanProcessors) {
62134
processors.push(...nodeOtelConfig.spanProcessors);
63135
}
64136
if (nodeOtelConfig.spanProcessor) {
65137
processors.push(nodeOtelConfig.spanProcessor);
66-
delete nodeOtelConfig.spanProcessor;
67138
}
68-
nodeOtelConfig.spanProcessors = processors;
139+
if (nodeOtelConfig.traceExporter) {
140+
processors.push(new BatchSpanProcessor(nodeOtelConfig.traceExporter));
141+
}
142+
143+
if (processors.length > 1) {
144+
nodeOtelConfig.spanProcessors = [new MultiSpanProcessor(processors)];
145+
} else {
146+
nodeOtelConfig.spanProcessors = processors;
147+
}
148+
delete nodeOtelConfig.spanProcessor;
149+
delete nodeOtelConfig.traceExporter;
150+
69151
telemetrySDK = new NodeSDK(nodeOtelConfig);
70152
telemetrySDK.start();
71-
process.on('SIGTERM', async () => await cleanUpTracing());
153+
154+
if (!isSigtermHandlerRegistered) {
155+
let isShuttingDown = false;
156+
const shutdownHandler = (signal: string) => () => {
157+
if (isShuttingDown) return; // Prevent SIGTERM + SIGINT race
158+
isShuttingDown = true;
159+
cleanUpTracing().finally(() => {
160+
process.kill(process.pid, signal);
161+
});
162+
};
163+
process.once('SIGTERM', shutdownHandler('SIGTERM'));
164+
process.once('SIGINT', shutdownHandler('SIGINT'));
165+
isSigtermHandlerRegistered = true;
166+
}
72167
}
73168

74169
async function cleanUpTracing(): Promise<void> {
@@ -79,10 +174,20 @@ async function cleanUpTracing(): Promise<void> {
79174
// Metrics are not flushed as part of the shutdown operation. If metrics
80175
// are enabled, we need to manually flush them *before* the reader
81176
// receives shutdown order.
82-
await maybeFlushMetrics();
83-
await telemetrySDK.shutdown();
84-
logger.debug('OpenTelemetry SDK shut down.');
85-
telemetrySDK = null;
177+
try {
178+
await maybeFlushMetrics();
179+
} catch (e) {
180+
logger.defaultLogger.error(`Error flushing metrics during shutdown: ${e}`);
181+
}
182+
183+
try {
184+
await telemetrySDK.shutdown();
185+
logger.debug('OpenTelemetry SDK shut down.');
186+
} catch (e) {
187+
logger.defaultLogger.error(`Error shutting down OpenTelemetry SDK: ${e}`);
188+
} finally {
189+
telemetrySDK = null;
190+
}
86191
}
87192

88193
/**
@@ -102,11 +207,23 @@ function createTelemetryServerProcessor(): SpanProcessor {
102207
}
103208

104209
/** Flush metrics if present. */
105-
function maybeFlushMetrics(): Promise<void> {
210+
async function maybeFlushMetrics(): Promise<void> {
211+
const readers: MetricReader[] = [];
106212
if (nodeOtelConfig?.metricReader) {
107-
return nodeOtelConfig.metricReader.forceFlush();
213+
readers.push(nodeOtelConfig.metricReader as MetricReader);
214+
}
215+
if (nodeOtelConfig?.metricReaders) {
216+
readers.push(...(nodeOtelConfig.metricReaders as MetricReader[]));
108217
}
109-
return Promise.resolve();
218+
await Promise.all(
219+
readers.map(async (r) => {
220+
try {
221+
await r.forceFlush();
222+
} catch (e) {
223+
logger.defaultLogger.error(`Error flushing metrics: ${e}`);
224+
}
225+
})
226+
);
110227
}
111228

112229
/**

js/core/tests/action_test.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import { z } from 'zod';
2121
import { action, defineAction } from '../src/action.js';
2222
import { initNodeFeatures } from '../src/node.js';
2323
import { Registry } from '../src/registry.js';
24-
import { enableTelemetry } from '../src/tracing.js';
24+
import { enableTelemetry, flushTracing } from '../src/tracing.js';
2525
import { TestSpanExporter } from './utils.js';
2626

2727
initNodeFeatures();
@@ -33,8 +33,9 @@ enableTelemetry({
3333

3434
describe('action', () => {
3535
var registry: Registry;
36-
beforeEach(() => {
36+
beforeEach(async () => {
3737
registry = new Registry();
38+
await flushTracing();
3839
spanExporter.exportedSpans = [];
3940
});
4041

@@ -289,6 +290,7 @@ describe('action', () => {
289290
act.__action.key = 'some-custom-key';
290291

291292
await act();
293+
await flushTracing();
292294

293295
assert.strictEqual(spanExporter.exportedSpans.length, 1);
294296
assert.strictEqual(

js/core/tests/flow_test.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import { defineFlow, run } from '../src/flow.js';
2121
import { defineAction, getContext, z } from '../src/index.js';
2222
import { initNodeFeatures } from '../src/node.js';
2323
import { Registry } from '../src/registry.js';
24-
import { enableTelemetry } from '../src/tracing.js';
24+
import { enableTelemetry, flushTracing } from '../src/tracing.js';
2525
import { TestSpanExporter } from './utils.js';
2626

2727
initNodeFeatures();
@@ -48,10 +48,12 @@ function createTestFlow(registry: Registry) {
4848
describe('flow', () => {
4949
let registry: Registry;
5050

51-
beforeEach(() => {
51+
beforeEach(async () => {
5252
// Skips starting reflection server.
5353
delete process.env.GENKIT_ENV;
5454
registry = new Registry();
55+
await flushTracing();
56+
spanExporter.exportedSpans = [];
5557
});
5658

5759
describe('runFlow', () => {
@@ -370,6 +372,8 @@ describe('flow', () => {
370372
telemetryLabels: { custom: 'label' },
371373
});
372374

375+
await flushTracing();
376+
373377
assert.equal(result, 'bar foo');
374378
assert.strictEqual(spanExporter.exportedSpans.length, 1);
375379
assert.strictEqual(spanExporter.exportedSpans[0].displayName, 'testFlow');
@@ -394,6 +398,8 @@ describe('flow', () => {
394398
});
395399
const result = await output;
396400

401+
await flushTracing();
402+
397403
assert.equal(result, 'bar foo');
398404
assert.strictEqual(spanExporter.exportedSpans.length, 1);
399405
assert.strictEqual(spanExporter.exportedSpans[0].displayName, 'testFlow');
@@ -445,6 +451,8 @@ describe('flow', () => {
445451
context: { user: 'pavel' },
446452
});
447453

454+
await flushTracing();
455+
448456
assert.equal(result, 'foo bar');
449457
assert.strictEqual(spanExporter.exportedSpans.length, 3);
450458

0 commit comments

Comments
 (0)