diff --git a/log4j-iceberg/pom.xml b/log4j-iceberg/pom.xml
new file mode 100644
index 00000000000..18cde56d19b
--- /dev/null
+++ b/log4j-iceberg/pom.xml
@@ -0,0 +1,226 @@
+
+
+
+
+ 4.0.0
+
+
+ org.apache.logging.log4j
+ log4j
+ ${revision}
+ ../log4j-parent
+
+
+ log4j-iceberg
+
+ Apache Log4j Iceberg
+ Apache Iceberg appender for Log4j. Writes log events as rows in an Iceberg table using Parquet files.
+
+
+ org.apache.logging.log4j.core
+ 1.10.1
+ 1.16.0
+ 3.4.1
+
+
+
+
+
+ org.xerial.snappy
+ snappy-java
+ 1.1.10.7
+
+
+ io.airlift
+ aircompressor
+ 2.0.2
+
+
+ org.apache.commons
+ commons-text
+ 1.11.0
+
+
+
+
+
+
+
+ org.apache.logging.log4j
+ log4j-core
+
+
+
+
+ org.apache.iceberg
+ iceberg-core
+ ${iceberg.version}
+
+
+ org.apache.iceberg
+ iceberg-parquet
+ ${iceberg.version}
+
+
+ org.apache.iceberg
+ iceberg-data
+ ${iceberg.version}
+
+
+ org.apache.iceberg
+ iceberg-api
+ ${iceberg.version}
+
+
+
+
+ org.apache.parquet
+ parquet-avro
+ ${parquet.version}
+
+
+
+
+ org.apache.hadoop
+ hadoop-common
+ ${hadoop.version}
+
+
+ org.slf4j
+ slf4j-reload4j
+
+
+ ch.qos.reload4j
+ reload4j
+
+
+
+
+ org.apache.hadoop
+ hadoop-mapreduce-client-core
+ ${hadoop.version}
+
+
+ org.slf4j
+ slf4j-reload4j
+
+
+ ch.qos.reload4j
+ reload4j
+
+
+ org.apache.hadoop
+ hadoop-yarn-common
+
+
+ org.apache.hadoop
+ hadoop-yarn-client
+
+
+ org.apache.hadoop
+ hadoop-hdfs-client
+
+
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ test
+
+
+ org.apache.logging.log4j
+ log4j-core-test
+ test
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+ org.assertj
+ assertj-core
+ 3.26.3
+ test
+
+
+
+
+
+
+
+
+ biz.aQute.bnd
+ bnd-baseline-maven-plugin
+
+
+ check-api-compat
+
+ baseline
+
+
+ true
+
+
+
+
+
+ org.jacoco
+ jacoco-maven-plugin
+ 0.8.12
+
+
+
+ prepare-agent
+
+
+
+ report
+
+ report
+
+ test
+
+
+ check
+
+ check
+
+
+
+
+ BUNDLE
+
+
+ LINE
+ COVEREDRATIO
+ 1.00
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergAppender.java b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergAppender.java
new file mode 100644
index 00000000000..50290990466
--- /dev/null
+++ b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergAppender.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.logging.log4j.iceberg;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.Core;
+import org.apache.logging.log4j.core.Filter;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.config.Property;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute;
+import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
+import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
+
+/**
+ * Log4j appender that writes log events as rows in an Apache Iceberg table
+ * backed by Parquet data files.
+ *
+ *
Configuration example:
+ *
+ * <Iceberg name="IcebergAppender"
+ * catalogName="my_catalog"
+ * catalogImpl="org.apache.iceberg.rest.RESTCatalog"
+ * catalogUri="http://localhost:8181"
+ * catalogWarehouse="s3://my-bucket/warehouse"
+ * tableNamespace="logs"
+ * tableName="app_logs"
+ * batchSize="1000"
+ * flushIntervalSeconds="30">
+ * <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %-5level %logger{36} - %msg%n"/>
+ * </Iceberg>
+ *
+ */
+@Plugin(name = "Iceberg", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true)
+public class IcebergAppender extends AbstractAppender {
+
+ private final IcebergManager manager;
+
+ private IcebergAppender(
+ final String name,
+ final Filter filter,
+ final boolean ignoreExceptions,
+ final Property[] properties,
+ final IcebergManager manager) {
+ super(name, filter, null, ignoreExceptions, properties);
+ this.manager = manager;
+ }
+
+ @Override
+ public void append(final LogEvent event) {
+ manager.write(event.toImmutable());
+ }
+
+ @Override
+ public void start() {
+ manager.startup();
+ super.start();
+ }
+
+ @Override
+ public boolean stop(final long timeout, final TimeUnit timeUnit) {
+ setStopping();
+ boolean stopped = super.stop(timeout, timeUnit, false);
+ stopped &= manager.stop(timeout, timeUnit);
+ setStopped();
+ return stopped;
+ }
+
+ @PluginBuilderFactory
+ public static > B newBuilder() {
+ return new Builder().asBuilder();
+ }
+
+ public static class Builder> extends AbstractAppender.Builder
+ implements org.apache.logging.log4j.core.util.Builder {
+
+ @PluginBuilderAttribute
+ private String catalogName = "log4j";
+
+ @PluginBuilderAttribute
+ @Required(message = "No catalog implementation class provided")
+ private String catalogImpl;
+
+ @PluginBuilderAttribute
+ private String catalogUri;
+
+ @PluginBuilderAttribute
+ private String catalogWarehouse;
+
+ @PluginBuilderAttribute
+ private String tableNamespace = "default";
+
+ @PluginBuilderAttribute
+ @Required(message = "No Iceberg table name provided")
+ private String tableName;
+
+ @PluginBuilderAttribute
+ private int batchSize = 1000;
+
+ @PluginBuilderAttribute
+ private int flushIntervalSeconds = 30;
+
+ public B setCatalogName(final String catalogName) {
+ this.catalogName = catalogName;
+ return asBuilder();
+ }
+
+ public B setCatalogImpl(final String catalogImpl) {
+ this.catalogImpl = catalogImpl;
+ return asBuilder();
+ }
+
+ public B setCatalogUri(final String catalogUri) {
+ this.catalogUri = catalogUri;
+ return asBuilder();
+ }
+
+ public B setCatalogWarehouse(final String catalogWarehouse) {
+ this.catalogWarehouse = catalogWarehouse;
+ return asBuilder();
+ }
+
+ public B setTableNamespace(final String tableNamespace) {
+ this.tableNamespace = tableNamespace;
+ return asBuilder();
+ }
+
+ public B setTableName(final String tableName) {
+ this.tableName = tableName;
+ return asBuilder();
+ }
+
+ public B setBatchSize(final int batchSize) {
+ this.batchSize = batchSize;
+ return asBuilder();
+ }
+
+ public B setFlushIntervalSeconds(final int flushIntervalSeconds) {
+ this.flushIntervalSeconds = flushIntervalSeconds;
+ return asBuilder();
+ }
+
+ @Override
+ public IcebergAppender build() {
+ final IcebergManager manager = new IcebergManager(
+ getName(),
+ catalogName,
+ catalogImpl,
+ catalogUri,
+ catalogWarehouse,
+ tableNamespace,
+ tableName,
+ batchSize,
+ flushIntervalSeconds);
+ return new IcebergAppender(getName(), getFilter(), isIgnoreExceptions(), null, manager);
+ }
+ }
+}
diff --git a/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergCommitException.java b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergCommitException.java
new file mode 100644
index 00000000000..1fd4eecaa3b
--- /dev/null
+++ b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergCommitException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.logging.log4j.iceberg;
+
/**
 * Thrown when buffered log events cannot be written or committed to the target
 * Iceberg table, after all retries (if any) have been exhausted.
 */
public class IcebergCommitException extends RuntimeException {

    // RuntimeException is Serializable; pin the serial form explicitly.
    private static final long serialVersionUID = 1L;

    /**
     * @param message description of the failed write/commit, including the table identifier
     * @param cause the underlying failure, never {@code null} at current call sites
     */
    IcebergCommitException(final String message, final Throwable cause) {
        super(message, cause);
    }
}
diff --git a/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergManager.java b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergManager.java
new file mode 100644
index 00000000000..47993c8973a
--- /dev/null
+++ b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergManager.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.logging.log4j.iceberg;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.Types;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.status.StatusLogger;
+
+/**
+ * Manages the lifecycle of the Iceberg catalog, table, and periodic flush of
+ * buffered log events into Parquet data files committed to the Iceberg table.
+ */
+class IcebergManager {
+
+ private static final StatusLogger LOGGER = StatusLogger.getLogger();
+ static final int MAX_COMMIT_RETRIES = 4;
+ private static final long RETRY_BASE_SLEEP_MS = 100;
+
+ static final Schema LOG_SCHEMA = new Schema(
+ Types.NestedField.required(1, "timestamp", Types.TimestampType.withZone()),
+ Types.NestedField.required(2, "level", Types.StringType.get()),
+ Types.NestedField.required(3, "logger_name", Types.StringType.get()),
+ Types.NestedField.optional(4, "message", Types.StringType.get()),
+ Types.NestedField.optional(5, "thread_name", Types.StringType.get()),
+ Types.NestedField.optional(6, "thrown", Types.StringType.get()),
+ Types.NestedField.required(7, "event_date", Types.DateType.get()));
+
+ private final String name;
+ private final String catalogName;
+ private final String catalogImpl;
+ private final String catalogUri;
+ private final String catalogWarehouse;
+ private final String tableNamespace;
+ private final String tableName;
+ private final int batchSize;
+ private final int flushIntervalSeconds;
+
+ private final ReentrantLock lock = new ReentrantLock();
+ private List buffer;
+
+ Catalog catalog;
+ Table table;
+ ScheduledExecutorService scheduler;
+ ScheduledFuture> flushTask;
+ volatile boolean running;
+
+ IcebergManager(
+ final String name,
+ final String catalogName,
+ final String catalogImpl,
+ final String catalogUri,
+ final String catalogWarehouse,
+ final String tableNamespace,
+ final String tableName,
+ final int batchSize,
+ final int flushIntervalSeconds) {
+ this.name = name;
+ this.catalogName = catalogName;
+ this.catalogImpl = catalogImpl;
+ this.catalogUri = catalogUri;
+ this.catalogWarehouse = catalogWarehouse;
+ this.tableNamespace = tableNamespace;
+ this.tableName = tableName;
+ this.batchSize = batchSize;
+ this.flushIntervalSeconds = flushIntervalSeconds;
+ this.buffer = new ArrayList<>(batchSize);
+ }
+
+ void startup() {
+ if (running) {
+ return;
+ }
+ LOGGER.info("Starting IcebergManager [{}]", name);
+
+ final Map catalogProperties = new HashMap<>();
+ catalogProperties.put("type", catalogImpl);
+ if (catalogUri != null) {
+ catalogProperties.put("uri", catalogUri);
+ }
+ if (catalogWarehouse != null) {
+ catalogProperties.put("warehouse", catalogWarehouse);
+ }
+
+ catalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProperties, new Configuration());
+ final TableIdentifier tableId = TableIdentifier.of(Namespace.of(tableNamespace), tableName);
+
+ if (!((org.apache.iceberg.catalog.SupportsNamespaces) catalog).namespaceExists(Namespace.of(tableNamespace))) {
+ ((org.apache.iceberg.catalog.SupportsNamespaces) catalog).createNamespace(Namespace.of(tableNamespace));
+ }
+
+ if (!catalog.tableExists(tableId)) {
+ table = catalog.createTable(tableId, LOG_SCHEMA);
+ LOGGER.info("Created Iceberg table {}", tableId);
+ } else {
+ table = catalog.loadTable(tableId);
+ LOGGER.info("Loaded existing Iceberg table {}", tableId);
+ }
+
+ scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
+ final Thread t = new Thread(r, "log4j-iceberg-flush-" + name);
+ t.setDaemon(true);
+ return t;
+ });
+ flushTask = scheduler.scheduleAtFixedRate(
+ this::flush, flushIntervalSeconds, flushIntervalSeconds, TimeUnit.SECONDS);
+
+ running = true;
+ }
+
+ void write(final LogEvent event) {
+ lock.lock();
+ try {
+ buffer.add(event);
+ if (buffer.size() >= batchSize) {
+ flushBuffer();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ void flush() {
+ lock.lock();
+ try {
+ flushBuffer();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void flushBuffer() {
+ if (buffer.isEmpty()) {
+ return;
+ }
+ final List events = buffer;
+ buffer = new ArrayList<>(batchSize);
+
+ final DataFile dataFile;
+ try {
+ dataFile = writeParquetFile(events);
+ } catch (final Exception e) {
+ throw new IcebergCommitException(
+ "Failed to write Parquet file for Iceberg table " + tableNamespace + "." + tableName, e);
+ }
+
+ commitWithRetry(dataFile, events.size());
+ }
+
+ private void commitWithRetry(final DataFile dataFile, final int eventCount) {
+ int attempt = 0;
+ while (true) {
+ try {
+ final AppendFiles append = table.newAppend();
+ append.appendFile(dataFile);
+ append.commit();
+ LOGGER.debug("Committed {} log events to Iceberg table {}.{}", eventCount, tableNamespace, tableName);
+ return;
+ } catch (final Exception e) {
+ if (attempt >= MAX_COMMIT_RETRIES) {
+ throw new IcebergCommitException(
+ "Failed to commit to Iceberg table "
+ + tableNamespace
+ + "."
+ + tableName
+ + " after "
+ + (MAX_COMMIT_RETRIES + 1)
+ + " attempts",
+ e);
+ }
+ final long sleepMs = RETRY_BASE_SLEEP_MS * (1L << attempt);
+ LOGGER.warn(
+ "Commit attempt {} failed for Iceberg table {}.{}, retrying in {} ms",
+ attempt + 1,
+ tableNamespace,
+ tableName,
+ sleepMs,
+ e);
+ try {
+ Thread.sleep(sleepMs);
+ } catch (final InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new IcebergCommitException(
+ "Interrupted while retrying commit to Iceberg table " + tableNamespace + "." + tableName,
+ e);
+ }
+ table.refresh();
+ attempt++;
+ }
+ }
+ }
+
+ private DataFile writeParquetFile(final List events) throws IOException {
+ final String filename = String.format(
+ "%s/data/%s-%d-%d.parquet",
+ table.location(),
+ name,
+ System.currentTimeMillis(),
+ Thread.currentThread().getId());
+ final OutputFile outputFile = table.io().newOutputFile(filename);
+
+ try (DataWriter writer = Parquet.writeData(outputFile)
+ .schema(LOG_SCHEMA)
+ .withSpec(table.spec())
+ .createWriterFunc(GenericParquetWriter::create)
+ .overwrite()
+ .build()) {
+ for (final LogEvent event : events) {
+ writer.write(toRecord(event));
+ }
+ }
+
+ return org.apache.iceberg.DataFiles.builder(table.spec())
+ .withPath(filename)
+ .withFileSizeInBytes(outputFile.toInputFile().getLength())
+ .withRecordCount(events.size())
+ .withFormat(org.apache.iceberg.FileFormat.PARQUET)
+ .build();
+ }
+
+ private GenericRecord toRecord(final LogEvent event) {
+ final GenericRecord record = GenericRecord.create(LOG_SCHEMA);
+ final Instant instant = Instant.ofEpochMilli(event.getTimeMillis());
+ record.setField("timestamp", instant.atOffset(ZoneOffset.UTC));
+ record.setField("level", event.getLevel().name());
+ record.setField("logger_name", event.getLoggerName());
+ record.setField(
+ "message", event.getMessage() != null ? event.getMessage().getFormattedMessage() : null);
+ record.setField("thread_name", event.getThreadName());
+ record.setField("thrown", event.getThrown() != null ? event.getThrown().toString() : null);
+ record.setField("event_date", instant.atOffset(ZoneOffset.UTC).toLocalDate());
+ return record;
+ }
+
+ boolean stop(final long timeout, final TimeUnit timeUnit) {
+ if (!running) {
+ return true;
+ }
+ LOGGER.info("Stopping IcebergManager [{}]", name);
+ running = false;
+
+ if (flushTask != null) {
+ flushTask.cancel(false);
+ }
+ if (scheduler != null) {
+ scheduler.shutdown();
+ try {
+ scheduler.awaitTermination(timeout, timeUnit);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ flush();
+
+ if (catalog instanceof java.io.Closeable) {
+ try {
+ ((java.io.Closeable) catalog).close();
+ } catch (final IOException e) {
+ LOGGER.warn("Error closing Iceberg catalog", e);
+ }
+ }
+ return true;
+ }
+
+ String getName() {
+ return name;
+ }
+}
diff --git a/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/package-info.java b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/package-info.java
new file mode 100644
index 00000000000..fb606a77d36
--- /dev/null
+++ b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Log4j appender for writing log events to Apache Iceberg tables as Parquet files.
+ */
+@Export
+@Version("2.26.0")
+package org.apache.logging.log4j.iceberg;
+
+import org.osgi.annotation.bundle.Export;
+import org.osgi.annotation.versioning.Version;
diff --git a/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergAppenderTest.java b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergAppenderTest.java
new file mode 100644
index 00000000000..87c8c9b62b3
--- /dev/null
+++ b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergAppenderTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.logging.log4j.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.message.SimpleMessage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class IcebergAppenderTest {
+
+ private Path warehouseDir;
+ private IcebergAppender appender;
+
+ @BeforeEach
+ void setUp() throws IOException {
+ warehouseDir = Files.createTempDirectory("iceberg-appender-test");
+ }
+
+ @AfterEach
+ void tearDown() throws IOException {
+ if (appender != null && appender.isStarted()) {
+ appender.stop(5, TimeUnit.SECONDS);
+ }
+ Files.walk(warehouseDir)
+ .sorted(Comparator.reverseOrder())
+ .map(Path::toFile)
+ .forEach(File::delete);
+ }
+
+ @SuppressWarnings("unchecked")
+ private IcebergAppender buildAppender(final String tableName) {
+ final IcebergAppender.Builder extends IcebergAppender.Builder>> builder = IcebergAppender.newBuilder();
+ builder.setName("testAppender");
+ builder.setCatalogName("test_catalog");
+ builder.setCatalogImpl("hadoop");
+ builder.setCatalogWarehouse(warehouseDir.toAbsolutePath().toString());
+ builder.setTableNamespace("test_ns");
+ builder.setTableName(tableName);
+ builder.setBatchSize(100);
+ builder.setFlushIntervalSeconds(3600);
+ return builder.build();
+ }
+
+ @Test
+ void builderCreatesAppender() {
+ appender = buildAppender("builder_test");
+ assertThat(appender).isNotNull();
+ assertThat(appender.getName()).isEqualTo("testAppender");
+ }
+
+ @Test
+ void startAndStop() {
+ appender = buildAppender("start_stop");
+ appender.start();
+ assertThat(appender.isStarted()).isTrue();
+
+ assertThat(appender.stop(5, TimeUnit.SECONDS)).isTrue();
+ assertThat(appender.isStopped()).isTrue();
+ }
+
+ @Test
+ void appendWritesEvents() throws IOException {
+ appender = buildAppender("append_test");
+ appender.start();
+
+ final LogEvent event = Log4jLogEvent.newBuilder()
+ .setLoggerName("com.example.AppenderTest")
+ .setLevel(Level.WARN)
+ .setMessage(new SimpleMessage("appender test message"))
+ .setThreadName("test-thread")
+ .setTimeMillis(System.currentTimeMillis())
+ .build();
+
+ appender.append(event);
+ appender.stop(5, TimeUnit.SECONDS);
+
+ final IcebergManager reader = new IcebergManager(
+ "reader",
+ "test_catalog",
+ "hadoop",
+ null,
+ warehouseDir.toAbsolutePath().toString(),
+ "test_ns",
+ "append_test",
+ 100,
+ 3600);
+ reader.startup();
+ reader.table.refresh();
+ try (CloseableIterable records =
+ IcebergGenerics.read(reader.table).build()) {
+ int count = 0;
+ for (final Record record : records) {
+ assertThat(record.getField("level")).isEqualTo("WARN");
+ assertThat(record.getField("message")).isEqualTo("appender test message");
+ count++;
+ }
+ assertThat(count).isEqualTo(1);
+ }
+ reader.stop(5, TimeUnit.SECONDS);
+ }
+
+ @Test
+ void newBuilderFactory() {
+ final IcebergAppender.Builder> builder = IcebergAppender.newBuilder();
+ assertThat(builder).isNotNull();
+ }
+
+ @Test
+ void builderSettersReturnBuilder() {
+ final IcebergAppender.Builder> builder = IcebergAppender.newBuilder();
+ assertThat(builder.setCatalogName("c")).isSameAs(builder);
+ assertThat(builder.setCatalogImpl("i")).isSameAs(builder);
+ assertThat(builder.setCatalogUri("u")).isSameAs(builder);
+ assertThat(builder.setCatalogWarehouse("w")).isSameAs(builder);
+ assertThat(builder.setTableNamespace("n")).isSameAs(builder);
+ assertThat(builder.setTableName("t")).isSameAs(builder);
+ assertThat(builder.setBatchSize(10)).isSameAs(builder);
+ assertThat(builder.setFlushIntervalSeconds(5)).isSameAs(builder);
+ }
+}
diff --git a/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergInspectTest.java b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergInspectTest.java
new file mode 100644
index 00000000000..9a4eb1ccbcf
--- /dev/null
+++ b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergInspectTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.logging.log4j.iceberg;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.message.SimpleMessage;
+import org.junit.jupiter.api.Test;
+
+class IcebergInspectTest {
+
+ @Test
+ void createInspectableTable() {
+ final String warehouse = "/tmp/iceberg-inspect";
+ final IcebergManager manager = new IcebergManager(
+ "inspect", "inspect_catalog", "hadoop", null, warehouse, "logs", "app_logs", 500, 3600);
+ manager.startup();
+
+ final Level[] levels = {Level.TRACE, Level.DEBUG, Level.INFO, Level.WARN, Level.ERROR};
+ for (int i = 0; i < 1000; i++) {
+ final LogEvent event = Log4jLogEvent.newBuilder()
+ .setLoggerName("com.example.App")
+ .setLevel(levels[i % levels.length])
+ .setMessage(new SimpleMessage("log message " + i))
+ .setThreadName("main")
+ .setTimeMillis(1700000000000L + i * 1000L)
+ .build();
+ manager.write(event.toImmutable());
+ }
+
+ manager.stop(10, TimeUnit.SECONDS);
+ System.out.println("Iceberg table written to: " + warehouse + "/logs/app_logs");
+ System.out.println("Inspect with: find " + warehouse + " -type f");
+ }
+}
diff --git a/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergManagerTest.java b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergManagerTest.java
new file mode 100644
index 00000000000..8797aac3900
--- /dev/null
+++ b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergManagerTest.java
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.logging.log4j.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.message.SimpleMessage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+/**
+ * Unit tests for {@code IcebergManager}: table creation and loading, event batching and
+ * flushing, snapshot commits, commit-retry and failure paths, and shutdown behavior.
+ * Each test runs against a fresh temporary Hadoop-catalog warehouse.
+ */
+class IcebergManagerTest {
+
+    private Path warehouseDir;
+    private IcebergManager manager;
+
+    @BeforeEach
+    void setUp() throws IOException {
+        warehouseDir = Files.createTempDirectory("iceberg-test-warehouse");
+    }
+
+    @AfterEach
+    void tearDown() throws IOException {
+        if (manager != null) {
+            manager.stop(5, TimeUnit.SECONDS);
+        }
+        // Depth-first delete; the Files.walk stream must be closed to release directory handles.
+        try (java.util.stream.Stream<Path> paths = Files.walk(warehouseDir)) {
+            paths.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
+        }
+    }
+
+    /** Builds a manager rooted at the per-test temporary warehouse. */
+    private IcebergManager createManager(final String tableName, final int batchSize) {
+        return new IcebergManager(
+                "test",
+                "test_catalog",
+                "hadoop",
+                null,
+                warehouseDir.toAbsolutePath().toString(),
+                "test_ns",
+                tableName,
+                batchSize,
+                3600);
+    }
+
+    private LogEvent makeEvent(final String loggerName, final Level level, final String message) {
+        return Log4jLogEvent.newBuilder()
+                .setLoggerName(loggerName)
+                .setLevel(level)
+                .setMessage(new SimpleMessage(message))
+                .setThreadName("test-thread")
+                .setTimeMillis(System.currentTimeMillis())
+                .build();
+    }
+
+    private LogEvent makeEventWithThrown(
+            final String loggerName, final Level level, final String message, final Throwable thrown) {
+        return Log4jLogEvent.newBuilder()
+                .setLoggerName(loggerName)
+                .setLevel(level)
+                .setMessage(new SimpleMessage(message))
+                .setThreadName("test-thread")
+                .setTimeMillis(System.currentTimeMillis())
+                .setThrown(thrown)
+                .build();
+    }
+
+    private LogEvent makeEventNullMessage(final String loggerName, final Level level) {
+        return Log4jLogEvent.newBuilder()
+                .setLoggerName(loggerName)
+                .setLevel(level)
+                .setThreadName("test-thread")
+                .setTimeMillis(System.currentTimeMillis())
+                .build();
+    }
+
+    /** Counts the rows currently visible in {@code table} via a generic scan. */
+    private static int rowCount(final Table table) throws IOException {
+        try (CloseableIterable<Record> records = IcebergGenerics.read(table).build()) {
+            int count = 0;
+            for (final Record ignored : records) {
+                count++;
+            }
+            return count;
+        }
+    }
+
+    @Test
+    void startupCreatesNewTable() {
+        manager = createManager("new_table", 100);
+        manager.startup();
+
+        assertThat(manager.table).isNotNull();
+        assertThat(manager.catalog.tableExists(TableIdentifier.of(Namespace.of("test_ns"), "new_table")))
+                .isTrue();
+    }
+
+    @Test
+    void startupLoadsExistingTable() {
+        manager = createManager("existing_table", 100);
+        manager.startup();
+
+        final Table firstTable = manager.table;
+        manager.stop(5, TimeUnit.SECONDS);
+
+        // A second manager pointed at the same identifier must load, not re-create, the table.
+        manager = createManager("existing_table", 100);
+        manager.startup();
+        assertThat(manager.table).isNotNull();
+        assertThat(manager.table.location()).isEqualTo(firstTable.location());
+    }
+
+    @Test
+    void startupIdempotentWhenAlreadyRunning() {
+        manager = createManager("idempotent_table", 100);
+        manager.startup();
+        final Table firstTable = manager.table;
+
+        // Second startup on a running manager must be a no-op.
+        manager.startup();
+        assertThat(manager.table).isSameAs(firstTable);
+    }
+
+    @Test
+    void writeAndFlushSingleEvent() throws IOException {
+        manager = createManager("single_event", 100);
+        manager.startup();
+
+        manager.write(makeEvent("com.example.Test", Level.INFO, "hello iceberg").toImmutable());
+        manager.flush();
+
+        manager.table.refresh();
+        try (CloseableIterable<Record> records = IcebergGenerics.read(manager.table).build()) {
+            int count = 0;
+            for (final Record record : records) {
+                assertThat(record.getField("level")).isEqualTo("INFO");
+                assertThat(record.getField("logger_name")).isEqualTo("com.example.Test");
+                assertThat(record.getField("message")).isEqualTo("hello iceberg");
+                assertThat(record.getField("thread_name")).isEqualTo("test-thread");
+                assertThat(record.getField("thrown")).isNull();
+                assertThat(record.getField("timestamp")).isNotNull();
+                assertThat(record.getField("event_date")).isNotNull();
+                count++;
+            }
+            assertThat(count).isEqualTo(1);
+        }
+    }
+
+    @Test
+    void writeTriggersFlushWhenBatchSizeReached() throws IOException {
+        manager = createManager("batch_flush", 3);
+        manager.startup();
+
+        // The third write reaches the batch size and must flush without an explicit flush() call.
+        manager.write(makeEvent("logger1", Level.DEBUG, "msg1").toImmutable());
+        manager.write(makeEvent("logger2", Level.WARN, "msg2").toImmutable());
+        manager.write(makeEvent("logger3", Level.ERROR, "msg3").toImmutable());
+
+        manager.table.refresh();
+        assertThat(rowCount(manager.table)).isEqualTo(3);
+    }
+
+    @Test
+    void flushOnEmptyBufferIsNoOp() {
+        manager = createManager("empty_flush", 100);
+        manager.startup();
+
+        manager.flush();
+
+        // No snapshot means no commit happened.
+        assertThat(manager.table.snapshots()).isEmpty();
+    }
+
+    @Test
+    void eventWithThrownIsRecorded() throws IOException {
+        manager = createManager("thrown_event", 100);
+        manager.startup();
+
+        final RuntimeException ex = new RuntimeException("test failure");
+        manager.write(makeEventWithThrown("com.example.Error", Level.ERROR, "oops", ex)
+                .toImmutable());
+        manager.flush();
+
+        manager.table.refresh();
+        try (CloseableIterable<Record> records = IcebergGenerics.read(manager.table).build()) {
+            for (final Record record : records) {
+                // The throwable is persisted as text containing its message.
+                assertThat((String) record.getField("thrown")).contains("test failure");
+            }
+        }
+    }
+
+    @Test
+    void eventWithNullMessageIsRecorded() throws IOException {
+        manager = createManager("null_msg", 100);
+        manager.startup();
+
+        manager.write(makeEventNullMessage("com.example.Null", Level.TRACE).toImmutable());
+        manager.flush();
+
+        manager.table.refresh();
+        try (CloseableIterable<Record> records = IcebergGenerics.read(manager.table).build()) {
+            for (final Record record : records) {
+                assertThat(record.getField("message")).isNull();
+                assertThat(record.getField("thrown")).isNull();
+            }
+        }
+    }
+
+    @Test
+    void multipleFlushesProduceMultipleSnapshots() throws IOException {
+        manager = createManager("multi_flush", 100);
+        manager.startup();
+
+        manager.write(makeEvent("logger", Level.INFO, "batch1").toImmutable());
+        manager.flush();
+
+        manager.write(makeEvent("logger", Level.INFO, "batch2").toImmutable());
+        manager.flush();
+
+        // Each non-empty flush appends one snapshot.
+        manager.table.refresh();
+        int snapshotCount = 0;
+        for (final org.apache.iceberg.Snapshot ignored : manager.table.snapshots()) {
+            snapshotCount++;
+        }
+        assertThat(snapshotCount).isEqualTo(2);
+        assertThat(rowCount(manager.table)).isEqualTo(2);
+    }
+
+    @Test
+    void stopWhenNotRunningReturnsTrueImmediately() {
+        manager = createManager("not_running", 100);
+        assertThat(manager.stop(1, TimeUnit.SECONDS)).isTrue();
+        manager = null;
+    }
+
+    @Test
+    void stopFlushesRemainingEvents() throws IOException {
+        manager = createManager("stop_flush", 100);
+        manager.startup();
+
+        manager.write(makeEvent("logger", Level.INFO, "pending").toImmutable());
+        manager.stop(5, TimeUnit.SECONDS);
+
+        // Re-open the table with a fresh manager to observe what the stopped one committed.
+        final IcebergManager reader = createManager("stop_flush", 100);
+        reader.startup();
+        reader.table.refresh();
+        try (CloseableIterable<Record> records = IcebergGenerics.read(reader.table).build()) {
+            int count = 0;
+            for (final Record record : records) {
+                assertThat(record.getField("message")).isEqualTo("pending");
+                count++;
+            }
+            assertThat(count).isEqualTo(1);
+        }
+        reader.stop(5, TimeUnit.SECONDS);
+        manager = null;
+    }
+
+    @Test
+    void getName() {
+        manager = createManager("name_test", 100);
+        assertThat(manager.getName()).isEqualTo("test");
+        manager = null;
+    }
+
+    @Test
+    void startupWithNullUriAndWarehouse() {
+        manager = new IcebergManager(
+                "test",
+                "test_catalog",
+                "hadoop",
+                null,
+                warehouseDir.toAbsolutePath().toString(),
+                "test_ns",
+                "null_uri_test",
+                100,
+                3600);
+        manager.startup();
+        assertThat(manager.table).isNotNull();
+    }
+
+    @Test
+    void startupWithCatalogUri() {
+        manager = new IcebergManager(
+                "test",
+                "test_catalog",
+                "hadoop",
+                "file:///tmp/unused",
+                warehouseDir.toAbsolutePath().toString(),
+                "test_ns",
+                "uri_test",
+                100,
+                3600);
+        manager.startup();
+        assertThat(manager.table).isNotNull();
+    }
+
+    @Test
+    void commitRetrySucceedsAfterTransientFailure() throws IOException {
+        manager = createManager("retry_ok", 100);
+        manager.startup();
+
+        manager.write(makeEvent("logger", Level.INFO, "retry event").toImmutable());
+
+        // First newAppend() yields a commit that fails once; the second is the real append.
+        final Table original = manager.table;
+        final Table spyTable = Mockito.spy(original);
+        final AppendFiles failingAppend = Mockito.mock(AppendFiles.class);
+        Mockito.doThrow(new org.apache.iceberg.exceptions.CommitFailedException("conflict"))
+                .when(failingAppend)
+                .commit();
+        final AppendFiles realAppend = original.newAppend();
+
+        Mockito.when(spyTable.newAppend()).thenReturn(failingAppend).thenReturn(realAppend);
+        manager.table = spyTable;
+
+        manager.flush();
+
+        original.refresh();
+        try (CloseableIterable<Record> records = IcebergGenerics.read(original).build()) {
+            int count = 0;
+            for (final Record record : records) {
+                assertThat(record.getField("message")).isEqualTo("retry event");
+                count++;
+            }
+            assertThat(count).isEqualTo(1);
+        }
+        manager.table = original;
+    }
+
+    @Test
+    void commitCrashesAfterExhaustingRetries() {
+        manager = createManager("retry_exhaust", 100);
+        manager.startup();
+
+        manager.write(makeEvent("logger", Level.INFO, "will exhaust retries").toImmutable());
+
+        // Every commit attempt conflicts, so the manager must give up after its retry budget.
+        final Table original = manager.table;
+        final Table mockTable = Mockito.mock(Table.class);
+        Mockito.when(mockTable.location()).thenReturn(original.location());
+        Mockito.when(mockTable.io()).thenReturn(original.io());
+        Mockito.when(mockTable.spec()).thenReturn(original.spec());
+        final AppendFiles failingAppend = Mockito.mock(AppendFiles.class);
+        Mockito.doThrow(new org.apache.iceberg.exceptions.CommitFailedException("persistent conflict"))
+                .when(failingAppend)
+                .commit();
+        Mockito.when(mockTable.newAppend()).thenReturn(failingAppend);
+        manager.table = mockTable;
+
+        assertThatThrownBy(manager::flush)
+                .isInstanceOf(IcebergCommitException.class)
+                .hasMessageContaining("after " + (IcebergManager.MAX_COMMIT_RETRIES + 1) + " attempts")
+                .hasCauseInstanceOf(org.apache.iceberg.exceptions.CommitFailedException.class);
+
+        manager.table = original;
+    }
+
+    @Test
+    void stopHandlesInterruptedException() throws InterruptedException {
+        manager = createManager("interrupt_test", 100);
+        manager.startup();
+
+        final ScheduledExecutorService realScheduler = manager.scheduler;
+        realScheduler.shutdown();
+        final ScheduledExecutorService mockScheduler = Mockito.mock(ScheduledExecutorService.class);
+        Mockito.when(mockScheduler.awaitTermination(Mockito.anyLong(), Mockito.any()))
+                .thenThrow(new InterruptedException("test interrupt"));
+        manager.scheduler = mockScheduler;
+
+        assertThat(manager.stop(5, TimeUnit.SECONDS)).isTrue();
+        // stop() must re-assert the interrupt flag; reading it here also clears it for later tests.
+        assertThat(Thread.interrupted()).isTrue();
+        manager = null;
+    }
+
+    @Test
+    void stopClosesCloseableCatalog() {
+        manager = createManager("closeable_test", 100);
+        manager.startup();
+        // HadoopCatalog implements Closeable, so this path is naturally covered
+        assertThat(manager.catalog).isInstanceOf(Closeable.class);
+        assertThat(manager.stop(5, TimeUnit.SECONDS)).isTrue();
+        manager = null;
+    }
+
+    @Test
+    void stopHandlesCloseableIOException() throws IOException {
+        manager = createManager("close_error_test", 100);
+        manager.startup();
+
+        // Replace catalog with a Closeable mock that throws on close
+        final CloseableCatalog mockCatalog = Mockito.mock(CloseableCatalog.class);
+        Mockito.doThrow(new IOException("close failed")).when(mockCatalog).close();
+        manager.catalog = mockCatalog;
+
+        assertThat(manager.stop(5, TimeUnit.SECONDS)).isTrue();
+        manager = null;
+    }
+
+    /** Lets Mockito produce a catalog that is also {@link Closeable}. */
+    interface CloseableCatalog extends Catalog, Closeable {}
+
+    @Test
+    void startupWithNullWarehouse() {
+        manager =
+                new IcebergManager("test", "test_catalog", "hadoop", null, null, "test_ns", "null_wh_test", 100, 3600);
+        try {
+            manager.startup();
+        } catch (final Exception ignored) {
+            // Expected — hadoop catalog requires a warehouse
+        }
+        manager = null;
+    }
+
+    @Test
+    void stopWithNullFlushTaskAndScheduler() {
+        manager = createManager("null_sched", 100);
+        // Force the running state without startup() so flush task and scheduler stay null.
+        manager.running = true;
+        manager.catalog = Mockito.mock(Catalog.class);
+        assertThat(manager.stop(1, TimeUnit.SECONDS)).isTrue();
+        manager = null;
+    }
+
+    @Test
+    void stopWithNonCloseableCatalog() {
+        manager = createManager("non_closeable", 100);
+        manager.running = true;
+        // A plain Catalog mock does not implement Closeable, exercising the non-close path.
+        manager.catalog = Mockito.mock(Catalog.class);
+        assertThat(manager.stop(1, TimeUnit.SECONDS)).isTrue();
+        manager = null;
+    }
+
+    @Test
+    void writeParquetFileIOExceptionCrashes() {
+        manager = createManager("io_error", 100);
+        manager.startup();
+
+        manager.write(makeEvent("logger", Level.INFO, "will fail IO").toImmutable());
+        // Mock the table's FileIO so opening the output file blows up before any commit.
+        final Table original = manager.table;
+        final Table mockTable = Mockito.mock(Table.class);
+        Mockito.when(mockTable.location()).thenReturn(original.location());
+        Mockito.when(mockTable.spec()).thenReturn(original.spec());
+        final org.apache.iceberg.io.FileIO mockIo = Mockito.mock(org.apache.iceberg.io.FileIO.class);
+        Mockito.when(mockTable.io()).thenReturn(mockIo);
+        final OutputFile mockOutput = Mockito.mock(OutputFile.class);
+        Mockito.when(mockIo.newOutputFile(Mockito.anyString())).thenReturn(mockOutput);
+        Mockito.when(mockOutput.createOrOverwrite()).thenThrow(new RuntimeException(new IOException("disk full")));
+        manager.table = mockTable;
+
+        assertThatThrownBy(manager::flush)
+                .isInstanceOf(IcebergCommitException.class)
+                .hasMessageContaining("Failed to write Parquet file");
+        manager.table = original;
+    }
+
+    @Test
+    void commitRetryInterruptedCrashes() {
+        manager = createManager("retry_interrupt", 100);
+        manager.startup();
+
+        manager.write(makeEvent("logger", Level.INFO, "will be interrupted").toImmutable());
+
+        // Interrupt the committing thread while the commit conflicts, so the retry backoff aborts.
+        final Table original = manager.table;
+        final Table spyTable = Mockito.spy(original);
+        final AppendFiles failingAppend = Mockito.mock(AppendFiles.class);
+        Mockito.doAnswer(invocation -> {
+                    Thread.currentThread().interrupt();
+                    throw new org.apache.iceberg.exceptions.CommitFailedException("conflict");
+                })
+                .when(failingAppend)
+                .commit();
+        Mockito.when(spyTable.newAppend()).thenReturn(failingAppend);
+        manager.table = spyTable;
+
+        assertThatThrownBy(manager::flush)
+                .isInstanceOf(IcebergCommitException.class)
+                .hasMessageContaining("Interrupted");
+        // Clear the interrupt flag so it does not leak into subsequent tests.
+        Thread.interrupted();
+        manager.table = original;
+    }
+}
diff --git a/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergPerformanceTest.java b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergPerformanceTest.java
new file mode 100644
index 00000000000..29e87d23969
--- /dev/null
+++ b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergPerformanceTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.logging.log4j.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.FileAppender;
+import org.apache.logging.log4j.core.impl.Log4jLogEvent;
+import org.apache.logging.log4j.core.layout.PatternLayout;
+import org.apache.logging.log4j.message.SimpleMessage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * End-to-end throughput comparison: writes one million events through a plain
+ * {@link FileAppender} and through {@code IcebergManager}, verifies every record
+ * round-trips intact, and prints a side-by-side timing/size report.
+ */
+class IcebergPerformanceTest {
+
+    private static final int TOTAL_RECORDS = 1_000_000;
+    private static final int RECORDS_PER_COMMIT = 100_000;
+
+    private Path tempDir;
+    private IcebergManager manager;
+
+    @BeforeEach
+    void setUp() throws IOException {
+        tempDir = Files.createTempDirectory("iceberg-perf-test");
+    }
+
+    @AfterEach
+    void tearDown() throws IOException {
+        if (manager != null) {
+            manager.stop(30, TimeUnit.SECONDS);
+        }
+        // Depth-first delete; the Files.walk stream must be closed to release directory handles.
+        try (java.util.stream.Stream<Path> paths = Files.walk(tempDir)) {
+            paths.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
+        }
+    }
+
+    private static final Level[] LEVELS = {Level.TRACE, Level.DEBUG, Level.INFO, Level.WARN, Level.ERROR};
+
+    /** Deterministic event: logger, level, message, thread and timestamp all derive from {@code i}. */
+    private LogEvent buildEvent(final int i) {
+        return Log4jLogEvent.newBuilder()
+                .setLoggerName("com.example.perf.Logger" + (i % 100))
+                .setLevel(LEVELS[i % LEVELS.length])
+                .setMessage(new SimpleMessage("perf-message-" + i))
+                .setThreadName("perf-thread-" + (i % 10))
+                .setTimeMillis(1700000000000L + i)
+                .build();
+    }
+
+    @Test
+    void writeOneMillionRecordsWithPeriodicCommits() throws IOException {
+
+        // ============================================================
+        // Phase 1: File Appender benchmark
+        // ============================================================
+        final Path logFile = tempDir.resolve("perf-test.log");
+        final PatternLayout layout = PatternLayout.newBuilder()
+                .withPattern("%d %-5level [%t] %logger - %msg%n")
+                .build();
+        // Buffered, non-immediate flush: the fairest apples-to-apples baseline for batched writes.
+        final FileAppender fileAppender = FileAppender.newBuilder()
+                .setName("filePerf")
+                .withFileName(logFile.toString())
+                .withImmediateFlush(false)
+                .withBufferedIo(true)
+                .withBufferSize(8192)
+                .setLayout(layout)
+                .build();
+        fileAppender.start();
+
+        final long fileWriteStart = System.nanoTime();
+        for (int i = 0; i < TOTAL_RECORDS; i++) {
+            fileAppender.append(buildEvent(i));
+        }
+        fileAppender.stop(30, TimeUnit.SECONDS);
+        final long fileWriteMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - fileWriteStart);
+        final double fileWriteThroughput = (double) TOTAL_RECORDS / fileWriteMs * 1000.0;
+
+        // Sanity check: every event became exactly one log line.
+        int fileLineCount = 0;
+        try (BufferedReader reader = new BufferedReader(new FileReader(logFile.toFile()))) {
+            while (reader.readLine() != null) {
+                fileLineCount++;
+            }
+        }
+        assertThat(fileLineCount).isEqualTo(TOTAL_RECORDS);
+        final long fileSizeBytes = Files.size(logFile);
+
+        // ============================================================
+        // Phase 2: Iceberg Appender benchmark
+        // ============================================================
+        final Path warehouseDir = tempDir.resolve("warehouse");
+        Files.createDirectories(warehouseDir);
+        manager = new IcebergManager(
+                "perf_test",
+                "perf_catalog",
+                "hadoop",
+                null,
+                warehouseDir.toAbsolutePath().toString(),
+                "perf_ns",
+                "perf_table",
+                RECORDS_PER_COMMIT,
+                3600);
+        manager.startup();
+
+        final long icebergWriteStart = System.nanoTime();
+        for (int i = 0; i < TOTAL_RECORDS; i++) {
+            manager.write(buildEvent(i).toImmutable());
+        }
+        // Final flush commits any partial batch so totals match.
+        manager.flush();
+        final long icebergWriteMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - icebergWriteStart);
+        final double icebergWriteThroughput = (double) TOTAL_RECORDS / icebergWriteMs * 1000.0;
+
+        // One snapshot per commit batch (ceiling division covers a trailing partial batch).
+        manager.table.refresh();
+        int snapshotCount = 0;
+        for (final Snapshot ignored : manager.table.snapshots()) {
+            snapshotCount++;
+        }
+        final int expectedCommits = (TOTAL_RECORDS + RECORDS_PER_COMMIT - 1) / RECORDS_PER_COMMIT;
+        assertThat(snapshotCount).isEqualTo(expectedCommits);
+
+        // Iceberg read-back
+        final long icebergReadStart = System.nanoTime();
+        final Map<String, Integer> levelCounts = new HashMap<>();
+        final Set<String> seenMessages = new HashSet<>();
+        int totalRead = 0;
+
+        try (CloseableIterable<Record> records =
+                IcebergGenerics.read(manager.table).build()) {
+            for (final Record record : records) {
+                final String level = (String) record.getField("level");
+                final String message = (String) record.getField("message");
+
+                assertThat(level).isNotNull();
+                assertThat(message).startsWith("perf-message-");
+                assertThat(record.getField("logger_name").toString()).startsWith("com.example.perf.Logger");
+                assertThat(record.getField("thread_name").toString()).startsWith("perf-thread-");
+                assertThat(record.getField("timestamp")).isNotNull();
+                assertThat(record.getField("event_date")).isNotNull();
+
+                levelCounts.merge(level, 1, Integer::sum);
+                seenMessages.add(message);
+                totalRead++;
+            }
+        }
+        final long icebergReadMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - icebergReadStart);
+        final double icebergReadThroughput = (double) totalRead / icebergReadMs * 1000.0;
+
+        // Distinct messages == total proves no duplicates and no losses.
+        assertThat(totalRead).isEqualTo(TOTAL_RECORDS);
+        assertThat(seenMessages).hasSize(TOTAL_RECORDS);
+        // TOTAL_RECORDS is a multiple of LEVELS.length, so the round-robin split is exact.
+        final int expectedPerLevel = TOTAL_RECORDS / LEVELS.length;
+        for (final Level level : LEVELS) {
+            assertThat(levelCounts.get(level.name()))
+                    .as("Count for level %s", level.name())
+                    .isEqualTo(expectedPerLevel);
+        }
+
+        long icebergSizeBytes = 0;
+        try (java.util.stream.Stream<Path> paths = Files.walk(warehouseDir)) {
+            icebergSizeBytes = paths.filter(Files::isRegularFile)
+                    .mapToLong(p -> p.toFile().length())
+                    .sum();
+        }
+
+        // ============================================================
+        // Results
+        // ============================================================
+        System.out.println();
+        System.out.println("========== Performance Comparison (1M records) ==========");
+        System.out.println();
+        System.out.printf("%-25s %15s %15s%n", "", "File Appender", "Iceberg");
+        System.out.printf("%-25s %15s %15s%n", "-------------------------", "---------------", "---------------");
+        System.out.printf("%-25s %12d ms %12d ms%n", "Write time", fileWriteMs, icebergWriteMs);
+        System.out.printf("%-25s %12.0f/s %12.0f/s%n", "Write throughput", fileWriteThroughput, icebergWriteThroughput);
+        System.out.printf("%-25s %15s %12d ms%n", "Read time", "n/a", icebergReadMs);
+        System.out.printf("%-25s %15s %12.0f/s%n", "Read throughput", "n/a", icebergReadThroughput);
+        System.out.printf(
+                "%-25s %12.1f MB %12.1f MB%n",
+                "Disk size", fileSizeBytes / (1024.0 * 1024.0), icebergSizeBytes / (1024.0 * 1024.0));
+        System.out.printf("%-25s %15d %15d%n", "Commits/flushes", 1, snapshotCount);
+        System.out.printf("%-25s %12.1fx %15s%n", "Iceberg overhead", (double) icebergWriteMs / fileWriteMs, "");
+        System.out.printf("%-25s %12.1fx %15s%n", "Iceberg compression", (double) fileSizeBytes / icebergSizeBytes, "");
+        System.out.println();
+        System.out.println("Level distribution: " + levelCounts);
+        System.out.println("All " + TOTAL_RECORDS + " records verified correct in both appenders.");
+        System.out.println("=========================================================");
+    }
+}
diff --git a/pom.xml b/pom.xml
index 87123ce4374..5911d5e6ed2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -248,6 +248,7 @@
log4j-couchdb
log4j-docker
log4j-fuzz-test
+ log4j-iceberg
log4j-iostreams
log4j-jakarta-jms
log4j-jakarta-smtp
@@ -441,6 +442,12 @@
2.23.1
+      <dependency>
+        <groupId>org.apache.logging.log4j</groupId>
+        <artifactId>log4j-iceberg</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
org.apache.logging.log4j
log4j-iostreams