Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
"@hookform/resolvers": "^5.2.2",
"@marsidev/react-turnstile": "^1.4.1",
"@next-safe-action/adapter-react-hook-form": "^2.0.0",
"@next/env": "^16.2.6",
"@next/env": "^16.2.7",
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/auto-instrumentations-node": "^0.62.1",
"@opentelemetry/exporter-logs-otlp-http": "^0.203.0",
Expand Down Expand Up @@ -110,7 +110,7 @@
"micromatch": "^4.0.8",
"motion": "^12.23.25",
"nanoid": "^5.0.9",
"next": "^16.2.6",
"next": "^16.2.7",
"next-safe-action": "^8.0.11",
"next-themes": "^0.4.6",
"nuqs": "^2.7.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { type Context, diag } from '@opentelemetry/api'
import type { LogRecordProcessor, SdkLogRecord } from '@opentelemetry/sdk-logs'

/** Work handed to `waitUntil`: either a promise, or a function that returns one. */
type WaitUntilInput = Promise<unknown> | (() => Promise<unknown>)

/**
 * The part of the per-request context object that this module relies on.
 * Only `waitUntil` is declared; any other members are not used here.
 */
interface VercelRequestContext {
// Registers background work for the request.
// NOTE(review): presumably the platform keeps the invocation alive until the
// registered work settles; confirm against the Vercel documentation.
waitUntil: (promiseOrFunc: WaitUntilInput) => void
}

/**
 * Accessor object expected under the request-context symbol on `globalThis`.
 */
interface VercelRequestContextReader {
// Returns the context for the current request, or `undefined` if there is none.
get: () => VercelRequestContext | undefined
}

// This mirrors the private request-context hook used by @vercel/otel.
const VERCEL_REQUEST_CONTEXT_SYMBOL = Symbol.for('@vercel/request-context')

/**
 * Looks up the request context for the current invocation, if one is exposed.
 *
 * A reader object is read from `globalThis` under
 * `VERCEL_REQUEST_CONTEXT_SYMBOL`; when present, the result of its `get()`
 * call is returned.
 *
 * @returns The current request context, or `undefined` when no reader is
 * registered or the reader reports no active context.
 */
function getVercelRequestContext(): VercelRequestContext | undefined {
  // Reflect.get is untyped, so the expected reader shape is declared here.
  const contextReader: VercelRequestContextReader | null | undefined =
    Reflect.get(globalThis, VERCEL_REQUEST_CONTEXT_SYMBOL)

  if (contextReader == null) {
    return undefined
  }

  return contextReader.get()
}

export class VercelWaitUntilLogRecordProcessor implements LogRecordProcessor {
private readonly pendingFlushContexts = new WeakSet<VercelRequestContext>()

constructor(private readonly delegate: LogRecordProcessor) {}

/**
 * Forwards the record to the wrapped processor, then attempts to schedule a
 * flush for the current request context.
 *
 * @param logRecord - The log record being emitted.
 * @param context - Optional OpenTelemetry context, passed through unchanged.
 */
onEmit(logRecord: SdkLogRecord, context?: Context): void {
// Delegate first so the record has been handed off before a flush is scheduled.
this.delegate.onEmit(logRecord, context)
this.scheduleRequestFlush()
}

/**
 * Flushes by delegating directly to the wrapped processor.
 *
 * @returns The promise returned by the delegate's `forceFlush()`.
 */
forceFlush(): Promise<void> {
return this.delegate.forceFlush()
}

/**
 * Shuts down by delegating directly to the wrapped processor.
 *
 * @returns The promise returned by the delegate's `shutdown()`.
 */
shutdown(): Promise<void> {
return this.delegate.shutdown()
}

private scheduleRequestFlush(): void {
const requestContext = getVercelRequestContext()

if (!requestContext || this.pendingFlushContexts.has(requestContext)) {
return
}

this.pendingFlushContexts.add(requestContext)

try {
requestContext.waitUntil(async () => {
try {
await this.delegate.forceFlush()
} catch (error) {
diag.warn(
'Failed to flush OpenTelemetry logs in Vercel waitUntil',
error
)
} finally {
this.pendingFlushContexts.delete(requestContext)
}
})
} catch (error) {

Check failure on line 65 in src/core/server/observability/vercel-wait-until-log-record-processor.ts

View check run for this annotation

Claude / Claude Code Review

Race condition: logs emitted during in-flight flush may be dropped

The dedupe set marks the context as "pending" for the entire duration of `forceFlush()`, but `BatchLogRecordProcessor.forceFlush()` snapshots its internal buffer synchronously and then awaits the OTLP export. Any log emitted during that export window goes into the next batch — but `scheduleRequestFlush` short-circuits (lines 46-48) because the context is still in `pendingFlushContexts`, so no new `waitUntil` is registered. When the in-flight flush resolves, Vercel can terminate the runtime before the `BatchLogRecordProcessor`'s 5s timer fires (it is `unref()`'d), silently dropping that log.
Comment on lines +51 to +65

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 The dedupe set marks the context as "pending" for the entire duration of forceFlush(), but BatchLogRecordProcessor.forceFlush() snapshots its internal buffer synchronously and then awaits the OTLP export. Any log emitted during that export window goes into the next batch — but scheduleRequestFlush short-circuits (line 46-48) because the context is still in pendingFlushContexts, so no new waitUntil is registered. When the in-flight flush resolves, Vercel can terminate the runtime before the BatchLogRecordProcessor's 5s timer fires (it's unref()'d), silently dropping that log — the exact failure mode this PR aims to prevent. Fix: delete the context from pendingFlushContexts before awaiting forceFlush, so concurrent emits schedule a fresh waitUntil that picks up late additions.

Extended reasoning...

What the bug is

scheduleRequestFlush at src/core/server/observability/vercel-wait-until-log-record-processor.ts:43-72 uses pendingFlushContexts to dedupe — only one waitUntil(forceFlush) should be scheduled per request context. The intent is sound, but the dedupe flag is released too late: the context is removed from the set only in the finally block at line 62, after forceFlush() resolves. This is a classic flag-after-side-effect race.

Why it manifests

BatchLogRecordProcessor.forceFlush() (in @opentelemetry/sdk-logs) calls _flushAll(), which synchronously snapshots the buffer:

let toFlush = this._finishedLogRecords;
this._finishedLogRecords = [];   // buffer swapped to empty
// ... then async OTLP HTTP export happens

Records emitted after this snapshot land in the new (empty) buffer and are not part of the in-flight export. The OTLP HTTP POST typically takes tens to hundreds of milliseconds, which is plenty of time for additional logs to be emitted from concurrent async work (database calls, fetch instrumentation auto-spans, etc.).

Step-by-step proof

  1. Log A emitted → delegate.onEmit pushes A → scheduleRequestFlush adds requestContext to pendingFlushContexts, calls requestContext.waitUntil(asyncFn).
  2. asyncFn runs; delegate.forceFlush() is invoked. Internally, _flushAll snapshots toFlush = [A], resets the buffer to [], and starts the OTLP HTTPS POST. asyncFn is now suspended at await.
  3. The request handler continues (e.g. await db.query() or await fetch(...)). During this window, log B is emitted.
  4. delegate.onEmit(B, ctx) pushes B into the now-empty buffer. scheduleRequestFlush runs: pendingFlushContexts.has(requestContext) is true → early return at line 46-48. No new waitUntil is registered.
  5. The OTLP export for [A] resolves. _flushAll calls _maybeStartTimer which schedules a delayed flush — but the BatchLogRecordProcessor's timer is unref()'d, so it cannot keep the Vercel runtime alive.
  6. asyncFn's finally removes requestContext from pendingFlushContexts and the async function resolves.
  7. Vercel observes that all waitUntil promises have settled. The runtime is terminated.
  8. Log B is still sitting in BatchLogRecordProcessor._finishedLogRecords and is silently dropped.

This is precisely the failure mode the PR title ("flush otel logs with vercel waitUntil") is trying to prevent — under realistic conditions where logs are interleaved with async I/O, the fix is only partial.

Why existing code does not prevent it

The test at tests/unit/vercel-wait-until-log-record-processor.test.ts exercises back-to-back synchronous emissions before the callback runs (processor.onEmit(); processor.onEmit(); await callbacks[0]();). It does not cover emissions that happen during the await this.delegate.forceFlush() window — the exact case this bug describes.

Impact

In Vercel serverless functions handling typical async request workflows, logs emitted while an OTLP flush is in flight are at risk of being lost on runtime termination. The race window is the OTLP HTTP RTT (tens to hundreds of ms), easily long enough to overlap with subsequent log emissions. Auto-instrumentations like FetchInstrumentation that emit telemetry from async span completions further widen the window.

How to fix

Remove the context from pendingFlushContexts before awaiting forceFlush, so any log emitted during the export schedules a fresh waitUntil:

requestContext.waitUntil(async () => {
  this.pendingFlushContexts.delete(requestContext)
  try {
    await this.delegate.forceFlush()
  } catch (error) {
    diag.warn('Failed to flush OpenTelemetry logs in Vercel waitUntil', error)
  }
})

With this change, concurrent emits add the context back into the set and register a new waitUntil, extending the runtime lifetime until the late additions are exported.

this.pendingFlushContexts.delete(requestContext)
diag.warn(
'Failed to schedule OpenTelemetry log flush in Vercel waitUntil',
error
)
}
}
}
11 changes: 7 additions & 4 deletions src/instrumentation.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
ATTR_SERVICE_VERSION,
} from '@opentelemetry/semantic-conventions'
import { FetchInstrumentation } from '@vercel/otel'
import { VercelWaitUntilLogRecordProcessor } from './core/server/observability/vercel-wait-until-log-record-processor'

function parseResourceAttributes(
resourceAttrs?: string
Expand Down Expand Up @@ -76,10 +77,12 @@ const sdk = new NodeSDK({
}),
}),
logRecordProcessors: [
new BatchLogRecordProcessor(
new OTLPLogExporter({
url: `${OTEL_EXPORTER_OTLP_ENDPOINT}/v1/logs`,
})
new VercelWaitUntilLogRecordProcessor(
new BatchLogRecordProcessor(
new OTLPLogExporter({
url: `${OTEL_EXPORTER_OTLP_ENDPOINT}/v1/logs`,
})
)
),
],
instrumentations: [
Expand Down
72 changes: 72 additions & 0 deletions tests/unit/vercel-wait-until-log-record-processor.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import type { Context } from '@opentelemetry/api'
import type { LogRecordProcessor, SdkLogRecord } from '@opentelemetry/sdk-logs'
import { afterEach, describe, expect, it, vi } from 'vitest'
import { VercelWaitUntilLogRecordProcessor } from '@/core/server/observability/vercel-wait-until-log-record-processor'

type WaitUntilCallback = () => Promise<unknown>
type WaitUntilInput = Promise<unknown> | WaitUntilCallback

const requestContextSymbol = Symbol.for('@vercel/request-context')
const logRecord = {} as SdkLogRecord

/**
 * Test double for the wrapped processor: every method is a mock so the tests
 * can assert on how the processor under test delegates to it.
 */
class TestLogRecordProcessor implements LogRecordProcessor {
// Records each forwarded emit call.
readonly onEmit = vi.fn()
// Resolves immediately; the tests only check how often it is called.
readonly forceFlush = vi.fn(async () => {})
// Resolves immediately.
readonly shutdown = vi.fn(async () => {})
}

/**
 * Registers a fake request-context reader on `globalThis` for the duration of
 * a test.
 *
 * The same context object is returned by every `get()` call, so repeated
 * lookups within one test see an identical context.
 *
 * @param waitUntil - Stub used as the fake context's `waitUntil`.
 */
function installVercelRequestContext(
  waitUntil: (input: WaitUntilInput) => void
) {
  const fakeContext = { waitUntil }
  const fakeReader = { get: () => fakeContext }

  Reflect.set(globalThis, requestContextSymbol, fakeReader)
}

// Undo per-test global state so one test cannot leak into the next.
afterEach(() => {
// Remove any fake request-context reader a test installed on globalThis.
Reflect.deleteProperty(globalThis, requestContextSymbol)
// Reset recorded calls on all mocks.
vi.clearAllMocks()
})

// Unit tests for the waitUntil-based flush scheduling wrapper.
describe('VercelWaitUntilLogRecordProcessor', () => {
// With no request-context reader installed, the wrapper only forwards records.
it('delegates log records without a Vercel request context', () => {
const delegate = new TestLogRecordProcessor()
const processor = new VercelWaitUntilLogRecordProcessor(delegate)

processor.onEmit(logRecord)

expect(delegate.onEmit).toHaveBeenCalledWith(logRecord, undefined)
expect(delegate.forceFlush).not.toHaveBeenCalled()
})

// Emits that arrive before the scheduled flush runs share one waitUntil
// registration; once that flush has completed, the next emit registers again.
it('schedules one waitUntil flush per request context', async () => {
// Collect what is handed to waitUntil so the test decides when it runs.
const callbacks: WaitUntilCallback[] = []
const waitUntil = vi.fn((input: WaitUntilInput) => {
callbacks.push(
// Normalise a bare promise into a callback so both input forms can be invoked.
typeof input === 'function' ? input : async () => await input
)
})
installVercelRequestContext(waitUntil)

const delegate = new TestLogRecordProcessor()
const processor = new VercelWaitUntilLogRecordProcessor(delegate)

// Two back-to-back emits before the scheduled callback has run.
processor.onEmit(logRecord)
processor.onEmit(logRecord)

expect(delegate.onEmit).toHaveBeenCalledTimes(2)
expect(waitUntil).toHaveBeenCalledTimes(1)

// Run the scheduled flush to completion.
await callbacks[0]?.()

expect(delegate.forceFlush).toHaveBeenCalledTimes(1)

// After the flush has finished, a new emit should schedule a fresh waitUntil.
// NOTE(review): emits that occur while the flush is still in flight are not
// exercised by this test.
processor.onEmit(logRecord, {} as Context)

expect(waitUntil).toHaveBeenCalledTimes(2)
})
})
Loading