Skip to content

Commit f5ddc63

Browse files
bchapuisclaude
andcommitted
Add JSON Schema Compose node for building records from schema fields
Inverse of JsonSchemaExtractNode — takes individual field inputs and combines them into a single JSON record. The widget uses a single atomic updateNodeData call to avoid race conditions between competing input state updates. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 9dc0178 commit f5ddc63

5 files changed

Lines changed: 387 additions & 0 deletions

File tree

apps/api/src/runtime/cloudflare-node-registry.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@ import { JsonObjectKeysNode } from "@dafthunk/runtime/nodes/json/json-object-key
325325
import { JsonObjectValuesNode } from "@dafthunk/runtime/nodes/json/json-object-values-node";
326326
import { JsonRemoveNode } from "@dafthunk/runtime/nodes/json/json-remove-node";
327327
import { JsonReplaceNode } from "@dafthunk/runtime/nodes/json/json-replace-node";
328+
import { JsonSchemaComposeNode } from "@dafthunk/runtime/nodes/json/json-schema-compose-node";
328329
import { JsonSchemaExtractNode } from "@dafthunk/runtime/nodes/json/json-schema-extract-node";
329330
import { JsonSetNode } from "@dafthunk/runtime/nodes/json/json-set-node";
330331
import { JsonStripNullsNode } from "@dafthunk/runtime/nodes/json/json-strip-nulls-node";
@@ -614,6 +615,7 @@ export class CloudflareNodeRegistry extends BaseNodeRegistry<Bindings> {
614615
this.registerImplementation(JsonToBlobNode);
615616
this.registerImplementation(JsonToGeojsonNode);
616617
this.registerImplementation(JsonTypeofNode);
618+
this.registerImplementation(JsonSchemaComposeNode);
617619
this.registerImplementation(JsonSchemaExtractNode);
618620
this.registerImplementation(JsonValidNode);
619621
this.registerImplementation(JsonStringTemplateNode);

apps/app/src/components/workflow/widgets/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import { javascriptInputWidget } from "./input/javascript-input";
2929
import { jsonInputWidget } from "./input/json-input";
3030
import { numberInputWidget } from "./input/number-input";
3131
import { replicateModelInputWidget } from "./input/replicate-model-input";
32+
import { schemaComposeInputWidget } from "./input/schema-compose-input";
3233
import { schemaExtractInputWidget } from "./input/schema-extract-input";
3334
import { secretInputWidget } from "./input/secret-input";
3435
import { sliderInputWidget } from "./input/slider-input";
@@ -86,6 +87,7 @@ const widgets = [
8687
audioRecorderInputWidget,
8788
canvasInputWidget,
8889
replicateModelInputWidget,
90+
schemaComposeInputWidget,
8991
schemaExtractInputWidget,
9092
createDynamicInputsWidget("string-concat", {
9193
prefix: "input",
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
import type { Field, FieldType, GetSchemaResponse } from "@dafthunk/types";
2+
import { useCallback, useState } from "react";
3+
4+
import { useAuth } from "@/components/auth-context";
5+
import { SchemaDialog } from "@/components/schema-dialog";
6+
import {
7+
Select,
8+
SelectContent,
9+
SelectItem,
10+
SelectSeparator,
11+
SelectTrigger,
12+
SelectValue,
13+
} from "@/components/ui/select";
14+
import { useSchemas } from "@/services/schema-service";
15+
import { makeOrgRequest } from "@/services/utils";
16+
import { cn } from "@/utils/utils";
17+
18+
import { useWorkflow } from "../../workflow-context";
19+
import type { WorkflowParameter } from "../../workflow-types";
20+
import type { BaseWidgetProps } from "../widget";
21+
import { createWidget, getInputValue } from "../widget";
22+
23+
const CREATE_NEW = "__create_new__";
24+
25+
const FIELD_TYPE_TO_PARAMETER_TYPE: Record<FieldType, string> = {
26+
string: "string",
27+
integer: "number",
28+
number: "number",
29+
boolean: "boolean",
30+
datetime: "date",
31+
json: "json",
32+
};
33+
34+
interface SchemaComposeInputProps extends BaseWidgetProps {
35+
nodeId: string;
36+
schemaId: string;
37+
hasSchemaInputs: boolean;
38+
}
39+
40+
function SchemaComposeInputWidget({
41+
nodeId,
42+
schemaId,
43+
className,
44+
disabled = false,
45+
}: SchemaComposeInputProps) {
46+
const [loading, setLoading] = useState(false);
47+
const [isCreateDialogOpen, setIsCreateDialogOpen] = useState(false);
48+
const { schemas, isSchemasLoading, mutateSchemas } = useSchemas();
49+
const { organization } = useAuth();
50+
const { updateNodeData, edges, deleteEdge } = useWorkflow();
51+
52+
const applySchema = useCallback(
53+
async (selectedSchemaId: string) => {
54+
if (!organization?.id || !updateNodeData) return;
55+
56+
setLoading(true);
57+
try {
58+
const response = await makeOrgRequest<GetSchemaResponse>(
59+
organization.id,
60+
"/schemas",
61+
`/${selectedSchemaId}`
62+
);
63+
64+
const fieldInputs = response.schema.fields.map(
65+
(field) =>
66+
({
67+
id: field.name,
68+
name: field.name,
69+
type: FIELD_TYPE_TO_PARAMETER_TYPE[field.type] ?? "any",
70+
}) as WorkflowParameter
71+
);
72+
73+
// Remove all edges connected to this node (schema change breaks connections)
74+
if (edges && deleteEdge) {
75+
for (const edge of edges) {
76+
if (edge.target === nodeId || edge.source === nodeId) {
77+
deleteEdge(edge.id);
78+
}
79+
}
80+
}
81+
82+
// Build the schema input from scratch to avoid race conditions
83+
// between competing input updates (onChange vs updateNodeData)
84+
updateNodeData(nodeId, (current) => {
85+
const existingSchema = current.inputs?.find(
86+
(i) => i.id === "schema"
87+
);
88+
const schemaInput = {
89+
...(existingSchema ?? {
90+
id: "schema",
91+
name: "schema",
92+
type: "schema" as const,
93+
hidden: true,
94+
required: true,
95+
}),
96+
value: selectedSchemaId,
97+
} as WorkflowParameter;
98+
return {
99+
inputs: [schemaInput, ...fieldInputs],
100+
};
101+
});
102+
} finally {
103+
setLoading(false);
104+
}
105+
},
106+
[organization?.id, updateNodeData, edges, deleteEdge, nodeId]
107+
);
108+
109+
const handleChange = useCallback(
110+
(val: string) => {
111+
if (val === CREATE_NEW) {
112+
setIsCreateDialogOpen(true);
113+
return;
114+
}
115+
applySchema(val);
116+
},
117+
[applySchema]
118+
);
119+
120+
const handleCreate = useCallback(
121+
async (data: { name: string; description: string; fields: Field[] }) => {
122+
if (!organization?.id) return;
123+
const { createSchema } = await import("@/services/schema-service");
124+
const response = await createSchema(data, organization.id);
125+
await mutateSchemas();
126+
applySchema(response.schema.id);
127+
},
128+
[organization?.id, mutateSchemas, applySchema]
129+
);
130+
131+
const isLoading = loading || isSchemasLoading;
132+
const selectedName = schemas?.find((s) => s.id === schemaId)?.name;
133+
134+
return (
135+
<div className={cn("p-2", className)}>
136+
<Select
137+
value={schemaId || ""}
138+
onValueChange={handleChange}
139+
disabled={disabled || isLoading}
140+
>
141+
<SelectTrigger className="h-auto text-xs">
142+
<SelectValue
143+
placeholder={
144+
isLoading
145+
? "Loading..."
146+
: schemas?.length === 0
147+
? "No schemas"
148+
: "Select schema"
149+
}
150+
>
151+
{selectedName}
152+
</SelectValue>
153+
</SelectTrigger>
154+
<SelectContent>
155+
{schemas?.map((schema) => (
156+
<SelectItem key={schema.id} value={schema.id} className="text-xs">
157+
{schema.name}
158+
</SelectItem>
159+
))}
160+
<SelectSeparator />
161+
<SelectItem value={CREATE_NEW} className="text-xs">
162+
+ New Schema
163+
</SelectItem>
164+
</SelectContent>
165+
</Select>
166+
167+
<SchemaDialog
168+
open={isCreateDialogOpen}
169+
onOpenChange={setIsCreateDialogOpen}
170+
onSubmit={handleCreate}
171+
title="Create New Schema"
172+
submitLabel="Create Schema"
173+
/>
174+
</div>
175+
);
176+
}
177+
178+
export const schemaComposeInputWidget = createWidget({
179+
component: SchemaComposeInputWidget,
180+
nodeTypes: ["json-schema-compose"],
181+
inputField: "schema",
182+
extractConfig: (nodeId, inputs) => ({
183+
nodeId,
184+
schemaId: getInputValue(inputs, "schema", ""),
185+
hasSchemaInputs: (inputs ?? []).length > 1,
186+
}),
187+
});
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
import type { NodeContext } from "@dafthunk/runtime";
2+
import type { Node, Schema } from "@dafthunk/types";
3+
import { describe, expect, it } from "vitest";
4+
import { JsonSchemaComposeNode } from "./json-schema-compose-node";
5+
6+
const testSchema: Schema = {
7+
name: "person",
8+
fields: [
9+
{ name: "name", type: "string", required: true },
10+
{ name: "age", type: "integer" },
11+
{ name: "active", type: "boolean" },
12+
],
13+
};
14+
15+
function createNode() {
16+
return new JsonSchemaComposeNode({
17+
nodeId: "test",
18+
} as unknown as Node);
19+
}
20+
21+
function createContext(inputs: Record<string, unknown>): NodeContext {
22+
return {
23+
nodeId: "test",
24+
organizationId: "org-1",
25+
inputs,
26+
getIntegration: async () => {
27+
throw new Error("No integrations in test");
28+
},
29+
} as unknown as NodeContext;
30+
}
31+
32+
describe("JsonSchemaComposeNode", () => {
33+
it("should compose a record from individual field inputs", async () => {
34+
const node = createNode();
35+
const context = createContext({
36+
schema: testSchema,
37+
name: "Alice",
38+
age: 30,
39+
active: true,
40+
});
41+
42+
const result = await node.execute(context);
43+
expect(result.status).toBe("completed");
44+
expect(result.outputs?.record).toEqual({
45+
name: "Alice",
46+
age: 30,
47+
active: true,
48+
});
49+
});
50+
51+
it("should output null for missing field inputs", async () => {
52+
const node = createNode();
53+
const context = createContext({
54+
schema: testSchema,
55+
name: "Bob",
56+
});
57+
58+
const result = await node.execute(context);
59+
expect(result.status).toBe("completed");
60+
expect(result.outputs?.record).toEqual({
61+
name: "Bob",
62+
age: null,
63+
active: null,
64+
});
65+
});
66+
67+
it("should ignore extra inputs not in the schema", async () => {
68+
const node = createNode();
69+
const context = createContext({
70+
schema: testSchema,
71+
name: "Carol",
72+
age: 25,
73+
active: false,
74+
email: "c@x.com",
75+
});
76+
77+
const result = await node.execute(context);
78+
expect(result.status).toBe("completed");
79+
const record = result.outputs?.record as Record<string, unknown>;
80+
expect(record.name).toBe("Carol");
81+
expect(record.email).toBeUndefined();
82+
});
83+
84+
it("should error when schema is missing", async () => {
85+
const node = createNode();
86+
const context = createContext({ name: "X" });
87+
88+
const result = await node.execute(context);
89+
expect(result.status).toBe("error");
90+
expect(result.error).toContain("schema must be selected");
91+
});
92+
93+
it("should error when schema is not a valid object", async () => {
94+
const node = createNode();
95+
const context = createContext({
96+
schema: "not-an-object",
97+
name: "X",
98+
});
99+
100+
const result = await node.execute(context);
101+
expect(result.status).toBe("error");
102+
expect(result.error).toContain("schema must be selected");
103+
});
104+
});

0 commit comments

Comments
 (0)