Skip to content

Commit a65d017

Browse files
chandrasekharan-zipstack, claude, vishnuszipstack
authored
UN-3431 [FIX] Restore log_events_id on tool-run dispatch and persist without UI subscriber (#1960)
* UN-3431 [FIX] Restore log_events_id on tool-run dispatch and persist without UI subscriber

  structure_tool_task lost log_events_id from both the agentic_table and structure_pipeline ExecutionContexts during the agentic_table refactor; the executor shim therefore received an empty log_events_id and bailed before publishing anything. Tool-run lines stopped reaching the workflow logs UI for every dispatch through these paths.

  Two changes:
  - structure_tool_task: thread log_events_id into both ExecutionContexts.
  - executor_tool_shim.stream_log: gate only the PROGRESS path on log_events_id; the LOG payload now falls back to execution_id as the routing channel so logs persist to execution_log even when no websocket subscriber exists (API deployments).

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3431 [MISC] Improve tool-run log narrative for workflow execution UI

  Reshape the per-run shim.stream_log emissions so the workflow logs UI reads as a per-phase narrative with one start, one end, and adapter identity surfaced exactly once per unique adapter:
  - Add a non-sensitive run-config preamble at the top of _handle_structure_pipeline: prompt count + single_pass / summarize / challenge flags. No prompt names or text are logged.
  - Introduce ExecutorToolShim.log_adapter_once(kind, adapter_id, adapter) with a per-shim dedup set so "Using LLM/Embedding/Vector DB: `<model>`" appears at most once per unique adapter id. Used from _initialize_adapters, _handle_index, and the summarize path.
  - Drop intermediate / redundant lines that did not add information on their own: "Initializing text extractor", "Using text extractor" (rolled into the start line), "Extracting text from document", "Saving extraction metadata", "Initialized embedding and vector DB adapters", "Indexing file", "Adding nodes to vector db".
  - Collapse the index-status trio ("Document already indexed, re-indexing" + "Indexing document for the first time" + "Indexing document into vector store") into a single "Indexing document" / "Re-indexing document" line driven by doc_id_found.
  - Gate "Retrieving context for" and "Retrieved N chunks via RAG for" on chunk_size > 0 so single-pass / full-context paths do not emit a misleading retrieval line.
  - Combine summarize start into one line that names the LLM model.
  - Wrap dynamic identifiers (adapter labels, extractor class, prompt names) in backticks; drop trailing "..." across all stream_log emissions.
  - Emit a final "Pipeline completed: N/M prompts answered" with a non-null count from structured_output[OUTPUT].

  Pairs with the cloud-side log cleanup PR.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: vishnuszipstack <117254672+vishnuszipstack@users.noreply.github.com>
1 parent 9678d78 commit a65d017

4 files changed

Lines changed: 95 additions & 63 deletions

File tree

workers/executor/executor_tool_shim.py

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ def __init__(
9393
# silently swallowing every subsequent log line at DEBUG.
9494
self._progress_publish_failed = False
9595
self._log_publish_failed = False
96+
# Adapters whose name+model has already been surfaced to the UI;
97+
# later mentions skip repeating the model line.
98+
self._adapters_logged: set[str] = set()
9699
# Initialize StreamMixin. EXECUTION_BY_TOOL is not set in
97100
# the worker environment, so _exec_by_tool will be False.
98101
super().__init__(log_level=LogLevel.INFO)
@@ -161,41 +164,36 @@ def stream_log(
161164
if _levels.index(level) < _levels.index(self.log_level):
162165
return
163166

164-
# Publish progress to frontend via the log consumer queue.
165-
if not self.log_events_id:
166-
return
167-
168167
wf_level = _SDK_TO_WF_LEVEL.get(level, "INFO")
169168

170-
# PROGRESS payload — IDE prompt-card live updates only. Dropped at
171-
# the DB persist layer because LogPublisher.publish only stores
172-
# payloads whose `type == "LOG"`.
173-
try:
174-
progress_payload = LogPublisher.log_progress(
175-
component=self.component,
176-
level=wf_level,
177-
state=stage,
178-
message=log,
179-
)
180-
LogPublisher.publish(
181-
channel_id=self.log_events_id,
182-
payload=progress_payload,
183-
)
184-
except Exception:
185-
first_failure = not self._progress_publish_failed
186-
self._progress_publish_failed = True
187-
logger.log(
188-
logging.WARNING if first_failure else logging.DEBUG,
189-
"Failed to publish progress log (non-fatal)",
190-
exc_info=first_failure,
191-
)
192-
193-
# LOG payload — feeds workflow logs UI and persists to execution_log.
194-
# LogDataDTO validation requires `execution_id` and `organization_id`;
195-
# `file_execution_id` is optional.
169+
# PROGRESS payload routes via the websocket room; skip when absent.
170+
if self.log_events_id:
171+
try:
172+
progress_payload = LogPublisher.log_progress(
173+
component=self.component,
174+
level=wf_level,
175+
state=stage,
176+
message=log,
177+
)
178+
LogPublisher.publish(
179+
channel_id=self.log_events_id,
180+
payload=progress_payload,
181+
)
182+
except Exception:
183+
first_failure = not self._progress_publish_failed
184+
self._progress_publish_failed = True
185+
logger.log(
186+
logging.WARNING if first_failure else logging.DEBUG,
187+
"Failed to publish progress log (non-fatal)",
188+
exc_info=first_failure,
189+
)
190+
191+
# LOG payload persists to execution_log; falls back to execution_id
192+
# as the routing channel so it survives without a websocket subscriber.
196193
if not (self.execution_id and self.organization_id):
197194
return
198195

196+
log_channel = self.log_events_id or self.execution_id
199197
try:
200198
log_payload = LogPublisher.log_workflow(
201199
stage=stage,
@@ -206,7 +204,7 @@ def stream_log(
206204
organization_id=self.organization_id,
207205
)
208206
LogPublisher.publish(
209-
channel_id=self.log_events_id,
207+
channel_id=log_channel,
210208
payload=log_payload,
211209
)
212210
except Exception:
@@ -234,3 +232,26 @@ def stream_error_and_exit(self, message: str, err: Exception | None = None) -> N
234232
"""
235233
logger.error(message)
236234
raise SdkError(message, actual_err=err)
235+
236+
def log_adapter_once(
    self,
    kind: str,
    adapter_id: str,
    adapter: Any,
) -> None:
    """Surface adapter identity to the UI on first use only.

    Emits a single ``Using <kind>: `<label>``` line through
    ``stream_log`` the first time a given ``adapter_id`` is seen on this
    shim; later calls with the same id are no-ops.

    Args:
        kind: Human label for the adapter category ("LLM", "Embedding",
            "Vector DB").
        adapter_id: Dedup key. An empty/falsy id is ignored entirely.
        adapter: SDK adapter instance. Only read for non-sensitive
            identity: ``get_model_name()`` when that attribute is
            callable, otherwise the ``_adapter_name`` attribute.

    The label falls back to ``adapter_id`` when the adapter yields no
    name, or when resolving the name raises. This line is cosmetic, so
    a misbehaving adapter must not abort the run.
    """
    if not adapter_id or adapter_id in self._adapters_logged:
        return
    # Record first so a repeat call can never duplicate the line, even if
    # label resolution below has to fall back.
    self._adapters_logged.add(adapter_id)

    label = ""
    try:
        get_model = getattr(adapter, "get_model_name", None)
        if callable(get_model):
            label = get_model()
        else:
            label = getattr(adapter, "_adapter_name", "")
    except Exception:
        # Same non-fatal policy as the publish paths in stream_log: a
        # failure to describe the adapter is logged, never raised.
        logging.getLogger(__name__).warning(
            "Failed to resolve %s adapter label (non-fatal)",
            kind,
            exc_info=True,
        )
        label = ""

    self.stream_log(f"Using {kind}: `{label or adapter_id}`")

workers/executor/executors/index.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,6 @@ def perform_indexing(
155155
):
156156
return doc_id
157157

158-
self.tool.stream_log("Indexing file...")
159158
full_text = [
160159
{
161160
"section": "full",
@@ -171,7 +170,6 @@ def perform_indexing(
171170
def _trigger_indexing(self, vector_db: Any, documents: list) -> None:
172171
import openai
173172

174-
self.tool.stream_log("Adding nodes to vector db...")
175173
try:
176174
vector_db.index_document(
177175
documents,

workers/executor/executors/legacy_executor.py

Lines changed: 41 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from executor.executor_tool_shim import ExecutorToolShim
1414
from executor.executors.constants import ExecutionSource
1515
from executor.executors.constants import IndexingConstants as IKeys
16+
from executor.executors.constants import PromptServiceConstants as PSKeys
1617
from executor.executors.dto import (
1718
ChunkingConfig,
1819
FileInfo,
@@ -242,15 +243,15 @@ def _handle_extract(self, context: ExecutionContext) -> ExecutionResult:
242243
Path(file_path).name,
243244
context.run_id,
244245
)
245-
shim.stream_log("Initializing text extractor...")
246-
shim.stream_log(f"Using text extractor: {type(x2text.x2text_instance).__name__}")
247-
246+
extractor_name = type(x2text.x2text_instance).__name__
248247
try:
249-
shim.stream_log("Extracting text from document...")
248+
shim.stream_log(
249+
f"Extracting text using `{extractor_name}`"
250+
+ (" (with highlight)" if enable_highlight else "")
251+
)
250252
if enable_highlight and isinstance(
251253
x2text.x2text_instance, (LLMWhisperer, LLMWhispererV2)
252254
):
253-
shim.stream_log("Extracting text with highlight support enabled...")
254255
process_response: TextExtractionResult = x2text.process(
255256
input_file_path=file_path,
256257
output_file_path=output_file_path,
@@ -301,7 +302,6 @@ def _handle_extract(self, context: ExecutionContext) -> ExecutionResult:
301302
process_response.extraction_metadata
302303
and process_response.extraction_metadata.line_metadata
303304
):
304-
shim.stream_log("Saving extraction metadata...")
305305
result_data["highlight_metadata"] = (
306306
process_response.extraction_metadata.line_metadata
307307
)
@@ -605,12 +605,23 @@ def _failure(child_result: ExecutionResult) -> ExecutionResult:
605605
shim = self._build_shim(
606606
platform_api_key=extract_params.get("platform_api_key", ""),
607607
)
608+
609+
# One-shot run-config line — non-sensitive flags only; adapter
610+
# identities are emitted inline on first use with full model info.
611+
tool_settings = answer_params.get(PSKeys.TOOL_SETTINGS, {})
612+
outputs = answer_params.get(PSKeys.OUTPUTS, [])
613+
shim.stream_log(
614+
f"Run config: prompts={len(outputs)} | "
615+
f"single_pass={'on' if is_single_pass else 'off'} | "
616+
f"summarize={'on' if is_summarization else 'off'} | "
617+
f"challenge="
618+
f"{'on' if tool_settings.get(PSKeys.ENABLE_CHALLENGE) else 'off'}"
619+
)
608620
step = 1
609621

610622
try:
611623
# ---- Step 1: Extract ----
612624
if not skip_extraction:
613-
shim.stream_log(f"Pipeline step {step}: Extracting text from document...")
614625
step += 1
615626
extract_ctx = ExecutionContext(
616627
executor_name=context.executor_name,
@@ -632,7 +643,6 @@ def _failure(child_result: ExecutionResult) -> ExecutionResult:
632643

633644
# ---- Step 2: Summarize (if enabled) ----
634645
if is_summarization:
635-
shim.stream_log(f"Pipeline step {step}: Summarizing extracted text...")
636646
step += 1
637647
summarize_result = self._run_pipeline_summarize(
638648
context=context,
@@ -648,9 +658,6 @@ def _failure(child_result: ExecutionResult) -> ExecutionResult:
648658
answer_params["file_path"] = input_file_path
649659
elif not is_single_pass:
650660
# ---- Step 3: Index per output with dedup ----
651-
shim.stream_log(
652-
f"Pipeline step {step}: Indexing document into vector store..."
653-
)
654661
step += 1
655662
index_metrics = self._run_pipeline_index(
656663
context=context,
@@ -693,7 +700,9 @@ def _failure(child_result: ExecutionResult) -> ExecutionResult:
693700
index_metrics=index_metrics,
694701
)
695702

696-
shim.stream_log("Pipeline completed successfully")
703+
output_map = structured_output.get(PSKeys.OUTPUT, {}) or {}
704+
answered = sum(1 for v in output_map.values() if v not in (None, "", [], {}))
705+
shim.stream_log(f"Pipeline completed: {answered}/{len(outputs)} prompts answered")
697706
out_metadata = {
698707
k: v
699708
for k, v in (answer_result.metadata or {}).items()
@@ -728,12 +737,9 @@ def _run_pipeline_answer_step(
728737
output["chunk-size"] = 0
729738
output["chunk-overlap"] = 0
730739
operation = Operation.SINGLE_PASS_EXTRACTION.value
731-
mode_label = "single pass"
732740
else:
733741
operation = Operation.ANSWER_PROMPT.value
734-
mode_label = "prompt"
735742

736-
shim.stream_log(f"Pipeline step {step}: Running {mode_label} execution...")
737743
answer_ctx = ExecutionContext(
738744
executor_name=context.executor_name,
739745
operation=operation,
@@ -1075,8 +1081,6 @@ def _handle_index(self, context: ExecutionContext) -> ExecutionResult:
10751081
Path(file_path).name,
10761082
context.run_id,
10771083
)
1078-
shim.stream_log("Initializing indexing pipeline...")
1079-
10801084
# Skip indexing when chunk_size is 0 — no vector operations needed.
10811085
# ChunkingConfig raises ValueError for 0, so handle before DTO.
10821086
if chunk_size == 0:
@@ -1117,7 +1121,6 @@ def _handle_index(self, context: ExecutionContext) -> ExecutionResult:
11171121
)
11181122
doc_id = index.generate_index_key(file_info=file_info, fs=fs_instance)
11191123
logger.debug("Generated index key: doc_id=%s", doc_id)
1120-
shim.stream_log("Checking document index status...")
11211124

11221125
embedding = embedding_compat(
11231126
adapter_instance_id=embedding_instance_id,
@@ -1129,7 +1132,8 @@ def _handle_index(self, context: ExecutionContext) -> ExecutionResult:
11291132
adapter_instance_id=vector_db_instance_id,
11301133
embedding=embedding,
11311134
)
1132-
shim.stream_log("Initialized embedding and vector DB adapters")
1135+
shim.log_adapter_once("Embedding", embedding_instance_id, embedding)
1136+
shim.log_adapter_once("Vector DB", vector_db_instance_id, vector_db)
11331137

11341138
doc_id_found = index.is_document_indexed(
11351139
doc_id=doc_id, embedding=embedding, vector_db=vector_db
@@ -1150,11 +1154,9 @@ def _handle_index(self, context: ExecutionContext) -> ExecutionResult:
11501154
)
11511155
return ExecutionResult(success=True, data={IKeys.DOC_ID: doc_id})
11521156

1153-
if doc_id_found and reindex:
1154-
shim.stream_log("Document already indexed, re-indexing...")
1155-
else:
1156-
shim.stream_log("Indexing document for the first time...")
1157-
shim.stream_log("Indexing document into vector store...")
1157+
shim.stream_log(
1158+
"Re-indexing document" if doc_id_found else "Indexing document"
1159+
)
11581160
index.perform_indexing(
11591161
vector_db=vector_db,
11601162
doc_id=doc_id,
@@ -1689,7 +1691,8 @@ def _execute_single_prompt(
16891691
retrieval_strategy = output.get(PSKeys.RETRIEVAL_STRATEGY)
16901692
valid_strategies = {s.value for s in RetrievalStrategy}
16911693
if retrieval_strategy in valid_strategies:
1692-
shim.stream_log(f"Retrieving context for: `{prompt_name}`")
1694+
if chunk_size > 0:
1695+
shim.stream_log(f"Retrieving context for: `{prompt_name}`")
16931696
logger.info(
16941697
"Performing retrieval: prompt=%s strategy=%s chunk_size=%d",
16951698
prompt_name,
@@ -1713,9 +1716,11 @@ def _execute_single_prompt(
17131716
context_retrieval_metrics=context_retrieval_metrics,
17141717
)
17151718
metadata[PSKeys.CONTEXT][prompt_name] = context_list
1716-
shim.stream_log(
1717-
f"Retrieved {len(context_list)} context chunks for: `{prompt_name}`"
1718-
)
1719+
if chunk_size > 0:
1720+
shim.stream_log(
1721+
f"Retrieved {len(context_list)} chunks via RAG "
1722+
f"for `{prompt_name}`"
1723+
)
17191724
logger.debug(
17201725
"Retrieved %d context chunks for prompt: %s",
17211726
len(context_list),
@@ -1861,6 +1866,11 @@ def _init_llm_and_retrieval(
18611866
adapter_instance_id=output[PSKeys.VECTOR_DB],
18621867
embedding=embedding,
18631868
)
1869+
shim.log_adapter_once("LLM", output[PSKeys.LLM], llm)
1870+
if embedding is not None:
1871+
shim.log_adapter_once("Embedding", output[PSKeys.EMBEDDING], embedding)
1872+
if vector_db is not None:
1873+
shim.log_adapter_once("Vector DB", output[PSKeys.VECTOR_DB], vector_db)
18641874
shim.stream_log(
18651875
f"Initialized LLM and retrieval adapters for: `{prompt_name}`"
18661876
)
@@ -2274,7 +2284,6 @@ def _handle_summarize(self, context: ExecutionContext) -> ExecutionResult:
22742284

22752285
_, _, _, _, llm_cls, _, _ = self._get_prompt_deps()
22762286

2277-
shim.stream_log("Initializing LLM for summarization...")
22782287
llm: Any = None
22792288
try:
22802289
llm = llm_cls(
@@ -2286,7 +2295,9 @@ def _handle_summarize(self, context: ExecutionContext) -> ExecutionResult:
22862295
AnswerPromptService as answer_prompt_svc,
22872296
)
22882297

2289-
shim.stream_log("Running document summarization...")
2298+
shim.stream_log(
2299+
f"Summarizing extracted text using LLM: `{llm.get_model_name()}`"
2300+
)
22902301
summary = answer_prompt_svc.run_completion(llm=llm, prompt=prompt)
22912302
records = list(llm.flush_pending_usage())
22922303
logger.info("Summarization completed: run_id=%s", context.run_id)

workers/file_processing/structure_tool_task.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,7 @@ def _execute_structure_tool_impl(params: dict) -> dict:
459459
execution_source="tool",
460460
organization_id=organization_id,
461461
request_id=file_execution_id,
462+
log_events_id=log_events_id,
462463
execution_id=execution_id,
463464
file_execution_id=file_execution_id,
464465
executor_params=agentic_params,
@@ -490,6 +491,7 @@ def _execute_structure_tool_impl(params: dict) -> dict:
490491
execution_source="tool",
491492
organization_id=organization_id,
492493
request_id=file_execution_id,
494+
log_events_id=log_events_id,
493495
execution_id=execution_id,
494496
file_execution_id=file_execution_id,
495497
executor_params={

0 commit comments

Comments
 (0)