Skip to content

Commit 2ff9d5b

Browse files
fix(observability): flush logs with Next after (#365)
1 parent 412c3a2 commit 2ff9d5b

7 files changed

Lines changed: 125 additions & 152 deletions

bun.lock

Lines changed: 2 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@
9292
"@types/micromatch": "^4.0.9",
9393
"@vercel/analytics": "^1.5.0",
9494
"@vercel/kv": "^3.0.0",
95-
"@vercel/otel": "^1.13.0",
95+
"@vercel/otel": "^2.1.2",
9696
"@vercel/speed-insights": "^1.2.0",
9797
"@xterm/xterm": "^6.0.0",
9898
"cheerio": "^1.0.0",
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import { type Context, diag } from '@opentelemetry/api'
2+
import type { LogRecordProcessor, SdkLogRecord } from '@opentelemetry/sdk-logs'
3+
import { after } from 'next/server'
4+
5+
const NEXT_AFTER_OUTSIDE_REQUEST_ERROR_CODE = 'E468'
6+
7+
function isAfterOutsideRequestScopeError(error: unknown): boolean {
8+
return (
9+
error instanceof Error &&
10+
(Reflect.get(error, '__NEXT_ERROR_CODE') ===
11+
NEXT_AFTER_OUTSIDE_REQUEST_ERROR_CODE ||
12+
error.message.includes('`after` was called outside a request scope'))
13+
)
14+
}
15+
16+
export class NextAfterLogRecordProcessor implements LogRecordProcessor {
17+
constructor(private readonly delegate: LogRecordProcessor) {}
18+
19+
onEmit(logRecord: SdkLogRecord, context?: Context): void {
20+
this.delegate.onEmit(logRecord, context)
21+
this.scheduleRequestFlush()
22+
}
23+
24+
forceFlush(): Promise<void> {
25+
return this.delegate.forceFlush()
26+
}
27+
28+
shutdown(): Promise<void> {
29+
return this.delegate.shutdown()
30+
}
31+
32+
private scheduleRequestFlush(): void {
33+
try {
34+
after(async () => {
35+
try {
36+
await this.delegate.forceFlush()
37+
} catch (error) {
38+
diag.warn(
39+
'Failed to flush OpenTelemetry logs in Next.js after()',
40+
error
41+
)
42+
}
43+
})
44+
} catch (error) {
45+
if (isAfterOutsideRequestScopeError(error)) {
46+
return
47+
}
48+
49+
diag.warn(
50+
'Failed to schedule OpenTelemetry log flush in Next.js after()',
51+
error
52+
)
53+
}
54+
}
55+
}

src/core/server/observability/vercel-wait-until-log-record-processor.ts

Lines changed: 0 additions & 73 deletions
This file was deleted.

src/instrumentation.node.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import {
1515
ATTR_SERVICE_VERSION,
1616
} from '@opentelemetry/semantic-conventions'
1717
import { FetchInstrumentation } from '@vercel/otel'
18-
import { VercelWaitUntilLogRecordProcessor } from './core/server/observability/vercel-wait-until-log-record-processor'
18+
import { NextAfterLogRecordProcessor } from './core/server/observability/next-after-log-record-processor'
1919

2020
function parseResourceAttributes(
2121
resourceAttrs?: string
@@ -77,7 +77,7 @@ const sdk = new NodeSDK({
7777
}),
7878
}),
7979
logRecordProcessors: [
80-
new VercelWaitUntilLogRecordProcessor(
80+
new NextAfterLogRecordProcessor(
8181
new BatchLogRecordProcessor(
8282
new OTLPLogExporter({
8383
url: `${OTEL_EXPORTER_OTLP_ENDPOINT}/v1/logs`,
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import type { LogRecordProcessor, SdkLogRecord } from '@opentelemetry/sdk-logs'
2+
import { after } from 'next/server'
3+
import { afterEach, describe, expect, it, vi } from 'vitest'
4+
import { NextAfterLogRecordProcessor } from '@/core/server/observability/next-after-log-record-processor'
5+
6+
vi.mock('next/server', () => ({
7+
after: vi.fn(),
8+
}))
9+
10+
const logRecord = {} as SdkLogRecord
11+
const mockedAfter = vi.mocked(after)
12+
type AfterTask = Parameters<typeof after>[0]
13+
14+
class TestLogRecordProcessor implements LogRecordProcessor {
15+
readonly onEmit = vi.fn()
16+
readonly forceFlush = vi.fn(async () => {})
17+
readonly shutdown = vi.fn(async () => {})
18+
}
19+
20+
function createOutsideRequestScopeError(): Error {
21+
const error = new Error('`after` was called outside a request scope')
22+
Reflect.set(error, '__NEXT_ERROR_CODE', 'E468')
23+
return error
24+
}
25+
26+
afterEach(() => {
27+
mockedAfter.mockReset()
28+
vi.clearAllMocks()
29+
})
30+
31+
describe('NextAfterLogRecordProcessor', () => {
32+
it('delegates log records outside a request scope', () => {
33+
mockedAfter.mockImplementation(() => {
34+
throw createOutsideRequestScopeError()
35+
})
36+
37+
const delegate = new TestLogRecordProcessor()
38+
const processor = new NextAfterLogRecordProcessor(delegate)
39+
40+
processor.onEmit(logRecord)
41+
42+
expect(delegate.onEmit).toHaveBeenCalledWith(logRecord, undefined)
43+
expect(delegate.forceFlush).not.toHaveBeenCalled()
44+
})
45+
46+
it('schedules an after() flush for each emitted log record', async () => {
47+
const callbacks: Array<() => unknown> = []
48+
mockedAfter.mockImplementation((task: AfterTask) => {
49+
callbacks.push(typeof task === 'function' ? task : async () => await task)
50+
})
51+
52+
const delegate = new TestLogRecordProcessor()
53+
const processor = new NextAfterLogRecordProcessor(delegate)
54+
55+
processor.onEmit(logRecord)
56+
processor.onEmit(logRecord)
57+
58+
expect(delegate.onEmit).toHaveBeenCalledTimes(2)
59+
expect(mockedAfter).toHaveBeenCalledTimes(2)
60+
61+
await Promise.all(callbacks.map((callback) => callback()))
62+
63+
expect(delegate.forceFlush).toHaveBeenCalledTimes(2)
64+
})
65+
})

tests/unit/vercel-wait-until-log-record-processor.test.ts

Lines changed: 0 additions & 72 deletions
This file was deleted.

0 commit comments

Comments
 (0)