Skip to content

Commit fb5d6dd

Browse files
authored
incremental migration: use a single sql connection (tldraw#7052)
After trying this in staging yesterday, we noticed a spike in SQL connections. I had Cursor hoist the connection to the parent process and clean up the migration script a little. ### Change type - [x] `other` <!-- CURSOR_SUMMARY --> --- > [!NOTE] > Switches batch user migration to a single-connection, sequential flow with per-user sleep, new stats/progress, and matching UI/API changes. > > - **Backend (sync-worker)**: > - Replace batch migration with sequential per-user loop using one PG pool; add `sleepMs` param only. > - Introduce `startUserMigration`, `getNextUnmigratedUser`, `getNumUnmigratedUsers`, `getTotalUsers` helpers. > - Use SQL function `migrate_user_to_groups(...)` and force user DO reboot; send richer stats (`totalUsers`, `usersToMigrate`, `successCount`, `failureCount`, `progress`). > - Update SSE endpoint `/app/admin/migrate_users_batch` to emit new progress schema; `/app/admin/unmigrated_users_count` now returns DB count directly. > - **Frontend (admin.tsx)**: > - Simplify Batch Migration UI: remove `batchSize` and `maxUsers`; add `sleepMs` only. > - Update confirmation text, help copy, and stats display to show `usersToMigrate` and percentage `progress`. > - Stream handling now directly sets `stats` from SSE `details`; updated controls to start/stop accordingly. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit d86ab5f. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent fc8a52c commit fb5d6dd

2 files changed

Lines changed: 90 additions & 148 deletions

File tree

apps/dotcom/client/src/pages/admin.tsx

Lines changed: 24 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -488,13 +488,19 @@ function BatchMigrateUsersToGroups() {
488488
const [progressLog, setProgressLog] = useState<string[]>([])
489489
const [error, setError] = useState(null as string | null)
490490
const [isComplete, setIsComplete] = useState(false)
491-
const [stats, setStats] = useState({ successCount: 0, failureCount: 0, totalUsers: 0 })
491+
const [stats, setStats] = useState(
492+
{} as {
493+
successCount: number
494+
failureCount: number
495+
totalUsers: number
496+
usersToMigrate: number
497+
progress: number
498+
}
499+
)
492500
const [unmigratedCount, setUnmigratedCount] = useState<number | null>(null)
493501
const [isLoadingCount, setIsLoadingCount] = useState(false)
494502
const [eventSource, setEventSource] = useState<EventSource | null>(null)
495-
const [batchSize, setBatchSize] = useState(100)
496-
const [batchSleepMs, setBatchSleepMs] = useState(100)
497-
const [maxUsers, setMaxUsers] = useState<number | ''>('')
503+
const [sleepMs, setSleepMs] = useState(100)
498504
const logContainerRef = useRef<HTMLDivElement>(null)
499505

500506
// Cleanup EventSource on unmount
@@ -540,9 +546,7 @@ function BatchMigrateUsersToGroups() {
540546
}, [eventSource])
541547

542548
const onMigrate = useCallback(async () => {
543-
const migrationMessage = maxUsers
544-
? `Are you sure you want to migrate up to ${maxUsers} users without the groups_backend flag? This action cannot be undone.`
545-
: `Are you sure you want to migrate ALL users without the groups_backend flag? This action cannot be undone.`
549+
const migrationMessage = `Are you sure you want to migrate ALL users without the groups_backend flag? This action cannot be undone.`
546550

547551
if (!window.confirm(migrationMessage)) {
548552
return
@@ -552,16 +556,12 @@ function BatchMigrateUsersToGroups() {
552556
setError(null)
553557
setProgressLog([])
554558
setIsComplete(false)
555-
setStats({ successCount: 0, failureCount: 0, totalUsers: 0 })
559+
setStats({ successCount: 0, failureCount: 0, totalUsers: 0, usersToMigrate: 0, progress: 0 })
556560

557561
try {
558562
const params = new URLSearchParams({
559-
batchSize: batchSize.toString(),
560-
batchSleepMs: batchSleepMs.toString(),
563+
sleepMs: sleepMs.toString(),
561564
})
562-
if (maxUsers) {
563-
params.set('maxUsers', maxUsers.toString())
564-
}
565565
const es = new EventSource(`/api/app/admin/migrate_users_batch?${params}`)
566566
setEventSource(es)
567567

@@ -579,16 +579,7 @@ function BatchMigrateUsersToGroups() {
579579

580580
// Update stats from details
581581
if (data.details) {
582-
if (data.details.totalUsers !== undefined) {
583-
setStats((prev) => ({ ...prev, totalUsers: data.details.totalUsers }))
584-
}
585-
if (data.details.successCount !== undefined && data.details.failureCount !== undefined) {
586-
setStats({
587-
totalUsers: data.details.totalUsers || 0,
588-
successCount: data.details.successCount,
589-
failureCount: data.details.failureCount,
590-
})
591-
}
582+
setStats(data.details)
592583
}
593584

594585
if (data.type === 'complete') {
@@ -615,7 +606,7 @@ function BatchMigrateUsersToGroups() {
615606
setIsMigrating(false)
616607
setEventSource(null)
617608
}
618-
}, [batchSize, batchSleepMs, maxUsers])
609+
}, [sleepMs])
619610

620611
return (
621612
<div className={styles.dangerZone}>
@@ -640,55 +631,27 @@ function BatchMigrateUsersToGroups() {
640631

641632
<p className="tla-text_ui__small">
642633
This will migrate all users who don&apos;t have the groups_backend flag. The process will
643-
run sequentially and report progress in real-time. Configure the batch size (number of users
644-
processed before a pause), sleep duration (milliseconds to wait between batches), and max
645-
users (limit for incremental rollout, leave empty to migrate all) below.
634+
run sequentially (one user at a time) and report progress in real-time. Configure the sleep
635+
duration (milliseconds to wait between each user migration) below.
646636
</p>
647637

648638
{error && <div className={styles.errorMessage}>{error}</div>}
649639

650640
{/* Configuration Inputs */}
651641
<div className={styles.configContainer}>
652642
<div>
653-
<label htmlFor="batchSize">Batch size:</label>
643+
<label htmlFor="sleepMs">Sleep between migrations (ms):</label>
654644
<input
655-
id="batchSize"
645+
id="sleepMs"
656646
type="number"
657-
value={batchSize}
658-
onChange={(e) => setBatchSize(Number(e.target.value))}
659-
disabled={isMigrating}
660-
min={1}
661-
className={styles.searchInput}
662-
style={{ width: '100px', marginLeft: '8px' }}
663-
/>
664-
</div>
665-
<div>
666-
<label htmlFor="batchSleepMs">Sleep between batches (ms):</label>
667-
<input
668-
id="batchSleepMs"
669-
type="number"
670-
value={batchSleepMs}
671-
onChange={(e) => setBatchSleepMs(Number(e.target.value))}
647+
value={sleepMs}
648+
onChange={(e) => setSleepMs(Number(e.target.value))}
672649
disabled={isMigrating}
673650
min={0}
674651
className={styles.searchInput}
675652
style={{ width: '100px', marginLeft: '8px' }}
676653
/>
677654
</div>
678-
<div>
679-
<label htmlFor="maxUsers">Max users (leave empty for all):</label>
680-
<input
681-
id="maxUsers"
682-
type="number"
683-
value={maxUsers}
684-
onChange={(e) => setMaxUsers(e.target.value === '' ? '' : Number(e.target.value))}
685-
disabled={isMigrating}
686-
min={1}
687-
placeholder="All users"
688-
className={styles.searchInput}
689-
style={{ width: '100px', marginLeft: '8px' }}
690-
/>
691-
</div>
692655
</div>
693656

694657
{/* Stats Display */}
@@ -699,8 +662,8 @@ function BatchMigrateUsersToGroups() {
699662
<span className={styles.statValue}>{stats.totalUsers}</span>
700663
</div>
701664
<div className={styles.statItem}>
702-
<span className={styles.statLabel}>Completed:</span>
703-
<span className={styles.statValue}>{stats.successCount + stats.failureCount}</span>
665+
<span className={styles.statLabel}>Users to Migrate:</span>
666+
<span className={styles.statValue}>{stats.usersToMigrate}</span>
704667
</div>
705668
<div className={styles.statItem}>
706669
<span className={styles.statLabel}>Succeeded:</span>
@@ -712,9 +675,7 @@ function BatchMigrateUsersToGroups() {
712675
</div>
713676
<div className={styles.statItem}>
714677
<span className={styles.statLabel}>Progress:</span>
715-
<span className={styles.statValue}>
716-
{Math.round(((stats.successCount + stats.failureCount) / stats.totalUsers) * 100)}%
717-
</span>
678+
<span className={styles.statValue}>{(stats.progress * 100).toFixed(2)}%</span>
718679
</div>
719680
</div>
720681
)}

apps/dotcom/sync-worker/src/adminRoutes.ts

Lines changed: 66 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { TlaFile } from '@tldraw/dotcom-shared'
22
import { assert, sleep, uniqueId } from '@tldraw/utils'
33
import { createRouter } from '@tldraw/worker-shared'
44
import { StatusError, json } from 'itty-router'
5+
import { sql } from 'kysely'
56
import { createPostgresConnectionPool } from './postgres'
67
import { returnFileSnapshot } from './routes/tla/getFileSnapshot'
78
import { type Environment } from './types'
@@ -64,16 +65,13 @@ export const adminRoutes = createRouter<Environment>()
6465
})
6566
.get('/app/admin/unmigrated_users_count', async (_res, env) => {
6667
const pg = createPostgresConnectionPool(env, '/app/admin/unmigrated_users_count')
67-
const users = await getUnmigratedUsers(pg)
68-
return json({ count: users.length })
68+
return json({ count: await getNumUnmigratedUsers(pg) })
6969
})
7070
.get('/app/admin/migrate_users_batch', async (res, env) => {
7171
let stopRequested = false
7272

7373
// Parse query parameters for batch configuration
74-
const batchSize = parseInt((res.query['batchSize'] as string) || '100')
75-
const batchSleepMs = parseInt((res.query['batchSleepMs'] as string) || '100')
76-
const maxUsers = res.query['maxUsers'] ? parseInt(res.query['maxUsers'] as string) : undefined
74+
const sleepMs = parseInt((res.query['sleepMs'] as string) || '100')
7775

7876
return new Response(
7977
new ReadableStream({
@@ -95,14 +93,7 @@ export const adminRoutes = createRouter<Environment>()
9593

9694
sendProgress('starting', 'Beginning batch user migration process...')
9795

98-
await performBatchUserMigration(
99-
env,
100-
sendProgress,
101-
shouldStop,
102-
batchSize,
103-
batchSleepMs,
104-
maxUsers
105-
)
96+
await startUserMigration(env, sendProgress, shouldStop, sleepMs)
10697

10798
// Send completion event
10899
const completionEvent = {
@@ -397,92 +388,98 @@ async function performUserDeletion(
397388
await user.admin_delete(userRow.id)
398389
}
399390

400-
async function getUnmigratedUsers(pg: ReturnType<typeof createPostgresConnectionPool>) {
391+
/**
 * Fetches a single user that has not yet been migrated to the groups backend.
 *
 * A user counts as unmigrated when their `flags` column is null or does not contain
 * `groups_backend`. Only the `id`, `email` and `name` columns are selected.
 *
 * The query has no ORDER BY, so which unmigrated user is returned is up to the database.
 *
 * NOTE(review): the migration loop calls this repeatedly until it yields nothing. A user whose
 * migration fails still matches the filter, so the same user may be returned again. Confirm the
 * caller handles that.
 *
 * @param pg - The database handle the query is executed against.
 * @returns The first matching user row, or nothing when no unmigrated users remain.
 */
async function getNextUnmigratedUser(pg: ReturnType<typeof createPostgresConnectionPool>) {
401392
return await pg
402393
.selectFrom('user')
403394
.where((eb) => eb.or([eb('flags', 'not like', '%groups_backend%'), eb('flags', 'is', null)]))
404395
.select(['id', 'email', 'name'])
405-
.execute()
396+
.limit(1)
397+
.executeTakeFirst()
406398
}
407399

408-
async function performBatchUserMigration(
400+
/**
 * Counts the users that still need to be migrated to the groups backend.
 *
 * A user counts as unmigrated when their `flags` column is null or does not contain
 * `groups_backend`.
 *
 * @param pg - The database handle the count query is executed against.
 * @returns The number of unmigrated users, always as a JavaScript number.
 */
async function getNumUnmigratedUsers(
	pg: ReturnType<typeof createPostgresConnectionPool>
): Promise<number> {
	const res = await sql<{
		count: number | string
	}>`select count(*) from public.user where flags not like '%groups_backend%' or flags is null`.execute(
		pg
	)
	// Postgres `count(*)` is a bigint, which drivers such as node-postgres return as a string.
	// Coerce it so callers can compare against 0 and do arithmetic without string surprises.
	return Number(res.rows[0]?.count ?? 0)
}
408+
/**
 * Counts every row in the `public.user` table, migrated or not.
 *
 * @param pg - The database handle the count query is executed against.
 * @returns The total number of users, always as a JavaScript number.
 */
async function getTotalUsers(
	pg: ReturnType<typeof createPostgresConnectionPool>
): Promise<number> {
	const res = await sql<{ count: number | string }>`select count(*) from public.user`.execute(pg)
	// Postgres `count(*)` is a bigint, which drivers such as node-postgres return as a string.
	// Coerce it so the value is a real number in stats and JSON payloads.
	return Number(res.rows[0]?.count ?? 0)
}
412+
413+
async function startUserMigration(
409414
env: Environment,
410415
sendProgress: (step: string, message: string, details?: any) => void,
411416
shouldStop: () => boolean,
412-
batchSize: number = 100,
413-
batchSleepMs: number = 100,
414-
maxUsers?: number
417+
sleepTime: number = 100
415418
) {
416419
const pg = createPostgresConnectionPool(env, '/app/admin/migrate_users_batch')
417420

418421
sendProgress('query', 'Fetching users without groups_backend flag...')
419422

420-
const allUsersToMigrate = await getUnmigratedUsers(pg)
423+
const usersToMigrate = await getNumUnmigratedUsers(pg)
424+
const totalUsers = await getTotalUsers(pg)
425+
let successCount = 0
426+
let failureCount = 0
421427

422-
// Limit the number of users if maxUsers is specified
423-
const usersToMigrate = maxUsers ? allUsersToMigrate.slice(0, maxUsers) : allUsersToMigrate
428+
function getStats() {
429+
return {
430+
totalUsers,
431+
usersToMigrate,
432+
successCount,
433+
failureCount,
434+
progress: successCount / usersToMigrate,
435+
}
436+
}
424437

425-
const totalUsers = usersToMigrate.length
426-
const totalAvailable = allUsersToMigrate.length
427-
sendProgress(
428-
'query',
429-
maxUsers
430-
? `Found ${totalAvailable} users to migrate (limiting to ${totalUsers})`
431-
: `Found ${totalUsers} users to migrate`,
432-
{ totalUsers, totalAvailable }
433-
)
438+
sendProgress('query', `${usersToMigrate}/${totalUsers} users left to migrate`, getStats())
434439

435-
if (totalUsers === 0) {
440+
if (usersToMigrate === 0) {
436441
sendProgress('complete', 'No users to migrate')
437442
return
438443
}
439444

440-
let successCount = 0
441-
let failureCount = 0
442445
const failures: Array<{ userId: string; email: string; error: string }> = []
443446

444447
// Process users one at a time until no unmigrated users remain
445-
for (let i = 0; i < usersToMigrate.length; i++) {
448+
while (true) {
449+
const userRow = await getNextUnmigratedUser(pg)
450+
if (!userRow) {
451+
break
452+
}
453+
446454
// Check if we should stop
447455
if (shouldStop()) {
448-
sendProgress('stopped', 'Migration stopped by user', {
449-
totalUsers,
450-
successCount,
451-
failureCount,
452-
processed: i,
453-
remaining: totalUsers - i,
454-
})
456+
sendProgress('stopped', 'Migration stopped by user', getStats())
455457
break
456458
}
457459

458-
const userRow = usersToMigrate[i]
459-
const progress = i + 1
460-
461-
sendProgress('migrating', `Migrating user ${progress}/${totalUsers}: ${userRow.email}`, {
460+
sendProgress('migrating', `Migrating user ${userRow.email}`, {
462461
userId: userRow.id,
463462
email: userRow.email,
464-
progress,
465-
totalUsers,
466-
successCount,
467-
failureCount,
463+
...getStats(),
468464
})
469465

470466
try {
471467
const user = getUserDurableObject(env, userRow.id)
472-
const result = await user.admin_migrateToGroups(userRow.id, uniqueId())
468+
469+
const result = await sql<{
470+
files_migrated: number
471+
pinned_files_migrated: number
472+
flag_added: boolean
473+
}>`SELECT * FROM migrate_user_to_groups(${userRow.id}, ${uniqueId()})`.execute(pg)
474+
await user.admin_forceHardReboot(userRow.id)
473475

474476
successCount++
475-
sendProgress(
476-
'success',
477-
`Successfully migrated user ${progress}/${totalUsers}: ${userRow.email}`,
478-
{
479-
userId: userRow.id,
480-
email: userRow.email,
481-
result,
482-
successCount,
483-
failureCount,
484-
}
485-
)
477+
sendProgress('success', `Successfully migrated user ${userRow.email}`, {
478+
userId: userRow.id,
479+
email: userRow.email,
480+
result: result.rows[0],
481+
...getStats(),
482+
})
486483
} catch (error) {
487484
failureCount++
488485
const errorMessage = error instanceof Error ? error.message : String(error)
@@ -492,35 +489,19 @@ async function performBatchUserMigration(
492489
error: errorMessage,
493490
})
494491

495-
sendProgress(
496-
'failure',
497-
`Failed to migrate user ${progress}/${totalUsers}: ${userRow.email}`,
498-
{
499-
userId: userRow.id,
500-
email: userRow.email,
501-
error: errorMessage,
502-
successCount,
503-
failureCount,
504-
}
505-
)
492+
sendProgress('failure', `Failed to migrate ${userRow.email}`, {
493+
userId: userRow.id,
494+
email: userRow.email,
495+
error: errorMessage,
496+
...getStats(),
497+
})
506498
}
507499

508500
// Brief pause between migrations to avoid overwhelming the system
509-
if ((i + 1) % batchSize === 0) {
510-
sendProgress('batch_complete', `Completed ${i + 1} of ${totalUsers} users`, {
511-
progress: i + 1,
512-
totalUsers,
513-
successCount,
514-
failureCount,
515-
})
516-
await sleep(batchSleepMs)
517-
}
501+
await sleep(sleepTime)
518502
}
519503

520504
sendProgress('summary', 'Migration batch complete', {
521-
totalUsers,
522-
successCount,
523-
failureCount,
524505
failures: failures.length > 0 ? failures : undefined,
525506
})
526507
}

0 commit comments

Comments
 (0)