1212// See the License for the specific language governing permissions and
1313// limitations under the License.
1414#include " mixed_streamer_reducer.h"
15+ #include < algorithm>
1516#include < ailego/pattern/defer.h>
1617#include < utility/sparse_utility.h>
1718#include < zvec/ailego/utility/file_helper.h>
@@ -141,15 +142,16 @@ int MixedStreamerReducer::reduce(const IndexFilter &filter) {
141142 ailego::ElapsedTime timer;
142143
143144
144- std::vector<int > add_results (num_of_add_threads_, -1 );
145+ const size_t add_thread_count = enable_pk_rewrite_ ? 1 : num_of_add_threads_;
146+ std::vector<int > add_results (add_thread_count, -1 );
145147 auto add_group = thread_pool_->make_group ();
146148
147149 std::vector<int > read_results (streamers_.size (), -1 );
148150 // TODO: use id instead of key
149151 uint32_t id_offset = 0 , next_id = 0 ;
150152
151153 if (is_sparse_) {
152- for (size_t i = 0 ; i < num_of_add_threads_ ; i++) {
154+ for (size_t i = 0 ; i < add_thread_count ; i++) {
153155 add_group->submit (ailego::Closure::New (
154156 this , &MixedStreamerReducer::add_sparse_vec, &add_results[i]));
155157 }
@@ -162,7 +164,7 @@ int MixedStreamerReducer::reduce(const IndexFilter &filter) {
162164
163165 sparse_mt_list_.done ();
164166 } else {
165- for (size_t i = 0 ; i < num_of_add_threads_ ; i++) {
167+ for (size_t i = 0 ; i < add_thread_count ; i++) {
166168 add_group->submit (ailego::Closure::New (
167169 this , &MixedStreamerReducer::add_vec, &add_results[i]));
168170 // add_vec(&add_results[i]);
@@ -304,6 +306,7 @@ int MixedStreamerReducer::read_vec(size_t source_streamer_index,
304306
305307 IndexProvider::Pointer provider = streamer->create_provider ();
306308 IndexProvider::Iterator::Pointer iterator = provider->create_iterator ();
309+ std::vector<std::pair<uint32_t , std::vector<uint8_t >>> pending_items;
307310
308311 while (iterator->is_valid ()) {
309312 if (stop_flag_ != nullptr && stop_flag_->load (std::memory_order_relaxed)) {
@@ -332,13 +335,19 @@ int MixedStreamerReducer::read_vec(size_t source_streamer_index,
332335 memcpy (bytes.data (), iterator->data (), bytes.size ());
333336 }
334337
335- // TODO: use id instead of key
336- if (!mt_list_.produce (VectorItem ((*next_id)++, std::move (bytes)))) {
337- LOG_ERROR (" Produce vector to queue failed. key[%lu]" ,
338- (size_t )iterator->key ());
338+ pending_items.emplace_back (iterator->key () + id_offset, std::move (bytes));
339+ iterator->next ();
340+ }
341+
342+ std::sort (pending_items.begin (), pending_items.end (),
343+ [](const auto &lhs, const auto &rhs) {
344+ return lhs.first < rhs.first ;
345+ });
346+ for (auto &item : pending_items) {
347+ if (!mt_list_.produce (VectorItem ((*next_id)++, std::move (item.second )))) {
348+ LOG_ERROR (" Produce vector to queue failed. key[%u]" , item.first );
339349 return IndexError_Runtime;
340350 }
341- iterator->next ();
342351 }
343352 return 0 ;
344353}
@@ -508,6 +517,7 @@ int MixedStreamerReducer::read_sparse_vec(size_t source_streamer_index,
508517 streamer->create_sparse_provider ();
509518 IndexStreamer::SparseProvider::Iterator::Pointer iterator =
510519 provider->create_iterator ();
520+ std::vector<SparseVectorItem> pending_items;
511521
512522 while (iterator->is_valid ()) {
513523 if (stop_flag_ != nullptr && stop_flag_->load (std::memory_order_relaxed)) {
@@ -547,15 +557,24 @@ int MixedStreamerReducer::read_sparse_vec(size_t source_streamer_index,
547557 memcpy (sparse_indices.data (), iterator->sparse_indices (),
548558 sparse_indices.size () * sizeof (uint32_t ));
549559
550- // TODO: use id instead of key
551- if (!sparse_mt_list_.produce (SparseVectorItem ((*next_id)++,
552- std::move (sparse_indices),
553- std::move (sparse_values)))) {
560+ pending_items.emplace_back (iterator->key () + id_offset,
561+ std::move (sparse_indices),
562+ std::move (sparse_values));
563+ iterator->next ();
564+ }
565+
566+ std::sort (pending_items.begin (), pending_items.end (),
567+ [](const SparseVectorItem &lhs, const SparseVectorItem &rhs) {
568+ return lhs.pkey_ < rhs.pkey_ ;
569+ });
570+ for (auto &item : pending_items) {
571+ if (!sparse_mt_list_.produce (SparseVectorItem (
572+ (*next_id)++, std::move (item.sparse_indices_ ),
573+ std::move (item.sparse_values_ )))) {
554574 LOG_ERROR (" Produce vector to queue failed. key[%lu]" ,
555- ( size_t )iterator-> key ( ));
575+ static_cast < size_t >(item. pkey_ ));
556576 return IndexError_Runtime;
557577 }
558- iterator->next ();
559578 }
560579 return 0 ;
561580}
0 commit comments