Skip to content

Commit e79c3f4

Browse files
authored
Merge pull request #2 from starichkov/protobuf-support
Added Protobuf support: extended producer and consumer to handle mult…
2 parents a7c0c11 + 4fbb3a6 commit e79c3f4

14 files changed

Lines changed: 862 additions & 32 deletions

.github/workflows/python.yml

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ jobs:
3838
- name: Run tests with coverage
3939
run: |
4040
source .venv/bin/activate
41-
pytest --cov=producer --cov=consumer --cov-report=xml
41+
pytest --cov=producer --cov=consumer --cov=serialization --cov-report=xml
4242
4343
# Only upload coverage from the main Python version (3.13)
4444
- name: Upload coverage to Codecov (only on Python 3.13)
@@ -65,7 +65,7 @@ jobs:
6565
- name: Build images
6666
run: docker compose build
6767

68-
- name: Start stack
68+
- name: Start stack (JSON default)
6969
run: docker compose up -d
7070

7171
- name: Wait for Kafka to be healthy
@@ -80,7 +80,7 @@ jobs:
8080
done
8181
echo "Kafka did not become healthy in time"; docker compose ps; docker compose logs kafka; exit 1
8282
83-
- name: Verify producer and consumer activity
83+
- name: Verify producer and consumer activity (JSON)
8484
run: |
8585
# wait up to 120s for producer to send and consumer to log messages
8686
for i in $(seq 1 120); do
@@ -97,6 +97,58 @@ jobs:
9797
docker compose logs
9898
exit 1
9999
100+
- name: Tear down (after JSON run)
101+
run: docker compose down -v --remove-orphans
102+
103+
- name: Start stack (Protobuf)
104+
run: MESSAGE_FORMAT=protobuf docker compose up -d
105+
106+
- name: Wait for Kafka to be healthy (protobuf run)
107+
run: |
108+
for i in $(seq 1 60); do
109+
status=$(docker inspect -f '{{json .State.Health.Status}}' kafka 2>/dev/null || echo "null")
110+
echo "Kafka health: $status"
111+
if [ "$status" = "\"healthy\"" ]; then
112+
exit 0
113+
fi
114+
sleep 2
115+
done
116+
echo "Kafka did not become healthy in time"; docker compose ps; docker compose logs kafka; exit 1
117+
118+
- name: Verify producer and consumer activity (Protobuf)
119+
run: |
120+
# wait up to 120s for producer to send and consumer to log messages
121+
for i in $(seq 1 120); do
122+
prod=$(docker logs kafka-producer 2>&1 | grep -c 'Sent: key=' || true)
123+
# Protobuf messages are logged as JSON after parsing
124+
cons=$(docker logs kafka-consumer 2>&1 | grep -E -c 'JSON \(' || true)
125+
echo "Attempt $i: producer=$prod consumer=$cons"
126+
if [ "$prod" -ge 3 ] && [ "$cons" -ge 3 ]; then
127+
echo "Producer and consumer logs look good (protobuf)"
128+
exit 0
129+
fi
130+
sleep 1
131+
done
132+
echo "Producer/consumer (protobuf) did not show expected activity"
133+
docker compose logs
134+
exit 1
135+
136+
- name: Verify wire annotation appears (Protobuf)
137+
run: |
138+
# After activity is verified, ensure the consumer logs show at least one wire=protobuf annotation
139+
for i in $(seq 1 60); do
140+
ann=$(docker logs kafka-consumer 2>&1 | grep -c '\[wire=protobuf\]' || true)
141+
echo "Attempt $i: wire_protobuf_annotations=$ann"
142+
if [ "$ann" -ge 1 ]; then
143+
echo "Found wire=protobuf annotation in consumer logs"
144+
exit 0
145+
fi
146+
sleep 1
147+
done
148+
echo "wire=protobuf annotation not found in consumer logs"
149+
docker logs kafka-consumer || true
150+
exit 1
151+
100152
- name: Show service logs (for debugging)
101153
if: failure()
102154
run: docker compose logs --no-color

README.md

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@
55

66
# Apache Kafka Python Demo
77

8-
## 🧩 Overview
8+
## Overview
99

1010
A minimal Apache Kafka demo using Python featuring:
11-
- A **JSON-producing** `producer.py`
12-
- A **format-tolerant** `consumer.py` supporting JSON & plain text
11+
- A multi-format `producer.py` (JSON by default; Protobuf via env)
12+
- A **format-tolerant** `consumer.py` supporting JSON, Protobuf & plain text
1313
- Partition-awareness, message keying, and consumer group support
1414
- Integration testing via `Testcontainers`
1515
- GitHub Actions & Codecov integration
@@ -18,7 +18,7 @@ It’s designed to simulate a polyglot messaging environment, where different sy
1818

1919
---
2020

21-
## ⚙️ Requirements
21+
## Requirements
2222

2323
- Python 3.10+
2424
- Docker & Docker Compose (for container-based setup) - more details could be found in the [separate section](documentation/docker.md).
@@ -32,7 +32,7 @@ pip install -r requirements.txt
3232

3333
---
3434

35-
## 🚀 Running the Demo
35+
## Running the Demo
3636

3737
### 1. Start Apache Kafka
3838

@@ -46,27 +46,49 @@ docker run -d --name kafka-391 \
4646

4747
More information about helper scripts Kafka provides could be found in the [separate section](documentation/kafka.md).
4848

49-
### 🟢 Run Producer
49+
### Run Producer
5050

5151
```bash
5252
python producer.py
5353
```
5454

55-
Produces random JSON events (`note_created`, `note_updated`, `note_deleted`) with message keys for partitioning.
55+
Produces random events (`note_created`, `note_updated`, `note_deleted`) with message keys for partitioning.
5656

57-
### 🔵 Run Consumer
57+
Formats:
58+
- JSON (default)
59+
- Protobuf (enable via `MESSAGE_FORMAT=protobuf`)
60+
61+
Examples:
62+
63+
```bash
64+
# JSON (default)
65+
python producer.py
66+
67+
# Protobuf
68+
MESSAGE_FORMAT=protobuf python producer.py
69+
```
70+
71+
### Run Consumer
5872

5973
```bash
6074
python consumer.py # All events
6175
python consumer.py --event-type X # Filtered by event_type
6276
python consumer.py --group-id my-group
6377
```
6478

65-
Displays event type, partition, and offset info.
79+
Displays event type, partition, and offset info. The consumer detects the payload format using the Kafka `content-type` header sent by the producer and falls back to JSON-or-plain-text when the header is missing.
80+
81+
Log annotation:
82+
- Each consumed message line includes a suffix indicating the wire format detected: `[wire=protobuf]`, `[wire=json]`, `[wire=text]`, or `[wire=unknown]`.
83+
- Example:
84+
- `✅ JSON (note_created) | key=note_created | partition=0 | offset=1 → {...} [wire=protobuf]`
85+
- `📦 Plain | key=plain | partition=0 | offset=42 → hello [wire=text]`
86+
87+
Note: The `✅ JSON (...)` prefix reflects that the payload was parsed into a dict (even for Protobuf). The `[wire=...]` suffix shows the on-wire format.
6688

6789
---
6890

69-
### 🔑 Message Keys and Partitions
91+
### Message Keys and Partitions
7092

7193
The producer now uses the message's `event_type` as the Kafka **key**, which ensures that:
7294

@@ -99,7 +121,7 @@ To see all partitions used, try increasing the number of unique keys or remove t
99121

100122
---
101123

102-
### 👥 Consumer Groups and Partition Rebalancing
124+
### Consumer Groups and Partition Rebalancing
103125

104126
Kafka uses consumer groups to distribute the workload of processing messages across multiple consumers.
105127

@@ -123,7 +145,7 @@ Note: If you have more partitions than consumers, some consumers may receive mul
123145

124146
---
125147

126-
## 🧪 Running Tests & Coverage
148+
## Running Tests & Coverage
127149

128150
Details could be found in the [separate section](documentation/tests.md).
129151

@@ -136,7 +158,7 @@ scripts/compose_smoke_test.sh --dry-run
136158

137159
---
138160

139-
## 📂 Project Structure
161+
## Project Structure
140162

141163
```
142164
kafka-python-demo/
@@ -155,7 +177,7 @@ kafka-python-demo/
155177

156178
---
157179

158-
## 📌 Notes
180+
## Notes
159181

160182
- Topic name is configurable via environment variable `KAFKA_TOPIC` (default: `notes-topic`).
161183
- You can edit the scripts to change topic names or message structures.
@@ -165,6 +187,7 @@ kafka-python-demo/
165187

166188
- `KAFKA_BOOTSTRAP_SERVERS` — Kafka broker(s), default: `localhost:9092`
167189
- `KAFKA_TOPIC` — topic to produce/consume, default: `notes-topic`
190+
- `MESSAGE_FORMAT` — producer payload format: `json` (default) or `protobuf`
168191

169192
Examples:
170193

@@ -176,11 +199,14 @@ python consumer.py
176199

177200
# Using Docker Compose (host env is picked up by compose)
178201
KAFKA_TOPIC=my-topic docker compose up -d
202+
203+
# Switch producer to Protobuf in Docker Compose
204+
MESSAGE_FORMAT=protobuf docker compose up -d
179205
```
180206

181207
---
182208

183-
## 🔗 Links
209+
## Links
184210

185211
- [Status of Python versions](https://devguide.python.org/versions/)
186212
- [Apache Kafka (Official)](https://kafka.apache.org/)
@@ -192,15 +218,15 @@ KAFKA_TOPIC=my-topic docker compose up -d
192218

193219
---
194220

195-
## 🧾 About TemplateTasks
221+
## About TemplateTasks
196222

197223
TemplateTasks is a personal software development initiative by Vadim Starichkov, focused on sharing open-source libraries, services, and technical demos.
198224

199225
It operates independently and outside the scope of any employment.
200226

201227
All code is released under permissive open-source licenses. The legal structure may evolve as the project grows.
202228

203-
## 📜 License & Attribution
229+
## License & Attribution
204230

205231
This project is licensed under the **MIT License** - see the [LICENSE](https://github.com/starichkov/kafka-python-demo/blob/main/LICENSE.md) file for details.
206232

consumer.py

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from kafka import KafkaConsumer
1010
from logger import get_logger
1111
import argparse
12-
from serialization import DESERIALIZERS
12+
from serialization import DESERIALIZERS, deserializer_for_mime
1313

1414

1515
def build_parser():
@@ -72,22 +72,67 @@ def consume_events(topic, consumer_args, event_type=None, group_id=None):
7272
try:
7373
# Continuously poll for new messages
7474
for message in consumer:
75-
# Try to parse the message as JSON, fall back to plain text if not valid JSON
76-
parsed = try_parse_json(message.value)
75+
# Choose deserializer by content-type header if present; fallback to JSON-or-text
76+
mime = None
77+
try:
78+
# headers: list[tuple[str, bytes]] (kafka-python)
79+
if getattr(message, "headers", None):
80+
for k, v in message.headers:
81+
if (k == "content-type") or (isinstance(k, bytes) and k.decode("ascii", "ignore") == "content-type"):
82+
if isinstance(v, (bytes, bytearray)):
83+
mime = v.decode("ascii", errors="ignore")
84+
else:
85+
mime = str(v)
86+
break
87+
except Exception: # pragma: no cover
88+
mime = None
89+
90+
try:
91+
parser = deserializer_for_mime(mime)
92+
except Exception: # pragma: no cover
93+
parser = DESERIALIZERS["json_or_text"]
94+
95+
# Determine wire format hint for logging
96+
wire_hint = None
97+
if mime:
98+
mime_l = mime.lower()
99+
if "json" in mime_l:
100+
wire_hint = "json"
101+
elif "protobuf" in mime_l:
102+
wire_hint = "protobuf"
103+
elif "text" in mime_l:
104+
wire_hint = "text"
105+
else:
106+
wire_hint = "unknown"
107+
108+
# Parse the message value
109+
try:
110+
parsed = parser(message.value)
111+
except Exception:
112+
# As a last resort, fall back to text
113+
parsed = DESERIALIZERS["plain_text"](message.value)
114+
115+
# If there was no content-type header, infer from parse result
116+
if not mime:
117+
if isinstance(parsed, dict):
118+
wire_hint = "json"
119+
else:
120+
wire_hint = "text"
77121

78122
# Decode key if available
79123
key = message.key.decode('utf-8') if message.key else None
80124
partition = message.partition
81125
offset = message.offset
82126

83127
# Display the message with an appropriate prefix based on its type
128+
suffix = f" [wire={wire_hint}]" if wire_hint else ""
84129
if isinstance(parsed, dict):
85130
if event_type and parsed.get("event_type") != event_type:
86131
continue # Skip non-matching event
87132
logger.info(
88-
f"✅ JSON ({parsed['event_type']}) | key={key} | partition={partition} | offset={offset}{parsed}")
133+
f"✅ JSON ({parsed['event_type']}) | key={key} | partition={partition} | offset={offset}{parsed}{suffix}")
89134
else:
90-
logger.info(f"📦 Plain | key={key} | partition={partition} | offset={offset}{parsed}")
135+
logger.info(f"📦 Plain | key={key} | partition={partition} | offset={offset}{parsed}{suffix}")
91136
except KeyboardInterrupt:
92137
# Handle graceful shutdown on Ctrl+C
93138
logger.info("\nShutting down gracefully...")

docker-compose.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ services:
4747
environment:
4848
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
4949
KAFKA_TOPIC: ${KAFKA_TOPIC:-notes-topic}
50+
MESSAGE_FORMAT: ${MESSAGE_FORMAT:-json}
5051
restart: on-failure
5152

5253
consumer:

0 commit comments

Comments
 (0)