Skip to content

Commit 21ee36d

Browse files
bchapuisclaude
andcommitted
Redesign HITL to use URL-based forms instead of WebSocket
Replace the single WebSocket-dependent human-input node with a two-node architecture: hitl-form (generates a signed form URL) and hitl-wait (pauses until the form is submitted). This allows HITL forms to be sent via any channel (email, SMS, Discord) using existing workflow nodes, without requiring the user to have the editor open. Key changes: - Signed HMAC tokens encode form config in the URL (stateless) - Public /forms/:signedToken API endpoints (no auth, token IS auth) - Public /f/:signedToken frontend page for form submission - One-time submission enforced via WorkflowAgent DO storage - Runtime now applies completed node results immediately in mixed levels (completed + pending), so parallel nodes don't wait Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent c4c8899 commit 21ee36d

24 files changed

Lines changed: 748 additions & 435 deletions

apps/api/src/context.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ export interface Bindings {
6767
R2_SECRET_ACCESS_KEY?: string;
6868
R2_BUCKET_NAME?: string;
6969
SECRET_MASTER_KEY: string;
70+
HITL_SIGNING_KEY: string;
7071
STRIPE_SECRET_KEY?: string;
7172
STRIPE_WEBHOOK_SECRET?: string;
7273
STRIPE_PRICE_ID_PRO?: string;

apps/api/src/durable-objects/workflow-agent.ts

Lines changed: 57 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import type {
2929
WorkflowExecuteMessage,
3030
WorkflowExecution,
3131
WorkflowExecutionUpdateMessage,
32-
WorkflowHumanInputMessage,
3332
WorkflowInitMessage,
3433
WorkflowState,
3534
WorkflowUpdateMessage,
@@ -92,6 +91,7 @@ export class WorkflowAgent extends Agent<Bindings, WorkflowAgentState> {
9291
private static readonly PERSIST_DEBOUNCE_MS = 500;
9392
private static readonly STORAGE_KEY_DIRTY = "dirty:persist";
9493
private static readonly STORAGE_PREFIX_EXEC_BUFFER = "execbuf:";
94+
private static readonly STORAGE_PREFIX_HITL = "hitl:";
9595

9696
initialState: WorkflowAgentState = {};
9797

@@ -199,11 +199,6 @@ export class WorkflowAgent extends Agent<Bindings, WorkflowAgentState> {
199199
parsed as WorkflowExecuteMessage
200200
);
201201
break;
202-
case "human_input":
203-
await this.handleHumanInputMessage(
204-
parsed as WorkflowHumanInputMessage
205-
);
206-
break;
207202
default:
208203
connection.close(1003, "Unknown message type");
209204
break;
@@ -436,24 +431,6 @@ export class WorkflowAgent extends Agent<Bindings, WorkflowAgentState> {
436431
}
437432
}
438433

439-
private async handleHumanInputMessage(
440-
message: WorkflowHumanInputMessage
441-
): Promise<void> {
442-
const { executionId, nodeId, response } = message;
443-
const instance = await this.env.EXECUTE.get(executionId);
444-
await instance.sendEvent({
445-
type: `human-input-${nodeId}`,
446-
payload: {
447-
outputs: {
448-
response: response.text ?? "",
449-
approved: response.approved ?? false,
450-
metadata: response.metadata ?? {},
451-
},
452-
usage: 0,
453-
},
454-
});
455-
}
456-
457434
private async subscribeToExecution(
458435
connection: Connection,
459436
executionId: string
@@ -593,6 +570,62 @@ export class WorkflowAgent extends Agent<Bindings, WorkflowAgentState> {
593570
}
594571
}
595572

573+
// ── HITL form state ───────────────────────────────────────────────────
574+
575+
/**
576+
* Check if a HITL form has already been submitted.
577+
* Returns `{ submitted: boolean }`.
578+
*/
579+
async getHitlFormStatus(token: string): Promise<{ submitted: boolean }> {
580+
const key = WorkflowAgent.STORAGE_PREFIX_HITL + token;
581+
const record = await this.storage.get<{ submitted: boolean }>(key);
582+
return { submitted: record?.submitted ?? false };
583+
}
584+
585+
/**
586+
* Atomically check-and-submit a HITL form response.
587+
* Rejects duplicate submissions. On success, sends the event to the
588+
* EXECUTE workflow instance to resume the paused node.
589+
*/
590+
async checkAndSubmitHitlForm(
591+
token: string,
592+
executionId: string,
593+
response: { text?: string; approved?: boolean; metadata?: Record<string, unknown> }
594+
): Promise<{ success: boolean; error?: string }> {
595+
const key = WorkflowAgent.STORAGE_PREFIX_HITL + token;
596+
const existing = await this.storage.get<{ submitted: boolean }>(key);
597+
598+
if (existing?.submitted) {
599+
return { success: false, error: "Form has already been submitted" };
600+
}
601+
602+
// Mark as submitted before sending event (fail-safe: prevents double-submit
603+
// even if the event send fails)
604+
await this.storage.put(key, { submitted: true, submittedAt: Date.now() });
605+
606+
try {
607+
const instance = await this.env.EXECUTE.get(executionId);
608+
await instance.sendEvent({
609+
type: `hitl-response-${token}`,
610+
payload: {
611+
outputs: {
612+
response: response.text ?? "",
613+
approved: response.approved ?? false,
614+
metadata: response.metadata ?? {},
615+
},
616+
usage: 0,
617+
},
618+
});
619+
return { success: true };
620+
} catch (error) {
621+
console.error("Failed to send HITL event:", error);
622+
return {
623+
success: false,
624+
error: "Failed to resume workflow. The execution may have expired.",
625+
};
626+
}
627+
}
628+
596629
// ── Persistence ───────────────────────────────────────────────────────
597630

598631
/**

apps/api/src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import endpointExecuteRoutes from "./routes/endpoint-execute";
2020
import endpointRoutes from "./routes/endpoints";
2121
import executionRoutes from "./routes/executions";
2222
import feedbackRoutes from "./routes/feedback";
23+
import formRoutes from "./routes/forms";
2324
import health from "./routes/health";
2425
import integrationRoutes from "./routes/integrations";
2526
import invitationRoutes from "./routes/invitations";
@@ -106,6 +107,7 @@ app.route("/queues", queuePublishRoutes);
106107
app.route("/replicate", replicateRoutes);
107108

108109
// Public routes
110+
app.route("/forms", formRoutes);
109111
app.route("/templates", templateRoutes);
110112
app.route("/types", typeRoutes);
111113

apps/api/src/mocks/node-registry.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ import {
66
import { NumberInputNode } from "@dafthunk/runtime/nodes/input/number-input-node";
77
import { ConditionalForkNode } from "@dafthunk/runtime/nodes/logic/conditional-fork-node";
88
import { ConditionalJoinNode } from "@dafthunk/runtime/nodes/logic/conditional-join-node";
9-
import { HumanInputNode } from "@dafthunk/runtime/nodes/logic/human-input-node";
9+
import { HitlFormNode } from "@dafthunk/runtime/nodes/logic/hitl-form-node";
10+
import { HitlWaitNode } from "@dafthunk/runtime/nodes/logic/hitl-wait-node";
1011
import { AdditionNode } from "@dafthunk/runtime/nodes/math/addition-node";
1112
import { AvgNode } from "@dafthunk/runtime/nodes/math/avg-node";
1213
import { DivisionNode } from "@dafthunk/runtime/nodes/math/division-node";
@@ -30,7 +31,7 @@ import type { Bindings } from "../context";
3031
* - Addition, Subtraction, Multiplication, Division
3132
* - Number Input
3233
* - Sum, Max, Min, Avg, Median
33-
* - Conditional Fork, Conditional Join, Human Input
34+
* - Conditional Fork, Conditional Join, HITL Form, HITL Wait
3435
* - Multi-Step Addition, Failing Multi-Step (test nodes)
3536
*/
3637
export class MockNodeRegistry extends BaseNodeRegistry<Bindings> {
@@ -47,7 +48,8 @@ export class MockNodeRegistry extends BaseNodeRegistry<Bindings> {
4748
this.registerImplementation(MedianNode);
4849
this.registerImplementation(ConditionalForkNode);
4950
this.registerImplementation(ConditionalJoinNode);
50-
this.registerImplementation(HumanInputNode);
51+
this.registerImplementation(HitlFormNode);
52+
this.registerImplementation(HitlWaitNode);
5153
this.registerImplementation(MultiStepAdditionNode);
5254
this.registerImplementation(FailingMultiStepNode);
5355
}

apps/api/src/routes/executions.ts

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -116,47 +116,4 @@ executionRoutes.get("/", jwtMiddleware, async (c) => {
116116
return c.json(response);
117117
});
118118

119-
/**
120-
* Submit human input for a pending node in a running execution.
121-
* Used for headless triggers (email, HTTP, cron) where no WebSocket is available.
122-
*/
123-
executionRoutes.post(
124-
"/:executionId/nodes/:nodeId/input",
125-
apiKeyOrJwtMiddleware,
126-
async (c) => {
127-
const executionId = c.req.param("executionId");
128-
const nodeId = c.req.param("nodeId");
129-
130-
if (!isUuid(executionId)) {
131-
return c.json({ error: "Invalid execution ID format" }, 400);
132-
}
133-
134-
const body = await c.req.json<{
135-
text?: string;
136-
approved?: boolean;
137-
metadata?: Record<string, unknown>;
138-
}>();
139-
140-
try {
141-
const instance = await c.env.EXECUTE.get(executionId);
142-
await instance.sendEvent({
143-
type: `human-input-${nodeId}`,
144-
payload: {
145-
outputs: {
146-
response: body.text ?? "",
147-
approved: body.approved ?? false,
148-
metadata: body.metadata ?? {},
149-
},
150-
usage: 0,
151-
},
152-
});
153-
154-
return c.json({ success: true });
155-
} catch (error) {
156-
console.error("Error submitting human input:", error);
157-
return c.json({ error: "Failed to submit human input" }, 500);
158-
}
159-
}
160-
);
161-
162119
export default executionRoutes;

apps/api/src/routes/forms.ts

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/**
2+
* Public HITL Form Routes
3+
*
4+
* These routes are unauthenticated — the signed token IS the authorization.
5+
* They allow external users to view and submit human-in-the-loop forms
6+
* via a shareable URL.
7+
*/
8+
9+
import { verifyHitlToken } from "@dafthunk/runtime";
10+
import { Hono } from "hono";
11+
12+
import type { ApiContext } from "../context";
13+
import { getAgentByName } from "../durable-objects/agent-utils";
14+
15+
const formRoutes = new Hono<ApiContext>();
16+
17+
/**
18+
* GET /forms/:signedToken
19+
*
20+
* Returns the form configuration (prompt, context, input type) and
21+
* submission status. No authentication required.
22+
*/
23+
formRoutes.get("/:signedToken", async (c) => {
24+
const signedToken = c.req.param("signedToken");
25+
const payload = await verifyHitlToken(signedToken, c.env.HITL_SIGNING_KEY);
26+
27+
if (!payload) {
28+
return c.json({ error: "Invalid or expired form link" }, 400);
29+
}
30+
31+
try {
32+
const agent = await getAgentByName(c.env.WORKFLOW_AGENT, payload.wid);
33+
const { submitted } = await agent.getHitlFormStatus(payload.tok);
34+
35+
return c.json({
36+
prompt: payload.p,
37+
context: payload.c,
38+
inputType: payload.t,
39+
submitted,
40+
});
41+
} catch (error) {
42+
console.error("Error loading form:", error);
43+
return c.json({ error: "Failed to load form" }, 500);
44+
}
45+
});
46+
47+
/**
48+
* POST /forms/:signedToken
49+
*
50+
* Submit a form response. Validates the token, checks for duplicate
51+
* submissions, and sends the event to resume the paused workflow.
52+
*/
53+
formRoutes.post("/:signedToken", async (c) => {
54+
const signedToken = c.req.param("signedToken");
55+
const payload = await verifyHitlToken(signedToken, c.env.HITL_SIGNING_KEY);
56+
57+
if (!payload) {
58+
return c.json({ error: "Invalid or expired form link" }, 400);
59+
}
60+
61+
const body = await c.req.json<{
62+
text?: string;
63+
approved?: boolean;
64+
metadata?: Record<string, unknown>;
65+
}>();
66+
67+
try {
68+
const agent = await getAgentByName(c.env.WORKFLOW_AGENT, payload.wid);
69+
const result = await agent.checkAndSubmitHitlForm(
70+
payload.tok,
71+
payload.eid,
72+
{
73+
text: body.text,
74+
approved: body.approved,
75+
metadata: body.metadata,
76+
}
77+
);
78+
79+
if (!result.success) {
80+
return c.json({ error: result.error }, 409);
81+
}
82+
83+
return c.json({ success: true });
84+
} catch (error) {
85+
console.error("Error submitting form:", error);
86+
return c.json({ error: "Failed to submit form" }, 500);
87+
}
88+
});
89+
90+
export default formRoutes;

apps/api/src/runtime/cloudflare-node-registry.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,8 @@ import { LikePostLinkedInNode } from "@dafthunk/runtime/nodes/linkedin/like-post
352352
import { SharePostLinkedInNode } from "@dafthunk/runtime/nodes/linkedin/share-post-linkedin-node";
353353
import { ConditionalForkNode } from "@dafthunk/runtime/nodes/logic/conditional-fork-node";
354354
import { ConditionalJoinNode } from "@dafthunk/runtime/nodes/logic/conditional-join-node";
355-
import { HumanInputNode } from "@dafthunk/runtime/nodes/logic/human-input-node";
355+
import { HitlFormNode } from "@dafthunk/runtime/nodes/logic/hitl-form-node";
356+
import { HitlWaitNode } from "@dafthunk/runtime/nodes/logic/hitl-wait-node";
356357
import { AbsoluteValueNode } from "@dafthunk/runtime/nodes/math/absolute-value-node";
357358
import { AdditionNode } from "@dafthunk/runtime/nodes/math/addition-node";
358359
import { AvgNode } from "@dafthunk/runtime/nodes/math/avg-node";
@@ -673,7 +674,8 @@ export class CloudflareNodeRegistry extends BaseNodeRegistry<Bindings> {
673674
this.registerImplementation(BooleanInputNode);
674675
this.registerImplementation(ConditionalForkNode);
675676
this.registerImplementation(ConditionalJoinNode);
676-
this.registerImplementation(HumanInputNode);
677+
this.registerImplementation(HitlFormNode);
678+
this.registerImplementation(HitlWaitNode);
677679

678680
// Image operations
679681
this.registerImplementation(PhotonAddNoiseNode);

apps/api/src/utils/encryption.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ const createMockEnv = (masterKey?: string): Bindings => ({
3333
WEBSITE_URL: "",
3434
EMAIL_DOMAIN: "",
3535
JWT_SECRET: "",
36+
HITL_SIGNING_KEY: "test-hitl-signing-key",
3637
CLOUDFLARE_ENV: "",
3738
CLOUDFLARE_ACCOUNT_ID: "",
3839
CLOUDFLARE_API_TOKEN: "",

0 commit comments

Comments
 (0)