Commit a0ad631

add kafka sink (with the quix connector to be moved later)
1 parent 2cc816b commit a0ad631

10 files changed

Lines changed: 722 additions & 0 deletions

Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
# Kafka Replicator Sink

[This connector](https://github.com/quixio/quix-samples/tree/main/python/destinations/kafka) demonstrates how to consume data from a Quix topic and produce it to an external Kafka cluster.

This sink uses the `KafkaReplicatorSink` to serialize and produce messages to an external Kafka cluster, making it easy to replicate data between Kafka clusters or export data from Quix to other Kafka-based systems.

## How to run

Create a [Quix](https://portal.cloud.quix.io/signup?utm_campaign=github) account or log in and visit the `Connectors` tab to use this connector.

Clicking `Set up connector` allows you to enter your connection details and runtime parameters.

Then either:
* click `Test connection & deploy` to deploy the pre-built and configured container into Quix.

* or click `Customise connector` to inspect or alter the code before deployment.

## Requirements / Prerequisites

You'll need an external Kafka cluster that the connector can reach, either locally or in the cloud.

## Environment Variables

The connector uses the following environment variables:

### Required
- **input**: Name of the input topic to listen to.
- **SINK_OUTPUT_TOPIC**: The target Kafka topic name to produce to on the external Kafka cluster.
- **SINK_BOOTSTRAP_SERVERS**: The external Kafka broker address (e.g., localhost:9092 or broker.example.com:9092).

### Optional
- **CONSUMER_GROUP**: Name of the consumer group for consuming from Quix. Default: "kafka_sink"
- **SINK_KEY_SERIALIZER**: Serializer to use for the message key. Options: json, bytes, string, double, integer. Default: "bytes"
- **SINK_VALUE_SERIALIZER**: Serializer to use for the message value. Options: json, bytes, string, double, integer. Default: "json"
- **SINK_AUTO_CREATE_TOPIC**: Whether to attempt to create the sink topic upon startup. Default: "true"

### Authentication (Optional)
- **SINK_SECURITY_PROTOCOL**: Protocol used to communicate with brokers. Options: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
- **SINK_SASL_MECHANISM**: SASL mechanism for authentication. Options: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER, AWS_MSK_IAM
- **SINK_SASL_USERNAME**: SASL username for external Kafka authentication.
- **SINK_SASL_PASSWORD**: SASL password for external Kafka authentication.
- **SINK_SSL_CA_LOCATION**: Path to the SSL CA certificate file for secure connections. If not provided, system default CA certificates will be used.
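
As a rough illustration of how the required and optional variables fit together, the sketch below resolves them from any mapping and applies the documented defaults. The helper name `resolve_settings` and the shape of the returned dictionary are assumptions made for this example; they are not part of the connector.

```python
from typing import Mapping

# Variable names and defaults come from the lists above. The helper itself
# is hypothetical and does not exist in the connector code.
REQUIRED = ("input", "SINK_OUTPUT_TOPIC", "SINK_BOOTSTRAP_SERVERS")
DEFAULTS = {
    "CONSUMER_GROUP": "kafka_sink",
    "SINK_KEY_SERIALIZER": "bytes",
    "SINK_VALUE_SERIALIZER": "json",
    "SINK_AUTO_CREATE_TOPIC": "true",
}


def resolve_settings(env: Mapping[str, str]) -> dict:
    """Return the required values plus optional values with defaults applied."""
    # Treat unset and empty required variables the same way.
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise KeyError(f"Missing required variables: {', '.join(missing)}")
    settings = {name: env[name] for name in REQUIRED}
    for name, default in DEFAULTS.items():
        settings[name] = env.get(name, default)
    return settings
```

Calling `resolve_settings(os.environ)` before starting the application would surface a missing required variable early, with one message that names all of them.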

## Contribute

Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.

## Open source

This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo.

Please star us and mention us on social to show your appreciation.
Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
FROM python:3.12.5-slim-bookworm

# Set environment variables for non-interactive setup and unbuffered output
ENV DEBIAN_FRONTEND=noninteractive \
    PYTHONUNBUFFERED=1 \
    PYTHONIOENCODING=UTF-8 \
    PYTHONPATH="/app"

# Build argument for setting the main app path
ARG MAINAPPPATH=.

# Set working directory inside the container
WORKDIR /app

# Copy requirements to leverage Docker cache
COPY "${MAINAPPPATH}/requirements.txt" "${MAINAPPPATH}/requirements.txt"

# Install dependencies without caching
RUN pip install --no-cache-dir -r "${MAINAPPPATH}/requirements.txt"

# Copy entire application into container
COPY . .

# Set working directory to main app path
WORKDIR "/app/${MAINAPPPATH}"

# Define the container's startup command
ENTRYPOINT ["python3", "main.py"]
Lines changed: 113 additions & 0 deletions
@@ -0,0 +1,113 @@
{
  "libraryItemId": "kafka-replicator-sink",
  "name": "Kafka Replicator Sink",
  "language": "Python",
  "tags": {
    "Pipeline Stage": ["Destination"],
    "Type": ["Connectors"],
    "Category": ["Data streaming"]
  },
  "shortDescription": "Consume data from a Quix topic and produce it to an external Kafka cluster",
  "DefaultFile": "main.py",
  "EntryPoint": "dockerfile",
  "RunEntryPoint": "main.py",
  "IconFile": "icon.png",
  "Variables": [
    {
      "Name": "input",
      "Type": "EnvironmentVariable",
      "InputType": "InputTopic",
      "Description": "Name of the input topic to listen to.",
      "Required": true
    },
    {
      "Name": "SINK_AUTO_CREATE_TOPIC",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Whether to attempt to create the sink topic upon startup",
      "DefaultValue": "true",
      "Required": false
    },
    {
      "Name": "CONSUMER_GROUP",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Name of the consumer group",
      "DefaultValue": "kafka_sink",
      "Required": false
    },
    {
      "Name": "SINK_OUTPUT_TOPIC",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "The target Kafka topic name to produce to on the external Kafka cluster",
      "Required": true
    },
    {
      "Name": "SINK_BOOTSTRAP_SERVERS",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "The external Kafka broker address (e.g., localhost:9092 or broker.example.com:9092)",
      "Required": true
    },
    {
      "Name": "SINK_KEY_SERIALIZER",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Serializer to use for the message key. Options: json, bytes, string, double, integer",
      "DefaultValue": "bytes",
      "Required": false
    },
    {
      "Name": "SINK_VALUE_SERIALIZER",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Serializer to use for the message value. Options: json, bytes, string, double, integer",
      "DefaultValue": "json",
      "Required": false
    },
    {
      "Name": "SINK_SECURITY_PROTOCOL",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Protocol used to communicate with brokers. Options: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL",
      "Required": false
    },
    {
      "Name": "SINK_SASL_MECHANISM",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "SASL mechanism for authentication. Options: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER, AWS_MSK_IAM",
      "Required": false
    },
    {
      "Name": "SINK_SASL_USERNAME",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "SASL username for external Kafka authentication",
      "Required": false
    },
    {
      "Name": "SINK_SASL_PASSWORD",
      "Type": "EnvironmentVariable",
      "InputType": "Secret",
      "Description": "SASL password for external Kafka authentication",
      "Required": false
    },
    {
      "Name": "SINK_SSL_CA_LOCATION",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Path to the SSL CA certificate file for secure connections. If not provided, system default CA certificates will be used",
      "Required": false
    }
  ],
  "DeploySettings": {
    "DeploymentType": "Service",
    "CpuMillicores": 200,
    "MemoryInMb": 200,
    "Replicas": 1,
    "PublicAccess": false,
    "ValidateConnection": true
  }
}
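
A small consistency check over the "Variables" list can catch copy-paste slips in a file like this, such as a reused description or a differently cased key. The sketch below is only an illustration: the function name `check_variables` is hypothetical, and treating "DefaultValue" as the expected key casing is an assumption based on the spelling most entries in this file use, not on a published schema.

```python
import json

# Keys that every entry in "Variables" carries in this file.
EXPECTED_KEYS = {"Name", "Type", "InputType", "Description", "Required"}


def check_variables(library_json: str) -> list[str]:
    """Return human-readable problems found in the "Variables" list."""
    problems = []
    descriptions = {}  # description text -> name of the first variable using it
    for var in json.loads(library_json).get("Variables", []):
        name = var.get("Name", "<unnamed>")
        for key in sorted(EXPECTED_KEYS - var.keys()):
            problems.append(f"{name}: missing {key}")
        # Flag keys that differ from "DefaultValue" only by letter case
        # (assumed casing, see the note above).
        for key in var:
            if key.lower() == "defaultvalue" and key != "DefaultValue":
                problems.append(f"{name}: use DefaultValue instead of {key}")
        # A description shared by two variables usually means a copy-paste slip.
        desc = var.get("Description")
        if desc in descriptions:
            problems.append(f"{name}: description duplicates {descriptions[desc]}")
        elif desc is not None:
            descriptions[desc] = name
    return problems
```

An empty list means none of these three checks fired; it does not validate the file against the platform's actual schema.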

python/destinations/kafka/main.py

Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
import os
from typing import Tuple, Type

from pydantic_settings import (
    BaseSettings as PydanticBaseSettings,
    PydanticBaseSettingsSource,
    SettingsConfigDict
)

from quixstreams import Application
from quixstreams.kafka.configuration import ConnectionConfig

from sink import KafkaReplicatorSink


class SinkConnectionConfig(ConnectionConfig):
    """
    A ConnectionConfig subclass that reads configuration from environment variables
    with a SINK_ prefix.

    This allows users to configure the sink's Kafka connection using environment
    variables like SINK_BOOTSTRAP_SERVERS, SINK_SASL_USERNAME, etc.

    Example:
        export SINK_BOOTSTRAP_SERVERS=kafka:9092
        export SINK_SECURITY_PROTOCOL=SASL_SSL
        export SINK_SASL_MECHANISM=PLAIN
        export SINK_SASL_USERNAME=myuser
        export SINK_SASL_PASSWORD=mypass

        # Then create the config
        config = SinkConnectionConfig()
        sink = KafkaReplicatorSink(broker_address=config, topic_name="output-topic")
    """

    model_config = SettingsConfigDict(
        env_prefix="SINK_",
    )

    @classmethod
    def settings_customise_sources(
        cls,
        settings_cls: Type[PydanticBaseSettings],
        init_settings: PydanticBaseSettingsSource,
        env_settings: PydanticBaseSettingsSource,
        dotenv_settings: PydanticBaseSettingsSource,
        file_secret_settings: PydanticBaseSettingsSource,
    ) -> Tuple[PydanticBaseSettingsSource, ...]:
        """
        Enable reading values from environment variables with SINK_ prefix.

        Constructor arguments take priority over environment variables.
        """
        return init_settings, env_settings
54+
55+
app = Application(
56+
consumer_group=os.environ["CONSUMER_GROUP"],
57+
auto_offset_reset="earliest",
58+
)
59+
input_topic = app.topic(os.environ['input'])
60+
kafka_sink = KafkaReplicatorSink(
61+
broker_address=SinkConnectionConfig(),
62+
topic_name=os.environ["SINK_OUTPUT_TOPIC"],
63+
key_serializer=os.getenv("SINK_KEY_SERIALIZER", "bytes"),
64+
value_serializer=os.getenv("SINK_VALUE_SERIALIZER", "json"),
65+
origin_topic=input_topic,
66+
auto_create_sink_topic=os.getenv("SINK_AUTO_CREATE_TOPIC", "true").lower() == "true",
67+
)
68+
app.dataframe(input_topic).sink(kafka_sink)
69+
70+
71+
if __name__ == '__main__':
72+
app.run()
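
The SINK_ prefix convention used by SinkConnectionConfig can be approximated with the standard library alone. The sketch below is not how pydantic-settings is implemented; it only illustrates the mapping from prefixed variable names to lower-case field names. The function name `collect_prefixed` is hypothetical, and the field names in the usage note are inferred from the variables documented in this commit.

```python
from typing import Iterable, Mapping


def collect_prefixed(
    env: Mapping[str, str], fields: Iterable[str], prefix: str = "SINK_"
) -> dict:
    """Map PREFIX_FIELD variables to lower-case field names.

    Only names listed in `fields` are picked up, so prefixed variables such
    as SINK_OUTPUT_TOPIC are ignored unless they are declared as fields.
    """
    wanted = {name.lower() for name in fields}
    result = {}
    for key, value in env.items():
        # Match the prefix case-insensitively; skip everything else.
        if not key.upper().startswith(prefix):
            continue
        field = key[len(prefix):].lower()
        if field in wanted:
            result[field] = value
    return result
```

For example, `collect_prefixed(os.environ, ["bootstrap_servers", "sasl_username", "sasl_password"])` would return only those connection settings that are present in the environment, leaving SINK_OUTPUT_TOPIC and the serializer variables to be read separately, as main.py does.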
Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
quixstreams==3.23.1
python-dotenv
