diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/rocksdb/BoundedMemoryConfigSetter.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/rocksdb/BoundedMemoryConfigSetter.java index c3bb6d2..e57b1d0 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/rocksdb/BoundedMemoryConfigSetter.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/rocksdb/BoundedMemoryConfigSetter.java @@ -8,6 +8,7 @@ import static org.hypertrace.core.kafkastreams.framework.rocksdb.RocksDBConfigs.MAX_SIZE_AMPLIFICATION_PERCENT; import static org.hypertrace.core.kafkastreams.framework.rocksdb.RocksDBConfigs.OPTIMIZE_FOR_POINT_LOOKUPS; import static org.hypertrace.core.kafkastreams.framework.rocksdb.RocksDBConfigs.PERIODIC_COMPACTION_SECONDS; +import static org.hypertrace.core.kafkastreams.framework.rocksdb.RocksDBConfigs.WAL_DISABLED; import java.util.Map; import org.apache.kafka.streams.state.RocksDBConfigSetter; @@ -52,6 +53,12 @@ public void setConfig(String storeName, Options options, Map con options.setUseDirectReads(Boolean.valueOf(String.valueOf(configs.get(DIRECT_READS_ENABLED)))); } + if (configs.containsKey(WAL_DISABLED) && Boolean.parseBoolean(String.valueOf(configs.get(WAL_DISABLED)))) { + // WriteOptions.setDisableWAL() is per-write and cannot be applied through RocksDBConfigSetter. + // setManualWalFlush suppresses implicit WAL flushes after each write; closest option-level equivalent. + options.setManualWalFlush(true); + } + if (configs.containsKey(OPTIMIZE_FOR_POINT_LOOKUPS)) { Boolean optimizeForPointLookups = Boolean.valueOf(String.valueOf(configs.get(OPTIMIZE_FOR_POINT_LOOKUPS))); diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/rocksdb/RocksDBConfigs.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/rocksdb/RocksDBConfigs.java index 5d180b0..525be9d 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/rocksdb/RocksDBConfigs.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/rocksdb/RocksDBConfigs.java @@ -36,6 +36,7 @@ public class RocksDBConfigs { public static final String DIRECT_READS_ENABLED = rocksdbPrefix("direct.reads.enabled"); public static final String OPTIMIZE_FOR_POINT_LOOKUPS = rocksdbPrefix("optimize.point.lookups"); public static final String LOG_LEVEL_CONFIG = rocksdbPrefix("log.level"); + public static final String WAL_DISABLED = rocksdbPrefix("disable.wal"); public static String rocksdbPrefix(String configKey) { return ROCKS_DB_PREFIX + configKey;