|
22 | 22 | #include <gen_cpp/FrontendService_types.h> |
23 | 23 | #include <glog/logging.h> |
24 | 24 |
|
| 25 | +#include <algorithm> |
25 | 26 | #include <string> |
26 | 27 | #include <utility> |
27 | 28 |
|
@@ -82,8 +83,82 @@ Status OlapTabletFinder::find_tablets(RuntimeState* state, Block* block, int row |
82 | 83 |
|
83 | 84 | if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_ROW) { |
84 | 85 | _vpartition->find_tablets(block, qualified_rows, partitions, tablet_index); |
| 86 | + } else if (_find_tablet_mode == FindTabletMode::FIND_TABLET_RANDOM_BUCKET) { |
| 87 | + // Use per-batch caching: all rows in the same partition go to the same tablet this batch. |
| 88 | + _vpartition->find_tablets(block, qualified_rows, partitions, tablet_index, |
| 89 | + &_partition_to_tablet_map); |
| 90 | + |
| 91 | + // Count actual rows per partition so we don't credit each partition with the |
| 92 | + // full block size when a block spans multiple partitions. |
| 93 | + flat_hash_map<VOlapTablePartition*, int64_t> partition_row_counts; |
| 94 | + for (uint32_t qi : qualified_rows) { |
| 95 | + partition_row_counts[partitions[qi]]++; |
| 96 | + } |
| 97 | + |
| 98 | + int64_t bytes_per_row = rows > 0 ? block->bytes() / rows : 0; |
| 99 | + for (auto& [partition, _] : _partition_to_tablet_map) { |
| 100 | + if (partition->indexes.empty()) { |
| 101 | + continue; |
| 102 | + } |
| 103 | + const auto& seqs = partition->local_bucket_seqs; |
| 104 | + |
| 105 | + // Maintain _partition_seq_pos so rotation is O(1) after the first batch. |
| 106 | + // On first encounter, find the current bucket's position in seqs (one-time O(n)). |
| 107 | + // On subsequent batches we use the cached position directly. |
| 108 | + int32_t& seq_pos = _partition_seq_pos[partition]; // default-initialised to 0 |
| 109 | + if (seq_pos == 0 && !seqs.empty()) { |
| 110 | + // Sync seq_pos with the FE-assigned starting bucket. If load_tablet_idx happens |
| 111 | + // to already be seqs[0] this is a no-op (seq_pos stays 0), otherwise we find |
| 112 | + // the correct starting position once. |
| 113 | + int32_t cur_bucket_val = |
| 114 | + static_cast<int32_t>(partition->load_tablet_idx % partition->num_buckets); |
| 115 | + if (seqs[0] != cur_bucket_val) { |
| 116 | + auto it = std::find(seqs.begin(), seqs.end(), cur_bucket_val); |
| 117 | + if (it != seqs.end()) { |
| 118 | + seq_pos = static_cast<int32_t>(it - seqs.begin()); |
| 119 | + } |
| 120 | + } |
| 121 | + } |
| 122 | + |
| 123 | + // Determine the current bucket: |
| 124 | + // - adaptive mode: use the cached position in local_bucket_seqs (O(1)) |
| 125 | + // - fallback (no seqs, or dynamically added partition without seqs): use |
| 126 | + // load_tablet_idx if valid, otherwise fall back to fast_rand() to match |
| 127 | + // the FIND_TABLET_EVERY_BATCH behaviour for partitions that were added |
| 128 | + // at runtime (e.g. auto-partition, overwrite replace) and did not go |
| 129 | + // through assignRandomBucketPerBe. |
| 130 | + int64_t cur_bucket; |
| 131 | + if (!seqs.empty()) { |
| 132 | + cur_bucket = seqs[seq_pos]; |
| 133 | + } else if (partition->load_tablet_idx >= 0) { |
| 134 | + cur_bucket = partition->load_tablet_idx % partition->num_buckets; |
| 135 | + } else { |
| 136 | + cur_bucket = |
| 137 | + static_cast<int64_t>(butil::fast_rand()) % partition->num_buckets; |
| 138 | + } |
| 139 | + int64_t cur_tablet_id = partition->indexes[0].tablets[cur_bucket]; |
| 140 | + |
| 141 | + // Credit only the rows that actually went to this partition. |
| 142 | + int64_t part_rows = 0; |
| 143 | + if (auto it = partition_row_counts.find(partition); it != partition_row_counts.end()) { |
| 144 | + part_rows = it->second; |
| 145 | + } |
| 146 | + _tablet_written_bytes[cur_tablet_id] += bytes_per_row * part_rows; |
| 147 | + |
| 148 | + // Switch to the next local bucket when the threshold is reached. |
| 149 | + // local_bucket_seqs carries the exact set of buckets assigned to this BE by the FE. |
| 150 | + if (!seqs.empty() && |
| 151 | + _tablet_written_bytes[cur_tablet_id] >= BUCKET_SWITCH_THRESHOLD_BYTES) { |
| 152 | + seq_pos = (seq_pos + 1) % static_cast<int32_t>(seqs.size()); |
| 153 | + partition->load_tablet_idx = seqs[seq_pos]; |
| 154 | + VLOG(2) << "FIND_TABLET_RANDOM_BUCKET: partition=" << partition->id |
| 155 | + << " rotate bucket " << cur_bucket << " -> " << seqs[seq_pos] |
| 156 | + << " after " << _tablet_written_bytes[cur_tablet_id] << " bytes"; |
| 157 | + } |
| 158 | + } |
| 159 | + _partition_to_tablet_map.clear(); |
85 | 160 | } else { |
86 | | - // for random distribution |
| 161 | + // FIND_TABLET_EVERY_BATCH / FIND_TABLET_EVERY_SINK |
87 | 162 | _vpartition->find_tablets(block, qualified_rows, partitions, tablet_index, |
88 | 163 | &_partition_to_tablet_map); |
89 | 164 | if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_BATCH) { |
|
0 commit comments