-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Expand file tree
/
Copy pathsegment_loader.h
More file actions
200 lines (166 loc) · 7.19 KB
/
segment_loader.h
File metadata and controls
200 lines (166 loc) · 7.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <butil/macros.h>
#include <glog/logging.h>
#include <stddef.h>
#include <stdint.h>
#include <atomic>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
#include <vector>
#include "common/cast_set.h"
#include "common/status.h"
#include "cpp/lru_cache.h"
#include "runtime/memory/lru_cache_policy.h"
#include "storage/olap_common.h" // for rowset id
#include "storage/segment/segment.h"
#include "util/time.h"
namespace doris {
class SegmentCacheHandle;
class BetaRowset;
// SegmentLoader is used to load the segments of a BetaRowset.
// An LRU cache (SegmentCache) is encapsulated inside it, which is used to cache the opened segments.
// The caller should use the following pattern to load and obtain
// the segments of a specified rowset:
//
//     SegmentCacheHandle cache_handle;
//     RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(_rowset, &cache_handle));
//     for (auto& seg_ptr : cache_handle.get_segments()) {
//         ... visit segment ...
//     }
//
// Make sure that cache_handle stays alive for as long as the segments are in use.
using BetaRowsetSharedPtr = std::shared_ptr<BetaRowset>;
// LRU cache of opened segments. Every entry is addressed by a CacheKey
// (rowset id + segment id) and stores one opened segment.
class SegmentCache : public LRUCachePolicy {
public:
    using LRUCachePolicy::insert;

    // The cache key of the segment lru cache: one segment of one rowset.
    struct CacheKey {
        CacheKey(RowsetId rowset_id_, int64_t segment_id_)
                : rowset_id(rowset_id_), segment_id(segment_id_) {}

        RowsetId rowset_id;
        int64_t segment_id;

        // Encode to a flat string which can be used as LRUCache's key.
        //
        // The two parts are joined with '_'. std::to_string() never emits '_', so the
        // text after the last '_' is always the segment id, and two different
        // (rowset_id, segment_id) pairs cannot produce the same key as long as
        // rowset_id.to_string() is itself unique per rowset id. Plain concatenation
        // would be ambiguous whenever rowset_id.to_string() is not fixed-width
        // (e.g. "1" + "23" and "12" + "3" would both give "123").
        [[nodiscard]] std::string encode() const {
            std::string key = rowset_id.to_string();
            key += '_';
            key += std::to_string(segment_id);
            return key;
        }
    };

    // The cache value of the segment lru cache.
    // Holds a single opened segment.
    class CacheValue : public LRUCacheValueBase {
    public:
        CacheValue(segment_v2::SegmentSharedPtr segment_) : segment(std::move(segment_)) {}

        const segment_v2::SegmentSharedPtr segment;
    };

    // memory_bytes_limit: passed to LRUCachePolicy together with LRUCacheType::SIZE.
    // segment_num_limit:  element count capacity; converted to uint32_t via cast_set.
    // The remaining arguments (stale sweep time, 64 shards, prune switch, lru-k enabled)
    // are fixed here or read from config.
    SegmentCache(size_t memory_bytes_limit, size_t segment_num_limit)
            : LRUCachePolicy(CachePolicy::CacheType::SEGMENT_CACHE, memory_bytes_limit,
                             LRUCacheType::SIZE, config::tablet_rowset_stale_sweep_time_sec,
                             /*num shards*/ 64,
                             /*element count capacity */ cast_set<uint32_t>(segment_num_limit),
                             config::enable_segment_cache_prune, /*is lru-k*/ true) {}

    // Lookup the given segment in the cache.
    // If the segment is found, the cache entry will be written into handle.
    // Return true if entry is found, otherwise return false.
    bool lookup(const SegmentCache::CacheKey& key, SegmentCacheHandle* handle);

    // Insert a cache entry by key.
    // And the cache entry will be returned in handle.
    // This function is thread-safe.
    void insert(const SegmentCache::CacheKey& key, CacheValue& value, SegmentCacheHandle* handle);

    // Erase the cache entry identified by key.
    void erase(const SegmentCache::CacheKey& key);
};
// Loads the segments of a BetaRowset and owns the SegmentCache used to keep
// opened segments around.
class SegmentLoader {
public:
    // Accessor for the shared SegmentLoader instance (defined outside this header).
    static SegmentLoader* instance();

    // Build a loader whose segment cache is created with the given byte limit
    // and segment count limit; both values are forwarded to SegmentCache.
    // TODO: the original note here says the cache limit is expressed as a number of
    // rowsets because the memory occupied by a segment cannot be estimated accurately,
    // and recommends switching to a memory-based limit once such an estimate exists.
    // NOTE(review): the constructor already takes a byte limit — confirm whether
    // this TODO is still current.
    SegmentLoader(size_t memory_limit_bytes, size_t segment_num_count)
            : _segment_cache(
                      std::make_unique<SegmentCache>(memory_limit_bytes, segment_num_count)) {}

    // Load all segments of "rowset" and hand them back through "cache_handle".
    // When use_cache is true the segment cache is used for loading.
    // NOTE(review): need_load_pk_index_and_bf / index_load_stats are only declared
    // here; see the implementation file for their exact effect.
    Status load_segments(const BetaRowsetSharedPtr& rowset,
                         SegmentCacheHandle* cache_handle,
                         bool use_cache = false,
                         bool need_load_pk_index_and_bf = false,
                         OlapReaderStatistics* index_load_stats = nullptr);

    // Load the single segment "segment_id" of "rowset" and hand it back through
    // "cache_handle". When use_cache is true the segment cache is used for loading.
    Status load_segment(const BetaRowsetSharedPtr& rowset,
                        int64_t segment_id,
                        SegmentCacheHandle* cache_handle,
                        bool use_cache = false,
                        bool need_load_pk_index_and_bf = false,
                        OlapReaderStatistics* index_load_stats = nullptr);

    // Erase the cached segment identified by "key".
    void erase_segment(const SegmentCache::CacheKey& key);

    // Erase cached segments of "rowset_id"; "num_segments" is the rowset's segment count.
    void erase_segments(const RowsetId& rowset_id, int64_t num_segments);

    // Memory usage of the segment cache. Under BE_TEST the value of the
    // test-only counter is returned instead of querying the cache.
    int64_t cache_mem_usage() const {
#ifdef BE_TEST
        return _cache_mem_usage;
#else
        return _segment_cache->value_mem_consumption();
#endif
    }

private:
    SegmentLoader();

    std::unique_ptr<SegmentCache> _segment_cache;

    // Just used for BE UT
    int64_t _cache_mem_usage = 0;
};
// A handle for a single rowset from segment lru cache.
// The handle can ensure that the segment is valid
// and will not be closed while the holder of the handle is accessing the segment.
// The handle will automatically release the cache entry when it is destroyed.
// So the caller need to make sure the handle is valid in lifecycle.
class SegmentCacheHandle {
public:
SegmentCacheHandle() = default;
~SegmentCacheHandle() = default;
void push_segment(LRUCachePolicy* cache, Cache::Handle* handle) {
segments.push_back(((SegmentCache::CacheValue*)cache->value(handle))->segment);
cache->release(handle);
}
void push_segment(segment_v2::SegmentSharedPtr segment) {
segments.push_back(std::move(segment));
}
std::vector<segment_v2::SegmentSharedPtr>& get_segments() { return segments; }
[[nodiscard]] bool is_inited() const { return _init; }
void set_inited() {
DCHECK(!_init);
_init = true;
}
segment_v2::SegmentSharedPtr pop_unhealthy_segment() {
if (segments.empty()) {
return nullptr;
}
segment_v2::SegmentSharedPtr last_segment = segments.back();
if (last_segment->healthy_status().ok()) {
return nullptr;
}
segments.pop_back();
return last_segment;
}
private:
std::vector<segment_v2::SegmentSharedPtr> segments;
bool _init {false};
// Don't allow copy and assign
DISALLOW_COPY_AND_ASSIGN(SegmentCacheHandle);
};
} // namespace doris