1414 * limitations under the License.
1515 */
1616
17+ import { Context } from '@opentelemetry/api' ;
1718import { NodeSDK } from '@opentelemetry/sdk-node' ;
1819import {
1920 BatchSpanProcessor ,
2021 SimpleSpanProcessor ,
22+ type ReadableSpan ,
2123 type SpanProcessor ,
2224} from '@opentelemetry/sdk-trace-base' ;
2325import { logger } from '../logging.js' ;
@@ -29,6 +31,7 @@ import { RealtimeSpanProcessor } from './realtime-span-processor.js';
2931
3032let telemetrySDK : NodeSDK | null = null ;
3133let nodeOtelConfig : TelemetryConfig | null = null ;
34+ let isSigtermHandlerRegistered = false ;
3235
3336export function initNodeTelemetryProvider ( ) {
3437 setTelemetryProvider ( {
@@ -37,6 +40,35 @@ export function initNodeTelemetryProvider() {
3740 } ) ;
3841}
3942
43+ /**
44+ * MultiSpanProcessor is a multiplexer that allows Genkit to register multiple
45+ * span processors reliably.
46+ *
47+ * It is used instead of providing a `spanProcessors` array to the OpenTelemetry NodeSDK
48+ * because the SDK's internal logic for merging `traceExporter`, `spanProcessor` (singular),
49+ * and `spanProcessors` (plural) varies across versions and can lead to exporters being
50+ * silently overwritten or ignored.
51+ *
52+ * By wrapping all processors into this single delegate, we guarantee that Genkit's
53+ * internal telemetry and any user-provided exporters both receive every start and
54+ * end event, providing a robust compatibility layer during the OTel 2.0 upgrade.
55+ */
56+ class MultiSpanProcessor implements SpanProcessor {
57+ constructor ( private processors : SpanProcessor [ ] ) { }
58+ onStart ( span : any , parentContext : Context ) {
59+ this . processors . forEach ( ( p ) => p . onStart ?.( span , parentContext ) ) ;
60+ }
61+ onEnd ( span : ReadableSpan ) {
62+ this . processors . forEach ( ( p ) => p . onEnd ( span ) ) ;
63+ }
64+ async forceFlush ( ) {
65+ await Promise . all ( this . processors . map ( ( p ) => p . forceFlush ( ) ) ) ;
66+ }
67+ async shutdown ( ) {
68+ await Promise . all ( this . processors . map ( ( p ) => p . shutdown ( ) ) ) ;
69+ }
70+ }
71+
4072/**
4173 * Enables tracing and metrics open telemetry configuration.
4274 */
@@ -52,23 +84,48 @@ async function enableTelemetry(
5284 ? await telemetryConfig
5385 : telemetryConfig ;
5486
87+ // If already initialized and new config is empty, skip to avoid unnecessary restarts
88+ if (
89+ telemetrySDK &&
90+ ( ! telemetryConfig ||
91+ ( ! telemetryConfig . spanProcessors &&
92+ ! telemetryConfig . spanProcessor &&
93+ ! telemetryConfig . traceExporter &&
94+ ! telemetryConfig . metricReader ) )
95+ ) {
96+ return ;
97+ }
98+
5599 nodeOtelConfig = telemetryConfig || { } ;
56100
57- const processors : SpanProcessor [ ] = [ createTelemetryServerProcessor ( ) ] ;
58- if ( nodeOtelConfig . traceExporter ) {
59- throw new Error ( 'Please specify spanProcessors instead.' ) ;
60- }
101+ const processors : SpanProcessor [ ] = [ ] ;
61102 if ( nodeOtelConfig . spanProcessors ) {
62103 processors . push ( ...nodeOtelConfig . spanProcessors ) ;
63104 }
64105 if ( nodeOtelConfig . spanProcessor ) {
65106 processors . push ( nodeOtelConfig . spanProcessor ) ;
66107 delete nodeOtelConfig . spanProcessor ;
67108 }
68- nodeOtelConfig . spanProcessors = processors ;
109+ processors . push ( createTelemetryServerProcessor ( ) ) ;
110+
111+ if ( processors . length > 1 ) {
112+ nodeOtelConfig . spanProcessor = new MultiSpanProcessor ( processors ) ;
113+ } else {
114+ nodeOtelConfig . spanProcessor = processors [ 0 ] ;
115+ }
116+ delete nodeOtelConfig . spanProcessors ;
117+
118+ if ( telemetrySDK ) {
119+ await cleanUpTracing ( ) ;
120+ }
121+
69122 telemetrySDK = new NodeSDK ( nodeOtelConfig ) ;
70123 telemetrySDK . start ( ) ;
71- process . on ( 'SIGTERM' , async ( ) => await cleanUpTracing ( ) ) ;
124+
125+ if ( ! isSigtermHandlerRegistered ) {
126+ process . on ( 'SIGTERM' , async ( ) => await cleanUpTracing ( ) ) ;
127+ isSigtermHandlerRegistered = true ;
128+ }
72129}
73130
74131async function cleanUpTracing ( ) : Promise < void > {
@@ -113,7 +170,7 @@ function maybeFlushMetrics(): Promise<void> {
113170 * Flushes all configured span processors.
114171 */
115172async function flushTracing ( ) {
116- if ( nodeOtelConfig ?. spanProcessors ) {
117- await Promise . all ( nodeOtelConfig . spanProcessors . map ( ( p ) => p . forceFlush ( ) ) ) ;
173+ if ( nodeOtelConfig ?. spanProcessor ) {
174+ await nodeOtelConfig . spanProcessor . forceFlush ( ) ;
118175 }
119176}
0 commit comments