11import type { Prisma } from "@/generated/prisma/client" ;
22import { getClickhouseAdminClient , isClickhouseConfigured } from "@/lib/clickhouse" ;
3+ import { endUserIpInfoSchema , type EndUserIpInfo } from "@/lib/events" ;
34import { DEFAULT_BRANCH_ID } from "@/lib/tenancies" ;
45import { globalPrismaClient } from "@/prisma-client" ;
56import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler" ;
67import { yupNumber , yupObject , yupString } from "@stackframe/stack-shared/dist/schema-fields" ;
7- import { StatusError } from "@stackframe/stack-shared/dist/utils/errors" ;
8+ import { StackAssertionError , StatusError , throwErr } from "@stackframe/stack-shared/dist/utils/errors" ;
89
910type Cursor = {
1011 created_at_millis : number ,
@@ -22,41 +23,86 @@ const parseMillisOrThrow = (value: number | undefined, field: string) => {
2223 return parsed ;
2324} ;
2425
25- const createClickhouseRows = ( event : {
26+ type EventWithIpInfo = {
2627 id : string ,
2728 systemEventTypeIds : string [ ] ,
28- data : any ,
29+ data : unknown ,
2930 eventEndedAt : Date ,
3031 eventStartedAt : Date ,
3132 isWide : boolean ,
32- } ) => {
33+ isEndUserIpInfoGuessTrusted : boolean ,
34+ endUserIpInfoGuess : {
35+ ip : string ,
36+ countryCode : string | null ,
37+ regionCode : string | null ,
38+ cityName : string | null ,
39+ latitude : number | null ,
40+ longitude : number | null ,
41+ tzIdentifier : string | null ,
42+ } | null ,
43+ } ;
44+
45+ /**
46+ * Transforms a $session-activity event from Postgres into a $token-refresh event for ClickHouse.
47+ *
48+ * The $session-activity event has:
49+ * - sessionId (from SessionActivityEventType)
50+ * - userId, branchId, isAnonymous, teamId (from UserActivityEventType)
51+ * - projectId (from ProjectEventType via inheritance)
52+ *
53+ * The $token-refresh event needs:
54+ * - projectId, branchId, organizationId (null), userId, refreshTokenId, isAnonymous, ipInfo
55+ */
56+ const createClickhouseRow = ( event : EventWithIpInfo ) => {
3357 const dataRecord = typeof event . data === "object" && event . data !== null ? event . data as Record < string , unknown > : { } ;
34- const clickhouseEventData = {
35- ...dataRecord ,
36- is_wide : event . isWide ,
37- event_started_at : event . eventStartedAt ,
38- event_ended_at : event . eventEndedAt ,
39- } ;
40- const projectId = typeof dataRecord . projectId === "string" ? dataRecord . projectId : "" ;
41- const branchId = DEFAULT_BRANCH_ID ;
42- const userId = typeof dataRecord . userId === "string" ? dataRecord . userId : "" ;
43- const teamId = typeof dataRecord . teamId === "string" ? dataRecord . teamId : "" ;
44- const sessionId = typeof dataRecord . sessionId === "string" ? dataRecord . sessionId : "" ;
58+
59+ // Extract fields from the old $session-activity format
60+ const projectId = typeof dataRecord . projectId === "string" ? dataRecord . projectId : throwErr ( new StackAssertionError ( "projectId is required" ) ) ;
61+ const branchId = typeof dataRecord . branchId === "string" ? dataRecord . branchId : DEFAULT_BRANCH_ID ;
62+ const userId = typeof dataRecord . userId === "string" && dataRecord . userId ? dataRecord . userId : throwErr ( new StackAssertionError ( "userId is required" ) ) ;
63+ // sessionId becomes refreshTokenId in the new schema
64+ const refreshTokenId = typeof dataRecord . sessionId === "string" ? dataRecord . sessionId : throwErr ( new StackAssertionError ( "sessionId is required" ) ) ;
65+ // isAnonymous may not exist on old events, default to false
4566 const isAnonymous = typeof dataRecord . isAnonymous === "boolean" ? dataRecord . isAnonymous : false ;
4667
47- const eventTypes = [ ...new Set ( event . systemEventTypeIds ) ] ;
68+ // Build ipInfo from the event's endUserIpInfoGuess relation
69+ let ipInfo : EndUserIpInfo | null = null ;
70+ if ( event . endUserIpInfoGuess ) {
71+ const ip = event . endUserIpInfoGuess ;
72+ ipInfo = {
73+ ip : ip . ip ,
74+ isTrusted : event . isEndUserIpInfoGuessTrusted ,
75+ countryCode : ip . countryCode ?? undefined ,
76+ regionCode : ip . regionCode ?? undefined ,
77+ cityName : ip . cityName ?? undefined ,
78+ latitude : ip . latitude ?? undefined ,
79+ longitude : ip . longitude ?? undefined ,
80+ tzIdentifier : ip . tzIdentifier ?? undefined ,
81+ } ;
82+ // Validate against schema
83+ ipInfo = endUserIpInfoSchema . nullable ( ) . defined ( ) . validateSync ( ipInfo , { stripUnknown : true } ) ;
84+ }
85+
86+ // Build the data object matching TokenRefreshEventType schema
87+ const tokenRefreshData = {
88+ projectId,
89+ branchId,
90+ organizationId : null ,
91+ userId : userId ,
92+ refreshTokenId,
93+ isAnonymous,
94+ ipInfo,
95+ } ;
4896
49- return eventTypes . map ( eventType => ( {
50- event_type : eventType ,
97+ return {
98+ event_type : '$token-refresh' ,
5199 event_at : event . eventEndedAt ,
52- data : clickhouseEventData ,
100+ data : tokenRefreshData ,
53101 project_id : projectId ,
54102 branch_id : branchId ,
55103 user_id : userId ,
56- team_id : teamId ,
57- session_id : sessionId ,
58- is_anonymous : isAnonymous ,
59- } ) ) ;
104+ team_id : null ,
105+ } ;
60106} ;
61107
62108export const POST = createSmartRouteHandler ( {
@@ -106,11 +152,17 @@ export const POST = createSmartRouteHandler({
106152 const cursorId = body . cursor ?. id ;
107153 const limit = body . limit ;
108154
155+ const laterOfMinCreatedAtOrCursorCreatedAt = ! cursorCreatedAt || minCreatedAt > cursorCreatedAt ? minCreatedAt : cursorCreatedAt ;
156+
109157 const baseWhere : Prisma . EventWhereInput = {
110158 createdAt : {
111- gte : minCreatedAt ,
159+ gte : laterOfMinCreatedAtOrCursorCreatedAt ,
112160 lt : maxCreatedAt ,
113161 } ,
162+ // Only migrate $session-activity events (translated to $token-refresh in ClickHouse)
163+ systemEventTypeIds : {
164+ has : '$session-activity' ,
165+ } ,
114166 } ;
115167
116168 const cursorFilter : Prisma . EventWhereInput | undefined = ( cursorCreatedAt && cursorId ) ? {
@@ -131,6 +183,9 @@ export const POST = createSmartRouteHandler({
131183 { id : "asc" } ,
132184 ] ,
133185 take : limit ,
186+ include : {
187+ endUserIpInfoGuess : true ,
188+ } ,
134189 } ) ;
135190
136191 let insertedRows = 0 ;
@@ -141,22 +196,19 @@ export const POST = createSmartRouteHandler({
141196 throw new StatusError ( StatusError . ServiceUnavailable , "ClickHouse is not configured" ) ;
142197 }
143198 const clickhouseClient = getClickhouseAdminClient ( ) ;
144- const rowsByEvent = events . map ( createClickhouseRows ) ;
145- const rowsToInsert = rowsByEvent . flat ( ) ;
146- migratedEvents = rowsByEvent . reduce ( ( acc , rows ) => acc + ( rows . length ? 1 : 0 ) , 0 ) ;
147-
148- if ( rowsToInsert . length ) {
149- await clickhouseClient . insert ( {
150- table : "analytics_internal.events" ,
151- values : rowsToInsert ,
152- format : "JSONEachRow" ,
153- clickhouse_settings : {
154- date_time_input_format : "best_effort" ,
155- async_insert : 1 ,
156- } ,
157- } ) ;
158- insertedRows = rowsToInsert . length ;
159- }
199+ const rowsToInsert = events . map ( createClickhouseRow ) ;
200+ migratedEvents = events . length ;
201+
202+ await clickhouseClient . insert ( {
203+ table : "analytics_internal.events" ,
204+ values : rowsToInsert ,
205+ format : "JSONEachRow" ,
206+ clickhouse_settings : {
207+ date_time_input_format : "best_effort" ,
208+ async_insert : 1 ,
209+ } ,
210+ } ) ;
211+ insertedRows = rowsToInsert . length ;
160212 }
161213
162214 const lastEvent = events . at ( - 1 ) ;
0 commit comments