1717
1818#include " storage/index/ann/ann_index_writer.h"
1919
20+ #include < algorithm>
2021#include < cstddef>
2122#include < memory>
2223#include < string>
2324
2425#include " common/cast_set.h"
26+ #include " common/config.h"
2527#include " storage/index/ann/faiss_ann_index.h"
2628#include " storage/index/inverted/inverted_index_fs_directory.h"
2729
@@ -40,7 +42,7 @@ AnnIndexColumnWriter::AnnIndexColumnWriter(IndexFileWriter* index_file_writer,
4042 const TabletIndex* index_meta)
4143 : _index_file_writer(index_file_writer), _index_meta(index_meta) {}
4244
43- AnnIndexColumnWriter::~AnnIndexColumnWriter () {}
45+ AnnIndexColumnWriter::~AnnIndexColumnWriter () = default ;
4446
4547Status AnnIndexColumnWriter::init () {
4648 Result<std::shared_ptr<DorisFSDirectory>> compound_dir = _index_file_writer->open (_index_meta);
@@ -78,17 +80,17 @@ Status AnnIndexColumnWriter::init() {
7880 index_type, build_parameter.dim , metric_type, build_parameter.max_degree ,
7981 build_parameter.ef_construction , quantizer);
8082
81- size_t block_size = AnnIndexColumnWriter::chunk_size () * build_parameter.dim ;
82- _float_array.reserve (block_size);
83-
8483 return Status::OK ();
8584}
8685
8786Status AnnIndexColumnWriter::add_values (const std::string fn, const void * values, size_t count) {
8887 return Status::OK ();
8988}
9089
91- void AnnIndexColumnWriter::close_on_error () {}
90+ void AnnIndexColumnWriter::close_on_error () {
91+ PODArray<float > empty_buffered_vectors;
92+ _buffered_vectors.swap (empty_buffered_vectors);
93+ }
9294
9395Status AnnIndexColumnWriter::add_array_values (size_t field_size, const void * value_ptr,
9496 const uint8_t * null_map, const uint8_t * offsets_ptr,
@@ -110,26 +112,10 @@ Status AnnIndexColumnWriter::add_array_values(size_t field_size, const void* val
110112
111113 const float * p = reinterpret_cast <const float *>(value_ptr);
112114
113- const size_t full_elements = AnnIndexColumnWriter::chunk_size () * dim;
114- size_t remaining_elements = num_rows * dim;
115- size_t src_offset = 0 ;
116- while (remaining_elements > 0 ) {
117- size_t available_space = full_elements - _float_array.size ();
118- size_t elements_to_add = std::min (remaining_elements, available_space);
119-
120- _float_array.insert (_float_array.end (), p + src_offset, p + src_offset + elements_to_add);
121- src_offset += elements_to_add;
122- remaining_elements -= elements_to_add;
123-
124- if (_float_array.size () == full_elements) {
125- RETURN_IF_ERROR (
126- _vector_index->train (AnnIndexColumnWriter::chunk_size (), _float_array.data ()));
127- RETURN_IF_ERROR (
128- _vector_index->add (AnnIndexColumnWriter::chunk_size (), _float_array.data ()));
129- _float_array.clear ();
130- _need_save_index = true ;
131- }
132- }
115+ // The offsets check above guarantees every array row matches the ANN index dimension.
116+ DCHECK (p != nullptr );
117+ _buffered_vectors.insert (_buffered_vectors.end (), p, p + num_rows * dim);
118+ _total_rows += cast_set<int64_t >(num_rows);
133119
134120 return Status::OK ();
135121}
@@ -152,55 +138,42 @@ int64_t AnnIndexColumnWriter::size() const {
152138}
153139
154140Status AnnIndexColumnWriter::finish () {
155- Int64 min_train_rows = _vector_index->get_min_train_rows ();
156-
157- // Check if we have enough rows to train the index
158- // train/add the remaining data
159- if (_float_array.empty ()) {
160- if (_need_save_index) {
161- return _vector_index->save (_dir.get ());
162- } else {
163- // No data was added at all. This can happen if the segment has 0 rows
164- // or all rows were filtered out. We need to delete the directory entry
165- // to avoid writing an empty/invalid index file.
166- LOG_INFO (" No data to train/add for ANN index. Skipping index building." );
167- return _index_file_writer->delete_index (_index_meta);
168- }
169- } else {
170- DCHECK (_float_array.size () % _vector_index->get_dimension () == 0 );
171-
172- Int64 num_rows = _float_array.size () / _vector_index->get_dimension ();
173-
174- if (num_rows >= min_train_rows) {
175- RETURN_IF_ERROR (_vector_index->train (num_rows, _float_array.data ()));
176- RETURN_IF_ERROR (_vector_index->add (num_rows, _float_array.data ()));
177- _float_array.clear ();
178- return _vector_index->save (_dir.get ());
179- } else {
180- // It happens to have not enough data to train.
181- // If we have data to add before, we still need to save the index.
182- if (_need_save_index) {
183- // For IVF indexes, adding remaining vectors without training is acceptable
184- // because the quantizer was already trained on previous batches. These vectors
185- // are simply added to the nearest clusters without retraining.
186- RETURN_IF_ERROR (_vector_index->add (num_rows, _float_array.data ()));
187- _float_array.clear ();
188- return _vector_index->save (_dir.get ());
189- } else {
190- // Not enough data to train and no data added before.
191- // Means this is a very small segment, we can skip the index building.
192- // We need to delete the directory entry from index_file_writer to avoid
193- // writing an empty/invalid index file which causes "IndexInput read past EOF" error.
194- LOG_INFO (
195- " Remaining data size {} is less than minimum {} rows required for ANN "
196- " index "
197- " training. Skipping index building for this segment." ,
198- num_rows, min_train_rows);
199- _float_array.clear ();
200- return _index_file_writer->delete_index (_index_meta);
201- }
202- }
141+ if (_total_rows == 0 ) {
142+ LOG_INFO (" No data to train/add for ANN index. Skipping index building." );
143+ return _index_file_writer->delete_index (_index_meta);
144+ }
145+
146+ const Int64 min_train_rows = _vector_index->get_min_train_rows ();
147+ const Int64 effective_min_rows =
148+ std::max (min_train_rows, cast_set<Int64>(config::ann_index_build_min_segment_rows));
149+ if (_total_rows < effective_min_rows) {
150+ LOG_INFO (
151+ " Total data size {} is less than minimum {} rows required for ANN index build. "
152+ " Skipping index building for this segment." ,
153+ _total_rows, effective_min_rows);
154+ PODArray<float > empty_buffered_vectors;
155+ _buffered_vectors.swap (empty_buffered_vectors);
156+ return _index_file_writer->delete_index (_index_meta);
157+ }
158+
159+ return _build_and_save (min_train_rows, effective_min_rows);
160+ }
161+
162+ Status AnnIndexColumnWriter::_build_and_save (Int64 min_train_rows, Int64 effective_min_rows) {
163+ const size_t dim = _vector_index->get_dimension ();
164+ DCHECK (_buffered_vectors.size () % dim == 0 );
165+ const Int64 train_rows = cast_set<Int64>(_buffered_vectors.size () / dim);
166+ DORIS_CHECK (train_rows == _total_rows);
167+ DORIS_CHECK (train_rows >= effective_min_rows);
168+ if (min_train_rows > 0 ) {
169+ RETURN_IF_ERROR (_vector_index->train (train_rows, _buffered_vectors.data ()));
203170 }
171+ RETURN_IF_ERROR (_vector_index->add (train_rows, _buffered_vectors.data ()));
172+ // PODArray::clear() keeps the allocated capacity. Swap with an empty array so the
173+ // full-segment build buffer is released before saving the index.
174+ PODArray<float > empty_buffered_vectors;
175+ _buffered_vectors.swap (empty_buffered_vectors);
176+ return _vector_index->save (_dir.get ());
204177}
205178#include " common/compile_check_end.h"
206179} // namespace doris::segment_v2
0 commit comments