Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 44 additions & 2 deletions src/adapter/src/catalog/transact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -887,16 +887,36 @@ impl Catalog {
}

let mut new_entry = entry.clone();
new_entry
let previous = new_entry
.item
.update_timestamp_interval(value, interval)
.update_timestamp_interval(value.clone(), interval)
.map_err(|_| {
AdapterError::Catalog(Error::new(ErrorKind::Internal(
"planner should have rejected invalid alter timestamp interval item type"
.to_string(),
)))
})?;

if Self::should_audit_log_item(new_entry.item()) {
let details = EventDetails::AlterSourceTimestampIntervalV1(
mz_audit_log::AlterSourceTimestampIntervalV1 {
id: id.to_string(),
old_interval: previous.map(|previous| previous.to_string()),
new_interval: value.map(|v| v.to_string()),
},
);
CatalogState::add_to_audit_log(
&state.system_configuration,
oracle_write_ts,
session,
tx,
audit_events,
EventType::Alter,
catalog_type_to_audit_object_type(new_entry.item().typ()),
details,
)?;
}

tx.update_item(id, new_entry.into())?;

Self::log_update(state, &id);
Expand Down Expand Up @@ -984,6 +1004,9 @@ impl Catalog {
typ,
sql,
} => {
let column_name = name.to_string();
let column_type = typ.to_string();
let nullable = typ.nullable;
let mut new_entry = state.get_entry(&id).clone();
let version = new_entry.item.add_column(name, typ, sql)?;
// All versions of a table share the same shard, so it shouldn't matter what
Expand All @@ -998,6 +1021,25 @@ impl Catalog {
};
table.collections.insert(version, new_global_id);

if Self::should_audit_log_item(new_entry.item()) {
let details = EventDetails::AlterAddColumnV1(mz_audit_log::AlterAddColumnV1 {
id: id.to_string(),
column: column_name,
column_type,
nullable,
});
CatalogState::add_to_audit_log(
&state.system_configuration,
oracle_write_ts,
session,
tx,
audit_events,
EventType::Alter,
catalog_type_to_audit_object_type(new_entry.item().typ()),
details,
)?;
}

tx.update_item(id, new_entry.into())?;
storage_collections_to_register.insert(new_global_id, shard_id);
}
Expand Down
43 changes: 43 additions & 0 deletions src/audit-log/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ pub enum EventDetails {
UpdateItemV1(UpdateItemV1),
RenameSchemaV1(RenameSchemaV1),
AlterRetainHistoryV1(AlterRetainHistoryV1),
AlterAddColumnV1(AlterAddColumnV1),
AlterSourceTimestampIntervalV1(AlterSourceTimestampIntervalV1),
ToNewIdV1(ToNewIdV1),
FromPreviousIdV1(FromPreviousIdV1),
SetV1(SetV1),
Expand Down Expand Up @@ -1080,6 +1082,43 @@ pub struct AlterRetainHistoryV1 {
pub new_history: Option<String>,
}

#[derive(
Clone,
Debug,
Serialize,
Deserialize,
PartialOrd,
PartialEq,
Eq,
Ord,
Hash,
Arbitrary
)]
pub struct AlterAddColumnV1 {
pub id: String,
pub column: String,
pub column_type: String,
pub nullable: bool,
}

#[derive(
Clone,
Debug,
Serialize,
Deserialize,
PartialOrd,
PartialEq,
Eq,
Ord,
Hash,
Arbitrary
)]
pub struct AlterSourceTimestampIntervalV1 {
pub id: String,
pub old_interval: Option<String>,
pub new_interval: Option<String>,
}

#[derive(
Clone,
Debug,
Expand Down Expand Up @@ -1192,6 +1231,10 @@ impl EventDetails {
EventDetails::AlterRetainHistoryV1(v) => {
serde_json::to_value(v).expect("must serialize")
}
EventDetails::AlterAddColumnV1(v) => serde_json::to_value(v).expect("must serialize"),
EventDetails::AlterSourceTimestampIntervalV1(v) => {
serde_json::to_value(v).expect("must serialize")
}
EventDetails::ToNewIdV1(v) => serde_json::to_value(v).expect("must serialize"),
EventDetails::FromPreviousIdV1(v) => serde_json::to_value(v).expect("must serialize"),
EventDetails::SetV1(v) => serde_json::to_value(v).expect("must serialize"),
Expand Down
6 changes: 5 additions & 1 deletion src/catalog-protos/objects_hashes.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[
{
"name": "objects.rs",
"md5": "dfd89e1c62b7f1663d6429846a76159f"
"md5": "8f899255990eec45ad8f1380d6f80bce"
},
{
"name": "objects_v74.rs",
Expand Down Expand Up @@ -38,5 +38,9 @@
{
"name": "objects_v82.rs",
"md5": "dfd89e1c62b7f1663d6429846a76159f"
},
{
"name": "objects_v83.rs",
"md5": "8f899255990eec45ad8f1380d6f80bce"
}
]
66 changes: 59 additions & 7 deletions src/catalog-protos/src/audit_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
//! because of Rust's orphan rules.

use mz_audit_log::{
AlterApplyReplacementV1, AlterDefaultPrivilegeV1, AlterRetainHistoryV1, AlterSetClusterV1,
AlterSourceSinkV1, CreateClusterReplicaV1, CreateClusterReplicaV2, CreateClusterReplicaV3,
CreateClusterReplicaV4, CreateIndexV1, CreateMaterializedViewV1,
CreateOrDropClusterReplicaReasonV1, CreateRoleV1, CreateSourceSinkV1, CreateSourceSinkV2,
CreateSourceSinkV3, CreateSourceSinkV4, DropClusterReplicaV1, DropClusterReplicaV2,
DropClusterReplicaV3, EventDetails, EventType, EventV1, FromPreviousIdV1, FullNameV1,
GrantRoleV1, GrantRoleV2, IdFullNameV1, IdNameV1, RefreshDecisionWithReasonV1,
AlterAddColumnV1, AlterApplyReplacementV1, AlterDefaultPrivilegeV1, AlterRetainHistoryV1,
AlterSetClusterV1, AlterSourceSinkV1, AlterSourceTimestampIntervalV1, CreateClusterReplicaV1,
CreateClusterReplicaV2, CreateClusterReplicaV3, CreateClusterReplicaV4, CreateIndexV1,
CreateMaterializedViewV1, CreateOrDropClusterReplicaReasonV1, CreateRoleV1, CreateSourceSinkV1,
CreateSourceSinkV2, CreateSourceSinkV3, CreateSourceSinkV4, DropClusterReplicaV1,
DropClusterReplicaV2, DropClusterReplicaV3, EventDetails, EventType, EventV1, FromPreviousIdV1,
FullNameV1, GrantRoleV1, GrantRoleV2, IdFullNameV1, IdNameV1, RefreshDecisionWithReasonV1,
RefreshDecisionWithReasonV2, RenameClusterReplicaV1, RenameClusterV1, RenameItemV1,
RenameSchemaV1, RevokeRoleV1, RevokeRoleV2, RotateKeysV1, SchedulingDecisionV1,
SchedulingDecisionsWithReasonsV1, SchedulingDecisionsWithReasonsV2, SchemaV1, SchemaV2, SetV1,
Expand Down Expand Up @@ -1188,6 +1188,50 @@ impl RustType<crate::objects::audit_log_event_v1::AlterRetainHistoryV1> for Alte
}
}

impl RustType<crate::objects::audit_log_event_v1::AlterAddColumnV1> for AlterAddColumnV1 {
fn into_proto(&self) -> crate::objects::audit_log_event_v1::AlterAddColumnV1 {
crate::objects::audit_log_event_v1::AlterAddColumnV1 {
id: self.id.to_string(),
column: self.column.clone(),
column_type: self.column_type.clone(),
nullable: self.nullable,
}
}

fn from_proto(
proto: crate::objects::audit_log_event_v1::AlterAddColumnV1,
) -> Result<Self, TryFromProtoError> {
Ok(AlterAddColumnV1 {
id: proto.id,
column: proto.column,
column_type: proto.column_type,
nullable: proto.nullable,
})
}
}

impl RustType<crate::objects::audit_log_event_v1::AlterSourceTimestampIntervalV1>
for AlterSourceTimestampIntervalV1
{
fn into_proto(&self) -> crate::objects::audit_log_event_v1::AlterSourceTimestampIntervalV1 {
crate::objects::audit_log_event_v1::AlterSourceTimestampIntervalV1 {
id: self.id.to_string(),
old_interval: self.old_interval.clone(),
new_interval: self.new_interval.clone(),
}
}

fn from_proto(
proto: crate::objects::audit_log_event_v1::AlterSourceTimestampIntervalV1,
) -> Result<Self, TryFromProtoError> {
Ok(AlterSourceTimestampIntervalV1 {
id: proto.id,
old_interval: proto.old_interval,
new_interval: proto.new_interval,
})
}
}

impl RustType<crate::objects::audit_log_event_v1::ToNewIdV1> for ToNewIdV1 {
fn into_proto(&self) -> crate::objects::audit_log_event_v1::ToNewIdV1 {
crate::objects::audit_log_event_v1::ToNewIdV1 {
Expand Down Expand Up @@ -1342,6 +1386,10 @@ impl RustType<crate::objects::audit_log_event_v1::Details> for EventDetails {
EventDetails::AlterRetainHistoryV1(details) => {
AlterRetainHistoryV1(details.into_proto())
}
EventDetails::AlterAddColumnV1(details) => AlterAddColumnV1(details.into_proto()),
EventDetails::AlterSourceTimestampIntervalV1(details) => {
AlterSourceTimestampIntervalV1(details.into_proto())
}
EventDetails::ToNewIdV1(details) => ToNewIdV1(details.into_proto()),
EventDetails::FromPreviousIdV1(details) => FromPreviousIdV1(details.into_proto()),
EventDetails::SetV1(details) => SetV1(details.into_proto()),
Expand Down Expand Up @@ -1428,6 +1476,10 @@ impl RustType<crate::objects::audit_log_event_v1::Details> for EventDetails {
ResetAllV1(Empty {}) => Ok(EventDetails::ResetAllV1),
RotateKeysV1(details) => Ok(EventDetails::RotateKeysV1(details.into_rust()?)),
CreateRoleV1(details) => Ok(EventDetails::CreateRoleV1(details.into_rust()?)),
AlterAddColumnV1(details) => Ok(EventDetails::AlterAddColumnV1(details.into_rust()?)),
AlterSourceTimestampIntervalV1(details) => Ok(
EventDetails::AlterSourceTimestampIntervalV1(details.into_rust()?),
),
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/catalog-protos/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ pub mod objects_v79;
pub mod objects_v80;
pub mod objects_v81;
pub mod objects_v82;
pub mod objects_v83;
pub mod serialization;

/// The current version of the `Catalog`.
///
/// We will initialize new `Catalog`s with this version, and migrate existing `Catalog`s to this
/// version. Whenever the `Catalog` changes, e.g. the types we serialize in the `Catalog`
/// change, we need to bump this version.
pub const CATALOG_VERSION: u64 = 82;
pub const CATALOG_VERSION: u64 = 83;

/// The minimum `Catalog` version number that we support migrating from.
///
Expand Down
37 changes: 37 additions & 0 deletions src/catalog-protos/src/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2298,6 +2298,41 @@ pub mod audit_log_event_v1 {
pub new_history: Option<String>,
}

#[derive(
Clone,
Debug,
PartialEq,
Eq,
PartialOrd,
Ord,
Serialize,
Deserialize,
Arbitrary
)]
pub struct AlterAddColumnV1 {
pub id: String,
pub column: String,
pub column_type: String,
pub nullable: bool,
}

#[derive(
Clone,
Debug,
PartialEq,
Eq,
PartialOrd,
Ord,
Serialize,
Deserialize,
Arbitrary
)]
pub struct AlterSourceTimestampIntervalV1 {
pub id: String,
pub old_interval: Option<String>,
pub new_interval: Option<String>,
}

#[derive(
Clone,
Debug,
Expand Down Expand Up @@ -2490,6 +2525,8 @@ pub mod audit_log_event_v1 {
CreateMaterializedViewV1(CreateMaterializedViewV1),
AlterApplyReplacementV1(AlterApplyReplacementV1),
CreateRoleV1(CreateRoleV1),
AlterAddColumnV1(AlterAddColumnV1),
AlterSourceTimestampIntervalV1(AlterSourceTimestampIntervalV1),
}
}

Expand Down
Loading
Loading