Skip to content

Commit 4a6051f

Browse files
committed
externalize before compaction in fallback
1 parent 6322544 commit 4a6051f

2 files changed

Lines changed: 50 additions & 32 deletions

File tree

apps/sim/lib/integrations/integrations.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -721,7 +721,7 @@
721721
"slug": "apify",
722722
"name": "Apify",
723723
"description": "Run Apify actors and retrieve results",
724-
"longDescription": "Integrate Apify into your workflow. Run any Apify actor with custom input and retrieve results. Supports both synchronous and asynchronous execution with automatic dataset fetching.",
724+
"longDescription": "Integrate Apify into your workflow. Run any Apify actor or saved task with custom input, fetch dataset items, and check run status. Supports both synchronous and asynchronous execution with automatic dataset fetching.",
725725
"bgColor": "#FFFFFF",
726726
"iconName": "ApifyIcon",
727727
"docsUrl": "https://docs.sim.ai/tools/apify",
@@ -2527,7 +2527,7 @@
25272527
"triggerCount": 0,
25282528
"authType": "none",
25292529
"category": "tools",
2530-
"integrationTypes": ["databases", "analytics"],
2530+
"integrationType": "databases",
25312531
"tags": ["data-warehouse", "data-analytics"]
25322532
},
25332533
{

apps/sim/lib/logs/execution/logger.ts

Lines changed: 48 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,11 @@ import {
3434
} from '@/lib/execution/payloads/large-value-metadata'
3535
import { emitWorkflowExecutionCompleted } from '@/lib/logs/events'
3636
import { snapshotService } from '@/lib/logs/execution/snapshot/service'
37-
import { externalizeExecutionData, stripSpanCosts } from '@/lib/logs/execution/trace-store'
37+
import {
38+
externalizeExecutionData,
39+
stripSpanCosts,
40+
TRACE_STORE_REF_KEY,
41+
} from '@/lib/logs/execution/trace-store'
3842
import type {
3943
BlockOutputData,
4044
ExecutionEnvironment,
@@ -686,31 +690,30 @@ export class ExecutionLogger implements IExecutionLoggerService {
686690
models: costSummary.models,
687691
}
688692

689-
const boundedExecutionData = this.compactExecutionDataForStorage(
690-
this.buildCompletedExecutionData({
691-
existingExecutionData,
692-
traceSpans: mergedTraceSpans,
693-
finalOutput,
694-
finalizationPath,
695-
completionFailure,
696-
executionCost,
697-
executionState,
698-
workflowInput,
699-
}),
700-
executionId
701-
)
693+
const builtExecutionData = this.buildCompletedExecutionData({
694+
existingExecutionData,
695+
traceSpans: mergedTraceSpans,
696+
finalOutput,
697+
finalizationPath,
698+
completionFailure,
699+
executionCost,
700+
executionState,
701+
workflowInput,
702+
})
702703

704+
// Extract files from the full (pre-summarization) payload so large runs keep
705+
// every file reference even when the inline fallback later summarizes spans.
703706
const executionFiles = this.extractFilesFromExecution(
704-
boundedExecutionData.traceSpans,
705-
boundedExecutionData.finalOutput,
706-
boundedExecutionData.workflowInput
707+
builtExecutionData.traceSpans,
708+
builtExecutionData.finalOutput,
709+
builtExecutionData.workflowInput
707710
)
708711

709-
const filteredTraceSpans = filterForDisplay(boundedExecutionData.traceSpans)
710-
const filteredFinalOutput = filterForDisplay(boundedExecutionData.finalOutput)
712+
const filteredTraceSpans = filterForDisplay(builtExecutionData.traceSpans)
713+
const filteredFinalOutput = filterForDisplay(builtExecutionData.finalOutput)
711714
const filteredWorkflowInput =
712-
boundedExecutionData.workflowInput !== undefined
713-
? filterForDisplay(boundedExecutionData.workflowInput)
715+
builtExecutionData.workflowInput !== undefined
716+
? filterForDisplay(builtExecutionData.workflowInput)
714717
: undefined
715718
const redactedTraceSpans = redactApiKeys(filteredTraceSpans)
716719
const redactedFinalOutput = redactApiKeys(filteredFinalOutput)
@@ -726,22 +729,30 @@ export class ExecutionLogger implements IExecutionLoggerService {
726729
? Math.max(0, Math.round(rawDurationMs))
727730
: 0
728731

732+
const cleanExecutionData: ExecutionData = {
733+
...builtExecutionData,
734+
traceSpans: redactedTraceSpans,
735+
finalOutput: redactedFinalOutput,
736+
...(redactedWorkflowInput !== undefined ? { workflowInput: redactedWorkflowInput } : {}),
737+
}
738+
739+
stripSpanCosts((cleanExecutionData as Record<string, unknown>).traceSpans)
740+
741+
// Bounded in-memory form. Returned to callers (notification delivery/events)
742+
// and reused as the inline-storage fallback below. This is a no-op for
743+
// payloads already within MAX_EXECUTION_DATA_BYTES.
729744
const completedExecutionData = this.compactExecutionDataForStorage(
730-
{
731-
...boundedExecutionData,
732-
traceSpans: redactedTraceSpans,
733-
finalOutput: redactedFinalOutput,
734-
...(redactedWorkflowInput !== undefined ? { workflowInput: redactedWorkflowInput } : {}),
735-
},
745+
cleanExecutionData,
736746
executionId
737747
)
738748

739-
stripSpanCosts((completedExecutionData as Record<string, unknown>).traceSpans)
740-
749+
// Persist the FULL payload to object storage first: blob storage holds up to
750+
// MAX_DURABLE_LARGE_VALUE_BYTES (64MB), far above the inline row budget, so
751+
// externalized logs keep full-fidelity trace IO instead of being summarized.
741752
// Externalization requires the execution owner (workspace_files.user_id is
742753
// NOT NULL). billingUserId comes from environment.userId and is effectively
743754
// always present for a real run; if it's somehow absent, keep data inline.
744-
let storedExecutionData = completedExecutionData as Record<string, unknown>
755+
let storedExecutionData = cleanExecutionData as Record<string, unknown>
745756
if (billingUserId) {
746757
storedExecutionData = await externalizeExecutionData(storedExecutionData, {
747758
workspaceId: existingLog?.workspaceId ?? null,
@@ -754,6 +765,13 @@ export class ExecutionLogger implements IExecutionLoggerService {
754765
executionId,
755766
})
756767
}
768+
769+
// A successful externalization returns a slim row carrying TRACE_STORE_REF_KEY.
770+
// Only when externalization was skipped or fell back to inline do we swap in
771+
// the already-compacted payload so the Postgres row stays within MAX_EXECUTION_DATA_BYTES.
772+
if (!(TRACE_STORE_REF_KEY in storedExecutionData)) {
773+
storedExecutionData = completedExecutionData as Record<string, unknown>
774+
}
757775
const completedExecutionLargeValueKeys = collectLargeValueReferenceKeys(storedExecutionData)
758776

759777
const updatedLog = await db.transaction(async (tx) => {

0 commit comments

Comments
 (0)