|
1 | 1 | import { createRequire } from "module" |
2 | 2 | import { ExportResultCode, type ExportResult } from "@opentelemetry/core" |
3 | | -import type { PushMetricExporter, ResourceMetrics, InstrumentType } from "@opentelemetry/sdk-metrics" |
4 | | -import type { AggregationOption } from "@opentelemetry/sdk-metrics/build/src/view/AggregationOption" |
5 | | -import type { AggregationTemporality } from "@opentelemetry/sdk-metrics/build/src/export/AggregationTemporality" |
| 3 | +import type { PushMetricExporter, ResourceMetrics } from "@opentelemetry/sdk-metrics" |
6 | 4 | import type { SpanExporter, ReadableSpan } from "@opentelemetry/sdk-trace-base" |
7 | 5 | import type { LogRecordExporter, ReadableLogRecord } from "@opentelemetry/sdk-logs" |
8 | 6 | import type { Metadata } from "@grpc/grpc-js" |
9 | 7 |
|
10 | 8 | const require = createRequire(import.meta.url) |
| 9 | +const DEFAULT_HELPER_TIMEOUT_MS = 5000 |
11 | 10 |
|
12 | 11 | type Exporter<T> = { |
13 | 12 | export(items: T, resultCallback: (result: ExportResult) => void): void |
14 | 13 | shutdown(): Promise<void> |
15 | 14 | forceFlush?(): Promise<void> |
16 | 15 | } |
17 | 16 |
|
| 17 | +type SelectAggregation = NonNullable<PushMetricExporter["selectAggregation"]> |
| 18 | +type SelectAggregationTemporality = NonNullable<PushMetricExporter["selectAggregationTemporality"]> |
| 19 | + |
18 | 20 | export type HeadersMap = Record<string, string> |
19 | 21 |
|
20 | 22 | export function parseOtlpHeaders(raw: string | undefined): HeadersMap { |
@@ -55,6 +57,7 @@ export class DynamicHeaders { |
55 | 57 | constructor( |
56 | 58 | private readonly staticHeaders: HeadersMap, |
57 | 59 | private readonly helper: string | undefined, |
| 60 | + private readonly helperTimeoutMs = DEFAULT_HELPER_TIMEOUT_MS, |
58 | 61 | ) { |
59 | 62 | this.headers = { ...staticHeaders } |
60 | 63 | } |
@@ -86,12 +89,20 @@ export class DynamicHeaders { |
86 | 89 | } |
87 | 90 |
|
88 | 91 | private async runHelper(): Promise<HeadersMap> { |
89 | | - const proc = Bun.spawn([this.helper!], { stdout: "pipe", stderr: "pipe" }) |
| 92 | + const proc = Bun.spawn([this.helper!], { |
| 93 | + stdout: "pipe", |
| 94 | + stderr: "pipe", |
| 95 | + timeout: this.helperTimeoutMs, |
| 96 | + killSignal: "SIGTERM", |
| 97 | + }) |
90 | 98 | const [stdout, stderr, exitCode] = await Promise.all([ |
91 | 99 | new Response(proc.stdout).text(), |
92 | 100 | new Response(proc.stderr).text(), |
93 | 101 | proc.exited, |
94 | 102 | ]) |
| 103 | + if (proc.signalCode) { |
| 104 | + throw new Error(`OTLP headers helper was terminated by ${proc.signalCode}`) |
| 105 | + } |
95 | 106 | if (exitCode !== 0) { |
96 | 107 | const detail = stderr.trim() || `exit code ${exitCode}` |
97 | 108 | throw new Error(`OTLP headers helper failed: ${detail}`) |
@@ -133,11 +144,13 @@ export class RefreshingMetricExporter implements PushMetricExporter { |
133 | 144 | return this.exporter.shutdown() |
134 | 145 | } |
135 | 146 |
|
136 | | - selectAggregationTemporality(instrumentType: InstrumentType): AggregationTemporality { |
| 147 | + selectAggregationTemporality( |
| 148 | + instrumentType: Parameters<SelectAggregationTemporality>[0], |
| 149 | + ): ReturnType<SelectAggregationTemporality> { |
137 | 150 | return this.exporter.selectAggregationTemporality!(instrumentType) |
138 | 151 | } |
139 | 152 |
|
140 | | - selectAggregation(instrumentType: InstrumentType): AggregationOption { |
| 153 | + selectAggregation(instrumentType: Parameters<SelectAggregation>[0]): ReturnType<SelectAggregation> { |
141 | 154 | return this.exporter.selectAggregation!(instrumentType) |
142 | 155 | } |
143 | 156 |
|
|
0 commit comments