Commit b19407e
Add RabbitMQ connector and remove MSSQL connector
Replace MSSQL connector with RabbitMQ stream connector supporting both input and output operations. Includes Rust connector implementation, Python bindings, integration tests, and an ETL example project.
1 parent f18c6e3 commit b19407e

File tree

24 files changed (+2261 −1582 lines)

Cargo.lock

Lines changed: 316 additions & 134 deletions

Cargo.toml

Lines changed: 1 addition & 1 deletion
```diff
@@ -83,6 +83,7 @@ pyo3 = { version = "0.25.0", features = ["abi3-py310", "multiple-pymethods"] }
 pyo3-async-runtimes = "0.25.0"
 pyo3-log = "0.12.4"
 qdrant-client = "1.15.0"
+rabbitmq-stream-client = "0.11.0"
 questdb-rs = "4.0.5"
 rand = "0.9.1"
 rayon = "1.10.0"
@@ -104,7 +105,6 @@ sysinfo = "0.35.1"
 tantivy = "0.22.1" # Note: don't bump this dependency before the RAG integration test failure is investigated
 tempfile = "3.20.0"
 thiserror = "1.0.63"
-tiberius = { version = "0.12", default-features = false, features = ["tds73", "winauth", "rustls"] }
 timely = { path = "./external/timely-dataflow/timely", features = ["bincode"] }
 tokio = { version = "1.45.1", features = ["rt-multi-thread"] }
 tokio-util = { version = "0.7", features = ["compat"] }
```
Makefile

Lines changed: 25 additions & 0 deletions

```makefile
build:
	docker compose up -d

stop:
	docker compose down -v --remove-orphans
	docker rmi etl-rabbitmq-pathway:latest
	docker rmi etl-rabbitmq-stream-producer:latest

connect:
	docker compose exec -it pathway bash
connect-prod:
	docker compose exec -it stream-producer bash
connect-rabbitmq:
	docker compose exec -it rabbitmq bash
psql:
	docker compose exec -it postgres psql -U pathway -d etl_db

logs:
	docker compose logs pathway
logs-prod:
	docker compose logs stream-producer
logs-rabbitmq:
	docker compose logs rabbitmq
logs-postgres:
	docker compose logs postgres
```
README.md

Lines changed: 93 additions & 0 deletions

# ETL with RabbitMQ Streams in / PostgreSQL out

A streaming ETL pipeline built with Pathway that reads from RabbitMQ Streams, validates, transforms, aggregates, and joins data, then writes enriched results to PostgreSQL in real time.

## What Pathway replaces

This example is inspired by three ETL architectures that traditionally require many moving parts:

| Traditional approach | What Pathway replaces |
|---|---|
| Python/Java workers consuming from queues | `pw.io.rabbitmq.read` |
| Staging databases for validation | UDF-based filtering |
| Airflow DAGs for orchestration | Reactive streaming engine |
| Celery task queues for processing | Single streaming process |
| Separate aggregation jobs | `groupby().reduce()` |
| Multi-step load pipelines | `pw.io.postgres.write` |

**Result:** One Pathway process replaces 5+ services.

## Architecture

```
Producer (rstream)      Pathway (single process)           PostgreSQL
+----------------+      +------------------------------+   +--------------------+
| employees_raw  |----->| pw.io.rabbitmq.read          |   |                    |
| orders_raw     |----->|   -> validate (UDFs)         |-->| enriched_employees |
| listings_raw   |----->|   -> transform               |   | enriched_listings  |
+----------------+      |   -> aggregate (groupby)     |   | (snapshot mode)    |
                        |   -> join (left joins)       |   +--------------------+
                        | pw.io.postgres.write         |
                        +------------------------------+
```

## Data sources

- **Employees** (10 records): HR data with intentional quality issues (empty names, invalid emails, negative salaries) to demonstrate validation
- **Orders** (15+ records): purchase orders linked to employees by `employee_id`
- **Listings** (10+ records): rental property listings linked to employees via `agent_employee_id` (inspired by rental platform crawlers)

## ETL pipeline

1. **Read** three RabbitMQ streams via `pw.io.rabbitmq.read`
2. **Validate** using UDFs: email format, phone format, positive salary/price, non-empty fields
3. **Transform**: normalize emails/departments to lowercase, parse string amounts to float, concatenate names, derive boolean flags
4. **Aggregate**: order stats per employee (`total_orders`, `total_spent`), listing stats per agent (`total_listings`, `avg_listing_price`)
5. **Join**: left-join employees with order stats and listing stats; left-join listings with agent names
6. **Write** to PostgreSQL snapshot tables (auto-updated as new data arrives)
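The validation rules in step 2 can be sketched as plain Python predicates. This is a hypothetical sketch: the function names, regexes, and thresholds are illustrative assumptions, not the actual UDFs shipped in the example's `etl.py` (there they would be wrapped for use inside the Pathway pipeline).

```python
import re

# Illustrative predicates mirroring step 2 (email format, phone format,
# positive amounts, non-empty fields). Names and patterns are assumptions.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
PHONE_RE = re.compile(r"^\+?[\d\s()-]{7,}$")

def is_valid_email(email: str) -> bool:
    return bool(EMAIL_RE.match(email))

def is_valid_phone(phone: str) -> bool:
    return bool(PHONE_RE.match(phone))

def is_positive_amount(raw: str) -> bool:
    # Amounts arrive as strings (see step 3); reject non-numeric or <= 0.
    try:
        return float(raw) > 0
    except ValueError:
        return False

def is_non_empty(value: str) -> bool:
    return bool(value.strip())
```

Records failing any predicate would be filtered out before the transform step, which is how the intentionally bad employee rows (empty names, invalid emails, negative salaries) get dropped.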
## Launching

```bash
docker compose up -d
# or
make
```

## Checking results

Connect to PostgreSQL:

```bash
make psql
```

Then query the enriched tables:

```sql
SELECT * FROM enriched_employees ORDER BY employee_id;
SELECT * FROM enriched_listings ORDER BY listing_id;
```

As the producer continues sending new orders and listings, re-running these queries will show updated aggregates in real time.
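The per-employee aggregates behind these queries (step 4 of the pipeline) behave like the following plain-Python reduction. This is a minimal sketch of the semantics only, with made-up sample rows; the real pipeline computes the same counts and sums incrementally with Pathway's `groupby().reduce()` over the live stream.

```python
from collections import defaultdict

# Hypothetical order rows; in the example these come from the orders_raw stream.
orders = [
    {"employee_id": 1, "amount": 120.0},
    {"employee_id": 1, "amount": 80.0},
    {"employee_id": 2, "amount": 50.0},
]

# Batch equivalent of grouping orders by employee and reducing to
# total_orders (count) and total_spent (sum).
stats = defaultdict(lambda: {"total_orders": 0, "total_spent": 0.0})
for row in orders:
    s = stats[row["employee_id"]]
    s["total_orders"] += 1
    s["total_spent"] += row["amount"]
```

The difference in the streaming setting is that each new order updates the affected employee's row in `enriched_employees` rather than requiring a re-run of the whole batch.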
## Monitoring

- **Pathway logs**: `make logs`
- **Producer logs**: `make logs-prod`
- **RabbitMQ Management UI**: http://localhost:15672 (guest/guest)

## Stopping

```bash
make stop
```
docker-compose.yml

Lines changed: 59 additions & 0 deletions

```yaml
version: "3.7"
name: etl-rabbitmq
networks:
  etl-rabbitmq-network:
    driver: bridge
services:
  rabbitmq:
    image: rabbitmq:3.13-management
    environment:
      RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS: "-rabbitmq_stream advertised_host rabbitmq -rabbitmq_stream advertised_port 5552"
    ports:
      - 5552:5552
      - 5672:5672
      - 15672:15672
    command: bash -c "rabbitmq-plugins enable --offline rabbitmq_stream && rabbitmq-server"
    healthcheck:
      test: rabbitmq-diagnostics -q ping
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - etl-rabbitmq-network
  postgres:
    image: postgres:16
    environment:
      POSTGRES_USER: pathway
      POSTGRES_PASSWORD: pathway
      POSTGRES_DB: etl_db
    ports:
      - 5432:5432
    volumes:
      - ./sql/init-db.sql:/docker-entrypoint-initdb.d/init-db.sql
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U pathway"]
      interval: 5s
      timeout: 5s
      retries: 5
    networks:
      - etl-rabbitmq-network
  stream-producer:
    build:
      context: .
      dockerfile: ./producer-src/Dockerfile
    depends_on:
      rabbitmq:
        condition: service_healthy
    networks:
      - etl-rabbitmq-network
  pathway:
    build:
      context: .
      dockerfile: ./pathway-src/Dockerfile
    depends_on:
      rabbitmq:
        condition: service_healthy
      postgres:
        condition: service_healthy
    networks:
      - etl-rabbitmq-network
```
pathway-src/Dockerfile

Lines changed: 6 additions & 0 deletions

```dockerfile
FROM python:3.10

RUN pip install -U pathway
COPY ./pathway-src/etl.py etl.py

CMD ["python", "etl.py"]
```
