Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions js/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
"author": "genkit",
"license": "Apache-2.0",
"dependencies": {
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/context-async-hooks": "~1.25.0",
"@opentelemetry/core": "~1.25.0",
"@opentelemetry/sdk-metrics": "~1.25.0",
"@opentelemetry/sdk-node": "^0.52.0",
"@opentelemetry/sdk-trace-base": "~1.25.0",
"@opentelemetry/api": "^1.9.1",
"@opentelemetry/context-async-hooks": "^2.6.1",
"@opentelemetry/core": "^2.6.1",
"@opentelemetry/sdk-metrics": "^2.6.1",
"@opentelemetry/sdk-node": "^0.214.0",
"@opentelemetry/sdk-trace-base": "^2.6.1",
"@types/json-schema": "^7.0.15",
"ajv": "^8.12.0",
"ajv-formats": "^3.0.1",
Expand Down
7 changes: 4 additions & 3 deletions js/core/src/tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ export async function enableTelemetry(
if (isOTelInitializationDisabled()) {
return;
}
global[instrumentationKey] =
telemetryConfig instanceof Promise ? telemetryConfig : Promise.resolve();
return getTelemetryProvider().enableTelemetry(telemetryConfig);
const instrumentationPromise =
getTelemetryProvider().enableTelemetry(telemetryConfig);
global[instrumentationKey] = instrumentationPromise;
return instrumentationPromise;
}

/**
Expand Down
30 changes: 15 additions & 15 deletions js/core/src/tracing/exporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ export class TraceServerExporter implements SpanExporter {
displayName: span.name,
links: span.links,
spanKind: SpanKind[span.kind],
parentSpanId: span.parentSpanId,
parentSpanId: span.parentSpanContext?.spanId,
sameProcessAsParentSpan: { value: !span.spanContext().isRemote },
status: span.status,
timeEvents: {
Expand All @@ -86,17 +86,17 @@ export class TraceServerExporter implements SpanExporter {
})),
},
};
if (span.instrumentationLibrary !== undefined) {
if (span.instrumentationScope !== undefined) {
spanData.instrumentationLibrary = {
name: span.instrumentationLibrary.name,
name: span.instrumentationScope.name,
};
if (span.instrumentationLibrary.schemaUrl !== undefined) {
if (span.instrumentationScope.schemaUrl !== undefined) {
spanData.instrumentationLibrary.schemaUrl =
span.instrumentationLibrary.schemaUrl;
span.instrumentationScope.schemaUrl;
}
if (span.instrumentationLibrary.version !== undefined) {
if (span.instrumentationScope.version !== undefined) {
spanData.instrumentationLibrary.version =
span.instrumentationLibrary.version;
span.instrumentationScope.version;
}
}
deleteUndefinedProps(spanData);
Expand Down Expand Up @@ -127,19 +127,19 @@ export class TraceServerExporter implements SpanExporter {
await this.save(traceId, traces[traceId]);
} catch (e) {
error = true;
logger.error(`Failed to save trace ${traceId}`, e);
}
if (done) {
return done({
code: error ? ExportResultCode.FAILED : ExportResultCode.SUCCESS,
});
logger.defaultLogger.error(`Failed to save trace ${traceId}`, e);
}
}
if (done) {
return done({
code: error ? ExportResultCode.FAILED : ExportResultCode.SUCCESS,
});
}
}

private async save(traceId, spans: ReadableSpan[]): Promise<void> {
private async save(traceId: string, spans: ReadableSpan[]): Promise<void> {
if (!telemetryServerUrl) {
logger.debug(
logger.defaultLogger.debug(
`Telemetry server is not configured, trace ${traceId} not saved!`
);
return;
Expand Down
145 changes: 131 additions & 14 deletions js/core/src/tracing/node-telemetry-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@
* limitations under the License.
*/

import { Context } from '@opentelemetry/api';
Comment thread
MichaelDoyle marked this conversation as resolved.
import { MetricReader } from '@opentelemetry/sdk-metrics';
import { NodeSDK } from '@opentelemetry/sdk-node';
import {
BatchSpanProcessor,
SimpleSpanProcessor,
type ReadableSpan,
type Span,
type SpanProcessor,
} from '@opentelemetry/sdk-trace-base';
import { logger } from '../logging.js';
Expand All @@ -29,6 +33,7 @@ import { RealtimeSpanProcessor } from './realtime-span-processor.js';

let telemetrySDK: NodeSDK | null = null;
let nodeOtelConfig: TelemetryConfig | null = null;
let isSigtermHandlerRegistered = false;

export function initNodeTelemetryProvider() {
setTelemetryProvider({
Expand All @@ -37,6 +42,72 @@ export function initNodeTelemetryProvider() {
});
}

/**
 * MultiSpanProcessor is a multiplexer that fans every span lifecycle callback
 * out to a fixed list of delegate span processors.
 *
 * It is meant to be passed as the sole entry of the `spanProcessors` array
 * given to the OpenTelemetry NodeSDK, because the SDK's internal logic for
 * merging `traceExporter`, `spanProcessor` (singular), and `spanProcessors`
 * (plural) varies across versions and can lead to exporters being silently
 * overwritten or ignored.
 *
 * Failures are isolated per delegate: an exception thrown (or a promise
 * rejected) by one processor is logged via `logger.defaultLogger.error` and
 * does not prevent the remaining processors from being invoked, nor does it
 * propagate to the caller.
 */
export class MultiSpanProcessor implements SpanProcessor {
  /**
   * @param processors delegates that receive every callback, invoked in
   *   array order for the synchronous hooks.
   */
  constructor(private readonly processors: SpanProcessor[]) {}

  /** Forwards the span-start notification to every delegate. */
  onStart(span: Span, parentContext: Context): void {
    for (const processor of this.processors) {
      try {
        processor.onStart(span, parentContext);
      } catch (e) {
        this.logFailure(processor, 'onStart', e);
      }
    }
  }

  /**
   * Forwards the optional `onEnding` hook to the delegates that implement it.
   *
   * Since this wrapper is the only processor the SDK knows about, omitting
   * this method would hide the hook from every wrapped processor.
   */
  onEnding(span: Span): void {
    for (const processor of this.processors) {
      try {
        // `onEnding` is optional on SpanProcessor; skip delegates without it.
        processor.onEnding?.(span);
      } catch (e) {
        this.logFailure(processor, 'onEnding', e);
      }
    }
  }

  /** Forwards the span-end notification to every delegate. */
  onEnd(span: ReadableSpan): void {
    for (const processor of this.processors) {
      try {
        processor.onEnd(span);
      } catch (e) {
        this.logFailure(processor, 'onEnd', e);
      }
    }
  }

  /**
   * Flushes all delegates concurrently.
   *
   * @returns a promise that resolves once every delegate has settled; it
   *   never rejects because individual failures are logged instead.
   */
  async forceFlush(): Promise<void> {
    await this.settleAll('forceFlush');
  }

  /**
   * Shuts down all delegates concurrently.
   *
   * @returns a promise that resolves once every delegate has settled; it
   *   never rejects because individual failures are logged instead.
   */
  async shutdown(): Promise<void> {
    await this.settleAll('shutdown');
  }

  /** Invokes an async lifecycle hook on every delegate, logging any failure. */
  private async settleAll(hook: 'forceFlush' | 'shutdown'): Promise<void> {
    await Promise.all(
      this.processors.map(async (processor) => {
        try {
          await processor[hook]();
        } catch (e) {
          this.logFailure(processor, hook, e);
        }
      })
    );
  }

  /** Logs a delegate failure, naming the processor class and the failing hook. */
  private logFailure(processor: SpanProcessor, hook: string, e: unknown): void {
    logger.defaultLogger.error(
      `Error in span processor (${processor.constructor.name}) ${hook}: ${e}`
    );
  }
}

/**
* Enables tracing and metrics open telemetry configuration.
*/
Expand All @@ -52,23 +123,47 @@ async function enableTelemetry(
? await telemetryConfig
: telemetryConfig;

nodeOtelConfig = telemetryConfig || {};
if (telemetrySDK) {
await cleanUpTracing();
}

nodeOtelConfig = { ...telemetryConfig };

const processors: SpanProcessor[] = [createTelemetryServerProcessor()];
if (nodeOtelConfig.traceExporter) {
throw new Error('Please specify spanProcessors instead.');
}
if (nodeOtelConfig.spanProcessors) {
processors.push(...nodeOtelConfig.spanProcessors);
}
if (nodeOtelConfig.spanProcessor) {
processors.push(nodeOtelConfig.spanProcessor);
delete nodeOtelConfig.spanProcessor;
}
nodeOtelConfig.spanProcessors = processors;
if (nodeOtelConfig.traceExporter) {
processors.push(new BatchSpanProcessor(nodeOtelConfig.traceExporter));
}

if (processors.length > 1) {
nodeOtelConfig.spanProcessors = [new MultiSpanProcessor(processors)];
} else {
nodeOtelConfig.spanProcessors = processors;
}
delete nodeOtelConfig.spanProcessor;
delete nodeOtelConfig.traceExporter;

telemetrySDK = new NodeSDK(nodeOtelConfig);
telemetrySDK.start();
process.on('SIGTERM', async () => await cleanUpTracing());

if (!isSigtermHandlerRegistered) {
let isShuttingDown = false;
const shutdownHandler = (signal: string) => () => {
if (isShuttingDown) return; // Prevent SIGTERM + SIGINT race
isShuttingDown = true;
cleanUpTracing().finally(() => {
process.kill(process.pid, signal);
});
};
process.once('SIGTERM', shutdownHandler('SIGTERM'));
process.once('SIGINT', shutdownHandler('SIGINT'));
isSigtermHandlerRegistered = true;
}
}

async function cleanUpTracing(): Promise<void> {
Expand All @@ -79,10 +174,20 @@ async function cleanUpTracing(): Promise<void> {
// Metrics are not flushed as part of the shutdown operation. If metrics
// are enabled, we need to manually flush them *before* the reader
// receives shutdown order.
await maybeFlushMetrics();
await telemetrySDK.shutdown();
logger.debug('OpenTelemetry SDK shut down.');
telemetrySDK = null;
try {
await maybeFlushMetrics();
} catch (e) {
logger.defaultLogger.error(`Error flushing metrics during shutdown: ${e}`);
}

try {
await telemetrySDK.shutdown();
logger.debug('OpenTelemetry SDK shut down.');
Comment thread
MichaelDoyle marked this conversation as resolved.
} catch (e) {
logger.defaultLogger.error(`Error shutting down OpenTelemetry SDK: ${e}`);
} finally {
telemetrySDK = null;
}
}

/**
Expand All @@ -102,11 +207,23 @@ function createTelemetryServerProcessor(): SpanProcessor {
}

/** Flush metrics if present. */
function maybeFlushMetrics(): Promise<void> {
async function maybeFlushMetrics(): Promise<void> {
const readers: MetricReader[] = [];
if (nodeOtelConfig?.metricReader) {
return nodeOtelConfig.metricReader.forceFlush();
readers.push(nodeOtelConfig.metricReader as MetricReader);
}
if (nodeOtelConfig?.metricReaders) {
readers.push(...(nodeOtelConfig.metricReaders as MetricReader[]));
}
return Promise.resolve();
await Promise.all(
readers.map(async (r) => {
try {
await r.forceFlush();
} catch (e) {
logger.defaultLogger.error(`Error flushing metrics: ${e}`);
}
})
);
}

/**
Expand Down
6 changes: 4 additions & 2 deletions js/core/tests/action_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { z } from 'zod';
import { action, defineAction } from '../src/action.js';
import { initNodeFeatures } from '../src/node.js';
import { Registry } from '../src/registry.js';
import { enableTelemetry } from '../src/tracing.js';
import { enableTelemetry, flushTracing } from '../src/tracing.js';
import { TestSpanExporter } from './utils.js';

initNodeFeatures();
Expand All @@ -33,8 +33,9 @@ enableTelemetry({

describe('action', () => {
var registry: Registry;
beforeEach(() => {
beforeEach(async () => {
registry = new Registry();
await flushTracing();
spanExporter.exportedSpans = [];
});

Expand Down Expand Up @@ -289,6 +290,7 @@ describe('action', () => {
act.__action.key = 'some-custom-key';

await act();
await flushTracing();

assert.strictEqual(spanExporter.exportedSpans.length, 1);
assert.strictEqual(
Expand Down
12 changes: 10 additions & 2 deletions js/core/tests/flow_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { defineFlow, run } from '../src/flow.js';
import { defineAction, getContext, z } from '../src/index.js';
import { initNodeFeatures } from '../src/node.js';
import { Registry } from '../src/registry.js';
import { enableTelemetry } from '../src/tracing.js';
import { enableTelemetry, flushTracing } from '../src/tracing.js';
import { TestSpanExporter } from './utils.js';

initNodeFeatures();
Expand All @@ -48,10 +48,12 @@ function createTestFlow(registry: Registry) {
describe('flow', () => {
let registry: Registry;

beforeEach(() => {
beforeEach(async () => {
// Skips starting reflection server.
delete process.env.GENKIT_ENV;
registry = new Registry();
await flushTracing();
spanExporter.exportedSpans = [];
});

describe('runFlow', () => {
Expand Down Expand Up @@ -370,6 +372,8 @@ describe('flow', () => {
telemetryLabels: { custom: 'label' },
});

await flushTracing();

assert.equal(result, 'bar foo');
assert.strictEqual(spanExporter.exportedSpans.length, 1);
assert.strictEqual(spanExporter.exportedSpans[0].displayName, 'testFlow');
Expand All @@ -394,6 +398,8 @@ describe('flow', () => {
});
const result = await output;

await flushTracing();

assert.equal(result, 'bar foo');
assert.strictEqual(spanExporter.exportedSpans.length, 1);
assert.strictEqual(spanExporter.exportedSpans[0].displayName, 'testFlow');
Expand Down Expand Up @@ -445,6 +451,8 @@ describe('flow', () => {
context: { user: 'pavel' },
});

await flushTracing();

assert.equal(result, 'foo bar');
assert.strictEqual(spanExporter.exportedSpans.length, 3);

Expand Down
Loading
Loading