Skip to content

Commit 17fb0f6

Browse files
postgres: add subscription DDL and predefined-role grant/revoke parsing
1 parent bd7f70e commit 17fb0f6

File tree

5 files changed

+591
-7
lines changed

5 files changed

+591
-7
lines changed

src/ast/mod.rs

Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2459,6 +2459,8 @@ pub enum CommentObject {
24592459
Schema,
24602460
/// A sequence.
24612461
Sequence,
2462+
/// A subscription.
2463+
Subscription,
24622464
/// A table.
24632465
Table,
24642466
/// A type.
@@ -2483,6 +2485,7 @@ impl fmt::Display for CommentObject {
24832485
CommentObject::Role => f.write_str("ROLE"),
24842486
CommentObject::Schema => f.write_str("SCHEMA"),
24852487
CommentObject::Sequence => f.write_str("SEQUENCE"),
2488+
CommentObject::Subscription => f.write_str("SUBSCRIPTION"),
24862489
CommentObject::Table => f.write_str("TABLE"),
24872490
CommentObject::Type => f.write_str("TYPE"),
24882491
CommentObject::User => f.write_str("USER"),
@@ -3682,6 +3685,12 @@ pub enum Statement {
36823685
/// A `CREATE SERVER` statement.
36833686
CreateServer(CreateServerStatement),
36843687
/// ```sql
3688+
/// CREATE SUBSCRIPTION
3689+
/// ```
3690+
///
3691+
/// Note: this is a PostgreSQL-specific statement.
3692+
CreateSubscription(CreateSubscription),
3693+
/// ```sql
36853694
/// CREATE POLICY
36863695
/// ```
36873696
/// See [PostgreSQL](https://www.postgresql.org/docs/current/sql-createpolicy.html)
@@ -3768,6 +3777,12 @@ pub enum Statement {
37683777
operation: AlterRoleOperation,
37693778
},
37703779
/// ```sql
3780+
/// ALTER SUBSCRIPTION
3781+
/// ```
3782+
///
3783+
/// Note: this is a PostgreSQL-specific statement.
3784+
AlterSubscription(AlterSubscription),
3785+
/// ```sql
37713786
/// ALTER POLICY <NAME> ON <TABLE NAME> [<OPERATION>]
37723787
/// ```
37733788
/// (Postgresql-specific)
@@ -5436,6 +5451,9 @@ impl fmt::Display for Statement {
54365451
Statement::CreateServer(stmt) => {
54375452
write!(f, "{stmt}")
54385453
}
5454+
Statement::CreateSubscription(stmt) => {
5455+
write!(f, "{stmt}")
5456+
}
54395457
Statement::CreatePolicy(policy) => write!(f, "{policy}"),
54405458
Statement::CreateConnector(create_connector) => create_connector.fmt(f),
54415459
Statement::CreateOperator(create_operator) => create_operator.fmt(f),
@@ -5475,6 +5493,9 @@ impl fmt::Display for Statement {
54755493
Statement::AlterRole { name, operation } => {
54765494
write!(f, "ALTER ROLE {name} {operation}")
54775495
}
5496+
Statement::AlterSubscription(alter_subscription) => {
5497+
write!(f, "{alter_subscription}")
5498+
}
54785499
Statement::AlterPolicy(alter_policy) => write!(f, "{alter_policy}"),
54795500
Statement::AlterConnector {
54805501
name,
@@ -6760,6 +6781,11 @@ pub enum Action {
67606781
BindServiceEndpoint,
67616782
/// Connect permission.
67626783
Connect,
6784+
/// Custom privilege name (primarily PostgreSQL).
6785+
Custom {
6786+
/// The custom privilege identifier.
6787+
name: Ident,
6788+
},
67636789
/// Create action, optionally specifying an object type.
67646790
Create {
67656791
/// Optional object type to create.
@@ -6874,6 +6900,7 @@ impl fmt::Display for Action {
68746900
Action::Audit => f.write_str("AUDIT")?,
68756901
Action::BindServiceEndpoint => f.write_str("BIND SERVICE ENDPOINT")?,
68766902
Action::Connect => f.write_str("CONNECT")?,
6903+
Action::Custom { name } => write!(f, "{name}")?,
68776904
Action::Create { obj_type } => {
68786905
f.write_str("CREATE")?;
68796906
if let Some(obj_type) = obj_type {
@@ -8246,6 +8273,8 @@ pub enum ObjectType {
82468273
Role,
82478274
/// A sequence.
82488275
Sequence,
8276+
/// A subscription.
8277+
Subscription,
82498278
/// A stage.
82508279
Stage,
82518280
/// A type definition.
@@ -8267,6 +8296,7 @@ impl fmt::Display for ObjectType {
82678296
ObjectType::Database => "DATABASE",
82688297
ObjectType::Role => "ROLE",
82698298
ObjectType::Sequence => "SEQUENCE",
8299+
ObjectType::Subscription => "SUBSCRIPTION",
82708300
ObjectType::Stage => "STAGE",
82718301
ObjectType::Type => "TYPE",
82728302
ObjectType::User => "USER",
@@ -8770,6 +8800,206 @@ impl fmt::Display for CreateServerOption {
87708800
}
87718801
}
87728802

8803+
/// A subscription option used by `CREATE/ALTER SUBSCRIPTION`.
8804+
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
8805+
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
8806+
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
8807+
pub struct SubscriptionOption {
8808+
/// Subscription parameter name.
8809+
pub name: Ident,
8810+
/// Optional parameter value.
8811+
pub value: Option<Expr>,
8812+
}
8813+
8814+
impl fmt::Display for SubscriptionOption {
8815+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
8816+
if let Some(value) = &self.value {
8817+
write!(f, "{} = {}", self.name, value)
8818+
} else {
8819+
write!(f, "{}", self.name)
8820+
}
8821+
}
8822+
}
8823+
8824+
/// A `CREATE SUBSCRIPTION` statement.
8825+
///
8826+
/// [PostgreSQL Documentation](https://www.postgresql.org/docs/current/sql-createsubscription.html)
8827+
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
8828+
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
8829+
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
8830+
pub struct CreateSubscription {
8831+
/// Subscription name.
8832+
pub name: ObjectName,
8833+
/// Connection string.
8834+
pub connection: String,
8835+
/// Publication names.
8836+
pub publications: Vec<Ident>,
8837+
/// Optional subscription parameters from `WITH (...)`.
8838+
pub with_options: Vec<SubscriptionOption>,
8839+
}
8840+
8841+
impl fmt::Display for CreateSubscription {
8842+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
8843+
write!(
8844+
f,
8845+
"CREATE SUBSCRIPTION {} CONNECTION '{}' PUBLICATION {}",
8846+
self.name,
8847+
value::escape_single_quote_string(&self.connection),
8848+
display_comma_separated(&self.publications)
8849+
)?;
8850+
8851+
if !self.with_options.is_empty() {
8852+
write!(f, " WITH ({})", display_comma_separated(&self.with_options))?;
8853+
}
8854+
8855+
Ok(())
8856+
}
8857+
}
8858+
8859+
/// An `ALTER SUBSCRIPTION` statement.
8860+
///
8861+
/// [PostgreSQL Documentation](https://www.postgresql.org/docs/current/sql-altersubscription.html)
8862+
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
8863+
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
8864+
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
8865+
pub struct AlterSubscription {
8866+
/// Subscription name.
8867+
pub name: ObjectName,
8868+
/// Operation to perform.
8869+
pub operation: AlterSubscriptionOperation,
8870+
}
8871+
8872+
impl fmt::Display for AlterSubscription {
8873+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
8874+
write!(f, "ALTER SUBSCRIPTION {} {}", self.name, self.operation)
8875+
}
8876+
}
8877+
8878+
/// Operations supported by `ALTER SUBSCRIPTION`.
8879+
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
8880+
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
8881+
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
8882+
pub enum AlterSubscriptionOperation {
8883+
/// Update the subscription connection string.
8884+
Connection {
8885+
/// New connection string.
8886+
connection: String,
8887+
},
8888+
/// Replace subscription publications.
8889+
SetPublication {
8890+
/// Publication names.
8891+
publications: Vec<Ident>,
8892+
/// Optional `WITH (...)` parameters.
8893+
with_options: Vec<SubscriptionOption>,
8894+
},
8895+
/// Add publications to the subscription.
8896+
AddPublication {
8897+
/// Publication names.
8898+
publications: Vec<Ident>,
8899+
/// Optional `WITH (...)` parameters.
8900+
with_options: Vec<SubscriptionOption>,
8901+
},
8902+
/// Drop publications from the subscription.
8903+
DropPublication {
8904+
/// Publication names.
8905+
publications: Vec<Ident>,
8906+
/// Optional `WITH (...)` parameters.
8907+
with_options: Vec<SubscriptionOption>,
8908+
},
8909+
/// Refresh subscription publications.
8910+
RefreshPublication {
8911+
/// Optional `WITH (...)` parameters.
8912+
with_options: Vec<SubscriptionOption>,
8913+
},
8914+
/// Enable the subscription.
8915+
Enable,
8916+
/// Disable the subscription.
8917+
Disable,
8918+
/// Set subscription parameters.
8919+
SetOptions {
8920+
/// Parameters within `SET (...)`.
8921+
options: Vec<SubscriptionOption>,
8922+
},
8923+
/// Change subscription owner.
8924+
OwnerTo {
8925+
/// New owner.
8926+
owner: Owner,
8927+
},
8928+
/// Rename the subscription.
8929+
RenameTo {
8930+
/// New subscription name.
8931+
new_name: ObjectName,
8932+
},
8933+
}
8934+
8935+
impl fmt::Display for AlterSubscriptionOperation {
8936+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
8937+
fn write_with_options(
8938+
f: &mut fmt::Formatter<'_>,
8939+
with_options: &[SubscriptionOption],
8940+
) -> fmt::Result {
8941+
if !with_options.is_empty() {
8942+
write!(f, " WITH ({})", display_comma_separated(with_options))?;
8943+
}
8944+
Ok(())
8945+
}
8946+
8947+
match self {
8948+
AlterSubscriptionOperation::Connection { connection } => {
8949+
write!(
8950+
f,
8951+
"CONNECTION '{}'",
8952+
value::escape_single_quote_string(connection)
8953+
)
8954+
}
8955+
AlterSubscriptionOperation::SetPublication {
8956+
publications,
8957+
with_options,
8958+
} => {
8959+
write!(
8960+
f,
8961+
"SET PUBLICATION {}",
8962+
display_comma_separated(publications)
8963+
)?;
8964+
write_with_options(f, with_options)
8965+
}
8966+
AlterSubscriptionOperation::AddPublication {
8967+
publications,
8968+
with_options,
8969+
} => {
8970+
write!(
8971+
f,
8972+
"ADD PUBLICATION {}",
8973+
display_comma_separated(publications)
8974+
)?;
8975+
write_with_options(f, with_options)
8976+
}
8977+
AlterSubscriptionOperation::DropPublication {
8978+
publications,
8979+
with_options,
8980+
} => {
8981+
write!(
8982+
f,
8983+
"DROP PUBLICATION {}",
8984+
display_comma_separated(publications)
8985+
)?;
8986+
write_with_options(f, with_options)
8987+
}
8988+
AlterSubscriptionOperation::RefreshPublication { with_options } => {
8989+
write!(f, "REFRESH PUBLICATION")?;
8990+
write_with_options(f, with_options)
8991+
}
8992+
AlterSubscriptionOperation::Enable => write!(f, "ENABLE"),
8993+
AlterSubscriptionOperation::Disable => write!(f, "DISABLE"),
8994+
AlterSubscriptionOperation::SetOptions { options } => {
8995+
write!(f, "SET ({})", display_comma_separated(options))
8996+
}
8997+
AlterSubscriptionOperation::OwnerTo { owner } => write!(f, "OWNER TO {owner}"),
8998+
AlterSubscriptionOperation::RenameTo { new_name } => write!(f, "RENAME TO {new_name}"),
8999+
}
9000+
}
9001+
}
9002+
87739003
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
87749004
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
87759005
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
@@ -11669,6 +11899,8 @@ impl fmt::Display for VacuumStatement {
1166911899
pub enum Reset {
1167011900
/// Resets all session parameters to their default values.
1167111901
ALL,
11902+
/// Resets session authorization to the session user.
11903+
SessionAuthorization,
1167211904

1167311905
/// Resets a specific session parameter to its default value.
1167411906
ConfigurationParameter(ObjectName),
@@ -11751,6 +11983,7 @@ impl fmt::Display for ResetStatement {
1175111983
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1175211984
match &self.reset {
1175311985
Reset::ALL => write!(f, "RESET ALL"),
11986+
Reset::SessionAuthorization => write!(f, "RESET SESSION AUTHORIZATION"),
1175411987
Reset::ConfigurationParameter(param) => write!(f, "RESET {}", param),
1175511988
}
1175611989
}
@@ -11888,6 +12121,12 @@ impl From<CreateServerStatement> for Statement {
1188812121
}
1188912122
}
1189012123

12124+
impl From<CreateSubscription> for Statement {
12125+
fn from(c: CreateSubscription) -> Self {
12126+
Self::CreateSubscription(c)
12127+
}
12128+
}
12129+
1189112130
impl From<CreateConnector> for Statement {
1189212131
fn from(c: CreateConnector) -> Self {
1189312132
Self::CreateConnector(c)
@@ -11918,6 +12157,12 @@ impl From<AlterSchema> for Statement {
1191812157
}
1191912158
}
1192012159

12160+
impl From<AlterSubscription> for Statement {
12161+
fn from(a: AlterSubscription) -> Self {
12162+
Self::AlterSubscription(a)
12163+
}
12164+
}
12165+
1192112166
impl From<AlterType> for Statement {
1192212167
fn from(a: AlterType) -> Self {
1192312168
Self::AlterType(a)

src/ast/spans.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,7 @@ impl Spanned for Statement {
382382
Statement::DropOperatorClass(drop_operator_class) => drop_operator_class.span(),
383383
Statement::CreateSecret { .. } => Span::empty(),
384384
Statement::CreateServer { .. } => Span::empty(),
385+
Statement::CreateSubscription { .. } => Span::empty(),
385386
Statement::CreateConnector { .. } => Span::empty(),
386387
Statement::CreateOperator(create_operator) => create_operator.span(),
387388
Statement::CreateOperatorFamily(create_operator_family) => {
@@ -407,6 +408,7 @@ impl Spanned for Statement {
407408
Statement::AlterOperatorFamily { .. } => Span::empty(),
408409
Statement::AlterOperatorClass { .. } => Span::empty(),
409410
Statement::AlterRole { .. } => Span::empty(),
411+
Statement::AlterSubscription { .. } => Span::empty(),
410412
Statement::AlterSession { .. } => Span::empty(),
411413
Statement::AttachDatabase { .. } => Span::empty(),
412414
Statement::AttachDuckDBDatabase { .. } => Span::empty(),

src/keywords.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -808,6 +808,7 @@ define_keywords!(
808808
PROGRAM,
809809
PROJECTION,
810810
PUBLIC,
811+
PUBLICATION,
811812
PURCHASE,
812813
PURGE,
813814
QUALIFY,
@@ -994,6 +995,7 @@ define_keywords!(
994995
STRUCT,
995996
SUBMULTISET,
996997
SUBSCRIPT,
998+
SUBSCRIPTION,
997999
SUBSTR,
9981000
SUBSTRING,
9991001
SUBSTRING_REGEX,

0 commit comments

Comments
 (0)