-
Notifications
You must be signed in to change notification settings - Fork 70
Flush OTEL logs with Vercel waitUntil #354
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
73 changes: 73 additions & 0 deletions
73
src/core/server/observability/vercel-wait-until-log-record-processor.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,73 @@ | ||
| import { type Context, diag } from '@opentelemetry/api' | ||
| import type { LogRecordProcessor, SdkLogRecord } from '@opentelemetry/sdk-logs' | ||
|
|
||
| type WaitUntilInput = Promise<unknown> | (() => Promise<unknown>) | ||
|
|
||
| interface VercelRequestContext { | ||
| waitUntil: (promiseOrFunc: WaitUntilInput) => void | ||
| } | ||
|
|
||
| interface VercelRequestContextReader { | ||
| get: () => VercelRequestContext | undefined | ||
| } | ||
|
|
||
| // This mirrors the private request-context hook used by @vercel/otel. | ||
| const VERCEL_REQUEST_CONTEXT_SYMBOL = Symbol.for('@vercel/request-context') | ||
|
|
||
| function getVercelRequestContext(): VercelRequestContext | undefined { | ||
| const reader = Reflect.get(globalThis, VERCEL_REQUEST_CONTEXT_SYMBOL) as | ||
| | VercelRequestContextReader | ||
| | undefined | ||
|
|
||
| return reader?.get() | ||
| } | ||
|
|
||
| export class VercelWaitUntilLogRecordProcessor implements LogRecordProcessor { | ||
| private readonly pendingFlushContexts = new WeakSet<VercelRequestContext>() | ||
|
|
||
| constructor(private readonly delegate: LogRecordProcessor) {} | ||
|
|
||
| onEmit(logRecord: SdkLogRecord, context?: Context): void { | ||
| this.delegate.onEmit(logRecord, context) | ||
| this.scheduleRequestFlush() | ||
| } | ||
|
|
||
| forceFlush(): Promise<void> { | ||
| return this.delegate.forceFlush() | ||
| } | ||
|
|
||
| shutdown(): Promise<void> { | ||
| return this.delegate.shutdown() | ||
| } | ||
|
|
||
| private scheduleRequestFlush(): void { | ||
| const requestContext = getVercelRequestContext() | ||
|
|
||
| if (!requestContext || this.pendingFlushContexts.has(requestContext)) { | ||
| return | ||
| } | ||
|
|
||
| this.pendingFlushContexts.add(requestContext) | ||
|
|
||
| try { | ||
| requestContext.waitUntil(async () => { | ||
| try { | ||
| await this.delegate.forceFlush() | ||
| } catch (error) { | ||
| diag.warn( | ||
| 'Failed to flush OpenTelemetry logs in Vercel waitUntil', | ||
| error | ||
| ) | ||
| } finally { | ||
| this.pendingFlushContexts.delete(requestContext) | ||
| } | ||
| }) | ||
| } catch (error) { | ||
|
Check failure on line 65 in src/core/server/observability/vercel-wait-until-log-record-processor.ts
|
||
| this.pendingFlushContexts.delete(requestContext) | ||
| diag.warn( | ||
| 'Failed to schedule OpenTelemetry log flush in Vercel waitUntil', | ||
| error | ||
| ) | ||
| } | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| import type { Context } from '@opentelemetry/api' | ||
| import type { LogRecordProcessor, SdkLogRecord } from '@opentelemetry/sdk-logs' | ||
| import { afterEach, describe, expect, it, vi } from 'vitest' | ||
| import { VercelWaitUntilLogRecordProcessor } from '@/core/server/observability/vercel-wait-until-log-record-processor' | ||
|
|
||
| type WaitUntilCallback = () => Promise<unknown> | ||
| type WaitUntilInput = Promise<unknown> | WaitUntilCallback | ||
|
|
||
| const requestContextSymbol = Symbol.for('@vercel/request-context') | ||
| const logRecord = {} as SdkLogRecord | ||
|
|
||
| class TestLogRecordProcessor implements LogRecordProcessor { | ||
| readonly onEmit = vi.fn() | ||
| readonly forceFlush = vi.fn(async () => {}) | ||
| readonly shutdown = vi.fn(async () => {}) | ||
| } | ||
|
|
||
| function installVercelRequestContext( | ||
| waitUntil: (input: WaitUntilInput) => void | ||
| ) { | ||
| const requestContext = { | ||
| waitUntil, | ||
| } | ||
|
|
||
| Reflect.set(globalThis, requestContextSymbol, { | ||
| get: () => requestContext, | ||
| }) | ||
| } | ||
|
|
||
| afterEach(() => { | ||
| Reflect.deleteProperty(globalThis, requestContextSymbol) | ||
| vi.clearAllMocks() | ||
| }) | ||
|
|
||
| describe('VercelWaitUntilLogRecordProcessor', () => { | ||
| it('delegates log records without a Vercel request context', () => { | ||
| const delegate = new TestLogRecordProcessor() | ||
| const processor = new VercelWaitUntilLogRecordProcessor(delegate) | ||
|
|
||
| processor.onEmit(logRecord) | ||
|
|
||
| expect(delegate.onEmit).toHaveBeenCalledWith(logRecord, undefined) | ||
| expect(delegate.forceFlush).not.toHaveBeenCalled() | ||
| }) | ||
|
|
||
| it('schedules one waitUntil flush per request context', async () => { | ||
| const callbacks: WaitUntilCallback[] = [] | ||
| const waitUntil = vi.fn((input: WaitUntilInput) => { | ||
| callbacks.push( | ||
| typeof input === 'function' ? input : async () => await input | ||
| ) | ||
| }) | ||
| installVercelRequestContext(waitUntil) | ||
|
|
||
| const delegate = new TestLogRecordProcessor() | ||
| const processor = new VercelWaitUntilLogRecordProcessor(delegate) | ||
|
|
||
| processor.onEmit(logRecord) | ||
| processor.onEmit(logRecord) | ||
|
|
||
| expect(delegate.onEmit).toHaveBeenCalledTimes(2) | ||
| expect(waitUntil).toHaveBeenCalledTimes(1) | ||
|
|
||
| await callbacks[0]?.() | ||
|
|
||
| expect(delegate.forceFlush).toHaveBeenCalledTimes(1) | ||
|
|
||
| processor.onEmit(logRecord, {} as Context) | ||
|
|
||
| expect(waitUntil).toHaveBeenCalledTimes(2) | ||
| }) | ||
| }) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔴 The dedupe set marks the context as "pending" for the entire duration of
forceFlush(), butBatchLogRecordProcessor.forceFlush()snapshots its internal buffer synchronously and then awaits the OTLP export. Any log emitted during that export window goes into the next batch — butscheduleRequestFlushshort-circuits (line 46-48) because the context is still inpendingFlushContexts, so no newwaitUntilis registered. When the in-flight flush resolves, Vercel can terminate the runtime before the BatchLogRecordProcessor's 5s timer fires (it'sunref()'d), silently dropping that log — the exact failure mode this PR aims to prevent. Fix:deletethe context frompendingFlushContextsbefore awaitingforceFlush, so concurrent emits schedule a freshwaitUntilthat picks up late additions.Extended reasoning...
What the bug is
scheduleRequestFlushatsrc/core/server/observability/vercel-wait-until-log-record-processor.ts:43-72usespendingFlushContextsto dedupe — only onewaitUntil(forceFlush)should be scheduled per request context. The intent is sound, but the dedupe flag is released too late: the context is removed from the set only in thefinallyblock at line 62, afterforceFlush()resolves. This is a classic flag-after-side-effect race.Why it manifests
BatchLogRecordProcessor.forceFlush()(in@opentelemetry/sdk-logs) calls_flushAll(), which synchronously snapshots the buffer:Records emitted after this snapshot land in the new (empty) buffer and are not part of the in-flight export. The OTLP HTTP POST typically takes tens to hundreds of milliseconds, which is plenty of time for additional logs to be emitted from concurrent async work (database calls, fetch instrumentation auto-spans, etc.).
Step-by-step proof
delegate.onEmitpushes A →scheduleRequestFlushaddsrequestContexttopendingFlushContexts, callsrequestContext.waitUntil(asyncFn).asyncFnruns;delegate.forceFlush()is invoked. Internally,_flushAllsnapshotstoFlush = [A], resets the buffer to[], and starts the OTLP HTTPS POST.asyncFnis now suspended atawait.await db.query()orawait fetch(...)). During this window, log B is emitted.delegate.onEmit(B, ctx)pushes B into the now-empty buffer.scheduleRequestFlushruns:pendingFlushContexts.has(requestContext)istrue→ early return at line 46-48. No newwaitUntilis registered._flushAllcalls_maybeStartTimerwhich schedules a delayed flush — but the BatchLogRecordProcessor's timer isunref()'d, so it cannot keep the Vercel runtime alive.asyncFn'sfinallyremovesrequestContextfrompendingFlushContextsand the async function resolves.waitUntilpromises have settled. The runtime is terminated.BatchLogRecordProcessor._finishedLogRecordsand is silently dropped.This is precisely the failure mode the PR title ("flush otel logs with vercel waitUntil") is trying to prevent — under realistic conditions where logs are interleaved with async I/O, the fix is only partial.
Why existing code does not prevent it
The test at
tests/unit/vercel-wait-until-log-record-processor.test.tsexercises back-to-back synchronous emissions before the callback runs (processor.onEmit(); processor.onEmit(); await callbacks[0]();). It does not cover emissions that happen during theawait this.delegate.forceFlush()window — the exact case this bug describes.Impact
In Vercel serverless functions handling typical async request workflows, logs emitted while an OTLP flush is in flight are at risk of being lost on runtime termination. The race window is the OTLP HTTP RTT (tens to hundreds of ms), easily long enough to overlap with subsequent log emissions. Auto-instrumentations like
FetchInstrumentationthat emit telemetry from async span completions further widen the window.How to fix
Remove the context from
pendingFlushContextsbefore awaitingforceFlush, so any log emitted during the export schedules a freshwaitUntil:With this change, concurrent emits add the context back into the set and register a new
waitUntil, extending the runtime lifetime until the late additions are exported.