diff --git a/.changeset/asset-worker-rpc-observability.md b/.changeset/asset-worker-rpc-observability.md new file mode 100644 index 0000000000..7bdd9890b7 --- /dev/null +++ b/.changeset/asset-worker-rpc-observability.md @@ -0,0 +1,7 @@ +--- +"@cloudflare/workers-shared": patch +--- + +Add Sentry error reporting and Jaeger tracing to asset-worker outer entrypoint RPC methods + +The outer entrypoint's RPC methods (`unstable_canFetch`, `unstable_getByETag`, `unstable_getByPathname`, `unstable_exists`) previously had no error reporting or tracing. When the router-worker calls these methods via RPC and they throw, the error was invisible -- no Sentry report, no Jaeger trace. This adds a `withObservability` helper that wraps each RPC method with Sentry error capture and a Jaeger span, and propagates trace context to the inner entrypoint so spans are connected across the loopback boundary. diff --git a/fixtures/workers-shared-asset-config/loopback.test.ts b/fixtures/workers-shared-asset-config/loopback.test.ts index 680def7dee..9c882b040e 100644 --- a/fixtures/workers-shared-asset-config/loopback.test.ts +++ b/fixtures/workers-shared-asset-config/loopback.test.ts @@ -3,6 +3,7 @@ import Worker, { } from "@cloudflare/workers-shared/asset-worker"; import { normalizeConfiguration } from "@cloudflare/workers-shared/asset-worker/src/configuration"; import { getAssetWithMetadataFromKV } from "@cloudflare/workers-shared/asset-worker/src/utils/kv"; +import { setupSentry } from "@cloudflare/workers-shared/utils/sentry"; import { SELF } from "cloudflare:test"; import { afterEach, beforeEach, describe, it, vi } from "vitest"; import type { AssetMetadata } from "@cloudflare/workers-shared/asset-worker/src/utils/kv"; @@ -11,6 +12,7 @@ const IncomingRequest = Request; vi.mock("@cloudflare/workers-shared/asset-worker/src/utils/kv.ts"); vi.mock("@cloudflare/workers-shared/asset-worker/src/configuration"); +vi.mock("@cloudflare/workers-shared/utils/sentry"); describe("[Asset Worker] loopback", () => { beforeEach(async () => { @@ -37,6 +39,8 @@ describe("[Asset Worker] loopback", () => { html_handling: "none", not_found_handling: "none", })); + + vi.mocked(setupSentry).mockReturnValue(undefined); }); afterEach(() => { @@ -61,3 +65,145 @@ describe("[Asset Worker] loopback", () => { expect(getAssetWithMetadataFromKV).toBeCalledWith(undefined, "/file.bin"); }); }); + +describe("[Asset Worker] observability", () => { + const mockCaptureException = vi.fn(); + const mockEnterSpan = vi.fn((_name: string, fn: () => unknown) => fn()); + const mockRunWithSpanContext = vi.fn((_ctx: unknown, fn: () => unknown) => + fn() + ); + + /** + * Builds mock env/ctx and instantiates the outer entrypoint directly. + * This bypasses SELF (which doesn't support RPC in vitest-pool-workers) + * and lets us call outer RPC methods directly. + */ + function buildMocks(innerOverrides?: Record) { + const env = { + JAEGER: { + enterSpan: mockEnterSpan, + runWithSpanContext: mockRunWithSpanContext, + getSpanContext: () => ({ + traceId: "test-trace", + spanId: "test-span", + parentSpanId: null, + traceFlags: 0, + }), + traceId: "test-trace", + spanId: "test-span", + parentSpanId: null, + cfTraceIdHeader: null, + }, + SENTRY_DSN: "https://test@sentry.io/123", + SENTRY_ACCESS_CLIENT_ID: "test-client-id", + SENTRY_ACCESS_CLIENT_SECRET: "test-client-secret", + CONFIG: {}, + } as unknown as ConstructorParameters[1]; + + const mockCtx = { + waitUntil: () => {}, + passThroughOnException: () => {}, + exports: {} as unknown, + }; + + mockCtx.exports = { + AssetWorkerInner: ({ props }: { props: unknown }) => { + const innerCtx = { ...mockCtx, props }; + const inner = new AssetWorkerInner( + innerCtx as unknown as ExecutionContext, + env + ); + if (innerOverrides) { + for (const [key, value] of Object.entries(innerOverrides)) { + (inner as Record)[key] = value; + } + } + return inner; + }, + }; + + const outer = new Worker(mockCtx as unknown as ExecutionContext, env); + + return { outer, env, ctx: mockCtx }; + } + + beforeEach(() => { + mockCaptureException.mockClear(); + mockEnterSpan.mockClear(); + mockRunWithSpanContext.mockClear(); + vi.mocked(setupSentry).mockClear(); + vi.mocked(setupSentry).mockReturnValue({ + captureException: mockCaptureException, + setContext: vi.fn(), + } as unknown as ReturnType); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it("reports errors to Sentry and re-throws when an outer RPC method fails", async ({ + expect, + }) => { + const testError = new Error("canFetch failed"); + const { outer } = buildMocks({ + unstable_canFetch: () => Promise.reject(testError), + }); + + const request = new IncomingRequest("http://example.com/test"); + await expect(outer.unstable_canFetch(request)).rejects.toThrow( + "canFetch failed" + ); + + expect(setupSentry).toHaveBeenCalled(); + expect(mockCaptureException).toHaveBeenCalledWith(testError); + }); + + it("propagates trace context to inner entrypoint via runWithSpanContext", async ({ + expect, + }) => { + // Don't override the inner method -- let the real AssetWorkerInner.unstable_exists + // run so it calls env.JAEGER.runWithSpanContext with the trace context + const { outer } = buildMocks(); + + await outer.unstable_exists("/test").catch(() => { + // The real implementation may throw due to missing ASSETS_MANIFEST etc. + // We only care that runWithSpanContext was called before the error + }); + + expect(mockRunWithSpanContext).toHaveBeenCalled(); + }); + + it("skips Sentry when no request is provided to RPC methods", async ({ + expect, + }) => { + const testError = new Error("exists failed"); + const { outer } = buildMocks({ + unstable_exists: () => Promise.reject(testError), + }); + + await expect(outer.unstable_exists("/test")).rejects.toThrow( + "exists failed" + ); + + expect(setupSentry).not.toHaveBeenCalled(); + expect(mockCaptureException).not.toHaveBeenCalled(); + }); + + it("initializes Sentry when request is provided to RPC methods", async ({ + expect, + }) => { + const testError = new Error("getByETag failed"); + const { outer } = buildMocks({ + unstable_getByETag: () => Promise.reject(testError), + }); + + const request = new IncomingRequest("http://example.com/test"); + await expect(outer.unstable_getByETag("etag", request)).rejects.toThrow( + "getByETag failed" + ); + + expect(setupSentry).toHaveBeenCalled(); + expect(mockCaptureException).toHaveBeenCalledWith(testError); + }); +}); diff --git a/packages/workers-shared/asset-worker/src/worker.ts b/packages/workers-shared/asset-worker/src/worker.ts index a9b05a0515..cfa525718f 100644 --- a/packages/workers-shared/asset-worker/src/worker.ts +++ b/packages/workers-shared/asset-worker/src/worker.ts @@ -109,7 +109,7 @@ async function unstableExistsImpl( const analytics = new ExperimentAnalytics(env.EXPERIMENT_ANALYTICS); const performance = new PerformanceTimer(env.UNSAFE_PERFORMANCE); const jaeger = env.JAEGER ?? mockJaegerBinding(); - return jaeger.enterSpan("unstable_exists", async (span) => { + return jaeger.enterSpan("inner.unstable_exists", async (span) => { if (env.COLO_METADATA && env.VERSION_METADATA && env.CONFIG) { analytics.setData({ accountId: env.CONFIG.account_id, @@ -145,7 +145,7 @@ async function unstableGetByETagImpl( ): Promise { const performance = new PerformanceTimer(env.UNSAFE_PERFORMANCE); const jaeger = env.JAEGER ?? mockJaegerBinding(); - return jaeger.enterSpan("unstable_getByETag", async (span) => { + return jaeger.enterSpan("inner.unstable_getByETag", async (span) => { const startTime = performance.now(); const asset = await getAssetWithMetadataFromKV( env.ASSETS_KV_NAMESPACE, @@ -193,7 +193,7 @@ async function unstableGetByPathnameImpl( request?: Request ): Promise { const jaeger = env.JAEGER ?? mockJaegerBinding(); - return jaeger.enterSpan("unstable_getByPathname", async (span) => { + return jaeger.enterSpan("inner.unstable_getByPathname", async (span) => { const eTag = await exists(pathname, request); span.setTags({ @@ -333,6 +333,40 @@ export default class AssetWorkerOuter }); } + /** + * Wraps an RPC method call with Jaeger tracing and Sentry error reporting. + * Creates an outer Jaeger span so that the inner entrypoint's spans are + * connected to the outer's trace via ctx.props.traceContext. If the call + * throws, the error is reported to Sentry (when a request is available for + * context) and then re-thrown to the caller. + */ + private async withObservability( + methodName: string, + fn: () => Promise, + request?: Request + ): Promise { + this.env.JAEGER ??= mockJaegerBinding(); + const sentry = request + ? setupSentry( + request, + this.ctx, + this.env.SENTRY_DSN, + this.env.SENTRY_ACCESS_CLIENT_ID, + this.env.SENTRY_ACCESS_CLIENT_SECRET, + this.env.COLO_METADATA, + this.env.VERSION_METADATA, + this.env.CONFIG?.account_id, + this.env.CONFIG?.script_id + ) + : undefined; + try { + return await this.env.JAEGER.enterSpan(`outer.${methodName}`, () => fn()); + } catch (err) { + sentry?.captureException(err); + throw err; + } + } + override async fetch(request: Request): Promise { this.env.JAEGER ??= mockJaegerBinding(); let sentry: ReturnType | undefined; @@ -377,32 +411,44 @@ export default class AssetWorkerOuter } async unstable_canFetch(request: Request): Promise { - this.env.JAEGER ??= mockJaegerBinding(); - return this.getInnerEntrypoint().unstable_canFetch(request); + return this.withObservability( + "unstable_canFetch", + () => this.getInnerEntrypoint().unstable_canFetch(request), + request + ); } async unstable_getByETag( eTag: string, request?: Request ): Promise { - this.env.JAEGER ??= mockJaegerBinding(); - return this.getInnerEntrypoint().unstable_getByETag(eTag, request); + return this.withObservability( + "unstable_getByETag", + () => this.getInnerEntrypoint().unstable_getByETag(eTag, request), + request + ); } async unstable_getByPathname( pathname: string, request?: Request ): Promise { - this.env.JAEGER ??= mockJaegerBinding(); - return this.getInnerEntrypoint().unstable_getByPathname(pathname, request); + return this.withObservability( + "unstable_getByPathname", + () => this.getInnerEntrypoint().unstable_getByPathname(pathname, request), + request + ); } async unstable_exists( pathname: string, request?: Request ): Promise { - this.env.JAEGER ??= mockJaegerBinding(); - return this.getInnerEntrypoint().unstable_exists(pathname, request); + return this.withObservability( + "unstable_exists", + () => this.getInnerEntrypoint().unstable_exists(pathname, request), + request + ); } } @@ -449,12 +495,16 @@ export class AssetWorkerInner async unstable_canFetch(request: Request): Promise { // TODO: Mock this with Miniflare this.env.JAEGER ??= mockJaegerBinding(); + const loopbackCtx = this.ctx as AssetWorkerContext; + const traceContext = loopbackCtx.props?.traceContext ?? null; - return canFetch( - request, - this.env, - normalizeConfiguration(this.env.CONFIG), - this.unstable_exists.bind(this) + return this.env.JAEGER.runWithSpanContext(traceContext, () => + canFetch( + request, + this.env, + normalizeConfiguration(this.env.CONFIG), + this.unstable_exists.bind(this) + ) ); } @@ -462,19 +512,31 @@ export class AssetWorkerInner eTag: string, request?: Request ): Promise { - return unstableGetByETagImpl(this.env, eTag, request); + this.env.JAEGER ??= mockJaegerBinding(); + const loopbackCtx = this.ctx as AssetWorkerContext; + const traceContext = loopbackCtx.props?.traceContext ?? null; + + return this.env.JAEGER.runWithSpanContext(traceContext, () => + unstableGetByETagImpl(this.env, eTag, request) + ); } async unstable_getByPathname( pathname: string, request?: Request ): Promise { - return unstableGetByPathnameImpl( - this.env, - this.unstable_exists.bind(this), - this.unstable_getByETag.bind(this), - pathname, - request + this.env.JAEGER ??= mockJaegerBinding(); + const loopbackCtx = this.ctx as AssetWorkerContext; + const traceContext = loopbackCtx.props?.traceContext ?? null; + + return this.env.JAEGER.runWithSpanContext(traceContext, () => + unstableGetByPathnameImpl( + this.env, + this.unstable_exists.bind(this), + this.unstable_getByETag.bind(this), + pathname, + request + ) ); } @@ -482,6 +544,12 @@ export class AssetWorkerInner pathname: string, request?: Request ): Promise { - return unstableExistsImpl(this.env, pathname, request); + this.env.JAEGER ??= mockJaegerBinding(); + const loopbackCtx = this.ctx as AssetWorkerContext; + const traceContext = loopbackCtx.props?.traceContext ?? null; + + return this.env.JAEGER.runWithSpanContext(traceContext, () => + unstableExistsImpl(this.env, pathname, request) + ); } } diff --git a/packages/workers-shared/utils/types.ts b/packages/workers-shared/utils/types.ts index 1cd4374b3c..b7e4626cab 100644 --- a/packages/workers-shared/utils/types.ts +++ b/packages/workers-shared/utils/types.ts @@ -102,11 +102,11 @@ export interface JaegerTracing { ...args: T ): R; getSpanContext(): SpanContext | null; - runWithSpanContext( + runWithSpanContext( spanContext: SpanContext | null, - callback: (...args: T) => unknown, + callback: (...args: T) => R, ...args: T - ): unknown; + ): R; readonly traceId: string | null; readonly spanId: string | null;