Skip to content

Commit 03cf1b3

Browse files
zxqfd555Manul from Pathway
authored andcommitted
fail fast in pw.io.postgres.read if the publication doesn't exist (#9955)
GitOrigin-RevId: d923a7039c3f4993cc1d06bfae2566f79585f940
1 parent 9fd8e7c commit 03cf1b3

2 files changed

Lines changed: 46 additions & 0 deletions

File tree

integration_tests/db_connectors/test_postgres.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2115,3 +2115,34 @@ def stream_target():
21152115
assert VALID_STREAMING_ID in ids_out, "Valid streaming row must be present"
21162116
for oor_id in OOR_IDS:
21172117
assert oor_id not in ids_out, f"Out-of-range row {oor_id} must be skipped"
2118+
2119+
2120+
def test_no_publication(tmp_path, postgres):
2121+
class InputSchema(pw.Schema):
2122+
value: str
2123+
2124+
output_path = tmp_path / "output.jsonl"
2125+
table_name = postgres.random_table_name()
2126+
2127+
postgres.execute_sql(
2128+
f"""
2129+
CREATE TABLE {table_name} (
2130+
value TEXT PRIMARY KEY
2131+
);
2132+
"""
2133+
)
2134+
2135+
postgres.execute_sql(f"INSERT INTO {table_name} (value) VALUES ('hello');")
2136+
table = pw.io.postgres.read(
2137+
postgres_settings=POSTGRES_SETTINGS,
2138+
table_name=table_name,
2139+
schema=InputSchema,
2140+
mode="streaming",
2141+
publication_name=f"{table_name}_pub",
2142+
autocommit_duration_ms=10,
2143+
)
2144+
pw.io.jsonlines.write(table, output_path)
2145+
with pytest.raises(
2146+
RuntimeError, match=f"Publication '{table_name}_pub' does not exist"
2147+
):
2148+
pw.run()

src/connectors/postgres.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1124,6 +1124,9 @@ pub enum ReplicationError {
11241124
#[error(transparent)]
11251125
WalReader(#[from] pg_walstream::error::ReplicationError),
11261126

1127+
#[error("Publication '{name}' does not exist")]
1128+
PublicationNotFound { name: String },
1129+
11271130
#[error(transparent)]
11281131
Query(#[from] postgres::Error),
11291132

@@ -2079,6 +2082,18 @@ impl PsqlReader {
20792082
Self::validate_table_schema(&mut client, &table_ctx)?;
20802083

20812084
let wal_reader = if let Some(replication_settings) = replication_settings {
2085+
let rows = client
2086+
.query(
2087+
"SELECT 1 FROM pg_publication WHERE pubname = $1",
2088+
&[&replication_settings.publication_name],
2089+
)
2090+
.map_err(ReplicationError::Query)?;
2091+
if rows.is_empty() {
2092+
return Err(ReplicationError::PublicationNotFound {
2093+
name: replication_settings.publication_name.clone(),
2094+
}
2095+
.into());
2096+
}
20822097
Some(WalReader::new(replication_settings, table_ctx.clone())?)
20832098
} else {
20842099
None

0 commit comments

Comments
 (0)