1414 * limitations under the License.
1515 */
1616
17+ import { Context } from '@opentelemetry/api' ;
18+ import { MetricReader } from '@opentelemetry/sdk-metrics' ;
1719import { NodeSDK } from '@opentelemetry/sdk-node' ;
1820import {
1921 BatchSpanProcessor ,
2022 SimpleSpanProcessor ,
23+ type ReadableSpan ,
24+ type Span ,
2125 type SpanProcessor ,
2226} from '@opentelemetry/sdk-trace-base' ;
2327import { logger } from '../logging.js' ;
@@ -29,6 +33,7 @@ import { RealtimeSpanProcessor } from './realtime-span-processor.js';
2933
3034let telemetrySDK : NodeSDK | null = null ;
3135let nodeOtelConfig : TelemetryConfig | null = null ;
36+ let isSigtermHandlerRegistered = false ;
3237
3338export function initNodeTelemetryProvider ( ) {
3439 setTelemetryProvider ( {
@@ -37,6 +42,72 @@ export function initNodeTelemetryProvider() {
3742 } ) ;
3843}
3944
45+ /**
46+ * MultiSpanProcessor is a multiplexer that allows Genkit to register multiple
47+ * span processors reliably.
48+ *
49+ * We provide it as the sole entry in the `spanProcessors` array to the
50+ * OpenTelemetry NodeSDK because the SDK's internal logic for merging
51+ * `traceExporter`, `spanProcessor` (singular), and `spanProcessors` (plural)
52+ * varies across versions and can lead to exporters being silently overwritten
53+ * or ignored.
54+ *
55+ * By wrapping all processors into this single delegate, we guarantee that
56+ * Genkit's internal telemetry and any user-provided exporters both receive
57+ * every start and end event.
58+ */
59+ export class MultiSpanProcessor implements SpanProcessor {
60+ constructor ( private processors : SpanProcessor [ ] ) { }
61+ onStart ( span : Span , parentContext : Context ) {
62+ this . processors . forEach ( ( p ) => {
63+ try {
64+ p . onStart ( span , parentContext ) ;
65+ } catch ( e ) {
66+ logger . defaultLogger . error (
67+ `Error in span processor (${ p . constructor . name } ) onStart: ${ e } `
68+ ) ;
69+ }
70+ } ) ;
71+ }
72+ onEnd ( span : ReadableSpan ) {
73+ this . processors . forEach ( ( p ) => {
74+ try {
75+ p . onEnd ( span ) ;
76+ } catch ( e ) {
77+ logger . defaultLogger . error (
78+ `Error in span processor (${ p . constructor . name } ) onEnd: ${ e } `
79+ ) ;
80+ }
81+ } ) ;
82+ }
83+ async forceFlush ( ) {
84+ await Promise . all (
85+ this . processors . map ( async ( p ) => {
86+ try {
87+ await p . forceFlush ( ) ;
88+ } catch ( e ) {
89+ logger . defaultLogger . error (
90+ `Error in span processor (${ p . constructor . name } ) forceFlush: ${ e } `
91+ ) ;
92+ }
93+ } )
94+ ) ;
95+ }
96+ async shutdown ( ) {
97+ await Promise . all (
98+ this . processors . map ( async ( p ) => {
99+ try {
100+ await p . shutdown ( ) ;
101+ } catch ( e ) {
102+ logger . defaultLogger . error (
103+ `Error in span processor (${ p . constructor . name } ) shutdown: ${ e } `
104+ ) ;
105+ }
106+ } )
107+ ) ;
108+ }
109+ }
110+
40111/**
41112 * Enables tracing and metrics open telemetry configuration.
42113 */
@@ -52,23 +123,44 @@ async function enableTelemetry(
52123 ? await telemetryConfig
53124 : telemetryConfig ;
54125
55- nodeOtelConfig = telemetryConfig || { } ;
126+ if ( telemetrySDK ) {
127+ await cleanUpTracing ( ) ;
128+ }
129+
130+ nodeOtelConfig = { ...telemetryConfig } ;
56131
57132 const processors : SpanProcessor [ ] = [ createTelemetryServerProcessor ( ) ] ;
58- if ( nodeOtelConfig . traceExporter ) {
59- throw new Error ( 'Please specify spanProcessors instead.' ) ;
60- }
61133 if ( nodeOtelConfig . spanProcessors ) {
62134 processors . push ( ...nodeOtelConfig . spanProcessors ) ;
63135 }
64136 if ( nodeOtelConfig . spanProcessor ) {
65137 processors . push ( nodeOtelConfig . spanProcessor ) ;
66- delete nodeOtelConfig . spanProcessor ;
67138 }
68- nodeOtelConfig . spanProcessors = processors ;
139+ if ( nodeOtelConfig . traceExporter ) {
140+ processors . push ( new BatchSpanProcessor ( nodeOtelConfig . traceExporter ) ) ;
141+ }
142+
143+ if ( processors . length > 1 ) {
144+ nodeOtelConfig . spanProcessors = [ new MultiSpanProcessor ( processors ) ] ;
145+ } else {
146+ nodeOtelConfig . spanProcessors = processors ;
147+ }
148+ delete nodeOtelConfig . spanProcessor ;
149+ delete nodeOtelConfig . traceExporter ;
150+
69151 telemetrySDK = new NodeSDK ( nodeOtelConfig ) ;
70152 telemetrySDK . start ( ) ;
71- process . on ( 'SIGTERM' , async ( ) => await cleanUpTracing ( ) ) ;
153+
154+ if ( ! isSigtermHandlerRegistered ) {
155+ const shutdownHandler = ( signal : string ) => ( ) => {
156+ cleanUpTracing ( ) . finally ( ( ) => {
157+ process . kill ( process . pid , signal ) ;
158+ } ) ;
159+ } ;
160+ process . once ( 'SIGTERM' , shutdownHandler ( 'SIGTERM' ) ) ;
161+ process . once ( 'SIGINT' , shutdownHandler ( 'SIGINT' ) ) ;
162+ isSigtermHandlerRegistered = true ;
163+ }
72164}
73165
74166async function cleanUpTracing ( ) : Promise < void > {
@@ -79,10 +171,20 @@ async function cleanUpTracing(): Promise<void> {
79171 // Metrics are not flushed as part of the shutdown operation. If metrics
80172 // are enabled, we need to manually flush them *before* the reader
81173 // receives shutdown order.
82- await maybeFlushMetrics ( ) ;
83- await telemetrySDK . shutdown ( ) ;
84- logger . debug ( 'OpenTelemetry SDK shut down.' ) ;
85- telemetrySDK = null ;
174+ try {
175+ await maybeFlushMetrics ( ) ;
176+ } catch ( e ) {
177+ logger . error ( `Error flushing metrics during shutdown: ${ e } ` ) ;
178+ }
179+
180+ try {
181+ await telemetrySDK . shutdown ( ) ;
182+ logger . debug ( 'OpenTelemetry SDK shut down.' ) ;
183+ } catch ( e ) {
184+ logger . error ( `Error shutting down OpenTelemetry SDK: ${ e } ` ) ;
185+ } finally {
186+ telemetrySDK = null ;
187+ }
86188}
87189
88190/**
@@ -102,11 +204,23 @@ function createTelemetryServerProcessor(): SpanProcessor {
102204}
103205
104206/** Flush metrics if present. */
105- function maybeFlushMetrics ( ) : Promise < void > {
207+ async function maybeFlushMetrics ( ) : Promise < void > {
208+ const readers : MetricReader [ ] = [ ] ;
106209 if ( nodeOtelConfig ?. metricReader ) {
107- return nodeOtelConfig . metricReader . forceFlush ( ) ;
210+ readers . push ( nodeOtelConfig . metricReader as MetricReader ) ;
211+ }
212+ if ( nodeOtelConfig ?. metricReaders ) {
213+ readers . push ( ...( nodeOtelConfig . metricReaders as MetricReader [ ] ) ) ;
108214 }
109- return Promise . resolve ( ) ;
215+ await Promise . all (
216+ readers . map ( async ( r ) => {
217+ try {
218+ await r . forceFlush ( ) ;
219+ } catch ( e ) {
220+ logger . error ( `Error flushing metrics: ${ e } ` ) ;
221+ }
222+ } )
223+ ) ;
110224}
111225
112226/**
0 commit comments