Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions doc/user/content/reference/system-catalog/mz_catalog.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ other objects in the system catalog.
<!-- RELATION_SPEC mz_catalog.mz_audit_events -->
Field | Type | Meaning
----------------|------------------------------|--------
`id ` | [`uint8`] | Materialize's unique, monotonically increasing ID for the event.
`event_type` | [`text`] | The type of the event: `create`, `drop`, or `alter`.
`object_type` | [`text`] | The type of the affected object: `cluster`, `cluster-replica`, `connection`, `database`, `function`, `index`, `materialized-view`, `role`, `schema`, `secret`, `sink`, `source`, `table`, `type`, or `view`.
`id` | [`uint8`] | Materialize's unique, monotonically increasing ID for the event.
`event_type` | [`text`] | The type of the event: `create`, `drop`, `alter`, `grant`, `revoke`, or `comment`.
`object_type` | [`text`] | The type of the affected object: `cluster`, `cluster-replica`, `connection`, `continual-task`, `database`, `func`, `index`, `materialized-view`, `network-policy`, `role`, `schema`, `secret`, `sink`, `source`, `system`, `table`, `type`, or `view`.
`details` | [`jsonb`] | Additional details about the event. The shape of the details varies based on `event_type` and `object_type`.
`user` | [`text`] | The user who triggered the event, or `NULL` if triggered by the system.
`occurred_at` | [`timestamp with time zone`] | The time at which the event occurred. Guaranteed to be in order of event creation. Events created in the same transaction will have identical values.
Expand Down
5 changes: 2 additions & 3 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,7 @@ impl Catalog {
let mut storage = openable_storage
.open(now().into(), &bootstrap_args)
.await
.expect("can open durable catalog")
.0;
.expect("can open durable catalog");
// Drain updates.
let _ = storage
.sync_to_current_updates()
Expand All @@ -387,7 +386,7 @@ impl Catalog {
.with_default_deploy_generation()
.build()
.await?;
let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
let storage = openable_storage.open(now().into(), bootstrap_args).await?;
let system_parameter_defaults = BTreeMap::default();
Self::open_debug_catalog_inner(
persist_client,
Expand Down
33 changes: 5 additions & 28 deletions src/adapter/src/catalog/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1460,22 +1460,6 @@ impl CatalogState {
}
}

/// Generate a list of `BuiltinTableUpdate`s that correspond to a list of updates made to the
/// durable catalog.
#[instrument]
pub(crate) fn generate_builtin_table_updates(
&self,
updates: Vec<StateUpdate>,
) -> Vec<BuiltinTableUpdate> {
let mut builtin_table_updates = Vec::new();
for StateUpdate { kind, ts: _, diff } in updates {
let builtin_table_update = self.generate_builtin_table_update(kind, diff);
let builtin_table_update = self.resolve_builtin_table_updates(builtin_table_update);
builtin_table_updates.extend(builtin_table_update);
}
builtin_table_updates
}

/// Generate a list of `BuiltinTableUpdate`s that correspond to a single update made to the
/// durable catalog.
#[instrument(level = "debug")]
Expand Down Expand Up @@ -1528,21 +1512,14 @@ impl CatalogState {
}
StateUpdateKind::TemporaryItem(item) => self.pack_item_update(item.id, diff),
StateUpdateKind::Item(item) => self.pack_item_update(item.id, diff),
StateUpdateKind::Comment(comment) => vec![self.pack_comment_update(
comment.object_id,
comment.sub_component,
&comment.comment,
diff,
)],
StateUpdateKind::Comment(_) => Vec::new(),
StateUpdateKind::SourceReferences(source_references) => {
self.pack_source_references_update(&source_references, diff)
}
StateUpdateKind::AuditLog(audit_log) => {
vec![
self.pack_audit_log_update(&audit_log.event, diff)
.expect("could not pack audit log update"),
]
}
// mz_audit_events is a MaterializedView backed by
// mz_internal.mz_catalog_raw, so audit log rows do not produce
// builtin table updates here.
StateUpdateKind::AuditLog(_) => Vec::new(),
StateUpdateKind::Database(_)
| StateUpdateKind::Schema(_)
| StateUpdateKind::NetworkPolicy(_)
Expand Down
113 changes: 5 additions & 108 deletions src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ mod notice;
use bytesize::ByteSize;
use ipnet::IpNet;
use mz_adapter_types::compaction::CompactionWindow;
use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
use mz_audit_log::VersionedStorageUsage;
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::{
BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AWS_CONNECTIONS,
MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZE_INTERNAL,
MZ_CLUSTER_REPLICA_SIZES, MZ_COLUMNS, MZ_COMMENTS, MZ_EGRESS_IPS, MZ_FUNCTIONS,
MZ_CLUSTER_REPLICA_SIZES, MZ_COLUMNS, MZ_EGRESS_IPS, MZ_FUNCTIONS,
MZ_HISTORY_RETENTION_STRATEGIES, MZ_ICEBERG_SINKS, MZ_INDEX_COLUMNS, MZ_KAFKA_CONNECTIONS,
MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES, MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES,
MZ_MAP_TYPES, MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MYSQL_SOURCE_TABLES,
Expand All @@ -29,7 +29,7 @@ use mz_catalog::builtin::{
};
use mz_catalog::config::AwsPrincipalContext;
use mz_catalog::durable::SourceReferences;
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::error::Error;
use mz_catalog::memory::objects::{
CatalogEntry, CatalogItem, Connection, DataSourceDesc, Func, Index, MaterializedView, Sink,
Table, TableDataSource, Type, View,
Expand All @@ -52,7 +52,7 @@ use mz_repr::{
use mz_sql::ast::{CreateIndexStatement, Statement, UnresolvedItemName};
use mz_sql::catalog::{CatalogType, TypeCategory};
use mz_sql::func::FuncImplCatalogDetails;
use mz_sql::names::{CommentObjectId, SchemaSpecifier};
use mz_sql::names::SchemaSpecifier;
use mz_sql::plan::{ConnectionDetails, SshKey};
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::client::TableData;
Expand Down Expand Up @@ -1353,57 +1353,6 @@ impl CatalogState {
)
}

pub fn pack_audit_log_update(
&self,
event: &VersionedEvent,
diff: Diff,
) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
let (event_type, object_type, details, user, occurred_at): (
&EventType,
&ObjectType,
&EventDetails,
&Option<String>,
u64,
) = match event {
VersionedEvent::V1(ev) => (
&ev.event_type,
&ev.object_type,
&ev.details,
&ev.user,
ev.occurred_at,
),
};
let details = Jsonb::from_serde_json(details.as_json())
.map_err(|e| {
Error::new(ErrorKind::Unstructured(format!(
"could not pack audit log update: {}",
e
)))
})?
.into_row();
let details = details
.iter()
.next()
.expect("details created above with a single jsonb column");
let dt = mz_ore::now::to_datetime(occurred_at);
let id = event.sortable_id();
Ok(BuiltinTableUpdate::row(
&*MZ_AUDIT_EVENTS,
Row::pack_slice(&[
Datum::UInt64(id),
Datum::String(&format!("{}", event_type)),
Datum::String(&format!("{}", object_type)),
details,
match user {
Some(user) => Datum::String(user),
None => Datum::Null,
},
Datum::TimestampTz(dt.try_into().expect("must fit")),
]),
diff,
))
}

pub fn pack_storage_usage_update(
&self,
VersionedStorageUsage::V1(event): VersionedStorageUsage,
Expand Down Expand Up @@ -1576,58 +1525,6 @@ impl CatalogState {
row
}

pub fn pack_comment_update(
&self,
object_id: CommentObjectId,
column_pos: Option<usize>,
comment: &str,
diff: Diff,
) -> BuiltinTableUpdate<&'static BuiltinTable> {
// Use the audit log representation so it's easier to join against.
let object_type = mz_sql::catalog::ObjectType::from(object_id);
let audit_type = super::object_type_to_audit_object_type(object_type);
let object_type_str = audit_type.to_string();

let object_id_str = match object_id {
CommentObjectId::Table(global_id)
| CommentObjectId::View(global_id)
| CommentObjectId::MaterializedView(global_id)
| CommentObjectId::Source(global_id)
| CommentObjectId::Sink(global_id)
| CommentObjectId::Index(global_id)
| CommentObjectId::Func(global_id)
| CommentObjectId::Connection(global_id)
| CommentObjectId::Secret(global_id)
| CommentObjectId::Type(global_id) => global_id.to_string(),
CommentObjectId::Role(role_id) => role_id.to_string(),
CommentObjectId::Database(database_id) => database_id.to_string(),
CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
};
let column_pos_datum = match column_pos {
Some(pos) => {
// TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
let pos =
i32::try_from(pos).expect("we constrain this value in the planning layer");
Datum::Int32(pos)
}
None => Datum::Null,
};

BuiltinTableUpdate::row(
&*MZ_COMMENTS,
Row::pack_slice(&[
Datum::String(&object_id_str),
Datum::String(&object_type_str),
column_pos_datum,
Datum::String(comment),
]),
diff,
)
}

pub fn pack_webhook_source_update(
&self,
item_id: CatalogItemId,
Expand Down
12 changes: 12 additions & 0 deletions src/adapter/src/catalog/open/builtin_schema_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,18 @@ static MIGRATIONS: LazyLock<Vec<MigrationStep>> = LazyLock::new(|| {
MZ_CATALOG_SCHEMA,
"mz_system_privileges",
),
MigrationStep::replacement(
"26.31.0-dev.0",
CatalogItemType::MaterializedView,
MZ_INTERNAL_SCHEMA,
"mz_comments",
),
MigrationStep::replacement(
"26.31.0-dev.0",
CatalogItemType::MaterializedView,
MZ_CATALOG_SCHEMA,
"mz_audit_events",
),
]
});

Expand Down
Loading
Loading