Skip to content

Commit 1a2e4c3

Browse files
committed
refactor: separate writers into individual files to match Java organization
- Move DataWriter to data_writer.h/.cc - Move FileWriterFactory to file_writer_factory.h/.cc - Keep only FileWriter base interface in writer.h/.cc - Update includes and build files - All tests pass
1 parent ef13433 commit 1a2e4c3

8 files changed

Lines changed: 407 additions & 281 deletions

File tree

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ set(ICEBERG_INCLUDES "$<BUILD_INTERFACE:${PROJECT_BINARY_DIR}/src>"
2020
set(ICEBERG_SOURCES
2121
arrow_c_data_guard_internal.cc
2222
catalog/memory/in_memory_catalog.cc
23+
data/data_writer.cc
2324
data/equality_delete_writer.cc
25+
data/file_writer_factory.cc
2426
data/position_delete_writer.cc
2527
data/writer.cc
2628
delete_file_index.cc

src/iceberg/data/data_writer.cc

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/data/data_writer.h"
21+
22+
namespace iceberg {
23+
24+
//=============================================================================
25+
// DataWriter - stub implementation (to be completed in separate PR per #441)
26+
//=============================================================================
27+
28+
class DataWriter::Impl {
29+
public:
30+
explicit Impl(const DataWriterOptions& options) : options_(options) {}
31+
DataWriterOptions options_;
32+
bool is_closed_ = false;
33+
};
34+
35+
DataWriter::DataWriter(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
36+
DataWriter::~DataWriter() = default;
37+
38+
Status DataWriter::Write(ArrowArray* data) {
39+
if (!data) {
40+
return InvalidArgument("Cannot write null data");
41+
}
42+
if (impl_->is_closed_) {
43+
return Invalid("Writer is already closed");
44+
}
45+
return NotImplemented("DataWriter not yet implemented - see #441");
46+
}
47+
48+
Result<int64_t> DataWriter::Length() const {
49+
return NotImplemented("DataWriter not yet implemented - see #441");
50+
}
51+
52+
Status DataWriter::Close() {
53+
if (impl_->is_closed_) {
54+
return {}; // Close is idempotent
55+
}
56+
impl_->is_closed_ = true;
57+
return NotImplemented("DataWriter not yet implemented - see #441");
58+
}
59+
60+
Result<FileWriter::WriteResult> DataWriter::Metadata() {
61+
if (!impl_->is_closed_) {
62+
return Invalid("Writer must be closed before getting metadata");
63+
}
64+
return NotImplemented("DataWriter not yet implemented - see #441");
65+
}
66+
67+
// Internal factory function for FileWriterFactory
68+
std::unique_ptr<DataWriter> MakeDataWriterInternal(const DataWriterOptions& options) {
69+
auto impl = std::make_unique<DataWriter::Impl>(options);
70+
return std::unique_ptr<DataWriter>(new DataWriter(std::move(impl)));
71+
}
72+
73+
} // namespace iceberg

src/iceberg/data/data_writer.h

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/data/data_writer.h
23+
/// Data writer for Iceberg tables.
24+
25+
#include <cstdint>
26+
#include <memory>
27+
#include <optional>
28+
#include <string>
29+
30+
#include "iceberg/arrow_c_data.h"
31+
#include "iceberg/data/writer.h"
32+
#include "iceberg/file_format.h"
33+
#include "iceberg/iceberg_export.h"
34+
#include "iceberg/result.h"
35+
#include "iceberg/row/partition_values.h"
36+
#include "iceberg/type_fwd.h"
37+
38+
namespace iceberg {
39+
40+
/// \brief Options for creating a DataWriter.
41+
///
42+
/// \note The following features from Java DataWriter are not yet supported:
43+
/// - Encryption key metadata (uses FileIO instead of EncryptedOutputFile)
44+
/// - Metrics collection and reporting
45+
/// - Split offsets tracking
46+
struct ICEBERG_EXPORT DataWriterOptions {
47+
std::string path;
48+
std::shared_ptr<Schema> schema;
49+
std::shared_ptr<PartitionSpec> spec;
50+
PartitionValues partition;
51+
FileFormatType format = FileFormatType::kParquet;
52+
std::shared_ptr<FileIO> io;
53+
std::optional<int32_t> sort_order_id;
54+
std::shared_ptr<class WriterProperties> properties;
55+
};
56+
57+
/// \brief Writer for Iceberg data files.
58+
///
59+
/// \warning Thread Safety: Writer instances are NOT thread-safe. Each writer should only
60+
/// be used by a single thread. Do not call Write(), Close(), or Metadata() concurrently.
61+
class ICEBERG_EXPORT DataWriter : public FileWriter {
62+
public:
63+
~DataWriter() override;
64+
65+
Status Write(ArrowArray* data) override;
66+
Result<int64_t> Length() const override;
67+
Status Close() override;
68+
Result<WriteResult> Metadata() override;
69+
70+
private:
71+
friend class FileWriterFactory;
72+
friend std::unique_ptr<DataWriter> MakeDataWriterInternal(const DataWriterOptions&);
73+
class Impl;
74+
std::unique_ptr<Impl> impl_;
75+
explicit DataWriter(std::unique_ptr<Impl> impl);
76+
};
77+
78+
} // namespace iceberg
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/data/file_writer_factory.h"
21+
22+
#include "iceberg/data/data_writer.h"
23+
#include "iceberg/data/equality_delete_writer.h"
24+
#include "iceberg/data/position_delete_writer.h"
25+
26+
namespace iceberg {
27+
28+
// Forward declarations for internal factory functions
29+
std::unique_ptr<DataWriter> MakeDataWriterInternal(const DataWriterOptions& options);
30+
std::unique_ptr<PositionDeleteWriter> MakePositionDeleteWriterInternal(
31+
const PositionDeleteWriterOptions& options);
32+
std::unique_ptr<EqualityDeleteWriter> MakeEqualityDeleteWriterInternal(
33+
const EqualityDeleteWriterOptions& options);
34+
35+
//=============================================================================
36+
// FileWriterFactory::Impl
37+
//=============================================================================
38+
39+
class FileWriterFactory::Impl {
40+
public:
41+
Impl(std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec,
42+
std::shared_ptr<FileIO> io, std::shared_ptr<WriterProperties> properties)
43+
: schema_(std::move(schema)),
44+
spec_(std::move(spec)),
45+
io_(std::move(io)),
46+
properties_(std::move(properties)) {}
47+
48+
std::shared_ptr<Schema> schema_;
49+
std::shared_ptr<PartitionSpec> spec_;
50+
std::shared_ptr<FileIO> io_;
51+
std::shared_ptr<WriterProperties> properties_;
52+
53+
std::shared_ptr<Schema> eq_delete_schema_;
54+
std::vector<int32_t> equality_field_ids_;
55+
std::shared_ptr<Schema> pos_delete_row_schema_;
56+
};
57+
58+
//=============================================================================
59+
// FileWriterFactory
60+
//=============================================================================
61+
62+
FileWriterFactory::FileWriterFactory(std::shared_ptr<Schema> schema,
63+
std::shared_ptr<PartitionSpec> spec,
64+
std::shared_ptr<FileIO> io,
65+
std::shared_ptr<WriterProperties> properties)
66+
: impl_(std::make_unique<Impl>(std::move(schema), std::move(spec), std::move(io),
67+
std::move(properties))) {}
68+
69+
FileWriterFactory::~FileWriterFactory() = default;
70+
71+
void FileWriterFactory::SetEqualityDeleteConfig(std::shared_ptr<Schema> eq_delete_schema,
72+
std::vector<int32_t> equality_field_ids) {
73+
impl_->eq_delete_schema_ = std::move(eq_delete_schema);
74+
impl_->equality_field_ids_ = std::move(equality_field_ids);
75+
}
76+
77+
void FileWriterFactory::SetPositionDeleteRowSchema(
78+
std::shared_ptr<Schema> pos_delete_row_schema) {
79+
impl_->pos_delete_row_schema_ = std::move(pos_delete_row_schema);
80+
}
81+
82+
Result<std::unique_ptr<DataWriter>> FileWriterFactory::NewDataWriter(
83+
std::string path, FileFormatType format, PartitionValues partition,
84+
std::optional<int32_t> sort_order_id) {
85+
// Input validation
86+
if (path.empty()) {
87+
return InvalidArgument("Path cannot be empty");
88+
}
89+
if (!impl_->schema_) {
90+
return InvalidArgument("Schema cannot be null");
91+
}
92+
if (!impl_->spec_) {
93+
return InvalidArgument("PartitionSpec cannot be null");
94+
}
95+
96+
DataWriterOptions options;
97+
options.path = std::move(path);
98+
options.schema = impl_->schema_;
99+
options.spec = impl_->spec_;
100+
options.partition = std::move(partition);
101+
options.format = format;
102+
options.io = impl_->io_;
103+
options.sort_order_id = sort_order_id;
104+
options.properties = impl_->properties_;
105+
106+
return MakeDataWriterInternal(options);
107+
}
108+
109+
Result<std::unique_ptr<PositionDeleteWriter>> FileWriterFactory::NewPositionDeleteWriter(
110+
std::string path, FileFormatType format, PartitionValues partition) {
111+
// Input validation
112+
if (path.empty()) {
113+
return InvalidArgument("Path cannot be empty");
114+
}
115+
if (!impl_->schema_) {
116+
return InvalidArgument("Schema cannot be null");
117+
}
118+
if (!impl_->spec_) {
119+
return InvalidArgument("PartitionSpec cannot be null");
120+
}
121+
122+
PositionDeleteWriterOptions options;
123+
options.path = std::move(path);
124+
options.schema = impl_->schema_;
125+
options.spec = impl_->spec_;
126+
options.partition = std::move(partition);
127+
options.format = format;
128+
options.io = impl_->io_;
129+
options.row_schema = impl_->pos_delete_row_schema_;
130+
options.properties = impl_->properties_;
131+
132+
return MakePositionDeleteWriterInternal(options);
133+
}
134+
135+
Result<std::unique_ptr<EqualityDeleteWriter>> FileWriterFactory::NewEqualityDeleteWriter(
136+
std::string path, FileFormatType format, PartitionValues partition,
137+
std::optional<int32_t> sort_order_id) {
138+
// Input validation
139+
if (path.empty()) {
140+
return InvalidArgument("Path cannot be empty");
141+
}
142+
if (!impl_->schema_) {
143+
return InvalidArgument("Schema cannot be null");
144+
}
145+
if (!impl_->spec_) {
146+
return InvalidArgument("PartitionSpec cannot be null");
147+
}
148+
if (impl_->equality_field_ids_.empty()) {
149+
return InvalidArgument(
150+
"Equality field IDs cannot be empty - call SetEqualityDeleteConfig first");
151+
}
152+
153+
EqualityDeleteWriterOptions options;
154+
options.path = std::move(path);
155+
options.schema = impl_->eq_delete_schema_ ? impl_->eq_delete_schema_ : impl_->schema_;
156+
options.spec = impl_->spec_;
157+
options.partition = std::move(partition);
158+
options.format = format;
159+
options.io = impl_->io_;
160+
options.equality_field_ids = impl_->equality_field_ids_;
161+
options.sort_order_id = sort_order_id;
162+
options.properties = impl_->properties_;
163+
164+
return MakeEqualityDeleteWriterInternal(options);
165+
}
166+
167+
} // namespace iceberg

0 commit comments

Comments
 (0)