Skip to content

Commit b219459

Browse files
[fix](filecache) add check and exception handle for empty block file (apache#59646)
The current code did not check for or handle the case where a block file fails to download (leaving an empty file). This commit adds a check and error handling for that case.
1 parent 54fff1b commit b219459

3 files changed

Lines changed: 147 additions & 2 deletions

File tree

be/src/io/cache/file_block.cpp

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,12 @@ void FileBlock::reset_downloader_impl(std::lock_guard<std::mutex>& block_lock) {
113113

114114
Status FileBlock::set_downloaded(std::lock_guard<std::mutex>& /* block_lock */) {
115115
DCHECK(_download_state != State::DOWNLOADED);
116-
DCHECK_NE(_downloaded_size, 0);
116+
if (_downloaded_size == 0) {
117+
_download_state = State::EMPTY;
118+
_downloader_id = 0;
119+
return Status::InternalError("Try to set empty block {} as downloaded",
120+
_block_range.to_string());
121+
}
117122
Status status = _mgr->_storage->finalize(_key, this->_block_range.size());
118123
if (status.ok()) [[likely]] {
119124
_download_state = State::DOWNLOADED;
@@ -147,7 +152,15 @@ Status FileBlock::append(Slice data) {
147152
}
148153

149154
Status FileBlock::finalize() {
150-
if (_downloaded_size != 0 && _downloaded_size != _block_range.size()) {
155+
if (_downloaded_size == 0) {
156+
std::lock_guard block_lock(_mutex);
157+
_download_state = State::EMPTY;
158+
_downloader_id = 0;
159+
_cv.notify_all();
160+
return Status::InternalError("Try to finalize an empty file block {}",
161+
_block_range.to_string());
162+
}
163+
if (_downloaded_size != _block_range.size()) {
151164
SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr);
152165
size_t old_size = _block_range.size();
153166
_block_range.right = _block_range.left + _downloaded_size - 1;

be/src/io/cache/file_block.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class FileBlock {
4444
friend class BlockFileCache;
4545
friend class CachedRemoteFileReader;
4646
friend struct FileBlockCell;
47+
friend class FileBlockTestAccessor;
4748

4849
public:
4950
enum class State {

be/test/io/cache/block_file_cache_test.cpp

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,22 @@
2323

2424
namespace doris::io {
2525

26+
class FileBlockTestAccessor {
27+
public:
28+
static void set_state(FileBlock& block, FileBlock::State state) {
29+
block._download_state = state;
30+
}
31+
static void set_downloader_id(FileBlock& block, uint64_t id) { block._downloader_id = id; }
32+
static void set_downloaded_size(FileBlock& block, size_t size) {
33+
block._downloaded_size = size;
34+
}
35+
36+
static Status call_set_downloaded(FileBlock& block) {
37+
std::lock_guard<std::mutex> lock(block._mutex);
38+
return block.set_downloaded(lock);
39+
}
40+
};
41+
2642
fs::path caches_dir = fs::current_path() / "lru_cache_test";
2743
std::string cache_base_path = caches_dir / "cache1" / "";
2844
std::string tmp_file = caches_dir / "tmp_file";
@@ -7671,4 +7687,119 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_direct_read_bytes_check) {
76717687
FileCacheFactory::instance()->_capacity = 0;
76727688
}
76737689

7690+
// Checks that finalize() on a block that never received any data reports an
// error and leaves the block EMPTY with no downloader recorded.
TEST_F(BlockFileCacheTest, finalize_empty_block) {
    std::string cache_dir = caches_dir / "empty_block_test" / "";
    if (fs::exists(cache_dir)) {
        fs::remove_all(cache_dir);
    }

    io::FileCacheSettings cache_settings;
    cache_settings.capacity = 100;
    cache_settings.max_file_block_size = 100;
    io::BlockFileCache mgr(cache_dir, cache_settings);
    ASSERT_TRUE(mgr.initialize().ok());

    // Poll up to 100 times, 1 ms apart, until the cache reports that its
    // asynchronous open has succeeded.
    for (int attempt = 0; attempt < 100 && !mgr.get_async_open_success(); ++attempt) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

    auto key = io::BlockFileCache::hash("empty_block_test");
    io::CacheContext context;
    ReadStatistics read_stats;
    context.stats = &read_stats;
    context.cache_type = io::FileCacheType::NORMAL;

    {
        auto holder = mgr.get_or_set(key, 0, 10, context);
        auto blocks = fromHolder(holder);
        ASSERT_EQ(blocks.size(), 1);
        auto empty_block = blocks[0];
        ASSERT_EQ(empty_block->state(), io::FileBlock::State::EMPTY);

        // Become the downloader; the block switches to DOWNLOADING.
        ASSERT_EQ(empty_block->get_or_set_downloader(), io::FileBlock::get_caller_id());
        ASSERT_EQ(empty_block->state(), io::FileBlock::State::DOWNLOADING);

        // Finalize straight away, with no append() in between.
        Status finalize_status = empty_block->finalize();
        ASSERT_FALSE(finalize_status.ok());
        ASSERT_EQ(empty_block->state(), io::FileBlock::State::EMPTY);
        ASSERT_EQ(empty_block->get_downloader(), 0);
    }

    if (fs::exists(cache_dir)) {
        fs::remove_all(cache_dir);
    }
}
7734+
7735+
// Checks that finalize() on a block holding fewer bytes than its range
// succeeds and shrinks the block (and the cache's used size) to what was
// actually written.
TEST_F(BlockFileCacheTest, finalize_partial_block) {
    std::string cache_dir = caches_dir / "partial_block_test" / "";
    if (fs::exists(cache_dir)) {
        fs::remove_all(cache_dir);
    }

    io::FileCacheSettings cache_settings;
    cache_settings.capacity = 100;
    cache_settings.max_file_block_size = 100;
    io::BlockFileCache mgr(cache_dir, cache_settings);
    ASSERT_TRUE(mgr.initialize().ok());

    // Poll up to 100 times, 1 ms apart, until the cache reports that its
    // asynchronous open has succeeded.
    for (int attempt = 0; attempt < 100 && !mgr.get_async_open_success(); ++attempt) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

    auto key = io::BlockFileCache::hash("partial_block_test");
    io::CacheContext context;
    ReadStatistics read_stats;
    context.stats = &read_stats;
    context.cache_type = io::FileCacheType::NORMAL;

    {
        auto holder = mgr.get_or_set(key, 0, 10, context);
        auto blocks = fromHolder(holder);
        ASSERT_EQ(blocks.size(), 1);
        auto partial_block = blocks[0];
        ASSERT_EQ(partial_block->get_or_set_downloader(), io::FileBlock::get_caller_id());

        // Write only 5 of the 10 requested bytes.
        std::string payload(5, '0');
        ASSERT_TRUE(partial_block->append(Slice(payload.data(), payload.size())).ok());

        // Finalizing the half-filled block is expected to succeed and trim
        // the range down to the 5 bytes that exist.
        Status finalize_status = partial_block->finalize();
        ASSERT_TRUE(finalize_status.ok());
        ASSERT_EQ(partial_block->state(), io::FileBlock::State::DOWNLOADED);
        ASSERT_EQ(partial_block->range().size(), 5);
        ASSERT_EQ(partial_block->range().right, 4);
    }

    // The cache-level accounting must reflect the shrunken block as well.
    ASSERT_EQ(mgr.get_used_cache_size(io::FileCacheType::NORMAL), 5);

    if (fs::exists(cache_dir)) {
        fs::remove_all(cache_dir);
    }
}
7784+
7785+
// Drives FileBlock::set_downloaded() directly on a block whose downloaded
// size is zero and checks that it fails, resets the state to EMPTY and
// clears the downloader id.
TEST_F(BlockFileCacheTest, set_downloaded_empty_block_branch) {
    FileCacheKey cache_key;
    cache_key.hash = io::BlockFileCache::hash("set_downloaded_empty_block_branch");
    cache_key.offset = 0;
    cache_key.meta.type = io::FileCacheType::NORMAL;
    cache_key.meta.expiration_time = 0;
    cache_key.meta.tablet_id = 0;

    // The cache manager pointer is deliberately nullptr: the empty-block
    // branch is expected to return before storage is ever consulted.
    io::FileBlock block(cache_key, 10, nullptr, io::FileBlock::State::EMPTY);

    // Fake an in-flight download that has produced zero bytes.
    FileBlockTestAccessor::set_state(block, io::FileBlock::State::DOWNLOADING);
    FileBlockTestAccessor::set_downloader_id(block, 123);
    FileBlockTestAccessor::set_downloaded_size(block, 0);

    Status result = FileBlockTestAccessor::call_set_downloaded(block);
    ASSERT_FALSE(result.ok());
    ASSERT_EQ(block.state(), io::FileBlock::State::EMPTY);
    ASSERT_EQ(block.get_downloader(), 0);
}
7804+
76747805
} // namespace doris::io

0 commit comments

Comments
 (0)