Skip to content

Commit 5a7809a

Browse files
committed
feat(observability): add otel ingest and span sanitization
- add a landing otlp traces proxy with token-based rate limiting and redaction - attach provider, thread, and user telemetry attributes to server spans - switch analytics service to the otel-backed implementation
1 parent 72158b3 commit 5a7809a

25 files changed

Lines changed: 1006 additions & 35 deletions

.codex/config.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
[mcp_servers.Railway]
2+
args = ["@railway/mcp-server"]
3+
command = "npx"
4+
5+
[mcp_servers.chrome-devtools]
6+
args = ["chrome-devtools-mcp@latest", "--browserUrl", "http://127.0.0.1:9222"]
7+
command = "npx"
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import { createHash } from "node:crypto";
2+
3+
import { redactOtlpBody, upsertResourceAttribute, type OtlpTraceBody } from "~/lib/otelRedact";
4+
import { allow, type RateLimitTier } from "~/lib/otelRateLimit";
5+
import { tokenHash, verifyJiraToken } from "~/lib/otelVerifier";
6+
7+
// Exported runtime selector. Presumably the framework's route-segment config
// requesting the Node.js runtime (this module imports `node:crypto`) — TODO confirm.
export const runtime = "nodejs";
8+
9+
// Value stamped into the `service.name` resource attribute of every ingested
// trace, replacing whatever the client sent.
const SERVICE_NAME = "marcode";
10+
11+
/**
 * Pulls the credential out of an `Authorization: Bearer <token>` header.
 *
 * @param request - incoming request whose headers are inspected.
 * @returns the trimmed token, or `undefined` when the header is missing, uses
 *   a different scheme, or carries no credential.
 */
function readBearerToken(request: Request): string | undefined {
  // Fetch `Headers` lookups are case-insensitive, so one lookup covers every casing.
  const raw = request.headers.get("authorization");
  if (raw === null || raw === "") {
    return undefined;
  }

  const parsed = /^Bearer\s+(.+)$/i.exec(raw.trim());
  const credential = parsed?.[1]?.trim();
  return credential ? credential : undefined;
}
17+
18+
/**
 * Determines the caller's IP for rate limiting and hashing.
 *
 * Preference order: the left-most entry of `x-forwarded-for`, then
 * `x-real-ip`, then the literal string "unknown".
 *
 * NOTE(review): both headers are client-controlled unless a trusted reverse
 * proxy overwrites them — confirm the deployment guarantees that.
 *
 * @param request - incoming request whose headers are inspected.
 * @returns the best-effort client IP string.
 */
function readClientIp(request: Request): string {
  const chain = request.headers.get("x-forwarded-for");
  const leftmost = chain ? chain.split(",")[0]?.trim() : undefined;
  if (leftmost) {
    return leftmost;
  }

  const realIp = request.headers.get("x-real-ip");
  return realIp ?? "unknown";
}
26+
27+
/**
 * Percent-decodes one header value, falling back to the raw text when the
 * value is not valid percent-encoding (e.g. a bare `%`).
 */
function decodeHeaderValue(raw: string): string {
  try {
    return decodeURIComponent(raw);
  } catch {
    return raw;
  }
}

/**
 * Parses an `OTEL_EXPORTER_OTLP_*_HEADERS`-style string (`k1=v1,k2=v2`) into
 * a header map.
 *
 * - Names and values are trimmed; a pair is skipped when either side is empty
 *   after trimming, so a whitespace-only name can never become an (invalid)
 *   empty header name.
 * - Only the first `=` separates name from value; later `=` stay in the value.
 * - Values are percent-decoded, so `Authorization=Basic%20abc` yields
 *   `Basic abc`. Undecodable values are kept verbatim.
 *
 * @param input - raw environment value, possibly undefined or empty.
 * @returns header name → value map (empty when nothing usable is present).
 */
function parseExtraHeaders(input: string | undefined): Record<string, string> {
  const out: Record<string, string> = {};
  if (!input) return out;

  for (const part of input.split(",")) {
    const separator = part.indexOf("=");
    // No `=` at all, or nothing before it: not a usable pair.
    if (separator <= 0) continue;

    const name = part.slice(0, separator).trim();
    const rawValue = part.slice(separator + 1).trim();
    if (!name || !rawValue) continue;

    out[name] = decodeHeaderValue(rawValue);
  }
  return out;
}
39+
40+
/**
 * Type guard for the minimal OTLP/JSON trace envelope: a non-null object with
 * a `resourceSpans` property that is an array. Array elements are not inspected.
 */
function isOtlpTraceBody(value: unknown): value is OtlpTraceBody {
  if (typeof value !== "object" || value === null) {
    return false;
  }
  if (!("resourceSpans" in value)) {
    return false;
  }
  const { resourceSpans } = value as { resourceSpans: unknown };
  return Array.isArray(resourceSpans);
}
48+
49+
/**
 * OTLP/JSON trace ingest proxy.
 *
 * Flow:
 *  1. 503 when `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` is not configured.
 *  2. Identify the caller (IP + optional bearer token) and verify the token.
 *  3. Apply rate limiting unless the caller is Genesis. Only a VERIFIED token
 *     gets its own `(ip, tokenHash)` bucket; missing or invalid tokens share
 *     the per-IP `anon` bucket so rotating garbage tokens cannot mint fresh
 *     buckets.
 *  4. Parse and shape-check the body (400 on failure).
 *  5. Redact deny-listed attributes, then stamp the server-controlled resource
 *     attributes (`service.name`, `user.is_genesis`, `ingest.client_ip_hash`).
 *  6. Forward to the configured endpoint; 502 on transport failure or a
 *     non-2xx upstream status, 204 on success.
 *
 * NOTE(review): token verification still runs before the rate-limit check, so
 * unverified tokens can trigger verifier work ahead of throttling.
 * NOTE(review): the 502 responses echo upstream/transport error text to the
 * caller — confirm that is acceptable to expose.
 *
 * @param request - incoming POST request carrying an OTLP/JSON body.
 * @returns the HTTP response described above.
 */
export async function POST(request: Request): Promise<Response> {
  const endpoint = process.env.OTEL_EXPORTER_OTLP_TRACES_ENDPOINT;
  if (!endpoint) {
    return Response.json({ error: "OTEL ingest not configured" }, { status: 503 });
  }

  const ip = readClientIp(request);
  // Truncated hash: lets traces be correlated per client without storing the IP.
  const ipHash = createHash("sha256").update(ip).digest("hex").slice(0, 16);
  const token = readBearerToken(request);

  const verification = token ? await verifyJiraToken(token) : { valid: false, isGenesis: false };
  const isGenesis = verification.isGenesis;
  const tier: RateLimitTier = verification.valid ? "authed" : "anonymous";
  // Key on the token only once it is verified; otherwise every bogus token
  // would open a brand-new anonymous bucket for the same IP.
  const limitKey = `${ip}:${token && verification.valid ? tokenHash(token) : "anon"}`;

  if (!isGenesis) {
    const result = allow(limitKey, tier);
    if (!result.allowed) {
      return new Response(JSON.stringify({ error: "Rate limit exceeded" }), {
        status: 429,
        headers: {
          "Content-Type": "application/json",
          // Whole seconds until the oldest counted request leaves the window (min 1).
          "Retry-After": Math.max(1, Math.ceil((result.resetAt - Date.now()) / 1000)).toString(),
        },
      });
    }
  }

  let body: unknown;
  try {
    body = await request.json();
  } catch {
    return Response.json({ error: "Invalid JSON body" }, { status: 400 });
  }
  if (!isOtlpTraceBody(body)) {
    return Response.json({ error: "Body must be OTLP/JSON with resourceSpans" }, { status: 400 });
  }

  try {
    // Redact first: it drops malformed attribute entries, so the stamping
    // below only ever sees well-formed key/value objects, and the
    // server-stamped attributes can never be removed by the deny-list.
    redactOtlpBody(body);
    upsertResourceAttribute(body, "service.name", { stringValue: SERVICE_NAME });
    upsertResourceAttribute(body, "user.is_genesis", { boolValue: isGenesis });
    upsertResourceAttribute(body, "ingest.client_ip_hash", { stringValue: ipHash });
  } catch {
    // Structurally broken payload (e.g. non-object entries): client error, not a 500.
    return Response.json({ error: "Body must be OTLP/JSON with resourceSpans" }, { status: 400 });
  }

  const extraHeaders = parseExtraHeaders(process.env.OTEL_EXPORTER_OTLP_TRACES_HEADERS);
  let upstream: Response;
  try {
    upstream = await fetch(endpoint, {
      method: "POST",
      headers: { "Content-Type": "application/json", ...extraHeaders },
      body: JSON.stringify(body),
    });
  } catch (cause) {
    return Response.json({ error: "Trace export failed", detail: String(cause) }, { status: 502 });
  }
  if (!upstream.ok) {
    const text = await upstream.text().catch(() => "");
    return new Response(JSON.stringify({ error: "Tempo rejected the trace", detail: text }), {
      status: 502,
      headers: { "Content-Type": "application/json" },
    });
  }
  return new Response(null, { status: 204 });
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/**
2+
* Sliding-window rate limiter for the OTEL ingest endpoint.
3+
*
4+
* - Authed (valid Jira token): 120 req/min per (IP, tokenHash) bucket.
5+
* - Anonymous (no token / invalid): 30 req/min per IP bucket.
6+
* - Genesis: bypass entirely (caller skips the check).
7+
*
8+
* In-memory single-instance store. Adequate for the current Dockerized
9+
* single-replica landing deploy; swap for Redis if we ever scale horizontally.
10+
*/
11+
// Length of the window, in milliseconds, over which requests are counted.
const RATE_WINDOW_MS = 60 * 1000;
12+
// Maximum requests per window for the "authed" tier.
const AUTHED_LIMIT_PER_WINDOW = 120;
13+
// Maximum requests per window for the "anonymous" tier.
const ANONYMOUS_LIMIT_PER_WINDOW = 30;
14+
// How often, in milliseconds, the background janitor prunes stale timestamps.
const PRUNE_INTERVAL_MS = 10 * 1000;
15+
16+
// Per-key request log: bucket key → timestamps (ms since epoch) of the
// requests that were allowed within the current window.
const buckets = new Map<string, number[]>();
17+
18+
// Set once the periodic pruning timer has been scheduled.
let janitorStarted = false;

/**
 * Lazily starts, at most once, a periodic sweep that drops timestamps older
 * than the rate window and deletes buckets that become empty. The timer is
 * unref'd when the runtime supports it so it does not keep the process alive.
 */
function ensureJanitor() {
  if (janitorStarted) return;
  janitorStarted = true;

  const sweep = (): void => {
    const oldestAllowed = Date.now() - RATE_WINDOW_MS;
    buckets.forEach((stamps, bucketKey) => {
      const live = stamps.filter((stamp) => stamp > oldestAllowed);
      if (live.length === 0) {
        buckets.delete(bucketKey);
        return;
      }
      // Only rewrite the bucket when something was actually pruned.
      if (live.length !== stamps.length) {
        buckets.set(bucketKey, live);
      }
    });
  };

  const timer = setInterval(sweep, PRUNE_INTERVAL_MS);
  const canUnref = typeof timer === "object" && timer !== null && "unref" in timer;
  if (canUnref) {
    (timer as { unref: () => void }).unref();
  }
}
37+
38+
/** Which per-window request limit applies to a caller. */
export type RateLimitTier = "authed" | "anonymous";
39+
40+
/** Outcome of a single rate-limit check returned by `allow`. */
export interface AllowResult {
41+
/** True when the request fit under the tier's limit and was recorded. */
readonly allowed: boolean;
42+
/** Requests still available in the current window (0 when rejected). */
readonly remaining: number;
43+
/**
 * Epoch milliseconds. On rejection: when the oldest counted request leaves
 * the window. On acceptance: one full window from now.
 */
readonly resetAt: number;
44+
}
45+
46+
/**
 * Records a request against `key` and reports whether it fits within the
 * tier's per-window limit.
 *
 * Rejected requests are not recorded, so they do not extend the lock-out.
 *
 * @param key - bucket identifier chosen by the caller.
 * @param tier - selects the authed or anonymous limit.
 * @returns whether the request is allowed, how many remain, and when the
 *   window resets (epoch ms).
 */
export function allow(key: string, tier: RateLimitTier): AllowResult {
  ensureJanitor();

  let quota: number;
  if (tier === "authed") {
    quota = AUTHED_LIMIT_PER_WINDOW;
  } else {
    quota = ANONYMOUS_LIMIT_PER_WINDOW;
  }

  const now = Date.now();
  const windowStart = now - RATE_WINDOW_MS;
  const previous = buckets.get(key) ?? [];
  const recent = previous.filter((stamp) => stamp > windowStart);

  const overQuota = recent.length >= quota;
  if (!overQuota) {
    recent.push(now);
  }
  // Always store the pruned list so stale timestamps do not accumulate.
  buckets.set(key, recent);

  if (overQuota) {
    const [earliest = now] = recent;
    return { allowed: false, remaining: 0, resetAt: earliest + RATE_WINDOW_MS };
  }
  return { allowed: true, remaining: quota - recent.length, resetAt: now + RATE_WINDOW_MS };
}

apps/landing/src/lib/otelRedact.ts

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/**
2+
* OTLP body redactor.
3+
*
4+
* Walks the standard OTLP/JSON shape and drops attribute entries whose key
5+
* matches a deny-list of suffixes. Mirrors the server-side
6+
* `OTEL_DENY_SUFFIXES` in `apps/server/src/observability/Attributes.ts` so
7+
* sensitive data has two lines of defense.
8+
*/
9+
/**
 * Attribute-key suffixes that must never leave the ingest proxy. A key is
 * denied when it equals a suffix or ends with `.<suffix>`.
 */
export const OTEL_DENY_SUFFIXES: readonly string[] = [
  // Filesystem locations and process invocation
  "cwd",
  "path",
  "file_path",
  "filename",
  "directory",
  "command_line",
  "argv",
  "cmdline",
  // Free-form content
  "body",
  "content",
  "text",
  "message_text",
  "prompt",
  "completion",
  "diff",
  "patch",
  // Identity
  "email",
  "username",
  "account_id",
  // Locations and credentials
  "url",
  "token",
  "secret",
  "key",
  "password",
  "authorization",
  // Human-written labels
  "title",
  "summary",
  "description",
];
39+
40+
/**
 * Reports whether an attribute key is deny-listed: it either equals one of
 * `OTEL_DENY_SUFFIXES` or ends with `.` followed by one. Matching is
 * case-sensitive.
 */
export function isDeniedAttributeKey(key: string): boolean {
  return OTEL_DENY_SUFFIXES.some((suffix) => key === suffix || key.endsWith("." + suffix));
}
46+
47+
/** One attribute entry in an OTLP/JSON `attributes` array. */
interface OtlpKeyValue {
48+
/** Attribute name; checked against the deny-list. */
key: string;
49+
/** Attribute value wrapper; treated as opaque by this module. */
value?: unknown;
50+
}
51+
52+
/**
 * Returns the entries of an attribute list that are safe to forward.
 *
 * An entry is kept only when it is a non-null object with a string `key`
 * that is not deny-listed. Anything that is not an array yields `[]`.
 *
 * NOTE(review): only each entry's top-level `key` is inspected; keys nested
 * inside an entry's `value` are not checked — confirm that is intended.
 */
function filterAttributes(attrs: unknown): OtlpKeyValue[] {
  if (!Array.isArray(attrs)) {
    return [];
  }

  const kept: OtlpKeyValue[] = [];
  for (const candidate of attrs as OtlpKeyValue[]) {
    if (typeof candidate !== "object" || candidate === null) continue;
    if (typeof candidate.key !== "string") continue;
    if (isDeniedAttributeKey(candidate.key)) continue;
    kept.push(candidate);
  }
  return kept;
}
62+
63+
/** Span fields this module reads or rewrites; other span fields are not declared. */
interface OtlpSpan {
64+
attributes?: OtlpKeyValue[];
65+
events?: Array<{ attributes?: OtlpKeyValue[] }>;
66+
}
67+
68+
/** Group of spans within a resource. */
interface OtlpScopeSpans {
69+
spans?: OtlpSpan[];
70+
}
71+
72+
/** Resource descriptor; only its attribute list is declared. */
interface OtlpResource {
73+
attributes?: OtlpKeyValue[];
74+
}
75+
76+
/** One `resourceSpans[]` element: a resource plus its span groups. */
interface OtlpResourceSpans {
77+
resource?: OtlpResource;
78+
scopeSpans?: OtlpScopeSpans[];
79+
}
80+
81+
/** Top-level OTLP/JSON trace payload as far as this module inspects it. */
export interface OtlpTraceBody {
82+
resourceSpans?: OtlpResourceSpans[];
83+
}
84+
85+
/** Narrows an untrusted JSON value to a plain mutable object, or `undefined`. */
function asRecord(value: unknown): Record<string, unknown> | undefined {
  if (typeof value !== "object" || value === null || Array.isArray(value)) {
    return undefined;
  }
  return value as Record<string, unknown>;
}

/** Returns `value` when it is an array, otherwise an empty list. */
function asList(value: unknown): unknown[] {
  return Array.isArray(value) ? value : [];
}

/** Replaces `holder.attributes` with its filtered copy; non-objects are ignored. */
function redactAttributesOf(holder: unknown): void {
  const record = asRecord(holder);
  if (record) {
    record.attributes = filterAttributes(record.attributes);
  }
}

/**
 * redactOtlpBody - mutates and returns the body with deny-listed attributes
 * removed from every attribute array in the trace payload: resource,
 * instrumentation scope, span, span event, and span link.
 *
 * The payload is untrusted JSON, so every level is shape-checked: entries
 * that are not objects (e.g. `null` inside `resourceSpans`) and fields that
 * are not arrays are skipped instead of throwing.
 *
 * @param body - parsed OTLP/JSON trace payload; modified in place.
 * @returns the same `body` instance.
 */
export function redactOtlpBody(body: OtlpTraceBody): OtlpTraceBody {
  for (const rawResourceSpans of asList(body.resourceSpans)) {
    const resourceSpans = asRecord(rawResourceSpans);
    if (!resourceSpans) continue;

    // An absent resource stays absent; a present one gets a filtered list.
    redactAttributesOf(resourceSpans.resource);

    for (const rawScopeSpans of asList(resourceSpans.scopeSpans)) {
      const scopeSpans = asRecord(rawScopeSpans);
      if (!scopeSpans) continue;

      // The instrumentation scope carries its own attribute list.
      redactAttributesOf(scopeSpans.scope);

      for (const rawSpan of asList(scopeSpans.spans)) {
        const span = asRecord(rawSpan);
        if (!span) continue;

        redactAttributesOf(span);
        for (const event of asList(span.events)) {
          redactAttributesOf(event);
        }
        // Links carry attributes too and would otherwise bypass redaction.
        for (const link of asList(span.links)) {
          redactAttributesOf(link);
        }
      }
    }
  }
  return body;
}
105+
106+
/**
 * upsertResourceAttribute - sets `key` to `value` on every
 * `resourceSpans[i].resource.attributes`, regardless of what the client sent.
 * Used to OVERRIDE `service.name` and to stamp `user.is_genesis` /
 * `ingest.client_ip_hash` server-side.
 *
 * Guarantees after the call, for each object-shaped `resourceSpans` element:
 *  - exactly one entry with `key` exists (the first existing occurrence is
 *    replaced in place, any later duplicates are removed; otherwise the entry
 *    is appended), so a client cannot smuggle a second conflicting value;
 *  - a missing or non-object `resource` is replaced by a fresh one.
 *
 * Malformed entries (e.g. `null`) in the attribute list are left untouched
 * and never cause a throw; non-object `resourceSpans` elements are skipped.
 *
 * @param body - parsed OTLP/JSON trace payload; modified in place.
 * @param key - resource attribute name to set.
 * @param value - OTLP value wrapper to store under `key`.
 */
export function upsertResourceAttribute(
  body: OtlpTraceBody,
  key: string,
  value: { stringValue?: string; boolValue?: boolean; intValue?: string },
): void {
  for (const rs of body.resourceSpans ?? []) {
    // Untrusted JSON: array elements may be null or primitives.
    if (typeof rs !== "object" || rs === null) continue;

    if (typeof rs.resource !== "object" || rs.resource === null) {
      rs.resource = { attributes: [] };
    }
    const resource = rs.resource;
    const attrs: OtlpKeyValue[] = Array.isArray(resource.attributes) ? resource.attributes : [];
    const stamped: OtlpKeyValue = { key, value };

    // `?.` tolerates null/primitive entries that have not been filtered out yet.
    const firstIndex = attrs.findIndex((entry) => entry?.key === key);
    if (firstIndex < 0) {
      attrs.push(stamped);
    } else {
      attrs[firstIndex] = stamped;
      // Walk backwards so splicing does not disturb the indices still to visit.
      for (let i = attrs.length - 1; i > firstIndex; i -= 1) {
        if (attrs[i]?.key === key) {
          attrs.splice(i, 1);
        }
      }
    }
    resource.attributes = attrs;
  }
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import { createHash } from "node:crypto";
2+
3+
// Endpoint queried with the caller's bearer token; a successful response's
// `email` field is read to classify the account.
const ATLASSIAN_ME_URL = "https://api.atlassian.com/me";
4+
// How long, in milliseconds, a verification result is reused from the cache.
const VERIFY_CACHE_TTL_MS = 5 * 60 * 1000;
5+
// Email suffixes that mark a verified account as Genesis (compared against
// the trimmed, lower-cased address).
const GENESIS_EMAIL_SUFFIXES = ["@gen.tech", "@obrio.co"] as const;
6+
7+
/** Outcome of verifying a bearer token. */
export interface VerifyJiraTokenResult {
8+
/** True when the verification request returned a successful response. */
readonly valid: boolean;
9+
/** Account email, present only when the response carried a string `email`. */
readonly email?: string;
10+
/** True when the email ends with one of the Genesis suffixes. */
readonly isGenesis: boolean;
11+
}
12+
13+
/** Cached verification result together with its expiry time. */
interface CachedEntry {
14+
readonly result: VerifyJiraTokenResult;
15+
/** Epoch milliseconds after which the entry is no longer served. */
readonly expiresAt: number;
16+
}
17+
18+
// In-memory verification cache keyed by the truncated token hash, so raw
// tokens are never used as map keys.
const cache = new Map<string, CachedEntry>();
19+
20+
/**
 * Derives a short, stable identifier for a token: the first 16 hex
 * characters of its SHA-256 digest.
 */
export function tokenHash(token: string): string {
  const hasher = createHash("sha256");
  hasher.update(token);
  const hex = hasher.digest("hex");
  return hex.slice(0, 16);
}
23+
24+
/**
 * Reports whether an email belongs to a Genesis domain: after trimming and
 * lower-casing it must end with one of `GENESIS_EMAIL_SUFFIXES`. A missing
 * or empty email is never Genesis.
 */
function isGenesisEmail(email: string | undefined): boolean {
  if (!email) {
    return false;
  }
  const canonical = email.trim().toLowerCase();
  for (const suffix of GENESIS_EMAIL_SUFFIXES) {
    if (canonical.endsWith(suffix)) {
      return true;
    }
  }
  return false;
}
29+
30+
// Cache lifetime for indeterminate outcomes (network error, non-auth HTTP
// failure). Short, so a brief upstream outage does not mark a good token as
// invalid for the full TTL, while still damping repeated upstream calls.
const VERIFY_TRANSIENT_TTL_MS = 30 * 1000;

// Upper bound on cached entries; callers can present arbitrary tokens, so the
// cache must not grow without limit.
const VERIFY_CACHE_MAX_ENTRIES = 10000;

/**
 * Frees cache space: drops every expired entry, then, if the cache is still
 * full, evicts the oldest-inserted entries until there is room for one more.
 */
function evictVerifyCache(now: number): void {
  for (const [entryKey, entry] of cache) {
    if (entry.expiresAt <= now) {
      cache.delete(entryKey);
    }
  }
  // Map iteration follows insertion order, so this removes the oldest first.
  for (const entryKey of cache.keys()) {
    if (cache.size < VERIFY_CACHE_MAX_ENTRIES) break;
    cache.delete(entryKey);
  }
}

/**
 * Verifies a bearer token against the Atlassian "me" endpoint.
 *
 * Results are cached by token hash:
 *  - definitive outcomes (success, or an explicit 401/403 rejection) for
 *    `VERIFY_CACHE_TTL_MS`;
 *  - indeterminate outcomes (network failure, other HTTP statuses, an
 *    unreadable body) for the much shorter `VERIFY_TRANSIENT_TTL_MS`.
 * Expired entries are removed on lookup and the cache size is bounded.
 *
 * Never throws: any failure is reported as `{ valid: false, isGenesis: false }`.
 *
 * @param token - raw bearer token presented by the caller.
 * @returns validity, the account email when available, and the Genesis flag.
 */
export async function verifyJiraToken(token: string): Promise<VerifyJiraTokenResult> {
  const key = tokenHash(token);
  const now = Date.now();
  const cached = cache.get(key);
  if (cached) {
    if (cached.expiresAt > now) {
      return cached.result;
    }
    // Stale entry: remove it so it does not linger if this attempt is not stored.
    cache.delete(key);
  }

  let result: VerifyJiraTokenResult = { valid: false, isGenesis: false };
  let ttlMs = VERIFY_TRANSIENT_TTL_MS;
  try {
    const response = await fetch(ATLASSIAN_ME_URL, {
      headers: { Authorization: `Bearer ${token}`, Accept: "application/json" },
    });
    if (response.ok) {
      const body = (await response.json()) as { email?: unknown } | null;
      const email = typeof body?.email === "string" ? body.email : undefined;
      result = { valid: true, isGenesis: isGenesisEmail(email), ...(email ? { email } : {}) };
      ttlMs = VERIFY_CACHE_TTL_MS;
    } else if (response.status === 401 || response.status === 403) {
      // The upstream explicitly rejected the token: a definitive negative.
      ttlMs = VERIFY_CACHE_TTL_MS;
    }
  } catch {
    // Network failure or unreadable body → treat as anonymous; don't bubble up.
  }

  if (cache.size >= VERIFY_CACHE_MAX_ENTRIES) {
    evictVerifyCache(now);
  }
  cache.set(key, { result, expiresAt: now + ttlMs });
  return result;
}

0 commit comments

Comments
 (0)