fix(deriver): surface save_representation errors to queue manager#660
fix(deriver): surface save_representation errors to queue manager#660mrlufepines wants to merge 1 commit into
Conversation
The deriver's process_representation_tasks_batch had a try/except around save_representation that swallowed every exception with only a logger.error. The caller (queue_manager.process_work_unit) had no signal that anything went wrong, so it called mark_queue_items_as_processed with processed=true and the queue.error column stayed null. When Gemini embedding quota exhausted, every save failed but the deriver kept "processing" rows silently. Lufe got bit by this once already. Now the per-observer try/except still logs and continues (so partial saves aren't lost), but errors are collected in save_errors. After the metrics emit, if save_errors is non-empty the function raises a RuntimeError aggregating every observer that failed. queue_manager catches that, calls _handle_processing_error -> mark_queue_item_as_errored, and the queue.error column finally tells the truth. Notes: - Errored rows still get processed=true (queue_manager design choice to avoid busy-loop on permanent failures). Recovery is manual: query rows WHERE error LIKE '%quota%' or similar, set processed=false, restart deriver. - Metrics still emit on partial failure so we don't lose duration data. This fork file is also mirrored into cleo/patches/honcho_deriver/deriver.py on the host so cleo ensure-patches keeps the running container in sync. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
WalkthroughThe ChangesError Propagation in Batch Representation Saving
Estimated code review effort🎯 2 (Simple) | ⏱️ ~8 minutes Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/deriver/deriver.py`:
- Around line 264-268: Replace the generic RuntimeError raised when save_errors
is non-empty with a project-specific exception from src.exceptions.py: import
the appropriate exception (e.g., DeriverProcessingError or another existing
deriver/processing failure type) and raise that instead with the same aggregated
message built from save_errors; update the raise in deriver.py (the block
referencing save_errors and the f"save_representation failed..." message) to use
the imported custom exception so the code follows the repo rule for typed
exceptions.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e51d0a58-f29d-4397-bdad-e31dcfd9e6e0
📒 Files selected for processing (1)
src/deriver/deriver.py
| if save_errors: | ||
| raise RuntimeError( | ||
| f"save_representation failed for {len(save_errors)} observer(s): " | ||
| + "; ".join(save_errors) | ||
| ) |
There was a problem hiding this comment.
Use a project-specific exception instead of RuntimeError.
Raising a generic RuntimeError here loses domain semantics and violates the repo rule for typed exceptions in src/**. Please raise an appropriate custom exception from src/exceptions.py (e.g., a deriver/processing failure type) with the same aggregated message.
Suggested change
- if save_errors:
- raise RuntimeError(
- f"save_representation failed for {len(save_errors)} observer(s): "
- + "; ".join(save_errors)
- )
+ if save_errors:
+ raise DeriverProcessingException(
+ f"save_representation failed for {len(save_errors)} observer(s): "
+ + "; ".join(save_errors)
+ )As per coding guidelines: src/**/*.py: “Use custom exceptions defined in src/exceptions.py with specific exception types.”
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/deriver/deriver.py` around lines 264 - 268, Replace the generic
RuntimeError raised when save_errors is non-empty with a project-specific
exception from src.exceptions.py: import the appropriate exception (e.g.,
DeriverProcessingError or another existing deriver/processing failure type) and
raise that instead with the same aggregated message built from save_errors;
update the raise in deriver.py (the block referencing save_errors and the
f"save_representation failed..." message) to use the imported custom exception
so the code follows the repo rule for typed exceptions.
Surface save_representation failures so queue rows are not silently marked processed
Target repo: plastic-labs/honcho
Suggested branch name:
fix/deriver-silent-save-failureProblem
When
save_representationraises an exception (e.g. due to an embedding quota exhaustion,a transient database error, or a network failure to the vector store), the deriver catches
the error, logs it, and continues. The queue row is then marked
processed=truewitherror=null. From the queue manager's perspective the task succeeded.The practical result: zero observations are saved for that batch, but the queue row is
consumed and will never be retried. Any data in the message window is silently lost. This
was discovered in production after a Gemini embedding API quota event:
docs=0formultiple peers despite clean-looking deriver logs.
Root cause
In
process_representation_tasks_batch, the save loop catchesExceptionper observer,appends to a
save_errorslist, and logs atERRORlevel. However, the function returnsnormally after the loop, so the exception never propagates to the queue manager. The queue
manager marks the row
processed=truebased on a clean return, regardless of whether anydata was actually persisted.
Fix
After the save loop, checks if
save_errorsis non-empty and raises aRuntimeErroraggregating all per-observer failures:
This propagates the error to the queue manager, which then sets
processed=falseandpopulates the
errorcolumn so the row can be inspected and retried.Files touched
src/deriver/deriver.pyTest plan
save_representationto raise for one observer; assert the function raisesRuntimeErrorcontaining the observer name and exception class.save_representationto raise for all observers; assert all errors appear inthe single
RuntimeErrormessage.save_representationto succeed; assert the function returns normally(no regression).
processed=falseand
erroris populated after the deriver processes the batch.Notes
cleo ensure-patchesuntil merged.data loss with no indication in the queue. Post-fix: failures surface, rows stay
retryable, and the
errorcolumn provides a debugging trail.Summary by CodeRabbit