Skip to content

Commit e30b37f

Browse files
authored
Merge branch 'master' into fix/stale-version-count-bug
2 parents 2a8bc2c + b830f8e commit e30b37f

73 files changed

Lines changed: 4950 additions & 493 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

.asf.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,5 +180,7 @@ github:
180180
- yuanyuan8983
181181
- yz-jayhua
182182
- ixzc
183+
- deardeng
184+
- wyxxxcat
183185
notifications:
184186
pullrequests_status: commits@doris.apache.org

be/CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -838,6 +838,12 @@ endif()
838838
option(BUILD_FILE_CACHE_MICROBENCH_TOOL "Build file cache mirobench Tool" OFF)
839839
if (BUILD_FILE_CACHE_MICROBENCH_TOOL)
840840
add_subdirectory(${SRC_DIR}/io/tools)
841+
install(FILES
842+
${BASE_DIR}/../bin/start_file_cache_microbench.sh
843+
PERMISSIONS OWNER_READ OWNER_WRITE OWNER_EXECUTE
844+
GROUP_READ GROUP_WRITE GROUP_EXECUTE
845+
WORLD_READ WORLD_EXECUTE
846+
DESTINATION ${OUTPUT_DIR}/bin)
841847
endif()
842848

843849
option(BUILD_INDEX_TOOL "Build index tool" OFF)

be/src/http/action/file_cache_action.cpp

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -61,28 +61,29 @@ constexpr static std::string_view VALUE = "value";
6161
constexpr static std::string_view RELOAD = "reload";
6262

6363
Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metrics) {
64-
req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.data());
65-
std::string operation = req->param(OP.data());
64+
const std::string header_json(HEADER_JSON);
65+
req->add_output_header(HttpHeaders::CONTENT_TYPE, header_json.c_str());
66+
std::string operation = req->param(std::string(OP));
6667
Status st = Status::OK();
6768
if (operation == RELEASE) {
6869
size_t released = 0;
69-
const std::string& base_path = req->param(BASE_PATH.data());
70+
const std::string& base_path = req->param(std::string(BASE_PATH));
7071
if (!base_path.empty()) {
7172
released = io::FileCacheFactory::instance()->try_release(base_path);
7273
} else {
7374
released = io::FileCacheFactory::instance()->try_release();
7475
}
7576
EasyJson json;
76-
json[RELEASED_ELEMENTS.data()] = released;
77+
json[std::string(RELEASED_ELEMENTS)] = released;
7778
*json_metrics = json.ToString();
7879
} else if (operation == CLEAR) {
7980
DBUG_EXECUTE_IF("FileCacheAction._handle_header.ignore_clear", {
8081
LOG_WARNING("debug point FileCacheAction._handle_header.ignore_clear");
8182
st = Status::OK();
8283
return st;
8384
});
84-
const std::string& sync = req->param(SYNC.data());
85-
const std::string& segment_path = req->param(VALUE.data());
85+
const std::string& sync = req->param(std::string(SYNC));
86+
const std::string& segment_path = req->param(std::string(VALUE));
8687
if (segment_path.empty()) {
8788
io::FileCacheFactory::instance()->clear_file_caches(to_lower(sync) == "true");
8889
} else {
@@ -91,7 +92,7 @@ Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metri
9192
cache->remove_if_cached(hash);
9293
}
9394
} else if (operation == RESET) {
94-
std::string capacity = req->param(CAPACITY.data());
95+
std::string capacity = req->param(std::string(CAPACITY));
9596
int64_t new_capacity = 0;
9697
bool parse = true;
9798
try {
@@ -105,24 +106,24 @@ Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metri
105106
"the interval (0, INT64_MAX]",
106107
capacity);
107108
} else {
108-
const std::string& path = req->param(PATH.data());
109+
const std::string& path = req->param(std::string(PATH));
109110
auto ret = io::FileCacheFactory::instance()->reset_capacity(path, new_capacity);
110111
LOG(INFO) << ret;
111112
}
112113
} else if (operation == HASH) {
113-
const std::string& segment_path = req->param(VALUE.data());
114+
const std::string& segment_path = req->param(std::string(VALUE));
114115
if (segment_path.empty()) {
115-
st = Status::InvalidArgument("missing parameter: {} is required", VALUE.data());
116+
st = Status::InvalidArgument("missing parameter: {} is required", VALUE);
116117
} else {
117118
io::UInt128Wrapper ret = io::BlockFileCache::hash(segment_path);
118119
EasyJson json;
119-
json[HASH.data()] = ret.to_string();
120+
json[std::string(HASH)] = ret.to_string();
120121
*json_metrics = json.ToString();
121122
}
122123
} else if (operation == LIST_CACHE) {
123-
const std::string& segment_path = req->param(VALUE.data());
124+
const std::string& segment_path = req->param(std::string(VALUE));
124125
if (segment_path.empty()) {
125-
st = Status::InvalidArgument("missing parameter: {} is required", VALUE.data());
126+
st = Status::InvalidArgument("missing parameter: {} is required", VALUE);
126127
} else {
127128
io::UInt128Wrapper cache_hash = io::BlockFileCache::hash(segment_path);
128129
std::vector<std::string> cache_files =
@@ -145,9 +146,9 @@ Status FileCacheAction::_handle_header(HttpRequest* req, std::string* json_metri
145146
[&json](auto& x) { json.PushBack(std::move(x)); });
146147
*json_metrics = json.ToString();
147148
} else if (operation == CHECK_CONSISTENCY) {
148-
const std::string& cache_base_path = req->param(BASE_PATH.data());
149+
const std::string& cache_base_path = req->param(std::string(BASE_PATH));
149150
if (cache_base_path.empty()) {
150-
st = Status::InvalidArgument("missing parameter: {} is required", BASE_PATH.data());
151+
st = Status::InvalidArgument("missing parameter: {} is required", BASE_PATH);
151152
} else {
152153
auto* block_file_cache = io::FileCacheFactory::instance()->get_by_path(cache_base_path);
153154
if (block_file_cache == nullptr) {

be/src/io/tools/file_cache_microbench.cpp

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,27 @@ const size_t BUFFER_SIZE = 1024 * 1024;
101101
// Just 10^9.
102102
static constexpr auto NS = 1000000000UL;
103103

104+
static std::string normalize_benchmark_prefix(std::string_view raw_prefix) {
105+
std::string normalized {doris::trim(raw_prefix)};
106+
while (!normalized.empty() && normalized.front() == '/') {
107+
normalized.erase(normalized.begin());
108+
}
109+
while (!normalized.empty() && normalized.back() == '/') {
110+
normalized.pop_back();
111+
}
112+
return normalized;
113+
}
114+
115+
static std::string get_prefix() {
116+
std::string prefix = HIDDEN_PREFIX;
117+
std::string subdir = normalize_benchmark_prefix(doris::config::test_s3_prefix);
118+
if (!subdir.empty()) {
119+
prefix += subdir;
120+
prefix += "/";
121+
}
122+
return prefix;
123+
}
124+
104125
DEFINE_int32(port, 8888, "Http Port of this server");
105126

106127
static std::string build_info() {
@@ -487,7 +508,7 @@ std::string get_usage(const std::string& progname) {
487508
"read_iops": <limit>, // IOPS limit for reading per segment files
488509
"num_threads": <count>, // Number of threads in the thread pool, default 200
489510
"num_files": <count>, // Number of segments to write/read
490-
"file_prefix": "<prefix>", // Prefix for segment files, Notice: this tools hide prefix(test_file_cache_microbench/) before file_prefix
511+
"file_prefix": "<prefix>", // Prefix for segment files, key prefix is test_file_cache_microbench/<test_s3_prefix>/
491512
"write_batch_size": <size>, // Size of data to write in each write operation
492513
"cache_type": <type>, // Write or Read data enter file cache queue type, support NORMAL | TTL | INDEX | DISPOSABLE, default NORMAL
493514
"expiration": <timestamp>, // File cache ttl expire time, value is a unix timestamp
@@ -720,7 +741,7 @@ struct JobConfig {
720741
"repeat: {}, expiration: {}, cache_type: {}, read_offset: [{}, {}), "
721742
"read_length: [{}, {})",
722743
size_bytes_perfile, write_iops, read_iops, num_threads, num_files,
723-
HIDDEN_PREFIX + file_prefix, write_file_cache, write_batch_size, repeat, expiration,
744+
get_prefix() + file_prefix, write_file_cache, write_batch_size, repeat, expiration,
724745
cache_type, read_offset_left, read_offset_right, read_length_left,
725746
read_length_right);
726747
}
@@ -1280,7 +1301,7 @@ class JobManager {
12801301
// If it's a read-only job, find the previously written files
12811302
if (config.read_iops > 0 && config.write_iops == 0) {
12821303
std::string old_job_id =
1283-
s3_file_records.find_job_id_by_prefix(HIDDEN_PREFIX + config.file_prefix);
1304+
s3_file_records.find_job_id_by_prefix(get_prefix() + config.file_prefix);
12841305
if (old_job_id.empty()) {
12851306
throw std::runtime_error(
12861307
"Can't find previously job uploaded files. Please make sure read "
@@ -1293,7 +1314,7 @@ class JobManager {
12931314

12941315
// Generate file keys
12951316
for (int i = 0; i < config.num_files; ++i) {
1296-
keys.push_back(HIDDEN_PREFIX + config.file_prefix + "/" + rewrite_job_id + "_" +
1317+
keys.push_back(get_prefix() + config.file_prefix + "/" + rewrite_job_id + "_" +
12971318
std::to_string(i));
12981319
}
12991320

@@ -1424,7 +1445,7 @@ class JobManager {
14241445
auto start_time = std::chrono::steady_clock::now();
14251446

14261447
int64_t exist_job_perfile_size = s3_file_records.get_exist_job_perfile_size_by_prefix(
1427-
HIDDEN_PREFIX + config.file_prefix);
1448+
get_prefix() + config.file_prefix);
14281449
std::vector<std::future<void>> read_futures;
14291450
doris::io::IOContext io_ctx;
14301451
doris::io::FileCacheStatistics total_stats;
@@ -1447,7 +1468,7 @@ class JobManager {
14471468
std::vector<std::string> read_files;
14481469
if (exist_job_perfile_size != -1) {
14491470
// read exist files
1450-
s3_file_records.get_exist_job_files_by_prefix(HIDDEN_PREFIX + config.file_prefix,
1471+
s3_file_records.get_exist_job_files_by_prefix(get_prefix() + config.file_prefix,
14511472
read_files, config.num_files);
14521473
}
14531474

@@ -2662,12 +2683,8 @@ int main(int argc, char* argv[]) {
26622683
};
26632684
periodiccally_log_thread = std::thread {periodiccally_log};
26642685

2665-
try {
2666-
HttpServer http_server;
2667-
http_server.start(doris_home);
2668-
} catch (const std::exception& e) {
2669-
LOG(ERROR) << "Error in HTTP server: " << e.what();
2670-
}
2686+
HttpServer http_server;
2687+
http_server.start(doris_home);
26712688

26722689
if (periodiccally_log_thread.joinable()) {
26732690
{

be/src/olap/memtable_writer.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,9 +160,6 @@ Status MemTableWriter::_flush_memtable_async() {
160160
std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
161161
memtable = _mem_table;
162162
_mem_table = nullptr;
163-
}
164-
{
165-
std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
166163
memtable->update_mem_type(MemType::WRITE_FINISHED);
167164
_freezed_mem_tables.push_back(memtable);
168165
}

be/src/pipeline/common/set_utils.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,13 @@ using SetPrimaryTypeHashTableContextNullable = vectorized::MethodSingleNullableC
5151
using SetSerializedHashTableContext =
5252
vectorized::MethodSerialized<PHHashMap<StringRef, RowRefWithFlag>>;
5353
using SetMethodOneString = vectorized::MethodStringNoCache<PHHashMap<StringRef, RowRefWithFlag>>;
54+
using SetMethodOneStringNullable =
55+
vectorized::MethodSingleNullableColumn<vectorized::MethodStringNoCache<
56+
vectorized::DataWithNullKey<PHHashMap<StringRef, RowRefWithFlag>>>>;
5457

5558
using SetHashTableVariants =
5659
std::variant<std::monostate, SetSerializedHashTableContext, SetMethodOneString,
60+
SetMethodOneStringNullable,
5761
SetPrimaryTypeHashTableContextNullable<vectorized::UInt8>,
5862
SetPrimaryTypeHashTableContextNullable<vectorized::UInt16>,
5963
SetPrimaryTypeHashTableContextNullable<vectorized::UInt32>,
@@ -102,7 +106,11 @@ struct SetDataVariants
102106
emplace_single<vectorized::UInt256, SetData<vectorized::UInt256>>(nullable);
103107
break;
104108
case HashKeyType::string_key:
105-
method_variant.emplace<SetMethodOneString>();
109+
if (nullable) {
110+
method_variant.emplace<SetMethodOneStringNullable>();
111+
} else {
112+
method_variant.emplace<SetMethodOneString>();
113+
}
106114
break;
107115
case HashKeyType::fixed64:
108116
method_variant.emplace<SetFixedKeyHashTableContext<vectorized::UInt64>>(

be/src/pipeline/exec/olap_scan_operator.cpp

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -894,21 +894,7 @@ Status OlapScanLocalState::_build_key_ranges_and_filters() {
894894
_scan_keys.extend_scan_key(temp_range, p._max_scan_key_num,
895895
&exact_range, &eos, &should_break));
896896
if (exact_range) {
897-
auto key = iter->first;
898-
_slot_id_to_value_range.erase(key);
899-
900-
std::vector<std::shared_ptr<ColumnPredicate>> new_predicates;
901-
for (const auto& it : _slot_id_to_predicates[key]) {
902-
if (it->type() == PredicateType::NOT_IN_LIST ||
903-
it->type() == PredicateType::NE) {
904-
new_predicates.push_back(it);
905-
}
906-
}
907-
if (new_predicates.empty()) {
908-
_slot_id_to_predicates.erase(key);
909-
} else {
910-
_slot_id_to_predicates[key] = new_predicates;
911-
}
897+
_slot_id_to_value_range.erase(iter->first);
912898
}
913899
} else {
914900
// if exceed max_pushdown_conditions_per_column, use whole_value_rang instead

be/src/pipeline/exec/scan_operator.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,9 @@ Status ScanLocalState<Derived>::_normalize_bloom_filter(
458458
if (pred) {
459459
DCHECK(*pdt != PushDownType::UNACCEPTABLE) << root->debug_string();
460460
predicates.emplace_back(pred);
461+
} else {
462+
// If exception occurs during processing, do not push down
463+
*pdt = PushDownType::UNACCEPTABLE;
461464
}
462465
};
463466
DCHECK(TExprNodeType::BLOOM_PRED == root->node_type());
@@ -484,6 +487,9 @@ Status ScanLocalState<Derived>::_normalize_topn_filter(
484487
if (pred) {
485488
DCHECK(*pdt != PushDownType::UNACCEPTABLE) << root->debug_string();
486489
predicates.emplace_back(pred);
490+
} else {
491+
// If exception occurs during processing, do not push down
492+
*pdt = PushDownType::UNACCEPTABLE;
487493
}
488494
};
489495
DCHECK(root->is_topn_filter());
@@ -508,6 +514,9 @@ Status ScanLocalState<Derived>::_normalize_bitmap_filter(
508514
if (pred) {
509515
DCHECK(*pdt != PushDownType::UNACCEPTABLE) << root->debug_string();
510516
predicates.emplace_back(pred);
517+
} else {
518+
// If exception occurs during processing, do not push down
519+
*pdt = PushDownType::UNACCEPTABLE;
511520
}
512521
};
513522
DCHECK(TExprNodeType::BITMAP_PRED == root->node_type());
@@ -656,6 +665,9 @@ Status ScanLocalState<Derived>::_normalize_in_predicate(
656665
if (pred) {
657666
DCHECK(*pdt != PushDownType::UNACCEPTABLE) << root->debug_string();
658667
predicates.emplace_back(pred);
668+
} else {
669+
// If exception occurs during processing, do not push down
670+
*pdt = PushDownType::UNACCEPTABLE;
659671
}
660672
};
661673

@@ -754,6 +766,9 @@ Status ScanLocalState<Derived>::_normalize_binary_predicate(
754766
if (pred) {
755767
DCHECK(*pdt != PushDownType::UNACCEPTABLE) << root->debug_string();
756768
predicates.emplace_back(pred);
769+
} else {
770+
// If exception occurs during processing, do not push down
771+
*pdt = PushDownType::UNACCEPTABLE;
757772
}
758773
};
759774

@@ -931,6 +946,9 @@ Status ScanLocalState<Derived>::_normalize_is_null_predicate(
931946
if (pred) {
932947
DCHECK(*pdt != PushDownType::UNACCEPTABLE) << root->debug_string();
933948
predicates.emplace_back(pred);
949+
} else {
950+
// If exception occurs during processing, do not push down
951+
*pdt = PushDownType::UNACCEPTABLE;
934952
}
935953
};
936954
DCHECK(!root->is_rf_wrapper()) << root->debug_string();

0 commit comments

Comments (0)