Skip to content

Commit 3a78848

Browse files
committed
Fix RabbitMQ connector: TLS support, rstream API update, AMQP message framing
- Add TLS parameters to DataStorage for RabbitMQ connections
- Update rstream Producer API (Producer() + start() instead of Producer.create())
- Send AMQP 1.0 framed messages from producer for compatibility with Rust consumer
- Add PYTHONUNBUFFERED to producer Dockerfile
- Remap postgres port to avoid conflicts
1 parent b19407e commit 3a78848

File tree

11 files changed

+243
-156
lines changed

11 files changed

+243
-156
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,13 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
66
## [Unreleased]
77

88
### Added
9+
- `pw.io.rabbitmq` connector for reading from and writing to RabbitMQ Streams. Supports JSON, plaintext, and raw formats, as well as TLS and mutual TLS configuration. Includes `pw.io.rabbitmq.simple_read` for quick-start usage.
910
- `pw.io.postgres.read` connector, which reads data from a PostgreSQL table directly by parsing the Write-Ahead Log (WAL).
1011
- `pw.io.postgres.write` and `pw.io.postgres.read` now support serialization/deserialization of `np.ndarray` (`int`/`float` elements), homogeneous `tuple` and `list` (via Postgres `ARRAY`; multidimensional rectangular arrays supported).
1112

13+
### Removed
14+
- `pw.io.mssql` connector has been removed.
15+
1216
### Changed
1317
- **BREAKING**: The dependencies for `pw.io.pyfilesystem.read` are no longer included in the default package installation. To install them, please use `pip install pathway[pyfilesystem]`.
1418
- Asynchronous callback for `pw.io.python.write` is now available as `pw.io.OnChangeCallbackAsync`.

examples/projects/rabbitmq-ETL/docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ services:
2727
POSTGRES_PASSWORD: pathway
2828
POSTGRES_DB: etl_db
2929
ports:
30-
- 5432:5432
30+
- 5433:5432
3131
volumes:
3232
- ./sql/init-db.sql:/docker-entrypoint-initdb.d/init-db.sql
3333
healthcheck:

examples/projects/rabbitmq-ETL/pathway-src/etl.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@
1717
# To use advanced features with Pathway Scale, get your free license key from
1818
# https://pathway.com/features and paste it below.
1919
# To use Pathway Community, comment out the line below.
20-
pw.set_license_key("demo-license-key-with-telemetry")
20+
# pw.set_license_key("demo-license-key-with-telemetry")
2121

22-
RABBITMQ_URI = "rabbitmq-stream://guest:guest@rabbitmq:5552"
22+
RABBITMQ_URI = "rabbitmq-stream://guest:guest@0.0.0.0:5552"
2323

2424
postgres_settings = {
25-
"host": "postgres",
26-
"port": "5432",
25+
"host": "0.0.0.0",
26+
"port": "5433",
2727
"dbname": "etl_db",
2828
"user": "pathway",
2929
"password": "pathway",

examples/projects/rabbitmq-ETL/producer-src/Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
FROM python:3.10
22

3+
ENV PYTHONUNBUFFERED=1
34
RUN pip install rstream
45
COPY ./producer-src/create-streams.py create-streams.py
56

examples/projects/rabbitmq-ETL/producer-src/create-streams.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -170,15 +170,16 @@
170170

171171

172172
async def main():
173-
from rstream import Producer
173+
from rstream import AMQPMessage, Producer
174174

175175
print("Connecting to RabbitMQ Streams...")
176-
producer = await Producer.create(
176+
producer = Producer(
177177
host=RABBITMQ_HOST,
178178
port=RABBITMQ_PORT,
179179
username=RABBITMQ_USER,
180180
password=RABBITMQ_PASSWORD,
181181
)
182+
await producer.start()
182183

183184
# Create all streams
184185
for stream in STREAMS:
@@ -191,21 +192,21 @@ async def main():
191192
# Publish initial employees
192193
print("Publishing employee records...")
193194
for emp in EMPLOYEES:
194-
await producer.send("employees_raw", json.dumps(emp).encode())
195+
await producer.send("employees_raw", AMQPMessage(body=json.dumps(emp).encode()))
195196
await asyncio.sleep(0.3)
196197
print(f"Published {len(EMPLOYEES)} employees")
197198

198199
# Publish initial orders
199200
print("Publishing order records...")
200201
for order in ORDERS:
201-
await producer.send("orders_raw", json.dumps(order).encode())
202+
await producer.send("orders_raw", AMQPMessage(body=json.dumps(order).encode()))
202203
await asyncio.sleep(0.2)
203204
print(f"Published {len(ORDERS)} orders")
204205

205206
# Publish initial listings
206207
print("Publishing listing records...")
207208
for listing in LISTINGS:
208-
await producer.send("listings_raw", json.dumps(listing).encode())
209+
await producer.send("listings_raw", AMQPMessage(body=json.dumps(listing).encode()))
209210
await asyncio.sleep(0.2)
210211
print(f"Published {len(LISTINGS)} listings")
211212

@@ -229,7 +230,7 @@ async def main():
229230
"product": random.choice(PRODUCTS),
230231
"quantity": random.randint(1, 10),
231232
}
232-
await producer.send("orders_raw", json.dumps(new_order).encode())
233+
await producer.send("orders_raw", AMQPMessage(body=json.dumps(new_order).encode()))
233234
print(f"Published new order: {order_id}")
234235

235236
await asyncio.sleep(3)
@@ -245,7 +246,7 @@ async def main():
245246
"posted_date": "2024-12-10",
246247
"agent_employee_id": random.choice(agent_ids),
247248
}
248-
await producer.send("listings_raw", json.dumps(new_listing).encode())
249+
await producer.send("listings_raw", AMQPMessage(body=json.dumps(new_listing).encode()))
249250
print(f"Published new listing: {listing_id}")
250251

251252

integration_tests/db_connectors/conftest.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
DebeziumContext,
44
DynamoDBContext,
55
MongoDBContext,
6-
MssqlContext,
76
MySQLContext,
87
PgvectorContext,
98
PostgresContext,
@@ -50,8 +49,3 @@ def dynamodb():
5049
@pytest.fixture
5150
def mysql():
5251
return MySQLContext()
53-
54-
55-
@pytest.fixture
56-
def mssql():
57-
return MssqlContext()

integration_tests/db_connectors/utils.py

Lines changed: 0 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -80,19 +80,6 @@
8080
+ f"@{MYSQL_DB_HOST}:{MYSQL_DB_PORT}/{MYSQL_DB_NAME}"
8181
)
8282

83-
MSSQL_DB_HOST = "mssql"
84-
MSSQL_DB_PORT = 1433
85-
MSSQL_DB_NAME = "testdb"
86-
MSSQL_DB_USER = "sa"
87-
MSSQL_DB_PASSWORD = "YourStrong!Passw0rd"
88-
MSSQL_CONNECTION_STRING = (
89-
f"Server=tcp:{MSSQL_DB_HOST},{MSSQL_DB_PORT};"
90-
f"Database={MSSQL_DB_NAME};"
91-
f"User Id={MSSQL_DB_USER};"
92-
f"Password={MSSQL_DB_PASSWORD};"
93-
"TrustServerCertificate=true"
94-
)
95-
9683

9784
def is_mysql_reachable():
9885
try:
@@ -110,22 +97,6 @@ def is_mysql_reachable():
11097
return True
11198

11299

113-
def is_mssql_reachable():
114-
try:
115-
import pymssql
116-
117-
pymssql.connect(
118-
server=MSSQL_DB_HOST,
119-
port=MSSQL_DB_PORT,
120-
user=MSSQL_DB_USER,
121-
password=MSSQL_DB_PASSWORD,
122-
database=MSSQL_DB_NAME,
123-
)
124-
except Exception:
125-
return False
126-
127-
return True
128-
129100

130101
@dataclass(frozen=True)
131102
class ColumnProperties:
@@ -550,109 +521,6 @@ def random_table_name(self) -> str:
550521
return f"mysql_{uuid.uuid4().hex}"
551522

552523

553-
class MssqlContext:
554-
def __init__(self):
555-
import pymssql
556-
557-
self.connection = pymssql.connect(
558-
server=MSSQL_DB_HOST,
559-
port=MSSQL_DB_PORT,
560-
user=MSSQL_DB_USER,
561-
password=MSSQL_DB_PASSWORD,
562-
database=MSSQL_DB_NAME,
563-
autocommit=True,
564-
)
565-
self.cursor = self.connection.cursor()
566-
567-
def random_table_name(self) -> str:
568-
return f"mssql_{uuid.uuid4().hex}"
569-
570-
def execute_sql(self, query: str):
571-
self.cursor.execute(query)
572-
573-
def insert_row(
574-
self, table_name: str, values: dict[str, Union[int, bool, str, float]]
575-
) -> None:
576-
field_names = list(values.keys())
577-
placeholders = ", ".join(["%s"] * len(values))
578-
query = f"INSERT INTO {table_name} ({','.join(field_names)}) VALUES ({placeholders})"
579-
print(f"Inserting a row: {query}")
580-
self.cursor.execute(query, tuple(values.values()))
581-
582-
def create_table(self, schema: type[pw.Schema], *, add_special_fields: bool) -> str:
583-
table_name = self.random_table_name()
584-
585-
primary_key_found = False
586-
fields = []
587-
for field_name, field_schema in schema.columns().items():
588-
parts = [f"[{field_name}]"]
589-
field_type = field_schema.dtype
590-
if field_type == dtype.STR:
591-
parts.append("NVARCHAR(MAX)")
592-
elif field_type == dtype.INT:
593-
parts.append("BIGINT")
594-
elif field_type == dtype.FLOAT:
595-
parts.append("FLOAT")
596-
elif field_type == dtype.BOOL:
597-
parts.append("BIT")
598-
else:
599-
raise RuntimeError(f"Unsupported field type {field_type}")
600-
if field_schema.primary_key:
601-
if primary_key_found:
602-
raise AssertionError("Only single primary key supported")
603-
primary_key_found = True
604-
parts.append("PRIMARY KEY NOT NULL")
605-
fields.append(" ".join(parts))
606-
607-
if add_special_fields:
608-
fields.append("[time] BIGINT NOT NULL")
609-
fields.append("[diff] BIGINT NOT NULL")
610-
611-
create_sql = (
612-
f"IF OBJECT_ID(N'{table_name}', N'U') IS NULL "
613-
f"CREATE TABLE {table_name} ({','.join(fields)})"
614-
)
615-
self.cursor.execute(create_sql)
616-
return table_name
617-
618-
def get_table_contents(
619-
self,
620-
table_name: str,
621-
column_names: list[str],
622-
sort_by: Union[str, tuple, None] = None,
623-
) -> list[dict[str, Union[str, int, bool, float]]]:
624-
select_query = f"SELECT {','.join(column_names)} FROM {table_name};"
625-
self.cursor.execute(select_query)
626-
rows = self.cursor.fetchall()
627-
result = []
628-
for row in rows:
629-
row_map = dict(zip(column_names, row))
630-
result.append(row_map)
631-
if sort_by is not None:
632-
if isinstance(sort_by, tuple):
633-
result.sort(key=lambda item: tuple(item[key] for key in sort_by))
634-
else:
635-
result.sort(key=lambda item: item[sort_by])
636-
return result
637-
638-
def get_table_schema(self, table_name: str) -> dict[str, ColumnProperties]:
639-
query = """
640-
SELECT column_name, data_type, is_nullable
641-
FROM information_schema.columns
642-
WHERE table_name = %s AND table_schema = 'dbo'
643-
ORDER BY ordinal_position;
644-
"""
645-
self.cursor.execute(query, (table_name,))
646-
rows = self.cursor.fetchall()
647-
648-
schema_props = {}
649-
for column_name, type_name, is_nullable in rows:
650-
schema_props[column_name] = ColumnProperties(
651-
type_name.lower(), is_nullable.upper() == "YES"
652-
)
653-
return schema_props
654-
655-
656524
class EntryCountChecker:
657525

658526
def __init__(

python/pathway/engine.pyi

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -930,6 +930,10 @@ class DataStorage:
930930
ssl_cert_path: str | None = None,
931931
psql_replication: PsqlReplicationSettings | None = None,
932932
schema_name: str | None = None,
933+
rabbitmq_tls_root_certificates: str | None = None,
934+
rabbitmq_tls_client_cert: str | None = None,
935+
rabbitmq_tls_client_key: str | None = None,
936+
rabbitmq_tls_trust_certificates: bool = False,
933937
) -> None: ...
934938
def delta_s3_storage_options(self, *args, **kwargs): ...
935939

0 commit comments

Comments
 (0)