Skip to content

Commit 1534e57

Browse files
zxqfd555Manul from Pathway
authored andcommitted
add a basic tls auth layer to the postgres connector (#9817)
GitOrigin-RevId: f723f2db2274a8841a62470e51b8d2ed5d190d0a
1 parent 0f94840 commit 1534e57

16 files changed

Lines changed: 942 additions & 400 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
99
- `pw.io.kafka.read` and `pw.io.kafka.write` connectors now support OAUTHBEARER authentication.
1010
- `pw.io.mongodb.write` connector now supports an `output_table_type` parameter with two modes: `stream_of_changes` (default) and `snapshot`. In `snapshot` mode, the connector maintains the current state of the Pathway table in MongoDB using the `_id` field as the primary key, while `stream_of_changes` preserves the existing behavior by writing all events with `time` and `diff` flags to reflect transactional minibatches and the nature of each change.
1111
- Workers can now automatically scale up or down based on pipeline load, using a configurable monitoring window. This feature requires persistence to be enabled and can be configured via `worker_scaling_enabled` and `workload_tracking_window_ms` in `pw.persistence.Config`. Please refer to the tutorial for more details.
12+
- `pw.io.postgres.write` now properly supports TLS configuration via `sslmode` and `sslrootcert` connection string parameters.
1213

1314
## [0.29.0] - 2026-01-22
1415

Cargo.lock

Lines changed: 80 additions & 27 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ log = { version = "0.4.27", features = ["std"] }
6363
lz4_flex = "0.11.5"
6464
mongodb = { version = "3.2.2", features = ["sync"] }
6565
mysql = "26.0.1"
66+
native-tls = "0.2.14"
6667
ndarray = { version = "0.15.6", features = ["serde"] }
6768
nix = { version = "0.30.1", features = ["fs", "user", "resource"] }
6869
num-integer = "0.1.46"
@@ -75,6 +76,7 @@ opentelemetry_sdk = { version = "0.30.0", features = ["rt-tokio", "rt-tokio-curr
7576
ordered-float = { version = "4.6.0", features = ["serde"] }
7677
pgvector = { version = "0.4.1", features = ["postgres", "halfvec"] }
7778
postgres = { version = "0.19.10", features = ["with-chrono-0_4", "with-serde_json-1"] }
79+
postgres-native-tls = "0.5.2"
7880
prometheus-client = "0.23.1"
7981
pyo3 = { version = "0.25.0", features = ["abi3-py310", "multiple-pymethods"] }
8082
pyo3-async-runtimes = "0.25.0"

integration_tests/db_connectors/conftest.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
MySQLContext,
77
PgvectorContext,
88
PostgresContext,
9+
PostgresWithTlsContext,
910
QuestDBContext,
1011
)
1112

@@ -15,6 +16,11 @@ def postgres():
1516
return PostgresContext()
1617

1718

19+
@pytest.fixture
20+
def postgres_with_tls():
21+
return PostgresWithTlsContext()
22+
23+
1824
@pytest.fixture
1925
def pgvector():
2026
return PgvectorContext()

integration_tests/db_connectors/test_mysql.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,25 @@
44

55
import pandas as pd
66
import pytest
7-
from utils import MYSQL_CONNECTION_STRING, MYSQL_DB_NAME, SimpleObject
7+
from utils import (
8+
MYSQL_CONNECTION_STRING,
9+
MYSQL_DB_NAME,
10+
SimpleObject,
11+
is_mysql_reachable,
12+
)
813

914
import pathway as pw
1015
from pathway.internals import api
1116
from pathway.internals.parse_graph import G
1217

18+
# FIXME: the mysql Docker container used in the integration tests is unstable and
19+
# sometimes is unavailable, even though the healthcheck passes
20+
xfail_if_mysql_failed_to_start = pytest.mark.xfail(
21+
not is_mysql_reachable(), reason="mysql has failed to start"
22+
)
1323

24+
25+
@xfail_if_mysql_failed_to_start
1426
@pytest.mark.parametrize("output_table_type", ["stream_of_changes", "snapshot"])
1527
@pytest.mark.parametrize("init_mode", ["default", "create_if_not_exists", "replace"])
1628
def test_outputs(output_table_type, init_mode, mysql):
@@ -98,6 +110,7 @@ def run(offset: int):
98110
]
99111

100112

113+
@xfail_if_mysql_failed_to_start
101114
@pytest.mark.parametrize("are_types_optional", [False, True])
102115
@pytest.mark.parametrize("init_mode", ["create_if_not_exists", "replace"])
103116
def test_different_types_schema_and_serialization(init_mode, are_types_optional, mysql):
@@ -223,6 +236,7 @@ class InputSchema(pw.Schema): # type:ignore
223236
assert column_props.is_nullable == are_types_optional, column_name
224237

225238

239+
@xfail_if_mysql_failed_to_start
226240
def test_output_snapshot_overwrite_by_key(mysql):
227241
words = ["one", "two", "three", "one", "four", "two", "one", "one", "two", "four"]
228242

@@ -254,6 +268,7 @@ class InputSchema(pw.Schema):
254268
assert external_schema["count"].type_name == "bigint"
255269

256270

271+
@xfail_if_mysql_failed_to_start
257272
def test_mysql_overwrites_old_snapshot(mysql):
258273
table_name = mysql.random_table_name()
259274

@@ -312,6 +327,7 @@ def run(offset: int):
312327
]
313328

314329

330+
@xfail_if_mysql_failed_to_start
315331
def test_mysql_composite_snapshot_key(mysql):
316332
table_name = mysql.random_table_name()
317333

@@ -357,6 +373,7 @@ def run(offset: int):
357373
]
358374

359375

376+
@xfail_if_mysql_failed_to_start
360377
@pytest.mark.parametrize("malfomed_primary_key", [None, []])
361378
def test_mysql_no_snapshot_key(malfomed_primary_key, mysql):
362379
table_name = mysql.random_table_name()
@@ -390,6 +407,7 @@ class InputSchema(pw.Schema):
390407
pw.run()
391408

392409

410+
@xfail_if_mysql_failed_to_start
393411
def test_mysql_single_column_snapshot_mode(mysql):
394412
table_name = mysql.random_table_name()
395413

0 commit comments

Comments
 (0)