Skip to content

Commit 9f874e9

Browse files
Flush OTEL logs with Vercel waitUntil (#354)
1 parent f623214 commit 9f874e9

5 files changed

Lines changed: 168 additions & 20 deletions

File tree

bun.lock

Lines changed: 14 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
"@hookform/resolvers": "^5.2.2",
5050
"@marsidev/react-turnstile": "^1.4.1",
5151
"@next-safe-action/adapter-react-hook-form": "^2.0.0",
52-
"@next/env": "^16.2.6",
52+
"@next/env": "^16.2.7",
5353
"@opentelemetry/api": "^1.9.0",
5454
"@opentelemetry/auto-instrumentations-node": "^0.62.1",
5555
"@opentelemetry/exporter-logs-otlp-http": "^0.203.0",
@@ -110,7 +110,7 @@
110110
"micromatch": "^4.0.8",
111111
"motion": "^12.23.25",
112112
"nanoid": "^5.0.9",
113-
"next": "^16.2.6",
113+
"next": "^16.2.7",
114114
"next-safe-action": "^8.0.11",
115115
"next-themes": "^0.4.6",
116116
"nuqs": "^2.7.0",
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import { type Context, diag } from '@opentelemetry/api'
2+
import type { LogRecordProcessor, SdkLogRecord } from '@opentelemetry/sdk-logs'
3+
4+
type WaitUntilInput = Promise<unknown> | (() => Promise<unknown>)
5+
6+
interface VercelRequestContext {
7+
waitUntil: (promiseOrFunc: WaitUntilInput) => void
8+
}
9+
10+
interface VercelRequestContextReader {
11+
get: () => VercelRequestContext | undefined
12+
}
13+
14+
// This mirrors the private request-context hook used by @vercel/otel.
15+
const VERCEL_REQUEST_CONTEXT_SYMBOL = Symbol.for('@vercel/request-context')
16+
17+
function getVercelRequestContext(): VercelRequestContext | undefined {
18+
const reader = Reflect.get(globalThis, VERCEL_REQUEST_CONTEXT_SYMBOL) as
19+
| VercelRequestContextReader
20+
| undefined
21+
22+
return reader?.get()
23+
}
24+
25+
export class VercelWaitUntilLogRecordProcessor implements LogRecordProcessor {
26+
private readonly pendingFlushContexts = new WeakSet<VercelRequestContext>()
27+
28+
constructor(private readonly delegate: LogRecordProcessor) {}
29+
30+
onEmit(logRecord: SdkLogRecord, context?: Context): void {
31+
this.delegate.onEmit(logRecord, context)
32+
this.scheduleRequestFlush()
33+
}
34+
35+
forceFlush(): Promise<void> {
36+
return this.delegate.forceFlush()
37+
}
38+
39+
shutdown(): Promise<void> {
40+
return this.delegate.shutdown()
41+
}
42+
43+
private scheduleRequestFlush(): void {
44+
const requestContext = getVercelRequestContext()
45+
46+
if (!requestContext || this.pendingFlushContexts.has(requestContext)) {
47+
return
48+
}
49+
50+
this.pendingFlushContexts.add(requestContext)
51+
52+
try {
53+
requestContext.waitUntil(async () => {
54+
try {
55+
await this.delegate.forceFlush()
56+
} catch (error) {
57+
diag.warn(
58+
'Failed to flush OpenTelemetry logs in Vercel waitUntil',
59+
error
60+
)
61+
} finally {
62+
this.pendingFlushContexts.delete(requestContext)
63+
}
64+
})
65+
} catch (error) {
66+
this.pendingFlushContexts.delete(requestContext)
67+
diag.warn(
68+
'Failed to schedule OpenTelemetry log flush in Vercel waitUntil',
69+
error
70+
)
71+
}
72+
}
73+
}

src/instrumentation.node.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
ATTR_SERVICE_VERSION,
1616
} from '@opentelemetry/semantic-conventions'
1717
import { FetchInstrumentation } from '@vercel/otel'
18+
import { VercelWaitUntilLogRecordProcessor } from './core/server/observability/vercel-wait-until-log-record-processor'
1819

1920
function parseResourceAttributes(
2021
resourceAttrs?: string
@@ -76,10 +77,12 @@ const sdk = new NodeSDK({
7677
}),
7778
}),
7879
logRecordProcessors: [
79-
new BatchLogRecordProcessor(
80-
new OTLPLogExporter({
81-
url: `${OTEL_EXPORTER_OTLP_ENDPOINT}/v1/logs`,
82-
})
80+
new VercelWaitUntilLogRecordProcessor(
81+
new BatchLogRecordProcessor(
82+
new OTLPLogExporter({
83+
url: `${OTEL_EXPORTER_OTLP_ENDPOINT}/v1/logs`,
84+
})
85+
)
8386
),
8487
],
8588
instrumentations: [
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import type { Context } from '@opentelemetry/api'
2+
import type { LogRecordProcessor, SdkLogRecord } from '@opentelemetry/sdk-logs'
3+
import { afterEach, describe, expect, it, vi } from 'vitest'
4+
import { VercelWaitUntilLogRecordProcessor } from '@/core/server/observability/vercel-wait-until-log-record-processor'
5+
6+
type WaitUntilCallback = () => Promise<unknown>
7+
type WaitUntilInput = Promise<unknown> | WaitUntilCallback
8+
9+
const requestContextSymbol = Symbol.for('@vercel/request-context')
10+
const logRecord = {} as SdkLogRecord
11+
12+
class TestLogRecordProcessor implements LogRecordProcessor {
13+
readonly onEmit = vi.fn()
14+
readonly forceFlush = vi.fn(async () => {})
15+
readonly shutdown = vi.fn(async () => {})
16+
}
17+
18+
function installVercelRequestContext(
19+
waitUntil: (input: WaitUntilInput) => void
20+
) {
21+
const requestContext = {
22+
waitUntil,
23+
}
24+
25+
Reflect.set(globalThis, requestContextSymbol, {
26+
get: () => requestContext,
27+
})
28+
}
29+
30+
afterEach(() => {
31+
Reflect.deleteProperty(globalThis, requestContextSymbol)
32+
vi.clearAllMocks()
33+
})
34+
35+
describe('VercelWaitUntilLogRecordProcessor', () => {
36+
it('delegates log records without a Vercel request context', () => {
37+
const delegate = new TestLogRecordProcessor()
38+
const processor = new VercelWaitUntilLogRecordProcessor(delegate)
39+
40+
processor.onEmit(logRecord)
41+
42+
expect(delegate.onEmit).toHaveBeenCalledWith(logRecord, undefined)
43+
expect(delegate.forceFlush).not.toHaveBeenCalled()
44+
})
45+
46+
it('schedules one waitUntil flush per request context', async () => {
47+
const callbacks: WaitUntilCallback[] = []
48+
const waitUntil = vi.fn((input: WaitUntilInput) => {
49+
callbacks.push(
50+
typeof input === 'function' ? input : async () => await input
51+
)
52+
})
53+
installVercelRequestContext(waitUntil)
54+
55+
const delegate = new TestLogRecordProcessor()
56+
const processor = new VercelWaitUntilLogRecordProcessor(delegate)
57+
58+
processor.onEmit(logRecord)
59+
processor.onEmit(logRecord)
60+
61+
expect(delegate.onEmit).toHaveBeenCalledTimes(2)
62+
expect(waitUntil).toHaveBeenCalledTimes(1)
63+
64+
await callbacks[0]?.()
65+
66+
expect(delegate.forceFlush).toHaveBeenCalledTimes(1)
67+
68+
processor.onEmit(logRecord, {} as Context)
69+
70+
expect(waitUntil).toHaveBeenCalledTimes(2)
71+
})
72+
})

0 commit comments

Comments
 (0)