Skip to content

Commit 21360c4

Browse files
lferran and jotare authored
Make sure broker messages from vectorset migrations are properly ingested (#2764)
* Fix has_field usage
* Get fields before applying extracted vectors
* Add test
* add test
* Replace sentences_to_delete for vector_prefixes_to_delete
* Implement nidx vector deletion by vectorset
* Implement the same for node
* Populate vector_prefixes_to_delete if vectorsets are used
* Remove useless datamanager vectorset creation
* Linter
* Use inject_message to mark nidx dirty
* Extend test to validate keyword search
* Fix paragraphs_to_delete
* Pass replace_field parameter

---------

Co-authored-by: Joan Antoni RE <joanantoni.re16@gmail.com>
1 parent 85e5b46 commit 21360c4

25 files changed

Lines changed: 326 additions & 57 deletions

File tree

nidx/nidx_binding/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,6 @@ classifiers = [
1111
"Programming Language :: Python :: Implementation :: PyPy",
1212
]
1313
dynamic = ["version"]
14+
1415
[tool.maturin]
1516
features = ["pyo3/extension-module"]

nidx/nidx_vector/src/data_point_provider/reader.rs

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -395,16 +395,10 @@ mod tests {
395395
};
396396
let resource = Resource {
397397
resource: Some(resource_id),
398-
metadata: None,
399398
texts: HashMap::with_capacity(0),
400399
status: ResourceStatus::Processed as i32,
401400
labels: vec!["2".to_string()],
402401
paragraphs: HashMap::from([("DOC/KEY".to_string(), paragraphs)]),
403-
paragraphs_to_delete: vec![],
404-
sentences_to_delete: vec![],
405-
relations: vec![],
406-
vectors: HashMap::default(),
407-
vectors_to_delete: HashMap::default(),
408402
shard_id: "DOC".to_string(),
409403
..Default::default()
410404
};
@@ -486,16 +480,10 @@ mod tests {
486480
};
487481
let resource = Resource {
488482
resource: Some(resource_id),
489-
metadata: None,
490483
texts: HashMap::with_capacity(0),
491484
status: ResourceStatus::Processed as i32,
492485
labels: vec!["2".to_string()],
493486
paragraphs: HashMap::from([("DOC/KEY".to_string(), paragraphs)]),
494-
paragraphs_to_delete: vec![],
495-
sentences_to_delete: vec![],
496-
relations: vec![],
497-
vectors: HashMap::default(),
498-
vectors_to_delete: HashMap::default(),
499487
shard_id: "DOC".to_string(),
500488
..Default::default()
501489
};
@@ -608,16 +596,10 @@ mod tests {
608596
};
609597
let resource = Resource {
610598
resource: Some(resource_id),
611-
metadata: None,
612599
texts: HashMap::with_capacity(0),
613600
status: ResourceStatus::Processed as i32,
614601
labels: vec!["2".to_string()],
615602
paragraphs: HashMap::from([("DOC/KEY".to_string(), paragraphs)]),
616-
paragraphs_to_delete: vec![],
617-
sentences_to_delete: vec![],
618-
relations: vec![],
619-
vectors: HashMap::default(),
620-
vectors_to_delete: HashMap::default(),
621603
shard_id: "DOC".to_string(),
622604
..Default::default()
623605
};

nidx/nidx_vector/src/lib.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,14 @@ impl VectorIndexer {
6767
index_resource(vectorset_resource, output_dir, config)
6868
}
6969

70-
pub fn deletions_for_resource(&self, resource: &Resource) -> Vec<String> {
71-
resource.sentences_to_delete.clone()
70+
pub fn deletions_for_resource(&self, resource: &Resource, index_name: &str) -> Vec<String> {
71+
if let Some(prefixes) = resource.vector_prefixes_to_delete.get(index_name) {
72+
prefixes.items.clone()
73+
} else {
74+
// DEPRECATED: Bw/c while moving from sentences_to_delete to vector_prefixes_to_delete
75+
#[allow(deprecated)]
76+
resource.sentences_to_delete.clone()
77+
}
7278
}
7379

7480
#[instrument(name = "vector::merge", skip_all)]

nidx/src/indexer.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ fn index_resource_to_index(
263263
};
264264

265265
let deletions = match index.kind {
266-
IndexKind::Vector => nidx_vector::VectorIndexer.deletions_for_resource(resource),
266+
IndexKind::Vector => nidx_vector::VectorIndexer.deletions_for_resource(resource, &index.name),
267267
IndexKind::Text => nidx_text::TextIndexer.deletions_for_resource(resource),
268268
IndexKind::Paragraph => nidx_paragraph::ParagraphIndexer.deletions_for_resource(resource),
269269
IndexKind::Relation => nidx_relation::RelationIndexer.deletions_for_resource(resource),
@@ -277,6 +277,7 @@ fn index_resource_to_index(
277277
mod tests {
278278
use std::io::{Seek, Write};
279279

280+
use nidx_protos::StringList;
280281
use nidx_vector::config::{Similarity, VectorConfig, VectorType};
281282
use tempfile::tempfile;
282283
use uuid::Uuid;
@@ -354,7 +355,12 @@ mod tests {
354355
format!("{}/a/title/0-15", resource.resource.as_ref().unwrap().uuid),
355356
format!("{}/a/summary/0-150", resource.resource.as_ref().unwrap().uuid),
356357
];
357-
resource.sentences_to_delete.append(&mut keys.clone());
358+
resource.vector_prefixes_to_delete.insert(
359+
index.name,
360+
StringList {
361+
items: keys.clone(),
362+
},
363+
);
358364
index_resource(
359365
&meta,
360366
storage.clone(),

nidx/tests/test_date_range_search.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,8 @@ async fn populate(fixture: &mut NidxFixture, shard_id: String, metadata: IndexMe
9292
resource: Some(resource_id),
9393
metadata: Some(metadata),
9494
status: ResourceStatus::Processed as i32,
95-
labels: vec![],
9695
texts: HashMap::from([(field_id.clone(), text_content)]),
9796
paragraphs: HashMap::from([(format!("{raw_resource_id}/{field_id}"), paragraphs)]),
98-
paragraphs_to_delete: vec![],
99-
sentences_to_delete: vec![],
100-
relations: vec![],
101-
vectors: HashMap::default(),
102-
vectors_to_delete: HashMap::default(),
10397
..Default::default()
10498
};
10599

nucliadb/src/nucliadb/common/external_index_providers/pinecone.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,7 @@ def get_index_host(self, vectorset_id: str, rollover: bool = False) -> str:
441441

442442
def get_prefixes_to_delete(self, index_data: Resource) -> set[str]:
443443
prefixes_to_delete = set()
444+
# TODO: migrate to vector_prefixes_to_delete
444445
for field_id in index_data.sentences_to_delete:
445446
try:
446447
delete_vid = VectorId.from_string(field_id)

nucliadb/src/nucliadb/ingest/orm/brain.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ def apply_field_metadata(
100100
page_positions: Optional[FilePagePositions],
101101
extracted_text: Optional[ExtractedText],
102102
basic_user_field_metadata: Optional[UserFieldMetadata] = None,
103+
*,
104+
replace_field: bool = False,
103105
):
104106
# To check for duplicate paragraphs
105107
unique_paragraphs: set[str] = set()
@@ -224,6 +226,11 @@ def apply_field_metadata(
224226

225227
self.brain.paragraphs[field_key].paragraphs[key].CopyFrom(p)
226228

229+
if replace_field:
230+
field_type, field_name = field_key.split("/")
231+
full_field_id = ids.FieldId(rid=self.rid, type=field_type, key=field_name).full()
232+
self.brain.paragraphs_to_delete.append(full_field_id)
233+
227234
for relations in metadata.metadata.relations:
228235
for relation in relations.relations:
229236
self.brain.relations.append(relation)
@@ -301,8 +308,11 @@ def apply_field_vectors(
301308

302309
if replace_field:
303310
full_field_id = ids.FieldId(rid=self.rid, type=fid.type, key=fid.key).full()
304-
self.brain.sentences_to_delete.append(full_field_id)
305-
self.brain.paragraphs_to_delete.append(full_field_id)
311+
if vectorset is None:
312+
# DEPRECATED
313+
self.brain.sentences_to_delete.append(full_field_id)
314+
else:
315+
self.brain.vector_prefixes_to_delete[vectorset].items.append(full_field_id)
306316

307317
def _apply_field_vector(
308318
self,

nucliadb/src/nucliadb/ingest/orm/processor/processor.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,6 @@ async def txn(
275275

276276
if message.source == writer_pb2.BrokerMessage.MessageSource.WRITER:
277277
resource = await kb.get(uuid)
278-
279278
if resource is None:
280279
# It's a new resource
281280
resource = await kb.add_resource(uuid, message.slug, message.basic)
@@ -737,7 +736,11 @@ def has_vectors_operation(index_message: PBBrainResource) -> bool:
737736
"""
738737
Returns True if the index message has any vectors to index or to delete.
739738
"""
740-
if len(index_message.sentences_to_delete) > 0 or len(index_message.paragraphs_to_delete) > 0:
739+
if (
740+
len(index_message.sentences_to_delete) > 0
741+
or len(index_message.paragraphs_to_delete) > 0
742+
or any([len(deletions.items) for deletions in index_message.vector_prefixes_to_delete.values()])
743+
):
741744
return True
742745
for field_paragraphs in index_message.paragraphs.values():
743746
for paragraph in field_paragraphs.paragraphs.values():

nucliadb/src/nucliadb/ingest/orm/resource.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ async def set_basic(
226226
page_positions=page_positions,
227227
extracted_text=await field_obj.get_extracted_text(),
228228
basic_user_field_metadata=user_field_metadata,
229+
replace_field=True,
229230
)
230231

231232
# Some basic fields are computed off field metadata.
@@ -336,6 +337,7 @@ async def generate_index_message(self, reindex: bool = False) -> ResourceBrain:
336337
page_positions=page_positions,
337338
extracted_text=await field.get_extracted_text(),
338339
basic_user_field_metadata=user_field_metadata,
340+
replace_field=reindex,
339341
)
340342

341343
if self.disable_vectors is False:
@@ -584,6 +586,7 @@ async def apply_extracted(self, message: BrokerMessage):
584586
# Upload to binary storage
585587
# Vector indexing
586588
if self.disable_vectors is False:
589+
await self.get_fields(force=True)
587590
for field_vectors in message.field_vectors:
588591
await self._apply_extracted_vectors(field_vectors)
589592

@@ -723,6 +726,7 @@ async def _apply_field_computed_metadata(self, field_metadata: FieldComputedMeta
723726
page_positions=page_positions,
724727
extracted_text=extracted_text,
725728
basic_user_field_metadata=user_field_metadata,
729+
replace_field=True,
726730
)
727731
loop = asyncio.get_running_loop()
728732
await loop.run_in_executor(_executor, apply_field_metadata)

nucliadb/src/nucliadb/writer/api/v1/knowledgebox.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ async def create_kb_endpoint(request: Request, item: KnowledgeBoxConfig) -> Know
6868
except ExternalIndexCreationError as exc:
6969
raise HTTPException(status_code=502, detail=str(exc))
7070
except Exception:
71+
logger.exception("Could not create KB")
7172
raise HTTPException(status_code=500, detail="Error creating knowledge box")
7273
else:
7374
return KnowledgeBoxObj(uuid=kbid, slug=slug)

0 commit comments

Comments
 (0)