Skip to content

Commit 830f6fd

Browse files
committed
feat: add partitioning, schema validation, catalog properties, and comprehensive tests
- Partition new tables by event_date (day granularity) for efficient time-range queries and data lifecycle management - Validate schema on startup when loading existing tables, failing fast if required columns are missing - Support arbitrary catalog properties via nested <CatalogProperties> elements for S3 credentials, REST auth, etc. - Document catalogImpl short names (hadoop, hive, rest) in Javadoc - Add IcebergAppenderConfigTest exercising full log4j XML config pipeline (plugin discovery, LogManager.getLogger(), context stop flush) - Add IcebergConcurrencyTest with multi-threaded writer stress tests - Add IcebergScheduledFlushTest verifying timer-based flush fires - Add schema validation and partition spec tests to IcebergManagerTest
1 parent ce018d5 commit 830f6fd

7 files changed

Lines changed: 688 additions & 9 deletions

File tree

log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergAppender.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
package org.apache.logging.log4j.iceberg;
1818

19+
import java.util.HashMap;
20+
import java.util.Map;
1921
import java.util.concurrent.TimeUnit;
2022
import org.apache.logging.log4j.core.Appender;
2123
import org.apache.logging.log4j.core.Core;
@@ -26,24 +28,39 @@
2628
import org.apache.logging.log4j.core.config.plugins.Plugin;
2729
import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute;
2830
import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
31+
import org.apache.logging.log4j.core.config.plugins.PluginElement;
2932
import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
3033

3134
/**
3235
* Log4j appender that writes log events as rows in an Apache Iceberg table
3336
* backed by Parquet data files.
3437
*
38+
* <p>The table is partitioned by {@code event_date} (day granularity) for efficient
39+
* time-range queries and data lifecycle management.</p>
40+
*
41+
* <p>The {@code catalogImpl} attribute accepts either a short type name
42+
* ({@code "hadoop"}, {@code "hive"}, {@code "rest"}) or a fully qualified class name
43+
* (e.g. {@code "org.apache.iceberg.rest.RESTCatalog"}).</p>
44+
*
45+
* <p>Additional catalog properties (e.g. S3 credentials, REST auth headers) can be
46+
* passed via nested {@code <Property>} elements under a {@code <CatalogProperties>}
47+
* wrapper.</p>
48+
*
3549
* <p>Configuration example:</p>
3650
* <pre>
3751
* &lt;Iceberg name="IcebergAppender"
3852
* catalogName="my_catalog"
39-
* catalogImpl="org.apache.iceberg.rest.RESTCatalog"
53+
* catalogImpl="rest"
4054
* catalogUri="http://localhost:8181"
4155
* catalogWarehouse="s3://my-bucket/warehouse"
4256
* tableNamespace="logs"
4357
* tableName="app_logs"
4458
* batchSize="1000"
4559
* flushIntervalSeconds="30"&gt;
46-
* &lt;PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %-5level %logger{36} - %msg%n"/&gt;
60+
* &lt;CatalogProperties&gt;
61+
* &lt;Property name="s3.access-key-id"&gt;AKIA...&lt;/Property&gt;
62+
* &lt;Property name="s3.secret-access-key"&gt;secret&lt;/Property&gt;
63+
* &lt;/CatalogProperties&gt;
4764
* &lt;/Iceberg&gt;
4865
* </pre>
4966
*/
@@ -94,7 +111,7 @@ public static class Builder<B extends Builder<B>> extends AbstractAppender.Build
94111
private String catalogName = "log4j";
95112

96113
@PluginBuilderAttribute
97-
@Required(message = "No catalog implementation class provided")
114+
@Required(message = "No catalog implementation provided (use 'hadoop', 'hive', 'rest', or a fully qualified class name)")
98115
private String catalogImpl;
99116

100117
@PluginBuilderAttribute
@@ -116,6 +133,9 @@ public static class Builder<B extends Builder<B>> extends AbstractAppender.Build
116133
@PluginBuilderAttribute
117134
private int flushIntervalSeconds = 30;
118135

136+
@PluginElement("CatalogProperties")
137+
private Property[] catalogProperties;
138+
119139
public B setCatalogName(final String catalogName) {
120140
this.catalogName = catalogName;
121141
return asBuilder();
@@ -156,8 +176,19 @@ public B setFlushIntervalSeconds(final int flushIntervalSeconds) {
156176
return asBuilder();
157177
}
158178

179+
public B setCatalogProperties(final Property[] catalogProperties) {
180+
this.catalogProperties = catalogProperties;
181+
return asBuilder();
182+
}
183+
159184
@Override
160185
public IcebergAppender build() {
186+
final Map<String, String> extraProperties = new HashMap<>();
187+
if (catalogProperties != null) {
188+
for (final Property prop : catalogProperties) {
189+
extraProperties.put(prop.getName(), prop.getValue());
190+
}
191+
}
161192
final IcebergManager manager = new IcebergManager(
162193
getName(),
163194
catalogName,
@@ -167,7 +198,8 @@ public IcebergAppender build() {
167198
tableNamespace,
168199
tableName,
169200
batchSize,
170-
flushIntervalSeconds);
201+
flushIntervalSeconds,
202+
extraProperties);
171203
return new IcebergAppender(getName(), getFilter(), isIgnoreExceptions(), null, manager);
172204
}
173205
}

log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergManager.java

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@
2020
import java.time.Instant;
2121
import java.time.ZoneOffset;
2222
import java.util.ArrayList;
23+
import java.util.Collections;
2324
import java.util.HashMap;
25+
import java.util.HashSet;
2426
import java.util.List;
2527
import java.util.Map;
28+
import java.util.Set;
2629
import java.util.concurrent.Executors;
2730
import java.util.concurrent.ScheduledExecutorService;
2831
import java.util.concurrent.ScheduledFuture;
@@ -32,6 +35,7 @@
3235
import org.apache.iceberg.AppendFiles;
3336
import org.apache.iceberg.CatalogUtil;
3437
import org.apache.iceberg.DataFile;
38+
import org.apache.iceberg.PartitionSpec;
3539
import org.apache.iceberg.Schema;
3640
import org.apache.iceberg.Table;
3741
import org.apache.iceberg.catalog.Catalog;
@@ -65,6 +69,10 @@ class IcebergManager {
6569
Types.NestedField.optional(6, "thrown", Types.StringType.get()),
6670
Types.NestedField.required(7, "event_date", Types.DateType.get()));
6771

72+
static final PartitionSpec PARTITION_SPEC = PartitionSpec.builderFor(LOG_SCHEMA)
73+
.day("event_date")
74+
.build();
75+
6876
private final String name;
6977
private final String catalogName;
7078
private final String catalogImpl;
@@ -74,6 +82,7 @@ class IcebergManager {
7482
private final String tableName;
7583
private final int batchSize;
7684
private final int flushIntervalSeconds;
85+
private final Map<String, String> extraCatalogProperties;
7786

7887
private final ReentrantLock lock = new ReentrantLock();
7988
private List<LogEvent> buffer;
@@ -93,7 +102,8 @@ class IcebergManager {
93102
final String tableNamespace,
94103
final String tableName,
95104
final int batchSize,
96-
final int flushIntervalSeconds) {
105+
final int flushIntervalSeconds,
106+
final Map<String, String> extraCatalogProperties) {
97107
this.name = name;
98108
this.catalogName = catalogName;
99109
this.catalogImpl = catalogImpl;
@@ -103,9 +113,26 @@ class IcebergManager {
103113
this.tableName = tableName;
104114
this.batchSize = batchSize;
105115
this.flushIntervalSeconds = flushIntervalSeconds;
116+
this.extraCatalogProperties = extraCatalogProperties != null
117+
? Collections.unmodifiableMap(new HashMap<>(extraCatalogProperties))
118+
: Collections.emptyMap();
106119
this.buffer = new ArrayList<>(batchSize);
107120
}
108121

122+
IcebergManager(
123+
final String name,
124+
final String catalogName,
125+
final String catalogImpl,
126+
final String catalogUri,
127+
final String catalogWarehouse,
128+
final String tableNamespace,
129+
final String tableName,
130+
final int batchSize,
131+
final int flushIntervalSeconds) {
132+
this(name, catalogName, catalogImpl, catalogUri, catalogWarehouse,
133+
tableNamespace, tableName, batchSize, flushIntervalSeconds, null);
134+
}
135+
109136
void startup() {
110137
if (running) {
111138
return;
@@ -120,6 +147,7 @@ void startup() {
120147
if (catalogWarehouse != null) {
121148
catalogProperties.put("warehouse", catalogWarehouse);
122149
}
150+
catalogProperties.putAll(extraCatalogProperties);
123151

124152
catalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProperties, new Configuration());
125153
final TableIdentifier tableId = TableIdentifier.of(Namespace.of(tableNamespace), tableName);
@@ -129,10 +157,11 @@ void startup() {
129157
}
130158

131159
if (!catalog.tableExists(tableId)) {
132-
table = catalog.createTable(tableId, LOG_SCHEMA);
133-
LOGGER.info("Created Iceberg table {}", tableId);
160+
table = catalog.createTable(tableId, LOG_SCHEMA, PARTITION_SPEC);
161+
LOGGER.info("Created Iceberg table {} partitioned by event_date", tableId);
134162
} else {
135163
table = catalog.loadTable(tableId);
164+
validateSchema(table.schema());
136165
LOGGER.info("Loaded existing Iceberg table {}", tableId);
137166
}
138167

@@ -147,6 +176,25 @@ void startup() {
147176
running = true;
148177
}
149178

179+
void validateSchema(final Schema existingSchema) {
180+
final Set<String> required = new HashSet<>();
181+
for (final Types.NestedField field : LOG_SCHEMA.columns()) {
182+
required.add(field.name());
183+
}
184+
final Set<String> missing = new HashSet<>();
185+
for (final String fieldName : required) {
186+
if (existingSchema.findField(fieldName) == null) {
187+
missing.add(fieldName);
188+
}
189+
}
190+
if (!missing.isEmpty()) {
191+
throw new IllegalStateException(
192+
"Iceberg table " + tableNamespace + "." + tableName
193+
+ " is missing required columns: " + missing
194+
+ ". Expected schema columns: " + required);
195+
}
196+
}
197+
150198
void write(final LogEvent event) {
151199
lock.lock();
152200
try {
@@ -240,7 +288,7 @@ private DataFile writeParquetFile(final List<LogEvent> events) throws IOExceptio
240288

241289
try (DataWriter<GenericRecord> writer = Parquet.writeData(outputFile)
242290
.schema(LOG_SCHEMA)
243-
.withSpec(table.spec())
291+
.withSpec(PartitionSpec.unpartitioned())
244292
.createWriterFunc(GenericParquetWriter::create)
245293
.overwrite()
246294
.build()) {
@@ -249,7 +297,7 @@ private DataFile writeParquetFile(final List<LogEvent> events) throws IOExceptio
249297
}
250298
}
251299

252-
return org.apache.iceberg.DataFiles.builder(table.spec())
300+
return org.apache.iceberg.DataFiles.builder(PartitionSpec.unpartitioned())
253301
.withPath(filename)
254302
.withFileSizeInBytes(outputFile.toInputFile().getLength())
255303
.withRecordCount(events.size())

0 commit comments

Comments
 (0)