diff --git a/apps/dotcom/client/src/pages/admin.tsx b/apps/dotcom/client/src/pages/admin.tsx index c0f75839c357..d782ae1e738f 100644 --- a/apps/dotcom/client/src/pages/admin.tsx +++ b/apps/dotcom/client/src/pages/admin.tsx @@ -488,13 +488,19 @@ function BatchMigrateUsersToGroups() { const [progressLog, setProgressLog] = useState([]) const [error, setError] = useState(null as string | null) const [isComplete, setIsComplete] = useState(false) - const [stats, setStats] = useState({ successCount: 0, failureCount: 0, totalUsers: 0 }) + const [stats, setStats] = useState( + {} as { + successCount: number + failureCount: number + totalUsers: number + usersToMigrate: number + progress: number + } + ) const [unmigratedCount, setUnmigratedCount] = useState(null) const [isLoadingCount, setIsLoadingCount] = useState(false) const [eventSource, setEventSource] = useState(null) - const [batchSize, setBatchSize] = useState(100) - const [batchSleepMs, setBatchSleepMs] = useState(100) - const [maxUsers, setMaxUsers] = useState('') + const [sleepMs, setSleepMs] = useState(100) const logContainerRef = useRef(null) // Cleanup EventSource on unmount @@ -540,9 +546,7 @@ function BatchMigrateUsersToGroups() { }, [eventSource]) const onMigrate = useCallback(async () => { - const migrationMessage = maxUsers - ? `Are you sure you want to migrate up to ${maxUsers} users without the groups_backend flag? This action cannot be undone.` - : `Are you sure you want to migrate ALL users without the groups_backend flag? This action cannot be undone.` + const migrationMessage = `Are you sure you want to migrate ALL users without the groups_backend flag? This action cannot be undone.` if (!window.confirm(migrationMessage)) { return @@ -552,16 +556,12 @@ function BatchMigrateUsersToGroups() { setError(null) setProgressLog([]) setIsComplete(false) - setStats({ successCount: 0, failureCount: 0, totalUsers: 0 }) + setStats({ successCount: 0, failureCount: 0, totalUsers: 0, usersToMigrate: 0, progress: 0 }) try { const params = new URLSearchParams({ - batchSize: batchSize.toString(), - batchSleepMs: batchSleepMs.toString(), + sleepMs: sleepMs.toString(), }) - if (maxUsers) { - params.set('maxUsers', maxUsers.toString()) - } const es = new EventSource(`/api/app/admin/migrate_users_batch?${params}`) setEventSource(es) @@ -579,16 +579,7 @@ function BatchMigrateUsersToGroups() { // Update stats from details if (data.details) { - if (data.details.totalUsers !== undefined) { - setStats((prev) => ({ ...prev, totalUsers: data.details.totalUsers })) - } - if (data.details.successCount !== undefined && data.details.failureCount !== undefined) { - setStats({ - totalUsers: data.details.totalUsers || 0, - successCount: data.details.successCount, - failureCount: data.details.failureCount, - }) - } + setStats(data.details) } if (data.type === 'complete') { @@ -615,7 +606,7 @@ function BatchMigrateUsersToGroups() { setIsMigrating(false) setEventSource(null) } - }, [batchSize, batchSleepMs, maxUsers]) + }, [sleepMs]) return (
@@ -640,9 +631,8 @@ function BatchMigrateUsersToGroups() {

This will migrate all users who don't have the groups_backend flag. The process will - run sequentially and report progress in real-time. Configure the batch size (number of users - processed before a pause), sleep duration (milliseconds to wait between batches), and max - users (limit for incremental rollout, leave empty to migrate all) below. + run sequentially (one user at a time) and report progress in real-time. Configure the sleep + duration (milliseconds to wait between each user migration) below.

{error &&
{error}
} @@ -650,45 +640,18 @@ function BatchMigrateUsersToGroups() { {/* Configuration Inputs */}
- + setBatchSize(Number(e.target.value))} - disabled={isMigrating} - min={1} - className={styles.searchInput} - style={{ width: '100px', marginLeft: '8px' }} - /> -
-
- - setBatchSleepMs(Number(e.target.value))} + value={sleepMs} + onChange={(e) => setSleepMs(Number(e.target.value))} disabled={isMigrating} min={0} className={styles.searchInput} style={{ width: '100px', marginLeft: '8px' }} />
-
- - setMaxUsers(e.target.value === '' ? '' : Number(e.target.value))} - disabled={isMigrating} - min={1} - placeholder="All users" - className={styles.searchInput} - style={{ width: '100px', marginLeft: '8px' }} - /> -
{/* Stats Display */} @@ -699,8 +662,8 @@ function BatchMigrateUsersToGroups() { {stats.totalUsers}
- Completed: - {stats.successCount + stats.failureCount} + Users to Migrate: + {stats.usersToMigrate}
Succeeded: @@ -712,9 +675,7 @@ function BatchMigrateUsersToGroups() {
Progress: - - {Math.round(((stats.successCount + stats.failureCount) / stats.totalUsers) * 100)}% - + {(stats.progress * 100).toFixed(2)}%
)} diff --git a/apps/dotcom/sync-worker/src/TLUserDurableObject.ts b/apps/dotcom/sync-worker/src/TLUserDurableObject.ts index 8a580f06cf29..edc2624bfa06 100644 --- a/apps/dotcom/sync-worker/src/TLUserDurableObject.ts +++ b/apps/dotcom/sync-worker/src/TLUserDurableObject.ts @@ -565,6 +565,7 @@ export class TLUserDurableObject extends DurableObject { async admin_migrateToGroups(userId: string, inviteSecret: string | null = null) { this.userId ??= userId + this.log.debug('migrating to groups', userId, inviteSecret) // Call the Postgres migration function const result = await sql<{ files_migrated: number @@ -572,9 +573,13 @@ export class TLUserDurableObject extends DurableObject { flag_added: boolean }>`SELECT * FROM migrate_user_to_groups(${userId}, ${inviteSecret})`.execute(this.db) - // Reboot the user's cache to pick up the new data structure + this.log.debug('migration result', result.rows[0]) + + await this.env.USER_DO_SNAPSHOTS.delete(getUserDoSnapshotKey(this.env, userId)) await this.cache?.reboot({ delay: false, source: 'admin', hard: true }) + this.log.debug('migration complete, user rebooted') + return result.rows[0] } diff --git a/apps/dotcom/sync-worker/src/UserDataSyncer.ts b/apps/dotcom/sync-worker/src/UserDataSyncer.ts index d21202b295f4..4a347f2d861b 100644 --- a/apps/dotcom/sync-worker/src/UserDataSyncer.ts +++ b/apps/dotcom/sync-worker/src/UserDataSyncer.ts @@ -262,7 +262,7 @@ export class UserDataSyncer { this.ctx.abort() return } - this.log.debug('rebooting', source) + this.log.debug('rebooting', source, 'hard:', hard, 'delay:', delay) this.logEvent({ type: 'reboot', id: this.userId }) await this.queue.push(async () => { if (delay) { diff --git a/apps/dotcom/sync-worker/src/adminRoutes.ts b/apps/dotcom/sync-worker/src/adminRoutes.ts index 2600c0943692..c1fc3e7edc61 100644 --- a/apps/dotcom/sync-worker/src/adminRoutes.ts +++ b/apps/dotcom/sync-worker/src/adminRoutes.ts @@ -2,6 +2,7 @@ import { TlaFile } from '@tldraw/dotcom-shared' import { assert, sleep, uniqueId } from '@tldraw/utils' import { createRouter } from '@tldraw/worker-shared' import { StatusError, json } from 'itty-router' +import { sql } from 'kysely' import { createPostgresConnectionPool } from './postgres' import { returnFileSnapshot } from './routes/tla/getFileSnapshot' import { type Environment } from './types' @@ -64,16 +65,13 @@ export const adminRoutes = createRouter() }) .get('/app/admin/unmigrated_users_count', async (_res, env) => { const pg = createPostgresConnectionPool(env, '/app/admin/unmigrated_users_count') - const users = await getUnmigratedUsers(pg) - return json({ count: users.length }) + return json({ count: await getNumUnmigratedUsers(pg) }) }) .get('/app/admin/migrate_users_batch', async (res, env) => { let stopRequested = false // Parse query parameters for batch configuration - const batchSize = parseInt((res.query['batchSize'] as string) || '100') - const batchSleepMs = parseInt((res.query['batchSleepMs'] as string) || '100') - const maxUsers = res.query['maxUsers'] ? parseInt(res.query['maxUsers'] as string) : undefined + const sleepMs = parseInt((res.query['sleepMs'] as string) || '100') return new Response( new ReadableStream({ @@ -95,14 +93,7 @@ export const adminRoutes = createRouter() sendProgress('starting', 'Beginning batch user migration process...') - await performBatchUserMigration( - env, - sendProgress, - shouldStop, - batchSize, - batchSleepMs, - maxUsers - ) + await startUserMigration(env, sendProgress, shouldStop, sleepMs) // Send completion event const completionEvent = { @@ -397,92 +388,98 @@ async function performUserDeletion( await user.admin_delete(userRow.id) } -async function getUnmigratedUsers(pg: ReturnType) { +async function getNextUnmigratedUser(pg: ReturnType) { return await pg .selectFrom('user') .where((eb) => eb.or([eb('flags', 'not like', '%groups_backend%'), eb('flags', 'is', null)])) .select(['id', 'email', 'name']) - .execute() + .limit(1) + .executeTakeFirst() } -async function performBatchUserMigration( +async function getNumUnmigratedUsers(pg: ReturnType) { + const res = await sql<{ + count: number + }>`select count(*) from public.user where flags not like '%groups_backend%' or flags is null`.execute( + pg + ) + return res.rows[0].count +} +async function getTotalUsers(pg: ReturnType) { + const res = await sql<{ count: number }>`select count(*) from public.user`.execute(pg) + return res.rows[0].count +} + +async function startUserMigration( env: Environment, sendProgress: (step: string, message: string, details?: any) => void, shouldStop: () => boolean, - batchSize: number = 100, - batchSleepMs: number = 100, - maxUsers?: number + sleepTime: number = 100 ) { const pg = createPostgresConnectionPool(env, '/app/admin/migrate_users_batch') sendProgress('query', 'Fetching users without groups_backend flag...') - const allUsersToMigrate = await getUnmigratedUsers(pg) + const usersToMigrate = await getNumUnmigratedUsers(pg) + const totalUsers = await getTotalUsers(pg) + let successCount = 0 + let failureCount = 0 - // Limit the number of users if maxUsers is specified - const usersToMigrate = maxUsers ? allUsersToMigrate.slice(0, maxUsers) : allUsersToMigrate + function getStats() { + return { + totalUsers, + usersToMigrate, + successCount, + failureCount, + progress: successCount / usersToMigrate, + } + } - const totalUsers = usersToMigrate.length - const totalAvailable = allUsersToMigrate.length - sendProgress( - 'query', - maxUsers - ? `Found ${totalAvailable} users to migrate (limiting to ${totalUsers})` - : `Found ${totalUsers} users to migrate`, - { totalUsers, totalAvailable } - ) + sendProgress('query', `${usersToMigrate}/${totalUsers} users left to migrate`, getStats()) - if (totalUsers === 0) { + if (usersToMigrate === 0) { sendProgress('complete', 'No users to migrate') return } - let successCount = 0 - let failureCount = 0 const failures: Array<{ userId: string; email: string; error: string }> = [] // Process users in batches - for (let i = 0; i < usersToMigrate.length; i++) { + while (true) { + const userRow = await getNextUnmigratedUser(pg) + if (!userRow) { + break + } + // Check if we should stop if (shouldStop()) { - sendProgress('stopped', 'Migration stopped by user', { - totalUsers, - successCount, - failureCount, - processed: i, - remaining: totalUsers - i, - }) + sendProgress('stopped', 'Migration stopped by user', getStats()) break } - const userRow = usersToMigrate[i] - const progress = i + 1 - - sendProgress('migrating', `Migrating user ${progress}/${totalUsers}: ${userRow.email}`, { + sendProgress('migrating', `Migrating user ${userRow.email}`, { userId: userRow.id, email: userRow.email, - progress, - totalUsers, - successCount, - failureCount, + ...getStats(), }) try { const user = getUserDurableObject(env, userRow.id) - const result = await user.admin_migrateToGroups(userRow.id, uniqueId()) + + const result = await sql<{ + files_migrated: number + pinned_files_migrated: number + flag_added: boolean + }>`SELECT * FROM migrate_user_to_groups(${userRow.id}, ${uniqueId()})`.execute(pg) + await user.admin_forceHardReboot(userRow.id) successCount++ - sendProgress( - 'success', - `Successfully migrated user ${progress}/${totalUsers}: ${userRow.email}`, - { - userId: userRow.id, - email: userRow.email, - result, - successCount, - failureCount, - } - ) + sendProgress('success', `Successfully migrated user ${userRow.email}`, { + userId: userRow.id, + email: userRow.email, + result: result.rows[0], + ...getStats(), + }) } catch (error) { failureCount++ const errorMessage = error instanceof Error ? error.message : String(error) @@ -492,35 +489,19 @@ async function performBatchUserMigration( error: errorMessage, }) - sendProgress( - 'failure', - `Failed to migrate user ${progress}/${totalUsers}: ${userRow.email}`, - { - userId: userRow.id, - email: userRow.email, - error: errorMessage, - successCount, - failureCount, - } - ) + sendProgress('failure', `Failed to migrate ${userRow.email}`, { + userId: userRow.id, + email: userRow.email, + error: errorMessage, + ...getStats(), + }) } // Brief pause between migrations to avoid overwhelming the system - if ((i + 1) % batchSize === 0) { - sendProgress('batch_complete', `Completed ${i + 1} of ${totalUsers} users`, { - progress: i + 1, - totalUsers, - successCount, - failureCount, - }) - await sleep(batchSleepMs) - } + await sleep(sleepTime) } sendProgress('summary', 'Migration batch complete', { - totalUsers, - successCount, - failureCount, failures: failures.length > 0 ? failures : undefined, }) }