diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 39d015ec82cd0..3ad72dcf2fc18 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -45,7 +45,7 @@ */ public class AbstractConfig { - private static final Logger log = LoggerFactory.getLogger(AbstractConfig.class); + public static final Logger log = LoggerFactory.getLogger(AbstractConfig.class); /** * Configs for which values have been requested, used to detect unused configs. diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 08eff37b396f7..749486c3f3c60 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -229,7 +229,7 @@ class ControllerServer( OptionalLong.empty() } - val maxIdleIntervalNs = config.metadataMaxIdleIntervalNs.fold(OptionalLong.empty)(OptionalLong.of) + val maxIdleIntervalNs = config.metadataMaxIdleIntervalNs.map[OptionalLong](v => OptionalLong.of(v)).orElse(OptionalLong.empty) quorumControllerMetrics = new QuorumControllerMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry), time, config.brokerSessionTimeoutMs) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index c5075d5bd9949..e776d1a62d9c0 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -17,36 +17,32 @@ package kafka.server -import java.util -import java.util.concurrent.TimeUnit -import java.util.Properties -import kafka.utils.Logging import kafka.utils.Implicits._ -import org.apache.kafka.common.{Endpoint, Reconfigurable} -import org.apache.kafka.common.config.{ConfigDef, ConfigException, TopicConfig} +import kafka.utils.Logging import org.apache.kafka.common.config.ConfigDef.ConfigKey import org.apache.kafka.common.config.internals.BrokerSecurityConfigs +import org.apache.kafka.common.config.{ConfigDef, ConfigException, TopicConfig} import org.apache.kafka.common.internals.Plugin import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.security.auth.KafkaPrincipalSerde -import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.auth.{KafkaPrincipalSerde, SecurityProtocol} import org.apache.kafka.common.utils.Utils -import org.apache.kafka.coordinator.group.Group.GroupType -import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig +import org.apache.kafka.common.{Endpoint, Reconfigurable} import org.apache.kafka.coordinator.group.GroupCoordinatorConfig +import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig import org.apache.kafka.coordinator.share.ShareCoordinatorConfig import org.apache.kafka.network.SocketServerConfigs -import org.apache.kafka.raft.{KRaftConfigs, MetadataLogConfig, QuorumConfig} +import org.apache.kafka.raft.{KRaftConfigs, QuorumConfig} import org.apache.kafka.security.authorizer.AuthorizerUtils import org.apache.kafka.server.ProcessRole import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.config.{AbstractKafkaConfig, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, DynamicBrokerConfig => JDynamicBrokerConfig} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig -import org.apache.kafka.server.metrics.MetricConfigs -import scala.jdk.CollectionConverters._ +import java.util +import java.util.Properties import scala.collection.{Map, Seq} +import scala.jdk.CollectionConverters._ import scala.jdk.OptionConverters.{RichOption, RichOptional} object KafkaConfig { @@ -110,7 +106,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) // Cache the current config to avoid acquiring read lock to access from dynamicConfig @volatile private var currentConfig = this - val processRoles: Set[ProcessRole] = parseProcessRoles() + val processRoles: Set[ProcessRole] = parseProcessRoles().asScala.toSet private[server] val dynamicConfig = new DynamicBrokerConfig(this) private[server] def updateCurrentConfig(newConfig: KafkaConfig): Unit = { @@ -162,47 +158,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) private val _quotaConfig = new QuotaConfig(this) def quotaConfig: QuotaConfig = _quotaConfig - /** ********* General Configuration ***********/ - val brokerSessionTimeoutMs: Int = getInt(KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG) - val controllerPerformanceSamplePeriodMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS) - val controllerPerformanceAlwaysLogThresholdMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS) - - private def parseProcessRoles(): Set[ProcessRole] = { - val roles = getList(KRaftConfigs.PROCESS_ROLES_CONFIG).asScala.map { - case "broker" => ProcessRole.BrokerRole - case "controller" => ProcessRole.ControllerRole - case role => throw new ConfigException(s"Unknown process role '$role'" + - " (only 'broker' and 'controller' are allowed roles)") - } - roles.toSet - } - def isKRaftCombinedMode: Boolean = { - processRoles == Set(ProcessRole.BrokerRole, ProcessRole.ControllerRole) - } - - def metadataLogDir: String = { - Option(getString(MetadataLogConfig.METADATA_LOG_DIR_CONFIG)) match { - case Some(dir) => dir - case None => logDirs.get(0) - } - } - - val serverMaxStartupTimeMs = getLong(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG) - - def messageMaxBytes = getInt(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG) - - def getNumReplicaAlterLogDirsThreads: Int = { - val numThreads: Integer = Option(getInt(ServerConfigs.NUM_REPLICA_ALTER_LOG_DIRS_THREADS_CONFIG)).getOrElse(logDirs.size) - numThreads - } - - /************* Metadata Configuration ***********/ - val metadataSnapshotMaxNewRecordBytes = getLong(MetadataLogConfig.METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG) - val metadataSnapshotMaxIntervalMs = getLong(MetadataLogConfig.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG) - val metadataMaxIdleIntervalNs: Option[Long] = { - val value = TimeUnit.NANOSECONDS.convert(getInt(MetadataLogConfig.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG).toLong, TimeUnit.MILLISECONDS) - if (value > 0) Some(value) else None + super.isKRaftCombinedMode(processRoles.asJava) } /************* Authorizer Configuration ***********/ @@ -232,103 +189,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) } } - /***************** rack configuration **************/ - val replicaSelectorClassName = Option(getString(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG)) - - /** ********* Replication configuration ***********/ - val controllerSocketTimeoutMs: Int = getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG) - val defaultReplicationFactor: Int = getInt(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG) - val replicaLagTimeMaxMs = getLong(ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_CONFIG) - val replicaSocketTimeoutMs = getInt(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG) - val replicaSocketReceiveBufferBytes = getInt(ReplicationConfigs.REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_CONFIG) - val replicaFetchMaxBytes = getInt(ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG) - val replicaFetchWaitMaxMs = getInt(ReplicationConfigs.REPLICA_FETCH_WAIT_MAX_MS_CONFIG) - val replicaFetchMinBytes = getInt(ReplicationConfigs.REPLICA_FETCH_MIN_BYTES_CONFIG) - val replicaFetchResponseMaxBytes = getInt(ReplicationConfigs.REPLICA_FETCH_RESPONSE_MAX_BYTES_CONFIG) - val replicaFetchBackoffMs = getInt(ReplicationConfigs.REPLICA_FETCH_BACKOFF_MS_CONFIG) - val replicaHighWatermarkCheckpointIntervalMs = getLong(ReplicationConfigs.REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL_MS_CONFIG) - val fetchPurgatoryPurgeIntervalRequests = getInt(ReplicationConfigs.FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG) - val producerPurgatoryPurgeIntervalRequests = getInt(ReplicationConfigs.PRODUCER_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG) - val deleteRecordsPurgatoryPurgeIntervalRequests = getInt(ReplicationConfigs.DELETE_RECORDS_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG) - val autoLeaderRebalanceEnable = getBoolean(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG) - val leaderImbalanceCheckIntervalSeconds: Long = getLong(ReplicationConfigs.LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS_CONFIG) - val uncleanLeaderElectionCheckIntervalMs: Long = getLong(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_INTERVAL_MS_CONFIG) - def uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG) - def followerFetchLastTieredOffsetEnable: java.lang.Boolean = getBoolean(ReplicationConfigs.FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_CONFIG); - - /** ********* Controlled shutdown configuration ***********/ - val controlledShutdownEnable = getBoolean(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG) - - /** Group coordinator configs */ - val groupCoordinatorRebalanceProtocols = { - val protocols = getList(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG) - .asScala.map(_.toUpperCase).map(GroupType.valueOf).toSet - if (!protocols.contains(GroupType.CLASSIC)) { - throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}' protocol is not supported.") - } - if (doLog && protocols.contains(GroupType.SHARE)) { - warn(s"'${GroupType.SHARE}' in `${GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG}` is deprecated. " + - s"Share groups are controlled by the 'share.version' feature. " + - s"This config will be removed in Kafka 5.0.") - } - if (doLog && originals().containsKey(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG)) { - val defaultProtocols = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT - .asScala.map(_.toUpperCase).map(GroupType.valueOf).toSet - val missingProtocols = defaultProtocols -- protocols - if (missingProtocols.nonEmpty) { - warn(s"The config `${GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG}` is deprecated and will be removed in Kafka 5.0. " + - s"The following protocol(s) are currently disabled: ${missingProtocols.mkString(", ")}. " + - s"In Kafka 5.0, all protocols will always be enabled and controlled solely by feature versions " + - s"(group.version, streams.version, share.version) via kafka-features.sh. " + - s"Please remove the configuration, which will restore all protocols to the default enabled state, to prepare for the upgrade.") - } else { - warn(s"The config `${GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG}` is deprecated and will be removed in Kafka 5.0. " + - s"Please remove the configuration to prepare for the upgrade.") - } - } - protocols - } - - /** ********* Metric Configuration **************/ - val metricNumSamples = getInt(MetricConfigs.METRIC_NUM_SAMPLES_CONFIG) - val metricSampleWindowMs = getLong(MetricConfigs.METRIC_SAMPLE_WINDOW_MS_CONFIG) - val metricRecordingLevel = getString(MetricConfigs.METRIC_RECORDING_LEVEL_CONFIG) - - /** ********* Kafka Client Telemetry Metrics Configuration ***********/ - val clientTelemetryMaxBytes: Int = getInt(MetricConfigs.CLIENT_TELEMETRY_MAX_BYTES_CONFIG) - - /** ********* SSL/SASL Configuration **************/ - // Security configs may be overridden for listeners, so it is not safe to use the base values - // Hence the base SSL/SASL configs are not fields of KafkaConfig, listener configs should be - // retrieved using KafkaConfig#valuesWithPrefixOverride - private def saslEnabledMechanisms(listenerName: ListenerName): Set[String] = { - val value = valuesWithPrefixOverride(listenerName.configPrefix).get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG) - if (value != null) - value.asInstanceOf[util.List[String]].asScala.toSet - else - Set.empty[String] - } - - def saslMechanismInterBrokerProtocol = getString(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG) - - /** ********* Fetch Configuration **************/ - val maxIncrementalFetchSessionCacheSlots = getInt(ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG) - val fetchMaxBytes = getInt(ServerConfigs.FETCH_MAX_BYTES_CONFIG) - - /** ********* Request Limit Configuration ***********/ - val maxRequestPartitionSizeLimit = getInt(ServerConfigs.MAX_REQUEST_PARTITION_SIZE_LIMIT_CONFIG) - - val deleteTopicEnable = getBoolean(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG) - def compressionType = getString(ServerConfigs.COMPRESSION_TYPE_CONFIG) - - def gzipCompressionLevel = getInt(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG) - def lz4CompressionLevel = getInt(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG) - def zstdCompressionLevel = getInt(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG) - - /** Internal Configurations **/ - val unstableApiVersionsEnabled = getBoolean(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG) - val unstableFeatureVersionsEnabled = getBoolean(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG) - override def addReconfigurable(reconfigurable: Reconfigurable): Unit = { dynamicConfig.addReconfigurable(reconfigurable) } @@ -515,7 +375,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) validateControllerQuorumVotersMustContainNodeIdForKRaftController() validateAdvertisedControllerListenersNonEmptyForKRaftController() validateControllerListenerNamesMustAppearInListenersForKRaftController() - } else if (isKRaftCombinedMode) { + } else if (isKRaftCombinedMode(processRoles.asJava)) { // KRaft combined broker and controller validateQuorumVotersAndQuorumBootstrapServerForKRaft() validateControllerQuorumVotersMustContainNodeIdForKRaftController() diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 50d6f0ff4a3af..dbf554285838e 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2247,7 +2247,7 @@ class ReplicaManager(val config: KafkaConfig, } private def createReplicaSelector(metrics: Metrics): Option[Plugin[ReplicaSelector]] = { - config.replicaSelectorClassName.map { className => + config.replicaSelectorClassName.asScala.map { className => val tmpReplicaSelector: ReplicaSelector = Utils.newInstance(className, classOf[ReplicaSelector]) tmpReplicaSelector.configure(config.originals()) Plugin.wrapInstance(tmpReplicaSelector, metrics, ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG) diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java index d129fdcd65b08..8d19c28c23ff8 100644 --- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.group.Group.GroupType; import org.apache.kafka.coordinator.group.GroupConfig; import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig; @@ -40,6 +41,7 @@ import org.apache.kafka.raft.KRaftConfigs; import org.apache.kafka.raft.MetadataLogConfig; import org.apache.kafka.raft.QuorumConfig; +import org.apache.kafka.server.ProcessRole; import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; import org.apache.kafka.server.metrics.MetricConfigs; import org.apache.kafka.server.util.Csv; @@ -48,7 +50,9 @@ import org.apache.commons.validator.routines.InetAddressValidator; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -89,8 +93,11 @@ public abstract class AbstractKafkaConfig extends AbstractConfig { AddPartitionsToTxnConfig.CONFIG_DEF )); + private final boolean doLog; + public AbstractKafkaConfig(ConfigDef definition, Map originals, Map configProviderProps, boolean doLog) { super(definition, originals, configProviderProps, doLog); + this.doLog = doLog; } public List logDirs() { @@ -635,4 +642,295 @@ public Map extractGroupConfigMap() { ); return defaults; } + + // ********* General Configuration ********** + + public int brokerSessionTimeoutMs() { + return getInt(KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG); + } + + public long controllerPerformanceSamplePeriodMs() { + return getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS); + } + + public long controllerPerformanceAlwaysLogThresholdMs() { + return getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS); + } + + public Set parseProcessRoles() { + List roles = getList(KRaftConfigs.PROCESS_ROLES_CONFIG); + Set result = new HashSet<>(); + for (String role : roles) { + switch (role) { + case "broker" -> result.add(ProcessRole.BrokerRole); + case "controller" -> result.add(ProcessRole.ControllerRole); + default -> throw new ConfigException("Unknown process role '" + role + + "' (only 'broker' and 'controller' are allowed roles)"); + } + } + return Collections.unmodifiableSet(result); + } + + public boolean isKRaftCombinedMode(Set processRoles) { + return processRoles.equals(Set.of(ProcessRole.BrokerRole, ProcessRole.ControllerRole)); + } + + public String metadataLogDir() { + String dir = getString(MetadataLogConfig.METADATA_LOG_DIR_CONFIG); + return dir != null ? dir : logDirs().get(0); + } + + public long serverMaxStartupTimeMs() { + return getLong(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG); + } + + public Integer messageMaxBytes() { + return getInt(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG); + } + + public int getNumReplicaAlterLogDirsThreads() { + Integer numThreads = getInt(ServerConfigs.NUM_REPLICA_ALTER_LOG_DIRS_THREADS_CONFIG); + return numThreads != null ? numThreads : logDirs().size(); + } + + // ********* Metadata Configuration ********** + + public long metadataSnapshotMaxNewRecordBytes() { + return getLong(MetadataLogConfig.METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG); + } + + public long metadataSnapshotMaxIntervalMs() { + return getLong(MetadataLogConfig.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG); + } + + public Optional metadataMaxIdleIntervalNs() { + long value = TimeUnit.NANOSECONDS.convert( + getInt(MetadataLogConfig.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG).longValue(), + TimeUnit.MILLISECONDS); + return value > 0 ? Optional.of(value) : Optional.empty(); + } + + // ********* Rack Configuration ********** + + public Optional replicaSelectorClassName() { + return Optional.ofNullable(getString(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG)); + } + + // ********* Replication Configuration ********** + + public int controllerSocketTimeoutMs() { + return getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG); + } + + public int defaultReplicationFactor() { + return getInt(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG); + } + + public long replicaLagTimeMaxMs() { + return getLong(ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_CONFIG); + } + + public int replicaSocketTimeoutMs() { + return getInt(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG); + } + + public int replicaSocketReceiveBufferBytes() { + return getInt(ReplicationConfigs.REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_CONFIG); + } + + public int replicaFetchMaxBytes() { + return getInt(ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG); + } + + public int replicaFetchWaitMaxMs() { + return getInt(ReplicationConfigs.REPLICA_FETCH_WAIT_MAX_MS_CONFIG); + } + + public int replicaFetchMinBytes() { + return getInt(ReplicationConfigs.REPLICA_FETCH_MIN_BYTES_CONFIG); + } + + public int replicaFetchResponseMaxBytes() { + return getInt(ReplicationConfigs.REPLICA_FETCH_RESPONSE_MAX_BYTES_CONFIG); + } + + public int replicaFetchBackoffMs() { + return getInt(ReplicationConfigs.REPLICA_FETCH_BACKOFF_MS_CONFIG); + } + + public long replicaHighWatermarkCheckpointIntervalMs() { + return getLong(ReplicationConfigs.REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL_MS_CONFIG); + } + + public int fetchPurgatoryPurgeIntervalRequests() { + return getInt(ReplicationConfigs.FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG); + } + + public int producerPurgatoryPurgeIntervalRequests() { + return getInt(ReplicationConfigs.PRODUCER_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG); + } + + public int deleteRecordsPurgatoryPurgeIntervalRequests() { + return getInt(ReplicationConfigs.DELETE_RECORDS_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG); + } + + public boolean autoLeaderRebalanceEnable() { + return getBoolean(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG); + } + + public long leaderImbalanceCheckIntervalSeconds() { + return getLong(ReplicationConfigs.LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS_CONFIG); + } + + public long uncleanLeaderElectionCheckIntervalMs() { + return getLong(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_INTERVAL_MS_CONFIG); + } + + public Boolean uncleanLeaderElectionEnable() { + return getBoolean(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG); + } + + public Boolean followerFetchLastTieredOffsetEnable() { + return getBoolean(ReplicationConfigs.FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_CONFIG); + } + + // ********* Controlled Shutdown Configuration ********** + + public boolean controlledShutdownEnable() { + return getBoolean(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG); + } + + // ********* Group Coordinator Configuration ********** + + @SuppressWarnings("removal") + public Set groupCoordinatorRebalanceProtocols() { + Set protocols = getList(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG) + .stream() + .map(s -> GroupType.valueOf(s.toUpperCase(Locale.ROOT))) + .collect(Collectors.toUnmodifiableSet()); + + if (!protocols.contains(GroupType.CLASSIC)) { + throw new ConfigException("Disabling the '" + GroupType.CLASSIC + "' protocol is not supported."); + } + + if (doLog && protocols.contains(GroupType.SHARE)) { + log.warn("'{}' in `{}` is deprecated. " + + "Share groups are controlled by the 'share.version' feature. " + + "This config will be removed in Kafka 5.0.", + GroupType.SHARE, + GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG); + } + + if (doLog && originals().containsKey(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG)) { + Set defaultProtocols = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT + .stream() + .map(s -> GroupType.valueOf(s.toUpperCase(Locale.ROOT))) + .collect(Collectors.toUnmodifiableSet()); + + Set missingProtocols = new HashSet<>(defaultProtocols); + missingProtocols.removeAll(protocols); + + if (!missingProtocols.isEmpty()) { + log.warn("The config `{}` is deprecated and will be removed in Kafka 5.0. " + + "The following protocol(s) are currently disabled: {}. " + + "In Kafka 5.0, all protocols will always be enabled and controlled solely by feature versions " + + "(group.version, streams.version, share.version) via kafka-features.sh. " + + "Please remove the configuration, which will restore all protocols to the default enabled state, to prepare for the upgrade.", + GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, + missingProtocols.stream().map(GroupType::toString).collect(Collectors.joining(", "))); + } else { + log.warn("The config `{}` is deprecated and will be removed in Kafka 5.0. " + + "Please remove the configuration to prepare for the upgrade.", + GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG); + } + } + + return protocols; + } + + // ********* Metric Configuration ********** + + public int metricNumSamples() { + return getInt(MetricConfigs.METRIC_NUM_SAMPLES_CONFIG); + } + + public long metricSampleWindowMs() { + return getLong(MetricConfigs.METRIC_SAMPLE_WINDOW_MS_CONFIG); + } + + public String metricRecordingLevel() { + return getString(MetricConfigs.METRIC_RECORDING_LEVEL_CONFIG); + } + + // ********* Kafka Client Telemetry Metrics Configuration ********** + + public int clientTelemetryMaxBytes() { + return getInt(MetricConfigs.CLIENT_TELEMETRY_MAX_BYTES_CONFIG); + } + + // ********* SSL/SASL Configuration ********** + // Security configs may be overridden for listeners, so it is not safe to use the base values. + // Hence the base SSL/SASL configs are not fields of KafkaConfig, listener configs should be + // retrieved using KafkaConfig#valuesWithPrefixOverride + + @SuppressWarnings("unchecked") + public Set saslEnabledMechanisms(ListenerName listenerName) { + Object value = valuesWithPrefixOverride(listenerName.configPrefix()) + .get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG); + if (value != null) { + return Set.copyOf((List) value); + } + return Set.of(); + } + + public String saslMechanismInterBrokerProtocol() { + return getString(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG); + } + + // ********* Fetch Configuration ********** + + public int maxIncrementalFetchSessionCacheSlots() { + return getInt(ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG); + } + + public int fetchMaxBytes() { + return getInt(ServerConfigs.FETCH_MAX_BYTES_CONFIG); + } + + // ********* Request Limit Configuration ********** + + public int maxRequestPartitionSizeLimit() { + return getInt(ServerConfigs.MAX_REQUEST_PARTITION_SIZE_LIMIT_CONFIG); + } + + public boolean deleteTopicEnable() { + return getBoolean(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG); + } + + public String compressionType() { + return getString(ServerConfigs.COMPRESSION_TYPE_CONFIG); + } + + public Integer gzipCompressionLevel() { + return getInt(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG); + } + + public Integer lz4CompressionLevel() { + return getInt(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG); + } + + public Integer zstdCompressionLevel() { + return getInt(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG); + } + + // ********* Internal Configurations ********** + + public boolean unstableApiVersionsEnabled() { + return getBoolean(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG); + } + + public boolean unstableFeatureVersionsEnabled() { + return getBoolean(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG); + } + }