Kafka Clickstream Streaming Pipeline

An end-to-end local data engineering project that simulates website clickstream traffic, publishes events to Apache Kafka, validates and processes them in near real-time, routes invalid events to a dead-letter queue (DLQ), and writes rolling summary metrics to CSV.

This project is designed for learning and portfolio use, and demonstrates core streaming concepts:

  • Event generation and producer patterns
  • Kafka topic setup and partitioning
  • Consumer-side validation and DLQ handling
  • Manual offset commit after processing
  • Periodic aggregation and sink output

Architecture

Data flow:

click_generator.py -> producer.py -> Kafka topic website_clicks -> consumer.py ->

  • valid events -> rolling metrics -> data/processed_click_summary.csv
  • invalid events -> Kafka topic website_clicks_dlq

Tech Stack

  • Python
  • Apache Kafka (running locally in single-node KRaft mode via Docker Compose)
  • confluent-kafka Python client
  • python-dotenv for environment-based configuration

Repository Structure

.
|-- docker-compose.yml
|-- requirements.txt
|-- README.md
|-- data/
|   `-- processed_click_summary.csv
`-- src/
    |-- __init__.py
    |-- click_generator.py
    |-- config.py
    |-- consumer.py
    |-- producer.py
    |-- topic_setup.py
    `-- utils.py

Prerequisites

  • Python 3.10+
  • Docker Desktop (or Docker Engine + Compose)

Quick Start

1. Install Python dependencies

pip install -r requirements.txt

Why: installs the Kafka client and environment-loading packages needed by the Python applications.

2. Start Kafka locally

docker compose up -d

Why: starts a single-node Kafka broker (KRaft mode) defined in docker-compose.yml.

3. Create topics

python -m src.topic_setup

Creates topics (if missing):

  • website_clicks
  • website_clicks_dlq
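
For reference, topic creation with confluent-kafka's AdminClient looks roughly like this (a minimal sketch, not necessarily the structure of the actual src/topic_setup.py; the partition and replication values are the defaults from the configuration table below):

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "localhost:9092"})

topics = [
    NewTopic("website_clicks", num_partitions=3, replication_factor=1),
    NewTopic("website_clicks_dlq", num_partitions=3, replication_factor=1),
]

# create_topics() is asynchronous; each future resolves once the broker responds.
for name, future in admin.create_topics(topics).items():
    try:
        future.result()
        print(f"Created topic {name}")
    except Exception as exc:  # an already-existing topic also surfaces here
        print(f"Topic {name} not created: {exc}")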

4. Start the consumer (Terminal 1)

python -m src.consumer

Consumer responsibilities:

  • Reads from website_clicks
  • Validates each event
  • Routes invalid events to website_clicks_dlq
  • Maintains running counters
  • Prints a snapshot every 10 seconds
  • Writes snapshots to data/processed_click_summary.csv
  • Commits offsets only after processing
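
A minimal, self-contained sketch of this consume -> validate -> route -> commit flow (not the project's exact consumer.py: validation and metrics are simplified here, and the CSV snapshot logic is omitted):

import json
from collections import Counter
from confluent_kafka import Consumer, Producer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "clickstream-consumer-group",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,        # offsets are committed manually below
})
dlq = Producer({"bootstrap.servers": "localhost:9092"})
consumer.subscribe(["website_clicks"])

page_counts = Counter()                 # rolling per-page click counts

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        event = json.loads(msg.value())
        valid = (                        # simplified subset of the validation rules
            event.get("event_type") == "click"
            and event.get("device_type") in {"mobile", "desktop", "tablet"}
            and str(event.get("page_url", "")).startswith("/")
        )
        if valid:
            page_counts[event["page_url"]] += 1
        else:
            payload = {"reason": "failed validation", "event": event}  # simplified DLQ envelope
            dlq.produce("website_clicks_dlq", json.dumps(payload).encode("utf-8"))
        consumer.commit(asynchronous=False)   # commit only after processing
finally:
    dlq.flush()
    consumer.close()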

5. Start the producer (Terminal 2)

python -m src.producer

Producer responsibilities:

  • Generates click events continuously
  • Injects a small percentage of intentionally invalid events
  • Serializes event payloads to JSON
  • Uses user_id as message key
  • Publishes to website_clicks
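
A minimal sketch of that publish loop (the real producer.py builds full events via click_generator.py and intentionally injects some invalid ones; the event dict here is trimmed for brevity):

import json
import random
import time
import uuid
from datetime import datetime, timezone
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})

while True:
    event = {
        "event_id": str(uuid.uuid4()),
        "event_type": "click",
        "user_id": f"user_{random.randint(1, 50)}",
        "page_url": random.choice(["/home", "/products", "/cart"]),
        "device_type": random.choice(["mobile", "desktop", "tablet"]),
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    producer.produce(
        "website_clicks",
        key=event["user_id"],                     # user_id as the message key
        value=json.dumps(event).encode("utf-8"),  # JSON-serialized payload
    )
    producer.poll(0)                              # serve delivery callbacks
    time.sleep(0.4)                               # PRODUCER_DELAY_SECONDS default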

Configuration

Configuration is centralized in src/config.py and loaded from environment variables.

Supported variables and defaults:

Variable               | Default                    | Description
BOOTSTRAP_SERVERS      | localhost:9092             | Kafka broker address
CLICK_TOPIC            | website_clicks             | Main topic for click events
DLQ_TOPIC              | website_clicks_dlq         | Topic for invalid events
CONSUMER_GROUP         | clickstream-consumer-group | Consumer group id
PARTITIONS             | 3                          | Partition count when creating topics
REPLICATION_FACTOR     | 1                          | Replication factor when creating topics
PRODUCER_DELAY_SECONDS | 0.4                        | Delay between produced events

You can set these in your shell or in a local .env file at the project root.
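
A sketch of what an environment-driven config module can look like with python-dotenv (names and defaults mirror the table above; the real src/config.py may be organized differently):

import os
from dotenv import load_dotenv

load_dotenv()  # picks up a .env file at the project root, if present

BOOTSTRAP_SERVERS = os.getenv("BOOTSTRAP_SERVERS", "localhost:9092")
CLICK_TOPIC = os.getenv("CLICK_TOPIC", "website_clicks")
DLQ_TOPIC = os.getenv("DLQ_TOPIC", "website_clicks_dlq")
CONSUMER_GROUP = os.getenv("CONSUMER_GROUP", "clickstream-consumer-group")
PARTITIONS = int(os.getenv("PARTITIONS", "3"))
REPLICATION_FACTOR = int(os.getenv("REPLICATION_FACTOR", "1"))
PRODUCER_DELAY_SECONDS = float(os.getenv("PRODUCER_DELAY_SECONDS", "0.4"))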

Event Contract

Generated click events include:

  • event_id
  • event_type
  • user_id
  • session_id
  • page_url
  • referrer
  • device_type
  • country
  • timestamp
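
Generating one such event could look like this (the value pools and helper name are illustrative, not the project's actual click_generator.py):

import random
import uuid
from datetime import datetime, timezone

def build_click_event() -> dict:
    # One event carrying every field in the contract above
    return {
        "event_id": str(uuid.uuid4()),
        "event_type": "click",
        "user_id": f"user_{random.randint(1, 50)}",
        "session_id": str(uuid.uuid4()),
        "page_url": random.choice(["/home", "/products", "/cart", "/checkout"]),
        "referrer": random.choice(["google", "direct", "newsletter"]),
        "device_type": random.choice(["mobile", "desktop", "tablet"]),
        "country": random.choice(["US", "DE", "IN", "BR"]),
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }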

Validation rules in src/utils.py:

  • Required fields must exist
  • event_type must be click
  • device_type must be one of: mobile, desktop, tablet
  • page_url must start with /

Invalid events are sent to the DLQ with:

  • validation failure reason
  • original event payload
  • failure timestamp
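
In code, those rules and the DLQ envelope can be sketched roughly as follows (helper names and envelope keys are assumptions; check src/utils.py for the actual implementation):

from datetime import datetime, timezone

REQUIRED_FIELDS = ("event_id", "event_type", "user_id", "session_id", "page_url",
                   "referrer", "device_type", "country", "timestamp")
VALID_DEVICES = {"mobile", "desktop", "tablet"}

def validate_event(event: dict) -> tuple[bool, str | None]:
    # Returns (is_valid, failure reason); reason is None when the event passes
    missing = [f for f in REQUIRED_FIELDS if f not in event]
    if missing:
        return False, f"missing fields: {missing}"
    if event["event_type"] != "click":
        return False, "event_type must be 'click'"
    if event["device_type"] not in VALID_DEVICES:
        return False, "device_type must be one of mobile, desktop, tablet"
    if not str(event["page_url"]).startswith("/"):
        return False, "page_url must start with '/'"
    return True, None

def to_dlq_payload(event: dict, reason: str) -> dict:
    return {
        "reason": reason,                                     # validation failure reason
        "original_event": event,                              # original event payload
        "failed_at": datetime.now(timezone.utc).isoformat(),  # failure timestamp
    }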

Output

The consumer writes summary snapshots to data/processed_click_summary.csv.

CSV columns:

  • snapshot_time
  • total_clicks
  • unique_users
  • top_page
  • top_page_clicks
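
Appending one snapshot row might look like this (a sketch using the columns above; the helper name is an assumption, not the consumer's actual function):

import csv
from datetime import datetime, timezone
from pathlib import Path

CSV_PATH = Path("data/processed_click_summary.csv")
COLUMNS = ["snapshot_time", "total_clicks", "unique_users", "top_page", "top_page_clicks"]

def write_snapshot(total_clicks, unique_users, top_page, top_page_clicks):
    CSV_PATH.parent.mkdir(parents=True, exist_ok=True)
    write_header = not CSV_PATH.exists()
    with CSV_PATH.open("a", newline="") as f:
        writer = csv.writer(f)
        if write_header:
            writer.writerow(COLUMNS)  # header only on the first write
        writer.writerow([
            datetime.now(timezone.utc).isoformat(),
            total_clicks,
            unique_users,
            top_page,
            top_page_clicks,
        ])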

Kafka Runtime Notes

  • Consumer uses auto.offset.reset=earliest
  • Auto-commit is disabled
  • Offsets are committed synchronously after each processed message

These choices make the processing behavior explicit and easier to reason about when learning and debugging.

Stop and Cleanup

Stop Python apps with Ctrl+C in each terminal.

Stop Kafka:

docker compose down

Optionally remove broker volumes/state (if you add volumes in the future):

docker compose down -v

Troubleshooting

Cannot connect to Kafka (Connection refused)

  • Confirm Docker is running.
  • Check broker container status: docker ps
  • Verify port 9092 is not blocked by another process.

Topic creation fails

  • Ensure Kafka is healthy before running topic setup.
  • Re-run: python -m src.topic_setup

Consumer not receiving data

  • Confirm producer is running.
  • Confirm both apps point to the same BOOTSTRAP_SERVERS and topic names.

CSV file not updating

  • The consumer writes snapshots every 10 seconds.
  • Ensure valid events are being produced (i.e., not all events are routed to the DLQ).

Learning Extensions

Possible next upgrades:

  • Add schema enforcement (Avro/JSON Schema/Protobuf)
  • Add unit/integration tests for generator, validation, and consumer flow
  • Expose metrics with Prometheus
  • Persist output to a database or object store instead of CSV
  • Add dashboards for clickstream analytics
