diff --git a/docs/user/kafka/usage.rst b/docs/user/kafka/usage.rst index e6569f91d91e..f919d7fc9f01 100644 --- a/docs/user/kafka/usage.rst +++ b/docs/user/kafka/usage.rst @@ -12,55 +12,56 @@ create multiple Kafka data store instances when dealing with multiple schemas. Use the following parameters for a Kafka data store (required parameters are marked with ``*``): -==================================== ======= ==================================================================================================== -Parameter Type Description -==================================== ======= ==================================================================================================== -``kafka.brokers *`` String Kafka brokers, e.g. ``localhost:9092`` -``kafka.zookeepers`` String Kafka zookeepers, e.g ``localhost:2181``, used to persist GeoMesa metadata in Zookeeper instead - of in Kafka topics. See :ref:`no_zookeeper` for details. -``kafka.catalog.topic`` String The Kafka topic used to store schema metadata (when not using Zookeeper) -``kafka.zk.path`` String Zookeeper discoverable path, can be used to namespace feature types (when using Zookeeper) -``kafka.producer.config`` String Configuration options for kafka producer, in Java properties - format. See `Producer Configs `_ -``kafka.producer.clear`` Boolean Send a 'clear' message on startup. This will cause clients to ignore any data that was in the - topic prior to startup -``kafka.consumer.config`` String Configuration options for kafka consumer, in Java properties - format. See `Consumer Configs `_ -``kafka.consumer.read-back`` String On start up, read messages that were written within this time frame (vs ignore old messages), e.g. - ``1 hour``. Use ``Inf`` to read all messages. If enabled, features will not be available for query - until all existing messages are processed. However, feature listeners will still be invoked as - normal. 
See :ref:`kafka_initial_load` -``kafka.consumer.count`` Integer Number of kafka consumers used per feature type. Set to 0 to disable consuming (i.e. producer only) -``kafka.consumer.group-prefix`` String Prefix to use for kafka group ID, to more easily identify particular data stores -``kafka.consumer.start-on-demand`` Boolean The default behavior is to start consuming a topic only when that feature type is first requested. - This can reduce load if some layers are never queried. Note that care should be taken when setting - this to false, as the store will immediately start consuming from Kafka for all known feature types, - which may require significant memory overhead. -``kafka.topic.partitions`` Integer Number of partitions to use in new kafka topics -``kafka.topic.replication`` Integer Replication factor to use in new kafka topics -``kafka.serialization.type`` String Internal serialization format to use for kafka messages. Must be one of ``kryo``, ``avro`` - or ``avro-native`` -``kafka.cache.expiry`` String Expire features from in-memory cache after this delay, e.g. "10 minutes". See :ref:`kafka_expiry` -``kafka.cache.expiry.dynamic`` String Expire features dynamically based on CQL predicates. See :ref:`kafka_expiry` -``kafka.cache.event-time`` String Instead of message time, determine expiry based on feature data. See :ref:`kafka_event_time` -``kafka.cache.event-time.ordering`` Boolean Instead of message time, determine feature ordering based on the feature event time. - See :ref:`kafka_event_time` -``kafka.index.cqengine`` String Use CQEngine-based attribute indices for the in-memory feature cache. See :ref:`kafka_cqengine` -``kafka.index.resolution.x`` Integer Number of bins in the x-dimension of the spatial index, by default 360. See - :ref:`kafka_index_resolution` -``kafka.index.resolution.y`` Integer Number of bins in the y-dimension of the spatial index, by default 180. 
See - :ref:`kafka_index_resolution` -``kafka.index.tiers`` String Number and size of tiers used for indexing geometries with extents, in the form ``x1:y1,x2:y2``. - See :ref:`kafka_ssi` -``kafka.serialization.lazy`` Boolean Use lazy deserialization of features. This may improve processing load at - the expense of slightly slower query times -``kafka.layer.views`` String Additional views on existing schemas to expose as layers. See :ref:`kafka_layer_views` for details -``kafka.metrics.reporters`` String Reporters used to publish Kafka metrics, as TypeSafe config. To use multiple reporters, nest - them under the key ``reporters``. See :ref:`geomesa_metrics` for details -``geomesa.query.loose-bounding-box`` Boolean Use loose bounding boxes, which offer improved performance but are not exact -``geomesa.query.audit`` Boolean Audit incoming queries. By default audits are written to a log file -``geomesa.security.auths`` String Default authorizations used to query data, comma-separated -==================================== ======= ==================================================================================================== +============================================ ======= ==================================================================================================== +Parameter Type Description +============================================ ======= ==================================================================================================== +``kafka.brokers *`` String Kafka brokers, e.g. ``localhost:9092`` +``kafka.zookeepers`` String Kafka zookeepers, e.g. ``localhost:2181``, used to persist GeoMesa metadata in Zookeeper instead + of in Kafka topics. See :ref:`no_zookeeper` for details. 
+``kafka.catalog.topic`` String The Kafka topic used to store schema metadata (when not using Zookeeper) +``kafka.zk.path`` String Zookeeper discoverable path, can be used to namespace feature types (when using Zookeeper) +``kafka.producer.config`` String Configuration options for kafka producer, in Java properties + format. See `Producer Configs `_ +``kafka.producer.clear`` Boolean Send a 'clear' message on startup. This will cause clients to ignore any data that was in the + topic prior to startup +``kafka.consumer.config`` String Configuration options for kafka consumer, in Java properties + format. See `Consumer Configs `_ +``kafka.consumer.read-back`` String On start up, read messages that were written within this time frame (vs ignore old messages), e.g. + ``1 hour``. Use ``Inf`` to read all messages. If enabled, features will not be available for query + until all existing messages are processed. However, feature listeners will still be invoked as + normal. See :ref:`kafka_initial_load` +``kafka.consumer.count`` Integer Number of kafka consumers used per feature type. Set to 0 to disable consuming (i.e. producer only) +``kafka.consumer.offset-commit-interval-ms`` Integer Number of milliseconds to pass before committing offsets for the consumer group. +``kafka.consumer.group-prefix`` String Prefix to use for kafka group ID, to more easily identify particular data stores +``kafka.consumer.start-on-demand`` Boolean The default behavior is to start consuming a topic only when that feature type is first requested. + This can reduce load if some layers are never queried. Note that care should be taken when setting + this to false, as the store will immediately start consuming from Kafka for all known feature types, + which may require significant memory overhead. 
+``kafka.topic.partitions`` Integer Number of partitions to use in new kafka topics +``kafka.topic.replication`` Integer Replication factor to use in new kafka topics +``kafka.serialization.type`` String Internal serialization format to use for kafka messages. Must be one of ``kryo``, ``avro`` + or ``avro-native`` +``kafka.cache.expiry`` String Expire features from in-memory cache after this delay, e.g. "10 minutes". See :ref:`kafka_expiry` +``kafka.cache.expiry.dynamic`` String Expire features dynamically based on CQL predicates. See :ref:`kafka_expiry` +``kafka.cache.event-time`` String Instead of message time, determine expiry based on feature data. See :ref:`kafka_event_time` +``kafka.cache.event-time.ordering`` Boolean Instead of message time, determine feature ordering based on the feature event time. + See :ref:`kafka_event_time` +``kafka.index.cqengine`` String Use CQEngine-based attribute indices for the in-memory feature cache. See :ref:`kafka_cqengine` +``kafka.index.resolution.x`` Integer Number of bins in the x-dimension of the spatial index, by default 360. See + :ref:`kafka_index_resolution` +``kafka.index.resolution.y`` Integer Number of bins in the y-dimension of the spatial index, by default 180. See + :ref:`kafka_index_resolution` +``kafka.index.tiers`` String Number and size of tiers used for indexing geometries with extents, in the form ``x1:y1,x2:y2``. + See :ref:`kafka_ssi` +``kafka.serialization.lazy`` Boolean Use lazy deserialization of features. This may improve processing load at + the expense of slightly slower query times +``kafka.layer.views`` String Additional views on existing schemas to expose as layers. See :ref:`kafka_layer_views` for details +``kafka.metrics.reporters`` String Reporters used to publish Kafka metrics, as TypeSafe config. To use multiple reporters, nest + them under the key ``reporters``. 
See :ref:`geomesa_metrics` for details +``geomesa.query.loose-bounding-box`` Boolean Use loose bounding boxes, which offer improved performance but are not exact +``geomesa.query.audit`` Boolean Audit incoming queries. By default audits are written to a log file +``geomesa.security.auths`` String Default authorizations used to query data, comma-separated +============================================ ======= ==================================================================================================== Programmatic Access ------------------- diff --git a/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaCacheLoader.scala b/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaCacheLoader.scala index f5854b16c083..280d18106266 100644 --- a/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaCacheLoader.scala +++ b/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaCacheLoader.scala @@ -76,8 +76,9 @@ object KafkaCacheLoader extends LazyLogging { frequency: Long, serializer: GeoMessageSerializer, doInitialLoad: Boolean, - initialLoadConfig: ExpiryTimeConfig - ) extends ThreadedConsumer(consumers, Duration.ofMillis(frequency)) with KafkaCacheLoader { + initialLoadConfig: ExpiryTimeConfig, + offsetCommitIntervalMs: Long + ) extends ThreadedConsumer(consumers, Duration.ofMillis(frequency), offsetCommitIntervalMs) with KafkaCacheLoader { try { classOf[ConsumerRecord[Any, Any]].getMethod("timestamp") } catch { case _: NoSuchMethodException => logger.warn("This version of Kafka doesn't support timestamps, using system time") } @@ -85,7 +86,7 @@ object KafkaCacheLoader extends LazyLogging { private val initialLoader = if (doInitialLoad) { // for the initial load, don't bother spatially indexing until we have the final state - Some(new InitialLoader(sft, consumers, topic, frequency, serializer, 
initialLoadConfig, this)) + Some(new InitialLoader(sft, consumers, topic, frequency, offsetCommitIntervalMs, serializer, initialLoadConfig, this)) } else { None } @@ -133,10 +134,11 @@ object KafkaCacheLoader extends LazyLogging { consumers: Seq[Consumer[Array[Byte], Array[Byte]]], topic: String, frequency: Long, + offsetCommitIntervalMs: Long, serializer: GeoMessageSerializer, ordering: ExpiryTimeConfig, toLoad: KafkaCacheLoaderImpl - ) extends ThreadedConsumer(consumers, Duration.ofMillis(frequency), false) with Runnable { + ) extends ThreadedConsumer(consumers, Duration.ofMillis(frequency), offsetCommitIntervalMs, false) with Runnable { import scala.collection.JavaConverters._ diff --git a/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStore.scala b/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStore.scala index 2daaf2c35a04..4663fc147354 100644 --- a/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStore.scala +++ b/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStore.scala @@ -100,7 +100,8 @@ class KafkaDataStore( val serializer = serialization.apply(sft) val initialLoad = config.consumers.readBack.isDefined val expiry = config.indices.expiry - val loader = new KafkaCacheLoaderImpl(sft, cache, consumers, topic, frequency, serializer, initialLoad, expiry) + val offsetCommitIntervalMs = config.consumers.offsetCommitIntervalMs.getOrElse(10000L) + val loader = new KafkaCacheLoaderImpl(sft, cache, consumers, topic, frequency, serializer, initialLoad, expiry, offsetCommitIntervalMs) try { loader.start() } catch { case NonFatal(e) => CloseWithLogging(loader) @@ -587,7 +588,8 @@ object KafkaDataStore extends LazyLogging { count: Int, groupPrefix: String, properties: Map[String, String], - readBack: Option[Duration] + readBack: Option[Duration], + 
offsetCommitIntervalMs: Option[Long] ) case class ProducerConfig(properties: Map[String, String]) diff --git a/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStoreFactory.scala b/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStoreFactory.scala index 1469c0b99f54..4925ed9ee3b3 100644 --- a/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStoreFactory.scala +++ b/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStoreFactory.scala @@ -109,6 +109,7 @@ object KafkaDataStoreFactory extends GeoMesaDataStoreInfo with LazyLogging { KafkaDataStoreParams.ConsumerCount, KafkaDataStoreParams.ConsumerGroupPrefix, KafkaDataStoreParams.ConsumerConfig, + KafkaDataStoreParams.ConsumerOffsetCommitIntervalMs, KafkaDataStoreParams.ConsumerReadBack, KafkaDataStoreParams.CacheExpiry, KafkaDataStoreParams.DynamicCacheExpiry, @@ -151,7 +152,8 @@ object KafkaDataStoreFactory extends GeoMesaDataStoreInfo with LazyLogging { } val props = ConsumerConfig.lookupOpt(params).map(_.asScala.toMap).getOrElse(Map.empty[String, String]) val readBack = ConsumerReadBack.lookupOpt(params) - KafkaDataStore.ConsumerConfig(count, prefix, props, readBack) + val offsetCommitIntervalMs = ConsumerOffsetCommitIntervalMs.lookupOpt(params).map(_.toLong) + KafkaDataStore.ConsumerConfig(count, prefix, props, readBack, offsetCommitIntervalMs) } val producers = { diff --git a/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStoreParams.scala b/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStoreParams.scala index af55ab30655d..b0a53b4bbba5 100644 --- a/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStoreParams.scala +++ 
b/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStoreParams.scala @@ -113,6 +113,14 @@ object KafkaDataStoreParams extends NamespaceParams { readWrite = ReadWriteFlag.ReadOnly ) + val ConsumerOffsetCommitIntervalMs = + new GeoMesaParam[java.lang.Long]( + "kafka.consumer.offset-commit-interval-ms", + "The frequency of committing offsets for the Kafka consumer", + default = 10000, + readWrite = ReadWriteFlag.ReadOnly + ) + val TopicPartitions = new GeoMesaParam[Integer]( "kafka.topic.partitions", diff --git a/geomesa-kafka/geomesa-kafka-utils/src/main/scala/org/locationtech/geomesa/kafka/consumer/ThreadedConsumer.scala b/geomesa-kafka/geomesa-kafka-utils/src/main/scala/org/locationtech/geomesa/kafka/consumer/ThreadedConsumer.scala index 69d0c646c212..6bcbae5a7b78 100644 --- a/geomesa-kafka/geomesa-kafka-utils/src/main/scala/org/locationtech/geomesa/kafka/consumer/ThreadedConsumer.scala +++ b/geomesa-kafka/geomesa-kafka-utils/src/main/scala/org/locationtech/geomesa/kafka/consumer/ThreadedConsumer.scala @@ -21,6 +21,7 @@ import scala.util.control.NonFatal abstract class ThreadedConsumer( consumers: Seq[Consumer[Array[Byte], Array[Byte]]], frequency: Duration, + offsetCommitIntervalMs: Long, closeConsumers: Boolean = true ) extends BaseThreadedConsumer(consumers) { @@ -44,6 +45,8 @@ abstract class ThreadedConsumer( lazy private val topics = consumer.subscription().asScala.mkString(", ") + private var lastOffsetCommitMs = System.currentTimeMillis() + override def run(): Unit = { try { var interrupted = false @@ -60,7 +63,11 @@ abstract class ThreadedConsumer( consume(records.next()) } logger.trace(s"Consumer [$id] finished processing ${result.count()} records from topic $topics") - consumer.commitAsync(callback) + if (System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) { + logger.trace(s"Consumer [$id] committing offsets") + consumer.commitAsync() + lastOffsetCommitMs = System.currentTimeMillis() + 
} errorCount = 0 // reset error count } } catch { diff --git a/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/data/LambdaDataStore.scala b/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/data/LambdaDataStore.scala index 4eac62725da8..00c7a372bb2d 100644 --- a/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/data/LambdaDataStore.scala +++ b/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/data/LambdaDataStore.scala @@ -218,5 +218,6 @@ object LambdaDataStore { consumers: Int, expiry: Option[FiniteDuration], persistBatchSize: Option[Int] = None, + offsetCommitIntervalMs: Long = 10000 ) } diff --git a/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/data/LambdaDataStoreParams.scala b/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/data/LambdaDataStoreParams.scala index 37c188a31192..82424549a33e 100644 --- a/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/data/LambdaDataStoreParams.scala +++ b/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/data/LambdaDataStoreParams.scala @@ -104,6 +104,13 @@ object LambdaDataStoreParams extends GeoMesaDataStoreParams with SecurityParams "Offset manager instance to use", deprecatedKeys = Seq("lamdab.offset-manager", "offsetManager")) + val ConsumerOffsetCommitIntervalMs = + new GeoMesaParam[java.lang.Long]( + "lambda.kafka.consumer.offset-commit-interval-ms", + "The frequency of committing offsets for the Kafka consumer", + default = 10000 + ) + def parse(params: java.util.Map[String, _], namespace: String): LambdaConfig = { val brokers = BrokersParam.lookup(params) val expiry = if (!PersistParam.lookup(params).booleanValue) { None } else { @@ -121,6 +128,8 @@ object LambdaDataStoreParams extends GeoMesaDataStoreParams with 
SecurityParams val zk = ZookeepersParam.lookup(params) val zkNamespace = s"gm_lambda_$namespace" - LambdaConfig(zk, zkNamespace, producerConfig, consumerConfig, partitions, consumers, expiry, batchSize) + val offsetCommitIntervalMs : Long = ConsumerOffsetCommitIntervalMs.lookupOpt(params).map(_.toLong).getOrElse(10000) + + LambdaConfig(zk, zkNamespace, producerConfig, consumerConfig, partitions, consumers, expiry, batchSize, offsetCommitIntervalMs) } } diff --git a/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/stream/kafka/KafkaCacheLoader.scala b/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/stream/kafka/KafkaCacheLoader.scala index 63d3421f1799..39861ef83bad 100644 --- a/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/stream/kafka/KafkaCacheLoader.scala +++ b/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/stream/kafka/KafkaCacheLoader.scala @@ -33,7 +33,8 @@ class KafkaCacheLoader( topic: String, frequency: Long, serializer: KryoFeatureSerializer, - cache: WritableFeatureCache) extends ThreadedConsumer(consumers, Duration.ofMillis(frequency)) { + cache: WritableFeatureCache, + offsetCommitIntervalMs: Long) extends ThreadedConsumer(consumers, Duration.ofMillis(frequency), offsetCommitIntervalMs) { startConsumers() diff --git a/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/stream/kafka/KafkaStore.scala b/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/stream/kafka/KafkaStore.scala index 335f155e6d41..9685fb36428b 100644 --- a/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/stream/kafka/KafkaStore.scala +++ b/geomesa-lambda/geomesa-lambda-datastore/src/main/scala/org/locationtech/geomesa/lambda/stream/kafka/KafkaStore.scala @@ -70,7 +70,7 @@ class KafkaStore( private val loader = { val 
consumers = KafkaStore.consumers(config.consumerConfig, topic, offsetManager, config.consumers, cache.partitionAssigned) val frequency = KafkaStore.LoadIntervalProperty.toDuration.get.toMillis - new KafkaCacheLoader(consumers, topic, frequency, serializer, cache) + new KafkaCacheLoader(consumers, topic, frequency, serializer, cache, config.offsetCommitIntervalMs) } override def createSchema(): Unit = {