diff --git a/.specs/kiloclaw-datamodel.md b/.specs/kiloclaw-datamodel.md index 4c8522ac92..20c7fa2faa 100644 --- a/.specs/kiloclaw-datamodel.md +++ b/.specs/kiloclaw-datamodel.md @@ -392,11 +392,6 @@ not yet enforced in the current codebase: across all services that mutate subscription records. Some subscription-creation paths may already write change-log entries; complete cross-service coverage remains the intended invariant. -4. Fresh Provision Admission SHOULD be implemented in the Registry-backed - Worker admission flow before the existing web advisory lock is removed. - (Currently, web requests use transitional PostgreSQL advisory-lock - coordination that is being replaced because it is unsafe through - transaction-pooled production connections.) ## Changelog diff --git a/apps/web/src/app/(app)/claw/components/billing/ReferralRewardStatusCard.test.ts b/apps/web/src/app/(app)/claw/components/billing/ReferralRewardStatusCard.test.ts index caff4dcb7e..20d2229baa 100644 --- a/apps/web/src/app/(app)/claw/components/billing/ReferralRewardStatusCard.test.ts +++ b/apps/web/src/app/(app)/claw/components/billing/ReferralRewardStatusCard.test.ts @@ -81,8 +81,8 @@ describe('ReferralRewardStatusCard', () => { application: { appliedAt: '2026-04-10T00:05:00.000Z', subscriptionId: '11111111-1111-4111-8111-111111111111', - previousRenewalBoundary: '2026-05-01T00:00:00.000Z', - newRenewalBoundary: '2026-06-01T00:00:00.000Z', + previousRenewalBoundary: '2026-05-01T12:00:00.000Z', + newRenewalBoundary: '2026-06-01T12:00:00.000Z', }, }, { diff --git a/apps/web/src/app/(app)/claw/components/billing/ReferralRewardsSummary.test.ts b/apps/web/src/app/(app)/claw/components/billing/ReferralRewardsSummary.test.ts index 866785d436..21938983d8 100644 --- a/apps/web/src/app/(app)/claw/components/billing/ReferralRewardsSummary.test.ts +++ b/apps/web/src/app/(app)/claw/components/billing/ReferralRewardsSummary.test.ts @@ -30,15 +30,15 @@ describe('ReferralRewardsSummary', () => { role: 'referrer', appliedAt: '2026-04-10T00:05:00.000Z', monthsGranted: 1, - previousRenewalBoundary: '2026-05-01T00:00:00.000Z', - newRenewalBoundary: '2026-06-01T00:00:00.000Z', + previousRenewalBoundary: '2026-05-01T12:00:00.000Z', + newRenewalBoundary: '2026-06-01T12:00:00.000Z', }, { role: 'referee', appliedAt: '2026-04-11T00:05:00.000Z', monthsGranted: 1, - previousRenewalBoundary: '2026-06-01T00:00:00.000Z', - newRenewalBoundary: '2026-07-01T00:00:00.000Z', + previousRenewalBoundary: '2026-06-01T12:00:00.000Z', + newRenewalBoundary: '2026-07-01T12:00:00.000Z', }, ], }, diff --git a/apps/web/src/app/admin/components/KiloclawReferralsInvestigation.test.ts b/apps/web/src/app/admin/components/KiloclawReferralsInvestigation.test.ts index c4ce6d9b60..73502969ab 100644 --- a/apps/web/src/app/admin/components/KiloclawReferralsInvestigation.test.ts +++ b/apps/web/src/app/admin/components/KiloclawReferralsInvestigation.test.ts @@ -46,8 +46,8 @@ function referralRow(params: { id: `${params.referralId}-application`, beneficiaryUserId: 'referrer-1', subscriptionId: '55555555-5555-4555-8555-555555555555', - previousRenewalBoundary: '2026-05-01T00:00:00.000Z', - newRenewalBoundary: '2026-06-01T00:00:00.000Z', + previousRenewalBoundary: '2026-05-01T12:00:00.000Z', + newRenewalBoundary: '2026-06-01T12:00:00.000Z', appliedAt: '2026-04-10T00:05:00.000Z', }, ] diff --git a/apps/web/src/lib/impact/kiloclaw-referrals.test.ts b/apps/web/src/lib/impact/kiloclaw-referrals.test.ts index 53eb697740..9c9b013ef5 100644 --- a/apps/web/src/lib/impact/kiloclaw-referrals.test.ts +++ b/apps/web/src/lib/impact/kiloclaw-referrals.test.ts @@ -145,8 +145,8 @@ async function insertActivePersonalSubscription( plan: 'standard', status: 'active', current_period_start: '2026-04-01T00:00:00.000Z', - current_period_end: '2026-05-01T00:00:00.000Z', - credit_renewal_at: '2026-05-01T00:00:00.000Z', + current_period_end: '2026-05-01T12:00:00.000Z', + credit_renewal_at: '2026-05-01T12:00:00.000Z', cancel_at_period_end: false, ...overrides, }) @@ -636,7 +636,7 @@ describe('kiloclaw referrals', () => { expect(applications).toHaveLength(2); expect( applications.map(application => String(application.new_renewal_boundary)).sort() - ).toEqual(['2026-06-01 00:00:00+00', '2026-06-01 00:00:00+00']); + ).toEqual(['2026-06-01 12:00:00+00', '2026-06-01 12:00:00+00']); const subscriptions = await db .select({ @@ -650,13 +650,13 @@ describe('kiloclaw referrals', () => { expect.arrayContaining([ expect.objectContaining({ userId: referrer.id, - currentPeriodEnd: '2026-06-01 00:00:00+00', - creditRenewalAt: '2026-06-01 00:00:00+00', + currentPeriodEnd: '2026-06-01 12:00:00+00', + creditRenewalAt: '2026-06-01 12:00:00+00', }), expect.objectContaining({ userId: referee.id, - currentPeriodEnd: '2026-06-01 00:00:00+00', - creditRenewalAt: '2026-06-01 00:00:00+00', + currentPeriodEnd: '2026-06-01 12:00:00+00', + creditRenewalAt: '2026-06-01 12:00:00+00', }), ]) ); @@ -1573,7 +1573,7 @@ describe('kiloclaw referrals', () => { .select() .from(kiloclaw_subscriptions) .where(eq(kiloclaw_subscriptions.user_id, referee.id)); - expect(subscription.current_period_end).toBe('2026-05-01 00:00:00+00'); + expect(subscription.current_period_end).toBe('2026-05-01 12:00:00+00'); const refereeRewards = await db .select({ @@ -1646,7 +1646,7 @@ describe('kiloclaw referrals', () => { 'sub_referee_123', expect.objectContaining({ proration_behavior: 'none', - trial_end: Math.floor(new Date('2026-06-01T00:00:00.000Z').getTime() / 1000), + trial_end: Math.floor(new Date('2026-06-01T12:00:00.000Z').getTime() / 1000), }), expect.objectContaining({ idempotencyKey: expect.stringContaining('stripe-apply'), diff --git a/apps/web/src/lib/kilo-pass/apple-store-notifications.test.ts b/apps/web/src/lib/kilo-pass/apple-store-notifications.test.ts index ad5ad20fc1..d8e0de5b3a 100644 --- a/apps/web/src/lib/kilo-pass/apple-store-notifications.test.ts +++ b/apps/web/src/lib/kilo-pass/apple-store-notifications.test.ts @@ -55,7 +55,7 @@ function transaction( bundleId: 'com.kilocode.kiloapp', productId: 'kilopass.tier19.monthly.v1', purchaseDate: 1_777_626_000_000, - expiresDate: 1_780_218_000_000, + expiresDate: Date.parse('2030-06-01T00:00:00.000Z'), environment: 'Sandbox', rawPayload: { test: true }, ...overrides, @@ -1152,8 +1152,8 @@ describe('processAppStoreKiloPassNotification', () => { transactionId: tx1, productId: 'kilopass.tier19.monthly.v1', appAccountToken: user.app_store_account_token, - purchaseDate: Date.parse('2026-05-01T00:00:00.000Z'), - expiresDate: Date.parse('2026-05-31T00:00:00.000Z'), + purchaseDate: Date.parse('2026-06-01T00:00:00.000Z'), + expiresDate: Date.parse('2026-07-01T00:00:00.000Z'), currency: 'USD', price: 19000, }), @@ -1173,8 +1173,8 @@ describe('processAppStoreKiloPassNotification', () => { transactionId: tx2, productId: 'kilopass.tier49.monthly.v1', appAccountToken: user.app_store_account_token, - purchaseDate: Date.parse('2026-05-16T00:00:00.000Z'), - expiresDate: Date.parse('2026-06-16T00:00:00.000Z'), + purchaseDate: Date.parse('2026-06-16T00:00:00.000Z'), + expiresDate: Date.parse('2026-07-16T00:00:00.000Z'), currency: 'USD', price: 49000, }), @@ -1188,7 +1188,7 @@ describe('processAppStoreKiloPassNotification', () => { const issuance = await db.query.kilo_pass_issuances.findFirst({ where: and( eq(kilo_pass_issuances.kilo_pass_subscription_id, subscription?.id ?? ''), - eq(kilo_pass_issuances.issue_month, '2026-05-01') + eq(kilo_pass_issuances.issue_month, '2026-06-01') ), }); expect(issuance).toBeDefined(); diff --git a/apps/web/src/lib/kiloclaw/kiloclaw-internal-client.ts b/apps/web/src/lib/kiloclaw/kiloclaw-internal-client.ts index 557da41343..48546ebd03 100644 --- a/apps/web/src/lib/kiloclaw/kiloclaw-internal-client.ts +++ b/apps/web/src/lib/kiloclaw/kiloclaw-internal-client.ts @@ -303,6 +303,21 @@ export class KiloClawInternalClient { ); } + async repairProvisionReservation( + userId: string, + instanceId: string, + orgId?: string + ): Promise<{ ok: true }> { + return this.request( + '/api/platform/provision/repair-reservation', + { + method: 'POST', + body: JSON.stringify({ userId, instanceId, orgId }), + }, + { userId } + ); + } + async start( userId: string, instanceId?: string, diff --git a/apps/web/src/lib/kiloclaw/provision-error-handler.ts b/apps/web/src/lib/kiloclaw/provision-error-handler.ts new file mode 100644 index 0000000000..e1eb5f07eb --- /dev/null +++ b/apps/web/src/lib/kiloclaw/provision-error-handler.ts @@ -0,0 +1,36 @@ +import { TRPCError } from '@trpc/server'; +import { UpstreamApiError } from '@/lib/trpc/init'; +import { KiloClawApiError } from './kiloclaw-internal-client'; + +type ProvisionErrorPayload = { message?: string; code?: string }; + +type ProvisionErrorPayloadReader = (err: KiloClawApiError) => ProvisionErrorPayload; + +export function handleProvisionError(err: unknown, getPayload: ProvisionErrorPayloadReader): never { + if (err instanceof KiloClawApiError) { + const { message, code } = getPayload(err); + if ( + (err.statusCode === 409 || err.statusCode === 503) && + (code === 'provision_in_progress' || + code === 'provision_completion_pending' || + code === 'instance_already_active' || + code === 'instance_destroyed') + ) { + throw new TRPCError({ + code: 'CONFLICT', + message: + message ?? + 'An instance is already being created. Wait for setup to finish, then try again.', + cause: new UpstreamApiError(code), + }); + } + if (err.statusCode === 404 && code === 'instance_not_found') { + throw new TRPCError({ + code: 'NOT_FOUND', + message: message ?? 'No active KiloClaw instance found', + cause: new UpstreamApiError(code), + }); + } + } + throw err; +} diff --git a/apps/web/src/lib/kiloclaw/provision-lock.test.ts b/apps/web/src/lib/kiloclaw/provision-lock.test.ts deleted file mode 100644 index c8683ca646..0000000000 --- a/apps/web/src/lib/kiloclaw/provision-lock.test.ts +++ /dev/null @@ -1,200 +0,0 @@ -import { beforeAll, beforeEach, describe, expect, it, jest } from '@jest/globals'; -import type { withKiloclawProvisionContextLock as WithKiloclawProvisionContextLock } from './provision-lock'; - -jest.mock('@vercel/functions', () => ({ - attachDatabasePool: jest.fn(), -})); - -jest.mock('@kilocode/db', () => ({ - computeDatabaseUrl: jest.fn(() => 'postgres://provision-lock-test'), -})); - -jest.mock('@kilocode/db/client', () => { - const connectMock = jest.fn(); - const onMock = jest.fn(); - - return { - createDrizzleClient: jest.fn(() => ({ - pool: { - connect: connectMock, - on: onMock, - }, - })), - __connectMock: connectMock, - }; -}); - -type AsyncMock = jest.Mock<(...args: unknown[]) => Promise>; -type ReleaseMock = jest.Mock<(...args: unknown[]) => void>; -type ProvisionLockClientMock = { - query: AsyncMock; - release: ReleaseMock; -}; -type DrizzleClientModuleMock = { - __connectMock: AsyncMock; -}; - -const { __connectMock: connectMock } = - jest.requireMock('@kilocode/db/client'); - -let withKiloclawProvisionContextLock: typeof WithKiloclawProvisionContextLock; - -beforeAll(async () => { - const provisionLockModule = await import('./provision-lock'); - withKiloclawProvisionContextLock = provisionLockModule.withKiloclawProvisionContextLock; -}); - -function createProvisionLockClient(): ProvisionLockClientMock { - return { - query: jest.fn<(...args: unknown[]) => Promise>(), - release: jest.fn<(...args: unknown[]) => void>(), - }; -} - -function queueProvisionLockClient(client: ProvisionLockClientMock): void { - connectMock.mockResolvedValueOnce(client); -} - -function prepareSuccessfulLockLifecycle(client: ProvisionLockClientMock): void { - client.query - .mockResolvedValueOnce({ rows: [] }) - .mockResolvedValueOnce({ rows: [{ unlocked: true }] }); -} - -describe('withKiloclawProvisionContextLock', () => { - beforeEach(() => { - connectMock.mockReset(); - }); - - it('acquires the context lock, runs work, unlocks, and returns a reusable client', async () => { - const client = createProvisionLockClient(); - const work = jest.fn(async () => 'provisioned'); - prepareSuccessfulLockLifecycle(client); - queueProvisionLockClient(client); - - await expect(withKiloclawProvisionContextLock('normal-key', work)).resolves.toBe('provisioned'); - - expect(work).toHaveBeenCalledTimes(1); - expect(client.query).toHaveBeenNthCalledWith(1, 'SELECT pg_advisory_lock(hashtext($1))', [ - 'normal-key', - ]); - expect(client.query).toHaveBeenNthCalledWith( - 2, - 'SELECT pg_advisory_unlock(hashtext($1)) AS unlocked', - ['normal-key'] - ); - expect(client.release.mock.calls).toEqual([[]]); - }); - - it('unlocks and returns a reusable client when work throws', async () => { - const client = createProvisionLockClient(); - prepareSuccessfulLockLifecycle(client); - queueProvisionLockClient(client); - - await expect( - withKiloclawProvisionContextLock('work-failure-key', async () => { - throw new Error('provision work failed'); - }) - ).rejects.toThrow('provision work failed'); - - expect(client.query).toHaveBeenNthCalledWith( - 2, - 'SELECT pg_advisory_unlock(hashtext($1)) AS unlocked', - ['work-failure-key'] - ); - expect(client.release.mock.calls).toEqual([[]]); - }); - - it('discards the client while preserving the work result when unlock throws', async () => { - const client = createProvisionLockClient(); - const consoleErrorSpy = jest.spyOn(console, 'error').mockImplementation(() => undefined); - client.query - .mockResolvedValueOnce({ rows: [] }) - .mockRejectedValueOnce(new Error('unlock transport failed')); - queueProvisionLockClient(client); - - await expect( - withKiloclawProvisionContextLock('unlock-throw-key', async () => 'completed') - ).resolves.toBe('completed'); - - expect(client.release).toHaveBeenCalledWith(true); - expect(consoleErrorSpy).toHaveBeenCalledWith( - '[kiloclaw] Failed to release provision context lock', - { - lockKey: 'unlock-throw-key', - error: 'unlock transport failed', - } - ); - consoleErrorSpy.mockRestore(); - }); - - it('discards the client when PostgreSQL does not confirm unlock cleanup', async () => { - const client = createProvisionLockClient(); - const consoleErrorSpy = jest.spyOn(console, 'error').mockImplementation(() => undefined); - client.query - .mockResolvedValueOnce({ rows: [] }) - .mockResolvedValueOnce({ rows: [{ unlocked: false }] }); - queueProvisionLockClient(client); - - await expect( - withKiloclawProvisionContextLock('unlock-false-key', async () => 'completed') - ).resolves.toBe('completed'); - - expect(client.release).toHaveBeenCalledWith(true); - expect(consoleErrorSpy).toHaveBeenCalledWith( - '[kiloclaw] Failed to release provision context lock', - { - lockKey: 'unlock-false-key', - error: 'PostgreSQL did not confirm provision context lock release', - } - ); - consoleErrorSpy.mockRestore(); - }); - - it('discards the client and skips work when lock acquisition times out', async () => { - const client = createProvisionLockClient(); - const work = jest.fn(async () => 'must not run'); - client.query.mockRejectedValueOnce(new Error('canceling statement due to statement timeout')); - queueProvisionLockClient(client); - - await expect(withKiloclawProvisionContextLock('acquire-timeout-key', work)).rejects.toThrow( - 'canceling statement due to statement timeout' - ); - - expect(work).not.toHaveBeenCalled(); - expect(client.query).toHaveBeenCalledTimes(1); - expect(client.release).toHaveBeenCalledWith(true); - }); - - it('prevents later checkouts from reusing a client whose unlock state is uncertain', async () => { - const contaminatedClient = createProvisionLockClient(); - const cleanClient = createProvisionLockClient(); - const consoleErrorSpy = jest.spyOn(console, 'error').mockImplementation(() => undefined); - let contaminatedClientDiscarded = false; - - contaminatedClient.query - .mockResolvedValueOnce({ rows: [] }) - .mockRejectedValueOnce(new Error('unlock cancelled')); - contaminatedClient.release.mockImplementation(discard => { - contaminatedClientDiscarded = discard === true; - }); - prepareSuccessfulLockLifecycle(cleanClient); - connectMock.mockImplementation(async () => - contaminatedClientDiscarded ? cleanClient : contaminatedClient - ); - - await expect( - withKiloclawProvisionContextLock('contaminated-key', async () => 'first') - ).resolves.toBe('first'); - await expect(withKiloclawProvisionContextLock('clean-key', async () => 'second')).resolves.toBe( - 'second' - ); - - expect(contaminatedClient.release).toHaveBeenCalledWith(true); - expect(cleanClient.query).toHaveBeenNthCalledWith(1, 'SELECT pg_advisory_lock(hashtext($1))', [ - 'clean-key', - ]); - expect(cleanClient.release.mock.calls).toEqual([[]]); - consoleErrorSpy.mockRestore(); - }); -}); diff --git a/apps/web/src/lib/kiloclaw/provision-lock.ts b/apps/web/src/lib/kiloclaw/provision-lock.ts deleted file mode 100644 index dbf8105289..0000000000 --- a/apps/web/src/lib/kiloclaw/provision-lock.ts +++ /dev/null @@ -1,90 +0,0 @@ -import 'server-only'; - -import { attachDatabasePool } from '@vercel/functions'; -import { computeDatabaseUrl } from '@kilocode/db'; -import { createDrizzleClient } from '@kilocode/db/client'; - -export function getPersonalProvisionLockKey(userId: string): string { - return `kiloclaw:provision:personal:${userId}`; -} - -export function getOrganizationProvisionLockKey(userId: string, organizationId: string): string { - return `kiloclaw:provision:org:${userId}:${organizationId}`; -} - -const DEFAULT_PROVISION_LOCK_POOL_MAX = 16; - -function getProvisionLockPoolMax(): number { - const parsed = Number.parseInt(process.env.KILOCLAW_PROVISION_LOCK_POOL_MAX || '', 10); - if (!Number.isFinite(parsed) || parsed < 1) { - return DEFAULT_PROVISION_LOCK_POOL_MAX; - } - return parsed; -} - -const provisionLockClient = createDrizzleClient({ - connectionString: computeDatabaseUrl(), - poolConfig: { - max: getProvisionLockPoolMax(), - idleTimeoutMillis: 1_000, - connectionTimeoutMillis: Number.parseInt(process.env.POSTGRES_CONNECT_TIMEOUT || '30000'), - application_name: 'kilocode-web-kiloclaw-provision-lock', - }, -}); - -if (process.env.NODE_ENV !== 'test') { - attachDatabasePool(provisionLockClient.pool); -} - -provisionLockClient.pool.on('error', err => { - console.error('Unexpected error on idle client (kiloclaw provision lock)', err); -}); - -export async function withKiloclawProvisionContextLock( - lockKey: string, - work: () => Promise -): Promise { - const client = await provisionLockClient.pool.connect(); - let lockAcquired = false; - let discardClient = false; - - try { - try { - await client.query('SELECT pg_advisory_lock(hashtext($1))', [lockKey]); - lockAcquired = true; - } catch (error) { - discardClient = true; - throw error; - } - - return await work(); - } finally { - if (lockAcquired) { - try { - const unlockResult = await client.query<{ unlocked: boolean }>( - 'SELECT pg_advisory_unlock(hashtext($1)) AS unlocked', - [lockKey] - ); - if (unlockResult.rows[0]?.unlocked !== true) { - discardClient = true; - console.error('[kiloclaw] Failed to release provision context lock', { - lockKey, - error: 'PostgreSQL did not confirm provision context lock release', - }); - } - } catch (error) { - discardClient = true; - console.error('[kiloclaw] Failed to release provision context lock', { - lockKey, - error: error instanceof Error ? error.message : String(error), - }); - } - } - - if (discardClient) { - client.release(true); - } else { - client.release(); - } - } -} diff --git a/apps/web/src/lib/kiloclaw/types.ts b/apps/web/src/lib/kiloclaw/types.ts index cd5d859021..fb86ba4e66 100644 --- a/apps/web/src/lib/kiloclaw/types.ts +++ b/apps/web/src/lib/kiloclaw/types.ts @@ -263,6 +263,17 @@ export type RegistryResult = { createdAt: string; destroyedAt: string | null; }>; + reservations: Array<{ + instanceId: string; + doKey: string; + assignedUserId: string; + status: 'in_progress' | 'completed' | 'failed_requires_reconciliation' | 'released'; + startedAt: string; + updatedAt: string; + completedAt: string | null; + failureCode: string | null; + resolutionReason: string | null; + }>; migrated: boolean; }; diff --git a/apps/web/src/routers/kiloclaw-billing-router.test.ts b/apps/web/src/routers/kiloclaw-billing-router.test.ts index 16f09c77b5..4baff13a1f 100644 --- a/apps/web/src/routers/kiloclaw-billing-router.test.ts +++ b/apps/web/src/routers/kiloclaw-billing-router.test.ts @@ -188,6 +188,9 @@ const stripePriceIdsMock = jest.requireMock<{ const kiloclawInternalClientMock = jest.requireMock( '@/lib/kiloclaw/kiloclaw-internal-client' ); +const { KiloClawApiError: MockKiloClawApiError } = jest.requireMock<{ + KiloClawApiError: new (statusCode: number, responseBody: string) => Error; +}>('@/lib/kiloclaw/kiloclaw-internal-client'); beforeAll(async () => { const mod = await import('@/routers/test-utils'); @@ -816,6 +819,46 @@ describe('provision detached personal billing recovery', () => { expect(kiloclawInternalClientMock.__provisionMock).not.toHaveBeenCalled(); }); + + it.each(['provision', 'updateConfig'] as const)( + 'maps Worker admission conflicts from kiloclaw.%s', + async procedure => { + kiloclawInternalClientMock.__provisionMock.mockRejectedValueOnce( + new MockKiloClawApiError( + 409, + JSON.stringify({ + error: + 'An instance is already being created. Wait for setup to finish, then try again.', + code: 'provision_in_progress', + }) + ) + ); + + const caller = await createCallerForUser(user.id); + await expect(caller.kiloclaw[procedure]({})).rejects.toMatchObject({ + code: 'CONFLICT', + message: 'An instance is already being created. Wait for setup to finish, then try again.', + }); + } + ); + + it.each(['provision', 'updateConfig'] as const)( + 'maps missing Worker instances from kiloclaw.%s', + async procedure => { + kiloclawInternalClientMock.__provisionMock.mockRejectedValueOnce( + new MockKiloClawApiError( + 404, + JSON.stringify({ error: 'Active instance not found', code: 'instance_not_found' }) + ) + ); + + const caller = await createCallerForUser(user.id); + await expect(caller.kiloclaw[procedure]({})).rejects.toMatchObject({ + code: 'NOT_FOUND', + message: 'Active instance not found', + }); + } + ); }); describe('requireKiloClawAccess', () => { diff --git a/apps/web/src/routers/kiloclaw-router.ts b/apps/web/src/routers/kiloclaw-router.ts index c16f8239be..189d4f7395 100644 --- a/apps/web/src/routers/kiloclaw-router.ts +++ b/apps/web/src/routers/kiloclaw-router.ts @@ -72,11 +72,8 @@ import { workerInstanceId, type ActiveKiloClawInstance, } from '@/lib/kiloclaw/instance-registry'; -import { - getPersonalProvisionLockKey, - withKiloclawProvisionContextLock, -} from '@/lib/kiloclaw/provision-lock'; import { encryptProvisionSecretsForWorker } from '@/lib/kiloclaw/provision-secrets'; +import { handleProvisionError } from '@/lib/kiloclaw/provision-error-handler'; import { clearSubscriptionLifecycleAfterInstanceDestroy, clearTrialInactivityStopAfterStart, @@ -1112,28 +1109,30 @@ async function provisionInstance( : undefined; const client = new KiloClawInternalClient(); - const result = await client.provision( - user.id, - { - envVars: input.envVars, - encryptedSecrets, - channels: buildWorkerChannels(input.channels), - kilocodeApiKey, - kilocodeApiKeyExpiresAt, - kilocodeDefaultModel: input.kilocodeDefaultModel ?? undefined, - userTimezone: input.userTimezone === undefined ? undefined : input.userTimezone, - userLocation: input.userLocation === undefined ? undefined : input.userLocation, - pinnedImageTag, - }, - params.instanceId - ? { - instanceId: params.instanceId, - bootstrapSubscription: params.bootstrapSubscription, - } - : undefined - ); - - return result; + try { + return await client.provision( + user.id, + { + envVars: input.envVars, + encryptedSecrets, + channels: buildWorkerChannels(input.channels), + kilocodeApiKey, + kilocodeApiKeyExpiresAt, + kilocodeDefaultModel: input.kilocodeDefaultModel ?? undefined, + userTimezone: input.userTimezone === undefined ? undefined : input.userTimezone, + userLocation: input.userLocation === undefined ? undefined : input.userLocation, + pinnedImageTag, + }, + params.instanceId + ? { + instanceId: params.instanceId, + bootstrapSubscription: params.bootstrapSubscription, + } + : undefined + ); + } catch (error) { + handleProvisionError(error, getKiloClawApiErrorPayload); + } } async function emitProvisionTrialStartSideEffects(params: { @@ -3273,25 +3272,20 @@ export const kiloclawRouter = createTRPCRouter({ // Explicit lifecycle APIs provision: baseProcedure.input(updateConfigSchema).mutation(async ({ ctx, input }) => { - return await withKiloclawProvisionContextLock( - getPersonalProvisionLockKey(ctx.user.id), - async () => { - const { instanceId, bootstrapSubscription, shouldEnqueueTrialStartAffiliate } = - await ensureProvisionAccess(ctx.user.id, ctx.user.google_user_email); - const result = await provisionInstance(ctx.user, input, { - instanceId, - bootstrapSubscription, - }); - if (shouldEnqueueTrialStartAffiliate) { - await emitProvisionTrialStartSideEffects({ - userId: ctx.user.id, - userEmail: ctx.user.google_user_email, - instanceId: result.instanceId, - }); - } - return result; - } - ); + const { instanceId, bootstrapSubscription, shouldEnqueueTrialStartAffiliate } = + await ensureProvisionAccess(ctx.user.id, ctx.user.google_user_email); + const result = await provisionInstance(ctx.user, input, { + instanceId, + bootstrapSubscription, + }); + if (shouldEnqueueTrialStartAffiliate) { + await emitProvisionTrialStartSideEffects({ + userId: ctx.user.id, + userEmail: ctx.user.google_user_email, + instanceId: result.instanceId, + }); + } + return result; }), patchConfig: clawAccessProcedure @@ -3303,25 +3297,20 @@ export const kiloclawRouter = createTRPCRouter({ // Backward-compatible alias — uses the same trial-bootstrap flow as provision // so first-time callers can create a trial row (clawAccessProcedure would reject them). updateConfig: baseProcedure.input(updateConfigSchema).mutation(async ({ ctx, input }) => { - return await withKiloclawProvisionContextLock( - getPersonalProvisionLockKey(ctx.user.id), - async () => { - const { instanceId, bootstrapSubscription, shouldEnqueueTrialStartAffiliate } = - await ensureProvisionAccess(ctx.user.id, ctx.user.google_user_email); - const result = await provisionInstance(ctx.user, input, { - instanceId, - bootstrapSubscription, - }); - if (shouldEnqueueTrialStartAffiliate) { - await emitProvisionTrialStartSideEffects({ - userId: ctx.user.id, - userEmail: ctx.user.google_user_email, - instanceId: result.instanceId, - }); - } - return result; - } - ); + const { instanceId, bootstrapSubscription, shouldEnqueueTrialStartAffiliate } = + await ensureProvisionAccess(ctx.user.id, ctx.user.google_user_email); + const result = await provisionInstance(ctx.user, input, { + instanceId, + bootstrapSubscription, + }); + if (shouldEnqueueTrialStartAffiliate) { + await emitProvisionTrialStartSideEffects({ + userId: ctx.user.id, + userEmail: ctx.user.google_user_email, + instanceId: result.instanceId, + }); + } + return result; }), updateKiloCodeConfig: clawAccessProcedure diff --git a/apps/web/src/routers/organizations/organization-kiloclaw-router.test.ts b/apps/web/src/routers/organizations/organization-kiloclaw-router.test.ts index 61c07ef230..a037f0558f 100644 --- a/apps/web/src/routers/organizations/organization-kiloclaw-router.test.ts +++ b/apps/web/src/routers/organizations/organization-kiloclaw-router.test.ts @@ -30,6 +30,7 @@ type KiloClawClientMock = { __getLatestVersionForInstanceMock: AnyMock; __patchWebSearchConfigMock: AnyMock; __provisionMock: AnyMock; + __repairProvisionReservationMock: AnyMock; __restartGatewayProcessMock: AnyMock; __startMock: AnyMock; __stopMock: AnyMock; @@ -79,6 +80,7 @@ jest.mock('@/lib/kiloclaw/kiloclaw-internal-client', () => { const getLatestVersionForInstanceMock = jest.fn(); const patchWebSearchConfigMock = jest.fn(); const provisionMock = jest.fn(); + const repairProvisionReservationMock = (jest.fn() as AnyMock).mockResolvedValue({ ok: true }); const restartGatewayProcessMock = jest.fn(); const startMock = jest.fn(); const stopMock = jest.fn(); @@ -90,6 +92,7 @@ jest.mock('@/lib/kiloclaw/kiloclaw-internal-client', () => { getLatestVersionForInstance: getLatestVersionForInstanceMock, patchWebSearchConfig: patchWebSearchConfigMock, provision: provisionMock, + repairProvisionReservation: repairProvisionReservationMock, restartGatewayProcess: restartGatewayProcessMock, start: startMock, stop: stopMock, @@ -109,6 +112,7 @@ jest.mock('@/lib/kiloclaw/kiloclaw-internal-client', () => { __getLatestVersionForInstanceMock: getLatestVersionForInstanceMock, __patchWebSearchConfigMock: patchWebSearchConfigMock, __provisionMock: provisionMock, + __repairProvisionReservationMock: repairProvisionReservationMock, __restartGatewayProcessMock: restartGatewayProcessMock, __startMock: startMock, __stopMock: stopMock, @@ -129,6 +133,9 @@ jest.mock('@/lib/kiloclaw/kiloclaw-user-client', () => { const kiloclawClientMock = jest.requireMock( '@/lib/kiloclaw/kiloclaw-internal-client' ); +const { KiloClawApiError: MockKiloClawApiError } = jest.requireMock<{ + KiloClawApiError: new (statusCode: number, responseBody: string) => Error; +}>('@/lib/kiloclaw/kiloclaw-internal-client'); const kiloclawUserClientMock = jest.requireMock( '@/lib/kiloclaw/kiloclaw-user-client' ); @@ -453,6 +460,98 @@ describe('organizations.kiloclaw.provision trial entitlement gate', () => { message: 'Organization KiloClaw entitlement has expired.', }); }); + + it('repairs reservation finalization when an active organization instance already exists', async () => { + const user = await insertTestUser({ + google_user_email: `org-kiloclaw-provision-existing-${crypto.randomUUID()}@example.com`, + }); + const organization = await createOrganization('Org KiloClaw Existing Provision Test', user.id); + const instanceId = await createActiveOrgInstance(user.id, organization.id); + + const caller = await createCallerForUser(user.id); + await expect( + caller.organizations.kiloclaw.provision({ organizationId: organization.id }) + ).rejects.toMatchObject({ code: 'CONFLICT' }); + expect(kiloclawClientMock.__repairProvisionReservationMock).toHaveBeenCalledWith( + user.id, + instanceId, + organization.id + ); + }); + + it('surfaces finalization pending when existing organization repair fails', async () => { + const user = await insertTestUser({ + google_user_email: `org-kiloclaw-repair-pending-${crypto.randomUUID()}@example.com`, + }); + const organization = await createOrganization('Org KiloClaw Repair Pending Test', user.id); + await createActiveOrgInstance(user.id, organization.id); + kiloclawClientMock.__repairProvisionReservationMock.mockRejectedValueOnce( + new MockKiloClawApiError( + 503, + JSON.stringify({ + error: 'Provisioning completed but finalization is pending', + code: 'provision_completion_pending', + }) + ) + ); + + const caller = await createCallerForUser(user.id); + await expect( + caller.organizations.kiloclaw.provision({ organizationId: organization.id }) + ).rejects.toMatchObject({ + code: 'CONFLICT', + message: 'Provisioning completed but finalization is pending', + }); + }); + + it('maps finalization pending errors during organization config updates', async () => { + const user = await insertTestUser({ + google_user_email: `org-kiloclaw-update-pending-${crypto.randomUUID()}@example.com`, + }); + const organization = await createOrganization('Org KiloClaw Update Pending Test', user.id); + await createActiveOrgInstance(user.id, organization.id); + kiloclawClientMock.__provisionMock.mockRejectedValueOnce( + new MockKiloClawApiError( + 503, + JSON.stringify({ + error: 'Provisioning completed but finalization is pending', + code: 'provision_completion_pending', + }) + ) + ); + + const caller = await createCallerForUser(user.id); + await expect( + caller.organizations.kiloclaw.updateConfig({ organizationId: organization.id }) + ).rejects.toMatchObject({ + code: 'CONFLICT', + message: 'Provisioning completed but finalization is pending', + }); + }); + + it('maps Worker fresh-provision admission conflicts', async () => { + const user = await insertTestUser({ + google_user_email: `org-kiloclaw-provision-conflict-${crypto.randomUUID()}@example.com`, + }); + const organization = await createOrganization('Org KiloClaw Provision Conflict Test', user.id); + kiloclawClientMock.__provisionMock.mockRejectedValueOnce( + new MockKiloClawApiError( + 409, + JSON.stringify({ + error: 'An instance is already being created. Wait for setup to finish, then try again.', + code: 'provision_in_progress', + }) + ) + ); + + const caller = await createCallerForUser(user.id); + await expect( + caller.organizations.kiloclaw.provision({ organizationId: organization.id }) + ).rejects.toMatchObject({ + code: 'CONFLICT', + message: 'An instance is already being created. Wait for setup to finish, then try again.', + }); + }); }); describe('organizations.kiloclaw compute entitlement gates', () => { diff --git a/apps/web/src/routers/organizations/organization-kiloclaw-router.ts b/apps/web/src/routers/organizations/organization-kiloclaw-router.ts index e9770cf2cb..b054047ac7 100644 --- a/apps/web/src/routers/organizations/organization-kiloclaw-router.ts +++ b/apps/web/src/routers/organizations/organization-kiloclaw-router.ts @@ -49,11 +49,8 @@ import { workerInstanceId, } from '@/lib/kiloclaw/instance-registry'; import { clearSubscriptionLifecycleAfterInstanceDestroy } from '@/lib/kiloclaw/instance-lifecycle'; -import { - getOrganizationProvisionLockKey, - withKiloclawProvisionContextLock, -} from '@/lib/kiloclaw/provision-lock'; import { encryptProvisionSecretsForWorker } from '@/lib/kiloclaw/provision-secrets'; +import { handleProvisionError } from '@/lib/kiloclaw/provision-error-handler'; import { organizationMemberProcedure, organizationMemberMutationProcedure, @@ -453,56 +450,65 @@ export const organizationKiloclawRouter = createTRPCRouter({ .input(updateConfigSchema) .mutation(async ({ ctx, input }) => { await requireOrganizationKiloClawComputeEntitlement(input.organizationId); - return await withKiloclawProvisionContextLock( - getOrganizationProvisionLockKey(ctx.user.id, input.organizationId), - async () => { - const existing = await getActiveOrgInstance(ctx.user.id, input.organizationId); - if (existing) { - throw new TRPCError({ - code: 'CONFLICT', - message: 'You already have an active KiloClaw instance in this organization', - }); + const existing = await getActiveOrgInstance(ctx.user.id, input.organizationId); + if (existing) { + const client = new KiloClawInternalClient(); + try { + await client.repairProvisionReservation(ctx.user.id, existing.id, input.organizationId); + } catch (error) { + if (error instanceof KiloClawApiError) { + const { code } = getKiloClawApiErrorPayload(error); + if (code !== 'provision_repair_unavailable') + handleProvisionError(error, getKiloClawApiErrorPayload); + } else { + throw error; } + } + throw new TRPCError({ + code: 'CONFLICT', + message: 'You already have an active KiloClaw instance in this organization', + }); + } - const encryptedSecrets = encryptProvisionSecretsForWorker(input.secrets); - - const expiresInSeconds = TOKEN_EXPIRY.thirtyDays; - const kilocodeApiKey = generateApiToken(ctx.user, undefined, { - expiresIn: expiresInSeconds, - }); - const kilocodeApiKeyExpiresAt = new Date( - Date.now() + expiresInSeconds * 1000 - ).toISOString(); + const encryptedSecrets = encryptProvisionSecretsForWorker(input.secrets); + const expiresInSeconds = TOKEN_EXPIRY.thirtyDays; + const kilocodeApiKey = generateApiToken(ctx.user, undefined, { + expiresIn: expiresInSeconds, + }); + const kilocodeApiKeyExpiresAt = new Date(Date.now() + expiresInSeconds * 1000).toISOString(); - const client = new KiloClawInternalClient(); - const result = await client.provision( - ctx.user.id, - { - envVars: input.envVars, - encryptedSecrets, - channels: buildWorkerChannels(input.channels), - kilocodeApiKey, - kilocodeApiKeyExpiresAt, - kilocodeDefaultModel: input.kilocodeDefaultModel ?? undefined, - userTimezone: input.userTimezone === undefined ? undefined : input.userTimezone, - userLocation: input.userLocation === undefined ? undefined : input.userLocation, - }, - { orgId: input.organizationId } - ); + const client = new KiloClawInternalClient(); + let result: Awaited>; + try { + result = await client.provision( + ctx.user.id, + { + envVars: input.envVars, + encryptedSecrets, + channels: buildWorkerChannels(input.channels), + kilocodeApiKey, + kilocodeApiKeyExpiresAt, + kilocodeDefaultModel: input.kilocodeDefaultModel ?? undefined, + userTimezone: input.userTimezone === undefined ? undefined : input.userTimezone, + userLocation: input.userLocation === undefined ? undefined : input.userLocation, + }, + { orgId: input.organizationId } + ); + } catch (error) { + handleProvisionError(error, getKiloClawApiErrorPayload); + } - PostHogClient().capture({ - distinctId: ctx.user.google_user_email, - event: 'claw_org_instance_provisioned', - properties: { - user_id: ctx.user.id, - organization_id: input.organizationId, - instance_id: result.instanceId, - }, - }); + PostHogClient().capture({ + distinctId: ctx.user.google_user_email, + event: 'claw_org_instance_provisioned', + properties: { + user_id: ctx.user.id, + organization_id: input.organizationId, + instance_id: result.instanceId, + }, + }); - return result; - } - ); + return result; }), updateConfig: organizationMemberProcedure @@ -527,23 +533,25 @@ export const organizationKiloclawRouter = createTRPCRouter({ .limit(1); const client = new KiloClawInternalClient(); - const result = await client.provision( - ctx.user.id, - { - envVars: input.envVars, - encryptedSecrets, - channels: buildWorkerChannels(input.channels), - kilocodeApiKey, - kilocodeApiKeyExpiresAt, - kilocodeDefaultModel: input.kilocodeDefaultModel ?? undefined, - userTimezone: input.userTimezone === undefined ? undefined : input.userTimezone, - userLocation: input.userLocation === undefined ? undefined : input.userLocation, - pinnedImageTag: pin?.image_tag, - }, - { instanceId: instance.id, orgId: input.organizationId } - ); - - return result; + try { + return await client.provision( + ctx.user.id, + { + envVars: input.envVars, + encryptedSecrets, + channels: buildWorkerChannels(input.channels), + kilocodeApiKey, + kilocodeApiKeyExpiresAt, + kilocodeDefaultModel: input.kilocodeDefaultModel ?? undefined, + userTimezone: input.userTimezone === undefined ? undefined : input.userTimezone, + userLocation: input.userLocation === undefined ? undefined : input.userLocation, + pinnedImageTag: pin?.image_tag, + }, + { instanceId: instance.id, orgId: input.organizationId } + ); + } catch (error) { + handleProvisionError(error, getKiloClawApiErrorPayload); + } }), start: organizationMemberProcedure.mutation(async ({ ctx, input }) => { diff --git a/services/kiloclaw/AGENTS.md b/services/kiloclaw/AGENTS.md index 75e20657bd..5b9f2fb904 100644 --- a/services/kiloclaw/AGENTS.md +++ b/services/kiloclaw/AGENTS.md @@ -16,7 +16,41 @@ These are non-negotiable. Do not reintroduce shared/fallback paths. - **Env var name constraints.** User-provided `envVars` and `encryptedSecrets` keys must be valid shell identifiers (`/^[A-Za-z_][A-Za-z0-9_]*$/`) and must not use reserved prefixes `KILOCLAW_ENC_` or `KILOCLAW_ENV_`. Validated at schema level (ingest) and runtime (decrypt block). - **Token comparisons must be timing-safe.** Never compare auth/proxy tokens with `===`/`!==`. Use `timingSafeTokenEqual` from `controller/src/auth.ts` (or an equivalent `crypto.timingSafeEqual`-based helper) for bearer/proxy token validation. - **`kiloclaw_instances` write split.** The Worker is the sole inserter of rows (`insertProvisionedInstanceRecord`, enforced by `.specs/kiloclaw-datamodel.md` rule 21) — infrastructure is provisioned first, the row reflects it. The Worker also owns updates to a small set of DO-mirrored columns: destroy finalization (`markDestroyedInPostgresHelper`) and denormalized operational metadata (`tracked_image_tag`, `instance_type`, `admin_size_override`) written via `syncTrackedImageTagToPostgresHelper` / `syncInstanceTypeToPostgresHelper` / `syncAdminSizeOverrideToPostgresHelper`. The DO is the source of truth for these columns; the Postgres copy is a denormalized read cache for SQL-filterable admin tooling. `tracked_image_tag` and `instance_type` are written by the alarm reconciler when DO state changes; `admin_size_override` is written only on explicit admin RPC paths (set/clear/auto-clear-on-tier-resize) — there is no observation path. Next.js owns updates to ownership, lifecycle, and billing columns (`organization_id`, `name`, `inbound_email_enabled`, soft-delete via `markActiveInstanceDestroyed`, etc. in `apps/web/src/lib/kiloclaw/instance-registry.ts` and `instance-lifecycle.ts`). Next.js never inserts `kiloclaw_instances` rows. New Worker writes beyond the DO-mirrored carve-out require explicit justification — prefer a Next.js tRPC procedure unless the data fundamentally lives in the DO and is being denormalized for query-shape reasons. -- **Fresh-provision admission reservations (required rollout target).** Once the Registry reservation rollout lands, fresh instance creation MUST acquire durable, non-routable context admission in `KiloClawRegistry` before invoking `KiloClawInstance.provision()` or provider allocation. A reservation is not a Postgres instance record and grants no access. Personal admission is per user; organization admission is per assigned user within `org:{orgId}`. Pending or reconciliation-required attempts fail closed and MUST NOT be exposed through routable registry entry reads. Until that rollout is deployed, the existing web provision lock is transitional protection and must not be removed. +- **Fresh-provision admission reservations.** Fresh instance creation MUST acquire durable, non-routable context admission in `KiloClawRegistry` before invoking `KiloClawInstance.provision()` or provider allocation. A reservation is not a Postgres instance record and grants no access. Personal admission is per user; organization admission is per assigned user within `org:{orgId}`. Pending or reconciliation-required attempts fail closed and MUST NOT be exposed through routable registry entry reads. Registry admission and finalization are correctness-critical; do not treat reservation publication failure as best-effort routing metadata. + +### Fresh Provision Admission State + +``` +fresh create request + | + | beginFreshProvision (atomic unresolved-scope insert) + v +in_progress + |\ + | \ provider/row/bootstrap succeeds + | v + | completed + routable Registry entry + | | + | | finalized destroy + | v + | released + tombstone + | + \ ambiguous provider/row/bootstrap failure + v + failed_requires_reconciliation + | + | confirmed cleanup / destroy finalization + v + released + tombstone or no route +``` + +- `beginFreshProvision()` is the only entry to `in_progress`; the partial unique unresolved-scope index prevents a second fresh executor for the same owner/user context. +- `completeFreshProvision()` publishes the routable entry only after canonical instance insertion and subscription bootstrap succeed. `repairCompletedProvision()` is the idempotent recovery path when that completion acknowledgement is lost. +- `failFreshProvision()` is used only when provider side effects may exist and cleanup is not yet confirmed. `releaseFreshProvision()` is used only after confirmed cleanup or active-instance reconciliation proves the candidate is safe to abandon. +- `finalizeDestroyedInstance()` atomically tombstones the routable entry and transitions any unresolved/completed reservation to terminal `released`, so delayed repair cannot revive a destroyed route. +- `publishRecoveredInstance()` is limited to explicit subscription-recovery flows after bootstrap succeeds and refuses to publish if a destroy tombstone already exists. +- `createInstance()` remains legacy/lazy routing publication and must not be used to complete a fresh reservation. + - **DO restore from Postgres.** If DO SQLite is wiped, `start(userId)` reads the active instance row from Postgres and repopulates the DO state. This is the backup path for development mistakes that corrupt DO storage. - **Two-phase destroy.** Fly resource IDs (`pendingDestroyMachineId`, `pendingDestroyVolumeId`) are persisted before deletion attempts. DO state is only cleared when both are confirmed deleted. The alarm retries on failure. - **No machine recreation on transient errors.** `startExistingMachine()` only creates a new machine on 404 (confirmed gone). Transient Fly API errors (500, timeout) are re-thrown, not masked by duplicate creation. diff --git a/services/kiloclaw/drizzle/0001_chubby_slipstream.sql b/services/kiloclaw/drizzle/0001_chubby_slipstream.sql new file mode 100644 index 0000000000..eede2e4200 --- /dev/null +++ b/services/kiloclaw/drizzle/0001_chubby_slipstream.sql @@ -0,0 +1,13 @@ +CREATE TABLE `provision_reservations` ( + `instance_id` text PRIMARY KEY NOT NULL, + `do_key` text NOT NULL, + `assigned_user_id` text NOT NULL, + `status` text NOT NULL, + `started_at` text NOT NULL, + `updated_at` text NOT NULL, + `completed_at` text, + `failure_code` text, + `resolution_reason` text +); +--> statement-breakpoint +CREATE UNIQUE INDEX `uq_provision_reservations_unresolved_user` ON `provision_reservations` (`assigned_user_id`) WHERE "provision_reservations"."status" IN ('in_progress', 'failed_requires_reconciliation'); \ No newline at end of file diff --git a/services/kiloclaw/drizzle/meta/0001_snapshot.json b/services/kiloclaw/drizzle/meta/0001_snapshot.json new file mode 100644 index 0000000000..8ecc7a355e --- /dev/null +++ b/services/kiloclaw/drizzle/meta/0001_snapshot.json @@ -0,0 +1,145 @@ +{ + "version": "6", + "dialect": "sqlite", + "id": "e5703007-d9d0-4131-ac92-983840277e5d", + "prevId": "f488eead-1986-472a-8973-f487dc7599bf", + "tables": { + "instances": { + "name": "instances", + "columns": { + "instance_id": { + "name": "instance_id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "do_key": { + "name": "do_key", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "assigned_user_id": { + "name": "assigned_user_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "created_at": { + "name": "created_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "destroyed_at": { + "name": "destroyed_at", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "provision_reservations": { + "name": "provision_reservations", + "columns": { + "instance_id": { + "name": "instance_id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "do_key": { + "name": "do_key", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "assigned_user_id": { + "name": "assigned_user_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "status": { + "name": "status", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "started_at": { + "name": "started_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "updated_at": { + "name": "updated_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "completed_at": { + "name": "completed_at", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "failure_code": { + "name": "failure_code", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "resolution_reason": { + "name": "resolution_reason", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": { + "uq_provision_reservations_unresolved_user": { + "name": "uq_provision_reservations_unresolved_user", + "columns": [ + "assigned_user_id" + ], + "isUnique": true, + "where": "\"provision_reservations\".\"status\" IN ('in_progress', 'failed_requires_reconciliation')" + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + } + }, + "views": {}, + "enums": {}, + "_meta": { + "schemas": {}, + "tables": {}, + "columns": {} + }, + "internal": { + "indexes": {} + } +} \ No newline at end of file diff --git a/services/kiloclaw/drizzle/meta/_journal.json b/services/kiloclaw/drizzle/meta/_journal.json index 23010e462a..1a5b5a426d 100644 --- a/services/kiloclaw/drizzle/meta/_journal.json +++ b/services/kiloclaw/drizzle/meta/_journal.json @@ -8,6 +8,13 @@ "when": 1774809105002, "tag": "0000_messy_grim_reaper", "breakpoints": true + }, + { + "idx": 1, + "version": "6", + "when": 1780254551640, + "tag": "0001_chubby_slipstream", + "breakpoints": true } ] } \ No newline at end of file diff --git a/services/kiloclaw/drizzle/migrations.js b/services/kiloclaw/drizzle/migrations.js index 809f6627dc..24380d25db 100644 --- a/services/kiloclaw/drizzle/migrations.js +++ b/services/kiloclaw/drizzle/migrations.js @@ -1,9 +1,11 @@ import journal from './meta/_journal.json'; import m0000 from './0000_messy_grim_reaper.sql'; +import m0001 from './0001_chubby_slipstream.sql'; export default { journal, migrations: { m0000, + m0001, }, }; diff --git a/services/kiloclaw/src/db/index.ts b/services/kiloclaw/src/db/index.ts index 13a4fdd734..6b4cf002b3 100644 --- a/services/kiloclaw/src/db/index.ts +++ b/services/kiloclaw/src/db/index.ts @@ -12,6 +12,7 @@ import { kiloclaw_scheduled_actions, kiloclaw_scheduled_action_stages, kiloclaw_scheduled_action_targets, + kiloclaw_subscriptions, kiloclaw_version_pins, } from '@kilocode/db/schema'; import type { KiloClawScheduledActionStatus } from '@kilocode/db/schema-types'; @@ -105,6 +106,49 @@ export async function getActivePersonalInstance(db: WorkerDb, userId: string) { return { id: row.id, sandboxId: row.sandbox_id, orgId: row.organization_id }; } +// Admission assumes one active organization instance per assigned user. If +// legacy drift leaves multiple rows, the oldest row is the deterministic +// representative until reconciliation collapses the duplicate state. +export async function getActiveOrganizationInstance( + db: WorkerDb, + userId: string, + organizationId: string +) { + const row = await db + .select({ + id: kiloclaw_instances.id, + sandbox_id: kiloclaw_instances.sandbox_id, + organization_id: kiloclaw_instances.organization_id, + }) + .from(kiloclaw_instances) + .where( + and( + eq(kiloclaw_instances.user_id, userId), + eq(kiloclaw_instances.organization_id, organizationId), + isNull(kiloclaw_instances.destroyed_at) + ) + ) + .orderBy(kiloclaw_instances.created_at) + .limit(1) + .then(rows => rows[0] ?? null); + + if (!row) return null; + return { id: row.id, sandboxId: row.sandbox_id, orgId: row.organization_id }; +} + +export async function hasSubscriptionForInstance( + db: WorkerDb, + instanceId: string +): Promise { + const row = await db + .select({ id: kiloclaw_subscriptions.id }) + .from(kiloclaw_subscriptions) + .where(eq(kiloclaw_subscriptions.instance_id, instanceId)) + .limit(1) + .then(rows => rows[0] ?? null); + return row !== null; +} + /** * Look up an active instance by its sandboxId. * Used for DO restore when the DO has a stored sandboxId but lost other state. diff --git a/services/kiloclaw/src/db/sqlite-schema.ts b/services/kiloclaw/src/db/sqlite-schema.ts index af81fdbaa4..42a210bf24 100644 --- a/services/kiloclaw/src/db/sqlite-schema.ts +++ b/services/kiloclaw/src/db/sqlite-schema.ts @@ -1,6 +1,7 @@ -import { sqliteTable, text } from 'drizzle-orm/sqlite-core'; +import { sql } from 'drizzle-orm'; +import { sqliteTable, text, uniqueIndex } from 'drizzle-orm/sqlite-core'; -/** Registry DO SQLite table: tracks instance ownership per registry (user or org). */ +/** Registry DO SQLite table: tracks routable instance ownership per registry (user or org). */ export const registryInstances = sqliteTable('instances', { instance_id: text('instance_id').primaryKey(), do_key: text('do_key').notNull(), @@ -8,3 +9,25 @@ export const registryInstances = sqliteTable('instances', { created_at: text('created_at').notNull(), destroyed_at: text('destroyed_at'), }); + +export const registryProvisionReservations = sqliteTable( + 'provision_reservations', + { + instance_id: text('instance_id').primaryKey(), + do_key: text('do_key').notNull(), + assigned_user_id: text('assigned_user_id').notNull(), + status: text('status', { + enum: ['in_progress', 'completed', 'failed_requires_reconciliation', 'released'], + }).notNull(), + started_at: text('started_at').notNull(), + updated_at: text('updated_at').notNull(), + completed_at: text('completed_at'), + failure_code: text('failure_code'), + resolution_reason: text('resolution_reason'), + }, + table => [ + uniqueIndex('uq_provision_reservations_unresolved_user') + .on(table.assigned_user_id) + .where(sql`${table.status} IN ('in_progress', 'failed_requires_reconciliation')`), + ] +); diff --git a/services/kiloclaw/src/durable-objects/kiloclaw-instance.test.ts b/services/kiloclaw/src/durable-objects/kiloclaw-instance.test.ts index 454d78fa66..0d5135b67a 100644 --- a/services/kiloclaw/src/durable-objects/kiloclaw-instance.test.ts +++ b/services/kiloclaw/src/durable-objects/kiloclaw-instance.test.ts @@ -95,6 +95,8 @@ vi.mock('../lib/user-flags', () => ({ vi.mock('../db', () => ({ getWorkerDb: vi.fn(() => ({})), getActivePersonalInstance: vi.fn().mockResolvedValue(null), + getInstanceById: vi.fn().mockResolvedValue(null), + getInstanceByIdIncludingDestroyed: vi.fn().mockResolvedValue(null), findPepperByUserId: vi.fn().mockResolvedValue({ id: 'user-1', api_token_pepper: 'pepper-1', @@ -206,11 +208,17 @@ function createFakeStorage() { } return result; }, + list(): Map { + return new Map(store); + }, put(entries: Record): void { for (const [k, v] of Object.entries(entries)) { store.set(k, v); } }, + delete(key: string): void { + store.delete(key); + }, deleteAll(): void { store.clear(); alarmTime = null; @@ -224,6 +232,22 @@ function createFakeStorage() { deleteAlarm(): void { alarmTime = null; }, + async transaction(callback: (txn: unknown) => Promise): Promise { + return await callback({ + put(entries: Record): void { + for (const [k, v] of Object.entries(entries)) store.set(k, v); + }, + delete(keys: string | string[]): void { + for (const key of Array.isArray(keys) ? keys : [keys]) store.delete(key); + }, + setAlarm(time: number): void { + alarmTime = time; + }, + deleteAlarm(): void { + alarmTime = null; + }, + }); + }, // Test helpers _store: store, _getAlarm: () => alarmTime, @@ -256,6 +280,15 @@ function createFakeEnv(opts: { includeNorthflank?: boolean } = {}) { idFromName: vi.fn().mockReturnValue('fake-do-id'), get: vi.fn().mockReturnValue(appStub), } as unknown, + KILOCLAW_REGISTRY: { + idFromName: vi.fn((key: string) => key), + get: vi.fn().mockReturnValue({ + destroyInstance: vi.fn().mockResolvedValue(undefined), + finalizeDestroyedInstance: vi.fn().mockResolvedValue(undefined), + releaseFreshProvision: vi.fn().mockResolvedValue(undefined), + listInstances: vi.fn().mockResolvedValue([]), + }), + } as unknown, HYPERDRIVE: { connectionString: 'postgresql://fake' } as unknown, AGENT_ENV_VARS_PRIVATE_KEY: 'test-private-key', KV_CLAW_CACHE: { @@ -560,6 +593,36 @@ describe('two-phase destroy', () => { expect(destroyEvents[0]?.blobs).toEqual(expect.arrayContaining(['manual_user_request'])); }); + it('does not release an admission reservation during bootstrap cleanup destruction', async () => { + const env = createFakeEnv(); + const registryStub = (env.KILOCLAW_REGISTRY as unknown as { get: Mock }).get('user:user-1') as { + finalizeDestroyedInstance: Mock; + }; + const { instance, storage } = createInstance(createFakeStorage(), env); + await seedRunning(storage, { sandboxId: 'ki_11111111111141118111111111111111' }); + + (flyClient.destroyMachine as Mock).mockResolvedValue(undefined); + (flyClient.deleteVolume as Mock).mockResolvedValue(undefined); + + await instance.destroy({ reason: 'bootstrap_cleanup_failure' }); + + expect(registryStub.finalizeDestroyedInstance).not.toHaveBeenCalled(); + expect(storage._store.get('pendingRegistryCleanup')).toEqual( + expect.objectContaining({ releaseProvisionReservation: false }) + ); + + await instance.allowProvisionReservationReleaseOnFinalize(); + + expect(registryStub.finalizeDestroyedInstance).toHaveBeenCalledWith( + 'user:user-1', + 'user-1', + '11111111-1111-4111-8111-111111111111', + '11111111-1111-4111-8111-111111111111', + 'instance_destroyed' + ); + expect(storage._store.has('pendingRegistryCleanup')).toBe(false); + }); + it('keeps pendingDestroyMachineId when machine delete fails', async () => { const { instance, storage } = createInstance(); await seedRunning(storage); @@ -668,9 +731,11 @@ describe('two-phase destroy', () => { await expect(instance.destroy()).resolves.toBeDefined(); }); - it('alarm retries pending destroy to completion', async () => { - const { instance, storage } = createInstance(); + it('alarm retries pending destroy to completion and releases its provision reservation', async () => { + const env = createFakeEnv(); + const { instance, storage } = createInstance(createFakeStorage(), env); await seedProvisioned(storage, { + sandboxId: 'ki_11111111111141118111111111111111', status: 'destroying', flyMachineId: 'machine-1', flyVolumeId: 'vol-1', @@ -706,12 +771,96 @@ describe('two-phase destroy', () => { (flyClient.deleteVolume as Mock).mockResolvedValue(undefined); // Need a fresh instance to re-loadState from storage - const { instance: inst2 } = createInstance(storage); + const { instance: inst2 } = createInstance(storage, env); await inst2.alarm(); // Now fully cleaned up expect(storage._store.size).toBe(0); expect(storage._getAlarm()).toBeNull(); + const registryStub = (env.KILOCLAW_REGISTRY as unknown as { get: Mock }).get.mock.results[0] + ?.value as { finalizeDestroyedInstance: Mock }; + expect(registryStub.finalizeDestroyedInstance).toHaveBeenCalledWith( + 'user:user-1', + 'user-1', + '11111111-1111-4111-8111-111111111111', + '11111111-1111-4111-8111-111111111111', + 'instance_destroyed' + ); + }); + + it('releases a pending reservation after alarm sees canonical Postgres destroy confirmation', async () => { + const env = createFakeEnv(); + const registryStub = (env.KILOCLAW_REGISTRY as unknown as { get: Mock }).get('user:user-1') as { + finalizeDestroyedInstance: Mock; + }; + const { instance, storage } = createInstance(createFakeStorage(), env); + storage._store.set('pendingRegistryCleanup', { + userId: 'user-1', + orgId: null, + sandboxId: 'ki_11111111111141118111111111111111', + releaseProvisionReservation: false, + }); + const getWorkerDbSpy = vi.spyOn(db, 'getWorkerDb').mockReturnValue({} as never); + const getInstanceByIdSpy = vi.spyOn(db, 'getInstanceById').mockResolvedValue(null as never); + const getInstanceByIdIncludingDestroyedSpy = vi + .spyOn(db, 'getInstanceByIdIncludingDestroyed') + .mockResolvedValue({ id: '11111111-1111-4111-8111-111111111111' } as never); + + await instance.alarm(); + + expect(registryStub.finalizeDestroyedInstance).toHaveBeenCalledWith( + 'user:user-1', + 'user-1', + '11111111-1111-4111-8111-111111111111', + '11111111-1111-4111-8111-111111111111', + 'instance_destroyed' + ); + expect(storage._store.has('pendingRegistryCleanup')).toBe(false); + getWorkerDbSpy.mockRestore(); + getInstanceByIdSpy.mockRestore(); + getInstanceByIdIncludingDestroyedSpy.mockRestore(); + }); + + it('retries reservation release after alarm-completed destruction if Registry is unavailable', async () => { + const env = createFakeEnv(); + const registryStub = (env.KILOCLAW_REGISTRY as unknown as { get: Mock }).get('user:user-1') as { + finalizeDestroyedInstance: Mock; + }; + registryStub.finalizeDestroyedInstance.mockRejectedValueOnce(new Error('registry unavailable')); + const { instance, storage } = createInstance(createFakeStorage(), env); + await seedProvisioned(storage, { + sandboxId: 'ki_11111111111141118111111111111111', + status: 'destroying', + flyMachineId: 'machine-1', + flyVolumeId: 'vol-1', + providerState: { + provider: 'fly', + appName: 'acct-test', + machineId: 'machine-1', + volumeId: 'vol-1', + region: 'iad', + }, + pendingDestroyMachineId: 'machine-1', + pendingDestroyVolumeId: 'vol-1', + }); + (flyClient.destroyMachine as Mock).mockResolvedValue(undefined); + (flyClient.deleteVolume as Mock).mockResolvedValue(undefined); + + await instance.alarm(); + + expect(storage._store.get('pendingRegistryCleanup')).toEqual({ + userId: 'user-1', + orgId: null, + sandboxId: 'ki_11111111111141118111111111111111', + releaseProvisionReservation: true, + }); + expect(storage._getAlarm()).not.toBeNull(); + + const { instance: retryInstance } = createInstance(storage, env); + await retryInstance.alarm(); + + expect(storage._store.has('pendingRegistryCleanup')).toBe(false); + expect(registryStub.finalizeDestroyedInstance).toHaveBeenCalledTimes(2); }); it('fully destroys docker-local instances when container and volume deletes succeed', async () => { @@ -2346,6 +2495,14 @@ describe('status guards', () => { ); }); + it('provision() rejects a wiped explicit instance without fresh admission', async () => { + const { instance } = createInstance(); + + await expect( + instance.provision('user-1', {}, { instanceId: '11111111-1111-4111-8111-111111111111' }) + ).rejects.toThrow('Instance not provisioned'); + }); + it('stop() is a no-op when destroying', async () => { const { instance, storage } = createInstance(); await seedRunning(storage, { status: 'destroying' }); diff --git a/services/kiloclaw/src/durable-objects/kiloclaw-instance/index.ts b/services/kiloclaw/src/durable-objects/kiloclaw-instance/index.ts index 37eab1ef91..2030553fcd 100644 --- a/services/kiloclaw/src/durable-objects/kiloclaw-instance/index.ts +++ b/services/kiloclaw/src/durable-objects/kiloclaw-instance/index.ts @@ -10,6 +10,7 @@ import { DurableObject } from 'cloudflare:workers'; import type { KiloClawEnv } from '../../types'; +import { getInstanceById, getInstanceByIdIncludingDestroyed, getWorkerDb } from '../../db'; import type { OpenclawFileWriteValidation } from '../gateway-controller-types'; import type { InstanceConfig, @@ -112,6 +113,7 @@ import { markRestartSuccessful, emitDestroyPendingTelemetry, maybeEmitDestroyStuckTelemetry, + type FinalizeDestroyRetention, } from './reconcile'; import { restoreFromPostgres, @@ -196,6 +198,17 @@ function resolveInstanceTypeFromState( return state.instanceType ?? tryInstanceTypeLabel(state.machineSize, state.volumeSizeGb); } +type PendingRegistryCleanup = { + userId: string; + orgId: string | null; + sandboxId: string; + releaseProvisionReservation: boolean; +}; + +const PENDING_REGISTRY_CLEANUP_KEY = 'pendingRegistryCleanup'; +const SKIP_PROVISION_RESERVATION_RELEASE_KEY = 'skipProvisionReservationRelease'; +const REGISTRY_CLEANUP_RETRY_MS = 60_000; + export class KiloClawInstance extends DurableObject { private s: InstanceMutableState = createMutableState(); private startInProgress = false; @@ -803,7 +816,12 @@ export class KiloClawInstance extends DurableObject { async provision( userId: string, config: InstanceConfig, - opts?: { orgId?: string | null; instanceId?: string; provider?: ProviderId } + opts?: { + orgId?: string | null; + instanceId?: string; + provider?: ProviderId; + freshProvision?: boolean; + } ): Promise<{ sandboxId: string }> { const provisionStart = performance.now(); await this.loadState(); @@ -824,6 +842,9 @@ export class KiloClawInstance extends DurableObject { ? sandboxIdFromInstanceId(opts.instanceId) : sandboxIdFromUserId(userId); const isNew = !this.s.status; + if (opts?.instanceId && !opts.freshProvision && isNew) { + throw Object.assign(new Error('Instance not provisioned'), { status: 404 }); + } if (!isNew && opts?.provider && opts.provider !== this.s.provider) { throw Object.assign( new Error(`Cannot change provider from ${this.s.provider} to ${opts.provider}`), @@ -2514,6 +2535,131 @@ export class KiloClawInstance extends DurableObject { }; } + private registryCleanupRetention( + userId: string, + orgId: string | null, + sandboxId: string, + releaseProvisionReservation: boolean + ): FinalizeDestroyRetention { + const pendingCleanup = { + userId, + orgId, + sandboxId, + releaseProvisionReservation, + } satisfies PendingRegistryCleanup; + return { + entries: { [PENDING_REGISTRY_CLEANUP_KEY]: pendingCleanup }, + retryAlarmAt: Date.now() + REGISTRY_CLEANUP_RETRY_MS, + }; + } + + private async cleanupRegistryAfterFinalizedDestroy( + userId: string, + orgId: string | null, + sandboxId: string, + releaseProvisionReservation: boolean + ): Promise { + try { + const registryInstanceId = isInstanceKeyedSandboxId(sandboxId) + ? instanceIdFromSandboxId(sandboxId) + : null; + let releaseAllowed = releaseProvisionReservation; + if (!releaseAllowed && registryInstanceId) { + const connectionString = this.env.HYPERDRIVE?.connectionString; + if (!connectionString) throw new Error('HYPERDRIVE is not configured'); + const db = getWorkerDb(connectionString); + const active = await getInstanceById(db, registryInstanceId); + if (!active) { + const destroyed = await getInstanceByIdIncludingDestroyed(db, registryInstanceId, { + includeDestroyed: true, + }); + releaseAllowed = destroyed !== null; + } + } + const registryKeys = registryInstanceId + ? orgId + ? [`org:${orgId}`] + : [`user:${userId}`] + : orgId + ? [`user:${userId}`, `org:${orgId}`] + : [`user:${userId}`]; + + for (const registryKey of registryKeys) { + const registryStub = this.env.KILOCLAW_REGISTRY.get( + this.env.KILOCLAW_REGISTRY.idFromName(registryKey) + ); + if (registryInstanceId) { + if (releaseAllowed) { + await registryStub.finalizeDestroyedInstance( + registryKey, + userId, + registryInstanceId, + registryInstanceId, + 'instance_destroyed' + ); + } else { + await registryStub.destroyInstance(registryKey, registryInstanceId); + } + console.log('[DO] Registry entry destroyed on finalization:', { + registryKey, + instanceId: registryInstanceId, + }); + } else { + const legacyDoKeys = legacyDoKeysForIdentity(userId, sandboxId); + const entries = await registryStub.listInstances(registryKey); + const legacyEntry = entries.find(e => legacyDoKeys.includes(e.doKey)); + if (legacyEntry) { + await registryStub.destroyInstance(registryKey, legacyEntry.instanceId); + console.log('[DO] Registry entry destroyed on finalization (legacy):', { + registryKey, + instanceId: legacyEntry.instanceId, + doKeysTried: legacyDoKeys, + matchedDoKey: legacyEntry.doKey, + }); + } else { + console.log( + '[DO] Registry cleanup: no active entry found (already cleaned or never existed):', + { + registryKey, + doKeysTried: legacyDoKeys, + activeEntryCount: entries.length, + } + ); + } + } + } + if (!releaseAllowed) { + await this.ctx.storage.setAlarm(Date.now() + REGISTRY_CLEANUP_RETRY_MS); + return; + } + await this.ctx.storage.delete(PENDING_REGISTRY_CLEANUP_KEY); + await this.ctx.storage.deleteAlarm(); + } catch (registryErr) { + console.error('[DO] Registry cleanup on finalization failed; will retry:', registryErr); + await this.ctx.storage.setAlarm(Date.now() + REGISTRY_CLEANUP_RETRY_MS); + } + } + + async allowProvisionReservationReleaseOnFinalize(): Promise { + await this.ctx.storage.delete(SKIP_PROVISION_RESERVATION_RELEASE_KEY); + const pendingCleanup = await this.ctx.storage.get( + PENDING_REGISTRY_CLEANUP_KEY + ); + if (pendingCleanup) { + const permittedCleanup = { + ...pendingCleanup, + releaseProvisionReservation: true, + } satisfies PendingRegistryCleanup; + await this.ctx.storage.put({ [PENDING_REGISTRY_CLEANUP_KEY]: permittedCleanup }); + await this.cleanupRegistryAfterFinalizedDestroy( + permittedCleanup.userId, + permittedCleanup.orgId, + permittedCleanup.sandboxId, + true + ); + } + } + async destroy(options?: { reason?: KiloclawDestroyReason }): Promise { await this.loadState(); @@ -2528,6 +2674,12 @@ export class KiloClawInstance extends DurableObject { } const machineUptimeMs = this.s.lastStartedAt ? Date.now() - this.s.lastStartedAt : 0; + const releaseProvisionReservation = options?.reason !== 'bootstrap_cleanup_failure'; + if (releaseProvisionReservation) { + await this.ctx.storage.delete(SKIP_PROVISION_RESERVATION_RELEASE_KEY); + } else { + await this.ctx.storage.put(SKIP_PROVISION_RESERVATION_RELEASE_KEY, true); + } const runtimeId = getRuntimeId(this.s); const storageId = getStorageId(this.s); const destroyStartedAt = this.s.destroyStartedAt ?? Date.now(); @@ -2594,58 +2746,22 @@ export class KiloClawInstance extends DurableObject { this.s, destroyRctx, (userId, sandboxId) => - markDestroyedInPostgresHelper(this.env, this.ctx, this.s, userId, sandboxId) + markDestroyedInPostgresHelper(this.env, this.ctx, this.s, userId, sandboxId), + this.registryCleanupRetention( + preDestroyUserId, + preDestroyOrgId, + preDestroySandboxId, + releaseProvisionReservation + ) ); - // Clean up registry entry on finalization. This covers both platform-initiated - // and alarm-initiated destroys. The platform route's registry cleanup is - // redundant but harmless (destroyInstance is idempotent on already-destroyed entries). if (finalized.finalized && preDestroyUserId && preDestroySandboxId) { - try { - const registryInstanceId = isInstanceKeyedSandboxId(preDestroySandboxId) - ? instanceIdFromSandboxId(preDestroySandboxId) - : null; - - const registryKeys = [`user:${preDestroyUserId}`]; - if (preDestroyOrgId) registryKeys.push(`org:${preDestroyOrgId}`); - - for (const registryKey of registryKeys) { - const registryStub = this.env.KILOCLAW_REGISTRY.get( - this.env.KILOCLAW_REGISTRY.idFromName(registryKey) - ); - if (registryInstanceId) { - await registryStub.destroyInstance(registryKey, registryInstanceId); - console.log('[DO] Registry entry destroyed on finalization:', { - registryKey, - instanceId: registryInstanceId, - }); - } else { - const legacyDoKeys = legacyDoKeysForIdentity(preDestroyUserId, preDestroySandboxId); - const entries = await registryStub.listInstances(registryKey); - const legacyEntry = entries.find(e => legacyDoKeys.includes(e.doKey)); - if (legacyEntry) { - await registryStub.destroyInstance(registryKey, legacyEntry.instanceId); - console.log('[DO] Registry entry destroyed on finalization (legacy):', { - registryKey, - instanceId: legacyEntry.instanceId, - doKeysTried: legacyDoKeys, - matchedDoKey: legacyEntry.doKey, - }); - } else { - console.log( - '[DO] Registry cleanup: no active entry found (already cleaned or never existed):', - { - registryKey, - doKeysTried: legacyDoKeys, - activeEntryCount: entries.length, - } - ); - } - } - } - } catch (registryErr) { - console.error('[DO] Registry cleanup on finalization failed (non-fatal):', registryErr); - } + await this.cleanupRegistryAfterFinalizedDestroy( + preDestroyUserId, + preDestroyOrgId, + preDestroySandboxId, + releaseProvisionReservation + ); } if (!finalized.finalized) { @@ -4320,6 +4436,19 @@ export class KiloClawInstance extends DurableObject { // ======================================================================== override async alarm(): Promise { + const pendingRegistryCleanup = await this.ctx.storage.get( + PENDING_REGISTRY_CLEANUP_KEY + ); + if (pendingRegistryCleanup) { + await this.cleanupRegistryAfterFinalizedDestroy( + pendingRegistryCleanup.userId, + pendingRegistryCleanup.orgId, + pendingRegistryCleanup.sandboxId, + pendingRegistryCleanup.releaseProvisionReservation + ); + return; + } + await this.loadState(); if (!this.s.userId || !this.s.status) return; @@ -4422,6 +4551,18 @@ export class KiloClawInstance extends DurableObject { 'alarm_retained_recovery_cleanup' ); + const skipProvisionReservationRelease = + (await this.ctx.storage.get(SKIP_PROVISION_RESERVATION_RELEASE_KEY)) === true; + const pendingDestroyIdentity = + this.s.status === 'destroying' && this.s.userId && this.s.sandboxId + ? { + userId: this.s.userId, + orgId: this.s.orgId, + sandboxId: this.s.sandboxId, + releaseProvisionReservation: !skipProvisionReservationRelease, + } + : null; + try { if (this.s.provider !== 'fly') { if (this.s.status === 'destroying') { @@ -4432,9 +4573,24 @@ export class KiloClawInstance extends DurableObject { this.s, destroyRctx, (userId, sandboxId) => - markDestroyedInPostgresHelper(this.env, this.ctx, this.s, userId, sandboxId) + markDestroyedInPostgresHelper(this.env, this.ctx, this.s, userId, sandboxId), + pendingDestroyIdentity + ? this.registryCleanupRetention( + pendingDestroyIdentity.userId, + pendingDestroyIdentity.orgId, + pendingDestroyIdentity.sandboxId, + pendingDestroyIdentity.releaseProvisionReservation + ) + : undefined ); - if (!result.finalized) { + if (result.finalized && pendingDestroyIdentity) { + await this.cleanupRegistryAfterFinalizedDestroy( + pendingDestroyIdentity.userId, + pendingDestroyIdentity.orgId, + pendingDestroyIdentity.sandboxId, + pendingDestroyIdentity.releaseProvisionReservation + ); + } else if (!result.finalized) { await maybeEmitDestroyStuckTelemetry(this.ctx, this.s, destroyRctx); } } else { @@ -4455,9 +4611,26 @@ export class KiloClawInstance extends DurableObject { 'alarm', () => this.destroy({ reason: 'stale_provision_cleanup' }).then(() => undefined), (userId, sandboxId) => - markDestroyedInPostgresHelper(this.env, this.ctx, this.s, userId, sandboxId) + markDestroyedInPostgresHelper(this.env, this.ctx, this.s, userId, sandboxId), + pendingDestroyIdentity + ? this.registryCleanupRetention( + pendingDestroyIdentity.userId, + pendingDestroyIdentity.orgId, + pendingDestroyIdentity.sandboxId, + pendingDestroyIdentity.releaseProvisionReservation + ) + : undefined ); + if (pendingDestroyIdentity && this.s.status === null) { + await this.cleanupRegistryAfterFinalizedDestroy( + pendingDestroyIdentity.userId, + pendingDestroyIdentity.orgId, + pendingDestroyIdentity.sandboxId, + pendingDestroyIdentity.releaseProvisionReservation + ); + } + if (reconcileResult.beginUnexpectedStopRecovery && this.s.status === 'running') { await beginUnexpectedStopRecovery( this.recoveryRuntime(), diff --git a/services/kiloclaw/src/durable-objects/kiloclaw-instance/reconcile.ts b/services/kiloclaw/src/durable-objects/kiloclaw-instance/reconcile.ts index 75d00b9de4..b7a7147d55 100644 --- a/services/kiloclaw/src/durable-objects/kiloclaw-instance/reconcile.ts +++ b/services/kiloclaw/src/durable-objects/kiloclaw-instance/reconcile.ts @@ -48,6 +48,11 @@ import * as gateway from './gateway'; import { writeEvent, eventContextFromState } from '../../utils/analytics'; import { maybeDispatchStartFailurePush } from './lifecycle-push'; +export type FinalizeDestroyRetention = { + entries: Record; + retryAlarmAt: number; +}; + export type ReconcileWithFlyResult = { beginUnexpectedStopRecovery?: { flyState: 'stopped'; @@ -177,12 +182,20 @@ export async function reconcileWithFly( /** Callback to trigger a full destroy (calls back into the DO). */ triggerDestroy: () => Promise, /** Callback for marking Postgres row destroyed during finalization. */ - markDestroyedInPostgres?: (userId: string, sandboxId: string) => Promise + markDestroyedInPostgres?: (userId: string, sandboxId: string) => Promise, + finalizeRetention?: FinalizeDestroyRetention ): Promise { const rctx = createReconcileContext(state, env, reason); if (state.status === 'destroying') { - await retryPendingDestroy(flyConfig, ctx, state, rctx, markDestroyedInPostgres); + await retryPendingDestroy( + flyConfig, + ctx, + state, + rctx, + markDestroyedInPostgres, + finalizeRetention + ); return {}; } @@ -1393,7 +1406,8 @@ async function retryPendingDestroy( ctx: DurableObjectState, state: InstanceMutableState, rctx: ReconcileContext, - markDestroyedInPostgres?: (userId: string, sandboxId: string) => Promise + markDestroyedInPostgres?: (userId: string, sandboxId: string) => Promise, + finalizeRetention?: FinalizeDestroyRetention ): Promise { await recoverBoundMachineForDestroy(flyConfig, ctx, state, rctx); await tryDeleteMachine(flyConfig, ctx, state, rctx); @@ -1404,7 +1418,13 @@ async function retryPendingDestroy( // primary destroy path is unaffected. May also promote an attached orphan // into the pending pointers, which then defers finalize to the next alarm. await tryDeleteOrphanVolumes(flyConfig, ctx, state, rctx); - const result = await finalizeDestroyIfComplete(ctx, state, rctx, markDestroyedInPostgres); + const result = await finalizeDestroyIfComplete( + ctx, + state, + rctx, + markDestroyedInPostgres, + finalizeRetention + ); if (!result.finalized) { await maybeEmitDestroyStuckTelemetry(ctx, state, rctx); } @@ -1787,7 +1807,8 @@ export async function finalizeDestroyIfComplete( ctx: DurableObjectState, state: InstanceMutableState, rctx: ReconcileContext, - markDestroyedInPostgres?: (userId: string, sandboxId: string) => Promise + markDestroyedInPostgres?: (userId: string, sandboxId: string) => Promise, + finalizeRetention?: FinalizeDestroyRetention ): Promise { if (state.pendingDestroyMachineId || state.pendingDestroyVolumeId) { return destroyResultFromState(state, { @@ -1825,8 +1846,18 @@ export async function finalizeDestroyIfComplete( sandbox_id: destroyedSandboxId, }); - await ctx.storage.deleteAlarm(); - await ctx.storage.deleteAll(); + if (finalizeRetention) { + const keys = [...(await ctx.storage.list()).keys()]; + await ctx.storage.transaction(async txn => { + await txn.deleteAlarm(); + if (keys.length > 0) await txn.delete(keys); + await txn.put(finalizeRetention.entries); + await txn.setAlarm(finalizeRetention.retryAlarmAt); + }); + } else { + await ctx.storage.deleteAlarm(); + await ctx.storage.deleteAll(); + } resetMutableState(state); return destroyResultFromState(state, { finalized: true, destroyedUserId, destroyedSandboxId }); diff --git a/services/kiloclaw/src/durable-objects/kiloclaw-registry.test.ts b/services/kiloclaw/src/durable-objects/kiloclaw-registry.test.ts new file mode 100644 index 0000000000..32eea3b7da --- /dev/null +++ b/services/kiloclaw/src/durable-objects/kiloclaw-registry.test.ts @@ -0,0 +1,328 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import type * as DrizzleOrm from 'drizzle-orm'; +import { registryInstances, registryProvisionReservations } from '../db/sqlite-schema'; +import { sandboxIdFromUserId } from '../auth/sandbox-id'; + +type Predicate = (row: Record) => boolean; +type Row = Record; + +const { databaseRows, mockGetActivePersonalInstance, mockHasSubscriptionForInstance } = vi.hoisted( + () => ({ + databaseRows: { + instances: [] as Row[], + reservations: [] as Row[], + }, + mockGetActivePersonalInstance: vi.fn(), + mockHasSubscriptionForInstance: vi.fn(), + }) +); + +vi.mock('cloudflare:workers', () => ({ + DurableObject: class FakeDurableObject { + ctx: unknown; + env: unknown; + constructor(ctx: unknown, env: unknown) { + this.ctx = ctx; + this.env = env; + } + }, +})); +vi.mock('drizzle-orm/durable-sqlite/migrator', () => ({ migrate: vi.fn() })); +vi.mock('drizzle-orm', async importOriginal => { + const actual = await importOriginal(); + return { + ...actual, + eq: + (column: { name: string }, value: unknown): Predicate => + row => + row[column.name] === value, + isNull: + (column: { name: string }): Predicate => + row => + row[column.name] == null, + and: + (...predicates: Predicate[]): Predicate => + row => + predicates.every(predicate => predicate(row)), + inArray: + (column: { name: string }, values: unknown[]): Predicate => + row => + values.includes(row[column.name]), + }; +}); +vi.mock('drizzle-orm/durable-sqlite', () => ({ + drizzle: vi.fn(() => createFakeDatabase()), +})); +vi.mock('../db', () => ({ + getWorkerDb: vi.fn(() => ({})), + getActivePersonalInstance: mockGetActivePersonalInstance, + hasSubscriptionForInstance: mockHasSubscriptionForInstance, +})); + +function rowsForTable(table: unknown): Row[] { + return table === registryInstances ? databaseRows.instances : databaseRows.reservations; +} + +function createFakeDatabase() { + return { + select: vi.fn(() => ({ + from: vi.fn((table: unknown) => ({ + all: vi.fn(() => rowsForTable(table)), + where: vi.fn((predicate: Predicate) => ({ + all: vi.fn(() => rowsForTable(table).filter(predicate)), + get: vi.fn(() => rowsForTable(table).find(predicate)), + })), + })), + })), + insert: vi.fn((table: unknown) => ({ + values: vi.fn((values: Row) => ({ + onConflictDoNothing: vi.fn(() => ({ + run: vi.fn(() => { + const rows = rowsForTable(table); + if (!rows.some(row => row.instance_id === values.instance_id)) rows.push(values); + }), + })), + onConflictDoUpdate: vi.fn((options: { set: Row }) => ({ + run: vi.fn(() => { + const rows = rowsForTable(table); + const existing = rows.find(row => row.instance_id === values.instance_id); + if (existing) Object.assign(existing, options.set); + else rows.push(values); + }), + })), + run: vi.fn(() => { + const rows = rowsForTable(table); + if (table === registryProvisionReservations) { + const unresolved = rows.some( + row => + row.assigned_user_id === values.assigned_user_id && + ['in_progress', 'failed_requires_reconciliation'].includes(String(row.status)) + ); + if (unresolved) throw new Error('UNIQUE constraint failed'); + } + rows.push(values); + }), + })), + })), + update: vi.fn((table: unknown) => ({ + set: vi.fn((patch: Row) => ({ + where: vi.fn((predicate: Predicate) => ({ + run: vi.fn(() => { + for (const row of rowsForTable(table).filter(predicate)) Object.assign(row, patch); + }), + })), + })), + })), + }; +} + +function createState() { + const values = new Map(); + return { + storage: { + get: vi.fn(async (key: string) => values.get(key)), + put: vi.fn(async (key: string, value: unknown) => values.set(key, value)), + transactionSync: vi.fn((callback: () => unknown) => callback()), + }, + blockConcurrencyWhile: vi.fn((callback: () => Promise) => callback()), + }; +} + +import { KiloClawRegistry } from './kiloclaw-registry'; + +describe('KiloClawRegistry fresh provision reservations', () => { + beforeEach(() => { + databaseRows.instances.length = 0; + databaseRows.reservations.length = 0; + mockGetActivePersonalInstance.mockReset().mockResolvedValue(null); + mockHasSubscriptionForInstance.mockReset().mockResolvedValue(false); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('keeps legacy user-keyed rows routable before early-bird backfill completes', async () => { + mockGetActivePersonalInstance.mockResolvedValue({ + id: 'instance-legacy', + sandboxId: sandboxIdFromUserId('user-1'), + orgId: null, + }); + const registry = new KiloClawRegistry( + createState() as never, + { HYPERDRIVE: { connectionString: 'postgresql://fake' } } as never + ); + + expect(await registry.listInstances('user:user-1')).toEqual([ + expect.objectContaining({ instanceId: 'instance-legacy', assignedUserId: 'user-1' }), + ]); + }); + + it('does not lazily publish an unpaired instance-keyed row or retry it after quarantine', async () => { + const now = Date.now(); + vi.spyOn(Date, 'now').mockReturnValue(now); + mockGetActivePersonalInstance.mockResolvedValue({ + id: 'instance-1', + sandboxId: 'ki_11111111111141118111111111111111', + orgId: null, + }); + const registry = new KiloClawRegistry( + createState() as never, + { HYPERDRIVE: { connectionString: 'postgresql://fake' } } as never + ); + + expect(await registry.listInstances('user:user-1')).toEqual([]); + + mockHasSubscriptionForInstance.mockResolvedValue(true); + vi.mocked(Date.now).mockReturnValue(now + 60_001); + expect(await registry.listInstances('user:user-1')).toEqual([]); + expect(mockHasSubscriptionForInstance).toHaveBeenCalledTimes(1); + }); + + it('admits one unresolved personal provision and rejects a duplicate', async () => { + const registry = new KiloClawRegistry(createState() as never, {} as never); + + const first = await registry.beginFreshProvision('user:user-1', 'user-1', 'instance-1', 'do-1'); + const second = await registry.beginFreshProvision( + 'user:user-1', + 'user-1', + 'instance-2', + 'do-2' + ); + + expect(first.outcome).toBe('admitted'); + expect(second).toMatchObject({ + outcome: 'conflict', + reservation: { instanceId: 'instance-1', status: 'in_progress' }, + }); + }); + + it('allows distinct assigned users to provision within one organization registry', async () => { + const registry = new KiloClawRegistry(createState() as never, {} as never); + + const first = await registry.beginFreshProvision('org:org-1', 'user-1', 'instance-1', 'do-1'); + const second = await registry.beginFreshProvision('org:org-1', 'user-2', 'instance-2', 'do-2'); + + expect(first.outcome).toBe('admitted'); + expect(second.outcome).toBe('admitted'); + }); + + it('blocks reconciliation-required attempts until an operator releases them', async () => { + const registry = new KiloClawRegistry(createState() as never, {} as never); + await registry.beginFreshProvision('user:user-1', 'user-1', 'instance-1', 'do-1'); + await registry.failFreshProvision('user:user-1', 'user-1', 'instance-1', 'provider_failed'); + + const blocked = await registry.beginFreshProvision( + 'user:user-1', + 'user-1', + 'instance-2', + 'do-2' + ); + expect(blocked).toMatchObject({ + outcome: 'conflict', + reservation: { instanceId: 'instance-1', status: 'failed_requires_reconciliation' }, + }); + + await registry.releaseFreshProvision( + 'user:user-1', + 'user-1', + 'instance-1', + 'operator_verified' + ); + const retry = await registry.beginFreshProvision('user:user-1', 'user-1', 'instance-2', 'do-2'); + expect(retry.outcome).toBe('admitted'); + }); + + it('keeps reservations out of routable instance reads until completion', async () => { + const registry = new KiloClawRegistry(createState() as never, {} as never); + await registry.beginFreshProvision('user:user-1', 'user-1', 'instance-1', 'do-1'); + + expect(await registry.listInstances('user:user-1')).toEqual([]); + await registry.completeFreshProvision('user:user-1', 'user-1', 'instance-1', 'do-1'); + expect(await registry.listInstances('user:user-1')).toEqual([ + expect.objectContaining({ instanceId: 'instance-1', doKey: 'do-1' }), + ]); + }); + + it('repairs completion idempotently after a lost finalization acknowledgement', async () => { + const registry = new KiloClawRegistry(createState() as never, {} as never); + await registry.beginFreshProvision('user:user-1', 'user-1', 'instance-1', 'do-1'); + + await registry.repairCompletedProvision('user:user-1', 'user-1', 'instance-1', 'do-1'); + await registry.repairCompletedProvision('user:user-1', 'user-1', 'instance-1', 'do-1'); + + const all = await registry.listAllInstances('user:user-1'); + expect(all.entries).toHaveLength(1); + expect(all.reservations).toEqual([ + expect.objectContaining({ instanceId: 'instance-1', status: 'completed' }), + ]); + }); + + it('revives a tombstoned route entry when a reserved canonical provision is repaired', async () => { + const registry = new KiloClawRegistry(createState() as never, {} as never); + await registry.beginFreshProvision('user:user-1', 'user-1', 'instance-1', 'do-new'); + databaseRows.instances.push({ + instance_id: 'instance-1', + do_key: 'do-old', + assigned_user_id: 'user-old', + created_at: '2026-05-30T00:00:00.000Z', + destroyed_at: '2026-05-30T01:00:00.000Z', + }); + + await registry.repairCompletedProvision('user:user-1', 'user-1', 'instance-1', 'do-new'); + + expect(await registry.listInstances('user:user-1')).toEqual([ + expect.objectContaining({ + instanceId: 'instance-1', + doKey: 'do-new', + assignedUserId: 'user-1', + destroyedAt: null, + }), + ]); + }); + + it('does not resurrect a released reservation during late completion', async () => { + const registry = new KiloClawRegistry(createState() as never, {} as never); + await registry.beginFreshProvision('user:user-1', 'user-1', 'instance-1', 'do-1'); + await registry.releaseFreshProvision('user:user-1', 'user-1', 'instance-1', 'destroyed'); + + await expect( + registry.completeFreshProvision('user:user-1', 'user-1', 'instance-1', 'do-1') + ).rejects.toThrow('Cannot complete a released provision reservation'); + expect(await registry.listInstances('user:user-1')).toEqual([]); + }); + + it('does not resurrect a completed provision after destroy fences delayed repair', async () => { + const registry = new KiloClawRegistry(createState() as never, {} as never); + await registry.beginFreshProvision('user:user-1', 'user-1', 'instance-1', 'do-1'); + await registry.completeFreshProvision('user:user-1', 'user-1', 'instance-1', 'do-1'); + await registry.finalizeDestroyedInstance( + 'user:user-1', + 'user-1', + 'instance-1', + 'do-1', + 'destroyed' + ); + + await expect( + registry.repairCompletedProvision('user:user-1', 'user-1', 'instance-1', 'do-1') + ).rejects.toThrow('Cannot complete a released provision reservation'); + expect(await registry.listInstances('user:user-1')).toEqual([]); + }); + + it('refuses recovery publication after destroy established a tombstone without reservation', async () => { + const registry = new KiloClawRegistry(createState() as never, {} as never); + await registry.finalizeDestroyedInstance( + 'user:user-1', + 'user-1', + 'instance-1', + 'do-1', + 'destroyed' + ); + + expect( + await registry.publishRecoveredInstance('user:user-1', 'user-1', 'instance-1', 'do-1') + ).toBe(false); + expect(await registry.listInstances('user:user-1')).toEqual([]); + }); +}); diff --git a/services/kiloclaw/src/durable-objects/kiloclaw-registry.ts b/services/kiloclaw/src/durable-objects/kiloclaw-registry.ts index c7247e2d44..491044a931 100644 --- a/services/kiloclaw/src/durable-objects/kiloclaw-registry.ts +++ b/services/kiloclaw/src/durable-objects/kiloclaw-registry.ts @@ -1,12 +1,13 @@ import { DurableObject } from 'cloudflare:workers'; import { drizzle, type DrizzleSqliteDODatabase } from 'drizzle-orm/durable-sqlite'; import { migrate } from 'drizzle-orm/durable-sqlite/migrator'; -import { eq, isNull, and } from 'drizzle-orm'; +import { eq, isNull, and, inArray } from 'drizzle-orm'; import migrations from '../../drizzle/migrations'; -import { registryInstances } from '../db/sqlite-schema'; -import { getWorkerDb, getActivePersonalInstance } from '../db'; +import { registryInstances, registryProvisionReservations } from '../db/sqlite-schema'; +import { getWorkerDb, getActivePersonalInstance, hasSubscriptionForInstance } from '../db'; import type { KiloClawEnv } from '../types'; import { doKeyFromActiveInstance } from '../lib/instance-routing'; +import { isInstanceKeyedSandboxId } from '@kilocode/worker-utils/instance-id'; export type RegistryEntry = { instanceId: string; @@ -16,6 +17,28 @@ export type RegistryEntry = { destroyedAt: string | null; }; +export type ProvisionReservationStatus = + | 'in_progress' + | 'completed' + | 'failed_requires_reconciliation' + | 'released'; + +export type ProvisionReservationEntry = { + instanceId: string; + doKey: string; + assignedUserId: string; + status: ProvisionReservationStatus; + startedAt: string; + updatedAt: string; + completedAt: string | null; + failureCode: string | null; + resolutionReason: string | null; +}; + +export type BeginFreshProvisionResult = + | { outcome: 'admitted'; reservation: ProvisionReservationEntry } + | { outcome: 'conflict'; reservation: ProvisionReservationEntry }; + function rowToEntry(row: typeof registryInstances.$inferSelect): RegistryEntry { return { instanceId: row.instance_id, @@ -26,6 +49,22 @@ function rowToEntry(row: typeof registryInstances.$inferSelect): RegistryEntry { }; } +function rowToReservation( + row: typeof registryProvisionReservations.$inferSelect +): ProvisionReservationEntry { + return { + instanceId: row.instance_id, + doKey: row.do_key, + assignedUserId: row.assigned_user_id, + status: row.status, + startedAt: row.started_at, + updatedAt: row.updated_at, + completedAt: row.completed_at, + failureCode: row.failure_code, + resolutionReason: row.resolution_reason, + }; +} + /** * KiloClawRegistry DO — SQLite-backed index of instances per owner. * @@ -95,10 +134,12 @@ export class KiloClawRegistry extends DurableObject { .map(rowToEntry); } - /** List all registry entries including destroyed ones, plus migration status (admin). */ - async listAllInstances( - ownerKey: string - ): Promise<{ entries: RegistryEntry[]; migrated: boolean }> { + /** List all registry entries and fresh-provision admission state for admin inspection. */ + async listAllInstances(ownerKey: string): Promise<{ + entries: RegistryEntry[]; + reservations: ProvisionReservationEntry[]; + migrated: boolean; + }> { await this.ensureOwnerKey(ownerKey); if (!this.migrated) { @@ -110,7 +151,204 @@ export class KiloClawRegistry extends DurableObject { } const entries = this.db.select().from(registryInstances).all().map(rowToEntry); - return { entries, migrated: this.migrated }; + const reservations = this.db + .select() + .from(registryProvisionReservations) + .all() + .map(rowToReservation); + return { entries, reservations, migrated: this.migrated }; + } + + async beginFreshProvision( + ownerKey: string, + assignedUserId: string, + instanceId: string, + doKey: string + ): Promise { + await this.ensureOwnerKey(ownerKey); + const now = new Date().toISOString(); + + try { + const reservation = this.ctx.storage.transactionSync(() => { + this.db + .insert(registryProvisionReservations) + .values({ + instance_id: instanceId, + do_key: doKey, + assigned_user_id: assignedUserId, + status: 'in_progress', + started_at: now, + updated_at: now, + }) + .run(); + const row = this.db + .select() + .from(registryProvisionReservations) + .where(eq(registryProvisionReservations.instance_id, instanceId)) + .get(); + if (!row) throw new Error('Provision reservation missing after insertion'); + return rowToReservation(row); + }); + return { outcome: 'admitted', reservation }; + } catch (error) { + const unresolved = this.db + .select() + .from(registryProvisionReservations) + .where( + and( + eq(registryProvisionReservations.assigned_user_id, assignedUserId), + inArray(registryProvisionReservations.status, [ + 'in_progress', + 'failed_requires_reconciliation', + ]) + ) + ) + .get(); + if (unresolved) return { outcome: 'conflict', reservation: rowToReservation(unresolved) }; + throw error; + } + } + + async completeFreshProvision( + ownerKey: string, + assignedUserId: string, + instanceId: string, + doKey: string + ): Promise { + await this.ensureOwnerKey(ownerKey); + this.finalizeFreshProvision(assignedUserId, instanceId, doKey, true); + } + + async repairCompletedProvision( + ownerKey: string, + assignedUserId: string, + instanceId: string, + doKey: string + ): Promise { + await this.ensureOwnerKey(ownerKey); + return this.finalizeFreshProvision(assignedUserId, instanceId, doKey, false); + } + + async failFreshProvision( + ownerKey: string, + assignedUserId: string, + instanceId: string, + failureCode: string + ): Promise { + await this.ensureOwnerKey(ownerKey); + const now = new Date().toISOString(); + this.db + .update(registryProvisionReservations) + .set({ + status: 'failed_requires_reconciliation', + updated_at: now, + failure_code: failureCode, + }) + .where( + and( + eq(registryProvisionReservations.instance_id, instanceId), + eq(registryProvisionReservations.assigned_user_id, assignedUserId), + eq(registryProvisionReservations.status, 'in_progress') + ) + ) + .run(); + } + + async releaseFreshProvision( + ownerKey: string, + assignedUserId: string, + instanceId: string, + reason: string + ): Promise { + await this.ensureOwnerKey(ownerKey); + const now = new Date().toISOString(); + this.db + .update(registryProvisionReservations) + .set({ status: 'released', updated_at: now, resolution_reason: reason }) + .where( + and( + eq(registryProvisionReservations.instance_id, instanceId), + eq(registryProvisionReservations.assigned_user_id, assignedUserId), + inArray(registryProvisionReservations.status, [ + 'in_progress', + 'completed', + 'failed_requires_reconciliation', + ]) + ) + ) + .run(); + } + + async listProvisionReservations(ownerKey: string): Promise { + await this.ensureOwnerKey(ownerKey); + return this.db.select().from(registryProvisionReservations).all().map(rowToReservation); + } + + private finalizeFreshProvision( + assignedUserId: string, + instanceId: string, + doKey: string, + reservationRequired: boolean + ): boolean { + const now = new Date().toISOString(); + return this.ctx.storage.transactionSync(() => { + const reservation = this.db + .select() + .from(registryProvisionReservations) + .where( + and( + eq(registryProvisionReservations.instance_id, instanceId), + eq(registryProvisionReservations.assigned_user_id, assignedUserId) + ) + ) + .get(); + if (!reservation) { + if (reservationRequired) + throw new Error('Provision reservation not found during completion'); + return false; + } + if (reservation.status === 'released') { + throw new Error('Cannot complete a released provision reservation'); + } + // A canonical active row plus subscription is stronger evidence than the + // transient provider-side failure that originally set reconciliation state. + // Repair deliberately clears that state only after the Worker has verified + // canonical success before invoking this method. + if ( + reservationRequired && + reservation.status !== 'in_progress' && + reservation.status !== 'completed' + ) { + throw new Error(`Cannot complete provision reservation from ${reservation.status}`); + } + + this.db + .update(registryProvisionReservations) + .set({ + status: 'completed', + updated_at: now, + completed_at: now, + failure_code: null, + resolution_reason: null, + }) + .where(eq(registryProvisionReservations.instance_id, instanceId)) + .run(); + this.db + .insert(registryInstances) + .values({ + instance_id: instanceId, + do_key: doKey, + assigned_user_id: assignedUserId, + created_at: now, + destroyed_at: null, + }) + .onConflictDoUpdate({ + target: registryInstances.instance_id, + set: { do_key: doKey, assigned_user_id: assignedUserId, destroyed_at: null }, + }) + .run(); + return true; + }); } async createInstance( @@ -128,11 +366,42 @@ export class KiloClawRegistry extends DurableObject { do_key: doKey, assigned_user_id: assignedUserId, created_at: new Date().toISOString(), + destroyed_at: null, }) .onConflictDoNothing() .run(); } + async publishRecoveredInstance( + ownerKey: string, + assignedUserId: string, + instanceId: string, + doKey: string + ): Promise { + await this.ensureOwnerKey(ownerKey); + return this.ctx.storage.transactionSync(() => { + const existing = this.db + .select() + .from(registryInstances) + .where(eq(registryInstances.instance_id, instanceId)) + .get(); + if (existing?.destroyed_at) return false; + if (!existing) { + this.db + .insert(registryInstances) + .values({ + instance_id: instanceId, + do_key: doKey, + assigned_user_id: assignedUserId, + created_at: new Date().toISOString(), + destroyed_at: null, + }) + .run(); + } + return true; + }); + } + async destroyInstance(ownerKey: string, instanceId: string): Promise { await this.ensureOwnerKey(ownerKey); @@ -145,6 +414,48 @@ export class KiloClawRegistry extends DurableObject { .run(); } + async finalizeDestroyedInstance( + ownerKey: string, + assignedUserId: string, + instanceId: string, + doKey: string, + reason: string + ): Promise { + await this.ensureOwnerKey(ownerKey); + const now = new Date().toISOString(); + this.ctx.storage.transactionSync(() => { + this.db + .insert(registryInstances) + .values({ + instance_id: instanceId, + do_key: doKey, + assigned_user_id: assignedUserId, + created_at: now, + destroyed_at: now, + }) + .onConflictDoUpdate({ + target: registryInstances.instance_id, + set: { destroyed_at: now }, + }) + .run(); + this.db + .update(registryProvisionReservations) + .set({ status: 'released', updated_at: now, resolution_reason: reason }) + .where( + and( + eq(registryProvisionReservations.instance_id, instanceId), + eq(registryProvisionReservations.assigned_user_id, assignedUserId), + inArray(registryProvisionReservations.status, [ + 'in_progress', + 'completed', + 'failed_requires_reconciliation', + ]) + ) + ) + .run(); + }); + } + async resolveDoKey(ownerKey: string, instanceId: string): Promise { await this.ensureOwnerKey(ownerKey); @@ -206,6 +517,15 @@ export class KiloClawRegistry extends DurableObject { const instance = await getActivePersonalInstance(db, userId); if (instance) { + const hasSubscription = await hasSubscriptionForInstance(db, instance.id); + if (!hasSubscription && isInstanceKeyedSandboxId(instance.sandboxId)) { + // Instance-keyed rows without a subscription are quarantine state. + // They must not be published through lazy migration, and later + // subscription recovery is responsible for publishing their route. + this.migrated = true; + await this.ctx.storage.put('migrated', true); + return; + } const doKey = doKeyFromActiveInstance(instance); this.db .insert(registryInstances) @@ -218,6 +538,9 @@ export class KiloClawRegistry extends DurableObject { .onConflictDoNothing() .run(); } + // Legacy user-keyed rows can remain subscription-less until early-bird backfill + // completes, so they stay routable. New instance-keyed rows without a + // subscription are quarantine state and must not be published. // No Postgres row means no legacy instance — Postgres is the source of truth. // Orphaned DOs (state but no Postgres row) only occur via manual DB deletion // and are handled by the resolveRegistryEntry fallback in index.ts. diff --git a/services/kiloclaw/src/routes/platform-provision-bootstrap.test.ts b/services/kiloclaw/src/routes/platform-provision-bootstrap.test.ts index c62ef640a6..6eafcc64f4 100644 --- a/services/kiloclaw/src/routes/platform-provision-bootstrap.test.ts +++ b/services/kiloclaw/src/routes/platform-provision-bootstrap.test.ts @@ -2,6 +2,7 @@ import { beforeEach, describe, expect, it, vi } from 'vitest'; import type * as DbModule from '../db'; import type * as ProvisionBootstrapModule from './provision-bootstrap'; import type * as AnalyticsModule from '../utils/analytics'; +import type { BeginFreshProvisionResult } from '../durable-objects/kiloclaw-registry'; const { mockGetWorkerDb, @@ -86,9 +87,19 @@ function createSelectBuilder(rows: T[]): SelectBuilder { return builder; } -function createWorkerDb() { +function createWorkerDb(options?: { + existingActiveInstance?: { id: string; sandboxId: string; organizationId: string | null }; + activeInstanceReads?: Array<{ + id: string; + sandboxId: string; + organizationId: string | null; + } | null>; + hasSubscription?: boolean; +}) { const txInsertReturningQueue = [[{ id: 'instance-new', sandboxId: 'sandbox-new' }], [], []]; const updateSets: Array> = []; + const existingActiveInstance = options?.existingActiveInstance; + const activeInstanceReads = [...(options?.activeInstanceReads ?? [])]; let insertedInstance: { id: string; userId: string; @@ -99,11 +110,30 @@ function createWorkerDb() { destroyedAt: string | null; } | null = null; - const createSelectRows = (fields: Record) => { + const createSelectRows = (fields: Record): Array> => { if ('alias' in fields) { return []; } + if ('sandbox_id' in fields) { + const activeInstance = + activeInstanceReads.length > 0 ? activeInstanceReads.shift() : existingActiveInstance; + if (activeInstance) { + return [ + { + id: activeInstance.id, + sandbox_id: activeInstance.sandboxId, + organization_id: activeInstance.organizationId, + }, + ]; + } + return []; + } + + if (Object.keys(fields).length === 1 && 'id' in fields && options?.hasSubscription) { + return [{ id: 'subscription-1' }]; + } + if (!insertedInstance) { return []; } @@ -205,28 +235,65 @@ function createWorkerDb() { return await callback(tx); }), - select: vi.fn(() => createSelectBuilder([])), + select: vi.fn((fields: Record) => + createSelectBuilder(createSelectRows(fields)) + ), }; } function makeEnv() { - const destroy = vi.fn().mockResolvedValue(undefined); + const destroy = vi + .fn<(options?: { reason?: string }) => Promise<{ finalized: boolean } | undefined>>() + .mockResolvedValue(undefined); + const allowProvisionReservationReleaseOnFinalize = vi.fn().mockResolvedValue(undefined); const provision = vi.fn().mockResolvedValue({ sandboxId: 'sandbox-new' }); + const beginFreshProvision = vi.fn< + ( + registryKey: string, + assignedUserId: string, + instanceId: string, + doKey: string + ) => Promise + >(async (_registryKey: string, assignedUserId: string, instanceId: string, doKey: string) => ({ + outcome: 'admitted', + reservation: { + instanceId, + doKey, + assignedUserId, + status: 'in_progress', + startedAt: '2026-05-31T00:00:00.000Z', + updatedAt: '2026-05-31T00:00:00.000Z', + completedAt: null, + failureCode: null, + resolutionReason: null, + }, + })); + const completeFreshProvision = vi.fn().mockResolvedValue(undefined); + const repairCompletedProvision = vi.fn().mockResolvedValue(true); + const failFreshProvision = vi.fn().mockResolvedValue(undefined); + const releaseFreshProvision = vi.fn().mockResolvedValue(undefined); + const createInstance = vi.fn().mockResolvedValue(undefined); + const registryStub = { + beginFreshProvision, + completeFreshProvision, + repairCompletedProvision, + failFreshProvision, + releaseFreshProvision, + createInstance, + listInstances: vi.fn().mockResolvedValue([]), + destroyInstance: vi.fn().mockResolvedValue(undefined), + }; return { env: { HYPERDRIVE: { connectionString: 'postgresql://fake' }, KILOCLAW_INSTANCE: { idFromName: vi.fn((id: string) => id), - get: vi.fn(() => ({ provision, destroy })), + get: vi.fn(() => ({ provision, destroy, allowProvisionReservationReleaseOnFinalize })), }, KILOCLAW_REGISTRY: { idFromName: vi.fn((id: string) => id), - get: vi.fn(() => ({ - createInstance: vi.fn().mockResolvedValue(undefined), - listInstances: vi.fn().mockResolvedValue([]), - destroyInstance: vi.fn().mockResolvedValue(undefined), - })), + get: vi.fn(() => registryStub), }, KILOCLAW_AE: { writeDataPoint: vi.fn() }, KV_CLAW_CACHE: { @@ -238,7 +305,14 @@ function makeEnv() { }, } as never, destroy, + allowProvisionReservationReleaseOnFinalize, provision, + beginFreshProvision, + completeFreshProvision, + repairCompletedProvision, + failFreshProvision, + releaseFreshProvision, + createInstance, }; } @@ -282,8 +356,213 @@ describe('platform provision bootstrap quarantine', () => { ); }); - it('returns an error and marks fresh instance destroyed when RPC and fallback both fail', async () => { - const { env, destroy } = makeEnv(); + it('admits fresh provisioning before provider work and completes after bootstrap', async () => { + const { env, beginFreshProvision, provision, completeFreshProvision } = makeEnv(); + mockGetWorkerDb.mockReturnValue(createWorkerDb()); + mockBootstrapProvisionedSubscriptionWithFallback.mockResolvedValueOnce({ mode: 'rpc' }); + + const response = await platform.request( + '/provision', + { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ userId: 'user-1', provider: 'fly' }), + }, + env + ); + + expect(response.status).toBe(201); + expect(beginFreshProvision).toHaveBeenCalledOnce(); + expect(beginFreshProvision.mock.invocationCallOrder[0]).toBeLessThan( + provision.mock.invocationCallOrder[0] ?? Number.MAX_SAFE_INTEGER + ); + expect(completeFreshProvision.mock.invocationCallOrder[0]).toBeGreaterThan( + mockBootstrapProvisionedSubscriptionWithFallback.mock.invocationCallOrder[0] ?? 0 + ); + }); + + it('returns a conflict without provider work when fresh admission is already occupied', async () => { + const { env, beginFreshProvision, provision } = makeEnv(); + mockGetWorkerDb.mockReturnValue(createWorkerDb()); + beginFreshProvision.mockResolvedValueOnce({ + outcome: 'conflict', + reservation: { + instanceId: 'existing-reservation', + doKey: 'existing-reservation', + assignedUserId: 'user-1', + status: 'in_progress', + startedAt: '2026-05-31T00:00:00.000Z', + updatedAt: '2026-05-31T00:00:00.000Z', + completedAt: null, + failureCode: null, + resolutionReason: null, + }, + }); + + const response = await platform.request( + '/provision', + { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ userId: 'user-1', provider: 'fly' }), + }, + env + ); + + expect(response.status).toBe(409); + await expect(response.json()).resolves.toMatchObject({ code: 'provision_in_progress' }); + expect(provision).not.toHaveBeenCalled(); + }); + + it('releases admission without provider work when a subscribed canonical active instance exists', async () => { + const { env, provision, releaseFreshProvision, repairCompletedProvision } = makeEnv(); + mockGetWorkerDb.mockReturnValue( + createWorkerDb({ + existingActiveInstance: { + id: '11111111-1111-4111-8111-111111111111', + sandboxId: 'ki_11111111111141118111111111111111', + organizationId: null, + }, + hasSubscription: true, + }) + ); + + const response = await platform.request( + '/provision', + { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ userId: 'user-1', provider: 'fly' }), + }, + env + ); + + expect(response.status).toBe(409); + await expect(response.json()).resolves.toMatchObject({ code: 'instance_already_active' }); + expect(provision).not.toHaveBeenCalled(); + expect(releaseFreshProvision).toHaveBeenCalledWith( + 'user:user-1', + 'user-1', + expect.any(String), + 'active_instance_exists' + ); + expect(repairCompletedProvision).toHaveBeenCalledWith( + 'user:user-1', + 'user-1', + '11111111-1111-4111-8111-111111111111', + expect.any(String) + ); + }); + + it('does not publish an active row that lacks canonical subscription state', async () => { + const { env, provision, releaseFreshProvision, repairCompletedProvision, createInstance } = + makeEnv(); + mockGetWorkerDb.mockReturnValue( + createWorkerDb({ + existingActiveInstance: { + id: '11111111-1111-4111-8111-111111111111', + sandboxId: 'ki_11111111111141118111111111111111', + organizationId: null, + }, + }) + ); + + const response = await platform.request( + '/provision', + { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ userId: 'user-1', provider: 'fly' }), + }, + env + ); + + expect(response.status).toBe(409); + expect(provision).not.toHaveBeenCalled(); + expect(releaseFreshProvision).toHaveBeenCalled(); + expect(repairCompletedProvision).not.toHaveBeenCalled(); + expect(createInstance).not.toHaveBeenCalled(); + }); + + it('repairs a completed reservation through the dedicated endpoint', async () => { + const { env, repairCompletedProvision } = makeEnv(); + mockGetWorkerDb.mockReturnValue( + createWorkerDb({ + existingActiveInstance: { + id: '11111111-1111-4111-8111-111111111111', + sandboxId: 'ki_11111111111141118111111111111111', + organizationId: null, + }, + hasSubscription: true, + }) + ); + + const response = await platform.request( + '/provision/repair-reservation', + { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ + userId: 'user-1', + instanceId: '11111111-1111-4111-8111-111111111111', + }), + }, + env + ); + + expect(response.status).toBe(200); + expect(repairCompletedProvision).toHaveBeenCalledWith( + 'user:user-1', + 'user-1', + '11111111-1111-4111-8111-111111111111', + expect.any(String) + ); + }); + + it('repairs Registry completion before returning a successful fresh provision', async () => { + const { env, completeFreshProvision, repairCompletedProvision } = makeEnv(); + mockGetWorkerDb.mockReturnValue(createWorkerDb()); + mockBootstrapProvisionedSubscriptionWithFallback.mockResolvedValueOnce({ mode: 'rpc' }); + completeFreshProvision.mockRejectedValueOnce(new Error('completion unavailable')); + + const response = await platform.request( + '/provision', + { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ userId: 'user-1', provider: 'fly' }), + }, + env + ); + + expect(response.status).toBe(201); + expect(repairCompletedProvision).toHaveBeenCalledOnce(); + }); + + it('returns a pending-finalization response when completion and repair both fail', async () => { + const { env, completeFreshProvision, repairCompletedProvision } = makeEnv(); + mockGetWorkerDb.mockReturnValue(createWorkerDb()); + mockBootstrapProvisionedSubscriptionWithFallback.mockResolvedValueOnce({ mode: 'rpc' }); + completeFreshProvision.mockRejectedValueOnce(new Error('completion unavailable')); + repairCompletedProvision.mockRejectedValueOnce(new Error('repair unavailable')); + + const response = await platform.request( + '/provision', + { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ userId: 'user-1', provider: 'fly' }), + }, + env + ); + + expect(response.status).toBe(503); + await expect(response.json()).resolves.toMatchObject({ code: 'provision_completion_pending' }); + }); + + it('returns an error and authorizes release after Postgres quarantine succeeds', async () => { + const { env, destroy, failFreshProvision, allowProvisionReservationReleaseOnFinalize } = + makeEnv(); const workerDb = createWorkerDb(); mockGetWorkerDb.mockReturnValue(workerDb); mockBootstrapProvisionedSubscriptionWithFallback.mockRejectedValueOnce( @@ -323,6 +602,35 @@ describe('platform provision bootstrap quarantine', () => { expect(eventCall?.[1]?.userId).toBe('user-1'); expect(typeof eventCall?.[1]?.instanceId).toBe('string'); expect(eventCall?.[1]?.instanceId?.length).toBeGreaterThan(0); + expect(allowProvisionReservationReleaseOnFinalize).toHaveBeenCalledOnce(); + expect(failFreshProvision).not.toHaveBeenCalled(); + }); + + it('delegates finalized bootstrap cleanup release to the instance DO', async () => { + const { env, destroy, allowProvisionReservationReleaseOnFinalize, failFreshProvision } = + makeEnv(); + mockGetWorkerDb.mockReturnValue(createWorkerDb()); + destroy.mockResolvedValueOnce({ finalized: true }); + mockBootstrapProvisionedSubscriptionWithFallback.mockRejectedValueOnce( + new BootstrapProvisionFallbackError({ + rpcError: new Error('rpc down'), + fallbackError: new Error('fallback down'), + }) + ); + + const response = await platform.request( + '/provision', + { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ userId: 'user-1', provider: 'fly' }), + }, + env + ); + + expect(response.status).toBe(500); + expect(allowProvisionReservationReleaseOnFinalize).toHaveBeenCalledOnce(); + expect(failFreshProvision).not.toHaveBeenCalled(); }); it('surfaces bootstrap-time organization entitlement loss and tears down new infrastructure', async () => { @@ -636,11 +944,20 @@ describe('platform /provision: instanceType defaulting', () => { // volumeSizeGb when `config.instanceType` is undefined; defaulting to // perf-1-3 unconditionally would silently overwrite custom (e.g. // extend-volume) and legacy tiers on the next config change. - const { env, provision } = makeEnv(); - mockGetWorkerDb.mockReturnValue(createWorkerDb()); + const { env, provision, beginFreshProvision, repairCompletedProvision } = makeEnv(); + const existingInstanceId = '11111111-1111-4111-8111-111111111111'; + mockGetWorkerDb.mockReturnValue( + createWorkerDb({ + existingActiveInstance: { + id: existingInstanceId, + sandboxId: 'ki_11111111111141118111111111111111', + organizationId: null, + }, + hasSubscription: true, + }) + ); mockBootstrapProvisionedSubscriptionWithFallback.mockResolvedValueOnce({ mode: 'rpc' }); - const existingInstanceId = '11111111-1111-4111-8111-111111111111'; const response = await platform.request( '/provision', { @@ -658,13 +975,175 @@ describe('platform /provision: instanceType defaulting', () => { expect(response.status).toBe(201); expect(provision).toHaveBeenCalledTimes(1); + expect(beginFreshProvision).not.toHaveBeenCalled(); + expect(repairCompletedProvision).toHaveBeenCalledWith( + 'user:user-1', + 'user-1', + existingInstanceId, + expect.any(String) + ); const provisionConfig = provision.mock.calls[0][1] as Record; expect(provisionConfig.instanceType).toBeUndefined(); }); + it('fails closed before mutation when an existing-instance repair cannot be confirmed', async () => { + const { env, repairCompletedProvision, provision } = makeEnv(); + mockGetWorkerDb.mockReturnValue( + createWorkerDb({ + existingActiveInstance: { + id: '11111111-1111-4111-8111-111111111111', + sandboxId: 'ki_11111111111141118111111111111111', + organizationId: null, + }, + hasSubscription: true, + }) + ); + mockBootstrapProvisionedSubscriptionWithFallback.mockResolvedValueOnce({ mode: 'rpc' }); + repairCompletedProvision.mockRejectedValueOnce(new Error('registry unavailable')); + + const response = await platform.request( + '/provision', + { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ + userId: 'user-1', + instanceId: '11111111-1111-4111-8111-111111111111', + }), + }, + env + ); + + expect(response.status).toBe(503); + await expect(response.json()).resolves.toMatchObject({ code: 'provision_completion_pending' }); + expect(provision).not.toHaveBeenCalled(); + }); + + it('does not republish an existing instance after destroy wins during its update', async () => { + const { env, provision, createInstance } = makeEnv(); + const existing = { + id: '11111111-1111-4111-8111-111111111111', + sandboxId: 'ki_11111111111141118111111111111111', + organizationId: null, + }; + mockGetWorkerDb.mockReturnValue( + createWorkerDb({ activeInstanceReads: [existing, null], hasSubscription: true }) + ); + + const response = await platform.request( + '/provision', + { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ userId: 'user-1', instanceId: existing.id }), + }, + env + ); + + expect(response.status).toBe(409); + await expect(response.json()).resolves.toMatchObject({ code: 'instance_destroyed' }); + expect(provision).toHaveBeenCalledOnce(); + expect(createInstance).not.toHaveBeenCalled(); + }); + + it('rejects an arbitrary instanceId before provider work', async () => { + const { env, provision, beginFreshProvision } = makeEnv(); + mockGetWorkerDb.mockReturnValue(createWorkerDb()); + + const response = await platform.request( + '/provision', + { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ + userId: 'user-1', + instanceId: '11111111-1111-4111-8111-111111111111', + }), + }, + env + ); + + expect(response.status).toBe(404); + await expect(response.json()).resolves.toMatchObject({ code: 'instance_not_found' }); + expect(provision).not.toHaveBeenCalled(); + expect(beginFreshProvision).not.toHaveBeenCalled(); + }); + + it('rejects an arbitrary organization instanceId before provider work', async () => { + const { env, provision, beginFreshProvision } = makeEnv(); + mockGetWorkerDb.mockReturnValue(createWorkerDb()); + + const response = await platform.request( + '/provision', + { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ + userId: 'user-1', + orgId: '22222222-2222-4222-8222-222222222222', + instanceId: '11111111-1111-4111-8111-111111111111', + }), + }, + env + ); + + expect(response.status).toBe(404); + await expect(response.json()).resolves.toMatchObject({ code: 'instance_not_found' }); + expect(provision).not.toHaveBeenCalled(); + expect(beginFreshProvision).not.toHaveBeenCalled(); + }); + + it('recovers an unpaired explicit instance only through subscription bootstrap', async () => { + const { env, provision, repairCompletedProvision } = makeEnv(); + repairCompletedProvision.mockResolvedValueOnce(true); + mockGetWorkerDb.mockReturnValue( + createWorkerDb({ + existingActiveInstance: { + id: '11111111-1111-4111-8111-111111111111', + sandboxId: 'ki_11111111111141118111111111111111', + organizationId: null, + }, + }) + ); + mockBootstrapProvisionedSubscriptionWithFallback.mockResolvedValueOnce({ mode: 'rpc' }); + + const response = await platform.request( + '/provision', + { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ + userId: 'user-1', + instanceId: '11111111-1111-4111-8111-111111111111', + bootstrapSubscription: true, + }), + }, + env + ); + + expect(response.status).toBe(201); + expect(provision).toHaveBeenCalledOnce(); + expect(mockBootstrapProvisionedSubscriptionWithFallback).toHaveBeenCalledOnce(); + expect(repairCompletedProvision).toHaveBeenCalledWith( + 'user:user-1', + 'user-1', + '11111111-1111-4111-8111-111111111111', + expect.any(String) + ); + }); + it('honors caller-supplied instanceType on RE-PROVISION', async () => { const { env, provision } = makeEnv(); - mockGetWorkerDb.mockReturnValue(createWorkerDb()); + mockGetWorkerDb.mockReturnValue( + createWorkerDb({ + existingActiveInstance: { + id: '11111111-1111-4111-8111-111111111111', + sandboxId: 'ki_11111111111141118111111111111111', + organizationId: null, + }, + hasSubscription: true, + }) + ); mockBootstrapProvisionedSubscriptionWithFallback.mockResolvedValueOnce({ mode: 'rpc' }); const existingInstanceId = '11111111-1111-4111-8111-111111111111'; diff --git a/services/kiloclaw/src/routes/platform-sanitize-error.test.ts b/services/kiloclaw/src/routes/platform-sanitize-error.test.ts index d5481071d9..20650d945d 100644 --- a/services/kiloclaw/src/routes/platform-sanitize-error.test.ts +++ b/services/kiloclaw/src/routes/platform-sanitize-error.test.ts @@ -1,5 +1,6 @@ import { afterEach, describe, expect, it, vi } from 'vitest'; import type { KiloClawEnv } from '../types'; +import * as DbModule from '../db'; import type { InstanceMutableState } from '../durable-objects/kiloclaw-instance/types'; import { cancelDoctorViaController, @@ -35,6 +36,11 @@ function envWithDOError(error: Error, writeDataPoint = vi.fn()) { ), }, KILOCLAW_AE: { writeDataPoint }, + HYPERDRIVE: { connectionString: 'postgresql://fake' }, + KILOCLAW_REGISTRY: { + idFromName: (id: string) => id, + get: () => ({ repairCompletedProvision: vi.fn().mockResolvedValue(false) }), + }, KILOCLAW_BILLING: { resolveProvisionEntitlement: vi.fn().mockResolvedValue({ priceVersion: '2026-05-10', @@ -162,6 +168,25 @@ describe('sanitizeError: Instance-not-* status correction', () => { const err = new Error('Fly API allocateIP failed (500): upstream'); const writeDataPoint = vi.fn(); const env = envWithDOError(err, writeDataPoint); + vi.spyOn(DbModule, 'getWorkerDb').mockReturnValue({ + select: () => ({ + from: () => ({ + where: () => ({ + orderBy: () => ({ + limit: () => + Promise.resolve([ + { + id: '11111111-1111-4111-8111-111111111111', + sandbox_id: 'ki_11111111111141118111111111111111', + organization_id: null, + }, + ]), + }), + limit: () => Promise.resolve([{ id: 'subscription-1' }]), + }), + }), + }), + } as never); const consoleSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); const resp = await platform.request( @@ -169,7 +194,10 @@ describe('sanitizeError: Instance-not-* status correction', () => { { method: 'POST', headers: { 'content-type': 'application/json' }, - body: JSON.stringify({ userId: 'user-1' }), + body: JSON.stringify({ + userId: 'user-1', + instanceId: '11111111-1111-4111-8111-111111111111', + }), }, env ); diff --git a/services/kiloclaw/src/routes/platform.ts b/services/kiloclaw/src/routes/platform.ts index 45c8918f61..c4dc5477cf 100644 --- a/services/kiloclaw/src/routes/platform.ts +++ b/services/kiloclaw/src/routes/platform.ts @@ -80,7 +80,14 @@ import { } from '../providers/rollout'; import type { ProviderId } from '../schemas/instance-config'; import { doKeyFromActiveInstance, resolveDoKeyForUser } from '../lib/instance-routing'; -import { getInstanceById, getInstanceByIdIncludingDestroyed, getWorkerDb } from '../db'; +import { + getActiveOrganizationInstance, + getActivePersonalInstance, + getInstanceById, + getInstanceByIdIncludingDestroyed, + getWorkerDb, + hasSubscriptionForInstance, +} from '../db'; import { and, eq, isNull, sql } from 'drizzle-orm'; import { alias } from 'drizzle-orm/pg-core'; import { volumeNameFromSandboxId } from '../durable-objects/machine-config'; @@ -120,6 +127,12 @@ const WebSearchConfigPatchSchema = z.object({ exaMode: z.enum(['kilo-proxy', 'disabled']).nullable().optional(), }); +const ProvisionReservationRepairSchema = z.object({ + userId: z.string().min(1), + instanceId: z.uuid(), + orgId: z.uuid().nullable().optional(), +}); + const KILOCLAW_WORKER_DESTROY_ACTOR = { actorType: 'system', actorId: 'kiloclaw-worker', @@ -418,6 +431,44 @@ function buildDefaultInboundEmailAlias(instanceId: string): string { return `claw-${instanceId.replaceAll('-', '')}`; } +function provisionRegistryKey(userId: string, orgId: string | null | undefined): string { + return orgId ? `org:${orgId}` : `user:${userId}`; +} + +function getProvisionRegistryStub( + env: AppEnv['Bindings'], + userId: string, + orgId: string | null | undefined +) { + const registryKey = provisionRegistryKey(userId, orgId); + return { + registryKey, + stub: env.KILOCLAW_REGISTRY.get(env.KILOCLAW_REGISTRY.idFromName(registryKey)), + }; +} + +async function getActiveProvisionContextInstance( + env: AppEnv['Bindings'], + userId: string, + orgId: string | null | undefined +) { + const connectionString = env.HYPERDRIVE?.connectionString; + if (!connectionString) throw new Error('HYPERDRIVE is not configured'); + const db = getWorkerDb(connectionString); + return orgId + ? await getActiveOrganizationInstance(db, userId, orgId) + : await getActivePersonalInstance(db, userId); +} + +async function hasCanonicalProvisionSubscription( + env: AppEnv['Bindings'], + instanceId: string +): Promise { + const connectionString = env.HYPERDRIVE?.connectionString; + if (!connectionString) throw new Error('HYPERDRIVE is not configured'); + return await hasSubscriptionForInstance(getWorkerDb(connectionString), instanceId); +} + function isWithinSelfServiceEntitlement( requestedTier: InstanceTierKey, entitlementTier: InstanceTierKey @@ -1115,6 +1166,10 @@ platform.post('/provision', async c => { const provisionRoute = '/api/platform/provision'; const provisionDoKey = await resolveInstanceDoKey(c.env, userId, provisionedInstanceId); const provisionStartedAt = performance.now(); + let provisionRegistry: ReturnType | null = null; + let freshReservationAdmitted = false; + let freshProviderWorkStarted = false; + let explicitInstanceRequiresSubscriptionBootstrap = false; let selectedProvider = provider; if (!selectedProvider && shouldInsertInstanceRecord) { @@ -1134,6 +1189,41 @@ platform.post('/provision', async c => { let instanceType: InstanceTierKey | undefined; let provision: Awaited>; try { + if (instanceId) { + const activeInstance = await getActiveProvisionContextInstance(c.env, userId, orgId); + if (activeInstance?.id !== instanceId) { + return jsonError('Active instance not found', 404, 'instance_not_found'); + } + if (await hasCanonicalProvisionSubscription(c.env, instanceId)) { + const { registryKey, stub } = getProvisionRegistryStub(c.env, userId, orgId); + try { + await stub.repairCompletedProvision( + registryKey, + userId, + instanceId, + doKeyFromActiveInstance(activeInstance) + ); + } catch (repairError) { + console.error( + '[platform] Failed to repair existing provision before update:', + repairError + ); + return jsonError( + 'Provisioning completed but finalization is pending', + 503, + 'provision_completion_pending' + ); + } + } else if (bootstrapSubscription === true) { + explicitInstanceRequiresSubscriptionBootstrap = true; + } else { + return jsonError( + 'Provisioning completed but subscription finalization is pending', + 503, + 'provision_completion_pending' + ); + } + } if (selectedProvider) { assertAvailableProvider(c.env, selectedProvider); } @@ -1167,6 +1257,99 @@ platform.post('/provision', async c => { ) { return c.json({ error: 'instanceType exceeds self-service entitlement' }, 400); } + + provisionRegistry = getProvisionRegistryStub(c.env, userId, orgId); + const admission = await provisionRegistry.stub.beginFreshProvision( + provisionRegistry.registryKey, + userId, + provisionedInstanceId, + provisionDoKey + ); + if (admission.outcome === 'conflict') { + writeEvent(c.env, { + event: 'instance.provision_reservation_conflict', + delivery: 'http', + route: provisionRoute, + userId, + instanceId: admission.reservation.instanceId, + orgId: orgId ?? undefined, + label: admission.reservation.status, + }); + const activeInstance = await getActiveProvisionContextInstance(c.env, userId, orgId); + if ( + activeInstance?.id === admission.reservation.instanceId && + (await hasCanonicalProvisionSubscription(c.env, activeInstance.id)) + ) { + try { + await provisionRegistry.stub.repairCompletedProvision( + provisionRegistry.registryKey, + userId, + activeInstance.id, + doKeyFromActiveInstance(activeInstance) + ); + writeEvent(c.env, { + event: 'instance.provision_reservation_repaired', + delivery: 'http', + route: provisionRoute, + userId, + instanceId: activeInstance.id, + orgId: orgId ?? undefined, + }); + return jsonError('User already has an active instance', 409, 'instance_already_active'); + } catch (repairError) { + console.error( + '[platform] Failed to repair completed provision reservation:', + repairError + ); + return jsonError( + 'Provisioning completed but finalization is pending', + 503, + 'provision_completion_pending' + ); + } + } + return jsonError( + 'An instance is already being created. Wait for setup to finish, then try again.', + 409, + 'provision_in_progress' + ); + } + freshReservationAdmitted = true; + writeEvent(c.env, { + event: 'instance.provision_reservation_started', + delivery: 'http', + route: provisionRoute, + userId, + instanceId: provisionedInstanceId, + orgId: orgId ?? undefined, + }); + + const activeInstance = await getActiveProvisionContextInstance(c.env, userId, orgId); + if (activeInstance) { + if (await hasCanonicalProvisionSubscription(c.env, activeInstance.id)) { + const repaired = await provisionRegistry.stub.repairCompletedProvision( + provisionRegistry.registryKey, + userId, + activeInstance.id, + doKeyFromActiveInstance(activeInstance) + ); + if (!repaired) { + await provisionRegistry.stub.createInstance( + provisionRegistry.registryKey, + userId, + activeInstance.id, + doKeyFromActiveInstance(activeInstance) + ); + } + } + await provisionRegistry.stub.releaseFreshProvision( + provisionRegistry.registryKey, + userId, + provisionedInstanceId, + 'active_instance_exists' + ); + return jsonError('User already has an active instance', 409, 'instance_already_active'); + } } // Only default to the billing entitlement tier on FRESH inserts. On // re-provision (config updates with an existing instanceId), pass @@ -1178,6 +1361,7 @@ platform.post('/provision', async c => { instanceType = requestedInstanceType ?? (shouldInsertInstanceRecord ? provisionEntitlement?.selfServiceInstanceType : undefined); + freshProviderWorkStarted = shouldInsertInstanceRecord; provision = await withResolvedDORetry( c.env, userId, @@ -1198,11 +1382,47 @@ platform.post('/provision', async c => { region, pinnedImageTag, }, - { instanceId: provisionedInstanceId, orgId, provider: selectedProvider } + { + instanceId: provisionedInstanceId, + orgId, + provider: selectedProvider, + freshProvision: shouldInsertInstanceRecord, + } ), 'provision' ); + if (instanceId) { + const activeAfterProvision = await getActiveProvisionContextInstance(c.env, userId, orgId); + if (activeAfterProvision?.id !== instanceId) { + return jsonError('Instance was destroyed during update', 409, 'instance_destroyed'); + } + } } catch (err) { + if (freshReservationAdmitted && provisionRegistry) { + try { + if (freshProviderWorkStarted) { + await provisionRegistry.stub.failFreshProvision( + provisionRegistry.registryKey, + userId, + provisionedInstanceId, + 'provider_provision_failed' + ); + } else { + await provisionRegistry.stub.releaseFreshProvision( + provisionRegistry.registryKey, + userId, + provisionedInstanceId, + 'failed_before_provider_work' + ); + } + } catch (reservationError) { + console.error('[platform] Failed to update fresh provision reservation after error:', { + instanceId: provisionedInstanceId, + error: + reservationError instanceof Error ? reservationError.message : String(reservationError), + }); + } + } const raw = err instanceof Error ? err.message : 'Unknown error'; if (raw.includes('duplicate key') || raw.includes('unique constraint')) { console.error('[platform] provision failed: duplicate instance'); @@ -1278,16 +1498,48 @@ platform.post('/provision', async c => { '[platform] Failed to destroy provisioned instance after bootstrap error:', destroyErr ); + return null; }); + let instanceMarkedDestroyed = true; await markProvisionedInstanceDestroyed({ env: c.env, instanceId: provisionedInstanceId, }).catch(markErr => { + instanceMarkedDestroyed = false; console.error( '[platform] Failed to mark instance destroyed after bootstrap error:', markErr ); }); + if (instanceMarkedDestroyed) { + await withResolvedDORetry( + c.env, + userId, + provisionedInstanceId, + stub => stub.allowProvisionReservationReleaseOnFinalize(), + 'allowProvisionReservationReleaseOnFinalize' + ).catch(releaseSignalError => { + console.error( + '[platform] Failed to confirm reservation cleanup release; DO will retry after Postgres confirmation:', + releaseSignalError + ); + }); + } + if (provisionRegistry && !instanceMarkedDestroyed) { + await provisionRegistry.stub + .failFreshProvision( + provisionRegistry.registryKey, + userId, + provisionedInstanceId, + 'instance_record_insert_failed' + ) + .catch(reservationError => { + console.error( + '[platform] Failed to finalize failed provision reservation:', + reservationError + ); + }); + } return jsonError(message, status); } } @@ -1388,16 +1640,48 @@ platform.post('/provision', async c => { '[platform] Failed to destroy provisioned instance after subscription bootstrap error:', destroyErr ); + return null; }); + let instanceMarkedDestroyed = true; await markProvisionedInstanceDestroyed({ env: c.env, instanceId: provisionedInstanceId, }).catch(markErr => { + instanceMarkedDestroyed = false; console.error( '[platform] Failed to mark bootstrap-quarantined instance destroyed for retry:', markErr ); }); + if (instanceMarkedDestroyed) { + await withResolvedDORetry( + c.env, + userId, + provisionedInstanceId, + stub => stub.allowProvisionReservationReleaseOnFinalize(), + 'allowProvisionReservationReleaseOnFinalize' + ).catch(releaseSignalError => { + console.error( + '[platform] Failed to confirm reservation cleanup release; DO will retry after Postgres confirmation:', + releaseSignalError + ); + }); + } + if (provisionRegistry && !instanceMarkedDestroyed) { + await provisionRegistry.stub + .failFreshProvision( + provisionRegistry.registryKey, + userId, + provisionedInstanceId, + 'subscription_bootstrap_failed' + ) + .catch(reservationError => { + console.error( + '[platform] Failed to finalize failed provision reservation:', + reservationError + ); + }); + } } console.error( '[platform] Subscription bootstrap failed after local fallback; instance quarantined for remediation', @@ -1414,19 +1698,91 @@ platform.post('/provision', async c => { } } - try { - const registryKey = orgId ? `org:${orgId}` : `user:${userId}`; - const registryStub = c.env.KILOCLAW_REGISTRY.get( - c.env.KILOCLAW_REGISTRY.idFromName(registryKey) - ); - await registryStub.createInstance(registryKey, userId, provisionedInstanceId, provisionDoKey); - console.log('[platform] Registry entry created:', { - registryKey, - instanceId: provisionedInstanceId, - doKey: provisionDoKey, - }); - } catch (registryErr) { - console.error('[platform] Registry create failed (non-fatal):', registryErr); + if (shouldInsertInstanceRecord && provisionRegistry) { + try { + await provisionRegistry.stub.completeFreshProvision( + provisionRegistry.registryKey, + userId, + provisionedInstanceId, + provisionDoKey + ); + writeEvent(c.env, { + event: 'instance.provision_reservation_completed', + delivery: 'http', + route: provisionRoute, + userId, + instanceId: provisionedInstanceId, + orgId: orgId ?? undefined, + }); + } catch (registryErr) { + console.error('[platform] Registry completion failed; attempting repair:', registryErr); + writeEvent(c.env, { + event: 'instance.provision_reservation_repair_required', + delivery: 'http', + route: provisionRoute, + userId, + instanceId: provisionedInstanceId, + orgId: orgId ?? undefined, + }); + try { + const repaired = await provisionRegistry.stub.repairCompletedProvision( + provisionRegistry.registryKey, + userId, + provisionedInstanceId, + provisionDoKey + ); + if (!repaired) throw new Error('Provision reservation missing during completion repair'); + writeEvent(c.env, { + event: 'instance.provision_reservation_repaired', + delivery: 'http', + route: provisionRoute, + userId, + instanceId: provisionedInstanceId, + orgId: orgId ?? undefined, + }); + } catch (repairError) { + console.error('[platform] Registry completion repair failed:', repairError); + return jsonError( + 'Provisioning completed but finalization is pending', + 503, + 'provision_completion_pending' + ); + } + } + } else if (explicitInstanceRequiresSubscriptionBootstrap) { + try { + const registryKey = provisionRegistryKey(userId, orgId); + const registryStub = c.env.KILOCLAW_REGISTRY.get( + c.env.KILOCLAW_REGISTRY.idFromName(registryKey) + ); + const repaired = await registryStub.repairCompletedProvision( + registryKey, + userId, + provisionedInstanceId, + provisionDoKey + ); + if (!repaired) { + const published = await registryStub.publishRecoveredInstance( + registryKey, + userId, + provisionedInstanceId, + provisionDoKey + ); + if (!published) { + return jsonError('Instance was destroyed during update', 409, 'instance_destroyed'); + } + } + } catch (registryErr) { + console.error( + '[platform] Registry completion failed after subscription recovery:', + registryErr + ); + return jsonError( + 'Provisioning completed but finalization is pending', + 503, + 'provision_completion_pending' + ); + } } return c.json( @@ -1438,7 +1794,54 @@ platform.post('/provision', async c => { ); }); +platform.post('/provision/repair-reservation', async c => { + const result = await parseBody(c, ProvisionReservationRepairSchema); + if ('error' in result) return result.error; + const { userId, instanceId, orgId } = result.data; + + try { + const activeInstance = await getActiveProvisionContextInstance(c.env, userId, orgId); + if ( + activeInstance?.id !== instanceId || + !(await hasCanonicalProvisionSubscription(c.env, instanceId)) + ) { + return jsonError( + 'No completed active provision exists for repair', + 409, + 'provision_repair_unavailable' + ); + } + const { registryKey, stub } = getProvisionRegistryStub(c.env, userId, orgId); + const repaired = await stub.repairCompletedProvision( + registryKey, + userId, + instanceId, + doKeyFromActiveInstance(activeInstance) + ); + if (!repaired) { + return jsonError( + 'No provision reservation exists for repair', + 409, + 'provision_repair_unavailable' + ); + } + writeEvent(c.env, { + event: 'instance.provision_reservation_repaired', + delivery: 'http', + route: '/api/platform/provision/repair-reservation', + userId, + instanceId, + orgId: orgId ?? undefined, + }); + return c.json({ ok: true }); + } catch (error) { + const { message, status } = sanitizeError(error, 'provision reservation repair'); + return jsonError(message, status); + } +}); + // PATCH /api/platform/kilocode-config + platform.patch('/kilocode-config', async c => { const result = await parseBody(c, KiloCodeConfigPatchSchema); if ('error' in result) return result.error; @@ -3711,6 +4114,17 @@ platform.get('/registry-entries', async c => { createdAt: string; destroyedAt: string | null; }>; + reservations: Array<{ + instanceId: string; + doKey: string; + assignedUserId: string; + status: string; + startedAt: string; + updatedAt: string; + completedAt: string | null; + failureCode: string | null; + resolutionReason: string | null; + }>; migrated: boolean; }> = [];