feat(nodes): improve Milvus vector DB node — address all TODOs#562
Conversation
…screen Handle TASK_STATE.STOPPING in the control button to show "Stopping..." with a disabled state and distinct orange styling, preventing duplicate clicks and giving immediate visual feedback during pipeline shutdown. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Add configurable timeout (default 60s) replacing hardcoded timeout=20, read from node config via 'timeout' key (TODO line 101, 483) - Add connection error handling with meaningful failure messages instead of raw pymilvus exceptions propagating - Implement bulk insert with configurable batch size (default 50) for addChunks(), replacing one-at-a-time upserts (TODO lines 449, 464) - Add _batchUpsertResults() helper to batch-update markDeleted/markActive operations, eliminating the per-vector upsert loop bottleneck (TODO lines 514-515, 546-547) - Add timeout parameter to remove() delete call (TODO line 483) - Document Milvus COSINE distance score range [0,2] rescaling to [0,1] for codebase consistency (TODO line 253) - Fix typos in docstrings and comments Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
📝 WalkthroughWalkthroughOptimizes Milvus integration by introducing batched upsert operations, configurable timeouts, and improved error handling. Changes focus on bulk insert performance, connection robustness, and standardized field queries. Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
The PageStatus changes belong in a separate PR (rocketride-org#549) and were accidentally included here.
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@nodes/src/nodes/milvus/milvus.py`:
- Around line 111-123: The connection logic in the constructor always falls
through to the branch that prepends "https://" because self.host was already
stripped of any scheme; remove the dead startswith('http')/startswith('https')
check in the block that builds the MilvusClient URI and instead consistently
construct the URI (e.g., uri=f'https://{self.host}' when profile != 'local' and
uri=f'http://{self.host}:{self.port}' for local), and when re-raising the
failure convert raise Exception(f'Failed to connect to Milvus at {self.host}:
{e}') to use exception chaining (raise Exception(... ) from e) so the original
traceback is preserved; update the code that instantiates MilvusClient (the
self.client assignment) and the except handler accordingly.
- Around line 528-549: The three Milvus query calls that feed
_batchUpsertResults (used by markDeleted/markActive) and the query in
renderChunks are missing output_fields and thus return only the primary key; fix
each query call to include output_fields so full records are returned: for the
queries whose results are later upserted (the ones passing into
_batchUpsertResults/used by markDeleted and markActive) add
output_fields=['id','vector','content','meta'] (or at minimum
['vector','content','meta']) so _batchUpsertResults upserts complete documents,
and for the query in renderChunks add output_fields=['meta','content']
(including any field that contains chunkId if it's stored outside meta) to avoid
KeyError when accessing point['content'] and point['chunkId'] in renderChunks.
Ensure these are added to the Milvus client.query(...) calls that feed
_batchUpsertResults and renderChunks.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 51b827e6-948b-4944-bafa-6c6df22a4427
📒 Files selected for processing (1)
nodes/src/nodes/milvus/milvus.py
stepmikhaylov
left a comment
There was a problem hiding this comment.
Thank you for addressing these todo-s. CodeRabbit detected a critical issue that seems to be very relevant. It's a significant improvement for our Milvus node, and it would be helpful if you could address it so that we can merge these changes.
- Remove dead protocol check (host already stripped of scheme at init) - Add exception chaining with 'from e' for connection errors (B904) - Add output_fields to markDeleted/markActive queries to prevent data loss during upsert (was only returning primary key) - Add output_fields to renderChunks query to prevent KeyError on content/chunkId access Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
nodes/src/nodes/milvus/milvus.py (3)
453-497:⚠️ Potential issue | 🟡 MinorPartial failure can leave data in inconsistent state.
Old chunks are deleted at line 459 before new chunks are inserted. If an exception occurs during any batch upsert (lines 470-475), some batches succeed while others don't, leaving the collection in an inconsistent state with partial data for those
objectIds.Consider wrapping the entire operation in a transactional pattern or implementing rollback logic. Alternatively, insert new chunks first with a temporary flag, then delete old chunks, then update the flag—though this adds complexity.
This is an existing concern amplified by batching. For now, documenting this limitation may be sufficient.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@nodes/src/nodes/milvus/milvus.py` around lines 453 - 497, The current flow deletes old chunks via self.client.delete (filter_condition) before inserting new data with flush_batch()/self.client.upsert, which can leave the collection partially updated if upserts fail; to fix, implement an atomic/rollback-safe pattern: either 1) read and persist a backup of the deleted entities (fetch current chunks for the objectIds before calling self.client.delete) so you can restore them on upsert failure, or 2) change ordering to upsert new chunks with a temporary marker/meta flag (use the same batch/upsert logic in flush_batch), then only after all upserts succeed delete old chunks and finally remove the temporary flag, or 3) upsert into a temporary collection and swap/rename on success; modify the code around the delete call and the flush_batch/upsert logic (functions/methods: the block that builds filter_condition, self.client.delete, flush_batch, and where tmp_struct is created) to implement one of these strategies and ensure errors during upsert trigger restore/cleanup logic.
626-637:⚠️ Potential issue | 🟠 MajorLine 637 will raise
KeyError:chunkIdis inside themetaobject.The query at line 626 returns
output_fields=['meta', 'content'], so results containmetaas a nested object. Throughout the file (lines 229, 233, 237, 412, 624),chunkIdis consistently accessed asmeta['chunkId']in filter expressions, confirming it is a field within themetaJSON object, not at the top level. Line 637 must be changed frompoint['chunkId']topoint['meta']['chunkId'].Required fix
for point in results: content = point['content'] - chunk = point['chunkId'] + chunk = point['meta']['chunkId'] index = chunk - offset🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@nodes/src/nodes/milvus/milvus.py` around lines 626 - 637, The for-loop processing Milvus query results uses point['chunkId'] but the query requested output_fields=['meta','content'], so chunkId lives under meta; update the loop in the block handling results (after self.client.query(...) and where variables text, lastIndex are set and the loop "for point in results" runs) to read the chunk id as point['meta']['chunkId'] (and optionally guard against missing meta to avoid KeyError) instead of point['chunkId'] so indexing succeeds when building the text array for renderChunkSize.
264-270:⚠️ Potential issue | 🔴 CriticalFix the incorrect COSINE distance documentation.
The inline comment incorrectly states Milvus COSINE returns
[0, 2]with0meaning identical. According to Milvus documentation, COSINE actually returns[-1, 1]where1is identical and-1is opposite. The formula(distance + 1) / 2is correct and properly maps[-1, 1]to[0, 1], but the comment describing this transformation must be corrected to reflect the actual distance range and semantics.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@nodes/src/nodes/milvus/milvus.py` around lines 264 - 270, The inline comment near the COSINE handling (the block checking self.similarity == 'COSINE' and the score = (point.get('distance') + 1) / 2 calculation) is incorrect; update it to state that Milvus COSINE returns values in the range [-1, 1] where 1 is identical and -1 is opposite, and note that the formula maps [-1, 1] to [0, 1] with 1 meaning most similar to stay consistent with the codebase scoring convention.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@nodes/src/nodes/milvus/milvus.py`:
- Around line 98-100: Validate and coerce the configured timeout and
bulkInsertBatchSize to positive integers when assigning self.timeout and
self.bulkInsertBatchSize: convert values from config to int, ensure they are >
0, and if not either clamp to DEFAULT_TIMEOUT / DEFAULT_BULK_INSERT_BATCH_SIZE
or raise a clear ValueError (and/or log a warning) so the batching logic that
checks len(batch) >= self.bulkInsertBatchSize behaves correctly; update the
initialization that sets self.timeout and self.bulkInsertBatchSize to perform
this validation and fallback logic.
- Around line 525-546: The _batchUpsertResults method should take isDeleted as a
keyword-only parameter and add error handling around each upsert call: change
the signature to make isDeleted keyword-only (e.g., def
_batchUpsertResults(self, results: List[dict], *, isDeleted: bool) -> None) so
callers must use isDeleted=..., and wrap each
self.client.upsert(collection_name=self.collection, data=batch) inside a
try/except that logs the exception (use the same logger pattern as flush_batch
in addChunks) and either retries or re-raises after logging to avoid silent
partial updates; ensure you update references to _batchUpsertResults call sites
accordingly.
---
Outside diff comments:
In `@nodes/src/nodes/milvus/milvus.py`:
- Around line 453-497: The current flow deletes old chunks via
self.client.delete (filter_condition) before inserting new data with
flush_batch()/self.client.upsert, which can leave the collection partially
updated if upserts fail; to fix, implement an atomic/rollback-safe pattern:
either 1) read and persist a backup of the deleted entities (fetch current
chunks for the objectIds before calling self.client.delete) so you can restore
them on upsert failure, or 2) change ordering to upsert new chunks with a
temporary marker/meta flag (use the same batch/upsert logic in flush_batch),
then only after all upserts succeed delete old chunks and finally remove the
temporary flag, or 3) upsert into a temporary collection and swap/rename on
success; modify the code around the delete call and the flush_batch/upsert logic
(functions/methods: the block that builds filter_condition, self.client.delete,
flush_batch, and where tmp_struct is created) to implement one of these
strategies and ensure errors during upsert trigger restore/cleanup logic.
- Around line 626-637: The for-loop processing Milvus query results uses
point['chunkId'] but the query requested output_fields=['meta','content'], so
chunkId lives under meta; update the loop in the block handling results (after
self.client.query(...) and where variables text, lastIndex are set and the loop
"for point in results" runs) to read the chunk id as point['meta']['chunkId']
(and optionally guard against missing meta to avoid KeyError) instead of
point['chunkId'] so indexing succeeds when building the text array for
renderChunkSize.
- Around line 264-270: The inline comment near the COSINE handling (the block
checking self.similarity == 'COSINE' and the score = (point.get('distance') + 1)
/ 2 calculation) is incorrect; update it to state that Milvus COSINE returns
values in the range [-1, 1] where 1 is identical and -1 is opposite, and note
that the formula maps [-1, 1] to [0, 1] with 1 meaning most similar to stay
consistent with the codebase scoring convention.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: abacd5cf-3989-42a5-9cb4-9397ba9a37b3
📒 Files selected for processing (3)
apps/vscode/src/providers/views/PageStatus/PageStatus.tsxapps/vscode/src/providers/views/PageStatus/styles.cssnodes/src/nodes/milvus/milvus.py
…-org#562 - Remove unrelated PageStatus changes that were re-introduced - Validate timeout and bulkInsertBatchSize to ensure positive values - Make isDeleted a keyword-only argument in _batchUpsertResults Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
Addressed all review feedback:
Thanks for the review! |
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (1)
nodes/src/nodes/milvus/milvus.py (1)
525-546:⚠️ Potential issue | 🟡 MinorAdd error handling around upsert calls for consistency with
flush_batch.The
flush_batchfunction inaddChunkswraps upsert calls in try/except with logging before re-raise. This method lacks equivalent handling—if an upsert fails mid-batch, there's no diagnostic logging before the exception propagates.Proposed fix
def _batchUpsertResults(self, results: List[dict], *, isDeleted: bool) -> None: """ Batch-update the isDeleted metadata field on a list of query results. Collects results into batches of bulkInsertBatchSize and upserts them together, avoiding the performance bottleneck of one-at-a-time upserts. """ batch: List[dict] = [] for result in results: meta = result.get('meta', {}) meta['isDeleted'] = isDeleted result['meta'] = meta batch.append(result) if len(batch) >= self.bulkInsertBatchSize: - self.client.upsert(collection_name=self.collection, data=batch) + try: + self.client.upsert(collection_name=self.collection, data=batch) + except Exception as e: + engLib.debug(f'Error during batch upsert ({len(batch)} results): {e}') + raise batch = [] # Flush remaining if batch: - self.client.upsert(collection_name=self.collection, data=batch) + try: + self.client.upsert(collection_name=self.collection, data=batch) + except Exception as e: + engLib.debug(f'Error during batch upsert ({len(batch)} results): {e}') + raise🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@nodes/src/nodes/milvus/milvus.py` around lines 525 - 546, _batchUpsertResults currently calls self.client.upsert directly and lacks the try/except logging that flush_batch (used in addChunks) provides; wrap each upsert call (both inside the batch-full branch and the final flush) in a try/except that catches Exception, logs a descriptive error including the collection name (self.collection), batch size/contents or len(batch), and the exception (use the same logger used by flush_batch/addChunks), then re-raise the exception so behavior remains the same but with diagnostic logging.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@nodes/src/nodes/milvus/milvus.py`:
- Around line 466-476: The nested function flush_batch is missing a return type
annotation (Ruff ANN202); update its definition in milvus.py to add an explicit
return type of None for flush_batch (i.e., annotate the function signature for
the nested def flush_batch() to -> None) and ensure the rest of the body
unchanged.
- Around line 98-100: The code sets self.timeout and self.bulkInsertBatchSize
from config using DEFAULT_TIMEOUT and DEFAULT_BULK_INSERT_BATCH_SIZE but those
keys are not present in the services.json schema, so add JSON schema entries for
"timeout" and "bulkInsertBatchSize" to services.json (with appropriate types,
minimum 1, and default values matching DEFAULT_TIMEOUT and
DEFAULT_BULK_INSERT_BATCH_SIZE) so users can configure them via the UI;
alternatively, if you intend to keep them non-configurable, add a clarifying
comment next to the assignments in milvus.py referencing DEFAULT_TIMEOUT and
DEFAULT_BULK_INSERT_BATCH_SIZE to document that these values are intentionally
not exposed.
---
Duplicate comments:
In `@nodes/src/nodes/milvus/milvus.py`:
- Around line 525-546: _batchUpsertResults currently calls self.client.upsert
directly and lacks the try/except logging that flush_batch (used in addChunks)
provides; wrap each upsert call (both inside the batch-full branch and the final
flush) in a try/except that catches Exception, logs a descriptive error
including the collection name (self.collection), batch size/contents or
len(batch), and the exception (use the same logger used by
flush_batch/addChunks), then re-raise the exception so behavior remains the same
but with diagnostic logging.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: fc9cd903-fc58-4eec-a226-c3a0148c28d4
📒 Files selected for processing (1)
nodes/src/nodes/milvus/milvus.py
* feat(vscode): improve stop button feedback in Pipeline Observability screen Handle TASK_STATE.STOPPING in the control button to show "Stopping..." with a disabled state and distinct orange styling, preventing duplicate clicks and giving immediate visual feedback during pipeline shutdown. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(nodes): improve Milvus vector DB node — address all TODOs - Add configurable timeout (default 60s) replacing hardcoded timeout=20, read from node config via 'timeout' key (TODO line 101, 483) - Add connection error handling with meaningful failure messages instead of raw pymilvus exceptions propagating - Implement bulk insert with configurable batch size (default 50) for addChunks(), replacing one-at-a-time upserts (TODO lines 449, 464) - Add _batchUpsertResults() helper to batch-update markDeleted/markActive operations, eliminating the per-vector upsert loop bottleneck (TODO lines 514-515, 546-547) - Add timeout parameter to remove() delete call (TODO line 483) - Document Milvus COSINE distance score range [0,2] rescaling to [0,1] for codebase consistency (TODO line 253) - Fix typos in docstrings and comments Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: remove unrelated PageStatus "Stopping..." changes from Milvus PR The PageStatus changes belong in a separate PR (#549) and were accidentally included here. * fix(nodes): address CodeRabbit feedback on Milvus PR #562 - Remove dead protocol check (host already stripped of scheme at init) - Add exception chaining with 'from e' for connection errors (B904) - Add output_fields to markDeleted/markActive queries to prevent data loss during upsert (was only returning primary key) - Add output_fields to renderChunks query to prevent KeyError on content/chunkId access Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(nodes): address remaining review feedback on Milvus PR #562 - Remove unrelated PageStatus changes that were re-introduced - Validate timeout and bulkInsertBatchSize to ensure positive values - Make isDeleted a keyword-only argument in _batchUpsertResults Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
nodes/src/nodes/milvus/milvus.py)Type
Feature Enhancement
Why this feature fits this codebase
The Milvus node contained 7 TODO comments left by the original authors flagging known performance and reliability gaps. These are not speculative improvements — they are changes the maintainers explicitly marked as needed:
timeout=20with no config option. Now readstimeoutfrom node config (default 60s), matching Qdrant's pattern (timeout=60).upsert()calls inaddChunks(). Now batches chunks (configurablebulkInsertBatchSize, default 50) before upserting, following the same flush-batch pattern used in Pinecone and Qdrant nodes.markDeleted()/markActive()flagged as a performance bottleneck. Extracted into_batchUpsertResults()helper that collects results into batches.[0, 2]and the rescaling to[0, 1]is correct.What changed
nodes/src/nodes/milvus/milvus.pyaddChunks();_batchUpsertResults()helper formarkDeleted/markActive; timeout onremove(); COSINE score documentation; typo fixesValidation
ruff format --checkpassesruff checkpassesHow this can be extended
bulkInsertBatchSizeconfig could be tuned per deployment (cloud vs local Milvus)_batchUpsertResults()could be reused for future bulk metadata update operationsCloses: N/A — new contribution, no pre-existing issue
#Hack-with-bay-2
🤖 Generated with Claude Code
Summary by CodeRabbit
Bug Fixes
Chores