Skip to content

Commit 49cd45e

Browse files
committed
Concurrent load disk when cs starts #880
(#880)
1 parent 09abd6d commit 49cd45e

4 files changed

Lines changed: 58 additions & 16 deletions

File tree

src/chunkserver/block_manager.cc

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -163,17 +163,47 @@ int64_t BlockManager::DiskQuota() const {
163163
return disk_quota_;
164164
}
165165

166-
// TODO: concurrent load
167166
bool BlockManager::LoadStorage() {
168-
bool ret = true;
167+
// return value from Disk::LoadStorage:
168+
// 0: initial state, 1: done, -1: error occured
169+
std::vector<int> ret_vals;
170+
ret_vals.resize(disks_.size(), 0);
171+
ThreadPool tp(disks_.size());
172+
int disk_index = 0;
169173
for (auto it = disks_.begin(); it != disks_.end(); ++it) {
170174
Disk* disk = it->second;
171-
ret = ret && disk->LoadStorage(std::bind(&BlockManager::AddBlock,
175+
std::function<bool (int64_t, Disk*, BlockMeta)> callback = std::bind(&BlockManager::AddBlock,
172176
this, std::placeholders::_1,
173177
std::placeholders::_2,
174-
std::placeholders::_3));
175-
disk_quota_ += disk->GetQuota();
178+
std::placeholders::_3);
179+
tp.AddTask(std::bind(&Disk::LoadStorage, disk, callback, &(ret_vals[disk_index])));
180+
++disk_index;
181+
}
182+
bool ret = true;
183+
while (true) {
184+
sleep(1);
185+
bool done = true;
186+
for (auto it = ret_vals.begin(); it != ret_vals.end(); ++it) {
187+
if (*it < 0) {
188+
ret = false;
189+
break;
190+
} else if (*it == 0) {
191+
done = false;
192+
break;
193+
}
194+
}
195+
if (done || !ret) {
196+
break;
197+
}
198+
}
199+
if (ret) {
200+
for (auto it = disks_.begin(); it != disks_.end(); ++it) {
201+
Disk* disk = it->second;
202+
disk_quota_ += disk->GetQuota();
203+
}
176204
}
205+
tp.Stop(false);
206+
LOG(INFO, "LoadStorage done. Quota = %ld", disk_quota_);
177207
return ret;
178208
}
179209

src/chunkserver/disk.cc

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,16 @@ Disk::~Disk() {
3535
metadb_ = NULL;
3636
}
3737

38-
bool Disk::LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback) {
38+
void Disk::LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback, int* ret_val) {
3939
MutexLock lock(&mu_);
4040
int64_t start_load_time = common::timer::get_micros();
4141
leveldb::Options options;
4242
options.create_if_missing = true;
4343
leveldb::Status s = leveldb::DB::Open(options, path_ + "meta/", &metadb_);
4444
if (!s.ok()) {
4545
LOG(WARNING, "Load blocks fail: %s", s.ToString().c_str());
46-
return false;
46+
*ret_val = -1;
47+
return;
4748
}
4849

4950
std::string version_key(8, '\0');
@@ -61,7 +62,8 @@ bool Disk::LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback)
6162
if (1 != sscanf(it->key().data(), "%ld", &block_id)) {
6263
LOG(WARNING, "Unknown key: %s\n", it->key().ToString().c_str());
6364
delete it;
64-
return false;
65+
*ret_val = -1;
66+
return;
6567
}
6668
BlockMeta meta;
6769
if (!meta.ParseFromArray(it->value().data(), it->value().size())) {
@@ -97,13 +99,15 @@ bool Disk::LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback)
9799
}
98100
delete it;
99101
int64_t end_load_time = common::timer::get_micros();
100-
LOG(INFO, "Disk %s Load %ld blocks, use %ld ms, namespace version: %ld",
101-
path_.c_str(), block_num, (end_load_time - start_load_time) / 1000, namespace_version_);
102+
LOG(INFO, "Disk %s Load %ld blocks, use %ld ms, namespace version: %ld, size %s",
103+
path_.c_str(), block_num, (end_load_time - start_load_time) / 1000,
104+
namespace_version_, common::HumanReadableString(counters_.data_size.Get()).c_str());
102105
if (namespace_version_ == 0 && block_num > 0) {
103106
LOG(WARNING, "Namespace version lost!");
104107
}
105108
quota_ += counters_.data_size.Get();
106-
return true;
109+
*ret_val = 1;
110+
return;
107111
}
108112

109113
std::string Disk::Path() const {

src/chunkserver/disk.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class Disk {
3636
Disk(const std::string& path, int64_t quota);
3737
~Disk();
3838
std::string Path() const;
39-
bool LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback);
39+
void LoadStorage(std::function<void (int64_t, Disk*, BlockMeta)> callback, int* ret_val);
4040
int64_t NamespaceVersion() const;
4141
bool SetNamespaceVersion(int64_t version);
4242
void Seek(int64_t block_id, std::vector<leveldb::Iterator*>* iters);

src/chunkserver/test/data_block_test.cc

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,12 @@ TEST_F(DataBlockTest, CreateBlock) {
2828
mkdir("./block123", 0755);
2929
std::string file_path("./block123");
3030
Disk disk(file_path, 1000000);
31-
disk.LoadStorage(std::bind(AddBlock, std::placeholders::_1,
32-
std::placeholders::_2, std::placeholders::_3));
31+
int ret_val;
32+
std::function<void (int64_t, Disk*, BlockMeta)> callback = std::bind(AddBlock,
33+
std::placeholders::_1,
34+
std::placeholders::_2,
35+
std::placeholders::_3);
36+
disk.LoadStorage(callback, &ret_val);
3337
BlockMeta meta;
3438
FileCache file_cache(10);
3539
int64_t block_id = 123;
@@ -51,8 +55,12 @@ TEST_F(DataBlockTest, WriteAndReadBlock) {
5155
mkdir("./block123", 0755);
5256
std::string file_path("./block123");
5357
Disk disk(file_path, 1000000);
54-
disk.LoadStorage(std::bind(AddBlock, std::placeholders::_1,
55-
std::placeholders::_2, std::placeholders::_3));
58+
int ret_val;
59+
std::function<void (int64_t, Disk*, BlockMeta)> callback = std::bind(AddBlock,
60+
std::placeholders::_1,
61+
std::placeholders::_2,
62+
std::placeholders::_3);
63+
disk.LoadStorage(callback, &ret_val);
5664
FileCache file_cache(10);
5765
int64_t block_id = 123;
5866
meta.set_block_id(block_id);

0 commit comments

Comments
 (0)