Skip to content

Commit 306679d

Browse files
committed
feat(cpp): implement LRU cache for chunk management and update readers
1 parent d1fe7f9 commit 306679d

5 files changed

Lines changed: 400 additions & 44 deletions

File tree

cpp/src/graphar/arrow/chunk_reader.cc

Lines changed: 84 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader(
170170
seek_id_(0),
171171
schema_(nullptr),
172172
chunk_table_(nullptr),
173+
chunk_cache_(options.cache_capacity),
173174
filter_options_(options) {
174175
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
175176
GAR_ASSIGN_OR_RAISE_ERROR(auto pg_path_prefix,
@@ -194,6 +195,7 @@ VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader(
194195
seek_id_(0),
195196
schema_(nullptr),
196197
chunk_table_(nullptr),
198+
chunk_cache_(options.cache_capacity),
197199
filter_options_(options) {
198200
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
199201

@@ -211,9 +213,8 @@ Status VertexPropertyArrowChunkReader::seek(IdType id) {
211213
IdType pre_chunk_index = chunk_index_;
212214
chunk_index_ = id / vertex_info_->GetChunkSize();
213215
if (chunk_index_ != pre_chunk_index) {
214-
// TODO(@acezen): use a cache to avoid reloading the same chunk, could use
215-
// a LRU cache.
216-
chunk_table_.reset();
216+
auto* cached = chunk_cache_.Get(chunk_index_);
217+
chunk_table_ = cached ? *cached : nullptr;
217218
}
218219
if (chunk_index_ >= chunk_num_) {
219220
return Status::IndexError("Internal vertex id ", id, " is out of range [0,",
@@ -260,6 +261,7 @@ VertexPropertyArrowChunkReader::GetChunkV2() {
260261
GAR_RETURN_NOT_OK(
261262
CastTableWithSchema(chunk_table_, schema_, &chunk_table_));
262263
}
264+
chunk_cache_.Put(chunk_index_, chunk_table_);
263265
}
264266
IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize();
265267
return chunk_table_->Slice(row_offset);
@@ -304,6 +306,7 @@ VertexPropertyArrowChunkReader::GetChunkV1() {
304306
GAR_RETURN_NOT_OK(
305307
CastTableWithSchema(chunk_table_, schema_, &chunk_table_));
306308
}
309+
chunk_cache_.Put(chunk_index_, chunk_table_);
307310
}
308311
IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize();
309312
return chunk_table_->Slice(row_offset);
@@ -340,6 +343,7 @@ VertexPropertyArrowChunkReader::GetLabelChunk() {
340343
// GAR_RETURN_NOT_OK(
341344
// CastTableWithSchema(chunk_table_, schema_, &chunk_table_));
342345
// }
346+
chunk_cache_.Put(chunk_index_, chunk_table_);
343347
}
344348
IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize();
345349
return chunk_table_->Slice(row_offset);
@@ -352,17 +356,22 @@ Status VertexPropertyArrowChunkReader::next_chunk() {
352356
vertex_info_->GetType(), " chunk num ", chunk_num_);
353357
}
354358
seek_id_ = chunk_index_ * vertex_info_->GetChunkSize();
355-
chunk_table_.reset();
359+
auto* cached = chunk_cache_.Get(chunk_index_);
360+
chunk_table_ = cached ? *cached : nullptr;
356361

357362
return Status::OK();
358363
}
359364

360365
void VertexPropertyArrowChunkReader::Filter(util::Filter filter) {
361366
filter_options_.filter = filter;
367+
chunk_table_ = nullptr;
368+
chunk_cache_.Clear();
362369
}
363370

364371
void VertexPropertyArrowChunkReader::Select(util::ColumnNames column_names) {
365372
filter_options_.columns = column_names;
373+
chunk_table_ = nullptr;
374+
chunk_cache_.Clear();
366375
}
367376

368377
Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
@@ -517,14 +526,15 @@ VertexPropertyArrowChunkReader::MakeForLabels(
517526

518527
AdjListArrowChunkReader::AdjListArrowChunkReader(
519528
const std::shared_ptr<EdgeInfo>& edge_info, AdjListType adj_list_type,
520-
const std::string& prefix)
529+
const std::string& prefix, size_t cache_capacity)
521530
: edge_info_(edge_info),
522531
adj_list_type_(adj_list_type),
523532
prefix_(prefix),
524533
vertex_chunk_index_(0),
525534
chunk_index_(0),
526535
seek_offset_(0),
527536
chunk_table_(nullptr),
537+
chunk_cache_(cache_capacity),
528538
chunk_num_(-1) /* -1 means uninitialized */ {
529539
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
530540
GAR_ASSIGN_OR_RAISE_ERROR(auto adj_list_path_prefix,
@@ -544,6 +554,7 @@ AdjListArrowChunkReader::AdjListArrowChunkReader(
544554
chunk_index_(other.chunk_index_),
545555
seek_offset_(other.seek_offset_),
546556
chunk_table_(nullptr),
557+
chunk_cache_(other.chunk_cache_.capacity()),
547558
vertex_chunk_num_(other.vertex_chunk_num_),
548559
chunk_num_(other.chunk_num_),
549560
base_dir_(other.base_dir_),
@@ -585,7 +596,9 @@ Status AdjListArrowChunkReader::seek_src(IdType id) {
585596
// initialize or update chunk_num_
586597
vertex_chunk_index_ = new_vertex_chunk_index;
587598
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
588-
chunk_table_.reset();
599+
auto key = std::make_pair(vertex_chunk_index_, chunk_index_);
600+
auto* cached = chunk_cache_.Get(key);
601+
chunk_table_ = cached ? *cached : nullptr;
589602
}
590603

591604
if (adj_list_type_ == AdjListType::unordered_by_source) {
@@ -618,7 +631,9 @@ Status AdjListArrowChunkReader::seek_dst(IdType id) {
618631
// initialize or update chunk_num_
619632
vertex_chunk_index_ = new_vertex_chunk_index;
620633
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
621-
chunk_table_.reset();
634+
auto key = std::make_pair(vertex_chunk_index_, chunk_index_);
635+
auto* cached = chunk_cache_.Get(key);
636+
chunk_table_ = cached ? *cached : nullptr;
622637
}
623638

624639
if (adj_list_type_ == AdjListType::unordered_by_dest) {
@@ -636,7 +651,9 @@ Status AdjListArrowChunkReader::seek(IdType offset) {
636651
IdType pre_chunk_index = chunk_index_;
637652
chunk_index_ = offset / edge_info_->GetChunkSize();
638653
if (chunk_index_ != pre_chunk_index) {
639-
chunk_table_.reset();
654+
auto key = std::make_pair(vertex_chunk_index_, chunk_index_);
655+
auto* cached = chunk_cache_.Get(key);
656+
chunk_table_ = cached ? *cached : nullptr;
640657
}
641658
if (chunk_num_ < 0) {
642659
// initialize chunk_num_
@@ -666,6 +683,8 @@ Result<std::shared_ptr<arrow::Table>> AdjListArrowChunkReader::GetChunk() {
666683
std::string path = prefix_ + chunk_file_path;
667684
auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
668685
GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable(path, file_type));
686+
chunk_cache_.Put(std::make_pair(vertex_chunk_index_, chunk_index_),
687+
chunk_table_);
669688
}
670689
IdType row_offset = seek_offset_ - chunk_index_ * edge_info_->GetChunkSize();
671690
return chunk_table_->Slice(row_offset);
@@ -688,21 +707,26 @@ Status AdjListArrowChunkReader::next_chunk() {
688707
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
689708
}
690709
seek_offset_ = chunk_index_ * edge_info_->GetChunkSize();
691-
chunk_table_.reset();
710+
auto key = std::make_pair(vertex_chunk_index_, chunk_index_);
711+
auto* cached = chunk_cache_.Get(key);
712+
chunk_table_ = cached ? *cached : nullptr;
692713
return Status::OK();
693714
}
694715

695716
Status AdjListArrowChunkReader::seek_chunk_index(IdType vertex_chunk_index,
696717
IdType chunk_index) {
718+
bool changed = false;
697719
if (chunk_num_ < 0 || vertex_chunk_index_ != vertex_chunk_index) {
698720
vertex_chunk_index_ = vertex_chunk_index;
699721
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
700-
chunk_table_.reset();
722+
changed = true;
701723
}
702-
if (chunk_index_ != chunk_index) {
724+
if (chunk_index_ != chunk_index || changed) {
703725
chunk_index_ = chunk_index;
704726
seek_offset_ = chunk_index * edge_info_->GetChunkSize();
705-
chunk_table_.reset();
727+
auto key = std::make_pair(vertex_chunk_index_, chunk_index_);
728+
auto* cached = chunk_cache_.Get(key);
729+
chunk_table_ = cached ? *cached : nullptr;
706730
}
707731
return Status::OK();
708732
}
@@ -715,32 +739,35 @@ Result<IdType> AdjListArrowChunkReader::GetRowNumOfChunk() {
715739
std::string path = prefix_ + chunk_file_path;
716740
auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
717741
GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable(path, file_type));
742+
chunk_cache_.Put(std::make_pair(vertex_chunk_index_, chunk_index_),
743+
chunk_table_);
718744
}
719745
return chunk_table_->num_rows();
720746
}
721747

722748
Result<std::shared_ptr<AdjListArrowChunkReader>> AdjListArrowChunkReader::Make(
723749
const std::shared_ptr<EdgeInfo>& edge_info, AdjListType adj_list_type,
724-
const std::string& prefix) {
750+
const std::string& prefix, size_t cache_capacity) {
725751
if (!edge_info->HasAdjacentListType(adj_list_type)) {
726752
return Status::KeyError(
727753
"The adjacent list type ", AdjListTypeToString(adj_list_type),
728754
" doesn't exist in edge ", edge_info->GetEdgeType(), ".");
729755
}
730756
return std::make_shared<AdjListArrowChunkReader>(edge_info, adj_list_type,
731-
prefix);
757+
prefix, cache_capacity);
732758
}
733759

734760
Result<std::shared_ptr<AdjListArrowChunkReader>> AdjListArrowChunkReader::Make(
735761
const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_type,
736762
const std::string& edge_type, const std::string& dst_type,
737-
AdjListType adj_list_type) {
763+
AdjListType adj_list_type, size_t cache_capacity) {
738764
auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type);
739765
if (!edge_info) {
740766
return Status::KeyError("The edge ", src_type, " ", edge_type, " ",
741767
dst_type, " doesn't exist.");
742768
}
743-
return Make(edge_info, adj_list_type, graph_info->GetPrefix());
769+
return Make(edge_info, adj_list_type, graph_info->GetPrefix(),
770+
cache_capacity);
744771
}
745772

746773
Status AdjListArrowChunkReader::initOrUpdateEdgeChunkNum() {
@@ -752,13 +779,14 @@ Status AdjListArrowChunkReader::initOrUpdateEdgeChunkNum() {
752779

753780
AdjListOffsetArrowChunkReader::AdjListOffsetArrowChunkReader(
754781
const std::shared_ptr<EdgeInfo>& edge_info, AdjListType adj_list_type,
755-
const std::string& prefix)
782+
const std::string& prefix, size_t cache_capacity)
756783
: edge_info_(std::move(edge_info)),
757784
adj_list_type_(adj_list_type),
758785
prefix_(prefix),
759786
chunk_index_(0),
760787
seek_id_(0),
761-
chunk_table_(nullptr) {
788+
chunk_table_(nullptr),
789+
chunk_cache_(cache_capacity) {
762790
std::string base_dir;
763791
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
764792
GAR_ASSIGN_OR_RAISE_ERROR(auto dir_path,
@@ -785,7 +813,8 @@ Status AdjListOffsetArrowChunkReader::seek(IdType id) {
785813
IdType pre_chunk_index = chunk_index_;
786814
chunk_index_ = id / vertex_chunk_size_;
787815
if (chunk_index_ != pre_chunk_index) {
788-
chunk_table_.reset();
816+
auto* cached = chunk_cache_.Get(chunk_index_);
817+
chunk_table_ = cached ? *cached : nullptr;
789818
}
790819
if (chunk_index_ >= vertex_chunk_num_) {
791820
return Status::IndexError("Internal vertex id ", id, "is out of range [0,",
@@ -806,6 +835,7 @@ AdjListOffsetArrowChunkReader::GetChunk() {
806835
std::string path = prefix_ + chunk_file_path;
807836
auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
808837
GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable(path, file_type));
838+
chunk_cache_.Put(chunk_index_, chunk_table_);
809839
}
810840
IdType row_offset = seek_id_ - chunk_index_ * vertex_chunk_size_;
811841
return chunk_table_->Slice(row_offset)->column(0)->chunk(0);
@@ -820,35 +850,38 @@ Status AdjListOffsetArrowChunkReader::next_chunk() {
820850
AdjListTypeToString(adj_list_type_), ".");
821851
}
822852
seek_id_ = chunk_index_ * vertex_chunk_size_;
823-
chunk_table_.reset();
853+
auto* cached = chunk_cache_.Get(chunk_index_);
854+
chunk_table_ = cached ? *cached : nullptr;
824855

825856
return Status::OK();
826857
}
827858

828859
Result<std::shared_ptr<AdjListOffsetArrowChunkReader>>
829860
AdjListOffsetArrowChunkReader::Make(const std::shared_ptr<EdgeInfo>& edge_info,
830861
AdjListType adj_list_type,
831-
const std::string& prefix) {
862+
const std::string& prefix,
863+
size_t cache_capacity) {
832864
if (!edge_info->HasAdjacentListType(adj_list_type)) {
833865
return Status::KeyError(
834866
"The adjacent list type ", AdjListTypeToString(adj_list_type),
835867
" doesn't exist in edge ", edge_info->GetEdgeType(), ".");
836868
}
837-
return std::make_shared<AdjListOffsetArrowChunkReader>(edge_info,
838-
adj_list_type, prefix);
869+
return std::make_shared<AdjListOffsetArrowChunkReader>(
870+
edge_info, adj_list_type, prefix, cache_capacity);
839871
}
840872

841873
Result<std::shared_ptr<AdjListOffsetArrowChunkReader>>
842874
AdjListOffsetArrowChunkReader::Make(
843875
const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_type,
844876
const std::string& edge_type, const std::string& dst_type,
845-
AdjListType adj_list_type) {
877+
AdjListType adj_list_type, size_t cache_capacity) {
846878
auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type);
847879
if (!edge_info) {
848880
return Status::KeyError("The edge ", src_type, " ", edge_type, " ",
849881
dst_type, " doesn't exist.");
850882
}
851-
return Make(edge_info, adj_list_type, graph_info->GetPrefix());
883+
return Make(edge_info, adj_list_type, graph_info->GetPrefix(),
884+
cache_capacity);
852885
}
853886

854887
AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader(
@@ -865,6 +898,7 @@ AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader(
865898
seek_offset_(0),
866899
schema_(nullptr),
867900
chunk_table_(nullptr),
901+
chunk_cache_(options.cache_capacity),
868902
filter_options_(options),
869903
chunk_num_(-1) /* -1 means uninitialized */ {
870904
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
@@ -890,6 +924,7 @@ AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader(
890924
seek_offset_(other.seek_offset_),
891925
schema_(other.schema_),
892926
chunk_table_(nullptr),
927+
chunk_cache_(other.chunk_cache_.capacity()),
893928
filter_options_(other.filter_options_),
894929
vertex_chunk_num_(other.vertex_chunk_num_),
895930
chunk_num_(other.chunk_num_),
@@ -935,7 +970,9 @@ Status AdjListPropertyArrowChunkReader::seek_src(IdType id) {
935970
if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
936971
vertex_chunk_index_ = new_vertex_chunk_index;
937972
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
938-
chunk_table_.reset();
973+
auto key = std::make_pair(vertex_chunk_index_, chunk_index_);
974+
auto* cached = chunk_cache_.Get(key);
975+
chunk_table_ = cached ? *cached : nullptr;
939976
}
940977

941978
if (adj_list_type_ == AdjListType::unordered_by_source) {
@@ -967,7 +1004,9 @@ Status AdjListPropertyArrowChunkReader::seek_dst(IdType id) {
9671004
if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
9681005
vertex_chunk_index_ = new_vertex_chunk_index;
9691006
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
970-
chunk_table_.reset();
1007+
auto key = std::make_pair(vertex_chunk_index_, chunk_index_);
1008+
auto* cached = chunk_cache_.Get(key);
1009+
chunk_table_ = cached ? *cached : nullptr;
9711010
}
9721011

9731012
if (adj_list_type_ == AdjListType::unordered_by_dest) {
@@ -985,7 +1024,9 @@ Status AdjListPropertyArrowChunkReader::seek(IdType offset) {
9851024
seek_offset_ = offset;
9861025
chunk_index_ = offset / edge_info_->GetChunkSize();
9871026
if (chunk_index_ != pre_chunk_index) {
988-
chunk_table_.reset();
1027+
auto key = std::make_pair(vertex_chunk_index_, chunk_index_);
1028+
auto* cached = chunk_cache_.Get(key);
1029+
chunk_table_ = cached ? *cached : nullptr;
9891030
}
9901031
if (chunk_num_ < 0) {
9911032
// initialize chunk_num_
@@ -1024,6 +1065,8 @@ AdjListPropertyArrowChunkReader::GetChunk() {
10241065
GAR_RETURN_NOT_OK(
10251066
CastTableWithSchema(chunk_table_, schema_, &chunk_table_));
10261067
}
1068+
chunk_cache_.Put(std::make_pair(vertex_chunk_index_, chunk_index_),
1069+
chunk_table_);
10271070
}
10281071
IdType row_offset = seek_offset_ - chunk_index_ * edge_info_->GetChunkSize();
10291072
return chunk_table_->Slice(row_offset);
@@ -1049,31 +1092,40 @@ Status AdjListPropertyArrowChunkReader::next_chunk() {
10491092
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
10501093
}
10511094
seek_offset_ = chunk_index_ * edge_info_->GetChunkSize();
1052-
chunk_table_.reset();
1095+
auto key = std::make_pair(vertex_chunk_index_, chunk_index_);
1096+
auto* cached = chunk_cache_.Get(key);
1097+
chunk_table_ = cached ? *cached : nullptr;
10531098
return Status::OK();
10541099
}
10551100

10561101
Status AdjListPropertyArrowChunkReader::seek_chunk_index(
10571102
IdType vertex_chunk_index, IdType chunk_index) {
1103+
bool changed = false;
10581104
if (chunk_num_ < 0 || vertex_chunk_index_ != vertex_chunk_index) {
10591105
vertex_chunk_index_ = vertex_chunk_index;
10601106
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
1061-
chunk_table_.reset();
1107+
changed = true;
10621108
}
1063-
if (chunk_index_ != chunk_index) {
1109+
if (chunk_index_ != chunk_index || changed) {
10641110
chunk_index_ = chunk_index;
10651111
seek_offset_ = chunk_index * edge_info_->GetChunkSize();
1066-
chunk_table_.reset();
1112+
auto key = std::make_pair(vertex_chunk_index_, chunk_index_);
1113+
auto* cached = chunk_cache_.Get(key);
1114+
chunk_table_ = cached ? *cached : nullptr;
10671115
}
10681116
return Status::OK();
10691117
}
10701118

10711119
void AdjListPropertyArrowChunkReader::Filter(util::Filter filter) {
10721120
filter_options_.filter = filter;
1121+
chunk_table_ = nullptr;
1122+
chunk_cache_.Clear();
10731123
}
10741124

10751125
void AdjListPropertyArrowChunkReader::Select(util::ColumnNames column_names) {
10761126
filter_options_.columns = column_names;
1127+
chunk_table_ = nullptr;
1128+
chunk_cache_.Clear();
10771129
}
10781130

10791131
Result<std::shared_ptr<AdjListPropertyArrowChunkReader>>

0 commit comments

Comments
 (0)