diff --git a/apps/local/src/server/executor.ts b/apps/local/src/server/executor.ts index b453c3b66..787d2cd09 100644 --- a/apps/local/src/server/executor.ts +++ b/apps/local/src/server/executor.ts @@ -6,6 +6,7 @@ import { homedir } from "node:os"; import { basename, join } from "node:path"; import { Scope, ScopeId, collectTables, createExecutor, type AnyPlugin } from "@executor-js/sdk"; +import { withQueryContext } from "fumadb/query"; import { loadPluginsFromJsonc } from "@executor-js/config"; import executorConfig from "../../executor.config"; @@ -189,7 +190,9 @@ const importLegacySqliteIfNeeded = async (options: { const result = await importSqliteDataToFuma({ sqlitePath: storage.sqlitePath, markerPath: storage.importMarkerPath, - db: target.db, + target: withQueryContext(target.db, { + allowedScopeIds: new Set([scopeId]), + }), tables, scopeId, }); diff --git a/apps/local/src/server/sqlite-import.test.ts b/apps/local/src/server/sqlite-import.test.ts index eb83f06ba..30035f182 100644 --- a/apps/local/src/server/sqlite-import.test.ts +++ b/apps/local/src/server/sqlite-import.test.ts @@ -5,6 +5,7 @@ import { tmpdir } from "node:os"; import { join } from "node:path"; import { collectTables } from "@executor-js/sdk"; +import { withQueryContext } from "fumadb/query"; import { importSqliteDataToFuma } from "./sqlite-import"; import { createSqliteFumaDb, type SqliteFumaDb } from "./sqlite-fumadb"; @@ -81,10 +82,11 @@ describe("importSqliteDataToFuma", () => { path: join(workDir, "target.db"), }); + const scopedDb = withQueryContext(sqlite.db, { allowedScopeIds: new Set(["scope_a"]) }); const result = await importSqliteDataToFuma({ sqlitePath, markerPath, - db: sqlite.db, + target: scopedDb, tables, scopeId: "scope_a", }); @@ -96,7 +98,7 @@ describe("importSqliteDataToFuma", () => { expect(existsSync(sqlitePath)).toBe(false); expect(result.backupPath && existsSync(result.backupPath)).toBe(true); - const source = (await sqlite.db.findFirst("source", { + const source = (await scopedDb.findFirst("source", { where: (b) => b("id", "=", "src_1"), })) as Record; expect(source.scope_id).toBe("scope_a"); @@ -105,7 +107,7 @@ describe("importSqliteDataToFuma", () => { expect(source.can_edit).toBe(true); expect(source.created_at).toBeInstanceOf(Date); - const blob = (await sqlite.db.findFirst("blob", { + const blob = (await scopedDb.findFirst("blob", { where: (b) => b("id", "=", JSON.stringify(["scope_a/plugin", "spec"])), })) as Record; expect(blob.value).toBe("{}"); diff --git a/apps/local/src/server/sqlite-import.ts b/apps/local/src/server/sqlite-import.ts index d18d60a3c..f7e7bb242 100644 --- a/apps/local/src/server/sqlite-import.ts +++ b/apps/local/src/server/sqlite-import.ts @@ -6,7 +6,7 @@ import { dirname } from "node:path"; /* oxlint-disable executor/no-json-parse, executor/no-switch-statement, executor/no-try-catch-or-throw -- boundary: one-shot legacy SQLite importer normalizes unknown rows and wraps native sqlite failures */ -import type { AnyColumn, AnyTable, FumaDb, FumaTables } from "@executor-js/sdk"; +import { type AnyColumn, type AnyTable, type FumaTables } from "@executor-js/sdk"; type SqliteRow = Record; @@ -25,7 +25,7 @@ export class LocalSqliteImportError extends Data.TaggedError("LocalSqliteImportE export interface LocalSqliteImportOptions { readonly sqlitePath: string; readonly markerPath: string; - readonly db: FumaDb; + readonly target: ImportFumaDb; readonly tables: FumaTables; readonly scopeId: string; } @@ -181,7 +181,7 @@ export const importSqliteDataToFuma = async ( const importedTables: string[] = []; let importedRows = 0; - await (options.db as ImportFumaDb).transaction(async (db) => { + await options.target.transaction(async (db) => { for (const [tableKey, table] of Object.entries(options.tables)) { const tableName = table.names.sql; if (!tableExists(sqlite!, tableName)) continue; diff --git a/bun.lock b/bun.lock index 19accc320..6a4e32ef5 100644 --- a/bun.lock +++ b/bun.lock @@ -372,9 +372,11 @@ }, "devDependencies": { "@effect/vitest": "catalog:", + "@types/better-sqlite3": "^7.6.13", "@types/node": "catalog:", "@types/pg": "^8.20.0", "@types/semver": "^7.7.1", + "better-sqlite3": "^12.9.0", "tsup": "catalog:", "typescript": "catalog:", "vitest": "catalog:", diff --git a/packages/core/fumadb/package.json b/packages/core/fumadb/package.json index 79903dd1a..adf258bbf 100644 --- a/packages/core/fumadb/package.json +++ b/packages/core/fumadb/package.json @@ -86,9 +86,11 @@ }, "devDependencies": { "@effect/vitest": "catalog:", + "@types/better-sqlite3": "^7.6.13", "@types/node": "catalog:", "@types/pg": "^8.20.0", "@types/semver": "^7.7.1", + "better-sqlite3": "^12.9.0", "tsup": "catalog:", "typescript": "catalog:", "vitest": "catalog:" diff --git a/packages/core/fumadb/src/query/index.ts b/packages/core/fumadb/src/query/index.ts index 59c9164ea..b26ae9d5f 100644 --- a/packages/core/fumadb/src/query/index.ts +++ b/packages/core/fumadb/src/query/index.ts @@ -1,46 +1,34 @@ import type { AnySchema, AnyTable, - IdColumn, Relation, + TableColumnValues, + TableInsertValues, + TableUpdateValues, } from "../schema/create"; import type { Condition, ConditionBuilder } from "./condition-builder"; import type { ORMAdapter } from "./orm"; export type { Condition, ConditionBuilder } from "./condition-builder"; +export { withQueryContext } from "./orm"; +export type { + CompiledJoin, + ORMAdapter, + SimplifiedCountOptions, + SimplifyFindOptions, +} from "./orm"; export type AnySelectClause = SelectClause; export type SelectClause = true | (keyof T["columns"])[]; -type TableToColumnValues = { - [K in keyof T["columns"]]: T["columns"][K]["$out"]; -}; +export type TableToColumnValues = TableColumnValues; -type PickNullable = { - [P in keyof T as null extends T[P] ? P : never]: T[P]; -}; +export type TableToInsertValues = TableInsertValues; -type PickNotNullable = { - [P in keyof T as null extends T[P] ? never : P]: T[P]; -}; - -type TableToInsertValues = Partial< - PickNullable<{ - [K in keyof T["columns"]]: T["columns"][K]["$in"]; - }> -> & - PickNotNullable<{ - [K in keyof T["columns"]]: T["columns"][K]["$in"]; - }>; - -type TableToUpdateValues = { - [K in keyof T["columns"]]?: T["columns"][K] extends IdColumn - ? never - : T["columns"][K]["$in"]; -}; +export type TableToUpdateValues = TableUpdateValues; -type MainSelectResult< +export type MainSelectResult< S extends SelectClause, T extends AnyTable, > = S extends true @@ -70,7 +58,7 @@ export type JoinBuilder = { : never; }; -type SelectResult< +export type SelectResult< T extends AnyTable, JoinOut, Select extends SelectClause, @@ -113,7 +101,7 @@ export type FindManyOptions< : {}); export interface AbstractQuery { - internal: ORMAdapter; + internal: ORMAdapter; /** * The code in the transaction will receive a transaction query instance. diff --git a/packages/core/fumadb/src/query/orm/index.ts b/packages/core/fumadb/src/query/orm/index.ts index 6f46ae3e8..623a7ab26 100644 --- a/packages/core/fumadb/src/query/orm/index.ts +++ b/packages/core/fumadb/src/query/orm/index.ts @@ -1,4 +1,9 @@ -import type { AnyColumn, AnyRelation, AnySchema, AnyTable } from "../../schema"; +import type { + AnyColumn, + AnyRelation, + AnySchema, + AnyTable, +} from "../../schema"; import type { AbstractQuery, AnySelectClause, @@ -7,7 +12,7 @@ import type { JoinBuilder, OrderBy, } from ".."; -import { buildCondition, type Condition } from "../condition-builder"; +import { buildCondition, createBuilder, type Condition } from "../condition-builder"; export interface CompiledJoin { relation: AnyRelation; @@ -89,24 +94,155 @@ export type SimplifyFindOptions = Omit< join?: CompiledJoin[]; }; -export interface ORMAdapter { - tables: Record; +type WriteOperation = "create" | "update" | "upsert"; + +const mergePolicyCondition = ( + table: AnyTable, + where: Condition | undefined, + condition: Condition | boolean | void, +): Condition | undefined | false => { + if (condition === undefined || condition === true) return where; + if (condition === false) return false; + + const next = createBuilder(table.columns).and(where ?? true, condition); + if (next === true) return undefined; + if (next === false) return false; + return next; +}; + +const applyReadPolicies = async ( + table: AnyTable, + where: Condition | undefined, + context: unknown, +): Promise => { + let nextWhere = where; + + for (const policy of table.policies) { + const condition = await policy.onRead?.({ + where: nextWhere, + context, + builder: createBuilder(table.columns), + }); + const merged = mergePolicyCondition(table, nextWhere, condition); + if (merged === false) return false; + nextWhere = merged; + } + + return nextWhere; +}; + +const applyReadPoliciesToOptions = async ( + table: AnyTable, + options: SimplifyFindOptions, + context: unknown, +): Promise | false> => { + const where = await applyReadPolicies(table, options.where, context); + if (where === false) return false; + + let changed = where !== options.where; + const join: CompiledJoin[] | undefined = options.join ? [] : undefined; + + for (const entry of options.join ?? []) { + if (entry.options === false) { + join!.push(entry); + continue; + } + + const nextOptions = await applyReadPoliciesToOptions( + entry.relation.table, + entry.options, + context, + ); + if (nextOptions === false) { + join!.push({ ...entry, options: false }); + changed = true; + continue; + } + if (nextOptions !== entry.options) changed = true; + join!.push(nextOptions === entry.options ? entry : { ...entry, options: nextOptions }); + } + + return changed ? { ...options, where, join } : options; +}; + +const runCreatePolicies = async ( + table: AnyTable, + values: Record, + context: unknown, +): Promise => { + for (const policy of table.policies) { + await policy.onCreate?.({ values, context }); + } +}; + +const applyUpdatePolicies = async ( + table: AnyTable, + where: Condition | undefined, + set: Record, + context: unknown, + operation: Extract, +): Promise => { + let nextWhere = where; + + for (const policy of table.policies) { + const condition = await policy.onUpdate?.({ + where: nextWhere, + set, + context, + builder: createBuilder(table.columns), + operation, + }); + const merged = mergePolicyCondition(table, nextWhere, condition); + if (merged === false) return false; + nextWhere = merged; + } + + return nextWhere; +}; + +const applyDeletePolicies = async ( + table: AnyTable, + where: Condition | undefined, + context: unknown, +): Promise => { + let nextWhere = where; + + for (const policy of table.policies) { + const condition = await policy.onDelete?.({ + where: nextWhere, + context, + builder: createBuilder(table.columns), + }); + const merged = mergePolicyCondition(table, nextWhere, condition); + if (merged === false) return false; + nextWhere = merged; + } + + return nextWhere; +}; + +export interface ORMAdapter { + tables: S["tables"]; + context?: unknown; count: (table: AnyTable, v: SimplifiedCountOptions) => Promise; findFirst: ( - table: AnyTable, - v: SimplifyFindOptions,) => Promise | null>; + table: AnyTable, + v: SimplifyFindOptions, + ) => Promise | null>; findMany: ( - table: AnyTable, - v: SimplifyFindOptions,) => Promise[]>; + table: AnyTable, + v: SimplifyFindOptions, + ) => Promise[]>; updateMany: ( - table: AnyTable, - v: { - where?: Condition; - set: Record; - },) => Promise; + table: AnyTable, + v: { + where?: Condition; + set: Record; + }, + ) => Promise; upsert: ( table: AnyTable, @@ -118,70 +254,99 @@ export interface ORMAdapter { ) => Promise; create: ( - table: AnyTable, - values: Record,) => Promise>; + table: AnyTable, + values: Record, + ) => Promise>; createMany: ( - table: AnyTable, - values: Record[],) => Promise< - { - _id: unknown; - }[] - >; + table: AnyTable, + values: Record[], + ) => Promise< + { + _id: unknown; + }[] + >; deleteMany: ( - table: AnyTable, - v: { - where?: Condition; - },) => Promise; + table: AnyTable, + v: { + where?: Condition; + }, + ) => Promise; /** * Override this to support native transaction, otherwise use soft transaction. */ transaction: ( - run: (transactionInstance: AbstractQuery) => Promise, + run: (transactionInstance: AbstractQuery) => Promise, ) => Promise; } +export interface ToORMOptions { + readonly context?: unknown; +} + export function toORM( - adapter: ORMAdapter, + adapter: ORMAdapter, + options: ToORMOptions = {}, ): AbstractQuery { - function toTable(name: unknown) { - const table = adapter.tables[name as string]; - if (!table) throw new Error(`[FumaDB] Invalid table name ${name}.`); + const context = options.context ?? adapter.context; + const internal: ORMAdapter = + context === adapter.context ? adapter : { ...adapter, context }; + + function toTable( + name: TableName, + ): S["tables"][TableName] { + const table = internal.tables[name]; + if (!table) throw new Error(`[FumaDB] Invalid table name ${String(name)}.`); return table; } return { - internal: adapter, + internal, async count(name, { where } = {}) { const table = toTable(name); let conditions = where ? buildCondition(table.columns, where) : undefined; if (conditions === true) conditions = undefined; if (conditions === false) return 0; - return await adapter.count(table, { - where: conditions, - }); + const constrainedWhere = await applyReadPolicies(table, conditions, context); + if (constrainedWhere === false) return 0; + return await internal.count(table, { where: constrainedWhere }); }, async upsert(name, { where, ...options }) { const table = toTable(name); const conditions = where ? buildCondition(table.columns, where) : undefined; if (conditions === false) return; + let compiledWhere: Condition | undefined | false = conditions === true ? undefined : conditions; - await adapter.upsert(table, { - where: conditions === true ? undefined : conditions, + compiledWhere = await applyUpdatePolicies( + table, + compiledWhere, + options.update, + context, + "upsert", + ); + if (compiledWhere === false) return; + await runCreatePolicies(table, options.create, context); + await internal.upsert(table, { + where: compiledWhere, ...options, }); }, async create(name, values) { const table = toTable(name); - return await adapter.create(table, values); + await runCreatePolicies(table, values, context); + return await internal.create(table, values); }, async createMany(name, values) { const table = toTable(name); - return await adapter.createMany(table, values); + for (const value of values) { + await runCreatePolicies(table, value, context); + } + + return await internal.createMany(table, values); }, async deleteMany(name, { where }) { const table = toTable(name); @@ -189,27 +354,27 @@ export function toORM( if (conditions === true) conditions = undefined; if (conditions === false) return; - await adapter.deleteMany(table, { where: conditions }); + const constrainedWhere = await applyDeletePolicies(table, conditions, context); + if (constrainedWhere === false) return; + await internal.deleteMany(table, { where: constrainedWhere }); }, async findMany(name, options = {}) { const table = toTable(name); - const compiledOptions = buildFindOptions( - table, - options as FindManyOptions, - ); + let compiledOptions = buildFindOptions(table, options as FindManyOptions); if (compiledOptions === false) return []; - return await adapter.findMany(table, compiledOptions); + compiledOptions = await applyReadPoliciesToOptions(table, compiledOptions, context); + if (compiledOptions === false) return []; + return await internal.findMany(table, compiledOptions); }, async findFirst(name, options) { const table = toTable(name); - const compiledOptions = buildFindOptions( - table, - options as FindFirstOptions, - ); + let compiledOptions = buildFindOptions(table, options as FindFirstOptions); if (compiledOptions === false) return null; - return await adapter.findFirst(table, compiledOptions); + compiledOptions = await applyReadPoliciesToOptions(table, compiledOptions, context); + if (compiledOptions === false) return null; + return await internal.findFirst(table, compiledOptions); }, async updateMany(name, { set, where }) { const table = toTable(name); @@ -217,10 +382,27 @@ export function toORM( if (conditions === true) conditions = undefined; if (conditions === false) return; - return adapter.updateMany(table, { set, where: conditions }); + const constrainedWhere = await applyUpdatePolicies( + table, + conditions, + set, + context, + "update", + ); + if (constrainedWhere === false) return; + return internal.updateMany(table, { set, where: constrainedWhere }); }, async transaction(run) { - return adapter.transaction(run as any); + return internal.transaction((transactionInstance) => + run(withQueryContext(transactionInstance, context)), + ); }, } as AbstractQuery; } + +export function withQueryContext( + db: AbstractQuery, + context: TContext, +): AbstractQuery { + return toORM(db.internal, { context }); +} diff --git a/packages/core/fumadb/src/query/table-policy.test.ts b/packages/core/fumadb/src/query/table-policy.test.ts new file mode 100644 index 000000000..a855aa78f --- /dev/null +++ b/packages/core/fumadb/src/query/table-policy.test.ts @@ -0,0 +1,500 @@ +import Database from "better-sqlite3"; +import { describe, expect, it } from "@effect/vitest"; +import { drizzle } from "drizzle-orm/better-sqlite3"; +import { Effect } from "effect"; +import { fumadb } from "fumadb"; +import { + createDrizzleRuntimeSchemaFromTables, + createDrizzleRuntimeSchemaSqlFromTables, + drizzleAdapter, +} from "fumadb/adapters/drizzle"; +import { withQueryContext, type AbstractQuery } from "fumadb/query"; +import { column, idColumn, schema, table } from "fumadb/schema"; + +interface TenantPolicyContext { + readonly allowedTenantIds: ReadonlySet; + readonly marker: string; + readonly observed: string[]; +} + +const observe = (context: TenantPolicyContext, event: string) => { + context.observed.push(`${context.marker}:${event}`); +}; + +const assertTenantAllowed = (tableName: string, context: TenantPolicyContext, tenantId: string) => { + observe(context, `${tableName}:assert`); + if (!context.allowedTenantIds.has(tenantId)) { + // oxlint-disable-next-line executor/no-try-catch-or-throw, executor/no-error-constructor -- boundary: FumaDB table policy callbacks reject writes by throwing + throw new Error(`tenant ${tenantId} is not allowed for ${tableName}`); + } +}; + +const authors = table("policy_authors", { + id: idColumn("id", "varchar(255)"), + tenantId: column("tenant_id", "varchar(255)"), + name: column("name", "string"), +}).policy({ + name: "tenant.authors", + onRead: ({ builder, context }) => { + observe(context, "authors:read"); + return builder("tenantId", "in", [...context.allowedTenantIds]); + }, + onCreate: ({ values, context }) => assertTenantAllowed("authors", context, values.tenantId), + onUpdate: ({ builder, set, context }) => { + observe(context, "authors:update"); + if (set.tenantId !== undefined) assertTenantAllowed("authors", context, set.tenantId); + return builder("tenantId", "in", [...context.allowedTenantIds]); + }, + onDelete: ({ builder, context }) => { + observe(context, "authors:delete"); + return builder("tenantId", "in", [...context.allowedTenantIds]); + }, +}); + +const posts = table("policy_posts", { + id: idColumn("id", "varchar(255)"), + tenantId: column("tenant_id", "varchar(255)"), + authorId: column("author_id", "varchar(255)"), + title: column("title", "string"), +}).policy({ + name: "tenant.posts", + onRead: ({ builder, context }) => { + observe(context, "posts:read"); + return builder("tenantId", "in", [...context.allowedTenantIds]); + }, + onCreate: ({ values, context }) => assertTenantAllowed("posts", context, values.tenantId), + onUpdate: ({ builder, set, context }) => { + observe(context, "posts:update"); + if (set.tenantId !== undefined) assertTenantAllowed("posts", context, set.tenantId); + return builder("tenantId", "in", [...context.allowedTenantIds]); + }, + onDelete: ({ builder, context }) => { + observe(context, "posts:delete"); + return builder("tenantId", "in", [...context.allowedTenantIds]); + }, +}); + +const comments = table("policy_comments", { + id: idColumn("id", "varchar(255)"), + tenantId: column("tenant_id", "varchar(255)"), + postId: column("post_id", "varchar(255)"), + body: column("body", "string"), +}).policy({ + name: "tenant.comments", + onRead: ({ builder, context }) => { + observe(context, "comments:read"); + return builder("tenantId", "in", [...context.allowedTenantIds]); + }, + onCreate: ({ values, context }) => assertTenantAllowed("comments", context, values.tenantId), + onUpdate: ({ builder, set, context }) => { + observe(context, "comments:update"); + if (set.tenantId !== undefined) assertTenantAllowed("comments", context, set.tenantId); + return builder("tenantId", "in", [...context.allowedTenantIds]); + }, + onDelete: ({ builder, context }) => { + observe(context, "comments:delete"); + return builder("tenantId", "in", [...context.allowedTenantIds]); + }, +}); + +const v1 = schema({ + version: "1.0.0", + tables: { + authors, + posts, + comments, + }, + relations: { + authors: ({ many }) => ({ + posts: many("posts"), + }), + posts: ({ one, many }) => ({ + author: one("authors", ["authorId", "id"]).foreignKey(), + comments: many("comments"), + }), + comments: ({ one }) => ({ + post: one("posts", ["postId", "id"]).foreignKey(), + }), + }, +}); + +const tablePolicyDB = fumadb({ + namespace: "table_policy_test", + schemas: [v1], +}); + +type TablePolicyQuery = AbstractQuery; + +const makeContext = (allowedTenantIds: readonly string[], marker: string): TenantPolicyContext => ({ + allowedTenantIds: new Set(allowedTenantIds), + marker, + observed: [], +}); + +const makeHarness = async () => { + const sqlite = new Database(":memory:"); + sqlite.pragma("foreign_keys = ON"); + const runtimeSchema = createDrizzleRuntimeSchemaFromTables({ + tables: v1.tables, + namespace: "table_policy_test", + version: "1.0.0", + provider: "sqlite", + }); + const drizzleDb = drizzle(sqlite, { schema: runtimeSchema }); + + for (const statement of createDrizzleRuntimeSchemaSqlFromTables({ + tables: v1.tables, + namespace: "table_policy_test", + version: "1.0.0", + provider: "sqlite", + })) { + sqlite.exec(statement); + } + + const client = tablePolicyDB.client( + drizzleAdapter({ + db: drizzleDb, + provider: "sqlite", + }), + ); + + return { + orm: client.orm("1.0.0"), + close: async () => { + sqlite.close(); + }, + }; +}; + +const useHarness = (run: (orm: TablePolicyQuery) => Promise) => + Effect.acquireUseRelease( + Effect.promise(makeHarness), + ({ orm }) => Effect.promise(() => run(orm)), + ({ close }) => Effect.promise(close), + ); + +const seedTenants = async (orm: TablePolicyQuery) => { + const seed = withQueryContext(orm, makeContext(["tenant-a", "tenant-b"], "seed")); + + await seed.createMany("authors", [ + { + id: "author-a", + tenantId: "tenant-a", + name: "Ada", + }, + { + id: "author-b", + tenantId: "tenant-b", + name: "Bert", + }, + ]); + + await seed.createMany("posts", [ + { + id: "post-a-1", + tenantId: "tenant-a", + authorId: "author-a", + title: "A One", + }, + { + id: "post-a-2", + tenantId: "tenant-a", + authorId: "author-a", + title: "A Two", + }, + { + id: "post-b-1", + tenantId: "tenant-b", + authorId: "author-b", + title: "B One", + }, + ]); + + await seed.createMany("comments", [ + { + id: "comment-a-1", + tenantId: "tenant-a", + postId: "post-a-1", + body: "A comment", + }, + { + id: "comment-b-1", + tenantId: "tenant-b", + postId: "post-b-1", + body: "B comment", + }, + ]); +}; + +describe("FumaDB table policies", () => { + it.effect( + "filters reads, joins, counts, updates, deletes, and upserts through public query APIs", + () => + useHarness(async (orm) => { + await seedTenants(orm); + const tenantAContext = makeContext(["tenant-a"], "tenant-a"); + const tenantA = withQueryContext(orm, tenantAContext); + const allTenants = withQueryContext(orm, makeContext(["tenant-a", "tenant-b"], "all")); + + await expect(tenantA.count("posts")).resolves.toBe(2); + + await expect( + tenantA.findMany("authors", { + orderBy: ["id", "asc"], + join: (builder) => + builder.posts({ + orderBy: ["id", "asc"], + join: (builder) => builder.comments({ orderBy: ["id", "asc"] }), + }), + }), + ).resolves.toEqual([ + { + id: "author-a", + tenantId: "tenant-a", + name: "Ada", + posts: [ + { + id: "post-a-1", + tenantId: "tenant-a", + authorId: "author-a", + title: "A One", + comments: [ + { + id: "comment-a-1", + tenantId: "tenant-a", + postId: "post-a-1", + body: "A comment", + }, + ], + }, + { + id: "post-a-2", + tenantId: "tenant-a", + authorId: "author-a", + title: "A Two", + comments: [], + }, + ], + }, + ]); + + await expect( + tenantA.findMany("posts", { + orderBy: ["id", "asc"], + join: (builder) => + builder.author({ + select: ["id", "tenantId"], + }), + }), + ).resolves.toEqual([ + { + id: "post-a-1", + tenantId: "tenant-a", + authorId: "author-a", + title: "A One", + author: { + id: "author-a", + tenantId: "tenant-a", + }, + }, + { + id: "post-a-2", + tenantId: "tenant-a", + authorId: "author-a", + title: "A Two", + author: { + id: "author-a", + tenantId: "tenant-a", + }, + }, + ]); + + await tenantA.updateMany("posts", { + set: { + title: "tenant-a-updated", + }, + }); + await expect( + allTenants.findMany("posts", { + select: ["id", "title"], + orderBy: ["id", "asc"], + }), + ).resolves.toEqual([ + { + id: "post-a-1", + title: "tenant-a-updated", + }, + { + id: "post-a-2", + title: "tenant-a-updated", + }, + { + id: "post-b-1", + title: "B One", + }, + ]); + + await tenantA.deleteMany("comments", {}); + await expect( + allTenants.findMany("comments", { + select: ["id", "tenantId"], + orderBy: ["id", "asc"], + }), + ).resolves.toEqual([ + { + id: "comment-b-1", + tenantId: "tenant-b", + }, + ]); + + await tenantA.upsert("posts", { + where: (builder) => builder("id", "=", "post-a-2"), + update: { + title: "tenant-a-upserted", + }, + create: { + id: "post-a-created-if-missing", + tenantId: "tenant-a", + authorId: "author-a", + title: "not used", + }, + }); + await tenantA.upsert("posts", { + where: (builder) => builder("id", "=", "post-a-3"), + update: { + title: "not used", + }, + create: { + id: "post-a-3", + tenantId: "tenant-a", + authorId: "author-a", + title: "A Three", + }, + }); + + await expect( + tenantA.findMany("posts", { + select: ["id", "title"], + orderBy: ["id", "asc"], + }), + ).resolves.toEqual([ + { + id: "post-a-1", + title: "tenant-a-updated", + }, + { + id: "post-a-2", + title: "tenant-a-upserted", + }, + { + id: "post-a-3", + title: "A Three", + }, + ]); + + expect(tenantAContext.observed).toEqual( + expect.arrayContaining([ + "tenant-a:posts:read", + "tenant-a:authors:read", + "tenant-a:comments:read", + "tenant-a:posts:update", + "tenant-a:comments:delete", + "tenant-a:posts:assert", + ]), + ); + }), + ); + + it.effect( + "rejects out-of-context writes across createMany, updateMany, upsert, and transactions", + () => + useHarness(async (orm) => { + await seedTenants(orm); + const tenantAContext = makeContext(["tenant-a"], "tenant-a"); + const tenantA = withQueryContext(orm, tenantAContext); + + await expect( + tenantA.createMany("posts", [ + { + id: "post-a-batch", + tenantId: "tenant-a", + authorId: "author-a", + title: "A batch", + }, + { + id: "post-b-batch", + tenantId: "tenant-b", + authorId: "author-b", + title: "B batch", + }, + ]), + ).rejects.toThrow("tenant tenant-b is not allowed for posts"); + await expect( + tenantA.findFirst("posts", { + where: (builder) => builder("id", "=", "post-a-batch"), + }), + ).resolves.toBeNull(); + + await expect( + tenantA.updateMany("posts", { + where: (builder) => builder("id", "=", "post-a-1"), + set: { + tenantId: "tenant-b", + }, + }), + ).rejects.toThrow("tenant tenant-b is not allowed for posts"); + + await expect( + tenantA.upsert("posts", { + where: (builder) => builder("id", "=", "post-b-2"), + update: { + title: "not used", + }, + create: { + id: "post-b-2", + tenantId: "tenant-b", + authorId: "author-b", + title: "B Two", + }, + }), + ).rejects.toThrow("tenant tenant-b is not allowed for posts"); + + await expect( + tenantA.transaction(async (tx) => { + await tx.create("posts", { + id: "post-a-transaction", + tenantId: "tenant-a", + authorId: "author-a", + title: "A transaction", + }); + await expect(tx.count("posts")).resolves.toBe(3); + await tx.create("posts", { + id: "post-b-transaction", + tenantId: "tenant-b", + authorId: "author-b", + title: "B transaction", + }); + }), + ).rejects.toThrow("tenant tenant-b is not allowed for posts"); + + await expect( + tenantA.findFirst("posts", { + where: (builder) => builder("id", "=", "post-a-transaction"), + }), + ).resolves.toBeNull(); + await expect( + tenantA.findFirst("posts", { + where: (builder) => builder("id", "=", "post-b-transaction"), + }), + ).resolves.toBeNull(); + + expect(tenantAContext.observed).toEqual( + expect.arrayContaining([ + "tenant-a:posts:assert", + "tenant-a:posts:update", + "tenant-a:posts:read", + ]), + ); + }), + ); +}); diff --git a/packages/core/fumadb/src/schema/create.ts b/packages/core/fumadb/src/schema/create.ts index 4c2b6cef4..13d047055 100644 --- a/packages/core/fumadb/src/schema/create.ts +++ b/packages/core/fumadb/src/schema/create.ts @@ -1,17 +1,122 @@ import { createId } from "../cuid"; import type { CustomMigrationFn } from "../migration-engine/create"; +import type { Condition, ConditionBuilder } from "../query/condition-builder"; import { validateSchema } from "./validate"; export type AnySchema = Schema>; export type AnyRelation = Relation; -export type AnyTable = Table; +export interface AnyTable { + names: NameVariants; + ormName: string; + columns: Record; + relations: Record; + foreignKeys: ForeignKey[]; + policies: StoredTablePolicy[]; + getUniqueConstraints: (level?: "table" | "column" | "all") => UniqueConstraint[]; + getColumnByName: (name: string, type?: keyof NameVariants) => AnyColumn | undefined; + getIdColumn: () => AnyColumn; + clone: () => AnyTable; +} export type AnyColumn = | Column | IdColumn; +export type TableColumnName = Extract; + +export type TableColumnValues = { + [K in keyof T["columns"]]: T["columns"][K]["$out"]; +}; + +type Simplify = { + [K in keyof T]: T[K]; +}; + +type TableColumnInputValues = { + [K in keyof T["columns"]]: T["columns"][K]["$in"]; +}; + +type PickNotNullable = { + [P in keyof T as null extends T[P] ? never : P]: T[P]; +}; + +export type TableInsertValues = Simplify< + { + [K in keyof T["columns"]]?: T["columns"][K]["$in"]; + } & PickNotNullable> +>; + +export type TableUpdateValues = { + [K in keyof T["columns"]]?: T["columns"][K] extends IdColumn + ? never + : T["columns"][K]["$in"]; +}; + +type MaybePromise = T | Promise; +export type TablePolicyCondition = Condition | boolean | void; + +export interface TableReadPolicyInput { + readonly where: Condition | undefined; + readonly context: TContext; + readonly builder: ConditionBuilder; +} + +export interface TableCreatePolicyInput { + readonly values: TableInsertValues; + readonly context: TContext; +} + +export interface TableUpdatePolicyInput { + readonly where: Condition | undefined; + readonly set: TableUpdateValues; + readonly context: TContext; + readonly builder: ConditionBuilder; + readonly operation: "update" | "upsert"; +} + +export interface TableDeletePolicyInput { + readonly where: Condition | undefined; + readonly context: TContext; + readonly builder: ConditionBuilder; +} + +export type TableReadPolicyCallback = ( + input: TableReadPolicyInput +) => MaybePromise; + +export type TableCreatePolicyCallback = ( + input: TableCreatePolicyInput +) => MaybePromise; + +export type TableUpdatePolicyCallback = ( + input: TableUpdatePolicyInput +) => MaybePromise; + +export type TableDeletePolicyCallback = ( + input: TableDeletePolicyInput +) => MaybePromise; + +export interface TablePolicy< + TTable extends AnyTable = AnyTable, + TContext = unknown, +> { + readonly name?: string; + onRead?: TableReadPolicyCallback; + onCreate?: TableCreatePolicyCallback; + onUpdate?: TableUpdatePolicyCallback; + onDelete?: TableDeletePolicyCallback; +} + +export interface StoredTablePolicy { + readonly name?: string; + onRead?: TableReadPolicyCallback; + onCreate?: TableCreatePolicyCallback; + onUpdate?: TableUpdatePolicyCallback; + onDelete?: TableDeletePolicyCallback; +} + export type ForeignKeyAction = "RESTRICT" | "CASCADE" | "SET NULL"; export interface NameVariants { @@ -199,6 +304,7 @@ export interface Table< columns: Columns; relations: Relations; foreignKeys: ForeignKey[]; + policies: StoredTablePolicy[]; /** * @param level default to 'all' @@ -220,10 +326,9 @@ export interface Table< /** * Add unique constraint to the fields, for consistency, duplicated null values are allowed. */ - unique: ( - name: string, - columns: (keyof Columns)[] - ) => Table; + unique(name: string, columns: (keyof Columns)[]): Table; + + policy(policy: TablePolicy): this; clone: () => Table; } @@ -469,6 +574,7 @@ export function table>( let names: NameVariants | undefined; const uniqueConstraints: UniqueConstraint[] = []; + const policies: StoredTablePolicy[] = []; const out: Table = { ormName: "", get names() { @@ -484,6 +590,7 @@ export function table>( columns, relations: {}, foreignKeys: [], + policies, getUniqueConstraints(level = "all") { const result: UniqueConstraint[] = []; if (level === "all" || level === "table") @@ -522,6 +629,10 @@ export function table>( return this; }, + policy(policy) { + policies.push(policy as StoredTablePolicy); + return this; + }, clone() { const cloneColumns: Record = {}; @@ -536,6 +647,7 @@ export function table>( con.columns.map((col) => col.ormName) ); } + clone.policies.push(...policies); return clone; }, @@ -594,7 +706,7 @@ type CreateSchemaTables< > : Relations > - : never; + : Tables[K]; }; export interface Schema< diff --git a/packages/core/fumadb/test/setup.ts b/packages/core/fumadb/test/setup.ts index 4e42f47cf..eec1370c4 100644 --- a/packages/core/fumadb/test/setup.ts +++ b/packages/core/fumadb/test/setup.ts @@ -1,5 +1,17 @@ +import * as fs from "node:fs"; +import * as path from "node:path"; import { afterAll } from "vitest"; -import { cleanupFiles } from "./shared"; + +const sqlitePath = path.join( + import.meta.dirname, + "../node_modules/sqlite.sqlite", +); +const prismaDir = path.join(import.meta.dirname, "./prisma"); + +const cleanupFiles = () => { + fs.rmSync(sqlitePath, { force: true }); + fs.rmSync(prismaDir, { recursive: true, force: true }); +}; afterAll(() => { cleanupFiles(); diff --git a/packages/core/sdk/src/core-schema.ts b/packages/core/sdk/src/core-schema.ts index 29c473b01..c2241df8c 100644 --- a/packages/core/sdk/src/core-schema.ts +++ b/packages/core/sdk/src/core-schema.ts @@ -1,5 +1,11 @@ import { column, idColumn, table, type AnyColumn, type AnyTable } from "fumadb/schema"; import type { FumaRow } from "./fuma-runtime"; +import { + assertExecutorScopeAllowed, + executorScopePolicyName, + executorScopeIds, + type ExecutorScopePolicyContext, +} from "./scope-policy"; type UserColumns = Record; @@ -18,33 +24,94 @@ export const executorTable = ( columns: TColumns, ) => { const out = table(name, { + ...columns, row_id: idColumn("row_id", "varchar(255)").defaultTo$("auto"), id: column("id", "varchar(255)"), - ...columns, }); out.unique(`${name}_id_uidx`, ["id"]); - return out; + return out.policy({ + name: executorScopePolicyName, + }); }; -export const scopedExecutorTable = ( +const scopedExecutorTableBase = ( name: string, columns: TColumns, ) => { const out = table(name, { + ...columns, row_id: idColumn("row_id", "varchar(255)").defaultTo$("auto"), id: column("id", "varchar(255)"), scope_id: column("scope_id", "varchar(255)"), - ...columns, }); out.unique(`${name}_scope_id_id_uidx`, ["scope_id", "id"]); return out; }; +export const scopedExecutorTable = ( + name: string, + columns: TColumns, +) => { + const out = scopedExecutorTableBase(name, columns); + return out.policy({ + name: executorScopePolicyName, + onRead: ({ builder, context }) => + builder("scope_id", "in", executorScopeIds(name, "read", context)), + onCreate: ({ values, context }) => + assertExecutorScopeAllowed(name, "write", values.scope_id, context), + onUpdate: ({ builder, set, context }) => { + if (set.scope_id !== undefined) { + assertExecutorScopeAllowed(name, "write", set.scope_id, context); + } + return builder("scope_id", "in", executorScopeIds(name, "write", context)); + }, + onDelete: ({ builder, context }) => + builder("scope_id", "in", executorScopeIds(name, "delete", context)), + }); +}; + const defineTables = >(tables: TTables): TTables => tables; export const credentialBindingKinds = ["text", "secret", "connection"] as const; +const credentialBindingTable = (() => { + const out = scopedExecutorTableBase("credential_binding", { + plugin_id: textColumn("plugin_id"), + source_id: textColumn("source_id"), + source_scope_id: textColumn("source_scope_id"), + slot_key: textColumn("slot_key"), + kind: textColumn("kind"), + text_value: nullableTextColumn("text_value"), + secret_id: nullableTextColumn("secret_id"), + secret_scope_id: nullableTextColumn("secret_scope_id"), + connection_id: nullableTextColumn("connection_id"), + created_at: dateColumn("created_at"), + updated_at: dateColumn("updated_at"), + }); + + return out.policy({ + name: executorScopePolicyName, + onRead: ({ builder, context }) => + builder("scope_id", "in", executorScopeIds("credential_binding", "read", context)), + onCreate: ({ values, context }) => + assertExecutorScopeAllowed("credential_binding", "write", values.scope_id, context), + onUpdate: ({ builder, set, context }) => { + if (set.scope_id !== undefined) { + assertExecutorScopeAllowed("credential_binding", "write", set.scope_id, context); + } + return builder("scope_id", "in", executorScopeIds("credential_binding", "write", context)); + }, + onDelete: ({ builder, context }) => { + const scopeIds = executorScopeIds("credential_binding", "delete", context); + return builder.or( + builder("scope_id", "in", scopeIds), + builder("source_scope_id", "in", scopeIds), + ); + }, + }); +})(); + export const coreTables = defineTables({ source: scopedExecutorTable("source", { plugin_id: textColumn("plugin_id"), @@ -101,19 +168,7 @@ export const coreTables = defineTables({ expires_at: bigintColumn("expires_at"), created_at: dateColumn("created_at"), }), - credential_binding: scopedExecutorTable("credential_binding", { - plugin_id: textColumn("plugin_id"), - source_id: textColumn("source_id"), - source_scope_id: textColumn("source_scope_id"), - slot_key: textColumn("slot_key"), - kind: textColumn("kind"), - text_value: nullableTextColumn("text_value"), - secret_id: nullableTextColumn("secret_id"), - secret_scope_id: nullableTextColumn("secret_scope_id"), - connection_id: nullableTextColumn("connection_id"), - created_at: dateColumn("created_at"), - updated_at: dateColumn("updated_at"), - }), + credential_binding: credentialBindingTable, tool_policy: scopedExecutorTable("tool_policy", { pattern: textColumn("pattern"), action: textColumn("action"), diff --git a/packages/core/sdk/src/credential-bindings.test.ts b/packages/core/sdk/src/credential-bindings.test.ts index 7bba8f9f3..e1c824c7b 100644 --- a/packages/core/sdk/src/credential-bindings.test.ts +++ b/packages/core/sdk/src/credential-bindings.test.ts @@ -1,5 +1,6 @@ import { describe, expect, it } from "@effect/vitest"; import { Effect, Predicate, Result } from "effect"; +import { withQueryContext } from "fumadb/query"; import { CreateConnectionInput, TokenMaterial } from "./connections"; import { createExecutor, type Executor } from "./executor"; @@ -97,7 +98,10 @@ const makeHarness = () => { }); return { - db: config.db, + dbFor: (visibleScopes: readonly Scope[]) => + withQueryContext(config.db, { + allowedScopeIds: new Set(visibleScopes.map((visibleScope) => String(visibleScope.id))), + }), scopes, create: (visibleScopes: readonly Scope[]) => create(visibleScopes, plugins), }; @@ -304,7 +308,7 @@ describe("credential bindings", () => { const migratedAt = new Date("2026-05-01T00:00:00.000Z"); yield* Effect.promise(() => - harness.db.create("credential_binding", { + harness.dbFor([harness.scopes.org]).create("credential_binding", { id: "invalid-outer-binding", scope_id: harness.scopes.org.id, plugin_id: TEST_PLUGIN_ID, @@ -520,7 +524,7 @@ describe("credential bindings", () => { const migratedAt = new Date("2026-05-01T00:00:00.000Z"); yield* Effect.promise(() => - harness.db.create("credential_binding", { + harness.dbFor([harness.scopes.userWorkspaceA]).create("credential_binding", { id: "openapi-source-binding:legacy-row-id", scope_id: harness.scopes.userWorkspaceA.id, plugin_id: TEST_PLUGIN_ID, @@ -546,7 +550,7 @@ describe("credential bindings", () => { }); const rawRows = yield* Effect.promise(() => - harness.db.findMany("credential_binding", { + harness.dbFor([harness.scopes.userWorkspaceA]).findMany("credential_binding", { where: (b) => b.and( b("scope_id", "=", harness.scopes.userWorkspaceA.id), diff --git a/packages/core/sdk/src/executor.ts b/packages/core/sdk/src/executor.ts index 046c3091e..6883a14f5 100644 --- a/packages/core/sdk/src/executor.ts +++ b/packages/core/sdk/src/executor.ts @@ -12,13 +12,14 @@ import { import { FetchHttpClient, type HttpClient } from "effect/unstable/http"; import { fumadb } from "fumadb"; import { memoryAdapter } from "fumadb/adapters/memory"; -import type { Condition, ConditionBuilder } from "fumadb/query"; +import { withQueryContext, type Condition, type ConditionBuilder } from "fumadb/query"; import { schema as fumaSchema, type RelationsMap } from "fumadb/schema"; import type { AnyColumn } from "fumadb/schema"; import type { OAuthEndpointUrlPolicy } from "./oauth-helpers"; import { generateKeyBetween } from "fractional-indexing"; import { StorageError, + isStorageFailure, makeFumaClient, type FumaDb, type FumaRow, @@ -122,6 +123,7 @@ import { type ToolListFilter, } from "./types"; import { buildToolTypeScriptPreview } from "./schema-types"; +import { assertExecutorScopePolicyTable, type ExecutorScopePolicyContext } from "./scope-policy"; import { validateHostedOutboundUrl } from "./hosted-http-client"; const MAX_ANNOTATION_GROUPS = 64; @@ -398,9 +400,21 @@ export const collectTables = (plugins: readonly AnyPlugin[]): FumaTables => { merged[tableKey] = tableDef as FumaTables[string]; } } + + validateExecutorScopePolicyTables(merged); + return merged; }; +const validateExecutorScopePolicyTables = (tables: FumaTables): void => { + for (const tableDef of Object.values(tables)) { + assertExecutorScopePolicyTable(tableDef); + } +}; + +const storageFailureFromUnknown = (message: string, cause: unknown): StorageFailure => + isStorageFailure(cause) ? cause : new StorageError({ message, cause }); + const createDefaultMemoryDb = (tables: FumaTables): ExecutorDb => { const version = "1.0.0"; const latestSchema = fumaSchema>({ @@ -784,14 +798,22 @@ export const createExecutor = collectTables(plugins), + catch: (cause) => storageFailureFromUnknown("Failed to collect executor tables", cause), + }); const dbInput = yield* Effect.suspend(() => { - if (!config.db) return Effect.succeed(createDefaultMemoryDb(collectTables(plugins))); + if (!config.db) return Effect.succeed(createDefaultMemoryDb(tables)); if (typeof config.db !== "function") return Effect.succeed(config.db); - const out = config.db({ tables: collectTables(plugins) }); + const out = config.db({ tables }); return Effect.isEffect(out) ? out : Effect.succeed(out); }); const rootDbUntyped = "db" in dbInput ? dbInput.db : dbInput; const closeDb = "db" in dbInput ? dbInput.close : undefined; + yield* Effect.try({ + try: () => validateExecutorScopePolicyTables(rootDbUntyped.internal.tables), + catch: (cause) => storageFailureFromUnknown("Failed to validate executor tables", cause), + }); if (scopes.length === 0) { return yield* new StorageError({ @@ -800,12 +822,13 @@ export const createExecutor =