clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -45,7 +45,7 @@
*/
public class AbstractConfig {

-    private static final Logger log = LoggerFactory.getLogger(AbstractConfig.class);
+    public static final Logger log = LoggerFactory.getLogger(AbstractConfig.class);
Contributor:
Consider protected instead of public, or a dedicated logger in AbstractKafkaConfig, to avoid expanding AbstractConfig's public surface.

Contributor (Author):
Fair suggestion. We only need this until we get rid of the configurations that trigger the warnings, but I will swap it to protected.
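Note: a minimal Scala sketch of the reviewer's alternative, a dedicated logger on the Kafka-side subclass, so AbstractConfig's own logger can stay private. The class and method names below are illustrative, not part of the actual patch.

    import org.slf4j.{Logger, LoggerFactory}

    // Hypothetical stand-in for AbstractKafkaConfig: it owns its own logger,
    // so deprecation warnings can be emitted without widening the visibility
    // of AbstractConfig.log.
    abstract class ExampleAbstractKafkaConfig {
      protected val log: Logger = LoggerFactory.getLogger(classOf[ExampleAbstractKafkaConfig])

      // Called while validating configs that are deprecated but still accepted.
      protected def warnDeprecated(name: String): Unit =
        log.warn("Configuration {} is deprecated and will be removed in a future release.", name)
    }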


/**
* Configs for which values have been requested, used to detect unused configs.
core/src/main/scala/kafka/server/ControllerServer.scala (2 changes: 1 addition & 1 deletion)
@@ -229,7 +229,7 @@ class ControllerServer(
OptionalLong.empty()
}

-    val maxIdleIntervalNs = config.metadataMaxIdleIntervalNs.fold(OptionalLong.empty)(OptionalLong.of)
+    val maxIdleIntervalNs = config.metadataMaxIdleIntervalNs.map[OptionalLong](v => OptionalLong.of(v)).orElse(OptionalLong.empty)

quorumControllerMetrics = new QuorumControllerMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry), time, config.brokerSessionTimeoutMs)

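Note: the call-site change above tracks a type change rather than a behavior change. metadataMaxIdleIntervalNs used to be a Scala Option[Long], which supports fold; after the move to the Java side it is a java.util.Optional, which does not. A self-contained sketch of the two equivalent conversions, with illustrative values:

    import java.util.{Optional, OptionalLong}

    val scalaStyle: Option[Long] = Some(5000000L)
    val javaStyle: Optional[java.lang.Long] = Optional.of(java.lang.Long.valueOf(5000000L))

    // Old shape: Scala Option folded straight into an OptionalLong.
    val fromScala: OptionalLong = scalaStyle.fold(OptionalLong.empty)(OptionalLong.of)

    // New shape: Java Optional mapped to OptionalLong, then unwrapped.
    val fromJava: OptionalLong = javaStyle.map[OptionalLong](v => OptionalLong.of(v)).orElse(OptionalLong.empty)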
core/src/main/scala/kafka/server/KafkaConfig.scala (164 changes: 12 additions & 152 deletions)
@@ -17,36 +17,32 @@

package kafka.server

-import java.util
-import java.util.concurrent.TimeUnit
-import java.util.Properties
-import kafka.utils.Logging
import kafka.utils.Implicits._
-import org.apache.kafka.common.{Endpoint, Reconfigurable}
-import org.apache.kafka.common.config.{ConfigDef, ConfigException, TopicConfig}
+import kafka.utils.Logging
import org.apache.kafka.common.config.ConfigDef.ConfigKey
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
+import org.apache.kafka.common.config.{ConfigDef, ConfigException, TopicConfig}
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.security.auth.KafkaPrincipalSerde
-import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.auth.{KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.coordinator.group.Group.GroupType
-import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
+import org.apache.kafka.common.{Endpoint, Reconfigurable}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
import org.apache.kafka.network.SocketServerConfigs
-import org.apache.kafka.raft.{KRaftConfigs, MetadataLogConfig, QuorumConfig}
+import org.apache.kafka.raft.{KRaftConfigs, QuorumConfig}
import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.config.{AbstractKafkaConfig, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, DynamicBrokerConfig => JDynamicBrokerConfig}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.MetricConfigs

-import scala.jdk.CollectionConverters._
+import java.util
+import java.util.Properties
import scala.collection.{Map, Seq}
+import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters.{RichOption, RichOptional}

object KafkaConfig {
@@ -110,7 +106,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])

// Cache the current config to avoid acquiring read lock to access from dynamicConfig
@volatile private var currentConfig = this
-  val processRoles: Set[ProcessRole] = parseProcessRoles()
+  val processRoles: Set[ProcessRole] = parseProcessRoles().asScala.toSet
private[server] val dynamicConfig = new DynamicBrokerConfig(this)

private[server] def updateCurrentConfig(newConfig: KafkaConfig): Unit = {
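Note: parseProcessRoles() now lives in the Java superclass and returns a java.util.Set, so the Scala side converts it once at construction time. A small sketch of that conversion (String stands in for ProcessRole):

    import java.util
    import scala.jdk.CollectionConverters._

    val javaRoles: util.Set[String] = util.Set.of("broker", "controller")
    // asScala wraps the Java set in place; toSet copies it into an immutable Scala Set.
    val scalaRoles: Set[String] = javaRoles.asScala.toSet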
@@ -162,47 +158,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
private val _quotaConfig = new QuotaConfig(this)
def quotaConfig: QuotaConfig = _quotaConfig

-  /** ********* General Configuration ***********/
-  val brokerSessionTimeoutMs: Int = getInt(KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG)
-  val controllerPerformanceSamplePeriodMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS)
-  val controllerPerformanceAlwaysLogThresholdMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS)
-
-  private def parseProcessRoles(): Set[ProcessRole] = {
-    val roles = getList(KRaftConfigs.PROCESS_ROLES_CONFIG).asScala.map {
-      case "broker" => ProcessRole.BrokerRole
-      case "controller" => ProcessRole.ControllerRole
-      case role => throw new ConfigException(s"Unknown process role '$role'" +
-        " (only 'broker' and 'controller' are allowed roles)")
-    }
-    roles.toSet
-  }
-
def isKRaftCombinedMode: Boolean = {
-    processRoles == Set(ProcessRole.BrokerRole, ProcessRole.ControllerRole)
-  }
-
-  def metadataLogDir: String = {
-    Option(getString(MetadataLogConfig.METADATA_LOG_DIR_CONFIG)) match {
-      case Some(dir) => dir
-      case None => logDirs.get(0)
-    }
-  }
-
-  val serverMaxStartupTimeMs = getLong(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG)
-
-  def messageMaxBytes = getInt(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG)
-
-  def getNumReplicaAlterLogDirsThreads: Int = {
-    val numThreads: Integer = Option(getInt(ServerConfigs.NUM_REPLICA_ALTER_LOG_DIRS_THREADS_CONFIG)).getOrElse(logDirs.size)
-    numThreads
-  }
-
-  /************* Metadata Configuration ***********/
-  val metadataSnapshotMaxNewRecordBytes = getLong(MetadataLogConfig.METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG)
-  val metadataSnapshotMaxIntervalMs = getLong(MetadataLogConfig.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG)
-  val metadataMaxIdleIntervalNs: Option[Long] = {
-    val value = TimeUnit.NANOSECONDS.convert(getInt(MetadataLogConfig.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG).toLong, TimeUnit.MILLISECONDS)
-    if (value > 0) Some(value) else None
+    super.isKRaftCombinedMode(processRoles.asJava)
}

/************* Authorizer Configuration ***********/
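Note: the Scala method above now just delegates upward. A hypothetical sketch of the check it forwards to; the real logic lives in AbstractKafkaConfig, which is not shown in this diff, and String stands in for ProcessRole:

    import java.util
    import scala.jdk.CollectionConverters._

    // Assumed shape only: combined mode means the node carries both roles.
    def isKRaftCombinedMode(processRoles: util.Set[String]): Boolean =
      processRoles.equals(util.Set.of("broker", "controller"))

    // Usage mirroring the new Scala call site, which passes processRoles.asJava.
    val combined = isKRaftCombinedMode(Set("broker", "controller").asJava)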
@@ -232,103 +189,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
}
}

-  /***************** rack configuration **************/
-  val replicaSelectorClassName = Option(getString(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG))
-
-  /** ********* Replication configuration ***********/
-  val controllerSocketTimeoutMs: Int = getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG)
-  val defaultReplicationFactor: Int = getInt(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG)
-  val replicaLagTimeMaxMs = getLong(ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_CONFIG)
-  val replicaSocketTimeoutMs = getInt(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG)
-  val replicaSocketReceiveBufferBytes = getInt(ReplicationConfigs.REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_CONFIG)
-  val replicaFetchMaxBytes = getInt(ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG)
-  val replicaFetchWaitMaxMs = getInt(ReplicationConfigs.REPLICA_FETCH_WAIT_MAX_MS_CONFIG)
-  val replicaFetchMinBytes = getInt(ReplicationConfigs.REPLICA_FETCH_MIN_BYTES_CONFIG)
-  val replicaFetchResponseMaxBytes = getInt(ReplicationConfigs.REPLICA_FETCH_RESPONSE_MAX_BYTES_CONFIG)
-  val replicaFetchBackoffMs = getInt(ReplicationConfigs.REPLICA_FETCH_BACKOFF_MS_CONFIG)
-  val replicaHighWatermarkCheckpointIntervalMs = getLong(ReplicationConfigs.REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL_MS_CONFIG)
-  val fetchPurgatoryPurgeIntervalRequests = getInt(ReplicationConfigs.FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG)
-  val producerPurgatoryPurgeIntervalRequests = getInt(ReplicationConfigs.PRODUCER_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG)
-  val deleteRecordsPurgatoryPurgeIntervalRequests = getInt(ReplicationConfigs.DELETE_RECORDS_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG)
-  val autoLeaderRebalanceEnable = getBoolean(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG)
-  val leaderImbalanceCheckIntervalSeconds: Long = getLong(ReplicationConfigs.LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS_CONFIG)
-  val uncleanLeaderElectionCheckIntervalMs: Long = getLong(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_INTERVAL_MS_CONFIG)
-  def uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG)
-  def followerFetchLastTieredOffsetEnable: java.lang.Boolean = getBoolean(ReplicationConfigs.FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_CONFIG);
-
-  /** ********* Controlled shutdown configuration ***********/
-  val controlledShutdownEnable = getBoolean(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG)
-
-  /** Group coordinator configs */
-  val groupCoordinatorRebalanceProtocols = {
-    val protocols = getList(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG)
-      .asScala.map(_.toUpperCase).map(GroupType.valueOf).toSet
-    if (!protocols.contains(GroupType.CLASSIC)) {
-      throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}' protocol is not supported.")
-    }
-    if (doLog && protocols.contains(GroupType.SHARE)) {
-      warn(s"'${GroupType.SHARE}' in `${GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG}` is deprecated. " +
-        s"Share groups are controlled by the 'share.version' feature. " +
-        s"This config will be removed in Kafka 5.0.")
-    }
-    if (doLog && originals().containsKey(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG)) {
-      val defaultProtocols = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT
-        .asScala.map(_.toUpperCase).map(GroupType.valueOf).toSet
-      val missingProtocols = defaultProtocols -- protocols
-      if (missingProtocols.nonEmpty) {
-        warn(s"The config `${GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG}` is deprecated and will be removed in Kafka 5.0. " +
-          s"The following protocol(s) are currently disabled: ${missingProtocols.mkString(", ")}. " +
-          s"In Kafka 5.0, all protocols will always be enabled and controlled solely by feature versions " +
-          s"(group.version, streams.version, share.version) via kafka-features.sh. " +
-          s"Please remove the configuration, which will restore all protocols to the default enabled state, to prepare for the upgrade.")
-      } else {
-        warn(s"The config `${GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG}` is deprecated and will be removed in Kafka 5.0. " +
-          s"Please remove the configuration to prepare for the upgrade.")
-      }
-    }
-    protocols
-  }
-
-  /** ********* Metric Configuration **************/
-  val metricNumSamples = getInt(MetricConfigs.METRIC_NUM_SAMPLES_CONFIG)
-  val metricSampleWindowMs = getLong(MetricConfigs.METRIC_SAMPLE_WINDOW_MS_CONFIG)
-  val metricRecordingLevel = getString(MetricConfigs.METRIC_RECORDING_LEVEL_CONFIG)
-
-  /** ********* Kafka Client Telemetry Metrics Configuration ***********/
-  val clientTelemetryMaxBytes: Int = getInt(MetricConfigs.CLIENT_TELEMETRY_MAX_BYTES_CONFIG)
-
-  /** ********* SSL/SASL Configuration **************/
-  // Security configs may be overridden for listeners, so it is not safe to use the base values
-  // Hence the base SSL/SASL configs are not fields of KafkaConfig, listener configs should be
-  // retrieved using KafkaConfig#valuesWithPrefixOverride
-  private def saslEnabledMechanisms(listenerName: ListenerName): Set[String] = {
-    val value = valuesWithPrefixOverride(listenerName.configPrefix).get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG)
-    if (value != null)
-      value.asInstanceOf[util.List[String]].asScala.toSet
-    else
-      Set.empty[String]
-  }
-
-  def saslMechanismInterBrokerProtocol = getString(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG)
-
-  /** ********* Fetch Configuration **************/
-  val maxIncrementalFetchSessionCacheSlots = getInt(ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG)
-  val fetchMaxBytes = getInt(ServerConfigs.FETCH_MAX_BYTES_CONFIG)
-
-  /** ********* Request Limit Configuration ***********/
-  val maxRequestPartitionSizeLimit = getInt(ServerConfigs.MAX_REQUEST_PARTITION_SIZE_LIMIT_CONFIG)
-
-  val deleteTopicEnable = getBoolean(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG)
-  def compressionType = getString(ServerConfigs.COMPRESSION_TYPE_CONFIG)
-
-  def gzipCompressionLevel = getInt(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG)
-  def lz4CompressionLevel = getInt(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG)
-  def zstdCompressionLevel = getInt(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG)
-
-  /** Internal Configurations **/
-  val unstableApiVersionsEnabled = getBoolean(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG)
-  val unstableFeatureVersionsEnabled = getBoolean(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG)
-
override def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
dynamicConfig.addReconfigurable(reconfigurable)
}
@@ -515,7 +375,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
validateControllerQuorumVotersMustContainNodeIdForKRaftController()
validateAdvertisedControllerListenersNonEmptyForKRaftController()
validateControllerListenerNamesMustAppearInListenersForKRaftController()
-    } else if (isKRaftCombinedMode) {
+    } else if (isKRaftCombinedMode(processRoles.asJava)) {
// KRaft combined broker and controller
validateQuorumVotersAndQuorumBootstrapServerForKRaft()
validateControllerQuorumVotersMustContainNodeIdForKRaftController()
core/src/main/scala/kafka/server/ReplicaManager.scala (2 changes: 1 addition & 1 deletion)
@@ -2247,7 +2247,7 @@ class ReplicaManager(val config: KafkaConfig,
}

private def createReplicaSelector(metrics: Metrics): Option[Plugin[ReplicaSelector]] = {
-    config.replicaSelectorClassName.map { className =>
+    config.replicaSelectorClassName.asScala.map { className =>
val tmpReplicaSelector: ReplicaSelector = Utils.newInstance(className, classOf[ReplicaSelector])
tmpReplicaSelector.configure(config.originals())
Plugin.wrapInstance(tmpReplicaSelector, metrics, ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG)
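Note: the extra .asScala is needed because replicaSelectorClassName is now defined on the Java side as a java.util.Optional[String] instead of a Scala Option[String]. A minimal sketch of the same conversion; the standard-library enrichment in scala.jdk.OptionConverters names it toScala, and the .asScala in the diff relies on whichever enrichment Kafka has in scope:

    import java.util.Optional
    import scala.jdk.OptionConverters._

    // Illustrative value; replica.selector.class is the real config key.
    val className: Optional[String] = Optional.of("org.apache.kafka.common.replica.RackAwareReplicaSelector")

    // Convert to a Scala Option so the usual combinators (map, foreach) apply.
    val selector: Option[String] = className.toScala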