Skip to content

Commit 2bd493c

Browse files
authored
feat: implement location provider (#467)
1 parent 0f44ce2 commit 2bd493c

File tree

8 files changed

+383
-18
lines changed

8 files changed

+383
-18
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ set(ICEBERG_SOURCES
4040
file_writer.cc
4141
inheritable_metadata.cc
4242
json_internal.cc
43+
location_provider.cc
4344
manifest/manifest_adapter.cc
4445
manifest/manifest_entry.cc
4546
manifest/manifest_group.cc

src/iceberg/location_provider.cc

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/location_provider.h"
21+
22+
#include "iceberg/partition_spec.h"
23+
#include "iceberg/table_properties.h"
24+
#include "iceberg/util/location_util.h"
25+
#include "iceberg/util/murmurhash3_internal.h"
26+
27+
namespace iceberg {
28+
29+
namespace {
30+
31+
constexpr uint8_t kEntropyDirMask = 0x0F;
32+
constexpr uint8_t kRestDirMask = 0xFF;
33+
constexpr int32_t kHashBits = 20;
34+
constexpr int32_t kEntropyDirLength = 4;
35+
constexpr int32_t kEntropyDirDepth = 3;
36+
37+
std::string DataLocation(const TableProperties& properties, std::string_view location) {
38+
auto data_location = properties.Get(TableProperties::kWriteDataLocation);
39+
if (data_location.empty()) {
40+
data_location = std::format("{}/data", location);
41+
}
42+
return data_location;
43+
}
44+
45+
std::string PathContext(std::string_view location) {
46+
std::string_view path = LocationUtil::StripTrailingSlash(location);
47+
48+
size_t last_slash = path.find_last_of('/');
49+
if (last_slash != std::string_view::npos && last_slash < path.length() - 1) {
50+
std::string_view data_path = path.substr(last_slash + 1);
51+
std::string_view parent_path(path.data(), last_slash);
52+
size_t parent_last_slash = parent_path.find_last_of('/');
53+
54+
if (parent_last_slash != std::string::npos) {
55+
std::string_view parent_name = parent_path.substr(parent_last_slash + 1);
56+
return std::format("{}/{}", parent_name, data_path);
57+
} else {
58+
return std::format("{}/{}", parent_path, data_path);
59+
}
60+
}
61+
62+
return std::string(location);
63+
}
64+
65+
/// \brief Divides hash into directories for optimized orphan removal operation using
66+
/// kEntropyDirDepth and kEntropyDirLength.
67+
///
68+
/// If the low `kHashBits = 20` of `hash` is '10011001100110011001', then return
69+
/// '1001/1001/1001/10011001' with depth 3 and length 4.
70+
///
71+
/// \param hash The hash value to be divided.
72+
/// \return The path according to the `hash` value.
73+
std::string DirsFromHash(int32_t hash) {
74+
std::string hash_with_dirs;
75+
76+
for (int32_t i = 0; i < kEntropyDirDepth * kEntropyDirLength; i += kEntropyDirLength) {
77+
if (i > 0) {
78+
hash_with_dirs += "/";
79+
}
80+
uint8_t dir_bits = kEntropyDirMask & (hash >> (kHashBits - i - kEntropyDirLength));
81+
hash_with_dirs += std::format("{:04b}", dir_bits);
82+
}
83+
84+
hash_with_dirs += "/";
85+
uint8_t rest_bits = kRestDirMask & hash;
86+
hash_with_dirs += std::format("{:08b}", rest_bits);
87+
88+
return hash_with_dirs;
89+
}
90+
91+
std::string ComputeHash(std::string_view file_name) {
92+
int32_t hash_value = 0;
93+
MurmurHash3_x86_32(file_name.data(), file_name.size(), 0, &hash_value);
94+
return DirsFromHash(hash_value);
95+
}
96+
97+
} // namespace
98+
99+
// Default location provider for local file system.
100+
class DefaultLocationProvider : public LocationProvider {
101+
public:
102+
DefaultLocationProvider(std::string_view location, const TableProperties& properties);
103+
104+
std::string NewDataLocation(std::string_view filename) override;
105+
106+
Result<std::string> NewDataLocation(const PartitionSpec& spec,
107+
const PartitionValues& partition,
108+
std::string_view filename) override;
109+
110+
private:
111+
std::string data_location_;
112+
};
113+
114+
// Implementation of DefaultLocationProvider
115+
DefaultLocationProvider::DefaultLocationProvider(std::string_view location,
116+
const TableProperties& properties)
117+
: data_location_(
118+
LocationUtil::StripTrailingSlash(DataLocation(properties, location))) {}
119+
120+
std::string DefaultLocationProvider::NewDataLocation(std::string_view filename) {
121+
return std::format("{}/{}", data_location_, filename);
122+
}
123+
124+
Result<std::string> DefaultLocationProvider::NewDataLocation(
125+
const PartitionSpec& spec, const PartitionValues& partition,
126+
std::string_view filename) {
127+
ICEBERG_ASSIGN_OR_RAISE(auto partition_path, spec.PartitionPath(partition));
128+
return std::format("{}/{}/{}", data_location_, partition_path, filename);
129+
}
130+
131+
// Location provider for object stores.
132+
class ObjectStoreLocationProvider : public LocationProvider {
133+
public:
134+
ObjectStoreLocationProvider(std::string_view location,
135+
const TableProperties& properties);
136+
137+
std::string NewDataLocation(std::string_view filename) override;
138+
139+
Result<std::string> NewDataLocation(const PartitionSpec& spec,
140+
const PartitionValues& partition,
141+
std::string_view filename) override;
142+
143+
private:
144+
std::string storage_location_;
145+
std::string context_;
146+
bool include_partition_paths_;
147+
};
148+
149+
// Implementation of ObjectStoreLocationProvider
150+
ObjectStoreLocationProvider::ObjectStoreLocationProvider(
151+
std::string_view location, const TableProperties& properties)
152+
: include_partition_paths_(
153+
properties.Get(TableProperties::kWriteObjectStorePartitionedPaths)) {
154+
storage_location_ =
155+
LocationUtil::StripTrailingSlash(DataLocation(properties, location));
156+
157+
// If the storage location is within the table prefix, don't add table and database name
158+
// context
159+
if (!storage_location_.starts_with(location)) {
160+
context_ = PathContext(location);
161+
}
162+
}
163+
164+
std::string ObjectStoreLocationProvider::NewDataLocation(std::string_view filename) {
165+
std::string hash = ComputeHash(filename);
166+
167+
if (!context_.empty()) {
168+
return std::format("{}/{}/{}/{}", storage_location_, hash, context_, filename);
169+
} else {
170+
// If partition paths are included, add last part of entropy as dir before partition
171+
// names
172+
if (include_partition_paths_) {
173+
return std::format("{}/{}/{}", storage_location_, hash, filename);
174+
} else {
175+
// If partition paths are not included, append last part of entropy with `-` to file
176+
// name
177+
return std::format("{}/{}-{}", storage_location_, hash, filename);
178+
}
179+
}
180+
}
181+
182+
Result<std::string> ObjectStoreLocationProvider::NewDataLocation(
183+
const PartitionSpec& spec, const PartitionValues& partition,
184+
std::string_view filename) {
185+
if (include_partition_paths_) {
186+
ICEBERG_ASSIGN_OR_RAISE(auto partition_path, spec.PartitionPath(partition));
187+
return NewDataLocation(std::format("{}/{}", partition_path, filename));
188+
} else {
189+
return NewDataLocation(filename);
190+
}
191+
}
192+
193+
Result<std::unique_ptr<LocationProvider>> LocationProvider::Make(
194+
std::string_view location, const TableProperties& properties) {
195+
location = LocationUtil::StripTrailingSlash(location);
196+
197+
// TODO(xxx): create location provider specified by "write.location-provider.impl"
198+
199+
if (properties.Get(TableProperties::kObjectStoreEnabled)) {
200+
return std::make_unique<ObjectStoreLocationProvider>(location, properties);
201+
} else {
202+
return std::make_unique<DefaultLocationProvider>(location, properties);
203+
}
204+
}
205+
206+
} // namespace iceberg

src/iceberg/location_provider.h

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,38 +19,45 @@
1919

2020
#pragma once
2121

22+
#include <memory>
2223
#include <string>
24+
#include <string_view>
2325

2426
#include "iceberg/iceberg_export.h"
27+
#include "iceberg/result.h"
2528
#include "iceberg/type_fwd.h"
2629

2730
namespace iceberg {
2831

29-
/// \brief Interface for providing data file locations to write tasks.
32+
/// \brief Interface for providing data file locations.
3033
class ICEBERG_EXPORT LocationProvider {
3134
public:
3235
virtual ~LocationProvider() = default;
3336

3437
/// \brief Return a fully-qualified data file location for the given filename.
3538
///
36-
/// \param filename a file name
39+
/// \param filename file name to get location
3740
/// \return a fully-qualified location URI for a data file
38-
virtual std::string NewDataLocation(const std::string& filename) = 0;
41+
virtual std::string NewDataLocation(std::string_view filename) = 0;
3942

4043
/// \brief Return a fully-qualified data file location for the given partition and
4144
/// filename.
4245
///
43-
/// \param spec a partition spec
44-
/// \param partition_data a tuple of partition data for data in the file, matching the
45-
/// given spec
46-
/// \param filename a file name
46+
/// \param spec partition spec
47+
/// \param partition a tuple of partition values matching the given spec
48+
/// \param filename file name
4749
/// \return a fully-qualified location URI for a data file
50+
virtual Result<std::string> NewDataLocation(const PartitionSpec& spec,
51+
const PartitionValues& partition,
52+
std::string_view filename) = 0;
53+
54+
/// \brief Create a LocationProvider for the given table location and properties.
4855
///
49-
/// TODO(wgtmac): StructLike is not well thought yet, we may wrap an ArrowArray
50-
/// with single row in StructLike.
51-
virtual std::string NewDataLocation(const PartitionSpec& spec,
52-
const StructLike& partition_data,
53-
const std::string& filename) = 0;
56+
/// \param location table location
57+
/// \param properties table properties
58+
/// \return a LocationProvider instance
59+
static Result<std::unique_ptr<LocationProvider>> Make(
60+
std::string_view location, const TableProperties& properties);
5461
};
5562

5663
} // namespace iceberg

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ iceberg_sources = files(
6161
'file_writer.cc',
6262
'inheritable_metadata.cc',
6363
'json_internal.cc',
64+
'location_provider.cc',
6465
'manifest/manifest_adapter.cc',
6566
'manifest/manifest_entry.cc',
6667
'manifest/manifest_group.cc',

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ add_iceberg_test(schema_test
7373

7474
add_iceberg_test(table_test
7575
SOURCES
76+
location_provider_test.cc
7677
metrics_config_test.cc
7778
snapshot_test.cc
7879
snapshot_util_test.cc

0 commit comments

Comments
 (0)