From a73e998ac66d0ac006fc6f8216f6853f35cdf344 Mon Sep 17 00:00:00 2001 From: Bertil Chapuis Date: Sun, 22 Mar 2026 06:51:26 +0100 Subject: [PATCH 1/3] Add DafthunkAgent: conversational assistant with workflow editing, node search, and resource creation Full-page chat UI with multi-conversation support (SQLite-backed), Claude Opus 4.6, and 16 tools: org state, templates, node library search, workflow CRUD + editing, secrets, endpoints, emails, queues, datasets, databases, and integration OAuth linking. Co-Authored-By: Claude Opus 4.6 (1M context) --- apps/api/src/context.ts | 2 + .../api/src/durable-objects/dafthunk-agent.ts | 1180 +++++++++++++++++ apps/api/src/index.ts | 3 + apps/api/src/routes/onboarding.ts | 35 + apps/api/src/utils/encryption.test.ts | 1 + apps/api/wrangler.jsonc | 18 + .../onboarding/onboarding-chat-panel.tsx | 211 +++ apps/app/src/hooks/use-onboarding-chat.ts | 113 ++ apps/app/src/pages/dashboard-page.tsx | 7 + apps/app/src/routes.tsx | 13 +- .../src/services/onboarding-chat-service.ts | 173 +++ packages/types/src/index.ts | 1 + packages/types/src/onboarding.ts | 36 + 13 files changed, 1792 insertions(+), 1 deletion(-) create mode 100644 apps/api/src/durable-objects/dafthunk-agent.ts create mode 100644 apps/api/src/routes/onboarding.ts create mode 100644 apps/app/src/components/onboarding/onboarding-chat-panel.tsx create mode 100644 apps/app/src/hooks/use-onboarding-chat.ts create mode 100644 apps/app/src/services/onboarding-chat-service.ts create mode 100644 packages/types/src/onboarding.ts diff --git a/apps/api/src/context.ts b/apps/api/src/context.ts index b9d3da72..b00ae910 100644 --- a/apps/api/src/context.ts +++ b/apps/api/src/context.ts @@ -2,6 +2,7 @@ import type { RuntimeParams } from "@dafthunk/runtime"; import { JWTTokenPayload } from "@dafthunk/types"; import type { FFmpegContainer } from "./containers/ffmpeg-container"; import type { AgentRunner } from "./durable-objects/agent-runner"; +import type { DafthunkAgent } from "./durable-objects/dafthunk-agent"; import { DatabaseDO } from "./durable-objects/database-do"; import type { WorkflowAgent } from "./durable-objects/workflow-agent"; @@ -15,6 +16,7 @@ export interface Bindings { WORKFLOW_AGENT: DurableObjectNamespace; DATABASE: DurableObjectNamespace; AGENT_RUNNER: DurableObjectNamespace; + DAFTHUNK_AGENT: DurableObjectNamespace; FFMPEG_CONTAINER?: DurableObjectNamespace; DUCKDB_SANDBOX?: DurableObjectNamespace; WORKFLOW_QUEUE: Queue; diff --git a/apps/api/src/durable-objects/dafthunk-agent.ts b/apps/api/src/durable-objects/dafthunk-agent.ts new file mode 100644 index 00000000..cd0a8e9d --- /dev/null +++ b/apps/api/src/durable-objects/dafthunk-agent.ts @@ -0,0 +1,1180 @@ +import Anthropic from "@anthropic-ai/sdk"; +import { getAnthropicConfig } from "@dafthunk/runtime/utils/ai-gateway"; +import type { + OnboardingChatMessage, + OnboardingClientMessage, + OnboardingServerMessage, +} from "@dafthunk/types"; +import { Agent } from "agents"; +import type { Connection } from "partyserver"; +import { v7 as uuidv7 } from "uuid"; + +import type { Bindings } from "../context"; +import { + createDatabase, + createDatabaseRecord, + createDataset, + createEmail, + createEndpoint, + createQueue, + createSecret, + getIntegrations, + getSecrets, +} from "../db"; +import { CloudflareNodeRegistry } from "../runtime/cloudflare-node-registry"; +import { WorkflowStore } from "../stores/workflow-store"; +import { workflowTemplates } from "../templates"; + +const MODEL = "claude-opus-4-20250514"; + +interface UserProfile { + expertiseLevel?: "beginner" | "intermediate" | "advanced"; + preferredTone?: "casual" | "professional" | "technical"; + domain?: string; +} + +export interface OnboardingAgentState { + organizationId: string; + userId: string; + activeConversationId: string; + userProfile: UserProfile; +} + +const TOOLS: Anthropic.Tool[] = [ + // ── Discovery tools ─────────────────────────────────────────────── + { + name: "getOrgState", + description: + "Get org state: integrations, secrets, workflows, endpoints, emails, queues, datasets, databases.", + input_schema: { + type: "object" as const, + properties: {}, + required: [], + }, + }, + { + name: "listTemplates", + description: "List available workflow templates. Optionally filter by tag.", + input_schema: { + type: "object" as const, + properties: { + tag: { type: "string", description: "Filter by tag" }, + }, + required: [], + }, + }, + { + name: "checkTemplateRequirements", + description: "Check if the org has integrations needed for a template.", + input_schema: { + type: "object" as const, + properties: { + templateId: { type: "string", description: "Template ID" }, + }, + required: ["templateId"], + }, + }, + { + name: "getSetupUrl", + description: "Get URL for a settings page (integrations, secrets, etc).", + input_schema: { + type: "object" as const, + properties: { + page: { + type: "string", + enum: [ + "integrations", + "secrets", + "endpoints", + "emails", + "queues", + "datasets", + "databases", + ], + description: "Settings page", + }, + }, + required: ["page"], + }, + }, + // ── Creation tools ──────────────────────────────────────────────── + { + name: "createWorkflowFromTemplate", + description: "Create a workflow from a template.", + input_schema: { + type: "object" as const, + properties: { + templateId: { type: "string", description: "Template ID" }, + customName: { type: "string", description: "Custom workflow name" }, + aiPromptOverrides: { + type: "object", + description: "Node ID → custom prompt for AI nodes", + additionalProperties: { type: "string" }, + }, + }, + required: ["templateId"], + }, + }, + { + name: "createSecret", + description: + "Store a secret (API key, token). Only use when the user explicitly provides a credential value.", + input_schema: { + type: "object" as const, + properties: { + name: { + type: "string", + description: "Secret name (e.g. OPENAI_API_KEY)", + }, + value: { type: "string", description: "Secret value" }, + }, + required: ["name", "value"], + }, + }, + { + name: "createEndpoint", + description: "Create an HTTP endpoint (webhook or request-response).", + input_schema: { + type: "object" as const, + properties: { + name: { type: "string", description: "Endpoint name" }, + mode: { + type: "string", + enum: ["webhook", "request"], + description: + "webhook (fire-and-forget) or request (wait for response)", + }, + }, + required: ["name", "mode"], + }, + }, + { + name: "createEmail", + description: + "Create an @dafthunk.com email address for triggering workflows.", + input_schema: { + type: "object" as const, + properties: { + name: { + type: "string", + description: "Email prefix (e.g. 'support' → support@dafthunk.com)", + }, + }, + required: ["name"], + }, + }, + { + name: "createQueue", + description: "Create a message queue for async workflow triggering.", + input_schema: { + type: "object" as const, + properties: { + name: { type: "string", description: "Queue name" }, + }, + required: ["name"], + }, + }, + { + name: "createDataset", + description: "Create a dataset for storing files (CSV, JSON, etc).", + input_schema: { + type: "object" as const, + properties: { + name: { type: "string", description: "Dataset name" }, + }, + required: ["name"], + }, + }, + { + name: "createDatabase", + description: "Create a SQLite database for structured data storage.", + input_schema: { + type: "object" as const, + properties: { + name: { type: "string", description: "Database name" }, + }, + required: ["name"], + }, + }, + { + name: "connectIntegration", + description: + "Get the OAuth URL to connect an integration. Returns a link the user must open in their browser.", + input_schema: { + type: "object" as const, + properties: { + provider: { + type: "string", + enum: [ + "google-mail", + "google-calendar", + "discord", + "github", + "reddit", + "linkedin", + ], + description: "Integration provider", + }, + }, + required: ["provider"], + }, + }, + // ── Node & workflow editing tools ───────────────────────────────── + { + name: "searchNodes", + description: + "Search the node library by keyword. Returns matching node types with descriptions, inputs, and outputs.", + input_schema: { + type: "object" as const, + properties: { + query: { + type: "string", + description: "Search keyword (matches name, description, tags)", + }, + }, + required: ["query"], + }, + }, + { + name: "getWorkflow", + description: + "Read a workflow's full definition: nodes, edges, trigger, and metadata.", + input_schema: { + type: "object" as const, + properties: { + workflowId: { type: "string", description: "Workflow ID" }, + }, + required: ["workflowId"], + }, + }, + { + name: "updateWorkflow", + description: + "Update a workflow. Can rename, change description/trigger, update node inputs, add nodes, remove nodes, add edges, or remove edges.", + input_schema: { + type: "object" as const, + properties: { + workflowId: { type: "string", description: "Workflow ID" }, + name: { type: "string", description: "New workflow name" }, + description: { type: "string", description: "New description" }, + trigger: { type: "string", description: "New trigger type" }, + updateNodeInputs: { + type: "array", + description: "Update inputs on existing nodes", + items: { + type: "object", + properties: { + nodeId: { type: "string" }, + inputs: { + type: "object", + description: "Input name → new value", + additionalProperties: {}, + }, + }, + required: ["nodeId", "inputs"], + }, + }, + addNodes: { + type: "array", + description: "Nodes to add (type, name, position)", + items: { + type: "object", + properties: { + type: { + type: "string", + description: "Node type from searchNodes", + }, + name: { type: "string", description: "Display name" }, + positionX: { type: "number" }, + positionY: { type: "number" }, + inputs: { + type: "object", + description: "Input name → value", + additionalProperties: {}, + }, + }, + required: ["type"], + }, + }, + removeNodes: { + type: "array", + description: "Node IDs to remove", + items: { type: "string" }, + }, + addEdges: { + type: "array", + description: "Edges to add", + items: { + type: "object", + properties: { + source: { type: "string", description: "Source node ID" }, + sourceOutput: { + type: "string", + description: "Source output name", + }, + target: { type: "string", description: "Target node ID" }, + targetInput: { type: "string", description: "Target input name" }, + }, + required: ["source", "sourceOutput", "target", "targetInput"], + }, + }, + removeEdges: { + type: "array", + description: "Edges to remove (by source+target node IDs)", + items: { + type: "object", + properties: { + source: { type: "string" }, + target: { type: "string" }, + }, + required: ["source", "target"], + }, + }, + }, + required: ["workflowId"], + }, + }, +]; + +const AI_NODE_TYPES = [ + "anthropic-chat", + "openai-chat", + "gemini-chat", + "workers-ai-chat", +]; + +const INTEGRATION_NODE_MAP: Record = { + "discord-send-message": "discord", + "discord-message": "discord", + "telegram-send-message": "telegram", + "telegram-message": "telegram", + "whatsapp-send-message": "whatsapp", + "whatsapp-message": "whatsapp", + "google-mail-send": "google-mail", + "google-calendar-list-events": "google-calendar", + "github-create-issue": "github", +}; + +const TOOL_DESCRIPTIONS: Record = { + getOrgState: "Checking org state...", + listTemplates: "Browsing templates...", + checkTemplateRequirements: "Checking requirements...", + createWorkflowFromTemplate: "Creating workflow...", + getSetupUrl: "Getting link...", + createSecret: "Storing secret...", + createEndpoint: "Creating endpoint...", + createEmail: "Creating email...", + createQueue: "Creating queue...", + createDataset: "Creating dataset...", + createDatabase: "Creating database...", + connectIntegration: "Getting OAuth link...", + searchNodes: "Searching nodes...", + getWorkflow: "Reading workflow...", + updateWorkflow: "Updating workflow...", +}; + +export class DafthunkAgent extends Agent { + private schemaInitialized = false; + + constructor(ctx: DurableObjectState, env: Bindings) { + super(ctx, env); + } + + private ensureSchema(): void { + if (this.schemaInitialized) return; + this.ctx.storage.sql.exec(` + CREATE TABLE IF NOT EXISTS conversations ( + id TEXT PRIMARY KEY, + title TEXT NOT NULL, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL + ) + `); + this.ctx.storage.sql.exec(` + CREATE TABLE IF NOT EXISTS messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + conversation_id TEXT NOT NULL REFERENCES conversations(id) ON DELETE CASCADE, + role TEXT NOT NULL, + content TEXT NOT NULL, + timestamp INTEGER NOT NULL + ) + `); + this.schemaInitialized = true; + } + + private createConversation(title: string): string { + const id = uuidv7(); + const now = Date.now(); + this.ctx.storage.sql.exec( + "INSERT INTO conversations (id, title, created_at, updated_at) VALUES (?, ?, ?, ?)", + id, + title, + now, + now + ); + return id; + } + + private getConversationMessages( + conversationId: string + ): OnboardingChatMessage[] { + const rows = this.ctx.storage.sql + .exec( + "SELECT role, content, timestamp FROM messages WHERE conversation_id = ? ORDER BY id", + conversationId + ) + .toArray(); + return rows.map((r) => ({ + role: r.role as "user" | "assistant", + content: r.content as string, + timestamp: r.timestamp as number, + })); + } + + private addMessage( + conversationId: string, + role: "user" | "assistant", + content: string + ): void { + const now = Date.now(); + this.ctx.storage.sql.exec( + "INSERT INTO messages (conversation_id, role, content, timestamp) VALUES (?, ?, ?, ?)", + conversationId, + role, + content, + now + ); + this.ctx.storage.sql.exec( + "UPDATE conversations SET updated_at = ? WHERE id = ?", + now, + conversationId + ); + } + + private listConversations(): { + id: string; + title: string; + createdAt: number; + updatedAt: number; + }[] { + const rows = this.ctx.storage.sql + .exec( + "SELECT id, title, created_at, updated_at FROM conversations ORDER BY updated_at DESC" + ) + .toArray(); + return rows.map((r) => ({ + id: r.id as string, + title: r.title as string, + createdAt: r.created_at as number, + updatedAt: r.updated_at as number, + })); + } + + private generateTitle(content: string): string { + return content.length > 50 ? `${content.slice(0, 47)}...` : content; + } + + onConnect(connection: Connection, ctx: { request: Request }): void { + this.ensureSchema(); + + const userId = ctx.request.headers.get("X-User-Id") ?? ""; + const orgId = ctx.request.headers.get("X-Organization-Id") ?? ""; + + if (!this.state) { + const convId = this.createConversation("New conversation"); + this.setState({ + organizationId: orgId, + userId, + activeConversationId: convId, + userProfile: {}, + }); + } + + const activeId = this.state!.activeConversationId; + const messages = this.getConversationMessages(activeId); + + connection.send( + JSON.stringify({ + type: "conversation_switched", + conversationId: activeId, + messages, + } satisfies OnboardingServerMessage) + ); + connection.send( + JSON.stringify({ + type: "conversations", + conversations: this.listConversations(), + } satisfies OnboardingServerMessage) + ); + } + + async onMessage(connection: Connection, message: string): Promise { + try { + this.ensureSchema(); + const parsed = JSON.parse(message as string) as OnboardingClientMessage; + + switch (parsed.type) { + case "list_conversations": { + connection.send( + JSON.stringify({ + type: "conversations", + conversations: this.listConversations(), + } satisfies OnboardingServerMessage) + ); + return; + } + + case "new_conversation": { + const convId = this.createConversation("New conversation"); + this.setState({ ...this.state!, activeConversationId: convId }); + connection.send( + JSON.stringify({ + type: "conversation_switched", + conversationId: convId, + messages: [], + } satisfies OnboardingServerMessage) + ); + connection.send( + JSON.stringify({ + type: "conversations", + conversations: this.listConversations(), + } satisfies OnboardingServerMessage) + ); + return; + } + + case "switch_conversation": { + const convId = parsed.conversationId; + this.setState({ ...this.state!, activeConversationId: convId }); + const messages = this.getConversationMessages(convId); + connection.send( + JSON.stringify({ + type: "conversation_switched", + conversationId: convId, + messages, + } satisfies OnboardingServerMessage) + ); + return; + } + + case "delete_conversation": { + const convId = parsed.conversationId; + this.ctx.storage.sql.exec( + "DELETE FROM messages WHERE conversation_id = ?", + convId + ); + this.ctx.storage.sql.exec( + "DELETE FROM conversations WHERE id = ?", + convId + ); + + // If we deleted the active conversation, switch to the latest or create new + if (this.state?.activeConversationId === convId) { + const remaining = this.listConversations(); + const newActiveId = + remaining.length > 0 + ? remaining[0].id + : this.createConversation("New conversation"); + this.setState({ + ...this.state!, + activeConversationId: newActiveId, + }); + connection.send( + JSON.stringify({ + type: "conversation_switched", + conversationId: newActiveId, + messages: this.getConversationMessages(newActiveId), + } satisfies OnboardingServerMessage) + ); + } + + connection.send( + JSON.stringify({ + type: "conversations", + conversations: this.listConversations(), + } satisfies OnboardingServerMessage) + ); + return; + } + + case "chat": { + if (!parsed.content.trim()) return; + + const activeId = this.state!.activeConversationId; + this.addMessage(activeId, "user", parsed.content); + + // Update title from first user message + const allMessages = this.getConversationMessages(activeId); + const userMessages = allMessages.filter((m) => m.role === "user"); + if (userMessages.length === 1) { + const title = this.generateTitle(parsed.content); + this.ctx.storage.sql.exec( + "UPDATE conversations SET title = ? WHERE id = ?", + title, + activeId + ); + connection.send( + JSON.stringify({ + type: "conversations", + conversations: this.listConversations(), + } satisfies OnboardingServerMessage) + ); + } + + await this.runAgentLoop(connection, allMessages); + return; + } + } + } catch (error) { + const errorMsg: OnboardingServerMessage = { + type: "error", + message: error instanceof Error ? error.message : "An error occurred", + }; + connection.send(JSON.stringify(errorMsg)); + } + } + + private async runAgentLoop( + connection: Connection, + chatHistory: OnboardingChatMessage[] + ): Promise { + const client = new Anthropic({ + apiKey: "gateway-managed", + timeout: 120_000, + ...getAnthropicConfig(this.env), + }); + + const systemPrompt = this.buildSystemPrompt(); + + const anthropicMessages: Anthropic.MessageParam[] = chatHistory.map( + (m) => ({ + role: m.role, + content: m.content, + }) + ); + + let continueLoop = true; + + while (continueLoop) { + const startMsg: OnboardingServerMessage = { type: "stream_start" }; + connection.send(JSON.stringify(startMsg)); + + let fullText = ""; + + const stream = client.messages.stream({ + model: MODEL, + max_tokens: 2048, + system: systemPrompt, + messages: anthropicMessages, + tools: TOOLS, + }); + + for await (const event of stream) { + if ( + event.type === "content_block_delta" && + event.delta.type === "text_delta" + ) { + fullText += event.delta.text; + const chunkMsg: OnboardingServerMessage = { + type: "stream_chunk", + content: event.delta.text, + }; + connection.send(JSON.stringify(chunkMsg)); + } + } + + const finalMessage = await stream.finalMessage(); + + const toolBlocks = finalMessage.content.filter( + (b): b is Anthropic.ToolUseBlock => b.type === "tool_use" + ); + + if (toolBlocks.length === 0) { + const endMsg: OnboardingServerMessage = { type: "stream_end" }; + connection.send(JSON.stringify(endMsg)); + + if (fullText) { + this.addMessage( + this.state!.activeConversationId, + "assistant", + fullText + ); + + const completeMsg: OnboardingServerMessage = { + type: "turn_complete", + content: fullText, + }; + connection.send(JSON.stringify(completeMsg)); + } + + continueLoop = false; + } else { + const endMsg: OnboardingServerMessage = { type: "stream_end" }; + connection.send(JSON.stringify(endMsg)); + + anthropicMessages.push({ + role: "assistant", + content: finalMessage.content, + }); + + const toolResults: Anthropic.ToolResultBlockParam[] = []; + for (const toolBlock of toolBlocks) { + const progressMsg: OnboardingServerMessage = { + type: "tool_progress", + tool: toolBlock.name, + description: TOOL_DESCRIPTIONS[toolBlock.name] ?? "Working...", + }; + connection.send(JSON.stringify(progressMsg)); + + const result = await this.executeTool( + toolBlock.name, + toolBlock.input as Record + ); + toolResults.push({ + type: "tool_result", + tool_use_id: toolBlock.id, + content: JSON.stringify(result), + }); + } + + anthropicMessages.push({ + role: "user", + content: toolResults, + }); + } + } + } + + private buildSystemPrompt(): string { + const profile = this.state?.userProfile ?? {}; + + return `You help users set up workflows on Dafthunk, a visual workflow automation platform. + +## Writing style +Every word must earn its place. Be direct, specific, and brief. No filler, no preamble, no "Great question!" or "I'd be happy to help." Just answer. Prefer short sentences. Use lists only when comparing options. One idea per message when possible. +${profile.domain ? `\nUser's domain: "${profile.domain}". Tailor examples.` : ""} +${profile.expertiseLevel ? `User expertise: ${profile.expertiseLevel}.` : ""} +${profile.preferredTone ? `Tone: ${profile.preferredTone}.` : ""} + +## Triggers available +Email (@dafthunk.com), Discord, Telegram, WhatsApp, HTTP endpoints, cron. No Gmail. + +## What you can do +- Create: workflows (from templates), secrets, endpoints, emails, queues, datasets, databases. +- Connect integrations: use connectIntegration to link to the OAuth page. +- Search the node library: use searchNodes to find node types by keyword. +- Edit workflows: use getWorkflow to read, then updateWorkflow to modify nodes, edges, inputs, or metadata. + +## Rules +- Call getOrgState or listTemplates before recommending. Never guess. +- Call checkTemplateRequirements before creating a workflow. +- Only call createSecret when the user gives you the actual value. +- For integrations, use connectIntegration — format the URL as a markdown link. +- Before editing a workflow, call getWorkflow to read its current state. +- Before adding a node, call searchNodes to find the correct type.`; + } + + private async executeTool( + name: string, + input: Record + ): Promise> { + const orgId = this.state?.organizationId ?? ""; + const db = createDatabase(this.env.DB); + + switch (name) { + case "getOrgState": { + const [orgIntegrations, orgSecrets, workflows] = await Promise.all([ + getIntegrations(db, orgId), + getSecrets(db, orgId), + new WorkflowStore(this.env).list(orgId), + ]); + + return { + integrations: orgIntegrations.map((i) => ({ + name: i.name, + provider: i.provider, + status: i.status, + })), + secretNames: orgSecrets.map((s) => s.name), + workflowCount: workflows.length, + }; + } + + case "listTemplates": { + const tag = input.tag as string | undefined; + let templates = workflowTemplates; + if (tag) { + templates = templates.filter((t) => + t.tags.some((tg) => tg.toLowerCase() === tag.toLowerCase()) + ); + } + return { + templates: templates.map((t) => ({ + id: t.id, + name: t.name, + description: t.description, + tags: t.tags, + trigger: t.trigger, + })), + }; + } + + case "checkTemplateRequirements": { + const templateId = input.templateId as string; + const template = workflowTemplates.find((t) => t.id === templateId); + if (!template) return { error: `Template "${templateId}" not found` }; + + const requiredIntegrations = new Set(); + for (const node of template.nodes) { + const provider = INTEGRATION_NODE_MAP[node.type]; + if (provider) requiredIntegrations.add(provider); + } + + const orgIntegrations = await getIntegrations(db, orgId); + const connectedProviders = new Set( + orgIntegrations.map((i) => i.provider) + ); + + const missing: string[] = []; + for (const required of requiredIntegrations) { + if (!connectedProviders.has(required)) missing.push(required); + } + + return { + templateId: template.id, + templateName: template.name, + requiredIntegrations: [...requiredIntegrations], + missingIntegrations: missing, + ready: missing.length === 0, + }; + } + + case "createWorkflowFromTemplate": { + const templateId = input.templateId as string; + const customName = input.customName as string | undefined; + const aiPromptOverrides = input.aiPromptOverrides as + | Record + | undefined; + + const template = workflowTemplates.find((t) => t.id === templateId); + if (!template) return { error: `Template "${templateId}" not found` }; + + const nodes = structuredClone(template.nodes); + + if (aiPromptOverrides) { + for (const node of nodes) { + if ( + AI_NODE_TYPES.includes(node.type) && + aiPromptOverrides[node.id] + ) { + const promptInput = node.inputs.find( + (i: { name: string }) => + i.name === "system_prompt" || i.name === "instructions" + ); + if (promptInput) { + promptInput.value = aiPromptOverrides[node.id]; + } + } + } + } + + const userProfile = this.state?.userProfile; + if (userProfile?.preferredTone) { + for (const node of nodes) { + if (AI_NODE_TYPES.includes(node.type)) { + if (aiPromptOverrides && aiPromptOverrides[node.id]) continue; + const promptInput = node.inputs.find( + (i: { name: string }) => + i.name === "system_prompt" || i.name === "instructions" + ); + if (promptInput && typeof promptInput.value === "string") { + promptInput.value = `${promptInput.value}\n\nTone: ${userProfile.preferredTone}`; + } + } + } + } + + const workflowId = crypto.randomUUID(); + const workflow = await new WorkflowStore(this.env).save({ + id: workflowId, + name: customName ?? template.name, + description: template.description, + trigger: template.trigger, + organizationId: orgId, + nodes, + edges: structuredClone(template.edges), + }); + + return { workflowId: workflow.id, name: workflow.name }; + } + + case "createSecret": { + const secretName = input.name as string; + const secretValue = input.value as string; + await createSecret(db, orgId, secretName, secretValue, this.env); + return { name: secretName, created: true }; + } + + case "createEndpoint": { + const id = uuidv7(); + const endpoint = await createEndpoint(db, { + id, + name: input.name as string, + mode: input.mode as "webhook" | "request", + organizationId: orgId, + }); + return { id: endpoint.id, name: endpoint.name, mode: endpoint.mode }; + } + + case "createEmail": { + const id = uuidv7(); + const email = await createEmail(db, { + id, + name: input.name as string, + organizationId: orgId, + }); + return { + id: email.id, + name: email.name, + address: `${email.name}@${this.env.EMAIL_DOMAIN || "dafthunk.com"}`, + }; + } + + case "createQueue": { + const id = uuidv7(); + const queue = await createQueue(db, { + id, + name: input.name as string, + organizationId: orgId, + }); + return { id: queue.id, name: queue.name }; + } + + case "createDataset": { + const id = uuidv7(); + const dataset = await createDataset(db, { + id, + name: input.name as string, + organizationId: orgId, + }); + return { id: dataset.id, name: dataset.name }; + } + + case "createDatabase": { + const id = uuidv7(); + const database = await createDatabaseRecord(db, { + id, + name: input.name as string, + organizationId: orgId, + }); + return { id: database.id, name: database.name }; + } + + case "connectIntegration": { + const provider = input.provider as string; + const webHost = this.env.WEB_HOST || "https://app.dafthunk.com"; + const integrationsUrl = `${webHost}/org/${orgId}/integrations`; + return { + provider, + url: integrationsUrl, + message: `Link the user to the integrations page to connect ${provider}. Format as markdown link.`, + }; + } + + case "getSetupUrl": { + const page = input.page as string; + const webHost = this.env.WEB_HOST || "https://app.dafthunk.com"; + return { url: `${webHost}/org/${orgId}/${page}`, page }; + } + + case "searchNodes": { + const query = (input.query as string).toLowerCase(); + const registry = new CloudflareNodeRegistry(this.env, false); + const allNodes = registry.getNodeTypes(); + const matches = allNodes.filter((n) => { + const haystack = [n.name, n.type, n.description ?? "", ...n.tags] + .join(" ") + .toLowerCase(); + return haystack.includes(query); + }); + return { + results: matches.slice(0, 20).map((n) => ({ + type: n.type, + name: n.name, + description: n.description, + tags: n.tags, + inputs: n.inputs.map((p) => ({ + name: p.name, + type: p.type, + description: p.description, + })), + outputs: n.outputs.map((p) => ({ + name: p.name, + type: p.type, + description: p.description, + })), + })), + total: matches.length, + }; + } + + case "getWorkflow": { + const workflowId = input.workflowId as string; + const store = new WorkflowStore(this.env); + const workflow = await store.getWithData(workflowId, orgId); + if (!workflow) return { error: "Workflow not found" }; + + return { + id: workflow.id, + name: workflow.name, + description: workflow.description, + trigger: workflow.trigger, + runtime: workflow.runtime, + nodes: workflow.data.nodes.map((n) => ({ + id: n.id, + type: n.type, + name: n.name, + position: n.position, + inputs: n.inputs.map((p) => ({ + name: p.name, + type: p.type, + value: p.value, + })), + outputs: n.outputs.map((p) => ({ + name: p.name, + type: p.type, + })), + })), + edges: workflow.data.edges, + }; + } + + case "updateWorkflow": { + const workflowId = input.workflowId as string; + const store = new WorkflowStore(this.env); + const workflow = await store.getWithData(workflowId, orgId); + if (!workflow) return { error: "Workflow not found" }; + + const nodes = structuredClone(workflow.data.nodes); + let edges = structuredClone(workflow.data.edges); + const newName = (input.name as string | undefined) ?? workflow.name; + const newDescription = + (input.description as string | undefined) ?? workflow.description; + const newTrigger = + (input.trigger as string | undefined) ?? workflow.trigger; + + // Update node inputs + const updateNodeInputs = input.updateNodeInputs as + | { nodeId: string; inputs: Record }[] + | undefined; + if (updateNodeInputs) { + for (const update of updateNodeInputs) { + const node = nodes.find((n) => n.id === update.nodeId); + if (!node) continue; + for (const [inputName, value] of Object.entries(update.inputs)) { + const param = node.inputs.find((p) => p.name === inputName); + if (param) param.value = value; + } + } + } + + // Add nodes + const addNodes = input.addNodes as + | { + type: string; + name?: string; + positionX?: number; + positionY?: number; + inputs?: Record; + }[] + | undefined; + if (addNodes) { + const registry = new CloudflareNodeRegistry(this.env, false); + const allTypes = registry.getNodeTypes(); + for (const spec of addNodes) { + const nodeType = allTypes.find((t) => t.type === spec.type); + if (!nodeType) continue; + const nodeId = uuidv7(); + const newNode = { + id: nodeId, + type: spec.type, + name: spec.name ?? nodeType.name, + position: { + x: spec.positionX ?? 0, + y: spec.positionY ?? 0, + }, + inputs: nodeType.inputs.map((p) => ({ + ...p, + value: + spec.inputs && spec.inputs[p.name] !== undefined + ? spec.inputs[p.name] + : p.value, + })), + outputs: nodeType.outputs.map((p) => ({ ...p })), + }; + nodes.push(newNode); + } + } + + // Remove nodes + const removeNodes = input.removeNodes as string[] | undefined; + if (removeNodes) { + const removeSet = new Set(removeNodes); + const filtered = nodes.filter((n) => !removeSet.has(n.id)); + nodes.length = 0; + nodes.push(...filtered); + edges = edges.filter( + (e) => !removeSet.has(e.source) && !removeSet.has(e.target) + ); + } + + // Add edges + const addEdges = input.addEdges as + | { + source: string; + sourceOutput: string; + target: string; + targetInput: string; + }[] + | undefined; + if (addEdges) { + for (const edge of addEdges) { + edges.push(edge); + } + } + + // Remove edges + const removeEdges = input.removeEdges as + | { source: string; target: string }[] + | undefined; + if (removeEdges) { + for (const re of removeEdges) { + edges = edges.filter( + (e) => !(e.source === re.source && e.target === re.target) + ); + } + } + + await store.save({ + id: workflowId, + name: newName, + description: newDescription ?? undefined, + trigger: newTrigger, + organizationId: orgId, + nodes, + edges, + }); + + return { + workflowId, + name: newName, + nodeCount: nodes.length, + edgeCount: edges.length, + }; + } + + default: + return { error: `Unknown tool: ${name}` }; + } + } +} diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 5b1d8ed9..a21b7944 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -26,6 +26,7 @@ import invitationRoutes from "./routes/invitations"; import llmsRoutes from "./routes/llms"; import oauthRoutes from "./routes/oauth"; import objectRoutes from "./routes/objects"; +import onboardingRoutes from "./routes/onboarding"; import organizationRoutes from "./routes/organizations"; import playgroundRoutes from "./routes/playground"; import profileRoutes from "./routes/profile"; @@ -125,6 +126,7 @@ app.route("/:organizationId/workflows", workflowRoutes); app.route("/:organizationId/objects", objectRoutes); app.route("/:organizationId/playground", playgroundRoutes); app.route("/:organizationId/usage", usageRoutes); +app.route("/:organizationId/onboarding", onboardingRoutes); app.route("/:organizationId/ws", wsRoutes); // Export Durable Objects @@ -133,6 +135,7 @@ export { DatabaseDO }; export { Sandbox } from "@cloudflare/sandbox"; export { FFmpegContainer } from "./containers/ffmpeg-container"; export { AgentRunner } from "./durable-objects/agent-runner"; +export { DafthunkAgent } from "./durable-objects/dafthunk-agent"; export default { email: handleIncomingEmail, diff --git a/apps/api/src/routes/onboarding.ts b/apps/api/src/routes/onboarding.ts new file mode 100644 index 00000000..ce6d9abf --- /dev/null +++ b/apps/api/src/routes/onboarding.ts @@ -0,0 +1,35 @@ +import { Hono } from "hono"; + +import { jwtMiddleware } from "../auth"; +import type { ApiContext } from "../context"; +import { getAgentByName } from "../durable-objects/agent-utils"; + +const onboardingRoutes = new Hono(); + +onboardingRoutes.get("/", jwtMiddleware, async (c) => { + const userId = c.var.jwtPayload?.sub; + + if (!userId) { + return c.json({ error: "Unauthorized" }, 401); + } + + const orgId = c.get("organizationId"); + if (!orgId) { + return c.json({ error: "Organization required" }, 400); + } + + const stub = await getAgentByName(c.env.DAFTHUNK_AGENT, orgId); + + const headers = new Headers(c.req.raw.headers); + headers.set("X-User-Id", userId); + headers.set("X-Organization-Id", orgId); + const newReq = new Request(c.req.url, { + method: c.req.method, + headers, + body: c.req.raw.body, + }); + + return stub.fetch(newReq); +}); + +export default onboardingRoutes; diff --git a/apps/api/src/utils/encryption.test.ts b/apps/api/src/utils/encryption.test.ts index cb477527..d4959295 100644 --- a/apps/api/src/utils/encryption.test.ts +++ b/apps/api/src/utils/encryption.test.ts @@ -39,6 +39,7 @@ const createMockEnv = (masterKey?: string): Bindings => ({ CLOUDFLARE_AI_GATEWAY_ID: "", AI_OPTIONS: {}, AGENT_RUNNER: {} as DurableObjectNamespace, + DAFTHUNK_AGENT: {} as DurableObjectNamespace, }); describe("Encryption Utilities", () => { diff --git a/apps/api/wrangler.jsonc b/apps/api/wrangler.jsonc index f5f40a86..5355d467 100644 --- a/apps/api/wrangler.jsonc +++ b/apps/api/wrangler.jsonc @@ -122,6 +122,11 @@ "class_name": "WorkflowAgent", "script_name": "dafthunk-api" }, + { + "name": "DAFTHUNK_AGENT", + "class_name": "DafthunkAgent", + "script_name": "dafthunk-api" + }, { "name": "FFMPEG_CONTAINER", "class_name": "FFmpegContainer" @@ -171,6 +176,10 @@ { "tag": "v10", "new_sqlite_classes": ["WorkflowAgent"] + }, + { + "tag": "v11", + "new_sqlite_classes": ["DafthunkAgent"] } ], "unsafe": { @@ -296,6 +305,11 @@ "class_name": "WorkflowAgent", "script_name": "dafthunk-api" }, + { + "name": "DAFTHUNK_AGENT", + "class_name": "DafthunkAgent", + "script_name": "dafthunk-api" + }, { "name": "FFMPEG_CONTAINER", "class_name": "FFmpegContainer" @@ -345,6 +359,10 @@ { "tag": "v10", "new_sqlite_classes": ["WorkflowAgent"] + }, + { + "tag": "v11", + "new_sqlite_classes": ["DafthunkAgent"] } ], "unsafe": { diff --git a/apps/app/src/components/onboarding/onboarding-chat-panel.tsx b/apps/app/src/components/onboarding/onboarding-chat-panel.tsx new file mode 100644 index 00000000..dd4d6356 --- /dev/null +++ b/apps/app/src/components/onboarding/onboarding-chat-panel.tsx @@ -0,0 +1,211 @@ +import type { + OnboardingChatMessage, + OnboardingConversation, +} from "@dafthunk/types"; +import ArrowUp from "lucide-react/icons/arrow-up"; +import Loader from "lucide-react/icons/loader"; +import PenSquare from "lucide-react/icons/pen-square"; +import Trash2 from "lucide-react/icons/trash-2"; +import { useEffect, useRef, useState } from "react"; +import Markdown from "react-markdown"; + +import { useAuth } from "@/components/auth-context"; +import { Button } from "@/components/ui/button"; +import { useOnboardingChat } from "@/hooks/use-onboarding-chat"; +import { cn } from "@/utils/utils"; + +export function OnboardingChatPage() { + const { organization } = useAuth(); + const orgId = organization?.id ?? ""; + const { + messages, + conversations, + activeConversationId, + isStreaming, + isConnected, + currentStreamContent, + toolProgress, + sendMessage, + newConversation, + switchConversation, + deleteConversation, + } = useOnboardingChat(orgId, true); + + const [input, setInput] = useState(""); + const scrollRef = useRef(null); + const textareaRef = useRef(null); + + useEffect(() => { + if (scrollRef.current) { + scrollRef.current.scrollTop = scrollRef.current.scrollHeight; + } + }, [messages, currentStreamContent, toolProgress]); + + useEffect(() => { + const ta = textareaRef.current; + if (!ta) return; + ta.style.height = "auto"; + ta.style.height = `${Math.min(ta.scrollHeight, 200)}px`; + }, [input]); + + const handleSubmit = () => { + const trimmed = input.trim(); + if (!trimmed || isStreaming) return; + sendMessage(trimmed); + setInput(""); + }; + + const handleKeyDown = (e: React.KeyboardEvent) => { + if (e.key === "Enter" && !e.shiftKey) { + e.preventDefault(); + handleSubmit(); + } + }; + + return ( +
+ {/* Sidebar */} +
+
+ +
+
+ {conversations.map((conv) => ( + switchConversation(conv.id)} + onDelete={() => deleteConversation(conv.id)} + /> + ))} +
+
+ + {/* Chat area */} +
+
+
+ {messages.length === 0 && !isStreaming && ( +
+

+ What would you like to automate? +

+

+ I'll help you set up your first workflow. +

+
+ )} + + {messages.map((msg, i) => ( + + ))} + + {toolProgress && ( +
+ + {toolProgress} +
+ )} + + {isStreaming && currentStreamContent && ( +
+ {currentStreamContent} + +
+ )} +
+
+ +
+
+
+