Skip to content

Commit 39ca118

Browse files
committed
analytics event tracking
1 parent a1da657 commit 39ca118

10 files changed

Lines changed: 892 additions & 23 deletions

File tree

apps/backend/scripts/clickhouse-migrations.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ export async function runClickhouseMigrations() {
2020
await client.exec({ query: TOKEN_REFRESH_EVENT_ROW_FORMAT_MUTATION_SQL });
2121
await client.exec({ query: BACKFILL_REFRESH_TOKEN_ID_COLUMN_SQL });
2222
await client.exec({ query: SIGN_UP_RULE_TRIGGER_EVENT_ROW_FORMAT_MUTATION_SQL });
23+
// Recreate the events view so SELECT * picks up columns added by EVENTS_ADD_REPLAY_COLUMNS_SQL
24+
await client.exec({ query: EVENTS_VIEW_SQL });
2325
const queries = [
2426
"REVOKE ALL PRIVILEGES ON *.* FROM limited_user;",
2527
"REVOKE ALL FROM limited_user;",
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import { getClickhouseAdminClient } from "@/lib/clickhouse";
2+
import { findRecentSessionReplay } from "@/lib/session-replays";
3+
import { getPrismaClientForTenancy } from "@/prisma-client";
4+
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
5+
import { KnownErrors } from "@stackframe/stack-shared";
6+
import { adaptSchema, clientOrHigherAuthTypeSchema, yupArray, yupMixed, yupNumber, yupObject, yupString } from "@stackframe/stack-shared/dist/schema-fields";
7+
import { StatusError } from "@stackframe/stack-shared/dist/utils/errors";
8+
9+
const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[1-8][0-9a-f]{3}-[089ab][0-9a-f]{3}-[0-9a-f]{12}$/i;
10+
11+
const MAX_EVENTS = 500;
12+
13+
export const POST = createSmartRouteHandler({
14+
metadata: {
15+
summary: "Upload analytics event batch",
16+
description: "Uploads a batch of auto-captured analytics events ($page-view, $click).",
17+
tags: ["Analytics Events"],
18+
hidden: true,
19+
},
20+
request: yupObject({
21+
auth: yupObject({
22+
type: clientOrHigherAuthTypeSchema,
23+
tenancy: adaptSchema,
24+
user: adaptSchema,
25+
refreshTokenId: adaptSchema,
26+
}).defined(),
27+
body: yupObject({
28+
session_replay_segment_id: yupString().defined().matches(UUID_RE, "Invalid session_replay_segment_id"),
29+
batch_id: yupString().defined().matches(UUID_RE, "Invalid batch_id"),
30+
sent_at_ms: yupNumber().defined().integer().min(0),
31+
events: yupArray(
32+
yupObject({
33+
event_type: yupString().defined().oneOf(["$page-view", "$click"]),
34+
event_at_ms: yupNumber().defined().integer().min(0),
35+
data: yupMixed().defined(),
36+
}).defined(),
37+
).defined().min(1).max(MAX_EVENTS),
38+
}).defined(),
39+
}),
40+
response: yupObject({
41+
statusCode: yupNumber().oneOf([200]).defined(),
42+
bodyType: yupString().oneOf(["json"]).defined(),
43+
body: yupObject({
44+
inserted: yupNumber().defined(),
45+
}).defined(),
46+
}),
47+
async handler({ auth, body }) {
48+
if (!auth.tenancy.config.apps.installed["analytics"]?.enabled) {
49+
return {
50+
statusCode: 200,
51+
bodyType: "json",
52+
body: { inserted: 0 },
53+
};
54+
}
55+
if (!auth.user) {
56+
throw new KnownErrors.UserAuthenticationRequired();
57+
}
58+
if (!auth.refreshTokenId) {
59+
throw new StatusError(StatusError.BadRequest, "A refresh token is required for analytics events");
60+
}
61+
62+
const projectId = auth.tenancy.project.id;
63+
const branchId = auth.tenancy.branchId;
64+
const userId = auth.user.id;
65+
const refreshTokenId = auth.refreshTokenId;
66+
const tenancyId = auth.tenancy.id;
67+
68+
const prisma = await getPrismaClientForTenancy(auth.tenancy);
69+
const recentSession = await findRecentSessionReplay(prisma, { tenancyId, refreshTokenId });
70+
71+
const clickhouseClient = getClickhouseAdminClient();
72+
73+
const rows = body.events.map((event) => ({
74+
event_type: event.event_type,
75+
event_at: new Date(event.event_at_ms),
76+
data: event.data as Record<string, unknown>,
77+
project_id: projectId,
78+
branch_id: branchId,
79+
user_id: userId,
80+
team_id: null,
81+
refresh_token_id: refreshTokenId,
82+
session_replay_id: recentSession?.id ?? null,
83+
session_replay_segment_id: body.session_replay_segment_id,
84+
}));
85+
86+
await clickhouseClient.insert({
87+
table: "analytics_internal.events",
88+
values: rows,
89+
format: "JSONEachRow",
90+
clickhouse_settings: {
91+
date_time_input_format: "best_effort",
92+
async_insert: 1,
93+
},
94+
});
95+
96+
return {
97+
statusCode: 200,
98+
bodyType: "json",
99+
body: { inserted: body.events.length },
100+
};
101+
},
102+
});

apps/backend/src/app/api/latest/session-replays/batch/route.tsx

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { getPrismaClientForTenancy } from "@/prisma-client";
22
import { uploadBytes } from "@/s3";
33
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
44
import { Prisma } from "@/generated/prisma/client";
5+
import { findRecentSessionReplay } from "@/lib/session-replays";
56
import { KnownErrors } from "@stackframe/stack-shared";
67
import { adaptSchema, clientOrHigherAuthTypeSchema, yupArray, yupMixed, yupNumber, yupObject, yupString } from "@stackframe/stack-shared/dist/schema-fields";
78
import { StatusError } from "@stackframe/stack-shared/dist/utils/errors";
@@ -15,8 +16,6 @@ const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[1-8][0-9a-f]{3}-[089ab][0-9a-f]{3}-[0
1516

1617
const MAX_BODY_BYTES = 5_000_000;
1718
const MAX_EVENTS = 5_000;
18-
const SESSION_IDLE_TIMEOUT_MS = 3 * 60 * 1000;
19-
const MAX_SESSION_DURATION_MS = 12 * 60 * 60 * 1000;
2019

2120
function extractEventTimesMs(events: unknown[], fallbackMs: number) {
2221
let minTs = Infinity;
@@ -114,22 +113,7 @@ export const POST = createSmartRouteHandler({
114113
const { firstMs, lastMs } = extractEventTimesMs(body.events, body.sent_at_ms);
115114

116115
const prisma = await getPrismaClientForTenancy(auth.tenancy);
117-
118-
// Find a recent session replay for this refresh token (temporal grouping).
119-
// If the last batch arrived within SESSION_IDLE_TIMEOUT_MS, reuse that replay.
120-
// Also enforce a max session duration so replays don't grow indefinitely.
121-
const cutoff = new Date(Date.now() - SESSION_IDLE_TIMEOUT_MS);
122-
const maxDurationCutoff = new Date(Date.now() - MAX_SESSION_DURATION_MS);
123-
const recentSession = await prisma.sessionReplay.findFirst({
124-
where: {
125-
tenancyId,
126-
refreshTokenId,
127-
updatedAt: { gte: cutoff },
128-
startedAt: { gte: maxDurationCutoff },
129-
},
130-
orderBy: { updatedAt: "desc" },
131-
select: { id: true, startedAt: true, lastEventAt: true },
132-
});
116+
const recentSession = await findRecentSessionReplay(prisma, { tenancyId, refreshTokenId });
133117

134118
const replayId = recentSession?.id ?? randomUUID();
135119
const s3Key = `session-replays/${projectId}/${branchId}/${replayId}/${batchId}.json.gz`;
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import { PrismaClient } from "@/generated/prisma/client";
2+
import { PrismaClientWithReplica } from "@/prisma-client";
3+
4+
export const SESSION_IDLE_TIMEOUT_MS = 3 * 60 * 1000;
5+
export const MAX_SESSION_DURATION_MS = 12 * 60 * 60 * 1000;
6+
7+
export async function findRecentSessionReplay(prisma: PrismaClientWithReplica<PrismaClient>, options: {
8+
tenancyId: string,
9+
refreshTokenId: string,
10+
}) {
11+
const cutoff = new Date(Date.now() - SESSION_IDLE_TIMEOUT_MS);
12+
const maxDurationCutoff = new Date(Date.now() - MAX_SESSION_DURATION_MS);
13+
return await prisma.sessionReplay.findFirst({
14+
where: {
15+
tenancyId: options.tenancyId,
16+
refreshTokenId: options.refreshTokenId,
17+
updatedAt: { gte: cutoff },
18+
startedAt: { gte: maxDurationCutoff },
19+
},
20+
orderBy: { updatedAt: "desc" },
21+
select: { id: true, startedAt: true, lastEventAt: true },
22+
});
23+
}

0 commit comments

Comments
 (0)