From 44bc12b47402c98c4cc7fa42bce8632ed95cf9e8 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 22 Aug 2025 11:49:56 -0700 Subject: [PATCH 01/21] fix(ocr-azure): added OCR_AZURE_API_KEY envvar (#1102) --- apps/sim/lib/documents/document-processor.ts | 6 +++--- apps/sim/lib/env.ts | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/apps/sim/lib/documents/document-processor.ts b/apps/sim/lib/documents/document-processor.ts index 77272822636..44ae8057f28 100644 --- a/apps/sim/lib/documents/document-processor.ts +++ b/apps/sim/lib/documents/document-processor.ts @@ -116,7 +116,7 @@ async function parseDocument( }> { const isPDF = mimeType === 'application/pdf' const hasAzureMistralOCR = - env.AZURE_OPENAI_API_KEY && env.OCR_AZURE_ENDPOINT && env.OCR_AZURE_MODEL_NAME + env.OCR_AZURE_API_KEY && env.OCR_AZURE_ENDPOINT && env.OCR_AZURE_MODEL_NAME const hasMistralOCR = env.MISTRAL_API_KEY // Check Azure Mistral OCR configuration @@ -288,7 +288,7 @@ async function makeOCRRequest(endpoint: string, headers: Record, async function parseWithAzureMistralOCR(fileUrl: string, filename: string, mimeType: string) { validateOCRConfig( - env.AZURE_OPENAI_API_KEY, + env.OCR_AZURE_API_KEY, env.OCR_AZURE_ENDPOINT, env.OCR_AZURE_MODEL_NAME, 'Azure Mistral OCR' @@ -306,7 +306,7 @@ async function parseWithAzureMistralOCR(fileUrl: string, filename: string, mimeT env.OCR_AZURE_ENDPOINT!, { 'Content-Type': 'application/json', - Authorization: `Bearer ${env.AZURE_OPENAI_API_KEY}`, + Authorization: `Bearer ${env.OCR_AZURE_API_KEY}`, }, { model: env.OCR_AZURE_MODEL_NAME, diff --git a/apps/sim/lib/env.ts b/apps/sim/lib/env.ts index fc249871f9b..7f9fbecc706 100644 --- a/apps/sim/lib/env.ts +++ b/apps/sim/lib/env.ts @@ -74,6 +74,7 @@ export const env = createEnv({ WAND_OPENAI_MODEL_NAME: z.string().optional(), // Wand generation OpenAI model name (works with both regular OpenAI and Azure OpenAI) OCR_AZURE_ENDPOINT: z.string().url().optional(), // Azure Mistral OCR service endpoint OCR_AZURE_MODEL_NAME: z.string().optional(), // Azure Mistral OCR model name for document processing + OCR_AZURE_API_KEY: z.string().min(1).optional(), // Azure Mistral OCR API key // Monitoring & Analytics TELEMETRY_ENDPOINT: z.string().url().optional(), // Custom telemetry/analytics endpoint From e13adab14f2c0e24f1bdbb579ff1edfb409325d0 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 22 Aug 2025 12:01:16 -0700 Subject: [PATCH 02/21] improvement(wand): upgrade wand to use SSE (#1100) * improvement(wand): upgrade wand to use SSE * fix(ocr-azure): added OCR_AZURE_API_KEY envvar (#1102) * make wand identical to chat panel --- apps/sim/app/api/wand-generate/route.ts | 26 ++++++++--- .../w/[workflowId]/hooks/use-wand.ts | 46 ++++++++++--------- 2 files changed, 43 insertions(+), 29 deletions(-) diff --git a/apps/sim/app/api/wand-generate/route.ts b/apps/sim/app/api/wand-generate/route.ts index a7bee44f90e..05755adf58f 100644 --- a/apps/sim/app/api/wand-generate/route.ts +++ b/apps/sim/app/api/wand-generate/route.ts @@ -95,12 +95,19 @@ export async function POST(req: NextRequest) { { stream, historyLength: history.length, + endpoint: useWandAzure ? azureEndpoint : 'api.openai.com', + model: useWandAzure ? wandModelName : 'gpt-4o', + apiVersion: useWandAzure ? azureApiVersion : 'N/A', } ) // For streaming responses if (stream) { try { + logger.debug( + `[${requestId}] Starting streaming request to ${useWandAzure ? 'Azure OpenAI' : 'OpenAI'}` + ) + const streamCompletion = await client.chat.completions.create({ model: useWandAzure ? wandModelName : 'gpt-4o', messages: messages, @@ -109,6 +116,8 @@ export async function POST(req: NextRequest) { stream: true, }) + logger.debug(`[${requestId}] Stream connection established successfully`) + return new Response( new ReadableStream({ async start(controller) { @@ -118,21 +127,23 @@ export async function POST(req: NextRequest) { for await (const chunk of streamCompletion) { const content = chunk.choices[0]?.delta?.content || '' if (content) { - // Use the same format as codegen API for consistency + // Use SSE format identical to chat streaming controller.enqueue( - encoder.encode(`${JSON.stringify({ chunk: content, done: false })}\n`) + encoder.encode(`data: ${JSON.stringify({ chunk: content })}\n\n`) ) } } - // Send completion signal - controller.enqueue(encoder.encode(`${JSON.stringify({ chunk: '', done: true })}\n`)) + // Send completion signal in SSE format + controller.enqueue(encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`)) controller.close() logger.info(`[${requestId}] Wand generation streaming completed`) } catch (streamError: any) { logger.error(`[${requestId}] Streaming error`, { error: streamError.message }) controller.enqueue( - encoder.encode(`${JSON.stringify({ error: 'Streaming failed', done: true })}\n`) + encoder.encode( + `data: ${JSON.stringify({ error: 'Streaming failed', done: true })}\n\n` + ) ) controller.close() } @@ -140,9 +151,10 @@ export async function POST(req: NextRequest) { }), { headers: { - 'Content-Type': 'text/plain', - 'Cache-Control': 'no-cache, no-transform', + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', Connection: 'keep-alive', + 'X-Accel-Buffering': 'no', }, } ) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-wand.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-wand.ts index c3550e8f273..e73ae183a85 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-wand.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-wand.ts @@ -198,35 +198,37 @@ export function useWand({ const { done, value } = await reader.read() if (done) break - // Process incoming chunks - const text = decoder.decode(value) - const lines = text.split('\n').filter((line) => line.trim() !== '') + // Process incoming chunks using SSE format (identical to Chat panel) + const chunk = decoder.decode(value) + const lines = chunk.split('\n\n') for (const line of lines) { - try { - const data = JSON.parse(line) + if (line.startsWith('data: ')) { + try { + const data = JSON.parse(line.substring(6)) - // Check if there's an error - if (data.error) { - throw new Error(data.error) - } + // Check if there's an error + if (data.error) { + throw new Error(data.error) + } - // Process chunk - if (data.chunk && !data.done) { - accumulatedContent += data.chunk - // Stream each chunk to the UI immediately - if (onStreamChunk) { - onStreamChunk(data.chunk) + // Process chunk + if (data.chunk) { + accumulatedContent += data.chunk + // Stream each chunk to the UI immediately + if (onStreamChunk) { + onStreamChunk(data.chunk) + } } - } - // Check if streaming is complete - if (data.done) { - break + // Check if streaming is complete + if (data.done) { + break + } + } catch (parseError) { + // Continue processing other lines + logger.debug('Failed to parse SSE line', { line, parseError }) } - } catch (parseError) { - // Continue processing other lines - logger.debug('Failed to parse streaming line', { line, parseError }) } } } From 6c606750f57d336a9c2bc3ba3f90eb9575d6c9de Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 22 Aug 2025 12:15:59 -0700 Subject: [PATCH 03/21] improvement(signup): modify signup and login pages to not show social sign in when not configured, increase logo size (#1103) --- .../components/social-login-buttons.tsx | 53 ++----------------- apps/sim/app/(auth)/layout.tsx | 8 +-- apps/sim/app/(auth)/login/login-form.tsx | 10 ++-- apps/sim/app/(auth)/signup/signup-form.tsx | 10 ++-- 4 files changed, 21 insertions(+), 60 deletions(-) diff --git a/apps/sim/app/(auth)/components/social-login-buttons.tsx b/apps/sim/app/(auth)/components/social-login-buttons.tsx index 153689912f2..45d3de78c82 100644 --- a/apps/sim/app/(auth)/components/social-login-buttons.tsx +++ b/apps/sim/app/(auth)/components/social-login-buttons.tsx @@ -3,7 +3,6 @@ import { useEffect, useState } from 'react' import { GithubIcon, GoogleIcon } from '@/components/icons' import { Button } from '@/components/ui/button' -import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from '@/components/ui/tooltip' import { client } from '@/lib/auth-client' interface SocialLoginButtonsProps { @@ -114,58 +113,16 @@ export function SocialLoginButtons({ ) - const renderGithubButton = () => { - if (githubAvailable) return githubButton - - return ( - - - -
{githubButton}
-
- -

- GitHub login requires OAuth credentials to be configured. Add the following - environment variables: -

-
    -
  • • GITHUB_CLIENT_ID
  • -
  • • GITHUB_CLIENT_SECRET
  • -
-
-
-
- ) - } + const hasAnyOAuthProvider = githubAvailable || googleAvailable - const renderGoogleButton = () => { - if (googleAvailable) return googleButton - - return ( - - - -
{googleButton}
-
- -

- Google login requires OAuth credentials to be configured. Add the following - environment variables: -

-
    -
  • • GOOGLE_CLIENT_ID
  • -
  • • GOOGLE_CLIENT_SECRET
  • -
-
-
-
- ) + if (!hasAnyOAuthProvider) { + return null } return (
- {renderGithubButton()} - {renderGoogleButton()} + {githubAvailable && githubButton} + {googleAvailable && googleButton}
) } diff --git a/apps/sim/app/(auth)/layout.tsx b/apps/sim/app/(auth)/layout.tsx index de08c586654..dabbe70c71b 100644 --- a/apps/sim/app/(auth)/layout.tsx +++ b/apps/sim/app/(auth)/layout.tsx @@ -28,12 +28,12 @@ export default function AuthLayout({ children }: { children: React.ReactNode }) {`${brand.name} ) : ( - {`${brand.name} + {`${brand.name} )} diff --git a/apps/sim/app/(auth)/login/login-form.tsx b/apps/sim/app/(auth)/login/login-form.tsx index 9781da8efb6..f3eda79b5a5 100644 --- a/apps/sim/app/(auth)/login/login-form.tsx +++ b/apps/sim/app/(auth)/login/login-form.tsx @@ -366,11 +366,13 @@ export default function LoginPage({ callbackURL={callbackUrl} /> -
-
-
+ {(githubAvailable || googleAvailable) && ( +
+
+
+
-
+ )}
diff --git a/apps/sim/app/(auth)/signup/signup-form.tsx b/apps/sim/app/(auth)/signup/signup-form.tsx index 9a487cddd1e..18c3acfc80e 100644 --- a/apps/sim/app/(auth)/signup/signup-form.tsx +++ b/apps/sim/app/(auth)/signup/signup-form.tsx @@ -381,11 +381,13 @@ function SignupFormContent({ isProduction={isProduction} /> -
-
-
+ {(githubAvailable || googleAvailable) && ( +
+
+
+
-
+ )}
From a268fb7c04ff3f7b614ddf7390000b4b75ed8901 Mon Sep 17 00:00:00 2001 From: Emir Karabeg <78010029+emir-karabeg@users.noreply.github.com> Date: Fri, 22 Aug 2025 12:23:11 -0700 Subject: [PATCH 04/21] fix(chat-deploy): dark mode ui (#1101) --- apps/sim/app/chat/[subdomain]/chat-client.css | 167 ++++++++++++++++++ apps/sim/app/chat/[subdomain]/chat-client.tsx | 2 +- .../[subdomain]/components/header/header.tsx | 45 +---- .../loading-state/loading-state.tsx | 6 +- apps/sim/app/chat/[subdomain]/layout.tsx | 19 ++ 5 files changed, 193 insertions(+), 46 deletions(-) create mode 100644 apps/sim/app/chat/[subdomain]/chat-client.css create mode 100644 apps/sim/app/chat/[subdomain]/layout.tsx diff --git a/apps/sim/app/chat/[subdomain]/chat-client.css b/apps/sim/app/chat/[subdomain]/chat-client.css new file mode 100644 index 00000000000..cd4969462aa --- /dev/null +++ b/apps/sim/app/chat/[subdomain]/chat-client.css @@ -0,0 +1,167 @@ +/* Force light mode for chat subdomain by overriding dark mode utilities */ +/* This file uses CSS variables from globals.css light mode theme */ + +/* When inside the chat layout, force all light mode CSS variables */ +.chat-light-wrapper { + /* Core Colors - from globals.css light mode */ + --background: 0 0% 100%; + --foreground: 0 0% 3.9%; + + /* Card Colors */ + --card: 0 0% 99.2%; + --card-foreground: 0 0% 3.9%; + + /* Popover Colors */ + --popover: 0 0% 100%; + --popover-foreground: 0 0% 3.9%; + + /* Primary Colors */ + --primary: 0 0% 11.2%; + --primary-foreground: 0 0% 98%; + + /* Secondary Colors */ + --secondary: 0 0% 96.1%; + --secondary-foreground: 0 0% 11.2%; + + /* Muted Colors */ + --muted: 0 0% 96.1%; + --muted-foreground: 0 0% 46.9%; + + /* Accent Colors */ + --accent: 0 0% 92.5%; + --accent-foreground: 0 0% 11.2%; + + /* Destructive Colors */ + --destructive: 0 84.2% 60.2%; + --destructive-foreground: 0 0% 98%; + + /* Border & Input Colors */ + --border: 0 0% 89.8%; + --input: 0 0% 89.8%; + --ring: 0 0% 3.9%; + + /* Border Radius */ + --radius: 0.5rem; + + /* Scrollbar Properties */ + --scrollbar-track: 0 0% 85%; + --scrollbar-thumb: 0 0% 65%; + --scrollbar-thumb-hover: 0 0% 55%; + --scrollbar-size: 8px; + + /* Workflow Properties */ + --workflow-background: 0 0% 100%; + --workflow-dots: 0 0% 94.5%; + --card-background: 0 0% 99.2%; + --card-border: 0 0% 89.8%; + --card-text: 0 0% 3.9%; + --card-hover: 0 0% 96.1%; + + /* Base Component Properties */ + --base-muted-foreground: #737373; + + /* Gradient Colors */ + --gradient-primary: 263 85% 70%; + --gradient-secondary: 336 95% 65%; + + /* Brand Colors */ + --brand-primary-hex: #701ffc; + --brand-primary-hover-hex: #802fff; + --brand-secondary-hex: #6518e6; + --brand-accent-hex: #9d54ff; + --brand-accent-hover-hex: #a66fff; + --brand-background-hex: #0c0c0c; + + /* UI Surface Colors */ + --surface-elevated: #202020; +} + +/* Override dark mode utility classes using CSS variables */ +.chat-light-wrapper :is(.dark\:bg-black) { + background-color: hsl(var(--secondary)); +} + +.chat-light-wrapper :is(.dark\:bg-gray-900) { + background-color: hsl(var(--background)); +} + +.chat-light-wrapper :is(.dark\:bg-gray-800) { + background-color: hsl(var(--secondary)); +} + +.chat-light-wrapper :is(.dark\:bg-gray-700) { + background-color: hsl(var(--accent)); +} + +.chat-light-wrapper :is(.dark\:bg-gray-600) { + background-color: hsl(var(--muted)); +} + +.chat-light-wrapper :is(.dark\:bg-gray-300) { + background-color: hsl(var(--primary)); +} + +/* Text color overrides using CSS variables */ +.chat-light-wrapper :is(.dark\:text-gray-100) { + color: hsl(var(--primary)); +} + +.chat-light-wrapper :is(.dark\:text-gray-200) { + color: hsl(var(--foreground)); +} + +.chat-light-wrapper :is(.dark\:text-gray-300) { + color: hsl(var(--muted-foreground)); +} + +.chat-light-wrapper :is(.dark\:text-gray-400) { + color: hsl(var(--muted-foreground)); +} + +.chat-light-wrapper :is(.dark\:text-neutral-600) { + color: hsl(var(--muted-foreground)); +} + +.chat-light-wrapper :is(.dark\:text-blue-400) { + color: var(--brand-accent-hex); +} + +/* Border color overrides using CSS variables */ +.chat-light-wrapper :is(.dark\:border-gray-700) { + border-color: hsl(var(--border)); +} + +.chat-light-wrapper :is(.dark\:border-gray-800) { + border-color: hsl(var(--border)); +} + +.chat-light-wrapper :is(.dark\:border-gray-600) { + border-color: hsl(var(--border)); +} + +.chat-light-wrapper :is(.dark\:divide-gray-700) > * + * { + border-color: hsl(var(--border)); +} + +/* Hover state overrides */ +.chat-light-wrapper :is(.dark\:hover\:bg-gray-800\/60:hover) { + background-color: hsl(var(--card-hover)); +} + +/* Code blocks specific overrides using CSS variables */ +.chat-light-wrapper pre:is(.dark\:bg-black) { + background-color: hsl(var(--workflow-dots)); +} + +.chat-light-wrapper code:is(.dark\:bg-gray-700) { + background-color: hsl(var(--accent)); +} + +.chat-light-wrapper code:is(.dark\:text-gray-200) { + color: hsl(var(--foreground)); +} + +/* Force color scheme */ +.chat-light-wrapper { + color-scheme: light !important; +} diff --git a/apps/sim/app/chat/[subdomain]/chat-client.tsx b/apps/sim/app/chat/[subdomain]/chat-client.tsx index b79b5f9c648..482f9e30983 100644 --- a/apps/sim/app/chat/[subdomain]/chat-client.tsx +++ b/apps/sim/app/chat/[subdomain]/chat-client.tsx @@ -481,7 +481,7 @@ export default function ChatClient({ subdomain }: { subdomain: string }) { // Standard text-based chat interface return ( -
+
{/* Header component */} diff --git a/apps/sim/app/chat/[subdomain]/components/header/header.tsx b/apps/sim/app/chat/[subdomain]/components/header/header.tsx index c3a2269279b..d99f57f9ee2 100644 --- a/apps/sim/app/chat/[subdomain]/components/header/header.tsx +++ b/apps/sim/app/chat/[subdomain]/components/header/header.tsx @@ -22,53 +22,14 @@ export function ChatHeader({ chatConfig, starCount }: ChatHeaderProps) { return (
- {customImage ? ( + {customImage && ( {`${chatConfig?.title - ) : ( - // Default Sim Studio logo when no custom image is provided -
- - - - - - -
)} -

+

{chatConfig?.customizations?.headerText || chatConfig?.title || 'Chat'}

diff --git a/apps/sim/app/chat/[subdomain]/components/loading-state/loading-state.tsx b/apps/sim/app/chat/[subdomain]/components/loading-state/loading-state.tsx index 8e7540a609f..7d99d848144 100644 --- a/apps/sim/app/chat/[subdomain]/components/loading-state/loading-state.tsx +++ b/apps/sim/app/chat/[subdomain]/components/loading-state/loading-state.tsx @@ -2,10 +2,10 @@ export function ChatLoadingState() { return ( -
+
-
-
+
+
) diff --git a/apps/sim/app/chat/[subdomain]/layout.tsx b/apps/sim/app/chat/[subdomain]/layout.tsx new file mode 100644 index 00000000000..d16a72e8528 --- /dev/null +++ b/apps/sim/app/chat/[subdomain]/layout.tsx @@ -0,0 +1,19 @@ +'use client' + +import { ThemeProvider } from 'next-themes' +import './chat-client.css' + +export default function ChatLayout({ children }: { children: React.ReactNode }) { + return ( + +
+ {children} +
+
+ ) +} From 60c46686828ee0b1025b4798afc402821b449e80 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 22 Aug 2025 13:20:45 -0700 Subject: [PATCH 05/21] fix(naming): prevent identical normalized block names (#1105) --- .../stores/workflows/workflow/store.test.ts | 129 ++++++++++++++++++ apps/sim/stores/workflows/workflow/store.ts | 30 +++- apps/sim/stores/workflows/workflow/types.ts | 2 +- 3 files changed, 159 insertions(+), 2 deletions(-) diff --git a/apps/sim/stores/workflows/workflow/store.test.ts b/apps/sim/stores/workflows/workflow/store.test.ts index f68011d01d4..e942c87e7f1 100644 --- a/apps/sim/stores/workflows/workflow/store.test.ts +++ b/apps/sim/stores/workflows/workflow/store.test.ts @@ -603,4 +603,133 @@ describe('workflow store', () => { expect(childBlock.data?.extent).toBe('parent') }) }) + + describe('updateBlockName', () => { + beforeEach(() => { + useWorkflowStore.setState({ + blocks: {}, + edges: [], + loops: {}, + parallels: {}, + }) + + const { addBlock } = useWorkflowStore.getState() + + addBlock('block1', 'agent', 'Column AD', { x: 0, y: 0 }) + addBlock('block2', 'function', 'Employee Length', { x: 100, y: 0 }) + addBlock('block3', 'trigger', 'Start', { x: 200, y: 0 }) + }) + + it('should have test blocks set up correctly', () => { + const state = useWorkflowStore.getState() + + expect(state.blocks.block1).toBeDefined() + expect(state.blocks.block1.name).toBe('Column AD') + expect(state.blocks.block2).toBeDefined() + expect(state.blocks.block2.name).toBe('Employee Length') + expect(state.blocks.block3).toBeDefined() + expect(state.blocks.block3.name).toBe('Start') + }) + + it('should successfully rename a block when no conflicts exist', () => { + const { updateBlockName } = useWorkflowStore.getState() + + const result = updateBlockName('block1', 'Data Processor') + + expect(result).toBe(true) + + const state = useWorkflowStore.getState() + expect(state.blocks.block1.name).toBe('Data Processor') + }) + + it('should allow renaming a block to a different case/spacing of its current name', () => { + const { updateBlockName } = useWorkflowStore.getState() + + const result = updateBlockName('block1', 'column ad') + + expect(result).toBe(true) + + const state = useWorkflowStore.getState() + expect(state.blocks.block1.name).toBe('column ad') + }) + + it('should prevent renaming when another block has the same normalized name', () => { + const { updateBlockName } = useWorkflowStore.getState() + + const result = updateBlockName('block2', 'Column AD') + + expect(result).toBe(false) + + const state = useWorkflowStore.getState() + expect(state.blocks.block2.name).toBe('Employee Length') + }) + + it('should prevent renaming when another block has a name that normalizes to the same value', () => { + const { updateBlockName } = useWorkflowStore.getState() + + const result = updateBlockName('block2', 'columnad') + + expect(result).toBe(false) + + const state = useWorkflowStore.getState() + expect(state.blocks.block2.name).toBe('Employee Length') + }) + + it('should prevent renaming when another block has a similar name with different spacing', () => { + const { updateBlockName } = useWorkflowStore.getState() + + const result = updateBlockName('block3', 'employee length') + + expect(result).toBe(false) + + const state = useWorkflowStore.getState() + expect(state.blocks.block3.name).toBe('Start') + }) + + it('should handle edge cases with empty or whitespace-only names', () => { + const { updateBlockName } = useWorkflowStore.getState() + + const result1 = updateBlockName('block1', '') + expect(result1).toBe(true) + + const result2 = updateBlockName('block2', ' ') + expect(result2).toBe(true) + + const state = useWorkflowStore.getState() + expect(state.blocks.block1.name).toBe('') + expect(state.blocks.block2.name).toBe(' ') + }) + + it('should return false when trying to rename a non-existent block', () => { + const { updateBlockName } = useWorkflowStore.getState() + + const result = updateBlockName('nonexistent', 'New Name') + + expect(result).toBe(false) + }) + + it('should handle complex normalization cases correctly', () => { + const { updateBlockName } = useWorkflowStore.getState() + + const conflictingNames = [ + 'column ad', + 'COLUMN AD', + 'Column AD', + 'columnad', + 'ColumnAD', + 'COLUMNAD', + ] + + for (const name of conflictingNames) { + const result = updateBlockName('block2', name) + expect(result).toBe(false) + } + + const result = updateBlockName('block2', 'Unique Name') + expect(result).toBe(true) + + const state = useWorkflowStore.getState() + expect(state.blocks.block2.name).toBe('Unique Name') + }) + }) }) diff --git a/apps/sim/stores/workflows/workflow/store.ts b/apps/sim/stores/workflows/workflow/store.ts index 8ce5796d304..b388273fb8e 100644 --- a/apps/sim/stores/workflows/workflow/store.ts +++ b/apps/sim/stores/workflows/workflow/store.ts @@ -601,7 +601,33 @@ export const useWorkflowStore = create()( updateBlockName: (id: string, name: string) => { const oldBlock = get().blocks[id] - if (!oldBlock) return + if (!oldBlock) return false + + // Helper function to normalize block names (same as resolver) + const normalizeBlockName = (blockName: string): string => { + return blockName.toLowerCase().replace(/\s+/g, '') + } + + // Check for normalized name collisions + const normalizedNewName = normalizeBlockName(name) + const currentBlocks = get().blocks + + // Find any other block with the same normalized name + const conflictingBlock = Object.entries(currentBlocks).find(([blockId, block]) => { + return ( + blockId !== id && // Different block + block.name && // Has a name + normalizeBlockName(block.name) === normalizedNewName // Same normalized name + ) + }) + + if (conflictingBlock) { + // Don't allow the rename - another block already uses this normalized name + logger.error( + `Cannot rename block to "${name}" - another block "${conflictingBlock[1].name}" already uses the normalized name "${normalizedNewName}"` + ) + return false + } // Create a new state with the updated block name const newState = { @@ -696,6 +722,8 @@ export const useWorkflowStore = create()( pushHistory(set, get, newState, `${name} block name updated`) get().updateLastSaved() // Note: Socket.IO handles real-time sync automatically + + return true }, toggleBlockWide: (id: string) => { diff --git a/apps/sim/stores/workflows/workflow/types.ts b/apps/sim/stores/workflows/workflow/types.ts index 42d12ba0db5..553319a0e49 100644 --- a/apps/sim/stores/workflows/workflow/types.ts +++ b/apps/sim/stores/workflows/workflow/types.ts @@ -183,7 +183,7 @@ export interface WorkflowActions { toggleBlockEnabled: (id: string) => void duplicateBlock: (id: string) => void toggleBlockHandles: (id: string) => void - updateBlockName: (id: string, name: string) => void + updateBlockName: (id: string, name: string) => boolean toggleBlockWide: (id: string) => void setBlockWide: (id: string, isWide: boolean) => void setBlockAdvancedMode: (id: string, advancedMode: boolean) => void From 1ee4263e6098a0f6cdb940b8a519746bd9009c7a Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 22 Aug 2025 14:29:44 -0700 Subject: [PATCH 06/21] feat(helm): added CRON jobs to helm charts (#1107) --- helm/sim/README.md | 76 +++++++++++++++++++++++++++ helm/sim/templates/cronjobs.yaml | 90 ++++++++++++++++++++++++++++++++ helm/sim/values.yaml | 62 ++++++++++++++++++++++ 3 files changed, 228 insertions(+) create mode 100644 helm/sim/templates/cronjobs.yaml diff --git a/helm/sim/README.md b/helm/sim/README.md index fe75dd23421..c639f95adc4 100644 --- a/helm/sim/README.md +++ b/helm/sim/README.md @@ -314,6 +314,42 @@ The following table lists the configurable parameters and their default values. | `migrations.podSecurityContext` | Migrations pod security context | `fsGroup: 1001` | | `migrations.securityContext` | Migrations container security context | `runAsNonRoot: true, runAsUser: 1001` | +### CronJob Parameters + +| Parameter | Description | Default | +|-----------|-------------|---------| +| `cronjobs.enabled` | Enable all scheduled cron jobs | `true` | +| `cronjobs.image.repository` | CronJob image repository for HTTP requests | `curlimages/curl` | +| `cronjobs.image.tag` | CronJob image tag | `8.5.0` | +| `cronjobs.image.pullPolicy` | CronJob image pull policy | `IfNotPresent` | +| `cronjobs.resources` | CronJob resource limits and requests | See values.yaml | +| `cronjobs.restartPolicy` | CronJob pod restart policy | `OnFailure` | +| `cronjobs.activeDeadlineSeconds` | CronJob active deadline in seconds | `300` | +| `cronjobs.startingDeadlineSeconds` | CronJob starting deadline in seconds | `60` | +| `cronjobs.podSecurityContext` | CronJob pod security context | `fsGroup: 1001` | +| `cronjobs.securityContext` | CronJob container security context | `runAsNonRoot: true, runAsUser: 1001` | +| `cronjobs.jobs.scheduleExecution.enabled` | Enable schedule execution cron job | `true` | +| `cronjobs.jobs.scheduleExecution.name` | Schedule execution job name | `schedule-execution` | +| `cronjobs.jobs.scheduleExecution.schedule` | Schedule execution cron schedule | `"*/1 * * * *"` | +| `cronjobs.jobs.scheduleExecution.path` | Schedule execution API path | `"/api/schedules/execute"` | +| `cronjobs.jobs.scheduleExecution.concurrencyPolicy` | Schedule execution concurrency policy | `Forbid` | +| `cronjobs.jobs.scheduleExecution.successfulJobsHistoryLimit` | Schedule execution successful jobs history | `3` | +| `cronjobs.jobs.scheduleExecution.failedJobsHistoryLimit` | Schedule execution failed jobs history | `1` | +| `cronjobs.jobs.gmailWebhookPoll.enabled` | Enable Gmail webhook polling cron job | `true` | +| `cronjobs.jobs.gmailWebhookPoll.name` | Gmail webhook polling job name | `gmail-webhook-poll` | +| `cronjobs.jobs.gmailWebhookPoll.schedule` | Gmail webhook polling cron schedule | `"*/1 * * * *"` | +| `cronjobs.jobs.gmailWebhookPoll.path` | Gmail webhook polling API path | `"/api/webhooks/poll/gmail"` | +| `cronjobs.jobs.gmailWebhookPoll.concurrencyPolicy` | Gmail webhook polling concurrency policy | `Forbid` | +| `cronjobs.jobs.gmailWebhookPoll.successfulJobsHistoryLimit` | Gmail webhook polling successful jobs history | `3` | +| `cronjobs.jobs.gmailWebhookPoll.failedJobsHistoryLimit` | Gmail webhook polling failed jobs history | `1` | +| `cronjobs.jobs.outlookWebhookPoll.enabled` | Enable Outlook webhook polling cron job | `true` | +| `cronjobs.jobs.outlookWebhookPoll.name` | Outlook webhook polling job name | `outlook-webhook-poll` | +| `cronjobs.jobs.outlookWebhookPoll.schedule` | Outlook webhook polling cron schedule | `"*/1 * * * *"` | +| `cronjobs.jobs.outlookWebhookPoll.path` | Outlook webhook polling API path | `"/api/webhooks/poll/outlook"` | +| `cronjobs.jobs.outlookWebhookPoll.concurrencyPolicy` | Outlook webhook polling concurrency policy | `Forbid` | +| `cronjobs.jobs.outlookWebhookPoll.successfulJobsHistoryLimit` | Outlook webhook polling successful jobs history | `3` | +| `cronjobs.jobs.outlookWebhookPoll.failedJobsHistoryLimit` | Outlook webhook polling failed jobs history | `1` | + ### Shared Storage Parameters | Parameter | Description | Default | @@ -509,6 +545,46 @@ This creates network policies that: - Permit DNS resolution and HTTPS egress - Support custom ingress/egress rules +### CronJobs for Scheduled Tasks + +Enable automated scheduled tasks functionality: + +```yaml +cronjobs: + enabled: true + + # Customize individual jobs + jobs: + scheduleExecution: + enabled: true + schedule: "*/1 * * * *" # Every minute + + gmailWebhookPoll: + enabled: true + schedule: "*/1 * * * *" # Every minute + + outlookWebhookPoll: + enabled: true + schedule: "*/1 * * * *" # Every minute + + + # Global job configuration + resources: + limits: + memory: "256Mi" + cpu: "200m" + requests: + memory: "128Mi" + cpu: "100m" +``` + +This creates Kubernetes CronJob resources that: +- Execute HTTP requests to your application's API endpoints +- Handle retries and error logging automatically +- Use minimal resources with curl-based containers +- Support individual enable/disable per job +- Follow Kubernetes security best practices + ### High Availability Configure pod disruption budgets and anti-affinity: diff --git a/helm/sim/templates/cronjobs.yaml b/helm/sim/templates/cronjobs.yaml new file mode 100644 index 00000000000..95a54f74fe0 --- /dev/null +++ b/helm/sim/templates/cronjobs.yaml @@ -0,0 +1,90 @@ +{{- if .Values.cronjobs.enabled }} +{{- range $jobKey, $jobConfig := .Values.cronjobs.jobs }} +{{- if $jobConfig.enabled }} +--- +apiVersion: batch/v1 +kind: CronJob +metadata: + name: {{ include "sim.fullname" $ }}-{{ $jobConfig.name }} + labels: + {{- include "sim.labels" $ | nindent 4 }} + app.kubernetes.io/component: cronjob-{{ $jobConfig.name }} +spec: + schedule: {{ $jobConfig.schedule | quote }} + concurrencyPolicy: {{ $jobConfig.concurrencyPolicy | default "Forbid" }} + successfulJobsHistoryLimit: {{ $jobConfig.successfulJobsHistoryLimit | default 3 }} + failedJobsHistoryLimit: {{ $jobConfig.failedJobsHistoryLimit | default 1 }} + {{- with $.Values.cronjobs.startingDeadlineSeconds }} + startingDeadlineSeconds: {{ . }} + {{- end }} + jobTemplate: + spec: + {{- with $.Values.cronjobs.activeDeadlineSeconds }} + activeDeadlineSeconds: {{ . }} + {{- end }} + template: + metadata: + labels: + {{- include "sim.selectorLabels" $ | nindent 12 }} + app.kubernetes.io/component: cronjob-{{ $jobConfig.name }} + spec: + restartPolicy: {{ $.Values.cronjobs.restartPolicy | default "OnFailure" }} + {{- with $.Values.cronjobs.podSecurityContext }} + securityContext: + {{- toYaml . | nindent 12 }} + {{- end }} + containers: + - name: {{ $jobConfig.name }} + image: "{{ $.Values.cronjobs.image.repository }}:{{ $.Values.cronjobs.image.tag }}" + imagePullPolicy: {{ $.Values.cronjobs.image.pullPolicy }} + {{- with $.Values.cronjobs.securityContext }} + securityContext: + {{- toYaml . | nindent 14 }} + {{- end }} + command: + - /bin/sh + - -c + args: + - | + echo "Starting cron job: {{ $jobConfig.name }}" + echo "Making HTTP request to {{ $jobConfig.path }}" + + # Determine the service URL (use internal service regardless of ingress) + SERVICE_URL="http://{{ include "sim.fullname" $ }}-app:{{ $.Values.app.service.port }}" + + # Make the HTTP request with timeout and retry logic + for i in $(seq 1 3); do + echo "Attempt $i/3" + if curl -f -s -S --max-time 60 --retry 2 --retry-delay 5 \ + -H "Content-Type: application/json" \ + -H "User-Agent: Kubernetes-CronJob/{{ $jobConfig.name }}" \ + "$SERVICE_URL{{ $jobConfig.path }}"; then + echo "Success: HTTP request completed" + exit 0 + fi + echo "Attempt $i failed, retrying..." + sleep 10 + done + echo "Error: All attempts failed" + exit 1 + resources: + {{- toYaml $.Values.cronjobs.resources | nindent 14 }} + {{- with $.Values.global.imagePullSecrets }} + imagePullSecrets: + {{- toYaml . | nindent 12 }} + {{- end }} + {{- with $.Values.app.nodeSelector }} + nodeSelector: + {{- toYaml . | nindent 12 }} + {{- end }} + {{- with $.Values.affinity }} + affinity: + {{- toYaml . | nindent 12 }} + {{- end }} + {{- with $.Values.tolerations }} + tolerations: + {{- toYaml . | nindent 12 }} + {{- end }} +{{- end }} +{{- end }} +{{- end }} \ No newline at end of file diff --git a/helm/sim/values.yaml b/helm/sim/values.yaml index 680270a9163..90b6bd025ca 100644 --- a/helm/sim/values.yaml +++ b/helm/sim/values.yaml @@ -582,6 +582,68 @@ affinity: {} # Tolerations for scheduling on tainted nodes tolerations: [] +# CronJob configuration for scheduled tasks +cronjobs: + # Enable/disable all cron jobs + enabled: true + + # Individual job configurations + jobs: + scheduleExecution: + enabled: true + name: schedule-execution + schedule: "*/1 * * * *" + path: "/api/schedules/execute" + concurrencyPolicy: Forbid + successfulJobsHistoryLimit: 3 + failedJobsHistoryLimit: 1 + + gmailWebhookPoll: + enabled: true + name: gmail-webhook-poll + schedule: "*/1 * * * *" + path: "/api/webhooks/poll/gmail" + concurrencyPolicy: Forbid + successfulJobsHistoryLimit: 3 + failedJobsHistoryLimit: 1 + + outlookWebhookPoll: + enabled: true + name: outlook-webhook-poll + schedule: "*/1 * * * *" + path: "/api/webhooks/poll/outlook" + concurrencyPolicy: Forbid + successfulJobsHistoryLimit: 3 + failedJobsHistoryLimit: 1 + + + # Global CronJob settings + image: + repository: curlimages/curl + tag: 8.5.0 + pullPolicy: IfNotPresent + + resources: + limits: + memory: "128Mi" + cpu: "100m" + requests: + memory: "64Mi" + cpu: "50m" + + restartPolicy: OnFailure + activeDeadlineSeconds: 300 + startingDeadlineSeconds: 60 + + # Pod security context + podSecurityContext: + fsGroup: 1001 + + # Container security context + securityContext: + runAsNonRoot: true + runAsUser: 1001 + # Observability and telemetry configuration telemetry: # Enable/disable telemetry collection From be810013c770112522ed1fde568f6260412334f9 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Fri, 22 Aug 2025 14:43:21 -0700 Subject: [PATCH 07/21] feat(native-bg-tasks): support webhooks and async workflow executions without trigger.dev (#1106) * feat(native-bg-tasks): support webhooks and async workflow executions without trigger" * fix tests * fix env var defaults and revert async workflow execution to always use trigger * fix UI for hiding async * hide entire toggle --- apps/sim/app/api/__test-utils__/utils.ts | 12 + .../api/webhooks/trigger/[path]/route.test.ts | 36 +- .../app/api/webhooks/trigger/[path]/route.ts | 29 +- .../app/api/workflows/[id]/execute/route.ts | 2 +- .../example-command/example-command.tsx | 119 ++-- apps/sim/background/webhook-execution.ts | 609 +++++++++--------- apps/sim/background/workflow-execution.ts | 376 +++++------ apps/sim/lib/env.ts | 5 + 8 files changed, 617 insertions(+), 571 deletions(-) diff --git a/apps/sim/app/api/__test-utils__/utils.ts b/apps/sim/app/api/__test-utils__/utils.ts index 9f5cdf21c50..692212a0f0d 100644 --- a/apps/sim/app/api/__test-utils__/utils.ts +++ b/apps/sim/app/api/__test-utils__/utils.ts @@ -354,6 +354,18 @@ export function mockExecutionDependencies() { })) } +/** + * Mock Trigger.dev SDK (tasks.trigger and task factory) for tests that import background modules + */ +export function mockTriggerDevSdk() { + vi.mock('@trigger.dev/sdk', () => ({ + tasks: { + trigger: vi.fn().mockResolvedValue({ id: 'mock-task-id' }), + }, + task: vi.fn().mockReturnValue({}), + })) +} + export function mockWorkflowAccessValidation(shouldSucceed = true) { if (shouldSucceed) { vi.mock('@/app/api/workflows/middleware', () => ({ diff --git a/apps/sim/app/api/webhooks/trigger/[path]/route.test.ts b/apps/sim/app/api/webhooks/trigger/[path]/route.test.ts index d7a48dda192..e0c0eb1f3f2 100644 --- a/apps/sim/app/api/webhooks/trigger/[path]/route.test.ts +++ b/apps/sim/app/api/webhooks/trigger/[path]/route.test.ts @@ -5,7 +5,22 @@ import { NextRequest } from 'next/server' * @vitest-environment node */ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' -import { createMockRequest, mockExecutionDependencies } from '@/app/api/__test-utils__/utils' +import { + createMockRequest, + mockExecutionDependencies, + mockTriggerDevSdk, +} from '@/app/api/__test-utils__/utils' + +// Prefer mocking the background module to avoid loading Trigger.dev at all during tests +vi.mock('@/background/webhook-execution', () => ({ + executeWebhookJob: vi.fn().mockResolvedValue({ + success: true, + workflowId: 'test-workflow-id', + executionId: 'test-exec-id', + output: {}, + executedAt: new Date().toISOString(), + }), +})) const hasProcessedMessageMock = vi.fn().mockResolvedValue(false) const markMessageAsProcessedMock = vi.fn().mockResolvedValue(true) @@ -111,6 +126,7 @@ describe('Webhook Trigger API Route', () => { vi.resetAllMocks() mockExecutionDependencies() + mockTriggerDevSdk() vi.doMock('@/services/queue', () => ({ RateLimiter: vi.fn().mockImplementation(() => ({ @@ -309,11 +325,7 @@ describe('Webhook Trigger API Route', () => { const req = createMockRequest('POST', { event: 'test', id: 'test-123' }) const params = Promise.resolve({ path: 'test-path' }) - vi.doMock('@trigger.dev/sdk', () => ({ - tasks: { - trigger: vi.fn().mockResolvedValue({ id: 'mock-task-id' }), - }, - })) + mockTriggerDevSdk() const { POST } = await import('@/app/api/webhooks/trigger/[path]/route') const response = await POST(req, { params }) @@ -339,11 +351,7 @@ describe('Webhook Trigger API Route', () => { const req = createMockRequest('POST', { event: 'bearer.test' }, headers) const params = Promise.resolve({ path: 'test-path' }) - vi.doMock('@trigger.dev/sdk', () => ({ - tasks: { - trigger: vi.fn().mockResolvedValue({ id: 'mock-task-id' }), - }, - })) + mockTriggerDevSdk() const { POST } = await import('@/app/api/webhooks/trigger/[path]/route') const response = await POST(req, { params }) @@ -369,11 +377,7 @@ describe('Webhook Trigger API Route', () => { const req = createMockRequest('POST', { event: 'custom.header.test' }, headers) const params = Promise.resolve({ path: 'test-path' }) - vi.doMock('@trigger.dev/sdk', () => ({ - tasks: { - trigger: vi.fn().mockResolvedValue({ id: 'mock-task-id' }), - }, - })) + mockTriggerDevSdk() const { POST } = await import('@/app/api/webhooks/trigger/[path]/route') const response = await POST(req, { params }) diff --git a/apps/sim/app/api/webhooks/trigger/[path]/route.ts b/apps/sim/app/api/webhooks/trigger/[path]/route.ts index 21e14211fb2..1a01264fdc1 100644 --- a/apps/sim/app/api/webhooks/trigger/[path]/route.ts +++ b/apps/sim/app/api/webhooks/trigger/[path]/route.ts @@ -2,12 +2,14 @@ import { tasks } from '@trigger.dev/sdk' import { and, eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { checkServerSideUsageLimits } from '@/lib/billing' +import { env, isTruthy } from '@/lib/env' import { createLogger } from '@/lib/logs/console/logger' import { handleSlackChallenge, handleWhatsAppVerification, validateMicrosoftTeamsSignature, } from '@/lib/webhooks/utils' +import { executeWebhookJob } from '@/background/webhook-execution' import { db } from '@/db' import { subscription, webhook, workflow } from '@/db/schema' import { RateLimiter } from '@/services/queue' @@ -17,6 +19,7 @@ const logger = createLogger('WebhookTriggerAPI') export const dynamic = 'force-dynamic' export const maxDuration = 300 +export const runtime = 'nodejs' /** * Webhook Verification Handler (GET) @@ -330,10 +333,9 @@ export async function POST( // Continue processing - better to risk usage limit bypass than fail webhook } - // --- PHASE 5: Queue webhook execution via trigger.dev --- + // --- PHASE 5: Queue webhook execution (trigger.dev or direct based on env) --- try { - // Queue the webhook execution task - const handle = await tasks.trigger('webhook-execution', { + const payload = { webhookId: foundWebhook.id, workflowId: foundWorkflow.id, userId: foundWorkflow.userId, @@ -342,11 +344,24 @@ export async function POST( headers: Object.fromEntries(request.headers.entries()), path, blockId: foundWebhook.blockId, - }) + } - logger.info( - `[${requestId}] Queued webhook execution task ${handle.id} for ${foundWebhook.provider} webhook` - ) + const useTrigger = isTruthy(env.TRIGGER_DEV_ENABLED) + + if (useTrigger) { + const handle = await tasks.trigger('webhook-execution', payload) + logger.info( + `[${requestId}] Queued webhook execution task ${handle.id} for ${foundWebhook.provider} webhook` + ) + } else { + // Fire-and-forget direct execution to avoid blocking webhook response + void executeWebhookJob(payload).catch((error) => { + logger.error(`[${requestId}] Direct webhook execution failed`, error) + }) + logger.info( + `[${requestId}] Queued direct webhook execution for ${foundWebhook.provider} webhook (Trigger.dev disabled)` + ) + } // Return immediate acknowledgment with provider-specific format if (foundWebhook.provider === 'microsoftteams') { diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index 72147e97022..ad767fd5267 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -540,7 +540,7 @@ export async function POST( ) } - // Rate limit passed - trigger the task + // Rate limit passed - always use Trigger.dev for async executions const handle = await tasks.trigger('workflow-execution', { workflowId, userId: authenticatedUserId, diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/control-bar/components/deploy-modal/components/deployment-info/components/example-command/example-command.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/control-bar/components/deploy-modal/components/deployment-info/components/example-command/example-command.tsx index 27cab54f7af..62d220c9c86 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/control-bar/components/deploy-modal/components/deployment-info/components/example-command/example-command.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/control-bar/components/deploy-modal/components/deployment-info/components/example-command/example-command.tsx @@ -11,6 +11,7 @@ import { DropdownMenuTrigger, } from '@/components/ui/dropdown-menu' import { Label } from '@/components/ui/label' +import { getEnv, isTruthy } from '@/lib/env' interface ExampleCommandProps { command: string @@ -32,6 +33,7 @@ export function ExampleCommand({ }: ExampleCommandProps) { const [mode, setMode] = useState('sync') const [exampleType, setExampleType] = useState('execute') + const isAsyncEnabled = isTruthy(getEnv('NEXT_PUBLIC_TRIGGER_DEV_ENABLED')) // Format the curl command to use a placeholder for the API key const formatCurlCommand = (command: string, apiKey: string) => { @@ -146,62 +148,67 @@ export function ExampleCommand({
{showLabel && } -
- - - - - - - - setExampleType('execute')} - > - Async Execution - - setExampleType('status')}> - Check Job Status - - setExampleType('rate-limits')} - > - Rate Limits & Usage - - - -
+ {isAsyncEnabled && ( +
+ + + + + + + + setExampleType('execute')} + > + Async Execution + + setExampleType('status')} + > + Check Job Status + + setExampleType('rate-limits')} + > + Rate Limits & Usage + + + +
+ )}
diff --git a/apps/sim/background/webhook-execution.ts b/apps/sim/background/webhook-execution.ts index 4a6c50c0d51..df0bd8ea58b 100644 --- a/apps/sim/background/webhook-execution.ts +++ b/apps/sim/background/webhook-execution.ts @@ -17,362 +17,363 @@ import { mergeSubblockState } from '@/stores/workflows/server-utils' const logger = createLogger('TriggerWebhookExecution') -export const webhookExecution = task({ - id: 'webhook-execution', - retry: { - maxAttempts: 1, - }, - run: async (payload: { - webhookId: string - workflowId: string - userId: string - provider: string - body: any - headers: Record - path: string - blockId?: string - }) => { - const executionId = uuidv4() - const requestId = executionId.slice(0, 8) - - logger.info(`[${requestId}] Starting webhook execution via trigger.dev`, { - webhookId: payload.webhookId, - workflowId: payload.workflowId, - provider: payload.provider, - userId: payload.userId, - executionId, - }) +export type WebhookExecutionPayload = { + webhookId: string + workflowId: string + userId: string + provider: string + body: any + headers: Record + path: string + blockId?: string +} + +export async function executeWebhookJob(payload: WebhookExecutionPayload) { + const executionId = uuidv4() + const requestId = executionId.slice(0, 8) + + logger.info(`[${requestId}] Starting webhook execution`, { + webhookId: payload.webhookId, + workflowId: payload.workflowId, + provider: payload.provider, + userId: payload.userId, + executionId, + }) + + // Initialize logging session outside try block so it's available in catch + const loggingSession = new LoggingSession(payload.workflowId, executionId, 'webhook', requestId) + + try { + // Check usage limits first + const usageCheck = await checkServerSideUsageLimits(payload.userId) + if (usageCheck.isExceeded) { + logger.warn( + `[${requestId}] User ${payload.userId} has exceeded usage limits. Skipping webhook execution.`, + { + currentUsage: usageCheck.currentUsage, + limit: usageCheck.limit, + workflowId: payload.workflowId, + } + ) + throw new Error( + usageCheck.message || + 'Usage limit exceeded. Please upgrade your plan to continue using webhooks.' + ) + } - // Initialize logging session outside try block so it's available in catch - const loggingSession = new LoggingSession(payload.workflowId, executionId, 'webhook', requestId) + // Load workflow from normalized tables + const workflowData = await loadWorkflowFromNormalizedTables(payload.workflowId) + if (!workflowData) { + throw new Error(`Workflow not found: ${payload.workflowId}`) + } - try { - // Check usage limits first - const usageCheck = await checkServerSideUsageLimits(payload.userId) - if (usageCheck.isExceeded) { - logger.warn( - `[${requestId}] User ${payload.userId} has exceeded usage limits. Skipping webhook execution.`, - { - currentUsage: usageCheck.currentUsage, - limit: usageCheck.limit, - workflowId: payload.workflowId, + const { blocks, edges, loops, parallels } = workflowData + + // Get environment variables (matching workflow-execution pattern) + const [userEnv] = await db + .select() + .from(environmentTable) + .where(eq(environmentTable.userId, payload.userId)) + .limit(1) + + let decryptedEnvVars: Record = {} + if (userEnv) { + const decryptionPromises = Object.entries((userEnv.variables as any) || {}).map( + async ([key, encryptedValue]) => { + try { + const { decrypted } = await decryptSecret(encryptedValue as string) + return [key, decrypted] as const + } catch (error: any) { + logger.error(`[${requestId}] Failed to decrypt environment variable "${key}":`, error) + throw new Error(`Failed to decrypt environment variable "${key}": ${error.message}`) } - ) - throw new Error( - usageCheck.message || - 'Usage limit exceeded. Please upgrade your plan to continue using webhooks.' - ) - } + } + ) - // Load workflow from normalized tables - const workflowData = await loadWorkflowFromNormalizedTables(payload.workflowId) - if (!workflowData) { - throw new Error(`Workflow not found: ${payload.workflowId}`) - } + const decryptedPairs = await Promise.all(decryptionPromises) + decryptedEnvVars = Object.fromEntries(decryptedPairs) + } + + // Start logging session + await loggingSession.safeStart({ + userId: payload.userId, + workspaceId: '', // TODO: Get from workflow if needed + variables: decryptedEnvVars, + }) - const { blocks, edges, loops, parallels } = workflowData + // Merge subblock states (matching workflow-execution pattern) + const mergedStates = mergeSubblockState(blocks, {}) - // Get environment variables (matching workflow-execution pattern) - const [userEnv] = await db + // Process block states for execution + const processedBlockStates = Object.entries(mergedStates).reduce( + (acc, [blockId, blockState]) => { + acc[blockId] = Object.entries(blockState.subBlocks).reduce( + (subAcc, [key, subBlock]) => { + subAcc[key] = subBlock.value + return subAcc + }, + {} as Record + ) + return acc + }, + {} as Record> + ) + + // Handle workflow variables (for now, use empty object since we don't have workflow metadata) + const workflowVariables = {} + + // Create serialized workflow + const serializer = new Serializer() + const serializedWorkflow = serializer.serializeWorkflow( + mergedStates, + edges, + loops || {}, + parallels || {}, + true // Enable validation during execution + ) + + // Handle special Airtable case + if (payload.provider === 'airtable') { + logger.info(`[${requestId}] Processing Airtable webhook via fetchAndProcessAirtablePayloads`) + + // Load the actual webhook record from database to get providerConfig + const [webhookRecord] = await db .select() - .from(environmentTable) - .where(eq(environmentTable.userId, payload.userId)) + .from(webhook) + .where(eq(webhook.id, payload.webhookId)) .limit(1) - let decryptedEnvVars: Record = {} - if (userEnv) { - const decryptionPromises = Object.entries((userEnv.variables as any) || {}).map( - async ([key, encryptedValue]) => { - try { - const { decrypted } = await decryptSecret(encryptedValue as string) - return [key, decrypted] as const - } catch (error: any) { - logger.error(`[${requestId}] Failed to decrypt environment variable "${key}":`, error) - throw new Error(`Failed to decrypt environment variable "${key}": ${error.message}`) - } - } - ) + if (!webhookRecord) { + throw new Error(`Webhook record not found: ${payload.webhookId}`) + } - const decryptedPairs = await Promise.all(decryptionPromises) - decryptedEnvVars = Object.fromEntries(decryptedPairs) + const webhookData = { + id: payload.webhookId, + provider: payload.provider, + providerConfig: webhookRecord.providerConfig, } - // Start logging session - await loggingSession.safeStart({ + // Create a mock workflow object for Airtable processing + const mockWorkflow = { + id: payload.workflowId, userId: payload.userId, - workspaceId: '', // TODO: Get from workflow if needed - variables: decryptedEnvVars, - }) - - // Merge subblock states (matching workflow-execution pattern) - const mergedStates = mergeSubblockState(blocks, {}) - - // Process block states for execution - const processedBlockStates = Object.entries(mergedStates).reduce( - (acc, [blockId, blockState]) => { - acc[blockId] = Object.entries(blockState.subBlocks).reduce( - (subAcc, [key, subBlock]) => { - subAcc[key] = subBlock.value - return subAcc - }, - {} as Record - ) - return acc - }, - {} as Record> - ) + } - // Handle workflow variables (for now, use empty object since we don't have workflow metadata) - const workflowVariables = {} - - // Create serialized workflow - const serializer = new Serializer() - const serializedWorkflow = serializer.serializeWorkflow( - mergedStates, - edges, - loops || {}, - parallels || {}, - true // Enable validation during execution + // Get the processed Airtable input + const airtableInput = await fetchAndProcessAirtablePayloads( + webhookData, + mockWorkflow, + requestId ) - // Handle special Airtable case - if (payload.provider === 'airtable') { - logger.info( - `[${requestId}] Processing Airtable webhook via fetchAndProcessAirtablePayloads` - ) - - // Load the actual webhook record from database to get providerConfig - const [webhookRecord] = await db - .select() - .from(webhook) - .where(eq(webhook.id, payload.webhookId)) - .limit(1) - - if (!webhookRecord) { - throw new Error(`Webhook record not found: ${payload.webhookId}`) - } - - const webhookData = { - id: payload.webhookId, - provider: payload.provider, - providerConfig: webhookRecord.providerConfig, - } - - // Create a mock workflow object for Airtable processing - const mockWorkflow = { - id: payload.workflowId, - userId: payload.userId, - } - - // Get the processed Airtable input - const airtableInput = await fetchAndProcessAirtablePayloads( - webhookData, - mockWorkflow, - requestId - ) - - // If we got input (changes), execute the workflow like other providers - if (airtableInput) { - logger.info(`[${requestId}] Executing workflow with Airtable changes`) - - // Create executor and execute (same as standard webhook flow) - const executor = new Executor({ - workflow: serializedWorkflow, - currentBlockStates: processedBlockStates, - envVarValues: decryptedEnvVars, - workflowInput: airtableInput, - workflowVariables, - contextExtensions: { - executionId, - workspaceId: '', - }, - }) - - // Set up logging on the executor - loggingSession.setupExecutor(executor) - - // Execute the workflow - const result = await executor.execute(payload.workflowId, payload.blockId) - - // Check if we got a StreamingExecution result - const executionResult = - 'stream' in result && 'execution' in result ? result.execution : result - - logger.info(`[${requestId}] Airtable webhook execution completed`, { - success: executionResult.success, - workflowId: payload.workflowId, - }) - - // Update workflow run counts on success - if (executionResult.success) { - await updateWorkflowRunCounts(payload.workflowId) - - // Track execution in user stats - await db - .update(userStats) - .set({ - totalWebhookTriggers: sql`total_webhook_triggers + 1`, - lastActive: sql`now()`, - }) - .where(eq(userStats.userId, payload.userId)) - } + // If we got input (changes), execute the workflow like other providers + if (airtableInput) { + logger.info(`[${requestId}] Executing workflow with Airtable changes`) + + // Create executor and execute (same as standard webhook flow) + const executor = new Executor({ + workflow: serializedWorkflow, + currentBlockStates: processedBlockStates, + envVarValues: decryptedEnvVars, + workflowInput: airtableInput, + workflowVariables, + contextExtensions: { + executionId, + workspaceId: '', + }, + }) - // Build trace spans and complete logging session - const { traceSpans, totalDuration } = buildTraceSpans(executionResult) + // Set up logging on the executor + loggingSession.setupExecutor(executor) - await loggingSession.safeComplete({ - endedAt: new Date().toISOString(), - totalDurationMs: totalDuration || 0, - finalOutput: executionResult.output || {}, - traceSpans: traceSpans as any, - }) + // Execute the workflow + const result = await executor.execute(payload.workflowId, payload.blockId) - return { - success: executionResult.success, - workflowId: payload.workflowId, - executionId, - output: executionResult.output, - executedAt: new Date().toISOString(), - provider: payload.provider, - } - } - // No changes to process - logger.info(`[${requestId}] No Airtable changes to process`) + // Check if we got a StreamingExecution result + const executionResult = + 'stream' in result && 'execution' in result ? result.execution : result - await loggingSession.safeComplete({ - endedAt: new Date().toISOString(), - totalDurationMs: 0, - finalOutput: { message: 'No Airtable changes to process' }, - traceSpans: [], + logger.info(`[${requestId}] Airtable webhook execution completed`, { + success: executionResult.success, + workflowId: payload.workflowId, }) - return { - success: true, - workflowId: payload.workflowId, - executionId, - output: { message: 'No Airtable changes to process' }, - executedAt: new Date().toISOString(), + // Update workflow run counts on success + if (executionResult.success) { + await updateWorkflowRunCounts(payload.workflowId) + + // Track execution in user stats + await db + .update(userStats) + .set({ + totalWebhookTriggers: sql`total_webhook_triggers + 1`, + lastActive: sql`now()`, + }) + .where(eq(userStats.userId, payload.userId)) } - } - // Format input for standard webhooks - const mockWebhook = { - provider: payload.provider, - blockId: payload.blockId, - } - const mockWorkflow = { - id: payload.workflowId, - userId: payload.userId, - } - const mockRequest = { - headers: new Map(Object.entries(payload.headers)), - } as any + // Build trace spans and complete logging session + const { traceSpans, totalDuration } = buildTraceSpans(executionResult) - const input = formatWebhookInput(mockWebhook, mockWorkflow, payload.body, mockRequest) - - if (!input && payload.provider === 'whatsapp') { - logger.info(`[${requestId}] No messages in WhatsApp payload, skipping execution`) await loggingSession.safeComplete({ endedAt: new Date().toISOString(), - totalDurationMs: 0, - finalOutput: { message: 'No messages in WhatsApp payload' }, - traceSpans: [], + totalDurationMs: totalDuration || 0, + finalOutput: executionResult.output || {}, + traceSpans: traceSpans as any, }) + return { - success: true, + success: executionResult.success, workflowId: payload.workflowId, executionId, - output: { message: 'No messages in WhatsApp payload' }, + output: executionResult.output, executedAt: new Date().toISOString(), + provider: payload.provider, } } + // No changes to process + logger.info(`[${requestId}] No Airtable changes to process`) - // Create executor and execute - const executor = new Executor({ - workflow: serializedWorkflow, - currentBlockStates: processedBlockStates, - envVarValues: decryptedEnvVars, - workflowInput: input || {}, - workflowVariables, - contextExtensions: { - executionId, - workspaceId: '', // TODO: Get from workflow if needed - see comment on line 103 - }, + await loggingSession.safeComplete({ + endedAt: new Date().toISOString(), + totalDurationMs: 0, + finalOutput: { message: 'No Airtable changes to process' }, + traceSpans: [], }) - // Set up logging on the executor - loggingSession.setupExecutor(executor) - - logger.info(`[${requestId}] Executing workflow for ${payload.provider} webhook`) - - // Execute the workflow - const result = await executor.execute(payload.workflowId, payload.blockId) - - // Check if we got a StreamingExecution result - const executionResult = - 'stream' in result && 'execution' in result ? result.execution : result - - logger.info(`[${requestId}] Webhook execution completed`, { - success: executionResult.success, + return { + success: true, workflowId: payload.workflowId, - provider: payload.provider, - }) - - // Update workflow run counts on success - if (executionResult.success) { - await updateWorkflowRunCounts(payload.workflowId) - - // Track execution in user stats - await db - .update(userStats) - .set({ - totalWebhookTriggers: sql`total_webhook_triggers + 1`, - lastActive: sql`now()`, - }) - .where(eq(userStats.userId, payload.userId)) + executionId, + output: { message: 'No Airtable changes to process' }, + executedAt: new Date().toISOString(), } + } - // Build trace spans and complete logging session - const { traceSpans, totalDuration } = buildTraceSpans(executionResult) + // Format input for standard webhooks + const mockWebhook = { + provider: payload.provider, + blockId: payload.blockId, + } + const mockWorkflow = { + id: payload.workflowId, + userId: payload.userId, + } + const mockRequest = { + headers: new Map(Object.entries(payload.headers)), + } as any + const input = formatWebhookInput(mockWebhook, mockWorkflow, payload.body, mockRequest) + + if (!input && payload.provider === 'whatsapp') { + logger.info(`[${requestId}] No messages in WhatsApp payload, skipping execution`) await loggingSession.safeComplete({ endedAt: new Date().toISOString(), - totalDurationMs: totalDuration || 0, - finalOutput: executionResult.output || {}, - traceSpans: traceSpans as any, + totalDurationMs: 0, + finalOutput: { message: 'No messages in WhatsApp payload' }, + traceSpans: [], }) - return { - success: executionResult.success, + success: true, workflowId: payload.workflowId, executionId, - output: executionResult.output, + output: { message: 'No messages in WhatsApp payload' }, executedAt: new Date().toISOString(), - provider: payload.provider, } - } catch (error: any) { - logger.error(`[${requestId}] Webhook execution failed`, { - error: error.message, - stack: error.stack, - workflowId: payload.workflowId, - provider: payload.provider, - }) + } - // Complete logging session with error (matching workflow-execution pattern) - try { - await loggingSession.safeCompleteWithError({ - endedAt: new Date().toISOString(), - totalDurationMs: 0, - error: { - message: error.message || 'Webhook execution failed', - stackTrace: error.stack, - }, + // Create executor and execute + const executor = new Executor({ + workflow: serializedWorkflow, + currentBlockStates: processedBlockStates, + envVarValues: decryptedEnvVars, + workflowInput: input || {}, + workflowVariables, + contextExtensions: { + executionId, + workspaceId: '', // TODO: Get from workflow if needed - see comment on line 103 + }, + }) + + // Set up logging on the executor + loggingSession.setupExecutor(executor) + + logger.info(`[${requestId}] Executing workflow for ${payload.provider} webhook`) + + // Execute the workflow + const result = await executor.execute(payload.workflowId, payload.blockId) + + // Check if we got a StreamingExecution result + const executionResult = 'stream' in result && 'execution' in result ? result.execution : result + + logger.info(`[${requestId}] Webhook execution completed`, { + success: executionResult.success, + workflowId: payload.workflowId, + provider: payload.provider, + }) + + // Update workflow run counts on success + if (executionResult.success) { + await updateWorkflowRunCounts(payload.workflowId) + + // Track execution in user stats + await db + .update(userStats) + .set({ + totalWebhookTriggers: sql`total_webhook_triggers + 1`, + lastActive: sql`now()`, }) - } catch (loggingError) { - logger.error(`[${requestId}] Failed to complete logging session`, loggingError) - } + .where(eq(userStats.userId, payload.userId)) + } + + // Build trace spans and complete logging session + const { traceSpans, totalDuration } = buildTraceSpans(executionResult) + + await loggingSession.safeComplete({ + endedAt: new Date().toISOString(), + totalDurationMs: totalDuration || 0, + finalOutput: executionResult.output || {}, + traceSpans: traceSpans as any, + }) + + return { + success: executionResult.success, + workflowId: payload.workflowId, + executionId, + output: executionResult.output, + executedAt: new Date().toISOString(), + provider: payload.provider, + } + } catch (error: any) { + logger.error(`[${requestId}] Webhook execution failed`, { + error: error.message, + stack: error.stack, + workflowId: payload.workflowId, + provider: payload.provider, + }) - throw error // Let Trigger.dev handle retries + // Complete logging session with error (matching workflow-execution pattern) + try { + await loggingSession.safeCompleteWithError({ + endedAt: new Date().toISOString(), + totalDurationMs: 0, + error: { + message: error.message || 'Webhook execution failed', + stackTrace: error.stack, + }, + }) + } catch (loggingError) { + logger.error(`[${requestId}] Failed to complete logging session`, loggingError) } + + throw error + } +} + +export const webhookExecution = task({ + id: 'webhook-execution', + retry: { + maxAttempts: 1, }, + run: async (payload: WebhookExecutionPayload) => executeWebhookJob(payload), }) diff --git a/apps/sim/background/workflow-execution.ts b/apps/sim/background/workflow-execution.ts index 3e64bf37215..f861d3444b6 100644 --- a/apps/sim/background/workflow-execution.ts +++ b/apps/sim/background/workflow-execution.ts @@ -16,200 +16,202 @@ import { mergeSubblockState } from '@/stores/workflows/server-utils' const logger = createLogger('TriggerWorkflowExecution') -export const workflowExecution = task({ - id: 'workflow-execution', - retry: { - maxAttempts: 1, - }, - run: async (payload: { - workflowId: string - userId: string - input?: any - triggerType?: string - metadata?: Record - }) => { - const workflowId = payload.workflowId - const executionId = uuidv4() - const requestId = executionId.slice(0, 8) - - logger.info(`[${requestId}] Starting Trigger.dev workflow execution: ${workflowId}`, { - userId: payload.userId, - triggerType: payload.triggerType, - executionId, - }) +export type WorkflowExecutionPayload = { + workflowId: string + userId: string + input?: any + triggerType?: 'api' | 'webhook' | 'schedule' | 'manual' | 'chat' + metadata?: Record +} + +export async function executeWorkflowJob(payload: WorkflowExecutionPayload) { + const workflowId = payload.workflowId + const executionId = uuidv4() + const requestId = executionId.slice(0, 8) + + logger.info(`[${requestId}] Starting workflow execution: ${workflowId}`, { + userId: payload.userId, + triggerType: payload.triggerType, + executionId, + }) + + // Initialize logging session + const triggerType = payload.triggerType || 'api' + const loggingSession = new LoggingSession(workflowId, executionId, triggerType, requestId) + + try { + const usageCheck = await checkServerSideUsageLimits(payload.userId) + if (usageCheck.isExceeded) { + logger.warn( + `[${requestId}] User ${payload.userId} has exceeded usage limits. Skipping workflow execution.`, + { + currentUsage: usageCheck.currentUsage, + limit: usageCheck.limit, + workflowId: payload.workflowId, + } + ) + throw new Error( + usageCheck.message || + 'Usage limit exceeded. Please upgrade your plan to continue using workflows.' + ) + } - // Initialize logging session - const triggerType = - (payload.triggerType as 'api' | 'webhook' | 'schedule' | 'manual' | 'chat') || 'api' - const loggingSession = new LoggingSession(workflowId, executionId, triggerType, requestId) - - try { - const usageCheck = await checkServerSideUsageLimits(payload.userId) - if (usageCheck.isExceeded) { - logger.warn( - `[${requestId}] User ${payload.userId} has exceeded usage limits. Skipping workflow execution.`, - { - currentUsage: usageCheck.currentUsage, - limit: usageCheck.limit, - workflowId: payload.workflowId, - } - ) - throw new Error( - usageCheck.message || - 'Usage limit exceeded. Please upgrade your plan to continue using workflows.' + // Load workflow data from deployed state (this task is only used for API executions right now) + const workflowData = await loadDeployedWorkflowState(workflowId) + + const { blocks, edges, loops, parallels } = workflowData + + // Merge subblock states (server-safe version doesn't need workflowId) + const mergedStates = mergeSubblockState(blocks, {}) + + // Process block states for execution + const processedBlockStates = Object.entries(mergedStates).reduce( + (acc, [blockId, blockState]) => { + acc[blockId] = Object.entries(blockState.subBlocks).reduce( + (subAcc, [key, subBlock]) => { + subAcc[key] = subBlock.value + return subAcc + }, + {} as Record ) - } - - // Load workflow data from deployed state (this task is only used for API executions right now) - const workflowData = await loadDeployedWorkflowState(workflowId) - - const { blocks, edges, loops, parallels } = workflowData - - // Merge subblock states (server-safe version doesn't need workflowId) - const mergedStates = mergeSubblockState(blocks, {}) - - // Process block states for execution - const processedBlockStates = Object.entries(mergedStates).reduce( - (acc, [blockId, blockState]) => { - acc[blockId] = Object.entries(blockState.subBlocks).reduce( - (subAcc, [key, subBlock]) => { - subAcc[key] = subBlock.value - return subAcc - }, - {} as Record - ) - return acc - }, - {} as Record> + return acc + }, + {} as Record> + ) + + // Get environment variables + const [userEnv] = await db + .select() + .from(environmentTable) + .where(eq(environmentTable.userId, payload.userId)) + .limit(1) + + let decryptedEnvVars: Record = {} + if (userEnv) { + const decryptionPromises = Object.entries((userEnv.variables as any) || {}).map( + async ([key, encryptedValue]) => { + try { + const { decrypted } = await decryptSecret(encryptedValue as string) + return [key, decrypted] as const + } catch (error: any) { + logger.error(`[${requestId}] Failed to decrypt environment variable "${key}":`, error) + throw new Error(`Failed to decrypt environment variable "${key}": ${error.message}`) + } + } ) - // Get environment variables - const [userEnv] = await db - .select() - .from(environmentTable) - .where(eq(environmentTable.userId, payload.userId)) - .limit(1) - - let decryptedEnvVars: Record = {} - if (userEnv) { - const decryptionPromises = Object.entries((userEnv.variables as any) || {}).map( - async ([key, encryptedValue]) => { - try { - const { decrypted } = await decryptSecret(encryptedValue as string) - return [key, decrypted] as const - } catch (error: any) { - logger.error(`[${requestId}] Failed to decrypt environment variable "${key}":`, error) - throw new Error(`Failed to decrypt environment variable "${key}": ${error.message}`) - } - } - ) + const decryptedPairs = await Promise.all(decryptionPromises) + decryptedEnvVars = Object.fromEntries(decryptedPairs) + } - const decryptedPairs = await Promise.all(decryptionPromises) - decryptedEnvVars = Object.fromEntries(decryptedPairs) - } - - // Start logging session - await loggingSession.safeStart({ - userId: payload.userId, - workspaceId: '', // TODO: Get from workflow if needed - variables: decryptedEnvVars, - }) - - // Create serialized workflow - const serializer = new Serializer() - const serializedWorkflow = serializer.serializeWorkflow( - mergedStates, - edges, - loops || {}, - parallels || {}, - true // Enable validation during execution - ) + // Start logging session + await loggingSession.safeStart({ + userId: payload.userId, + workspaceId: '', // TODO: Get from workflow if needed + variables: decryptedEnvVars, + }) - // Create executor and execute - const executor = new Executor({ - workflow: serializedWorkflow, - currentBlockStates: processedBlockStates, - envVarValues: decryptedEnvVars, - workflowInput: payload.input || {}, - workflowVariables: {}, - contextExtensions: { - executionId, - workspaceId: '', // TODO: Get from workflow if needed - see comment on line 120 - }, - }) - - // Set up logging on the executor - loggingSession.setupExecutor(executor) - - const result = await executor.execute(workflowId) - - // Handle streaming vs regular result - const executionResult = - 'stream' in result && 'execution' in result ? result.execution : result - - logger.info(`[${requestId}] Workflow execution completed: ${workflowId}`, { - success: executionResult.success, - executionTime: executionResult.metadata?.duration, - executionId, - }) - - // Update workflow run counts on success - if (executionResult.success) { - await updateWorkflowRunCounts(workflowId) - - // Track execution in user stats - const statsUpdate = - triggerType === 'api' - ? { totalApiCalls: sql`total_api_calls + 1` } - : triggerType === 'webhook' - ? { totalWebhookTriggers: sql`total_webhook_triggers + 1` } - : triggerType === 'schedule' - ? { totalScheduledExecutions: sql`total_scheduled_executions + 1` } - : { totalManualExecutions: sql`total_manual_executions + 1` } - - await db - .update(userStats) - .set({ - ...statsUpdate, - lastActive: sql`now()`, - }) - .where(eq(userStats.userId, payload.userId)) - } - - // Build trace spans and complete logging session (for both success and failure) - const { traceSpans, totalDuration } = buildTraceSpans(executionResult) - - await loggingSession.safeComplete({ - endedAt: new Date().toISOString(), - totalDurationMs: totalDuration || 0, - finalOutput: executionResult.output || {}, - traceSpans: traceSpans as any, - }) - - return { - success: executionResult.success, - workflowId: payload.workflowId, + // Create serialized workflow + const serializer = new Serializer() + const serializedWorkflow = serializer.serializeWorkflow( + mergedStates, + edges, + loops || {}, + parallels || {}, + true // Enable validation during execution + ) + + // Create executor and execute + const executor = new Executor({ + workflow: serializedWorkflow, + currentBlockStates: processedBlockStates, + envVarValues: decryptedEnvVars, + workflowInput: payload.input || {}, + workflowVariables: {}, + contextExtensions: { executionId, - output: executionResult.output, - executedAt: new Date().toISOString(), - metadata: payload.metadata, - } - } catch (error: any) { - logger.error(`[${requestId}] Workflow execution failed: ${workflowId}`, { - error: error.message, - stack: error.stack, - }) - - await loggingSession.safeCompleteWithError({ - endedAt: new Date().toISOString(), - totalDurationMs: 0, - error: { - message: error.message || 'Workflow execution failed', - stackTrace: error.stack, - }, - }) - - throw error // Let Trigger.dev handle retries + workspaceId: '', // TODO: Get from workflow if needed - see comment on line 120 + }, + }) + + // Set up logging on the executor + loggingSession.setupExecutor(executor) + + const result = await executor.execute(workflowId) + + // Handle streaming vs regular result + const executionResult = 'stream' in result && 'execution' in result ? result.execution : result + + logger.info(`[${requestId}] Workflow execution completed: ${workflowId}`, { + success: executionResult.success, + executionTime: executionResult.metadata?.duration, + executionId, + }) + + // Update workflow run counts on success + if (executionResult.success) { + await updateWorkflowRunCounts(workflowId) + + // Track execution in user stats + const statsUpdate = + triggerType === 'api' + ? { totalApiCalls: sql`total_api_calls + 1` } + : triggerType === 'webhook' + ? { totalWebhookTriggers: sql`total_webhook_triggers + 1` } + : triggerType === 'schedule' + ? { totalScheduledExecutions: sql`total_scheduled_executions + 1` } + : { totalManualExecutions: sql`total_manual_executions + 1` } + + await db + .update(userStats) + .set({ + ...statsUpdate, + lastActive: sql`now()`, + }) + .where(eq(userStats.userId, payload.userId)) } + + // Build trace spans and complete logging session (for both success and failure) + const { traceSpans, totalDuration } = buildTraceSpans(executionResult) + + await loggingSession.safeComplete({ + endedAt: new Date().toISOString(), + totalDurationMs: totalDuration || 0, + finalOutput: executionResult.output || {}, + traceSpans: traceSpans as any, + }) + + return { + success: executionResult.success, + workflowId: payload.workflowId, + executionId, + output: executionResult.output, + executedAt: new Date().toISOString(), + metadata: payload.metadata, + } + } catch (error: any) { + logger.error(`[${requestId}] Workflow execution failed: ${workflowId}`, { + error: error.message, + stack: error.stack, + }) + + await loggingSession.safeCompleteWithError({ + endedAt: new Date().toISOString(), + totalDurationMs: 0, + error: { + message: error.message || 'Workflow execution failed', + stackTrace: error.stack, + }, + }) + + throw error + } +} + +export const workflowExecution = task({ + id: 'workflow-execution', + retry: { + maxAttempts: 1, }, + run: async (payload: WorkflowExecutionPayload) => executeWorkflowJob(payload), }) diff --git a/apps/sim/lib/env.ts b/apps/sim/lib/env.ts index 7f9fbecc706..456f463bdbb 100644 --- a/apps/sim/lib/env.ts +++ b/apps/sim/lib/env.ts @@ -97,6 +97,7 @@ export const env = createEnv({ // Background Jobs & Scheduling TRIGGER_SECRET_KEY: z.string().min(1).optional(), // Trigger.dev secret key for background jobs + TRIGGER_DEV_ENABLED: z.boolean().optional(), // Toggle to enable/disable Trigger.dev for async jobs CRON_SECRET: z.string().optional(), // Secret for authenticating cron job requests JOB_RETENTION_DAYS: z.string().optional().default('1'), // Days to retain job logs/data @@ -217,6 +218,9 @@ export const env = createEnv({ NEXT_PUBLIC_BRAND_ACCENT_COLOR: z.string().regex(/^#[0-9A-Fa-f]{6}$/).optional(), // Accent brand color (hex format) NEXT_PUBLIC_BRAND_ACCENT_HOVER_COLOR: z.string().regex(/^#[0-9A-Fa-f]{6}$/).optional(), // Accent brand hover state (hex format) NEXT_PUBLIC_BRAND_BACKGROUND_COLOR: z.string().regex(/^#[0-9A-Fa-f]{6}$/).optional(), // Brand background color (hex format) + + // Feature Flags + NEXT_PUBLIC_TRIGGER_DEV_ENABLED: z.boolean().optional(), // Client-side gate for async executions UI }, // Variables available on both server and client @@ -250,6 +254,7 @@ export const env = createEnv({ NEXT_PUBLIC_BRAND_ACCENT_COLOR: process.env.NEXT_PUBLIC_BRAND_ACCENT_COLOR, NEXT_PUBLIC_BRAND_ACCENT_HOVER_COLOR: process.env.NEXT_PUBLIC_BRAND_ACCENT_HOVER_COLOR, NEXT_PUBLIC_BRAND_BACKGROUND_COLOR: process.env.NEXT_PUBLIC_BRAND_BACKGROUND_COLOR, + NEXT_PUBLIC_TRIGGER_DEV_ENABLED: process.env.NEXT_PUBLIC_TRIGGER_DEV_ENABLED, NODE_ENV: process.env.NODE_ENV, NEXT_TELEMETRY_DISABLED: process.env.NEXT_TELEMETRY_DISABLED, }, From 917552f0416ef96649f6b82ddf70be098acf6283 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Fri, 22 Aug 2025 16:52:31 -0700 Subject: [PATCH 08/21] fix(billing): vercel cron not processing billing periods (#1112) --- apps/sim/app/api/billing/daily/route.ts | 59 +++++++++++++++++++++---- apps/sim/lib/auth/internal.ts | 4 +- 2 files changed, 53 insertions(+), 10 deletions(-) diff --git a/apps/sim/app/api/billing/daily/route.ts b/apps/sim/app/api/billing/daily/route.ts index 4b88f35635f..4ed088e8664 100644 --- a/apps/sim/app/api/billing/daily/route.ts +++ b/apps/sim/app/api/billing/daily/route.ts @@ -67,7 +67,7 @@ export async function POST(request: NextRequest) { { status: 500 } ) } catch (error) { - logger.error('Fatal error in monthly billing cron job', { error }) + logger.error('Fatal error in daily billing cron job', { error }) return NextResponse.json( { @@ -90,18 +90,59 @@ export async function GET(request: NextRequest) { return authError } - return NextResponse.json({ - status: 'ready', - message: - 'Daily billing check cron job is ready to process users and organizations with periods ending today', - currentDate: new Date().toISOString().split('T')[0], + const startTime = Date.now() + const result = await processDailyBillingCheck() + const duration = Date.now() - startTime + + if (result.success) { + logger.info('Daily billing check (GET) completed successfully', { + processedUsers: result.processedUsers, + processedOrganizations: result.processedOrganizations, + totalChargedAmount: result.totalChargedAmount, + duration: `${duration}ms`, + }) + + return NextResponse.json({ + success: true, + summary: { + processedUsers: result.processedUsers, + processedOrganizations: result.processedOrganizations, + totalChargedAmount: result.totalChargedAmount, + duration: `${duration}ms`, + }, + }) + } + + logger.error('Daily billing check (GET) completed with errors', { + processedUsers: result.processedUsers, + processedOrganizations: result.processedOrganizations, + totalChargedAmount: result.totalChargedAmount, + errorCount: result.errors.length, + errors: result.errors, + duration: `${duration}ms`, }) + + return NextResponse.json( + { + success: false, + summary: { + processedUsers: result.processedUsers, + processedOrganizations: result.processedOrganizations, + totalChargedAmount: result.totalChargedAmount, + errorCount: result.errors.length, + duration: `${duration}ms`, + }, + errors: result.errors, + }, + { status: 500 } + ) } catch (error) { - logger.error('Error in billing health check', { error }) + logger.error('Fatal error in daily billing (GET) cron job', { error }) return NextResponse.json( { - status: 'error', - error: error instanceof Error ? error.message : 'Unknown error', + success: false, + error: 'Internal server error during daily billing check', + details: error instanceof Error ? error.message : 'Unknown error', }, { status: 500 } ) diff --git a/apps/sim/lib/auth/internal.ts b/apps/sim/lib/auth/internal.ts index 29d74988ca3..c82355e849f 100644 --- a/apps/sim/lib/auth/internal.ts +++ b/apps/sim/lib/auth/internal.ts @@ -57,8 +57,10 @@ export async function verifyInternalToken(token: string): Promise { export function verifyCronAuth(request: NextRequest, context?: string): NextResponse | null { const authHeader = request.headers.get('authorization') const expectedAuth = `Bearer ${env.CRON_SECRET}` + const isVercelCron = request.headers.get('x-vercel-cron') === '1' - if (authHeader !== expectedAuth) { + // Allow Vercel Cron requests (they include x-vercel-cron header instead of Authorization) + if (!isVercelCron && authHeader !== expectedAuth) { const contextInfo = context ? ` for ${context}` : '' logger.warn(`Unauthorized CRON access attempt${contextInfo}`, { providedAuth: authHeader, From 780870c48eda6d1e2a9923411be01664e84ed297 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Sat, 23 Aug 2025 10:25:41 -0700 Subject: [PATCH 09/21] fix(billing): make subscription table source of truth for period start and period end (#1114) * fix(billing): vercel cron not processing billing periods * fix(billing): cleanup unused POST and fix bug with billing timing check * make subscriptions table source of truth for dates * update org routes * make everything dependent on stripe webhook --------- Co-authored-by: Waleed Latif Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> Co-authored-by: Adam Gough <77861281+aadamgough@users.noreply.github.com> Co-authored-by: Adam Gough --- apps/sim/app/api/billing/daily/route.ts | 150 ------------------ .../[id]/members/[memberId]/route.ts | 18 ++- .../api/organizations/[id]/members/route.ts | 16 +- apps/sim/lib/billing/core/billing-periods.ts | 11 +- apps/sim/lib/billing/core/billing.ts | 32 +--- .../lib/billing/core/organization-billing.ts | 9 +- apps/sim/lib/billing/core/usage.ts | 10 +- .../webhooks/stripe-invoice-webhooks.ts | 79 +++++++-- apps/sim/vercel.json | 4 - 9 files changed, 109 insertions(+), 220 deletions(-) delete mode 100644 apps/sim/app/api/billing/daily/route.ts diff --git a/apps/sim/app/api/billing/daily/route.ts b/apps/sim/app/api/billing/daily/route.ts deleted file mode 100644 index 4ed088e8664..00000000000 --- a/apps/sim/app/api/billing/daily/route.ts +++ /dev/null @@ -1,150 +0,0 @@ -import { type NextRequest, NextResponse } from 'next/server' -import { verifyCronAuth } from '@/lib/auth/internal' -import { processDailyBillingCheck } from '@/lib/billing/core/billing' -import { createLogger } from '@/lib/logs/console/logger' - -const logger = createLogger('DailyBillingCron') - -/** - * Daily billing CRON job endpoint that checks individual billing periods - */ -export async function POST(request: NextRequest) { - try { - const authError = verifyCronAuth(request, 'daily billing check') - if (authError) { - return authError - } - - logger.info('Starting daily billing check cron job') - - const startTime = Date.now() - - // Process overage billing for users and organizations with periods ending today - const result = await processDailyBillingCheck() - - const duration = Date.now() - startTime - - if (result.success) { - logger.info('Daily billing check completed successfully', { - processedUsers: result.processedUsers, - processedOrganizations: result.processedOrganizations, - totalChargedAmount: result.totalChargedAmount, - duration: `${duration}ms`, - }) - - return NextResponse.json({ - success: true, - summary: { - processedUsers: result.processedUsers, - processedOrganizations: result.processedOrganizations, - totalChargedAmount: result.totalChargedAmount, - duration: `${duration}ms`, - }, - }) - } - - logger.error('Daily billing check completed with errors', { - processedUsers: result.processedUsers, - processedOrganizations: result.processedOrganizations, - totalChargedAmount: result.totalChargedAmount, - errorCount: result.errors.length, - errors: result.errors, - duration: `${duration}ms`, - }) - - return NextResponse.json( - { - success: false, - summary: { - processedUsers: result.processedUsers, - processedOrganizations: result.processedOrganizations, - totalChargedAmount: result.totalChargedAmount, - errorCount: result.errors.length, - duration: `${duration}ms`, - }, - errors: result.errors, - }, - { status: 500 } - ) - } catch (error) { - logger.error('Fatal error in daily billing cron job', { error }) - - return NextResponse.json( - { - success: false, - error: 'Internal server error during daily billing check', - details: error instanceof Error ? error.message : 'Unknown error', - }, - { status: 500 } - ) - } -} - -/** - * GET endpoint for manual testing and health checks - */ -export async function GET(request: NextRequest) { - try { - const authError = verifyCronAuth(request, 'daily billing check health check') - if (authError) { - return authError - } - - const startTime = Date.now() - const result = await processDailyBillingCheck() - const duration = Date.now() - startTime - - if (result.success) { - logger.info('Daily billing check (GET) completed successfully', { - processedUsers: result.processedUsers, - processedOrganizations: result.processedOrganizations, - totalChargedAmount: result.totalChargedAmount, - duration: `${duration}ms`, - }) - - return NextResponse.json({ - success: true, - summary: { - processedUsers: result.processedUsers, - processedOrganizations: result.processedOrganizations, - totalChargedAmount: result.totalChargedAmount, - duration: `${duration}ms`, - }, - }) - } - - logger.error('Daily billing check (GET) completed with errors', { - processedUsers: result.processedUsers, - processedOrganizations: result.processedOrganizations, - totalChargedAmount: result.totalChargedAmount, - errorCount: result.errors.length, - errors: result.errors, - duration: `${duration}ms`, - }) - - return NextResponse.json( - { - success: false, - summary: { - processedUsers: result.processedUsers, - processedOrganizations: result.processedOrganizations, - totalChargedAmount: result.totalChargedAmount, - errorCount: result.errors.length, - duration: `${duration}ms`, - }, - errors: result.errors, - }, - { status: 500 } - ) - } catch (error) { - logger.error('Fatal error in daily billing (GET) cron job', { error }) - return NextResponse.json( - { - success: false, - error: 'Internal server error during daily billing check', - details: error instanceof Error ? error.message : 'Unknown error', - }, - { status: 500 } - ) - } -} diff --git a/apps/sim/app/api/organizations/[id]/members/[memberId]/route.ts b/apps/sim/app/api/organizations/[id]/members/[memberId]/route.ts index ba823229688..dc324edb9a3 100644 --- a/apps/sim/app/api/organizations/[id]/members/[memberId]/route.ts +++ b/apps/sim/app/api/organizations/[id]/members/[memberId]/route.ts @@ -1,6 +1,7 @@ import { and, eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { getSession } from '@/lib/auth' +import { getUserUsageData } from '@/lib/billing/core/usage' import { createLogger } from '@/lib/logs/console/logger' import { db } from '@/db' import { member, user, userStats } from '@/db/schema' @@ -80,8 +81,6 @@ export async function GET( .select({ currentPeriodCost: userStats.currentPeriodCost, currentUsageLimit: userStats.currentUsageLimit, - billingPeriodStart: userStats.billingPeriodStart, - billingPeriodEnd: userStats.billingPeriodEnd, usageLimitSetBy: userStats.usageLimitSetBy, usageLimitUpdatedAt: userStats.usageLimitUpdatedAt, lastPeriodCost: userStats.lastPeriodCost, @@ -90,11 +89,22 @@ export async function GET( .where(eq(userStats.userId, memberId)) .limit(1) + const computed = await getUserUsageData(memberId) + if (usageData.length > 0) { memberData = { ...memberData, - usage: usageData[0], - } as typeof memberData & { usage: (typeof usageData)[0] } + usage: { + ...usageData[0], + billingPeriodStart: computed.billingPeriodStart, + billingPeriodEnd: computed.billingPeriodEnd, + }, + } as typeof memberData & { + usage: (typeof usageData)[0] & { + billingPeriodStart: Date | null + billingPeriodEnd: Date | null + } + } } } diff --git a/apps/sim/app/api/organizations/[id]/members/route.ts b/apps/sim/app/api/organizations/[id]/members/route.ts index 9a2cdf99529..9ae87b15c6c 100644 --- a/apps/sim/app/api/organizations/[id]/members/route.ts +++ b/apps/sim/app/api/organizations/[id]/members/route.ts @@ -3,6 +3,7 @@ import { and, eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { getEmailSubject, renderInvitationEmail } from '@/components/emails/render-email' import { getSession } from '@/lib/auth' +import { getUserUsageData } from '@/lib/billing/core/usage' import { validateSeatAvailability } from '@/lib/billing/validation/seat-management' import { sendEmail } from '@/lib/email/mailer' import { quickValidateEmail } from '@/lib/email/validation' @@ -63,7 +64,7 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ // Include usage data if requested and user has admin access if (includeUsage && hasAdminAccess) { - const membersWithUsage = await db + const base = await db .select({ id: member.id, userId: member.userId, @@ -74,8 +75,6 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ userEmail: user.email, currentPeriodCost: userStats.currentPeriodCost, currentUsageLimit: userStats.currentUsageLimit, - billingPeriodStart: userStats.billingPeriodStart, - billingPeriodEnd: userStats.billingPeriodEnd, usageLimitSetBy: userStats.usageLimitSetBy, usageLimitUpdatedAt: userStats.usageLimitUpdatedAt, }) @@ -84,6 +83,17 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ .leftJoin(userStats, eq(user.id, userStats.userId)) .where(eq(member.organizationId, organizationId)) + const membersWithUsage = await Promise.all( + base.map(async (row) => { + const usage = await getUserUsageData(row.userId) + return { + ...row, + billingPeriodStart: usage.billingPeriodStart, + billingPeriodEnd: usage.billingPeriodEnd, + } + }) + ) + return NextResponse.json({ success: true, data: membersWithUsage, diff --git a/apps/sim/lib/billing/core/billing-periods.ts b/apps/sim/lib/billing/core/billing-periods.ts index c0448ac8b85..96cec45da6a 100644 --- a/apps/sim/lib/billing/core/billing-periods.ts +++ b/apps/sim/lib/billing/core/billing-periods.ts @@ -145,12 +145,9 @@ export async function initializeBillingPeriod( end = billingPeriod.end } - // Update user stats with billing period info await db .update(userStats) .set({ - billingPeriodStart: start, - billingPeriodEnd: end, currentPeriodCost: '0', }) .where(eq(userStats.userId, userId)) @@ -212,14 +209,12 @@ export async function resetUserBillingPeriod(userId: string): Promise { newPeriodEnd = billingPeriod.end } - // Archive current period cost and reset for new period + // Archive current period cost and reset for new period (no longer updating period dates in user_stats) await db .update(userStats) .set({ - lastPeriodCost: currentPeriodCost, // Archive previous period - currentPeriodCost: '0', // Reset to zero for new period - billingPeriodStart: newPeriodStart, - billingPeriodEnd: newPeriodEnd, + lastPeriodCost: currentPeriodCost, + currentPeriodCost: '0', }) .where(eq(userStats.userId, userId)) diff --git a/apps/sim/lib/billing/core/billing.ts b/apps/sim/lib/billing/core/billing.ts index 32c74e4d024..0088843aee1 100644 --- a/apps/sim/lib/billing/core/billing.ts +++ b/apps/sim/lib/billing/core/billing.ts @@ -9,7 +9,7 @@ import { getUserUsageData } from '@/lib/billing/core/usage' import { requireStripeClient } from '@/lib/billing/stripe-client' import { createLogger } from '@/lib/logs/console/logger' import { db } from '@/db' -import { member, organization, subscription, user, userStats } from '@/db/schema' +import { member, organization, subscription, user } from '@/db/schema' const logger = createLogger('Billing') @@ -673,15 +673,14 @@ export async function getUsersAndOrganizationsForOverageBilling(): Promise<{ continue // Skip free plans } - // Check if subscription period ends today + // Check if subscription period ends today (range-based, inclusive of day) let shouldBillToday = false if (sub.periodEnd) { const periodEnd = new Date(sub.periodEnd) - periodEnd.setUTCHours(0, 0, 0, 0) // Normalize to start of day + const endsToday = periodEnd >= today && periodEnd < tomorrow - // Bill if the subscription period ends today - if (periodEnd.getTime() === today.getTime()) { + if (endsToday) { shouldBillToday = true logger.info('Subscription period ends today', { referenceId: sub.referenceId, @@ -689,29 +688,6 @@ export async function getUsersAndOrganizationsForOverageBilling(): Promise<{ periodEnd: sub.periodEnd, }) } - } else { - // Fallback: Check userStats billing period for users - const userStatsRecord = await db - .select({ - billingPeriodEnd: userStats.billingPeriodEnd, - }) - .from(userStats) - .where(eq(userStats.userId, sub.referenceId)) - .limit(1) - - if (userStatsRecord.length > 0 && userStatsRecord[0].billingPeriodEnd) { - const billingPeriodEnd = new Date(userStatsRecord[0].billingPeriodEnd) - billingPeriodEnd.setUTCHours(0, 0, 0, 0) // Normalize to start of day - - if (billingPeriodEnd.getTime() === today.getTime()) { - shouldBillToday = true - logger.info('User billing period ends today (from userStats)', { - userId: sub.referenceId, - plan: sub.plan, - billingPeriodEnd: userStatsRecord[0].billingPeriodEnd, - }) - } - } } if (shouldBillToday) { diff --git a/apps/sim/lib/billing/core/organization-billing.ts b/apps/sim/lib/billing/core/organization-billing.ts index dc2c758ba18..7294ca20dab 100644 --- a/apps/sim/lib/billing/core/organization-billing.ts +++ b/apps/sim/lib/billing/core/organization-billing.ts @@ -94,8 +94,6 @@ export async function getOrganizationBillingData( // User stats fields currentPeriodCost: userStats.currentPeriodCost, currentUsageLimit: userStats.currentUsageLimit, - billingPeriodStart: userStats.billingPeriodStart, - billingPeriodEnd: userStats.billingPeriodEnd, lastActive: userStats.lastActive, }) .from(member) @@ -151,10 +149,9 @@ export async function getOrganizationBillingData( const averageUsagePerMember = members.length > 0 ? totalCurrentUsage / members.length : 0 - // Get billing period from first member (should be consistent across org) - const firstMember = membersWithUsage[0] - const billingPeriodStart = firstMember?.billingPeriodStart || null - const billingPeriodEnd = firstMember?.billingPeriodEnd || null + // Billing period comes from the organization's subscription + const billingPeriodStart = subscription.periodStart || null + const billingPeriodEnd = subscription.periodEnd || null return { organizationId, diff --git a/apps/sim/lib/billing/core/usage.ts b/apps/sim/lib/billing/core/usage.ts index 98f5d489668..babfb8af04f 100644 --- a/apps/sim/lib/billing/core/usage.ts +++ b/apps/sim/lib/billing/core/usage.ts @@ -41,6 +41,7 @@ export async function getUserUsageData(userId: string): Promise { } const stats = userStatsData[0] + const subscription = await getHighestPrioritySubscription(userId) const currentUsage = Number.parseFloat( stats.currentPeriodCost?.toString() ?? stats.totalCost.toString() ) @@ -49,14 +50,19 @@ export async function getUserUsageData(userId: string): Promise { const isWarning = percentUsed >= 80 const isExceeded = currentUsage >= limit + // Derive billing period dates from subscription (source of truth). + // For free users or missing dates, expose nulls. + const billingPeriodStart = subscription?.periodStart ?? null + const billingPeriodEnd = subscription?.periodEnd ?? null + return { currentUsage, limit, percentUsed, isWarning, isExceeded, - billingPeriodStart: stats.billingPeriodStart, - billingPeriodEnd: stats.billingPeriodEnd, + billingPeriodStart, + billingPeriodEnd, lastPeriodCost: Number.parseFloat(stats.lastPeriodCost?.toString() || '0'), } } catch (error) { diff --git a/apps/sim/lib/billing/webhooks/stripe-invoice-webhooks.ts b/apps/sim/lib/billing/webhooks/stripe-invoice-webhooks.ts index 37df50d90f8..94ffaac234d 100644 --- a/apps/sim/lib/billing/webhooks/stripe-invoice-webhooks.ts +++ b/apps/sim/lib/billing/webhooks/stripe-invoice-webhooks.ts @@ -1,5 +1,12 @@ +import { eq } from 'drizzle-orm' import type Stripe from 'stripe' +import { + resetOrganizationBillingPeriod, + resetUserBillingPeriod, +} from '@/lib/billing/core/billing-periods' import { createLogger } from '@/lib/logs/console/logger' +import { db } from '@/db' +import { subscription as subscriptionTable } from '@/db/schema' const logger = createLogger('StripeInvoiceWebhooks') @@ -99,27 +106,69 @@ export async function handleInvoiceFinalized(event: Stripe.Event) { try { const invoice = event.data.object as Stripe.Invoice - // Check if this is an overage billing invoice - if (invoice.metadata?.type !== 'overage_billing') { - logger.info('Ignoring non-overage billing invoice finalization', { invoiceId: invoice.id }) + // Case 1: Overage invoices (metadata.type === 'overage_billing') + if (invoice.metadata?.type === 'overage_billing') { + const customerId = invoice.customer as string + const invoiceAmount = invoice.amount_due / 100 + const billingPeriod = invoice.metadata?.billingPeriod || 'unknown' + + logger.info('Overage billing invoice finalized', { + invoiceId: invoice.id, + customerId, + invoiceAmount, + billingPeriod, + customerEmail: invoice.customer_email, + hostedInvoiceUrl: invoice.hosted_invoice_url, + }) + return } - const customerId = invoice.customer as string - const invoiceAmount = invoice.amount_due / 100 // Convert from cents to dollars - const billingPeriod = invoice.metadata?.billingPeriod || 'unknown' + // Case 2: Subscription cycle invoices (primary period rollover) + // When an invoice is finalized for a subscription cycle, align our usage reset to this boundary + if (invoice.subscription) { + const stripeSubscriptionId = String(invoice.subscription) + + const records = await db + .select() + .from(subscriptionTable) + .where(eq(subscriptionTable.stripeSubscriptionId, stripeSubscriptionId)) + .limit(1) + + if (records.length === 0) { + logger.warn('No matching internal subscription for Stripe invoice subscription', { + invoiceId: invoice.id, + stripeSubscriptionId, + }) + return + } + + const sub = records[0] + + // Idempotent reset aligned to the subscription’s new cycle + if (sub.plan === 'team' || sub.plan === 'enterprise') { + await resetOrganizationBillingPeriod(sub.referenceId) + logger.info('Reset organization billing period on subscription invoice finalization', { + invoiceId: invoice.id, + organizationId: sub.referenceId, + plan: sub.plan, + }) + } else { + await resetUserBillingPeriod(sub.referenceId) + logger.info('Reset user billing period on subscription invoice finalization', { + invoiceId: invoice.id, + userId: sub.referenceId, + plan: sub.plan, + }) + } - logger.info('Overage billing invoice finalized', { + return + } + + logger.info('Ignoring non-subscription invoice finalization', { invoiceId: invoice.id, - customerId, - invoiceAmount, - billingPeriod, - customerEmail: invoice.customer_email, - hostedInvoiceUrl: invoice.hosted_invoice_url, + billingReason: invoice.billing_reason, }) - - // Additional invoice finalization logic can be added here - // For example: update internal records, trigger notifications, etc. } catch (error) { logger.error('Failed to handle invoice finalized', { eventId: event.id, diff --git a/apps/sim/vercel.json b/apps/sim/vercel.json index 953eb59329a..ee0ae552623 100644 --- a/apps/sim/vercel.json +++ b/apps/sim/vercel.json @@ -15,10 +15,6 @@ { "path": "/api/logs/cleanup", "schedule": "0 0 * * *" - }, - { - "path": "/api/billing/daily", - "schedule": "0 2 * * *" } ] } From 25b2c45ec027280377bffdfe0b3abc4d1ff4dd2b Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Sat, 23 Aug 2025 10:50:23 -0700 Subject: [PATCH 10/21] fix(billing): change reset user stats func to invoice payment succeeded (#1116) * fix(billing): change reset user stats func to invoice payment succeeded * remove nonexistent billing reason --- .../webhooks/stripe-invoice-webhooks.ts | 131 +++++++++--------- 1 file changed, 66 insertions(+), 65 deletions(-) diff --git a/apps/sim/lib/billing/webhooks/stripe-invoice-webhooks.ts b/apps/sim/lib/billing/webhooks/stripe-invoice-webhooks.ts index 94ffaac234d..7cf1b46c4a1 100644 --- a/apps/sim/lib/billing/webhooks/stripe-invoice-webhooks.ts +++ b/apps/sim/lib/billing/webhooks/stripe-invoice-webhooks.ts @@ -18,27 +18,75 @@ export async function handleInvoicePaymentSucceeded(event: Stripe.Event) { try { const invoice = event.data.object as Stripe.Invoice - // Check if this is an overage billing invoice - if (invoice.metadata?.type !== 'overage_billing') { - logger.info('Ignoring non-overage billing invoice', { invoiceId: invoice.id }) + // Case 1: Overage invoices (metadata.type === 'overage_billing') + if (invoice.metadata?.type === 'overage_billing') { + const customerId = invoice.customer as string + const chargedAmount = invoice.amount_paid / 100 + const billingPeriod = invoice.metadata?.billingPeriod || 'unknown' + + logger.info('Overage billing invoice payment succeeded', { + invoiceId: invoice.id, + customerId, + chargedAmount, + billingPeriod, + customerEmail: invoice.customer_email, + hostedInvoiceUrl: invoice.hosted_invoice_url, + }) + return } - const customerId = invoice.customer as string - const chargedAmount = invoice.amount_paid / 100 // Convert from cents to dollars - const billingPeriod = invoice.metadata?.billingPeriod || 'unknown' + // Case 2: Subscription renewal invoice paid (primary period rollover) + // Only reset on successful payment to avoid granting a new period while in dunning + if (invoice.subscription) { + // Filter to subscription-cycle renewals; ignore updates/off-cycle charges + const reason = invoice.billing_reason + const isCycle = reason === 'subscription_cycle' + if (!isCycle) { + logger.info('Ignoring non-cycle subscription invoice on payment_succeeded', { + invoiceId: invoice.id, + billingReason: reason, + }) + return + } - logger.info('Overage billing invoice payment succeeded', { - invoiceId: invoice.id, - customerId, - chargedAmount, - billingPeriod, - customerEmail: invoice.customer_email, - hostedInvoiceUrl: invoice.hosted_invoice_url, - }) + const stripeSubscriptionId = String(invoice.subscription) + const records = await db + .select() + .from(subscriptionTable) + .where(eq(subscriptionTable.stripeSubscriptionId, stripeSubscriptionId)) + .limit(1) - // Additional payment success logic can be added here - // For example: update internal billing status, trigger analytics events, etc. + if (records.length === 0) { + logger.warn('No matching internal subscription for paid Stripe invoice', { + invoiceId: invoice.id, + stripeSubscriptionId, + }) + return + } + + const sub = records[0] + + if (sub.plan === 'team' || sub.plan === 'enterprise') { + await resetOrganizationBillingPeriod(sub.referenceId) + logger.info('Reset organization billing period on subscription invoice payment', { + invoiceId: invoice.id, + organizationId: sub.referenceId, + plan: sub.plan, + }) + } else { + await resetUserBillingPeriod(sub.referenceId) + logger.info('Reset user billing period on subscription invoice payment', { + invoiceId: invoice.id, + userId: sub.referenceId, + plan: sub.plan, + }) + } + + return + } + + logger.info('Ignoring non-subscription invoice payment', { invoiceId: invoice.id }) } catch (error) { logger.error('Failed to handle invoice payment succeeded', { eventId: event.id, @@ -105,67 +153,20 @@ export async function handleInvoicePaymentFailed(event: Stripe.Event) { export async function handleInvoiceFinalized(event: Stripe.Event) { try { const invoice = event.data.object as Stripe.Invoice - - // Case 1: Overage invoices (metadata.type === 'overage_billing') + // Do not reset usage on finalized; wait for payment success to avoid granting new period during dunning if (invoice.metadata?.type === 'overage_billing') { const customerId = invoice.customer as string const invoiceAmount = invoice.amount_due / 100 const billingPeriod = invoice.metadata?.billingPeriod || 'unknown' - logger.info('Overage billing invoice finalized', { invoiceId: invoice.id, customerId, invoiceAmount, billingPeriod, - customerEmail: invoice.customer_email, - hostedInvoiceUrl: invoice.hosted_invoice_url, }) - - return - } - - // Case 2: Subscription cycle invoices (primary period rollover) - // When an invoice is finalized for a subscription cycle, align our usage reset to this boundary - if (invoice.subscription) { - const stripeSubscriptionId = String(invoice.subscription) - - const records = await db - .select() - .from(subscriptionTable) - .where(eq(subscriptionTable.stripeSubscriptionId, stripeSubscriptionId)) - .limit(1) - - if (records.length === 0) { - logger.warn('No matching internal subscription for Stripe invoice subscription', { - invoiceId: invoice.id, - stripeSubscriptionId, - }) - return - } - - const sub = records[0] - - // Idempotent reset aligned to the subscription’s new cycle - if (sub.plan === 'team' || sub.plan === 'enterprise') { - await resetOrganizationBillingPeriod(sub.referenceId) - logger.info('Reset organization billing period on subscription invoice finalization', { - invoiceId: invoice.id, - organizationId: sub.referenceId, - plan: sub.plan, - }) - } else { - await resetUserBillingPeriod(sub.referenceId) - logger.info('Reset user billing period on subscription invoice finalization', { - invoiceId: invoice.id, - userId: sub.referenceId, - plan: sub.plan, - }) - } - return } - - logger.info('Ignoring non-subscription invoice finalization', { + logger.info('Ignoring subscription invoice finalization; will act on payment_succeeded', { invoiceId: invoice.id, billingReason: invoice.billing_reason, }) From 730164abee207a9d770c1c9cf9de48698cb02a16 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Sat, 23 Aug 2025 13:15:12 -0700 Subject: [PATCH 11/21] fix(custom-tool): fix textarea, param dropdown for available params, validation for invalid schemas, variable resolution in custom tools and subflow tags (#1117) * fix(custom-tools): fix text area for custom tools * added param dropdown in agent custom tool * add syntax highlighting for params, fix dropdown styling * ux * add tooltip to prevent indicate invalid json schema on schema and code tabs * feat(custom-tool): added stricter JSON schema validation and error when saving json schema for custom tools * fix(custom-tool): allow variable resolution in custom tools * fix variable resolution in subflow tags * refactored function execution to use helpers * cleanup * fix block variable resolution to inject at runtime * fix highlighting code --------- Co-authored-by: Vikhyath Mondreti --- apps/sim/app/api/function/execute/route.ts | 139 +++++++- apps/sim/app/api/providers/route.ts | 7 + .../components/code-editor/code-editor.tsx | 92 ++++- .../custom-tool-modal/custom-tool-modal.tsx | 318 +++++++++++++++++- .../components/tool-input/tool-input.tsx | 3 +- .../executor/handlers/agent/agent-handler.ts | 45 +++ .../function/function-handler.test.ts | 3 + .../handlers/function/function-handler.ts | 43 ++- .../executor/handlers/loop/loop-handler.ts | 5 +- .../handlers/parallel/parallel-handler.ts | 5 +- apps/sim/executor/index.ts | 34 +- apps/sim/executor/types.ts | 1 + apps/sim/providers/anthropic/index.ts | 50 +-- apps/sim/providers/azure-openai/index.ts | 26 +- apps/sim/providers/cerebras/index.ts | 21 +- apps/sim/providers/deepseek/index.ts | 26 +- apps/sim/providers/groq/index.ts | 21 +- apps/sim/providers/types.ts | 3 + apps/sim/providers/utils.ts | 20 +- apps/sim/providers/xai/index.ts | 26 +- apps/sim/tools/function/execute.test.ts | 3 + apps/sim/tools/function/execute.ts | 8 + apps/sim/tools/function/types.ts | 1 + apps/sim/tools/index.ts | 14 +- apps/sim/tools/utils.test.ts | 6 + apps/sim/tools/utils.ts | 10 + 26 files changed, 687 insertions(+), 243 deletions(-) diff --git a/apps/sim/app/api/function/execute/route.ts b/apps/sim/app/api/function/execute/route.ts index 96d1d6ed325..08dfae0682f 100644 --- a/apps/sim/app/api/function/execute/route.ts +++ b/apps/sim/app/api/function/execute/route.ts @@ -213,24 +213,81 @@ function createUserFriendlyErrorMessage( } /** - * Resolves environment variables and tags in code - * @param code - Code with variables - * @param params - Parameters that may contain variable values - * @param envVars - Environment variables from the workflow - * @returns Resolved code + * Resolves workflow variables with syntax */ +function resolveWorkflowVariables( + code: string, + workflowVariables: Record, + contextVariables: Record +): string { + let resolvedCode = code -function resolveCodeVariables( + const variableMatches = resolvedCode.match(/]+)>/g) || [] + for (const match of variableMatches) { + const variableName = match.slice(' (variable.name || '').replace(/\s+/g, '') === variableName + ) + + if (foundVariable) { + const variable = foundVariable[1] + // Get the typed value - handle different variable types + let variableValue = variable.value + + if (variable.value !== undefined && variable.value !== null) { + try { + // Handle 'string' type the same as 'plain' for backward compatibility + const type = variable.type === 'string' ? 'plain' : variable.type + + // For plain text, use exactly what's entered without modifications + if (type === 'plain' && typeof variableValue === 'string') { + // Use as-is for plain text + } else if (type === 'number') { + variableValue = Number(variableValue) + } else if (type === 'boolean') { + variableValue = variableValue === 'true' || variableValue === true + } else if (type === 'json') { + try { + variableValue = + typeof variableValue === 'string' ? JSON.parse(variableValue) : variableValue + } catch { + // Keep original value if JSON parsing fails + } + } + } catch (error) { + // Fallback to original value on error + variableValue = variable.value + } + } + + // Create a safe variable reference + const safeVarName = `__variable_${variableName.replace(/[^a-zA-Z0-9_]/g, '_')}` + contextVariables[safeVarName] = variableValue + + // Replace the variable reference with the safe variable name + resolvedCode = resolvedCode.replace(new RegExp(escapeRegExp(match), 'g'), safeVarName) + } else { + // Variable not found - replace with empty string to avoid syntax errors + resolvedCode = resolvedCode.replace(new RegExp(escapeRegExp(match), 'g'), '') + } + } + + return resolvedCode +} + +/** + * Resolves environment variables with {{var_name}} syntax + */ +function resolveEnvironmentVariables( code: string, params: Record, - envVars: Record = {}, - blockData: Record = {}, - blockNameMapping: Record = {} -): { resolvedCode: string; contextVariables: Record } { + envVars: Record, + contextVariables: Record +): string { let resolvedCode = code - const contextVariables: Record = {} - // Resolve environment variables with {{var_name}} syntax const envVarMatches = resolvedCode.match(/\{\{([^}]+)\}\}/g) || [] for (const match of envVarMatches) { const varName = match.slice(2, -2).trim() @@ -245,7 +302,21 @@ function resolveCodeVariables( resolvedCode = resolvedCode.replace(new RegExp(escapeRegExp(match), 'g'), safeVarName) } - // Resolve tags with syntax (including nested paths like ) + return resolvedCode +} + +/** + * Resolves tags with syntax (including nested paths like ) + */ +function resolveTagVariables( + code: string, + params: Record, + blockData: Record, + blockNameMapping: Record, + contextVariables: Record +): string { + let resolvedCode = code + const tagMatches = resolvedCode.match(/<([a-zA-Z_][a-zA-Z0-9_.]*[a-zA-Z0-9_])>/g) || [] for (const match of tagMatches) { @@ -300,6 +371,42 @@ function resolveCodeVariables( resolvedCode = resolvedCode.replace(new RegExp(escapeRegExp(match), 'g'), safeVarName) } + return resolvedCode +} + +/** + * Resolves environment variables and tags in code + * @param code - Code with variables + * @param params - Parameters that may contain variable values + * @param envVars - Environment variables from the workflow + * @returns Resolved code + */ +function resolveCodeVariables( + code: string, + params: Record, + envVars: Record = {}, + blockData: Record = {}, + blockNameMapping: Record = {}, + workflowVariables: Record = {} +): { resolvedCode: string; contextVariables: Record } { + let resolvedCode = code + const contextVariables: Record = {} + + // Resolve workflow variables with syntax first + resolvedCode = resolveWorkflowVariables(resolvedCode, workflowVariables, contextVariables) + + // Resolve environment variables with {{var_name}} syntax + resolvedCode = resolveEnvironmentVariables(resolvedCode, params, envVars, contextVariables) + + // Resolve tags with syntax (including nested paths like ) + resolvedCode = resolveTagVariables( + resolvedCode, + params, + blockData, + blockNameMapping, + contextVariables + ) + return { resolvedCode, contextVariables } } @@ -338,6 +445,7 @@ export async function POST(req: NextRequest) { envVars = {}, blockData = {}, blockNameMapping = {}, + workflowVariables = {}, workflowId, isCustomTool = false, } = body @@ -360,7 +468,8 @@ export async function POST(req: NextRequest) { executionParams, envVars, blockData, - blockNameMapping + blockNameMapping, + workflowVariables ) resolvedCode = codeResolution.resolvedCode const contextVariables = codeResolution.contextVariables @@ -368,8 +477,8 @@ export async function POST(req: NextRequest) { const executionMethod = 'vm' // Default execution method logger.info(`[${requestId}] Using VM for code execution`, { - resolvedCode, hasEnvVars: Object.keys(envVars).length > 0, + hasWorkflowVariables: Object.keys(workflowVariables).length > 0, }) // Create a secure context with console logging diff --git a/apps/sim/app/api/providers/route.ts b/apps/sim/app/api/providers/route.ts index 17d37a4f38f..8aa62f7e71b 100644 --- a/apps/sim/app/api/providers/route.ts +++ b/apps/sim/app/api/providers/route.ts @@ -39,6 +39,9 @@ export async function POST(request: NextRequest) { stream, messages, environmentVariables, + workflowVariables, + blockData, + blockNameMapping, reasoningEffort, verbosity, } = body @@ -60,6 +63,7 @@ export async function POST(request: NextRequest) { messageCount: messages?.length || 0, hasEnvironmentVariables: !!environmentVariables && Object.keys(environmentVariables).length > 0, + hasWorkflowVariables: !!workflowVariables && Object.keys(workflowVariables).length > 0, reasoningEffort, verbosity, }) @@ -103,6 +107,9 @@ export async function POST(request: NextRequest) { stream, messages, environmentVariables, + workflowVariables, + blockData, + blockNameMapping, reasoningEffort, verbosity, }) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/components/tool-input/components/code-editor/code-editor.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/components/tool-input/components/code-editor/code-editor.tsx index 9ab634e66af..1ea5279d579 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/components/tool-input/components/code-editor/code-editor.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/components/sub-block/components/tool-input/components/code-editor/code-editor.tsx @@ -18,6 +18,7 @@ interface CodeEditorProps { highlightVariables?: boolean onKeyDown?: (e: React.KeyboardEvent) => void disabled?: boolean + schemaParameters?: Array<{ name: string; type: string; description: string; required: boolean }> } export function CodeEditor({ @@ -30,6 +31,7 @@ export function CodeEditor({ highlightVariables = true, onKeyDown, disabled = false, + schemaParameters = [], }: CodeEditorProps) { const [code, setCode] = useState(value) const [visualLineHeights, setVisualLineHeights] = useState([]) @@ -120,25 +122,80 @@ export function CodeEditor({ // First, get the default Prism highlighting let highlighted = highlight(code, languages[language], language) - // Then, highlight environment variables with {{var_name}} syntax in blue - if (highlighted.includes('{{')) { - highlighted = highlighted.replace( - /\{\{([^}]+)\}\}/g, - '{{$1}}' - ) + // Collect all syntax highlights to apply in a single pass + type SyntaxHighlight = { + start: number + end: number + replacement: string } + const highlights: SyntaxHighlight[] = [] - // Also highlight tags with syntax in blue - if (highlighted.includes('<') && !language.includes('html')) { - highlighted = highlighted.replace(/<([^>\s/]+)>/g, (match, group) => { - // Avoid replacing HTML tags in comments - if (match.startsWith('