Skip to content

Commit b99367d

Browse files
committed
fix(billing): notify table limits post-commit so a rolled-back insert never emails or burns the claim
1 parent 99177d9 commit b99367d

7 files changed

Lines changed: 84 additions & 28 deletions

File tree

apps/sim/lib/table/__tests__/update-row.test.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ vi.mock('@sim/db', () => dbChainMock)
1919
// Capacity is exercised in billing.test.ts; here it's a no-op so the timeout-scaling
2020
// suites can use large synthetic row counts without tripping the plan limit.
2121
vi.mock('@/lib/table/billing', () => ({
22-
assertRowCapacity: vi.fn().mockResolvedValue(undefined),
22+
assertRowCapacity: vi.fn().mockResolvedValue(1_000_000),
23+
notifyTableRowUsage: vi.fn(),
2324
getMaxRowsPerTable: vi.fn().mockResolvedValue(1_000_000),
2425
wouldExceedRowLimit: () => false,
2526
TableRowLimitError: class TableRowLimitError extends Error {},

apps/sim/lib/table/billing.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,16 +120,16 @@ describe('wouldExceedRowLimit', () => {
120120
})
121121

122122
describe('assertRowCapacity', () => {
123-
it('passes when the write stays under the plan limit', async () => {
123+
it('returns the resolved limit when the write stays under it', async () => {
124124
await expect(
125125
assertRowCapacity({ workspaceId: nextWorkspaceId(), currentRowCount: 10, addedRows: 5 })
126-
).resolves.toBeUndefined()
126+
).resolves.toBe(5000)
127127
})
128128

129-
it('allows reaching the limit exactly', async () => {
129+
it('allows reaching the limit exactly and returns it', async () => {
130130
await expect(
131131
assertRowCapacity({ workspaceId: nextWorkspaceId(), currentRowCount: 4999, addedRows: 1 })
132-
).resolves.toBeUndefined()
132+
).resolves.toBe(5000)
133133
})
134134

135135
it('throws TableRowLimitError when the write would exceed the limit', async () => {
@@ -155,6 +155,6 @@ describe('assertRowCapacity', () => {
155155
currentRowCount: 10_000_000,
156156
addedRows: 1,
157157
})
158-
).resolves.toBeUndefined()
158+
).resolves.toBe(-1)
159159
})
160160
})

apps/sim/lib/table/billing.ts

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -191,25 +191,23 @@ export function wouldExceedRowLimit(
191191
* connection (and locks) risks pool starvation. Callers already inside a tx should
192192
* fetch the limit up front and use {@link wouldExceedRowLimit} instead.
193193
*
194+
* Pure check (no side effects): returns the resolved limit so callers can fire
195+
* {@link notifyTableRowUsage} AFTER their insert commits — a pre-commit notify
196+
* would email (and burn the dedup claim) for a write that later rolls back.
197+
*
198+
* @returns the resolved plan row limit (-1 for unlimited)
194199
* @throws {TableRowLimitError} if `currentRowCount + addedRows` exceeds the limit
195200
*/
196201
export async function assertRowCapacity(params: {
197202
workspaceId: string
198203
currentRowCount: number
199204
addedRows: number
200-
}): Promise<void> {
205+
}): Promise<number> {
201206
const limit = await getMaxRowsPerTable(params.workspaceId)
202207
if (wouldExceedRowLimit(limit, params.currentRowCount, params.addedRows)) {
203208
throw new TableRowLimitError(limit)
204209
}
205-
206-
// Accepted write: warn (best-effort) once the table crosses the notify band.
207-
notifyTableRowUsage({
208-
workspaceId: params.workspaceId,
209-
currentRowCount: params.currentRowCount,
210-
addedRows: params.addedRows,
211-
limit,
212-
})
210+
return limit
213211
}
214212

215213
/**

apps/sim/lib/table/import-data.ts

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { userTableDefinitions, userTableRows } from '@sim/db/schema'
99
import { createLogger } from '@sim/logger'
1010
import { generateId } from '@sim/utils/id'
1111
import { eq } from 'drizzle-orm'
12-
import { assertRowCapacity } from '@/lib/table/billing'
12+
import { assertRowCapacity, notifyTableRowUsage } from '@/lib/table/billing'
1313
import { CSV_MAX_BATCH_SIZE } from '@/lib/table/import'
1414
import { nKeysBetween } from '@/lib/table/order-key'
1515
import { acquireRowOrderLock } from '@/lib/table/rows/ordering'
@@ -150,12 +150,12 @@ export async function importAppendRows(
150150
ctx: { workspaceId: string; userId?: string; requestId: string }
151151
): Promise<{ inserted: TableRow[]; table: TableDefinition }> {
152152
// Gate capacity before opening the tx — the lookup is a separate pool read.
153-
await assertRowCapacity({
153+
const rowLimit = await assertRowCapacity({
154154
workspaceId: ctx.workspaceId,
155155
currentRowCount: table.rowCount,
156156
addedRows: rows.length,
157157
})
158-
return db.transaction(async (trx) => {
158+
const result = await db.transaction(async (trx) => {
159159
let working = table
160160
if (additions.length > 0) {
161161
// Take the row-order lock before creating columns so this path uses the
@@ -179,6 +179,14 @@ export async function importAppendRows(
179179
}
180180
return { inserted, table: working }
181181
})
182+
// Post-commit: a pre-commit notify would email/burn the claim for a rolled-back import.
183+
notifyTableRowUsage({
184+
workspaceId: ctx.workspaceId,
185+
currentRowCount: table.rowCount,
186+
addedRows: result.inserted.length,
187+
limit: rowLimit,
188+
})
189+
return result
182190
}
183191

184192
/**
@@ -193,12 +201,12 @@ export async function importReplaceRows(
193201
): Promise<ReplaceRowsResult> {
194202
// Replace deletes all existing rows, so the footprint is just the new set. Gate
195203
// before opening the tx — the plan lookup is a separate pool read.
196-
await assertRowCapacity({
204+
const rowLimit = await assertRowCapacity({
197205
workspaceId: data.workspaceId,
198206
currentRowCount: 0,
199207
addedRows: data.rows.length,
200208
})
201-
return db.transaction(async (trx) => {
209+
const result = await db.transaction(async (trx) => {
202210
let working = table
203211
if (additions.length > 0) {
204212
await acquireRowOrderLock(trx, table.id)
@@ -211,4 +219,12 @@ export async function importReplaceRows(
211219
requestId
212220
)
213221
})
222+
// Post-commit: footprint is the new set (prior rows deleted → prior count 0).
223+
notifyTableRowUsage({
224+
workspaceId: data.workspaceId,
225+
currentRowCount: 0,
226+
addedRows: result.insertedCount,
227+
limit: rowLimit,
228+
})
229+
return result
214230
}

apps/sim/lib/table/import-runner.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import {
1717
type TableSchema,
1818
validateMapping,
1919
} from '@/lib/table'
20-
import { assertRowCapacity } from '@/lib/table/billing'
20+
import { assertRowCapacity, notifyTableRowUsage } from '@/lib/table/billing'
2121
import { withGeneratedColumnIds } from '@/lib/table/column-keys'
2222
import { appendTableEvent } from '@/lib/table/events'
2323
import {
@@ -200,7 +200,7 @@ export async function runTableImport(payload: TableImportPayload): Promise<void>
200200
const owns = await updateJobProgress(tableId, inserted, importId)
201201
if (!owns) throw new ImportSupersededError()
202202
const coerced = coerceRowsForTable(rows, schema, headerToColumn)
203-
await assertRowCapacity({
203+
const rowLimit = await assertRowCapacity({
204204
workspaceId,
205205
currentRowCount: existingRowCount + inserted,
206206
addedRows: coerced.length,
@@ -217,6 +217,13 @@ export async function runTableImport(payload: TableImportPayload): Promise<void>
217217
{ ...table, schema },
218218
requestId
219219
)
220+
// Post-commit (per batch): pre-batch count as prior, actual inserted as the delta.
221+
notifyTableRowUsage({
222+
workspaceId,
223+
currentRowCount: existingRowCount + inserted,
224+
addedRows: result.inserted,
225+
limit: rowLimit,
226+
})
220227
inserted += result.inserted
221228
lastOrderKey = result.lastOrderKey
222229
// Emit after the first batch, then every interval, so the bar appears early without flooding.

apps/sim/lib/table/rows/service.ts

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ export async function insertRow(
123123
}
124124

125125
// Best-effort capacity check against the workspace's current plan limit.
126-
await assertRowCapacity({
126+
const rowLimit = await assertRowCapacity({
127127
workspaceId: table.workspaceId,
128128
currentRowCount: table.rowCount,
129129
addedRows: 1,
@@ -144,6 +144,14 @@ export async function insertRow(
144144
now,
145145
})
146146

147+
// Post-commit: a pre-commit notify would email/burn the dedup claim for a rolled-back insert.
148+
notifyTableRowUsage({
149+
workspaceId: table.workspaceId,
150+
currentRowCount: table.rowCount,
151+
addedRows: 1,
152+
limit: rowLimit,
153+
})
154+
147155
logger.info(`[${requestId}] Inserted row ${rowId} into table ${data.tableId}`)
148156

149157
const insertedRow: TableRow = {
@@ -194,13 +202,20 @@ export async function batchInsertRows(
194202
): Promise<TableRow[]> {
195203
// Best-effort capacity check against the workspace's current plan limit. Import
196204
// paths call `batchInsertRowsWithTx` directly and gate capacity up front instead.
197-
await assertRowCapacity({
205+
const rowLimit = await assertRowCapacity({
198206
workspaceId: table.workspaceId,
199207
currentRowCount: table.rowCount,
200208
addedRows: data.rows.length,
201209
})
202210

203211
const result = await db.transaction((trx) => batchInsertRowsWithTx(trx, data, table, requestId))
212+
// Post-commit: notify with the actual inserted count, so a rolled-back batch never emails.
213+
notifyTableRowUsage({
214+
workspaceId: table.workspaceId,
215+
currentRowCount: table.rowCount,
216+
addedRows: result.length,
217+
limit: rowLimit,
218+
})
204219
dispatchAfterBatchInsert(table, result, requestId, data.userId)
205220
return result
206221
}
@@ -349,12 +364,20 @@ export async function replaceTableRows(
349364
): Promise<ReplaceRowsResult> {
350365
// All existing rows are deleted, so the footprint is just the new set. Checked
351366
// before the tx opens — never inside it (the plan lookup is a separate pool read).
352-
await assertRowCapacity({
367+
const rowLimit = await assertRowCapacity({
353368
workspaceId: table.workspaceId,
354369
currentRowCount: 0,
355370
addedRows: data.rows.length,
356371
})
357-
return db.transaction((trx) => replaceTableRowsWithTx(trx, data, table, requestId))
372+
const result = await db.transaction((trx) => replaceTableRowsWithTx(trx, data, table, requestId))
373+
// Post-commit: footprint is the new set (prior rows deleted → prior count 0).
374+
notifyTableRowUsage({
375+
workspaceId: table.workspaceId,
376+
currentRowCount: 0,
377+
addedRows: result.insertedCount,
378+
limit: rowLimit,
379+
})
380+
return result
358381
}
359382

360383
/**

apps/sim/lib/table/service.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import { generateId } from '@sim/utils/id'
1515
import { and, count, eq, isNull, sql } from 'drizzle-orm'
1616
import { generateRestoreName } from '@/lib/core/utils/restore-name'
1717
import type { DbOrTx } from '@/lib/db/types'
18-
import { assertRowCapacity } from '@/lib/table/billing'
18+
import { assertRowCapacity, notifyTableRowUsage } from '@/lib/table/billing'
1919
import { generateColumnId, getColumnId, withGeneratedColumnIds } from '@/lib/table/column-keys'
2020
import { COLUMN_TYPES, NAME_PATTERN, TABLE_LIMITS } from '@/lib/table/constants'
2121
import { EMPTY_JOB_FIELDS, latestJobForTable, latestJobsForTables } from '@/lib/table/jobs/service'
@@ -285,8 +285,9 @@ export async function createTable(
285285
// Starter rows count against the plan too. Checked before the tx (the lookup is a
286286
// separate pool read) — a new table starts empty, so the footprint is just these.
287287
const initialRowCount = data.initialRowCount ?? 0
288+
let rowLimit: number | undefined
288289
if (initialRowCount > 0) {
289-
await assertRowCapacity({
290+
rowLimit = await assertRowCapacity({
290291
workspaceId: data.workspaceId,
291292
currentRowCount: 0,
292293
addedRows: initialRowCount,
@@ -369,6 +370,16 @@ export async function createTable(
369370
throw error
370371
}
371372

373+
// Post-commit: starter rows landed, so notify here (a rolled-back create never emails).
374+
if (initialRowCount > 0 && rowLimit !== undefined) {
375+
notifyTableRowUsage({
376+
workspaceId: data.workspaceId,
377+
currentRowCount: 0,
378+
addedRows: initialRowCount,
379+
limit: rowLimit,
380+
})
381+
}
382+
372383
logger.info(`[${requestId}] Created table ${tableId} in workspace ${data.workspaceId}`)
373384

374385
return {

0 commit comments

Comments
 (0)