Skip to content

Commit 37f1eec

Browse files
committed
PubSub: add support for Kafka
1 parent c3c3eb8 commit 37f1eec

6 files changed

Lines changed: 150 additions & 4 deletions

File tree

docs/source/pubsub.rst

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,41 @@ Example directive:
9999
channel: messages/a/data # optional
100100
hidden: false # default
101101
102+
Kafka
103+
^^^^^
104+
105+
Example directive:
106+
107+
.. code-block:: yaml
108+
109+
pubsub:
110+
name: Kafka
111+
broker:
112+
url: tcp://localhost:9092
113+
channel: messages-a-data
114+
# if using authentication:
115+
# sasl_mechanism: PLAIN # default PLAIN
116+
# sasl_security_protocol: SASL_PLAINTEXT # default SASL_PLAINTEXT
117+
hidden: true # default false
118+
119+
.. note::
120+
121+
For any Pub/Sub endpoints requiring authentication, encode the ``url`` value as follows:
122+
123+
* ``mqtt://username:password@localhost:1883``
124+
* ``https://username:password@localhost``
125+
126+
As with any section of the pygeoapi configuration, environment variables may be used as needed, for example
127+
to set username/password information in a URL. If ``pubsub.broker.url`` contains authentication, and
128+
``pubsub.broker.hidden`` is ``false``, the authentication information will be stripped from the URL
129+
before displaying it on the landing page.
130+
131+
.. note::
132+
133+
If a ``channel`` is defined, it is used as a prefix to the relevant OGC API endpoint used.
134+
135+
If a ``channel`` is not defined, only the relevant OGC API endpoint is used.
136+
102137
HTTP
103138
^^^^
104139

pygeoapi/plugin.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
},
8989
'pubsub': {
9090
'HTTP': 'pygeoapi.pubsub.http.HTTPPubSubClient',
91+
'Kafka': 'pygeoapi.pubsub.kafka.KafkaPubSubClient',
9192
'MQTT': 'pygeoapi.pubsub.mqtt.MQTTPubSubClient'
9293
}
9394
}

pygeoapi/pubsub/http.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
class HTTPPubSubClient(BasePubSubClient):
4242
"""HTTP client"""
4343

44-
def __init__(self, broker_url):
44+
def __init__(self, publisher_def):
4545
"""
4646
Initialize object
4747
@@ -50,7 +50,7 @@ def __init__(self, broker_url):
5050
:returns: pygeoapi.pubsub.http.HTTPPubSubClient
5151
"""
5252

53-
super().__init__(broker_url)
53+
super().__init__(publisher_def)
5454
self.name = 'HTTP'
5555
self.type = 'http'
5656
self.auth = None

pygeoapi/pubsub/kafka.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
# =================================================================
2+
#
3+
# Authors: Tom Kralidis <tomkralidis@gmail.com>
4+
#
5+
# Copyright (c) 2026 Tom Kralidis
6+
#
7+
# Permission is hereby granted, free of charge, to any person
8+
# obtaining a copy of this software and associated documentation
9+
# files (the "Software"), to deal in the Software without
10+
# restriction, including without limitation the rights to use,
11+
# copy, modify, merge, publish, distribute, sublicense, and/or sell
12+
# copies of the Software, and to permit persons to whom the
13+
# Software is furnished to do so, subject to the following
14+
# conditions:
15+
#
16+
# The above copyright notice and this permission notice shall be
17+
# included in all copies or substantial portions of the Software.
18+
#
19+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
20+
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
21+
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
22+
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
23+
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
24+
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
25+
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
26+
# OTHER DEALINGS IN THE SOFTWARE.
27+
#
28+
# =================================================================
29+
30+
import logging
31+
32+
from kafka import errors, KafkaProducer
33+
34+
from pygeoapi.pubsub.base import BasePubSubClient, PubSubClientConnectionError
35+
from pygeoapi.util import to_json
36+
37+
LOGGER = logging.getLogger(__name__)
38+
39+
40+
class KafkaPubSubClient(BasePubSubClient):
41+
"""Kafka client"""
42+
43+
def __init__(self, publisher_def):
44+
"""
45+
Initialize object
46+
47+
:param publisher_def: provider definition
48+
49+
:returns: pygeoapi.pubsub.kafka.KafkaPubSubClient
50+
"""
51+
52+
super().__init__(publisher_def)
53+
self.name = 'Kafka'
54+
self.type = 'kafka'
55+
self.sasl_mechanism = publisher_def.get('sasl.mechanism', 'PLAIN')
56+
self.security_protocol = publisher_def.get('security.protocol', 'SASL_SSL') # noqa
57+
58+
msg = f'Initializing to broker {self.broker_safe_url} with id {self.client_id}' # noqa
59+
LOGGER.debug(msg)
60+
61+
def connect(self) -> None:
62+
"""
63+
Connect to an Kafka broker
64+
65+
:returns: None
66+
"""
67+
68+
args = {
69+
'bootstrap_servers': f'{self.broker_url.hostname}:{self.broker_url.port}', # noqa
70+
'client_id': self.client_id,
71+
'value_serializer': lambda v: to_json(v).encode('utf-8')
72+
}
73+
if None not in [self.broker_url.username, self.broker_url.password]:
74+
args.update({
75+
'security.protocol': self.security_protocol,
76+
'sasl.mechanism': self.sasl_mechanism,
77+
'sasl.username': self.broker_url.username,
78+
'sasl.password': self.broker_url.password
79+
})
80+
81+
LOGGER.debug('Creating Kafka producer')
82+
try:
83+
self.producer = KafkaProducer(**args)
84+
except errors.NoBrokersAvailable as err:
85+
raise PubSubClientConnectionError(err)
86+
87+
def pub(self, channel: str, message: str) -> bool:
88+
"""
89+
Publish a message to a broker/channel
90+
91+
:param channel: `str` of topic
92+
:param message: `str` of message
93+
94+
:returns: `bool` of publish result
95+
"""
96+
97+
LOGGER.debug(f'Publishing to broker {self.broker_safe_url}')
98+
LOGGER.debug(f'Channel: {channel}')
99+
LOGGER.debug(f'Message: {message}')
100+
LOGGER.debug('Sanitizing channel for HTTP')
101+
channel = channel.replace('/', '-')
102+
channel = channel.replace(':', '-')
103+
LOGGER.debug(f'Sanitized channel for Kafka: {channel}')
104+
105+
self.producer.send(channel, value=message)
106+
self.producer.flush()
107+
108+
def __repr__(self):
109+
return f'<HTTPPubSubClient> {self.broker_safe_url}'

pygeoapi/pubsub/mqtt.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
class MQTTPubSubClient(BasePubSubClient):
4040
"""MQTT client"""
4141

42-
def __init__(self, broker_url):
42+
def __init__(self, publisher_def):
4343
"""
4444
Initialize object
4545
@@ -48,7 +48,7 @@ def __init__(self, broker_url):
4848
:returns: pycsw.pubsub.mqtt.MQTTPubSubClient
4949
"""
5050

51-
super().__init__(broker_url)
51+
super().__init__(publisher_def)
5252
self.type = 'mqtt'
5353
self.port = self.broker_url.port
5454

requirements-pubsub.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1+
kafka-python
12
paho-mqtt

0 commit comments

Comments
 (0)