Skip to content

Commit 712bcd9

Browse files
bchapuisclaude
andcommitted
Add SQL identifier validation, fix any types, and add tests for database nodes
Add validateIdentifier() to prevent SQL injection via table/column name interpolation. Replace all `any` casts with proper types across database nodes. Alphabetize registry registrations, mark query node count output as hidden for consistency. Add test files for all 16 database nodes and full coverage for database-table utility functions. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent b0ea214 commit 712bcd9

34 files changed

Lines changed: 4024 additions & 136 deletions

apps/api/src/runtime/cloudflare-node-registry.ts

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,18 @@ import { CsvFilterRowsNode } from "@dafthunk/runtime/nodes/csv/csv-filter-rows-n
6565
import { CsvParseNode } from "@dafthunk/runtime/nodes/csv/csv-parse-node";
6666
import { CsvStringifyNode } from "@dafthunk/runtime/nodes/csv/csv-stringify-node";
6767
import { DatabaseCreateTableNode } from "@dafthunk/runtime/nodes/database/database-create-table-node";
68+
import { DatabaseDeleteRowNode } from "@dafthunk/runtime/nodes/database/database-delete-row-node";
6869
import { DatabaseDescribeTableNode } from "@dafthunk/runtime/nodes/database/database-describe-table-node";
6970
import { DatabaseDropTableNode } from "@dafthunk/runtime/nodes/database/database-drop-table-node";
7071
import { DatabaseExecuteNode } from "@dafthunk/runtime/nodes/database/database-execute-node";
7172
import { DatabaseExportTableNode } from "@dafthunk/runtime/nodes/database/database-export-table-node";
7273
import { DatabaseGetRowCountNode } from "@dafthunk/runtime/nodes/database/database-get-row-count-node";
74+
import { DatabaseGetRowNode } from "@dafthunk/runtime/nodes/database/database-get-row-node";
7375
import { DatabaseImportTableNode } from "@dafthunk/runtime/nodes/database/database-import-table-node";
7476
import { DatabaseListTablesNode } from "@dafthunk/runtime/nodes/database/database-list-tables-node";
77+
import { DatabasePutRowNode } from "@dafthunk/runtime/nodes/database/database-put-row-node";
7578
import { DatabaseQueryNode } from "@dafthunk/runtime/nodes/database/database-query-node";
79+
import { DatabaseRowExistsNode } from "@dafthunk/runtime/nodes/database/database-row-exists-node";
7680
import { DatabaseTableExistsNode } from "@dafthunk/runtime/nodes/database/database-table-exists-node";
7781
import { DatabaseTruncateTableNode } from "@dafthunk/runtime/nodes/database/database-truncate-table-node";
7882
import { ParquetQueryNode } from "@dafthunk/runtime/nodes/database/parquet-query-node";
@@ -511,16 +515,20 @@ export class CloudflareNodeRegistry extends BaseNodeRegistry<Bindings> {
511515
this.registerImplementation(SendQueueMessageNode);
512516
this.registerImplementation(SendQueueBatchNode);
513517
this.registerImplementation(ReceiveQueueMessageNode);
514-
this.registerImplementation(DatabaseQueryNode);
515-
this.registerImplementation(DatabaseExecuteNode);
516518
this.registerImplementation(DatabaseCreateTableNode);
517-
this.registerImplementation(DatabaseImportTableNode);
518-
this.registerImplementation(DatabaseExportTableNode);
519+
this.registerImplementation(DatabaseDeleteRowNode);
519520
this.registerImplementation(DatabaseDescribeTableNode);
520-
this.registerImplementation(DatabaseListTablesNode);
521521
this.registerImplementation(DatabaseDropTableNode);
522-
this.registerImplementation(DatabaseTableExistsNode);
522+
this.registerImplementation(DatabaseExecuteNode);
523+
this.registerImplementation(DatabaseExportTableNode);
523524
this.registerImplementation(DatabaseGetRowCountNode);
525+
this.registerImplementation(DatabaseGetRowNode);
526+
this.registerImplementation(DatabaseImportTableNode);
527+
this.registerImplementation(DatabaseListTablesNode);
528+
this.registerImplementation(DatabasePutRowNode);
529+
this.registerImplementation(DatabaseQueryNode);
530+
this.registerImplementation(DatabaseRowExistsNode);
531+
this.registerImplementation(DatabaseTableExistsNode);
524532
this.registerImplementation(DatabaseTruncateTableNode);
525533
this.registerImplementation(ParquetQueryNode);
526534
this.registerImplementation(ReceiveEmailNode);
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
import type { Node, Schema } from "@dafthunk/types";
2+
import { describe, expect, it, vi } from "vitest";
3+
import type { NodeContext } from "../../node-types";
4+
import { DatabaseCreateTableNode } from "./database-create-table-node";
5+
6+
const testSchema: Schema = {
7+
name: "users",
8+
fields: [
9+
{ name: "id", type: "integer", primaryKey: true },
10+
{ name: "name", type: "string" },
11+
{ name: "email", type: "string" },
12+
],
13+
};
14+
15+
function createMockConnection() {
16+
return {
17+
query: vi.fn().mockResolvedValue({ results: [] }),
18+
execute: vi.fn().mockResolvedValue({
19+
results: [],
20+
meta: { rowsAffected: 0 },
21+
}),
22+
};
23+
}
24+
25+
function createContext(
26+
inputs: Record<string, unknown>,
27+
connection?: ReturnType<typeof createMockConnection>
28+
): NodeContext {
29+
return {
30+
nodeId: "database-create-table",
31+
workflowId: "test-workflow",
32+
organizationId: "test-org",
33+
inputs,
34+
getIntegration: async () => {
35+
throw new Error("No integrations in test");
36+
},
37+
env: {},
38+
databaseService: connection
39+
? { resolve: vi.fn().mockResolvedValue(connection) }
40+
: undefined,
41+
} as unknown as NodeContext;
42+
}
43+
44+
describe("DatabaseCreateTableNode", () => {
45+
const createNode = () =>
46+
new DatabaseCreateTableNode({
47+
nodeId: "database-create-table",
48+
} as unknown as Node);
49+
50+
it("should create a table when it does not exist", async () => {
51+
const connection = createMockConnection();
52+
connection.query.mockResolvedValue({ results: [] });
53+
const node = createNode();
54+
const result = await node.execute(
55+
createContext({ database: "db-1", schema: testSchema }, connection)
56+
);
57+
58+
expect(result.status).toBe("completed");
59+
expect(result.outputs?.created).toBe(true);
60+
expect(connection.query).toHaveBeenCalled();
61+
expect(connection.execute).toHaveBeenCalled();
62+
});
63+
64+
it("should return created=false when table already exists", async () => {
65+
const connection = createMockConnection();
66+
connection.query.mockResolvedValue({
67+
results: [{ name: "users" }],
68+
});
69+
const node = createNode();
70+
const result = await node.execute(
71+
createContext({ database: "db-1", schema: testSchema }, connection)
72+
);
73+
74+
expect(result.status).toBe("completed");
75+
expect(result.outputs?.created).toBe(false);
76+
expect(connection.execute).not.toHaveBeenCalled();
77+
});
78+
79+
it("should return error for missing database", async () => {
80+
const node = createNode();
81+
const result = await node.execute(createContext({ schema: testSchema }));
82+
83+
expect(result.status).toBe("error");
84+
expect(result.error).toContain("'database' is a required input");
85+
});
86+
87+
it("should return error for missing schema", async () => {
88+
const node = createNode();
89+
const connection = createMockConnection();
90+
const result = await node.execute(
91+
createContext({ database: "db-1" }, connection)
92+
);
93+
94+
expect(result.status).toBe("error");
95+
expect(result.error).toContain("'schema' is a required input");
96+
});
97+
98+
it("should return error for schema without name", async () => {
99+
const node = createNode();
100+
const connection = createMockConnection();
101+
const result = await node.execute(
102+
createContext(
103+
{
104+
database: "db-1",
105+
schema: {
106+
fields: [{ name: "id", type: "integer" }],
107+
},
108+
},
109+
connection
110+
)
111+
);
112+
113+
expect(result.status).toBe("error");
114+
expect(result.error).toContain("Invalid schema");
115+
});
116+
117+
it("should return error for schema without fields", async () => {
118+
const node = createNode();
119+
const connection = createMockConnection();
120+
const result = await node.execute(
121+
createContext(
122+
{
123+
database: "db-1",
124+
schema: { name: "test" },
125+
},
126+
connection
127+
)
128+
);
129+
130+
expect(result.status).toBe("error");
131+
expect(result.error).toContain("Invalid schema");
132+
});
133+
134+
it("should return error for schema with empty fields array", async () => {
135+
const node = createNode();
136+
const connection = createMockConnection();
137+
const result = await node.execute(
138+
createContext(
139+
{
140+
database: "db-1",
141+
schema: { name: "test", fields: [] },
142+
},
143+
connection
144+
)
145+
);
146+
147+
expect(result.status).toBe("error");
148+
expect(result.error).toContain("'fields' array cannot be empty");
149+
});
150+
151+
it("should return error when database service is not available", async () => {
152+
const node = createNode();
153+
const result = await node.execute(
154+
createContext({ database: "db-1", schema: testSchema })
155+
);
156+
157+
expect(result.status).toBe("error");
158+
expect(result.error).toContain("Database service not available");
159+
});
160+
161+
it("should return error when database is not found", async () => {
162+
const node = createNode();
163+
const context = {
164+
nodeId: "database-create-table",
165+
workflowId: "test-workflow",
166+
organizationId: "test-org",
167+
inputs: { database: "db-1", schema: testSchema },
168+
getIntegration: async () => {
169+
throw new Error("No integrations in test");
170+
},
171+
env: {},
172+
databaseService: { resolve: vi.fn().mockResolvedValue(null) },
173+
} as unknown as NodeContext;
174+
175+
const result = await node.execute(context);
176+
177+
expect(result.status).toBe("error");
178+
expect(result.error).toContain("not found or does not belong");
179+
});
180+
181+
it("should return error when execute throws", async () => {
182+
const connection = createMockConnection();
183+
connection.query.mockResolvedValue({ results: [] });
184+
connection.execute.mockRejectedValue(new Error("disk full"));
185+
const node = createNode();
186+
const result = await node.execute(
187+
createContext({ database: "db-1", schema: testSchema }, connection)
188+
);
189+
190+
expect(result.status).toBe("error");
191+
expect(result.error).toContain("disk full");
192+
});
193+
194+
it("should return error when query throws during table check", async () => {
195+
const connection = createMockConnection();
196+
connection.query.mockRejectedValue(new Error("connection lost"));
197+
const node = createNode();
198+
const result = await node.execute(
199+
createContext({ database: "db-1", schema: testSchema }, connection)
200+
);
201+
202+
expect(result.status).toBe("error");
203+
expect(result.error).toContain("connection lost");
204+
});
205+
});

packages/runtime/src/nodes/database/database-create-table-node.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ export class DatabaseCreateTableNode extends ExecutableNode {
1818
asTool: true,
1919
inputs: [
2020
{
21-
name: "databaseId",
21+
name: "database",
2222
type: "database",
2323
description: "Database ID.",
2424
required: true,
@@ -42,10 +42,10 @@ export class DatabaseCreateTableNode extends ExecutableNode {
4242
};
4343

4444
async execute(context: NodeContext): Promise<NodeExecution> {
45-
const { databaseId, schema: schemaInput } = context.inputs;
45+
const { database, schema: schemaInput } = context.inputs;
4646

47-
if (!databaseId) {
48-
return this.createErrorResult("'databaseId' is a required input.");
47+
if (!database) {
48+
return this.createErrorResult("'database' is a required input.");
4949
}
5050

5151
if (!schemaInput) {
@@ -71,13 +71,13 @@ export class DatabaseCreateTableNode extends ExecutableNode {
7171
}
7272

7373
const connection = await context.databaseService.resolve(
74-
databaseId,
74+
database,
7575
context.organizationId
7676
);
7777

7878
if (!connection) {
7979
return this.createErrorResult(
80-
`Database '${databaseId}' not found or does not belong to your organization.`
80+
`Database '${database}' not found or does not belong to your organization.`
8181
);
8282
}
8383

0 commit comments

Comments
 (0)