diff --git a/docs/user/kafka/spring_stream_binder.rst b/docs/user/kafka/spring_stream_binder.rst new file mode 100644 index 000000000000..b92cd8ba6cc9 --- /dev/null +++ b/docs/user/kafka/spring_stream_binder.rst @@ -0,0 +1,191 @@ +Spring Cloud Stream Geomesa Kafka Datastore Binder +================================================== + +The Spring Cloud Stream Geomesa Kafka Datastore Binder provides an easy way for Spring Cloud Stream apps to hook into +Geomesa Kafka Datastore to process events. + +If you are unfamiliar with Spring Cloud Stream, see the official documentation for an introduction: +https://spring.io/projects/spring-cloud-stream + +Input/Output Types +------------ + +This binder will provide all ``KafkaFeatureEvent`` s from kafka datastore to your configured function definitions. Each +function will have to do it's own type comparison to see if the event is a ``KafkaFeatureEvent.KafkaFeatureChanged``, +``KafkaFeatureEvent.KafkaFeatureRemoved``, or another event type. + +The module also ships with a SimpleFeature converter, which allows you to configure function definitions that consume +or produces ``SimpleFeature`` s and avoid working with ``KafkaFeatureEvent`` s directly. + +.. note:: + + The SimpleFeature converter extracts the SimpleFeature out of ``KafkaFeatureEvent.KafkaFeatureChanged`` events and + ignores all others. Any function definition that consumes SimpleFeatures will miss the + ``KafkaFeatureEvent.KafkaFeatureRemoved`` and ``KafkaFeatureEvent.KafkaFeatureCleared`` messages. And any function + definition that only writes SimpleFeatures will not be able to send those messages. + +Configuration +------------- + +The configuration options are under spring.cloud.stream.kafka-datastore.binder. This binder will accept any +configuration options for the standard java geomesa kafka-datastore, with the periods ('.') replaced with dashes ('-'). +For example, to specify kafka.catalog.topic for the binder, set: + +.. code-block:: yaml + + spring: + cloud: + stream: + kafka-datastore: + binder: + kafka-catalog-topic: geomesa-catalog-topic + +For a full list of configuration options, see: https://www.geomesa.org/documentation/stable/user/kafka/usage.html + +Examples +-------- + +Simple Logger App +----------------- + +.. code-block:: java + + @Bean + public Consumer log() { + return obj -> logger.info(obj.toString()); + } + +.. code-block:: yaml + + spring: + cloud: + function: + definition: log + stream: + kafka-datastore.binder: + kafka-brokers: kafka:9092 + kafka-zookeepers: zookeeper:2181 + function.bindings: + log-in-0: input + bindings: + input: + destination: messages + group: logger + +Simple Enricher App +------------------- + +.. code-block:: java + + @Bean + public Function attachSourceField() { + return sf -> { + sf.setAttribute("source", "un-labelled source"); + return sf; + }; + } + +.. code-block:: yaml + + spring: + cloud: + function: + definition: attachSourceField + stream: + kafka-datastore.binder: + kafka-brokers: kafka:9092 + kafka-zookeepers: zookeeper:2181 + function.bindings: + attachSourceField-in-0: input + attachSourceField-out-0: output + bindings: + input: + destination: un-labelled-source-ob + group: sft-reader + output: + destination: observations + group: sft-writer + +Simple Filter App +------------------- + +.. code-block:: java + + @Bean + public Function excludeMoving() { + return sf -> { + if (sf.getAttribute("status").equals("IN_TRANSIT")) { + return null; + } + return sf; + }; + } + + +.. code-block:: yaml + + spring: + cloud: + function: + definition: filterMoving + stream: + kafka-datastore.binder: + kafka-brokers: kafka:9092 + kafka-zookeepers: zookeeper:2181 + function.bindings: + filterMoving-in-0: input + filterMoving-out-0: output + bindings: + input: + destination: movingAndUnmovingThings + group: sft-reader + output: + destination: unMovingThings + group: sft-writer + +Multiple Datastore App +---------------------- + +In the case of multi-bindings, you simply need to submit override the proper kafka-datastore fields in the environment +field. + +.. code-block:: java + + @Bean + public Function passThrough() { + return event -> event; + } + +.. code-block:: yaml + + spring: + cloud: + function: + definition: passThrough + stream: + kafka-datastore.binder: + kafka-brokers: kafka:9092 + kafka-zookeepers: zookeeper:2181 + function.bindings: + passThrough-in-0: input + passThrough-out-0: output + binders: + kds-start: + type: kafka-datastore + environment: + spring.cloud.stream.kafka-datastore.binder: + kafka-zk-path: geomesa/start + kds-end: + type: kafka-datastore + environment: + spring.cloud.stream.kafka-datastore.binder: + kafka-zk-path: geomesa/end + bindings: + input: + destination: observations + group: sft-reader + binder: kds-start + output: + destination: observations + group: sft-writer + binder: kds-end diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/pom.xml b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/pom.xml new file mode 100644 index 000000000000..cf487d828cb5 --- /dev/null +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/pom.xml @@ -0,0 +1,116 @@ + + + 4.0.0 + + org.locationtech.geomesa + geomesa-kafka_2.12 + 5.1.0-SNAPSHOT + + + geomesa-kafka-spring-cloud-stream-binder + GeoMesa Kafka Spring Cloud Stream Binder + + + UTF-8 + 2021.0.9 + 2.7.18 + + + + + org.locationtech.geomesa + geomesa-kafka-datastore_2.12 + + + org.springframework.cloud + spring-cloud-stream + + + org.springframework.boot + spring-boot-starter-log4j2 + + + org.apache.logging.log4j + log4j-slf4j-impl + + + + + + + org.apache.curator + curator-client + + + log4j + log4j + + + + + org.apache.curator + curator-framework + + + org.apache.curator + curator-recipes + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.apache.logging.log4j + log4j-to-slf4j + + + ch.qos.logback + logback-classic + + + org.junit.jupiter + junit-jupiter + + + + + junit + junit + test + + + + + + + + org.slf4j + slf4j-reload4j + test + + + + + + + org.springframework.boot + spring-boot-dependencies + ${spring-boot.version} + pom + import + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + + + \ No newline at end of file diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderConfiguration.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderConfiguration.java new file mode 100644 index 000000000000..e7bb93592c1c --- /dev/null +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderConfiguration.java @@ -0,0 +1,75 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.kafka.spring.binder; + +import org.geotools.api.data.DataStore; +import org.geotools.api.data.DataStoreFinder; +import org.locationtech.geomesa.kafka.spring.binder.converters.SimpleFeatureConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; + +import java.io.IOException; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; + +@Configuration +@Import({ PropertyPlaceholderAutoConfiguration.class }) +@EnableConfigurationProperties({KafkaDatastoreBinderConfigurationProperties.class}) +public class KafkaDatastoreBinderConfiguration { + private static final Logger logger = LoggerFactory.getLogger(KafkaDatastoreBinderConfiguration.class); + + @Autowired + KafkaDatastoreBinderConfigurationProperties kafkaDatastoreBinderConfigurationProperties; + + @Bean + @ConditionalOnMissingBean + public Supplier dsFactory() { + return () -> { + Map inParameters = new HashMap<>(); + kafkaDatastoreBinderConfigurationProperties.getBinder() + .forEach((key, value) -> inParameters.put(key.replace('-', '.'), value)); + logger.info("Binder config: {}", kafkaDatastoreBinderConfigurationProperties.getBinder()); + logger.info("Connecting to the KDS with params: {}", inParameters); + + try { + return DataStoreFinder.getDataStore(inParameters); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + } + + + @Bean + @ConditionalOnMissingBean + public KafkaDatastoreBinderProvisioner kafkaDatastoreBinderProvisioner() { + return new KafkaDatastoreBinderProvisioner(); + } + + @Bean + @ConditionalOnMissingBean + public KafkaDatastoreMessageBinder kafkaDatastoreMessageBinder(KafkaDatastoreBinderProvisioner kafkaDatastoreBinderProvisioner) { + return new KafkaDatastoreMessageBinder(null, kafkaDatastoreBinderProvisioner, dsFactory()); + } + + @Bean + @ConditionalOnMissingBean + public SimpleFeatureConverter simpleFeatureConverter() { + return new SimpleFeatureConverter(); + } +} diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderConfigurationProperties.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderConfigurationProperties.java new file mode 100644 index 000000000000..558970012e34 --- /dev/null +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderConfigurationProperties.java @@ -0,0 +1,28 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.kafka.spring.binder; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +@ConfigurationProperties(prefix = "spring.cloud.stream.kafka-datastore") + public class KafkaDatastoreBinderConfigurationProperties { + public Map binder = new HashMap<>(); + + public Map getBinder() { + return binder; + } + + public void setBinder(Map additionalProperties) { + this.binder = additionalProperties; + } +} diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderProvisioner.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderProvisioner.java new file mode 100644 index 000000000000..5b97b2f3e44b --- /dev/null +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreBinderProvisioner.java @@ -0,0 +1,52 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.kafka.spring.binder; + +import org.springframework.cloud.stream.binder.ConsumerProperties; +import org.springframework.cloud.stream.binder.ProducerProperties; +import org.springframework.cloud.stream.provisioning.ConsumerDestination; +import org.springframework.cloud.stream.provisioning.ProducerDestination; +import org.springframework.cloud.stream.provisioning.ProvisioningProvider; + +public class KafkaDatastoreBinderProvisioner implements ProvisioningProvider { + + @Override + public ProducerDestination provisionProducerDestination( + final String name, + final ProducerProperties producerProperties) { + return new KafkaDatastoreDestination(name); + } + + @Override + public ConsumerDestination provisionConsumerDestination( + final String name, + final String group, + final ConsumerProperties consumerProperties) { + return new KafkaDatastoreDestination(name); + } + + private class KafkaDatastoreDestination implements ProducerDestination, ConsumerDestination { + + private final String destination; + + private KafkaDatastoreDestination(final String destination) { + this.destination = destination; + } + + @Override + public String getName() { + return destination; + } + + @Override + public String getNameForPartition(int partition) { + return destination; + } + } +} diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageBinder.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageBinder.java new file mode 100644 index 000000000000..acd190c69d21 --- /dev/null +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageBinder.java @@ -0,0 +1,116 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.kafka.spring.binder; + +import org.geotools.api.data.DataStore; +import org.geotools.api.data.FeatureWriter; +import org.geotools.api.data.SimpleFeatureStore; +import org.geotools.api.data.Transaction; +import org.geotools.api.feature.simple.SimpleFeature; +import org.geotools.api.feature.simple.SimpleFeatureType; +import org.geotools.api.filter.Filter; +import org.geotools.api.filter.FilterFactory; +import org.geotools.factory.CommonFactoryFinder; +import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent; +import org.locationtech.geomesa.utils.geotools.FeatureUtils; +import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypeLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; +import org.springframework.cloud.stream.binder.ConsumerProperties; +import org.springframework.cloud.stream.binder.ProducerProperties; +import org.springframework.cloud.stream.provisioning.ConsumerDestination; +import org.springframework.cloud.stream.provisioning.ProducerDestination; +import org.springframework.integration.core.MessageProducer; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +import java.io.IOException; +import java.util.Set; +import java.util.function.Supplier; + +public class KafkaDatastoreMessageBinder extends AbstractMessageChannelBinder { + private static final Logger logger = LoggerFactory.getLogger(KafkaDatastoreMessageBinder.class); + + private final Supplier dsFactory; + private final DataStore ds; + FeatureWriter writer; + FilterFactory ff; + + public KafkaDatastoreMessageBinder( + String[] headersToEmbed, + KafkaDatastoreBinderProvisioner provisioningProvider, + Supplier dsFactory + ) { + super(headersToEmbed, provisioningProvider); + this.dsFactory = dsFactory; + this.ds = dsFactory.get(); + ff = CommonFactoryFinder.getFilterFactory(); + } + + // Maybe handle a Collection and a Collection + @Override + protected MessageHandler createProducerMessageHandler( + final ProducerDestination destination, + final ProducerProperties producerProperties, + final MessageChannel errorChannel) { + return message -> { + String sfName = destination.getName(); + SimpleFeature payload; + + try { + var sft = SimpleFeatureTypeLoader.sftForName(sfName); + if (sft.isDefined()) { + ds.createSchema(sft.get()); + } else { + try { + ds.getSchema(sfName); + logger.debug("There is no local schema for {}, but we found it in the kds", sfName); + } catch (IOException e) { + logger.error("There is no sft schema {} in the kds {} or locally", sfName, ds.getInfo().getDescription(), e); + } + } + + if (message.getPayload() instanceof SimpleFeature) { + payload = (SimpleFeature) message.getPayload(); + } else if (message.getPayload() instanceof KafkaFeatureEvent.KafkaFeatureChanged) { + payload = ((KafkaFeatureEvent.KafkaFeatureChanged) message.getPayload()).feature(); + } else if (message.getPayload() instanceof KafkaFeatureEvent.KafkaFeatureRemoved) { + var remove = (KafkaFeatureEvent.KafkaFeatureRemoved) message.getPayload(); + SimpleFeatureStore featureStore = (SimpleFeatureStore) ds.getFeatureSource(sfName); + featureStore.removeFeatures(ff.id(Set.of(ff.featureId(remove.id())))); + return; + } else if (message.getPayload() instanceof KafkaFeatureEvent.KafkaFeatureCleared) { + SimpleFeatureStore featureStore = (SimpleFeatureStore) ds.getFeatureSource(sfName); + featureStore.removeFeatures(Filter.INCLUDE); + return; + } else { + logger.warn("Could not process message with header {} and payload {}", message.getHeaders(), message.getPayload()); + return; + } + + if (writer == null) { + writer = ds.getFeatureWriterAppend(sfName, Transaction.AUTO_COMMIT); + } + + FeatureUtils.write(writer, payload, true); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + } + + @Override + protected MessageProducer createConsumerEndpoint( + final ConsumerDestination destination, + final String group, + final ConsumerProperties properties) { + return new KafkaDatastoreMessageProducer(destination, dsFactory); + } +} diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageProducer.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageProducer.java new file mode 100644 index 000000000000..98e4b4f85944 --- /dev/null +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageProducer.java @@ -0,0 +1,70 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.kafka.spring.binder; + + +import org.geotools.api.data.DataStore; +import org.geotools.api.data.FeatureEvent; +import org.geotools.api.data.SimpleFeatureStore; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.stream.provisioning.ConsumerDestination; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +import java.io.IOException; +import java.util.function.Supplier; + +public class KafkaDatastoreMessageProducer extends MessageProducerSupport { + private static final Logger logger = LoggerFactory.getLogger(KafkaDatastoreMessageProducer.class); + + private final ConsumerDestination destination; + private final Supplier dsFactory; + private DataStore ds; + + public KafkaDatastoreMessageProducer(ConsumerDestination destination, + Supplier dsFactory) { + this.destination = destination; + this.dsFactory = dsFactory; + } + + @Override + public void doStart() { + SimpleFeatureStore fs = getSimpleFeatureStore(); + + fs.addFeatureListener(featureEvent -> { + Message receivedMessage = MessageBuilder + .withPayload(featureEvent) + .setHeader("contentType", "application/kafka-feature-event") + .build(); + sendMessage(receivedMessage); + }); + } + + private @NotNull SimpleFeatureStore getSimpleFeatureStore() { + SimpleFeatureStore fs = null; + while (fs == null) { + try { + this.ds = dsFactory.get(); + fs = (SimpleFeatureStore) ds.getFeatureSource(destination.getName()); + } catch (IOException e) { + logger.warn("Could not connect to KDS input, waiting for KDS to be created. Error: {}", e.getMessage()); + try { + Thread.sleep(5000); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + } + logger.info("Successfully connected to the input KDS!"); + return fs; + } +} diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/converters/SimpleFeatureConverter.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/converters/SimpleFeatureConverter.java new file mode 100644 index 000000000000..496aef697e56 --- /dev/null +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/java/org/locationtech/geomesa/kafka/spring/binder/converters/SimpleFeatureConverter.java @@ -0,0 +1,54 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.kafka.spring.binder.converters; + +import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent; +import org.geotools.api.feature.simple.SimpleFeature; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.AbstractMessageConverter; +import org.springframework.util.MimeType; + +public class SimpleFeatureConverter extends AbstractMessageConverter { + + public SimpleFeatureConverter() { + super(new MimeType("application", "kafka-feature-event")); + } + + @Override + protected boolean supports(Class aClass) { + return KafkaFeatureEvent.KafkaFeatureChanged.class.equals(aClass); + } + + @Override + protected boolean canConvertFrom(Message message, Class targetClass) { + return message.getPayload() instanceof KafkaFeatureEvent.KafkaFeatureChanged; + } + + @Override + protected Object convertFromInternal(Message message, Class targetClass, Object conversionHint) { + KafkaFeatureEvent.KafkaFeatureChanged event = (KafkaFeatureEvent.KafkaFeatureChanged) message.getPayload(); + return event.feature(); + } + + @Override + protected Object convertToInternal(Object payload, MessageHeaders headers, + Object conversionHint) { + return payload; + } + + @Override + protected MimeType getDefaultContentType(Object toBeConverted) { + if (toBeConverted instanceof SimpleFeature) { + return new MimeType("application", "simple-feature"); + } else { + return super.getDefaultContentType(toBeConverted); + } + } +} diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/resources/META-INF/spring.binders b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/resources/META-INF/spring.binders new file mode 100644 index 000000000000..efa66a3b0df4 --- /dev/null +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/main/resources/META-INF/spring.binders @@ -0,0 +1,2 @@ +kafka-datastore:\ +org.locationtech.geomesa.kafka.spring.binder.KafkaDatastoreBinderConfiguration \ No newline at end of file diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageBinderTest.java b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageBinderTest.java new file mode 100644 index 000000000000..077a5ef1f382 --- /dev/null +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/java/org/locationtech/geomesa/kafka/spring/binder/KafkaDatastoreMessageBinderTest.java @@ -0,0 +1,200 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.kafka.spring.binder; + +import org.geotools.api.data.DataStore; +import org.geotools.api.data.FeatureWriter; +import org.geotools.api.data.SimpleFeatureStore; +import org.geotools.api.feature.simple.SimpleFeature; +import org.geotools.api.feature.simple.SimpleFeatureType; +import org.geotools.api.filter.Filter; +import org.geotools.feature.simple.SimpleFeatureBuilder; +import org.junit.Before; +import org.junit.Test; +import org.locationtech.geomesa.kafka.utils.KafkaFeatureEvent; +import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypeLoader; +import org.springframework.cloud.stream.binder.ProducerProperties; +import org.springframework.cloud.stream.provisioning.ProducerDestination; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.SubscribableChannel; +import org.springframework.messaging.support.MessageBuilder; + +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Date; +import java.util.function.Supplier; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +public class KafkaDatastoreMessageBinderTest { + + DataStore ds; + FeatureWriter featureWriter; + SimpleFeatureStore simpleFeatureStore; + SubscribableChannel errorChannel; + Supplier dsFactory; + + @Before + public void init() { + ds = mock(DataStore.class); + featureWriter = mock(FeatureWriter.class); + simpleFeatureStore = mock(SimpleFeatureStore.class); + errorChannel = mock(SubscribableChannel.class); + dsFactory = () -> ds; + } + + @Test + public void producerMessageHandler_canWriteSft() throws IOException { + + SimpleFeatureType sft = SimpleFeatureTypeLoader.sftForName("observation").get(); + SimpleFeature writeableFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id0"); + doReturn(featureWriter).when(ds).getFeatureWriterAppend(any(), any()); + doReturn(writeableFeature).when(featureWriter).next(); + + KafkaDatastoreBinderProvisioner provisioningProvider = new KafkaDatastoreBinderProvisioner(); + provisioningProvider = new KafkaDatastoreBinderProvisioner(); + KafkaDatastoreMessageBinder messageBinder = new KafkaDatastoreMessageBinder(new String[]{}, provisioningProvider, dsFactory); + + ProducerProperties producerProperties = new ProducerProperties(); + ProducerDestination destination = provisioningProvider.provisionProducerDestination("test-out", producerProperties); + + MessageHandler handler = messageBinder.createProducerMessageHandler(destination, producerProperties, errorChannel); + + + SimpleFeature simpleFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id1"); + simpleFeature.setAttribute("id", "123456"); + Message message = MessageBuilder.withPayload(simpleFeature) + .setHeader("featureType", "application/simple-feature") + .build(); + + handler.handleMessage(message); + + verify(featureWriter).write(); + assertThat(writeableFeature.getAttribute("id")).isEqualTo("123456"); + } + + @Test + public void producerMessageHandler_canWriteKafkaFeatureEventChanged() throws IOException { + var now = Instant.now(); + SimpleFeatureType sft = SimpleFeatureTypeLoader.sftForName("observation").get(); + SimpleFeature writeableFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id0"); + doReturn(featureWriter).when(ds).getFeatureWriterAppend(any(), any()); + doReturn(writeableFeature).when(featureWriter).next(); + + KafkaDatastoreBinderProvisioner provisioningProvider = new KafkaDatastoreBinderProvisioner(); + provisioningProvider = new KafkaDatastoreBinderProvisioner(); + KafkaDatastoreMessageBinder messageBinder = new KafkaDatastoreMessageBinder(new String[]{}, provisioningProvider, dsFactory); + + ProducerProperties producerProperties = new ProducerProperties(); + ProducerDestination destination = provisioningProvider.provisionProducerDestination("test-out", producerProperties); + + MessageHandler handler = messageBinder.createProducerMessageHandler(destination, producerProperties, errorChannel); + + + SimpleFeature simpleFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id1"); + simpleFeature.setAttribute("id", "123456"); + simpleFeature.setAttribute("dtg", now); + KafkaFeatureEvent changed = new KafkaFeatureEvent.KafkaFeatureChanged("test", simpleFeature, Instant.now().getEpochSecond()); + Message message = MessageBuilder.withPayload(changed) + .setHeader("featureType", "application/kafka-feature-event") + .build(); + + handler.handleMessage(message); + + verify(featureWriter).write(); + assertThat(writeableFeature.getAttribute("id")).isEqualTo("123456"); + assertThat(writeableFeature.getAttribute("dtg")).isEqualTo(Date.from(now)); + } + + @Test + public void producerMessageHandler_canWriteKafkaFeatureEventRemoved() throws IOException { + SimpleFeatureType sft = SimpleFeatureTypeLoader.sftForName("observation").get(); + SimpleFeature writeableFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id0"); + doReturn(simpleFeatureStore).when(ds).getFeatureSource("test-out"); + + KafkaDatastoreBinderProvisioner provisioningProvider = new KafkaDatastoreBinderProvisioner(); + provisioningProvider = new KafkaDatastoreBinderProvisioner(); + KafkaDatastoreMessageBinder messageBinder = new KafkaDatastoreMessageBinder(new String[]{}, provisioningProvider, dsFactory); + + ProducerProperties producerProperties = new ProducerProperties(); + ProducerDestination destination = provisioningProvider.provisionProducerDestination("test-out", producerProperties); + + MessageHandler handler = messageBinder.createProducerMessageHandler(destination, producerProperties, errorChannel); + + + KafkaFeatureEvent removed = new KafkaFeatureEvent.KafkaFeatureRemoved("test-out", "id1", null, Instant.now().getEpochSecond()); + Message message = MessageBuilder.withPayload(removed) + .setHeader("featureType", "application/kafka-feature-event") + .build(); + + handler.handleMessage(message); + + verify(simpleFeatureStore).removeFeatures(any(Filter.class)); + } + + @Test + public void producerMessageHandler_canWriteKafkaFeatureEventClear() throws IOException { + SimpleFeatureType sft = SimpleFeatureTypeLoader.sftForName("observation").get(); + SimpleFeature writeableFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id0"); + doReturn(simpleFeatureStore).when(ds).getFeatureSource("test-out"); + + KafkaDatastoreBinderProvisioner provisioningProvider = new KafkaDatastoreBinderProvisioner(); + provisioningProvider = new KafkaDatastoreBinderProvisioner(); + KafkaDatastoreMessageBinder messageBinder = new KafkaDatastoreMessageBinder(new String[]{}, provisioningProvider, dsFactory); + + ProducerProperties producerProperties = new ProducerProperties(); + ProducerDestination destination = provisioningProvider.provisionProducerDestination("test-out", producerProperties); + + MessageHandler handler = messageBinder.createProducerMessageHandler(destination, producerProperties, errorChannel); + + + KafkaFeatureEvent cleared = new KafkaFeatureEvent.KafkaFeatureCleared("test-out", Instant.now().getEpochSecond()); + Message message = MessageBuilder.withPayload(cleared) + .setHeader("featureType", "application/kafka-feature-event") + .build(); + + handler.handleMessage(message); + + verify(simpleFeatureStore).removeFeatures(Filter.INCLUDE); + } + + @Test + public void producerMessageHandler_loadSftFromClasspath() throws IOException { + SimpleFeatureType sft = SimpleFeatureTypeLoader.sftForName("observation").get(); + SimpleFeature writeableFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id0"); + doReturn(featureWriter).when(ds).getFeatureWriterAppend(any(), any()); + doReturn(writeableFeature).when(featureWriter).next(); + + KafkaDatastoreBinderProvisioner provisioningProvider = new KafkaDatastoreBinderProvisioner(); + provisioningProvider = new KafkaDatastoreBinderProvisioner(); + KafkaDatastoreMessageBinder messageBinder = new KafkaDatastoreMessageBinder(new String[]{}, provisioningProvider, dsFactory); + + ProducerProperties producerProperties = new ProducerProperties(); + ProducerDestination destination = provisioningProvider.provisionProducerDestination("observation", producerProperties); + + // Doesn't throw an error + MessageHandler handler = messageBinder.createProducerMessageHandler(destination, producerProperties, errorChannel); + + SimpleFeature simpleFeature = SimpleFeatureBuilder.build(sft, new ArrayList<>(), "id1"); + simpleFeature.setAttribute("id", "123456"); + KafkaFeatureEvent changed = new KafkaFeatureEvent.KafkaFeatureChanged("test", simpleFeature, Instant.now().getEpochSecond()); + Message message = MessageBuilder.withPayload(changed) + .setHeader("featureType", "application/kafka-feature-event") + .build(); + + handler.handleMessage(message); + + verify(ds).createSchema(any()); + } + +} \ No newline at end of file diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/resources/observation.conf b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/resources/observation.conf new file mode 100644 index 000000000000..a277fe846853 --- /dev/null +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/resources/observation.conf @@ -0,0 +1,7 @@ +geomesa.sfts.observation = { + attributes = [ + { name = "location", type = "Point" } + { name = "dtg", type = "Date" } + { name = "id", type = "String" } + ] +} \ No newline at end of file diff --git a/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/resources/reference.conf b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/resources/reference.conf new file mode 100644 index 000000000000..028092e2438c --- /dev/null +++ b/geomesa-kafka/geomesa-kafka-spring-cloud-stream-binder/src/test/resources/reference.conf @@ -0,0 +1 @@ +include "observation.conf" \ No newline at end of file diff --git a/geomesa-kafka/pom.xml b/geomesa-kafka/pom.xml index 994adb5ca33c..3b8898fde893 100644 --- a/geomesa-kafka/pom.xml +++ b/geomesa-kafka/pom.xml @@ -18,6 +18,7 @@ geomesa-kafka-gs-plugin geomesa-kafka-tools geomesa-kafka-utils + geomesa-kafka-spring-cloud-stream-binder