Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/asset-worker-rpc-observability.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@cloudflare/workers-shared": patch
---

Add Sentry error reporting and Jaeger tracing to asset-worker outer entrypoint RPC methods

The outer entrypoint's RPC methods (`unstable_canFetch`, `unstable_getByETag`, `unstable_getByPathname`, `unstable_exists`) previously had no error reporting or tracing. When the router-worker calls these methods via RPC and they throw, the error was invisible -- no Sentry report, no Jaeger trace. This adds a `withObservability` helper that wraps each RPC method with Sentry error capture and a Jaeger span, and propagates trace context to the inner entrypoint so spans are connected across the loopback boundary.
146 changes: 146 additions & 0 deletions fixtures/workers-shared-asset-config/loopback.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import Worker, {
} from "@cloudflare/workers-shared/asset-worker";
import { normalizeConfiguration } from "@cloudflare/workers-shared/asset-worker/src/configuration";
import { getAssetWithMetadataFromKV } from "@cloudflare/workers-shared/asset-worker/src/utils/kv";
import { setupSentry } from "@cloudflare/workers-shared/utils/sentry";
import { SELF } from "cloudflare:test";
import { afterEach, beforeEach, describe, it, vi } from "vitest";
import type { AssetMetadata } from "@cloudflare/workers-shared/asset-worker/src/utils/kv";
Expand All @@ -11,6 +12,7 @@ const IncomingRequest = Request<unknown, IncomingRequestCfProperties>;

vi.mock("@cloudflare/workers-shared/asset-worker/src/utils/kv.ts");
vi.mock("@cloudflare/workers-shared/asset-worker/src/configuration");
vi.mock("@cloudflare/workers-shared/utils/sentry");

describe("[Asset Worker] loopback", () => {
beforeEach(async () => {
Expand All @@ -37,6 +39,8 @@ describe("[Asset Worker] loopback", () => {
html_handling: "none",
not_found_handling: "none",
}));

vi.mocked(setupSentry).mockReturnValue(undefined);
});

afterEach(() => {
Expand All @@ -61,3 +65,145 @@ describe("[Asset Worker] loopback", () => {
expect(getAssetWithMetadataFromKV).toBeCalledWith(undefined, "/file.bin");
});
});

describe("[Asset Worker] observability", () => {
const mockCaptureException = vi.fn();
const mockEnterSpan = vi.fn((_name: string, fn: () => unknown) => fn());
const mockRunWithSpanContext = vi.fn((_ctx: unknown, fn: () => unknown) =>
fn()
);

/**
* Builds mock env/ctx and instantiates the outer entrypoint directly.
* This bypasses SELF (which doesn't support RPC in vitest-pool-workers)
* and lets us call outer RPC methods directly.
*/
function buildMocks(innerOverrides?: Record<string, unknown>) {
const env = {
JAEGER: {
enterSpan: mockEnterSpan,
runWithSpanContext: mockRunWithSpanContext,
getSpanContext: () => ({
traceId: "test-trace",
spanId: "test-span",
parentSpanId: null,
traceFlags: 0,
}),
traceId: "test-trace",
spanId: "test-span",
parentSpanId: null,
cfTraceIdHeader: null,
},
SENTRY_DSN: "https://test@sentry.io/123",
SENTRY_ACCESS_CLIENT_ID: "test-client-id",
SENTRY_ACCESS_CLIENT_SECRET: "test-client-secret",
CONFIG: {},
} as unknown as ConstructorParameters<typeof Worker>[1];

const mockCtx = {
waitUntil: () => {},
passThroughOnException: () => {},
exports: {} as unknown,
};

mockCtx.exports = {
AssetWorkerInner: ({ props }: { props: unknown }) => {
const innerCtx = { ...mockCtx, props };
const inner = new AssetWorkerInner(
innerCtx as unknown as ExecutionContext,
env
);
if (innerOverrides) {
for (const [key, value] of Object.entries(innerOverrides)) {
(inner as Record<string, unknown>)[key] = value;
}
}
return inner;
},
};

const outer = new Worker(mockCtx as unknown as ExecutionContext, env);

return { outer, env, ctx: mockCtx };
}

beforeEach(() => {
mockCaptureException.mockClear();
mockEnterSpan.mockClear();
mockRunWithSpanContext.mockClear();
vi.mocked(setupSentry).mockClear();
vi.mocked(setupSentry).mockReturnValue({
captureException: mockCaptureException,
setContext: vi.fn(),
} as unknown as ReturnType<typeof setupSentry>);
});

afterEach(() => {
vi.restoreAllMocks();
});

it("reports errors to Sentry and re-throws when an outer RPC method fails", async ({
expect,
}) => {
const testError = new Error("canFetch failed");
const { outer } = buildMocks({
unstable_canFetch: () => Promise.reject(testError),
});

const request = new IncomingRequest("http://example.com/test");
await expect(outer.unstable_canFetch(request)).rejects.toThrow(
"canFetch failed"
);

expect(setupSentry).toHaveBeenCalled();
expect(mockCaptureException).toHaveBeenCalledWith(testError);
});

it("propagates trace context to inner entrypoint via runWithSpanContext", async ({
expect,
}) => {
// Don't override the inner method -- let the real AssetWorkerInner.unstable_exists
// run so it calls env.JAEGER.runWithSpanContext with the trace context
const { outer } = buildMocks();

await outer.unstable_exists("/test").catch(() => {
// The real implementation may throw due to missing ASSETS_MANIFEST etc.
// We only care that runWithSpanContext was called before the error
});

expect(mockRunWithSpanContext).toHaveBeenCalled();
});

it("skips Sentry when no request is provided to RPC methods", async ({
expect,
}) => {
const testError = new Error("exists failed");
const { outer } = buildMocks({
unstable_exists: () => Promise.reject(testError),
});

await expect(outer.unstable_exists("/test")).rejects.toThrow(
"exists failed"
);

expect(setupSentry).not.toHaveBeenCalled();
expect(mockCaptureException).not.toHaveBeenCalled();
});

it("initializes Sentry when request is provided to RPC methods", async ({
expect,
}) => {
const testError = new Error("getByETag failed");
const { outer } = buildMocks({
unstable_getByETag: () => Promise.reject(testError),
});

const request = new IncomingRequest("http://example.com/test");
await expect(outer.unstable_getByETag("etag", request)).rejects.toThrow(
"getByETag failed"
);

expect(setupSentry).toHaveBeenCalled();
expect(mockCaptureException).toHaveBeenCalledWith(testError);
});
});
116 changes: 92 additions & 24 deletions packages/workers-shared/asset-worker/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ async function unstableExistsImpl(
const analytics = new ExperimentAnalytics(env.EXPERIMENT_ANALYTICS);
const performance = new PerformanceTimer(env.UNSAFE_PERFORMANCE);
const jaeger = env.JAEGER ?? mockJaegerBinding();
return jaeger.enterSpan("unstable_exists", async (span) => {
return jaeger.enterSpan("inner.unstable_exists", async (span) => {
if (env.COLO_METADATA && env.VERSION_METADATA && env.CONFIG) {
analytics.setData({
accountId: env.CONFIG.account_id,
Expand Down Expand Up @@ -145,7 +145,7 @@ async function unstableGetByETagImpl(
): Promise<GetByETagResult> {
const performance = new PerformanceTimer(env.UNSAFE_PERFORMANCE);
const jaeger = env.JAEGER ?? mockJaegerBinding();
return jaeger.enterSpan("unstable_getByETag", async (span) => {
return jaeger.enterSpan("inner.unstable_getByETag", async (span) => {
const startTime = performance.now();
const asset = await getAssetWithMetadataFromKV(
env.ASSETS_KV_NAMESPACE,
Expand Down Expand Up @@ -193,7 +193,7 @@ async function unstableGetByPathnameImpl(
request?: Request
): Promise<GetByETagResult | null> {
const jaeger = env.JAEGER ?? mockJaegerBinding();
return jaeger.enterSpan("unstable_getByPathname", async (span) => {
return jaeger.enterSpan("inner.unstable_getByPathname", async (span) => {
const eTag = await exists(pathname, request);

span.setTags({
Expand Down Expand Up @@ -333,6 +333,40 @@ export default class AssetWorkerOuter<TEnv extends Env = Env>
});
}

/**
* Wraps an RPC method call with Jaeger tracing and Sentry error reporting.
* Creates an outer Jaeger span so that the inner entrypoint's spans are
* connected to the outer's trace via ctx.props.traceContext. If the call
* throws, the error is reported to Sentry (when a request is available for
* context) and then re-thrown to the caller.
*/
private async withObservability<T>(
methodName: string,
fn: () => Promise<T>,
request?: Request
): Promise<T> {
this.env.JAEGER ??= mockJaegerBinding();
const sentry = request
? setupSentry(
request,
this.ctx,
this.env.SENTRY_DSN,
this.env.SENTRY_ACCESS_CLIENT_ID,
this.env.SENTRY_ACCESS_CLIENT_SECRET,
this.env.COLO_METADATA,
this.env.VERSION_METADATA,
this.env.CONFIG?.account_id,
this.env.CONFIG?.script_id
)
: undefined;
try {
return await this.env.JAEGER.enterSpan(`outer.${methodName}`, () => fn());
} catch (err) {
sentry?.captureException(err);
throw err;
}
}

override async fetch(request: Request): Promise<Response> {
this.env.JAEGER ??= mockJaegerBinding();
let sentry: ReturnType<typeof setupSentry> | undefined;
Expand Down Expand Up @@ -377,32 +411,44 @@ export default class AssetWorkerOuter<TEnv extends Env = Env>
}

async unstable_canFetch(request: Request): Promise<boolean> {
this.env.JAEGER ??= mockJaegerBinding();
return this.getInnerEntrypoint().unstable_canFetch(request);
return this.withObservability(
"unstable_canFetch",
() => this.getInnerEntrypoint().unstable_canFetch(request),
request
);
}

async unstable_getByETag(
eTag: string,
request?: Request
): Promise<GetByETagResult> {
this.env.JAEGER ??= mockJaegerBinding();
return this.getInnerEntrypoint().unstable_getByETag(eTag, request);
return this.withObservability(
"unstable_getByETag",
() => this.getInnerEntrypoint().unstable_getByETag(eTag, request),
request
);
}

async unstable_getByPathname(
pathname: string,
request?: Request
): Promise<GetByETagResult | null> {
this.env.JAEGER ??= mockJaegerBinding();
return this.getInnerEntrypoint().unstable_getByPathname(pathname, request);
return this.withObservability(
"unstable_getByPathname",
() => this.getInnerEntrypoint().unstable_getByPathname(pathname, request),
request
);
}

async unstable_exists(
pathname: string,
request?: Request
): Promise<string | null> {
this.env.JAEGER ??= mockJaegerBinding();
return this.getInnerEntrypoint().unstable_exists(pathname, request);
return this.withObservability(
"unstable_exists",
() => this.getInnerEntrypoint().unstable_exists(pathname, request),
request
);
}
}

Expand Down Expand Up @@ -449,39 +495,61 @@ export class AssetWorkerInner<TEnv extends Env = Env>
async unstable_canFetch(request: Request): Promise<boolean> {
// TODO: Mock this with Miniflare
this.env.JAEGER ??= mockJaegerBinding();
const loopbackCtx = this.ctx as AssetWorkerContext;
const traceContext = loopbackCtx.props?.traceContext ?? null;

return canFetch(
request,
this.env,
normalizeConfiguration(this.env.CONFIG),
this.unstable_exists.bind(this)
return this.env.JAEGER.runWithSpanContext(traceContext, () =>
canFetch(
request,
this.env,
normalizeConfiguration(this.env.CONFIG),
this.unstable_exists.bind(this)
)
);
}

async unstable_getByETag(
eTag: string,
request?: Request
): Promise<GetByETagResult> {
return unstableGetByETagImpl(this.env, eTag, request);
this.env.JAEGER ??= mockJaegerBinding();
const loopbackCtx = this.ctx as AssetWorkerContext;
const traceContext = loopbackCtx.props?.traceContext ?? null;

return this.env.JAEGER.runWithSpanContext(traceContext, () =>
unstableGetByETagImpl(this.env, eTag, request)
);
}

async unstable_getByPathname(
pathname: string,
request?: Request
): Promise<GetByETagResult | null> {
return unstableGetByPathnameImpl(
this.env,
this.unstable_exists.bind(this),
this.unstable_getByETag.bind(this),
pathname,
request
this.env.JAEGER ??= mockJaegerBinding();
const loopbackCtx = this.ctx as AssetWorkerContext;
const traceContext = loopbackCtx.props?.traceContext ?? null;

return this.env.JAEGER.runWithSpanContext(traceContext, () =>
unstableGetByPathnameImpl(
this.env,
this.unstable_exists.bind(this),
this.unstable_getByETag.bind(this),
pathname,
request
)
);
}

async unstable_exists(
pathname: string,
request?: Request
): Promise<string | null> {
return unstableExistsImpl(this.env, pathname, request);
this.env.JAEGER ??= mockJaegerBinding();
const loopbackCtx = this.ctx as AssetWorkerContext;
const traceContext = loopbackCtx.props?.traceContext ?? null;

return this.env.JAEGER.runWithSpanContext(traceContext, () =>
unstableExistsImpl(this.env, pathname, request)
);
}
}
6 changes: 3 additions & 3 deletions packages/workers-shared/utils/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ export interface JaegerTracing {
...args: T
): R;
getSpanContext(): SpanContext | null;
runWithSpanContext<T extends unknown[]>(
runWithSpanContext<R, T extends unknown[]>(
spanContext: SpanContext | null,
callback: (...args: T) => unknown,
callback: (...args: T) => R,
...args: T
): unknown;
): R;

readonly traceId: string | null;
readonly spanId: string | null;
Expand Down
Loading