Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 50 additions & 49 deletions docs/user/kafka/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,55 +12,56 @@ create multiple Kafka data store instances when dealing with multiple schemas.

Use the following parameters for a Kafka data store (required parameters are marked with ``*``):

==================================== ======= ====================================================================================================
Parameter Type Description
==================================== ======= ====================================================================================================
``kafka.brokers *`` String Kafka brokers, e.g. ``localhost:9092``
``kafka.zookeepers`` String Kafka zookeepers, e.g ``localhost:2181``, used to persist GeoMesa metadata in Zookeeper instead
of in Kafka topics. See :ref:`no_zookeeper` for details.
``kafka.catalog.topic`` String The Kafka topic used to store schema metadata (when not using Zookeeper)
``kafka.zk.path`` String Zookeeper discoverable path, can be used to namespace feature types (when using Zookeeper)
``kafka.producer.config`` String Configuration options for kafka producer, in Java properties
format. See `Producer Configs <https://kafka.apache.org/documentation.html#producerconfigs>`_
``kafka.producer.clear`` Boolean Send a 'clear' message on startup. This will cause clients to ignore any data that was in the
topic prior to startup
``kafka.consumer.config`` String Configuration options for kafka consumer, in Java properties
format. See `Consumer Configs <https://kafka.apache.org/documentation.html#consumerconfigs>`_
``kafka.consumer.read-back`` String On start up, read messages that were written within this time frame (vs ignore old messages), e.g.
``1 hour``. Use ``Inf`` to read all messages. If enabled, features will not be available for query
until all existing messages are processed. However, feature listeners will still be invoked as
normal. See :ref:`kafka_initial_load`
``kafka.consumer.count`` Integer Number of kafka consumers used per feature type. Set to 0 to disable consuming (i.e. producer only)
``kafka.consumer.group-prefix`` String Prefix to use for kafka group ID, to more easily identify particular data stores
``kafka.consumer.start-on-demand`` Boolean The default behavior is to start consuming a topic only when that feature type is first requested.
This can reduce load if some layers are never queried. Note that care should be taken when setting
this to false, as the store will immediately start consuming from Kafka for all known feature types,
which may require significant memory overhead.
``kafka.topic.partitions`` Integer Number of partitions to use in new kafka topics
``kafka.topic.replication`` Integer Replication factor to use in new kafka topics
``kafka.serialization.type`` String Internal serialization format to use for kafka messages. Must be one of ``kryo``, ``avro``
or ``avro-native``
``kafka.cache.expiry`` String Expire features from in-memory cache after this delay, e.g. "10 minutes". See :ref:`kafka_expiry`
``kafka.cache.expiry.dynamic`` String Expire features dynamically based on CQL predicates. See :ref:`kafka_expiry`
``kafka.cache.event-time`` String Instead of message time, determine expiry based on feature data. See :ref:`kafka_event_time`
``kafka.cache.event-time.ordering`` Boolean Instead of message time, determine feature ordering based on the feature event time.
See :ref:`kafka_event_time`
``kafka.index.cqengine`` String Use CQEngine-based attribute indices for the in-memory feature cache. See :ref:`kafka_cqengine`
``kafka.index.resolution.x`` Integer Number of bins in the x-dimension of the spatial index, by default 360. See
:ref:`kafka_index_resolution`
``kafka.index.resolution.y`` Integer Number of bins in the y-dimension of the spatial index, by default 180. See
:ref:`kafka_index_resolution`
``kafka.index.tiers`` String Number and size of tiers used for indexing geometries with extents, in the form ``x1:y1,x2:y2``.
See :ref:`kafka_ssi`
``kafka.serialization.lazy`` Boolean Use lazy deserialization of features. This may improve processing load at
the expense of slightly slower query times
``kafka.layer.views`` String Additional views on existing schemas to expose as layers. See :ref:`kafka_layer_views` for details
``kafka.metrics.reporters`` String Reporters used to publish Kafka metrics, as TypeSafe config. To use multiple reporters, nest
them under the key ``reporters``. See :ref:`geomesa_metrics` for details
``geomesa.query.loose-bounding-box`` Boolean Use loose bounding boxes, which offer improved performance but are not exact
``geomesa.query.audit`` Boolean Audit incoming queries. By default audits are written to a log file
``geomesa.security.auths`` String Default authorizations used to query data, comma-separated
==================================== ======= ====================================================================================================
============================================ ======= ====================================================================================================
Parameter Type Description
============================================ ======= ====================================================================================================
``kafka.brokers *`` String Kafka brokers, e.g. ``localhost:9092``
``kafka.zookeepers`` String Kafka zookeepers, e.g ``localhost:2181``, used to persist GeoMesa metadata in Zookeeper instead
of in Kafka topics. See :ref:`no_zookeeper` for details.
``kafka.catalog.topic`` String The Kafka topic used to store schema metadata (when not using Zookeeper)
``kafka.zk.path`` String Zookeeper discoverable path, can be used to namespace feature types (when using Zookeeper)
``kafka.producer.config`` String Configuration options for kafka producer, in Java properties
format. See `Producer Configs <https://kafka.apache.org/documentation.html#producerconfigs>`_
``kafka.producer.clear`` Boolean Send a 'clear' message on startup. This will cause clients to ignore any data that was in the
topic prior to startup
``kafka.consumer.config`` String Configuration options for kafka consumer, in Java properties
format. See `Consumer Configs <https://kafka.apache.org/documentation.html#consumerconfigs>`_
``kafka.consumer.read-back`` String On start up, read messages that were written within this time frame (vs ignore old messages), e.g.
``1 hour``. Use ``Inf`` to read all messages. If enabled, features will not be available for query
until all existing messages are processed. However, feature listeners will still be invoked as
normal. See :ref:`kafka_initial_load`
``kafka.consumer.count`` Integer Number of kafka consumers used per feature type. Set to 0 to disable consuming (i.e. producer only)
``kafka.consumer.offset-commit-interval-ms`` Integer Number of milliseconds to pass before commiting offsets for the consumer group.
``kafka.consumer.group-prefix`` String Prefix to use for kafka group ID, to more easily identify particular data stores
``kafka.consumer.start-on-demand`` Boolean The default behavior is to start consuming a topic only when that feature type is first requested.
This can reduce load if some layers are never queried. Note that care should be taken when setting
this to false, as the store will immediately start consuming from Kafka for all known feature types,
which may require significant memory overhead.
``kafka.topic.partitions`` Integer Number of partitions to use in new kafka topics
``kafka.topic.replication`` Integer Replication factor to use in new kafka topics
``kafka.serialization.type`` String Internal serialization format to use for kafka messages. Must be one of ``kryo``, ``avro``
or ``avro-native``
``kafka.cache.expiry`` String Expire features from in-memory cache after this delay, e.g. "10 minutes". See :ref:`kafka_expiry`
``kafka.cache.expiry.dynamic`` String Expire features dynamically based on CQL predicates. See :ref:`kafka_expiry`
``kafka.cache.event-time`` String Instead of message time, determine expiry based on feature data. See :ref:`kafka_event_time`
``kafka.cache.event-time.ordering`` Boolean Instead of message time, determine feature ordering based on the feature event time.
See :ref:`kafka_event_time`
``kafka.index.cqengine`` String Use CQEngine-based attribute indices for the in-memory feature cache. See :ref:`kafka_cqengine`
``kafka.index.resolution.x`` Integer Number of bins in the x-dimension of the spatial index, by default 360. See
:ref:`kafka_index_resolution`
``kafka.index.resolution.y`` Integer Number of bins in the y-dimension of the spatial index, by default 180. See
:ref:`kafka_index_resolution`
``kafka.index.tiers`` String Number and size of tiers used for indexing geometries with extents, in the form ``x1:y1,x2:y2``.
See :ref:`kafka_ssi`
``kafka.serialization.lazy`` Boolean Use lazy deserialization of features. This may improve processing load at
the expense of slightly slower query times
``kafka.layer.views`` String Additional views on existing schemas to expose as layers. See :ref:`kafka_layer_views` for details
``kafka.metrics.reporters`` String Reporters used to publish Kafka metrics, as TypeSafe config. To use multiple reporters, nest
them under the key ``reporters``. See :ref:`geomesa_metrics` for details
``geomesa.query.loose-bounding-box`` Boolean Use loose bounding boxes, which offer improved performance but are not exact
``geomesa.query.audit`` Boolean Audit incoming queries. By default audits are written to a log file
``geomesa.security.auths`` String Default authorizations used to query data, comma-separated
============================================ ======= ====================================================================================================

Programmatic Access
-------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,17 @@ object KafkaCacheLoader extends LazyLogging {
frequency: Long,
serializer: GeoMessageSerializer,
doInitialLoad: Boolean,
initialLoadConfig: ExpiryTimeConfig
) extends ThreadedConsumer(consumers, Duration.ofMillis(frequency)) with KafkaCacheLoader {
initialLoadConfig: ExpiryTimeConfig,
offsetCommitIntervalMs: Long
) extends ThreadedConsumer(consumers, Duration.ofMillis(frequency), offsetCommitIntervalMs) with KafkaCacheLoader {
try { classOf[ConsumerRecord[Any, Any]].getMethod("timestamp") } catch {
case _: NoSuchMethodException => logger.warn("This version of Kafka doesn't support timestamps, using system time")
}

private val initialLoader =
if (doInitialLoad) {
// for the initial load, don't bother spatially indexing until we have the final state
Some(new InitialLoader(sft, consumers, topic, frequency, serializer, initialLoadConfig, this))
Some(new InitialLoader(sft, consumers, topic, frequency, offsetCommitIntervalMs, serializer, initialLoadConfig, this))
} else {
None
}
Expand Down Expand Up @@ -133,10 +134,11 @@ object KafkaCacheLoader extends LazyLogging {
consumers: Seq[Consumer[Array[Byte], Array[Byte]]],
topic: String,
frequency: Long,
offsetCommitIntervalMs: Long,
serializer: GeoMessageSerializer,
ordering: ExpiryTimeConfig,
toLoad: KafkaCacheLoaderImpl
) extends ThreadedConsumer(consumers, Duration.ofMillis(frequency), false) with Runnable {
) extends ThreadedConsumer(consumers, Duration.ofMillis(frequency), offsetCommitIntervalMs, false) with Runnable {

import scala.collection.JavaConverters._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ class KafkaDataStore(
val serializer = serialization.apply(sft)
val initialLoad = config.consumers.readBack.isDefined
val expiry = config.indices.expiry
val loader = new KafkaCacheLoaderImpl(sft, cache, consumers, topic, frequency, serializer, initialLoad, expiry)
val offsetCommitIntervalMs = config.consumers.offsetCommitIntervalMs.getOrElse(10000L)
val loader = new KafkaCacheLoaderImpl(sft, cache, consumers, topic, frequency, serializer, initialLoad, expiry, offsetCommitIntervalMs)
try { loader.start() } catch {
case NonFatal(e) =>
CloseWithLogging(loader)
Expand Down Expand Up @@ -587,7 +588,8 @@ object KafkaDataStore extends LazyLogging {
count: Int,
groupPrefix: String,
properties: Map[String, String],
readBack: Option[Duration]
readBack: Option[Duration],
offsetCommitIntervalMs: Option[Long]
)

case class ProducerConfig(properties: Map[String, String])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ object KafkaDataStoreFactory extends GeoMesaDataStoreInfo with LazyLogging {
KafkaDataStoreParams.ConsumerCount,
KafkaDataStoreParams.ConsumerGroupPrefix,
KafkaDataStoreParams.ConsumerConfig,
KafkaDataStoreParams.ConsumerOffsetCommitIntervalMs,
KafkaDataStoreParams.ConsumerReadBack,
KafkaDataStoreParams.CacheExpiry,
KafkaDataStoreParams.DynamicCacheExpiry,
Expand Down Expand Up @@ -151,7 +152,8 @@ object KafkaDataStoreFactory extends GeoMesaDataStoreInfo with LazyLogging {
}
val props = ConsumerConfig.lookupOpt(params).map(_.asScala.toMap).getOrElse(Map.empty[String, String])
val readBack = ConsumerReadBack.lookupOpt(params)
KafkaDataStore.ConsumerConfig(count, prefix, props, readBack)
val offsetCommitIntervalMs = ConsumerOffsetCommitIntervalMs.lookupOpt(params).map(_.toLong)
KafkaDataStore.ConsumerConfig(count, prefix, props, readBack, offsetCommitIntervalMs)
}

val producers = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ object KafkaDataStoreParams extends NamespaceParams {
readWrite = ReadWriteFlag.ReadOnly
)

val ConsumerOffsetCommitIntervalMs =
new GeoMesaParam[java.lang.Long](
"kafka.consumer.offset-commit-interval-ms",
"The frequency of committing offsets for the Kafka consumer",
default = 10000,
readWrite = ReadWriteFlag.ReadOnly
)

val TopicPartitions =
new GeoMesaParam[Integer](
"kafka.topic.partitions",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.util.control.NonFatal
abstract class ThreadedConsumer(
consumers: Seq[Consumer[Array[Byte], Array[Byte]]],
frequency: Duration,
offsetCommitIntervalMs: Long,
closeConsumers: Boolean = true
) extends BaseThreadedConsumer(consumers) {

Expand All @@ -44,6 +45,8 @@ abstract class ThreadedConsumer(

lazy private val topics = consumer.subscription().asScala.mkString(", ")

private var lastOffsetCommitMs = System.currentTimeMillis()

override def run(): Unit = {
try {
var interrupted = false
Expand All @@ -60,7 +63,11 @@ abstract class ThreadedConsumer(
consume(records.next())
}
logger.trace(s"Consumer [$id] finished processing ${result.count()} records from topic $topics")
consumer.commitAsync(callback)
if (System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) {
logger.trace(s"Consumer [$id] committing offsets")
consumer.commitAsync()
lastOffsetCommitMs = System.currentTimeMillis()
}
errorCount = 0 // reset error count
}
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,5 +218,6 @@ object LambdaDataStore {
consumers: Int,
expiry: Option[FiniteDuration],
persistBatchSize: Option[Int] = None,
offsetCommitIntervalMs: Long = 10000
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ object LambdaDataStoreParams extends GeoMesaDataStoreParams with SecurityParams
"Offset manager instance to use",
deprecatedKeys = Seq("lamdab.offset-manager", "offsetManager"))

val ConsumerOffsetCommitIntervalMs =
new GeoMesaParam[java.lang.Long](
"lambda.kafka.consumer.offset-commit-interval-ms",
"The frequency of committing offsets for the Kafka consumer",
default = 10000
)

def parse(params: java.util.Map[String, _], namespace: String): LambdaConfig = {
val brokers = BrokersParam.lookup(params)
val expiry = if (!PersistParam.lookup(params).booleanValue) { None } else {
Expand All @@ -121,6 +128,8 @@ object LambdaDataStoreParams extends GeoMesaDataStoreParams with SecurityParams
val zk = ZookeepersParam.lookup(params)
val zkNamespace = s"gm_lambda_$namespace"

LambdaConfig(zk, zkNamespace, producerConfig, consumerConfig, partitions, consumers, expiry, batchSize)
val offsetCommitIntervalMs : Long = ConsumerOffsetCommitIntervalMs.lookupOpt(params).map(_.toLong).getOrElse(10000)

LambdaConfig(zk, zkNamespace, producerConfig, consumerConfig, partitions, consumers, expiry, batchSize, offsetCommitIntervalMs)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class KafkaCacheLoader(
topic: String,
frequency: Long,
serializer: KryoFeatureSerializer,
cache: WritableFeatureCache) extends ThreadedConsumer(consumers, Duration.ofMillis(frequency)) {
cache: WritableFeatureCache,
offsetCommitIntervalMs: Long) extends ThreadedConsumer(consumers, Duration.ofMillis(frequency), offsetCommitIntervalMs) {

startConsumers()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class KafkaStore(
private val loader = {
val consumers = KafkaStore.consumers(config.consumerConfig, topic, offsetManager, config.consumers, cache.partitionAssigned)
val frequency = KafkaStore.LoadIntervalProperty.toDuration.get.toMillis
new KafkaCacheLoader(consumers, topic, frequency, serializer, cache)
new KafkaCacheLoader(consumers, topic, frequency, serializer, cache, config.offsetCommitIntervalMs)
}

override def createSchema(): Unit = {
Expand Down