-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Expand file tree
/
Copy pathfile_handle_cache.cpp
More file actions
217 lines (184 loc) · 7.57 KB
/
Copy pathfile_handle_cache.cpp
File metadata and controls
217 lines (184 loc) · 7.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// This file is copied from
// https://github.com/apache/impala/blob/master/be/src/runtime/io/handle-cache.inline.h
// and modified by Doris
#include "io/fs/file_handle_cache.h"
#include <cstdint>
#include <thread>
#include <tuple>
#include "common/cast_set.h"
#include "io/fs/err_utils.h"
#include "util/hash_util.hpp"
#include "util/time.h"
namespace doris::io {
// Closes the underlying HDFS file (if one was opened on a valid filesystem)
// and clears the raw handles held by this object.
//
// A destructor cannot report failure to its caller, so a non-zero return
// from hdfsCloseFile() is logged instead of being silently dropped.
HdfsFileHandle::~HdfsFileHandle() {
    if (_hdfs_file != nullptr && _fs != nullptr) {
        VLOG_FILE << "hdfsCloseFile() fid=" << _hdfs_file;
        // hdfsCloseFile() returns 0 on success; anything else is an error
        // whose description is fetched through hdfs_error().
        if (hdfsCloseFile(_fs, _hdfs_file) != 0) {
            LOG(WARNING) << "failed to close hdfs file " << _fname << ": " << hdfs_error();
        }
    }
    _fs = nullptr;
    _hdfs_file = nullptr;
}
// Opens `_fname` read-only on `_fs` and records the file size.
//
// file_size: size supplied by the caller. When it is <= 0 the size is
//            looked up with hdfsGetPathInfo() instead.
//
// Returns Status::NotFound when opening fails and the HDFS error text
// contains "No such file or directory", Status::InternalError for any other
// open failure or when the size lookup fails, and Status::OK otherwise.
Status HdfsFileHandle::init(int64_t file_size) {
    _hdfs_file = hdfsOpenFile(_fs, _fname.c_str(), O_RDONLY, 0, 0, 0);
    if (_hdfs_file == nullptr) {
        std::string open_error = hdfs_error();
        // Callers may choose to skip a NotFound status and carry on, so a
        // missing file is reported separately from every other failure.
        const bool file_missing =
                open_error.find("No such file or directory") != std::string::npos;
        if (!file_missing) {
            return Status::InternalError("failed to open {}: {}", _fname, open_error);
        }
        return Status::NotFound(open_error);
    }

    _file_size = file_size;
    if (_file_size > 0) {
        // The caller already knew the size; no extra lookup is needed.
        return Status::OK();
    }

    hdfsFileInfo* path_info = hdfsGetPathInfo(_fs, _fname.c_str());
    if (path_info == nullptr) {
        return Status::InternalError("failed to get file size of {}: {}", _fname, hdfs_error());
    }
    _file_size = path_info->mSize;
    hdfsFreeFileInfo(path_info, 1);
    return Status::OK();
}
CachedHdfsFileHandle::CachedHdfsFileHandle(const hdfsFS& fs, const std::string& fname,
int64_t mtime)
: HdfsFileHandle(fs, fname, mtime) {}
// No cleanup of its own: the base-class destructor runs after this one.
CachedHdfsFileHandle::~CachedHdfsFileHandle() = default;
FileHandleCache::Accessor::Accessor() : _cache_accessor() {}
FileHandleCache::Accessor::Accessor(FileHandleCachePartition::CacheType::Accessor&& cache_accessor)
: _cache_accessor(std::move(cache_accessor)) {}
// Replaces the wrapped cache accessor by move-assigning `cache_accessor`
// into it.
void FileHandleCache::Accessor::set(FileHandleCachePartition::CacheType::Accessor&& cache_accessor) {
    this->_cache_accessor = std::move(cache_accessor);
}
// Returns the file handle reported by the wrapped cache accessor's get().
CachedHdfsFileHandle* FileHandleCache::Accessor::get() {
    CachedHdfsFileHandle* handle = _cache_accessor.get();
    return handle;
}
// Calls release() on the wrapped cache accessor, but only when it currently
// refers to a handle; otherwise this is a no-op.
void FileHandleCache::Accessor::release() {
    if (!_cache_accessor.get()) {
        return;
    }
    _cache_accessor.release();
}
// Calls destroy() on the wrapped cache accessor, but only when it currently
// refers to a handle; otherwise this is a no-op.
void FileHandleCache::Accessor::destroy() {
    if (!_cache_accessor.get()) {
        return;
    }
    _cache_accessor.destroy();
}
// Disposes of the handle this accessor still refers to, if any.
//
// With USE_HADOOP_HDFS defined the file is first unbuffered through
// hdfsUnbufferFile(): on success the handle is release()d, on failure it is
// destroy()ed. Without USE_HADOOP_HDFS the handle is always destroy()ed.
FileHandleCache::Accessor::~Accessor() {
    if (!_cache_accessor.get()) {
        // Nothing is held, so there is nothing to hand back or close.
        return;
    }
#ifdef USE_HADOOP_HDFS
    const bool unbuffered = (hdfsUnbufferFile(get()->file()) == 0);
    if (unbuffered) {
        // Go through the explicit release() so that metrics are handled.
        release();
        return;
    }
    VLOG_FILE << "FS does not support file handle unbuffering, closing file="
              << _cache_accessor.get_key()->second.first;
    destroy();
#else
    destroy();
#endif
}
// Builds a cache with `num_partitions` partitions and starts the eviction
// thread via init().
//
// capacity:                   total capacity, split across the partitions;
//                             each partition gets capacity / num_partitions,
//                             rounded up when the division is not exact.
// num_partitions:             number of partitions; DCHECKed to be > 0.
// unused_handle_timeout_secs: stored for use by the eviction loop.
//
// A failure to start the eviction thread is reported with LOG(FATAL).
FileHandleCache::FileHandleCache(size_t capacity, size_t num_partitions,
                                 uint64_t unused_handle_timeout_secs)
        : _cache_partitions(num_partitions),
          _unused_handle_timeout_secs(unused_handle_timeout_secs) {
    DCHECK_GT(num_partitions, 0);

    // Ceiling division: one extra slot per partition when there is a remainder.
    size_t per_partition = capacity / num_partitions;
    if (capacity % num_partitions > 0) {
        per_partition += 1;
    }
    for (auto& partition : _cache_partitions) {
        partition.cache.set_capacity(per_partition);
    }

    Status start_status = init();
    if (!start_status) {
        LOG(FATAL) << "failed to start file handle cache thread: " << start_status.to_string();
    }
}
// Sets the shutdown flag read by the eviction loop and, when an eviction
// thread exists, waits for it to finish.
FileHandleCache::~FileHandleCache() {
    _is_shut_down.store(true);
    if (_eviction_thread == nullptr) {
        return;
    }
    _eviction_thread->join();
}
// Creates the thread that runs _evict_handles_loop() on this cache, storing
// it in `_eviction_thread`. Returns the status reported by Thread::create().
Status FileHandleCache::init() {
    Status create_status =
            Thread::create("file-handle-cache", "File Handle Timeout",
                           &FileHandleCache::_evict_handles_loop, this, &_eviction_thread);
    return create_status;
}
// Hands out a file handle for (fs, fname, mtime), reusing a cached one when
// allowed and available, otherwise opening a new one.
//
// fs, fname, mtime:   identify the file; together they form the cache key,
//                     and fs + fname select the partition. mtime is DCHECKed
//                     to be >= 0.
// file_size:          forwarded to the new handle's init().
// require_new_handle: when true the cache lookup is skipped entirely.
// accessor:           out-parameter that receives the handle on success.
// cache_hit:          out-parameter; true when a cached handle was reused,
//                     false otherwise.
//
// Returns Status::OK on success. When initializing a new handle fails, the
// freshly inserted entry is destroyed and that failure status is returned.
Status FileHandleCache::get_file_handle(const hdfsFS& fs, const std::string& fname, int64_t mtime,
                                        int64_t file_size, bool require_new_handle,
                                        FileHandleCache::Accessor* accessor, bool* cache_hit) {
    DCHECK_GE(mtime, 0);

    // Pick the partition: hash the filesystem identity first and use the
    // result as the seed for hashing the file name.
    uintptr_t fs_bits = reinterpret_cast<uintptr_t>(fs);
    uint32_t fs_seed = HashUtil::hash(&fs_bits, sizeof(fs_bits), 0);
    const auto name_hash = HashUtil::hash(fname.data(), cast_set<int>(fname.size()), fs_seed);
    int partition_index = name_hash % _cache_partitions.size();
    FileHandleCachePartition& partition = _cache_partitions[partition_index];

    auto key = make_cache_key(fs, fname, mtime);

    // Unless the caller insists on a fresh handle, try to reserve an unused
    // entry stored under the same key.
    if (!require_new_handle) {
        auto cached = partition.cache.get(key);
        if (cached.get()) {
            *cache_hit = true;
            accessor->set(std::move(cached));
            return Status::OK();
        }
    }

    // Either nothing was free or a new handle was explicitly requested.
    *cache_hit = false;

    // Insert a new entry and take access to it before initializing it.
    auto fresh = partition.cache.emplace_and_get(key, fs, fname, mtime);

    // Opening a file handle requires talking to the NameNode so it can take some time.
    Status init_status = fresh.get()->init(file_size);
    if (UNLIKELY(!init_status.ok())) {
        // A handle that failed to initialize must not stay in the cache.
        fresh.destroy();
        return init_status;
    }

    // Transfer the accessor to the caller's in/out parameter.
    accessor->set(std::move(fresh));
    return Status::OK();
}
#ifdef BE_TEST
// Test-only helper (compiled under BE_TEST): builds a cache key for each
// (fs, fname, mtime) triple with make_cache_key() and reports whether the
// two keys compare equal.
bool FileHandleCache::same_cache_key_for_test(const hdfsFS& lhs_fs, const std::string& lhs_fname,
                                              int64_t lhs_mtime, const hdfsFS& rhs_fs,
                                              const std::string& rhs_fname, int64_t rhs_mtime) {
    auto lhs_key = make_cache_key(lhs_fs, lhs_fname, lhs_mtime);
    auto rhs_key = make_cache_key(rhs_fs, rhs_fname, rhs_mtime);
    return lhs_key == rhs_key;
}
#endif
void FileHandleCache::_evict_handles_loop() {
while (!_is_shut_down.load()) {
if (_unused_handle_timeout_secs) {
for (FileHandleCachePartition& p : _cache_partitions) {
uint64_t now = MonotonicSeconds();
uint64_t oldest_allowed_timestamp =
now > _unused_handle_timeout_secs ? now - _unused_handle_timeout_secs : 0;
p.cache.evict_older_than(oldest_allowed_timestamp);
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
} // namespace doris::io