diff --git a/log4j-iceberg/pom.xml b/log4j-iceberg/pom.xml new file mode 100644 index 00000000000..18cde56d19b --- /dev/null +++ b/log4j-iceberg/pom.xml @@ -0,0 +1,226 @@ + + + + + 4.0.0 + + + org.apache.logging.log4j + log4j + ${revision} + ../log4j-parent + + + log4j-iceberg + + Apache Log4j Iceberg + Apache Iceberg appender for Log4j. Writes log events as rows in an Iceberg table using Parquet files. + + + org.apache.logging.log4j.core + 1.10.1 + 1.16.0 + 3.4.1 + + + + + + org.xerial.snappy + snappy-java + 1.1.10.7 + + + io.airlift + aircompressor + 2.0.2 + + + org.apache.commons + commons-text + 1.11.0 + + + + + + + + org.apache.logging.log4j + log4j-core + + + + + org.apache.iceberg + iceberg-core + ${iceberg.version} + + + org.apache.iceberg + iceberg-parquet + ${iceberg.version} + + + org.apache.iceberg + iceberg-data + ${iceberg.version} + + + org.apache.iceberg + iceberg-api + ${iceberg.version} + + + + + org.apache.parquet + parquet-avro + ${parquet.version} + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + org.slf4j + slf4j-reload4j + + + ch.qos.reload4j + reload4j + + + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + + + org.slf4j + slf4j-reload4j + + + ch.qos.reload4j + reload4j + + + org.apache.hadoop + hadoop-yarn-common + + + org.apache.hadoop + hadoop-yarn-client + + + org.apache.hadoop + hadoop-hdfs-client + + + + + + + org.junit.jupiter + junit-jupiter-engine + test + + + org.apache.logging.log4j + log4j-core-test + test + + + org.mockito + mockito-core + test + + + org.assertj + assertj-core + 3.26.3 + test + + + + + + + + + biz.aQute.bnd + bnd-baseline-maven-plugin + + + check-api-compat + + baseline + + + true + + + + + + org.jacoco + jacoco-maven-plugin + 0.8.12 + + + + prepare-agent + + + + report + + report + + test + + + check + + check + + + + + BUNDLE + + + LINE + COVEREDRATIO + 1.00 + + + + + + + + + + + + diff --git a/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergAppender.java b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergAppender.java new file mode 100644 index 00000000000..50290990466 --- /dev/null +++ b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergAppender.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.logging.log4j.iceberg; + +import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.Core; +import org.apache.logging.log4j.core.Filter; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.apache.logging.log4j.core.config.Property; +import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute; +import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory; +import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required; + +/** + * Log4j appender that writes log events as rows in an Apache Iceberg table + * backed by Parquet data files. + * + *

Configuration example:

+ *
+ * <Iceberg name="IcebergAppender"
+ *          catalogName="my_catalog"
+ *          catalogImpl="org.apache.iceberg.rest.RESTCatalog"
+ *          catalogUri="http://localhost:8181"
+ *          catalogWarehouse="s3://my-bucket/warehouse"
+ *          tableNamespace="logs"
+ *          tableName="app_logs"
+ *          batchSize="1000"
+ *          flushIntervalSeconds="30">
+ *   <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %-5level %logger{36} - %msg%n"/>
+ * </Iceberg>
+ * 
+ */ +@Plugin(name = "Iceberg", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true) +public class IcebergAppender extends AbstractAppender { + + private final IcebergManager manager; + + private IcebergAppender( + final String name, + final Filter filter, + final boolean ignoreExceptions, + final Property[] properties, + final IcebergManager manager) { + super(name, filter, null, ignoreExceptions, properties); + this.manager = manager; + } + + @Override + public void append(final LogEvent event) { + manager.write(event.toImmutable()); + } + + @Override + public void start() { + manager.startup(); + super.start(); + } + + @Override + public boolean stop(final long timeout, final TimeUnit timeUnit) { + setStopping(); + boolean stopped = super.stop(timeout, timeUnit, false); + stopped &= manager.stop(timeout, timeUnit); + setStopped(); + return stopped; + } + + @PluginBuilderFactory + public static > B newBuilder() { + return new Builder().asBuilder(); + } + + public static class Builder> extends AbstractAppender.Builder + implements org.apache.logging.log4j.core.util.Builder { + + @PluginBuilderAttribute + private String catalogName = "log4j"; + + @PluginBuilderAttribute + @Required(message = "No catalog implementation class provided") + private String catalogImpl; + + @PluginBuilderAttribute + private String catalogUri; + + @PluginBuilderAttribute + private String catalogWarehouse; + + @PluginBuilderAttribute + private String tableNamespace = "default"; + + @PluginBuilderAttribute + @Required(message = "No Iceberg table name provided") + private String tableName; + + @PluginBuilderAttribute + private int batchSize = 1000; + + @PluginBuilderAttribute + private int flushIntervalSeconds = 30; + + public B setCatalogName(final String catalogName) { + this.catalogName = catalogName; + return asBuilder(); + } + + public B setCatalogImpl(final String catalogImpl) { + this.catalogImpl = catalogImpl; + return asBuilder(); + } + + public B setCatalogUri(final String catalogUri) { + this.catalogUri = catalogUri; + return asBuilder(); + } + + public B setCatalogWarehouse(final String catalogWarehouse) { + this.catalogWarehouse = catalogWarehouse; + return asBuilder(); + } + + public B setTableNamespace(final String tableNamespace) { + this.tableNamespace = tableNamespace; + return asBuilder(); + } + + public B setTableName(final String tableName) { + this.tableName = tableName; + return asBuilder(); + } + + public B setBatchSize(final int batchSize) { + this.batchSize = batchSize; + return asBuilder(); + } + + public B setFlushIntervalSeconds(final int flushIntervalSeconds) { + this.flushIntervalSeconds = flushIntervalSeconds; + return asBuilder(); + } + + @Override + public IcebergAppender build() { + final IcebergManager manager = new IcebergManager( + getName(), + catalogName, + catalogImpl, + catalogUri, + catalogWarehouse, + tableNamespace, + tableName, + batchSize, + flushIntervalSeconds); + return new IcebergAppender(getName(), getFilter(), isIgnoreExceptions(), null, manager); + } + } +} diff --git a/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergCommitException.java b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergCommitException.java new file mode 100644 index 00000000000..1fd4eecaa3b --- /dev/null +++ b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergCommitException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.logging.log4j.iceberg; + +public class IcebergCommitException extends RuntimeException { + + IcebergCommitException(final String message, final Throwable cause) { + super(message, cause); + } +} diff --git a/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergManager.java b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergManager.java new file mode 100644 index 00000000000..47993c8973a --- /dev/null +++ b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/IcebergManager.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.logging.log4j.iceberg; + +import java.io.IOException; +import java.time.Instant; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.types.Types; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.status.StatusLogger; + +/** + * Manages the lifecycle of the Iceberg catalog, table, and periodic flush of + * buffered log events into Parquet data files committed to the Iceberg table. + */ +class IcebergManager { + + private static final StatusLogger LOGGER = StatusLogger.getLogger(); + static final int MAX_COMMIT_RETRIES = 4; + private static final long RETRY_BASE_SLEEP_MS = 100; + + static final Schema LOG_SCHEMA = new Schema( + Types.NestedField.required(1, "timestamp", Types.TimestampType.withZone()), + Types.NestedField.required(2, "level", Types.StringType.get()), + Types.NestedField.required(3, "logger_name", Types.StringType.get()), + Types.NestedField.optional(4, "message", Types.StringType.get()), + Types.NestedField.optional(5, "thread_name", Types.StringType.get()), + Types.NestedField.optional(6, "thrown", Types.StringType.get()), + Types.NestedField.required(7, "event_date", Types.DateType.get())); + + private final String name; + private final String catalogName; + private final String catalogImpl; + private final String catalogUri; + private final String catalogWarehouse; + private final String tableNamespace; + private final String tableName; + private final int batchSize; + private final int flushIntervalSeconds; + + private final ReentrantLock lock = new ReentrantLock(); + private List buffer; + + Catalog catalog; + Table table; + ScheduledExecutorService scheduler; + ScheduledFuture flushTask; + volatile boolean running; + + IcebergManager( + final String name, + final String catalogName, + final String catalogImpl, + final String catalogUri, + final String catalogWarehouse, + final String tableNamespace, + final String tableName, + final int batchSize, + final int flushIntervalSeconds) { + this.name = name; + this.catalogName = catalogName; + this.catalogImpl = catalogImpl; + this.catalogUri = catalogUri; + this.catalogWarehouse = catalogWarehouse; + this.tableNamespace = tableNamespace; + this.tableName = tableName; + this.batchSize = batchSize; + this.flushIntervalSeconds = flushIntervalSeconds; + this.buffer = new ArrayList<>(batchSize); + } + + void startup() { + if (running) { + return; + } + LOGGER.info("Starting IcebergManager [{}]", name); + + final Map catalogProperties = new HashMap<>(); + catalogProperties.put("type", catalogImpl); + if (catalogUri != null) { + catalogProperties.put("uri", catalogUri); + } + if (catalogWarehouse != null) { + catalogProperties.put("warehouse", catalogWarehouse); + } + + catalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProperties, new Configuration()); + final TableIdentifier tableId = TableIdentifier.of(Namespace.of(tableNamespace), tableName); + + if (!((org.apache.iceberg.catalog.SupportsNamespaces) catalog).namespaceExists(Namespace.of(tableNamespace))) { + ((org.apache.iceberg.catalog.SupportsNamespaces) catalog).createNamespace(Namespace.of(tableNamespace)); + } + + if (!catalog.tableExists(tableId)) { + table = catalog.createTable(tableId, LOG_SCHEMA); + LOGGER.info("Created Iceberg table {}", tableId); + } else { + table = catalog.loadTable(tableId); + LOGGER.info("Loaded existing Iceberg table {}", tableId); + } + + scheduler = Executors.newSingleThreadScheduledExecutor(r -> { + final Thread t = new Thread(r, "log4j-iceberg-flush-" + name); + t.setDaemon(true); + return t; + }); + flushTask = scheduler.scheduleAtFixedRate( + this::flush, flushIntervalSeconds, flushIntervalSeconds, TimeUnit.SECONDS); + + running = true; + } + + void write(final LogEvent event) { + lock.lock(); + try { + buffer.add(event); + if (buffer.size() >= batchSize) { + flushBuffer(); + } + } finally { + lock.unlock(); + } + } + + void flush() { + lock.lock(); + try { + flushBuffer(); + } finally { + lock.unlock(); + } + } + + private void flushBuffer() { + if (buffer.isEmpty()) { + return; + } + final List events = buffer; + buffer = new ArrayList<>(batchSize); + + final DataFile dataFile; + try { + dataFile = writeParquetFile(events); + } catch (final Exception e) { + throw new IcebergCommitException( + "Failed to write Parquet file for Iceberg table " + tableNamespace + "." + tableName, e); + } + + commitWithRetry(dataFile, events.size()); + } + + private void commitWithRetry(final DataFile dataFile, final int eventCount) { + int attempt = 0; + while (true) { + try { + final AppendFiles append = table.newAppend(); + append.appendFile(dataFile); + append.commit(); + LOGGER.debug("Committed {} log events to Iceberg table {}.{}", eventCount, tableNamespace, tableName); + return; + } catch (final Exception e) { + if (attempt >= MAX_COMMIT_RETRIES) { + throw new IcebergCommitException( + "Failed to commit to Iceberg table " + + tableNamespace + + "." + + tableName + + " after " + + (MAX_COMMIT_RETRIES + 1) + + " attempts", + e); + } + final long sleepMs = RETRY_BASE_SLEEP_MS * (1L << attempt); + LOGGER.warn( + "Commit attempt {} failed for Iceberg table {}.{}, retrying in {} ms", + attempt + 1, + tableNamespace, + tableName, + sleepMs, + e); + try { + Thread.sleep(sleepMs); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new IcebergCommitException( + "Interrupted while retrying commit to Iceberg table " + tableNamespace + "." + tableName, + e); + } + table.refresh(); + attempt++; + } + } + } + + private DataFile writeParquetFile(final List events) throws IOException { + final String filename = String.format( + "%s/data/%s-%d-%d.parquet", + table.location(), + name, + System.currentTimeMillis(), + Thread.currentThread().getId()); + final OutputFile outputFile = table.io().newOutputFile(filename); + + try (DataWriter writer = Parquet.writeData(outputFile) + .schema(LOG_SCHEMA) + .withSpec(table.spec()) + .createWriterFunc(GenericParquetWriter::create) + .overwrite() + .build()) { + for (final LogEvent event : events) { + writer.write(toRecord(event)); + } + } + + return org.apache.iceberg.DataFiles.builder(table.spec()) + .withPath(filename) + .withFileSizeInBytes(outputFile.toInputFile().getLength()) + .withRecordCount(events.size()) + .withFormat(org.apache.iceberg.FileFormat.PARQUET) + .build(); + } + + private GenericRecord toRecord(final LogEvent event) { + final GenericRecord record = GenericRecord.create(LOG_SCHEMA); + final Instant instant = Instant.ofEpochMilli(event.getTimeMillis()); + record.setField("timestamp", instant.atOffset(ZoneOffset.UTC)); + record.setField("level", event.getLevel().name()); + record.setField("logger_name", event.getLoggerName()); + record.setField( + "message", event.getMessage() != null ? event.getMessage().getFormattedMessage() : null); + record.setField("thread_name", event.getThreadName()); + record.setField("thrown", event.getThrown() != null ? event.getThrown().toString() : null); + record.setField("event_date", instant.atOffset(ZoneOffset.UTC).toLocalDate()); + return record; + } + + boolean stop(final long timeout, final TimeUnit timeUnit) { + if (!running) { + return true; + } + LOGGER.info("Stopping IcebergManager [{}]", name); + running = false; + + if (flushTask != null) { + flushTask.cancel(false); + } + if (scheduler != null) { + scheduler.shutdown(); + try { + scheduler.awaitTermination(timeout, timeUnit); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + flush(); + + if (catalog instanceof java.io.Closeable) { + try { + ((java.io.Closeable) catalog).close(); + } catch (final IOException e) { + LOGGER.warn("Error closing Iceberg catalog", e); + } + } + return true; + } + + String getName() { + return name; + } +} diff --git a/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/package-info.java b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/package-info.java new file mode 100644 index 00000000000..fb606a77d36 --- /dev/null +++ b/log4j-iceberg/src/main/java/org/apache/logging/log4j/iceberg/package-info.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Log4j appender for writing log events to Apache Iceberg tables as Parquet files. + */ +@Export +@Version("2.26.0") +package org.apache.logging.log4j.iceberg; + +import org.osgi.annotation.bundle.Export; +import org.osgi.annotation.versioning.Version; diff --git a/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergAppenderTest.java b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergAppenderTest.java new file mode 100644 index 00000000000..87c8c9b62b3 --- /dev/null +++ b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergAppenderTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.logging.log4j.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.impl.Log4jLogEvent; +import org.apache.logging.log4j.message.SimpleMessage; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class IcebergAppenderTest { + + private Path warehouseDir; + private IcebergAppender appender; + + @BeforeEach + void setUp() throws IOException { + warehouseDir = Files.createTempDirectory("iceberg-appender-test"); + } + + @AfterEach + void tearDown() throws IOException { + if (appender != null && appender.isStarted()) { + appender.stop(5, TimeUnit.SECONDS); + } + Files.walk(warehouseDir) + .sorted(Comparator.reverseOrder()) + .map(Path::toFile) + .forEach(File::delete); + } + + @SuppressWarnings("unchecked") + private IcebergAppender buildAppender(final String tableName) { + final IcebergAppender.Builder> builder = IcebergAppender.newBuilder(); + builder.setName("testAppender"); + builder.setCatalogName("test_catalog"); + builder.setCatalogImpl("hadoop"); + builder.setCatalogWarehouse(warehouseDir.toAbsolutePath().toString()); + builder.setTableNamespace("test_ns"); + builder.setTableName(tableName); + builder.setBatchSize(100); + builder.setFlushIntervalSeconds(3600); + return builder.build(); + } + + @Test + void builderCreatesAppender() { + appender = buildAppender("builder_test"); + assertThat(appender).isNotNull(); + assertThat(appender.getName()).isEqualTo("testAppender"); + } + + @Test + void startAndStop() { + appender = buildAppender("start_stop"); + appender.start(); + assertThat(appender.isStarted()).isTrue(); + + assertThat(appender.stop(5, TimeUnit.SECONDS)).isTrue(); + assertThat(appender.isStopped()).isTrue(); + } + + @Test + void appendWritesEvents() throws IOException { + appender = buildAppender("append_test"); + appender.start(); + + final LogEvent event = Log4jLogEvent.newBuilder() + .setLoggerName("com.example.AppenderTest") + .setLevel(Level.WARN) + .setMessage(new SimpleMessage("appender test message")) + .setThreadName("test-thread") + .setTimeMillis(System.currentTimeMillis()) + .build(); + + appender.append(event); + appender.stop(5, TimeUnit.SECONDS); + + final IcebergManager reader = new IcebergManager( + "reader", + "test_catalog", + "hadoop", + null, + warehouseDir.toAbsolutePath().toString(), + "test_ns", + "append_test", + 100, + 3600); + reader.startup(); + reader.table.refresh(); + try (CloseableIterable records = + IcebergGenerics.read(reader.table).build()) { + int count = 0; + for (final Record record : records) { + assertThat(record.getField("level")).isEqualTo("WARN"); + assertThat(record.getField("message")).isEqualTo("appender test message"); + count++; + } + assertThat(count).isEqualTo(1); + } + reader.stop(5, TimeUnit.SECONDS); + } + + @Test + void newBuilderFactory() { + final IcebergAppender.Builder builder = IcebergAppender.newBuilder(); + assertThat(builder).isNotNull(); + } + + @Test + void builderSettersReturnBuilder() { + final IcebergAppender.Builder builder = IcebergAppender.newBuilder(); + assertThat(builder.setCatalogName("c")).isSameAs(builder); + assertThat(builder.setCatalogImpl("i")).isSameAs(builder); + assertThat(builder.setCatalogUri("u")).isSameAs(builder); + assertThat(builder.setCatalogWarehouse("w")).isSameAs(builder); + assertThat(builder.setTableNamespace("n")).isSameAs(builder); + assertThat(builder.setTableName("t")).isSameAs(builder); + assertThat(builder.setBatchSize(10)).isSameAs(builder); + assertThat(builder.setFlushIntervalSeconds(5)).isSameAs(builder); + } +} diff --git a/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergInspectTest.java b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergInspectTest.java new file mode 100644 index 00000000000..9a4eb1ccbcf --- /dev/null +++ b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergInspectTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.logging.log4j.iceberg; + +import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.impl.Log4jLogEvent; +import org.apache.logging.log4j.message.SimpleMessage; +import org.junit.jupiter.api.Test; + +class IcebergInspectTest { + + @Test + void createInspectableTable() { + final String warehouse = "/tmp/iceberg-inspect"; + final IcebergManager manager = new IcebergManager( + "inspect", "inspect_catalog", "hadoop", null, warehouse, "logs", "app_logs", 500, 3600); + manager.startup(); + + final Level[] levels = {Level.TRACE, Level.DEBUG, Level.INFO, Level.WARN, Level.ERROR}; + for (int i = 0; i < 1000; i++) { + final LogEvent event = Log4jLogEvent.newBuilder() + .setLoggerName("com.example.App") + .setLevel(levels[i % levels.length]) + .setMessage(new SimpleMessage("log message " + i)) + .setThreadName("main") + .setTimeMillis(1700000000000L + i * 1000L) + .build(); + manager.write(event.toImmutable()); + } + + manager.stop(10, TimeUnit.SECONDS); + System.out.println("Iceberg table written to: " + warehouse + "/logs/app_logs"); + System.out.println("Inspect with: find " + warehouse + " -type f"); + } +} diff --git a/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergManagerTest.java b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergManagerTest.java new file mode 100644 index 00000000000..8797aac3900 --- /dev/null +++ b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergManagerTest.java @@ -0,0 +1,518 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.logging.log4j.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.OutputFile; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.impl.Log4jLogEvent; +import org.apache.logging.log4j.message.SimpleMessage; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +class IcebergManagerTest { + + private Path warehouseDir; + private IcebergManager manager; + + @BeforeEach + void setUp() throws IOException { + warehouseDir = Files.createTempDirectory("iceberg-test-warehouse"); + } + + @AfterEach + void tearDown() throws IOException { + if (manager != null) { + manager.stop(5, TimeUnit.SECONDS); + } + Files.walk(warehouseDir) + .sorted(Comparator.reverseOrder()) + .map(Path::toFile) + .forEach(File::delete); + } + + private IcebergManager createManager(final String tableName, final int batchSize) { + return new IcebergManager( + "test", + "test_catalog", + "hadoop", + null, + warehouseDir.toAbsolutePath().toString(), + "test_ns", + tableName, + batchSize, + 3600); + } + + private LogEvent makeEvent(final String loggerName, final Level level, final String message) { + return Log4jLogEvent.newBuilder() + .setLoggerName(loggerName) + .setLevel(level) + .setMessage(new SimpleMessage(message)) + .setThreadName("test-thread") + .setTimeMillis(System.currentTimeMillis()) + .build(); + } + + private LogEvent makeEventWithThrown( + final String loggerName, final Level level, final String message, final Throwable thrown) { + return Log4jLogEvent.newBuilder() + .setLoggerName(loggerName) + .setLevel(level) + .setMessage(new SimpleMessage(message)) + .setThreadName("test-thread") + .setTimeMillis(System.currentTimeMillis()) + .setThrown(thrown) + .build(); + } + + private LogEvent makeEventNullMessage(final String loggerName, final Level level) { + return Log4jLogEvent.newBuilder() + .setLoggerName(loggerName) + .setLevel(level) + .setThreadName("test-thread") + .setTimeMillis(System.currentTimeMillis()) + .build(); + } + + @Test + void startupCreatesNewTable() { + manager = createManager("new_table", 100); + manager.startup(); + + assertThat(manager.table).isNotNull(); + assertThat(manager.catalog.tableExists(TableIdentifier.of(Namespace.of("test_ns"), "new_table"))) + .isTrue(); + } + + @Test + void startupLoadsExistingTable() { + manager = createManager("existing_table", 100); + manager.startup(); + + final Table firstTable = manager.table; + manager.stop(5, TimeUnit.SECONDS); + + manager = createManager("existing_table", 100); + manager.startup(); + assertThat(manager.table).isNotNull(); + assertThat(manager.table.location()).isEqualTo(firstTable.location()); + } + + @Test + void startupIdempotentWhenAlreadyRunning() { + manager = createManager("idempotent_table", 100); + manager.startup(); + final Table firstTable = manager.table; + + manager.startup(); + assertThat(manager.table).isSameAs(firstTable); + } + + @Test + void writeAndFlushSingleEvent() throws IOException { + manager = createManager("single_event", 100); + manager.startup(); + + manager.write(makeEvent("com.example.Test", Level.INFO, "hello iceberg").toImmutable()); + manager.flush(); + + manager.table.refresh(); + try (CloseableIterable records = + IcebergGenerics.read(manager.table).build()) { + int count = 0; + for (final Record record : records) { + assertThat(record.getField("level")).isEqualTo("INFO"); + assertThat(record.getField("logger_name")).isEqualTo("com.example.Test"); + assertThat(record.getField("message")).isEqualTo("hello iceberg"); + assertThat(record.getField("thread_name")).isEqualTo("test-thread"); + assertThat(record.getField("thrown")).isNull(); + assertThat(record.getField("timestamp")).isNotNull(); + assertThat(record.getField("event_date")).isNotNull(); + count++; + } + assertThat(count).isEqualTo(1); + } + } + + @Test + void writeTriggersFlushWhenBatchSizeReached() throws IOException { + manager = createManager("batch_flush", 3); + manager.startup(); + + manager.write(makeEvent("logger1", Level.DEBUG, "msg1").toImmutable()); + manager.write(makeEvent("logger2", Level.WARN, "msg2").toImmutable()); + manager.write(makeEvent("logger3", Level.ERROR, "msg3").toImmutable()); + + manager.table.refresh(); + try (CloseableIterable records = + IcebergGenerics.read(manager.table).build()) { + int count = 0; + for (final Record ignored : records) { + count++; + } + assertThat(count).isEqualTo(3); + } + } + + @Test + void flushOnEmptyBufferIsNoOp() { + manager = createManager("empty_flush", 100); + manager.startup(); + + manager.flush(); + + assertThat(manager.table.snapshots()).isEmpty(); + } + + @Test + void eventWithThrownIsRecorded() throws IOException { + manager = createManager("thrown_event", 100); + manager.startup(); + + final RuntimeException ex = new RuntimeException("test failure"); + manager.write(makeEventWithThrown("com.example.Error", Level.ERROR, "oops", ex) + .toImmutable()); + manager.flush(); + + manager.table.refresh(); + try (CloseableIterable records = + IcebergGenerics.read(manager.table).build()) { + for (final Record record : records) { + assertThat((String) record.getField("thrown")).contains("test failure"); + } + } + } + + @Test + void eventWithNullMessageIsRecorded() throws IOException { + manager = createManager("null_msg", 100); + manager.startup(); + + manager.write(makeEventNullMessage("com.example.Null", Level.TRACE).toImmutable()); + manager.flush(); + + manager.table.refresh(); + try (CloseableIterable records = + IcebergGenerics.read(manager.table).build()) { + for (final Record record : records) { + assertThat(record.getField("message")).isNull(); + assertThat(record.getField("thrown")).isNull(); + } + } + } + + @Test + void multipleFlushesProduceMultipleSnapshots() throws IOException { + manager = createManager("multi_flush", 100); + manager.startup(); + + manager.write(makeEvent("logger", Level.INFO, "batch1").toImmutable()); + manager.flush(); + + manager.write(makeEvent("logger", Level.INFO, "batch2").toImmutable()); + manager.flush(); + + manager.table.refresh(); + int snapshotCount = 0; + for (final org.apache.iceberg.Snapshot ignored : manager.table.snapshots()) { + snapshotCount++; + } + assertThat(snapshotCount).isEqualTo(2); + + try (CloseableIterable records = + IcebergGenerics.read(manager.table).build()) { + int count = 0; + for (final Record ignored : records) { + count++; + } + assertThat(count).isEqualTo(2); + } + } + + @Test + void stopWhenNotRunningReturnsTrueImmediately() { + manager = createManager("not_running", 100); + assertThat(manager.stop(1, TimeUnit.SECONDS)).isTrue(); + manager = null; + } + + @Test + void stopFlushesRemainingEvents() throws IOException { + manager = createManager("stop_flush", 100); + manager.startup(); + + manager.write(makeEvent("logger", Level.INFO, "pending").toImmutable()); + manager.stop(5, TimeUnit.SECONDS); + + final IcebergManager reader = createManager("stop_flush", 100); + reader.startup(); + reader.table.refresh(); + try (CloseableIterable records = + IcebergGenerics.read(reader.table).build()) { + int count = 0; + for (final Record record : records) { + assertThat(record.getField("message")).isEqualTo("pending"); + count++; + } + assertThat(count).isEqualTo(1); + } + reader.stop(5, TimeUnit.SECONDS); + manager = null; + } + + @Test + void getName() { + manager = createManager("name_test", 100); + assertThat(manager.getName()).isEqualTo("test"); + manager = null; + } + + @Test + void startupWithNullUriAndWarehouse() { + manager = new IcebergManager( + "test", + "test_catalog", + "hadoop", + null, + warehouseDir.toAbsolutePath().toString(), + "test_ns", + "null_uri_test", + 100, + 3600); + manager.startup(); + assertThat(manager.table).isNotNull(); + } + + @Test + void startupWithCatalogUri() { + manager = new IcebergManager( + "test", + "test_catalog", + "hadoop", + "file:///tmp/unused", + warehouseDir.toAbsolutePath().toString(), + "test_ns", + "uri_test", + 100, + 3600); + manager.startup(); + assertThat(manager.table).isNotNull(); + } + + @Test + void commitRetrySucceedsAfterTransientFailure() throws IOException { + manager = createManager("retry_ok", 100); + manager.startup(); + + manager.write(makeEvent("logger", Level.INFO, "retry event").toImmutable()); + + final Table original = manager.table; + final Table spyTable = Mockito.spy(original); + final AppendFiles failingAppend = Mockito.mock(AppendFiles.class); + Mockito.doThrow(new org.apache.iceberg.exceptions.CommitFailedException("conflict")) + .when(failingAppend) + .commit(); + final AppendFiles realAppend = original.newAppend(); + + Mockito.when(spyTable.newAppend()).thenReturn(failingAppend).thenReturn(realAppend); + manager.table = spyTable; + + manager.flush(); + + original.refresh(); + try (CloseableIterable records = IcebergGenerics.read(original).build()) { + int count = 0; + for (final Record record : records) { + assertThat(record.getField("message")).isEqualTo("retry event"); + count++; + } + assertThat(count).isEqualTo(1); + } + manager.table = original; + } + + @Test + void commitCrashesAfterExhaustingRetries() { + manager = createManager("retry_exhaust", 100); + manager.startup(); + + manager.write(makeEvent("logger", Level.INFO, "will exhaust retries").toImmutable()); + + final Table original = manager.table; + final Table mockTable = Mockito.mock(Table.class); + Mockito.when(mockTable.location()).thenReturn(original.location()); + Mockito.when(mockTable.io()).thenReturn(original.io()); + Mockito.when(mockTable.spec()).thenReturn(original.spec()); + final AppendFiles failingAppend = Mockito.mock(AppendFiles.class); + Mockito.doThrow(new org.apache.iceberg.exceptions.CommitFailedException("persistent conflict")) + .when(failingAppend) + .commit(); + Mockito.when(mockTable.newAppend()).thenReturn(failingAppend); + manager.table = mockTable; + + assertThatThrownBy(() -> manager.flush()) + .isInstanceOf(IcebergCommitException.class) + .hasMessageContaining("after " + (IcebergManager.MAX_COMMIT_RETRIES + 1) + " attempts") + .hasCauseInstanceOf(org.apache.iceberg.exceptions.CommitFailedException.class); + + manager.table = original; + } + + @Test + void stopHandlesInterruptedException() throws InterruptedException { + manager = createManager("interrupt_test", 100); + manager.startup(); + + final ScheduledExecutorService realScheduler = manager.scheduler; + realScheduler.shutdown(); + final ScheduledExecutorService mockScheduler = Mockito.mock(ScheduledExecutorService.class); + Mockito.when(mockScheduler.awaitTermination(Mockito.anyLong(), Mockito.any())) + .thenThrow(new InterruptedException("test interrupt")); + manager.scheduler = mockScheduler; + + assertThat(manager.stop(5, TimeUnit.SECONDS)).isTrue(); + assertThat(Thread.interrupted()).isTrue(); + manager = null; + } + + @Test + void stopClosesCloseableCatalog() { + manager = createManager("closeable_test", 100); + manager.startup(); + // HadoopCatalog implements Closeable, so this path is naturally covered + assertThat(manager.catalog).isInstanceOf(Closeable.class); + assertThat(manager.stop(5, TimeUnit.SECONDS)).isTrue(); + manager = null; + } + + @Test + void stopHandlesCloseableIOException() throws IOException { + manager = createManager("close_error_test", 100); + manager.startup(); + + // Replace catalog with a Closeable mock that throws on close + final CloseableCatalog mockCatalog = Mockito.mock(CloseableCatalog.class); + Mockito.doThrow(new IOException("close failed")).when(mockCatalog).close(); + manager.catalog = mockCatalog; + + assertThat(manager.stop(5, TimeUnit.SECONDS)).isTrue(); + manager = null; + } + + interface CloseableCatalog extends Catalog, Closeable {} + + @Test + void startupWithNullWarehouse() { + manager = + new IcebergManager("test", "test_catalog", "hadoop", null, null, "test_ns", "null_wh_test", 100, 3600); + try { + manager.startup(); + } catch (final Exception e) { + // Expected — hadoop catalog requires a warehouse + } + manager = null; + } + + @Test + void stopWithNullFlushTaskAndScheduler() { + manager = createManager("null_sched", 100); + manager.running = true; + manager.catalog = Mockito.mock(Catalog.class); + assertThat(manager.stop(1, TimeUnit.SECONDS)).isTrue(); + manager = null; + } + + @Test + void stopWithNonCloseableCatalog() { + manager = createManager("non_closeable", 100); + manager.running = true; + manager.catalog = Mockito.mock(Catalog.class); + assertThat(manager.stop(1, TimeUnit.SECONDS)).isTrue(); + manager = null; + } + + @Test + void writeParquetFileIOExceptionCrashes() { + manager = createManager("io_error", 100); + manager.startup(); + + manager.write(makeEvent("logger", Level.INFO, "will fail IO").toImmutable()); + final Table original = manager.table; + final Table mockTable = Mockito.mock(Table.class); + Mockito.when(mockTable.location()).thenReturn(original.location()); + Mockito.when(mockTable.spec()).thenReturn(original.spec()); + final org.apache.iceberg.io.FileIO mockIo = Mockito.mock(org.apache.iceberg.io.FileIO.class); + Mockito.when(mockTable.io()).thenReturn(mockIo); + final OutputFile mockOutput = Mockito.mock(OutputFile.class); + Mockito.when(mockIo.newOutputFile(Mockito.anyString())).thenReturn(mockOutput); + Mockito.when(mockOutput.createOrOverwrite()).thenThrow(new RuntimeException(new IOException("disk full"))); + manager.table = mockTable; + + assertThatThrownBy(() -> manager.flush()) + .isInstanceOf(IcebergCommitException.class) + .hasMessageContaining("Failed to write Parquet file"); + manager.table = original; + } + + @Test + void commitRetryInterruptedCrashes() { + manager = createManager("retry_interrupt", 100); + manager.startup(); + + manager.write(makeEvent("logger", Level.INFO, "will be interrupted").toImmutable()); + + final Table original = manager.table; + final Table spyTable = Mockito.spy(original); + final AppendFiles failingAppend = Mockito.mock(AppendFiles.class); + Mockito.doAnswer(invocation -> { + Thread.currentThread().interrupt(); + throw new org.apache.iceberg.exceptions.CommitFailedException("conflict"); + }) + .when(failingAppend) + .commit(); + Mockito.when(spyTable.newAppend()).thenReturn(failingAppend); + manager.table = spyTable; + + assertThatThrownBy(() -> manager.flush()) + .isInstanceOf(IcebergCommitException.class) + .hasMessageContaining("Interrupted"); + Thread.interrupted(); + manager.table = original; + } +} diff --git a/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergPerformanceTest.java b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergPerformanceTest.java new file mode 100644 index 00000000000..29e87d23969 --- /dev/null +++ b/log4j-iceberg/src/test/java/org/apache/logging/log4j/iceberg/IcebergPerformanceTest.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.logging.log4j.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.appender.FileAppender; +import org.apache.logging.log4j.core.impl.Log4jLogEvent; +import org.apache.logging.log4j.core.layout.PatternLayout; +import org.apache.logging.log4j.message.SimpleMessage; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class IcebergPerformanceTest { + + private static final int TOTAL_RECORDS = 1_000_000; + private static final int RECORDS_PER_COMMIT = 100_000; + + private Path tempDir; + private IcebergManager manager; + + @BeforeEach + void setUp() throws IOException { + tempDir = Files.createTempDirectory("iceberg-perf-test"); + } + + @AfterEach + void tearDown() throws IOException { + if (manager != null) { + manager.stop(30, TimeUnit.SECONDS); + } + Files.walk(tempDir).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); + } + + private static final Level[] LEVELS = {Level.TRACE, Level.DEBUG, Level.INFO, Level.WARN, Level.ERROR}; + + private LogEvent buildEvent(final int i) { + return Log4jLogEvent.newBuilder() + .setLoggerName("com.example.perf.Logger" + (i % 100)) + .setLevel(LEVELS[i % LEVELS.length]) + .setMessage(new SimpleMessage("perf-message-" + i)) + .setThreadName("perf-thread-" + (i % 10)) + .setTimeMillis(1700000000000L + i) + .build(); + } + + @Test + void writeOneMillionRecordsWithPeriodicCommits() throws IOException { + + // ============================================================ + // Phase 1: File Appender benchmark + // ============================================================ + final Path logFile = tempDir.resolve("perf-test.log"); + final PatternLayout layout = PatternLayout.newBuilder() + .withPattern("%d %-5level [%t] %logger - %msg%n") + .build(); + final FileAppender fileAppender = FileAppender.newBuilder() + .setName("filePerf") + .withFileName(logFile.toString()) + .withImmediateFlush(false) + .withBufferedIo(true) + .withBufferSize(8192) + .setLayout(layout) + .build(); + fileAppender.start(); + + final long fileWriteStart = System.nanoTime(); + for (int i = 0; i < TOTAL_RECORDS; i++) { + fileAppender.append(buildEvent(i)); + } + fileAppender.stop(30, TimeUnit.SECONDS); + final long fileWriteMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - fileWriteStart); + final double fileWriteThroughput = (double) TOTAL_RECORDS / fileWriteMs * 1000.0; + + int fileLineCount = 0; + try (BufferedReader reader = new BufferedReader(new FileReader(logFile.toFile()))) { + while (reader.readLine() != null) { + fileLineCount++; + } + } + assertThat(fileLineCount).isEqualTo(TOTAL_RECORDS); + final long fileSizeBytes = Files.size(logFile); + + // ============================================================ + // Phase 2: Iceberg Appender benchmark + // ============================================================ + final Path warehouseDir = tempDir.resolve("warehouse"); + Files.createDirectories(warehouseDir); + manager = new IcebergManager( + "perf_test", + "perf_catalog", + "hadoop", + null, + warehouseDir.toAbsolutePath().toString(), + "perf_ns", + "perf_table", + RECORDS_PER_COMMIT, + 3600); + manager.startup(); + + final long icebergWriteStart = System.nanoTime(); + for (int i = 0; i < TOTAL_RECORDS; i++) { + manager.write(buildEvent(i).toImmutable()); + } + manager.flush(); + final long icebergWriteMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - icebergWriteStart); + final double icebergWriteThroughput = (double) TOTAL_RECORDS / icebergWriteMs * 1000.0; + + manager.table.refresh(); + int snapshotCount = 0; + for (final Snapshot ignored : manager.table.snapshots()) { + snapshotCount++; + } + final int expectedCommits = (TOTAL_RECORDS + RECORDS_PER_COMMIT - 1) / RECORDS_PER_COMMIT; + assertThat(snapshotCount).isEqualTo(expectedCommits); + + // Iceberg read-back + final long icebergReadStart = System.nanoTime(); + final Map levelCounts = new HashMap<>(); + final Set seenMessages = new HashSet<>(); + int totalRead = 0; + + try (CloseableIterable records = + IcebergGenerics.read(manager.table).build()) { + for (final Record record : records) { + final String level = (String) record.getField("level"); + final String message = (String) record.getField("message"); + + assertThat(level).isNotNull(); + assertThat(message).startsWith("perf-message-"); + assertThat(record.getField("logger_name").toString()).startsWith("com.example.perf.Logger"); + assertThat(record.getField("thread_name").toString()).startsWith("perf-thread-"); + assertThat(record.getField("timestamp")).isNotNull(); + assertThat(record.getField("event_date")).isNotNull(); + + levelCounts.merge(level, 1, Integer::sum); + seenMessages.add(message); + totalRead++; + } + } + final long icebergReadMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - icebergReadStart); + final double icebergReadThroughput = (double) totalRead / icebergReadMs * 1000.0; + + assertThat(totalRead).isEqualTo(TOTAL_RECORDS); + assertThat(seenMessages).hasSize(TOTAL_RECORDS); + final int expectedPerLevel = TOTAL_RECORDS / LEVELS.length; + for (final Level level : LEVELS) { + assertThat(levelCounts.get(level.name())) + .as("Count for level %s", level.name()) + .isEqualTo(expectedPerLevel); + } + + long icebergSizeBytes = 0; + try (java.util.stream.Stream paths = Files.walk(warehouseDir)) { + icebergSizeBytes = paths.filter(Files::isRegularFile) + .mapToLong(p -> p.toFile().length()) + .sum(); + } + + // ============================================================ + // Results + // ============================================================ + System.out.println(); + System.out.println("========== Performance Comparison (1M records) =========="); + System.out.println(); + System.out.printf("%-25s %15s %15s%n", "", "File Appender", "Iceberg"); + System.out.printf("%-25s %15s %15s%n", "-------------------------", "---------------", "---------------"); + System.out.printf("%-25s %12d ms %12d ms%n", "Write time", fileWriteMs, icebergWriteMs); + System.out.printf("%-25s %12.0f/s %12.0f/s%n", "Write throughput", fileWriteThroughput, icebergWriteThroughput); + System.out.printf("%-25s %15s %12d ms%n", "Read time", "n/a", icebergReadMs); + System.out.printf("%-25s %15s %12.0f/s%n", "Read throughput", "n/a", icebergReadThroughput); + System.out.printf( + "%-25s %12.1f MB %12.1f MB%n", + "Disk size", fileSizeBytes / (1024.0 * 1024.0), icebergSizeBytes / (1024.0 * 1024.0)); + System.out.printf("%-25s %15d %15d%n", "Commits/flushes", 1, snapshotCount); + System.out.printf("%-25s %12.1fx %15s%n", "Iceberg overhead", (double) icebergWriteMs / fileWriteMs, ""); + System.out.printf("%-25s %12.1fx %15s%n", "Iceberg compression", (double) fileSizeBytes / icebergSizeBytes, ""); + System.out.println(); + System.out.println("Level distribution: " + levelCounts); + System.out.println("All " + TOTAL_RECORDS + " records verified correct in both appenders."); + System.out.println("========================================================="); + } +} diff --git a/pom.xml b/pom.xml index 87123ce4374..5911d5e6ed2 100644 --- a/pom.xml +++ b/pom.xml @@ -248,6 +248,7 @@ log4j-couchdb log4j-docker log4j-fuzz-test + log4j-iceberg log4j-iostreams log4j-jakarta-jms log4j-jakarta-smtp @@ -441,6 +442,12 @@ 2.23.1 + + org.apache.logging.log4j + log4j-iceberg + ${project.version} + + org.apache.logging.log4j log4j-iostreams