Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions crates/core/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ pub use table::{
DeleteGsiAction, DeleteTableInput, DeleteTableOutput, DescribeLimitsOutput, DescribeTableInput,
DescribeTableOutput, DescribeTimeToLiveInput, DescribeTimeToLiveOutput,
GlobalSecondaryIndexUpdate, GsiDescription, GsiInput, ListTablesInput, ListTablesOutput,
ListTagsOfResourceInput, ListTagsOfResourceOutput, LsiDescription, LsiInput, Projection,
ProjectionType, ProvisionedThroughput, ProvisionedThroughputDescription, SseDescription,
SseType, StreamSpecification, StreamViewType, TableDescription, TableStatus, Tag,
TagResourceInput, TimeToLiveDescription, TimeToLiveSpecification,
TimeToLiveSpecificationOutput, TimeToLiveStatus, UntagResourceInput, UpdateGsiAction,
UpdateTableInput, UpdateTableOutput, UpdateTimeToLiveInput, UpdateTimeToLiveOutput,
ListTagsOfResourceInput, ListTagsOfResourceOutput, LsiDescription, LsiInput,
OnDemandThroughput, Projection, ProjectionType, ProvisionedThroughput,
ProvisionedThroughputDescription, SseDescription, SseType, StreamSpecification, StreamViewType,
TableDescription, TableStatus, Tag, TagResourceInput, TimeToLiveDescription,
TimeToLiveSpecification, TimeToLiveSpecificationOutput, TimeToLiveStatus, UntagResourceInput,
UpdateGsiAction, UpdateTableInput, UpdateTableOutput, UpdateTimeToLiveInput,
UpdateTimeToLiveOutput,
};
pub use transaction::{
CancellationReason, ItemResponse, TransactConditionCheck, TransactDelete, TransactGet,
Expand Down
88 changes: 88 additions & 0 deletions crates/core/src/types/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,17 @@ pub struct SseDescription {
pub status: String,
#[serde(rename = "SSEType", skip_serializing_if = "Option::is_none")]
pub sse_type: Option<SseType>,
#[serde(rename = "KMSMasterKeyArn", skip_serializing_if = "Option::is_none")]
pub kms_master_key_arn: Option<String>,
}

/// On-demand throughput settings for a table or index.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct OnDemandThroughput {
#[serde(rename = "MaxReadRequestUnits")]
pub max_read_request_units: Option<i64>,
#[serde(rename = "MaxWriteRequestUnits")]
pub max_write_request_units: Option<i64>,
}

/// Summary of the table's billing mode and last update timestamp.
Expand Down Expand Up @@ -264,6 +275,8 @@ pub struct TableDescription {
pub sse_description: Option<SseDescription>,
#[serde(rename = "TableClassSummary", skip_serializing_if = "Option::is_none")]
pub table_class_summary: Option<serde_json::Value>,
#[serde(rename = "OnDemandThroughput", skip_serializing_if = "Option::is_none")]
pub on_demand_throughput: Option<OnDemandThroughput>,
}

/// `CreateTable` request body.
Expand Down Expand Up @@ -293,6 +306,8 @@ pub struct CreateTableInput {
pub deletion_protection_enabled: Option<bool>,
#[serde(rename = "TableClass")]
pub table_class: Option<String>,
#[serde(rename = "OnDemandThroughput")]
pub on_demand_throughput: Option<OnDemandThroughput>,
}

/// `CreateTable` response body.
Expand Down Expand Up @@ -414,6 +429,10 @@ pub struct UpdateTableInput {
pub attribute_definitions: Option<Vec<AttributeDefinition>>,
#[serde(rename = "StreamSpecification")]
pub stream_specification: Option<StreamSpecification>,
#[serde(rename = "TableClass")]
Comment thread
jcshepherd marked this conversation as resolved.
pub table_class: Option<String>,
#[serde(rename = "OnDemandThroughput")]
pub on_demand_throughput: Option<OnDemandThroughput>,
}

/// `UpdateTable` response body.
Expand Down Expand Up @@ -544,3 +563,72 @@ pub struct DescribeLimitsOutput {
#[serde(rename = "TableMaxWriteCapacityUnits")]
pub table_max_write_capacity_units: i64,
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn on_demand_throughput_round_trips_json() {
let odt = OnDemandThroughput {
max_read_request_units: Some(100),
max_write_request_units: Some(50),
};
let json = serde_json::to_value(&odt).unwrap();
assert_eq!(json["MaxReadRequestUnits"], 100);
assert_eq!(json["MaxWriteRequestUnits"], 50);
let parsed: OnDemandThroughput = serde_json::from_value(json).unwrap();
assert_eq!(parsed, odt);
}

#[test]
fn on_demand_throughput_deserializes_from_input() {
let input_json = r#"{"MaxReadRequestUnits": 10, "MaxWriteRequestUnits": 5}"#;
let odt: OnDemandThroughput = serde_json::from_str(input_json).unwrap();
assert_eq!(odt.max_read_request_units, Some(10));
assert_eq!(odt.max_write_request_units, Some(5));
}

#[test]
fn sse_description_serializes_with_kms_arn() {
let sse = SseDescription {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does SSEDescription include InaccessibleEncryptionDateTime ? Should it? (Doesn't necessarily have to be part of this review, but it looks to be missing.)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed as we don't simulate inaccessible keys. The field would always be null. Can add later if a customer scenario requires it.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. Okay. It's part of the DDB public API model though, isn't it? I'm okay with taking care of it separately, but it's not clear to me that it can be left out from an API model conformance perspective. (Unless we are accepting it in the API but just ignoring it?)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just ignoring it, yes. Its completely pass through. We have no way to simulate a non-conformant KMS key. Because we don't do KMS encryption.

status: "ENABLED".to_string(),
sse_type: Some(SseType::KMS),
kms_master_key_arn: Some("arn:aws:kms:us-east-1:123456789012:key/default".to_string()),
};
let json = serde_json::to_value(&sse).unwrap();
assert_eq!(json["Status"], "ENABLED");
assert_eq!(json["SSEType"], "KMS");
assert_eq!(
json["KMSMasterKeyArn"],
"arn:aws:kms:us-east-1:123456789012:key/default"
);
}

#[test]
fn sse_description_omits_none_fields() {
let sse = SseDescription {
status: "ENABLED".to_string(),
sse_type: None,
kms_master_key_arn: None,
};
let json = serde_json::to_value(&sse).unwrap();
assert_eq!(json["Status"], "ENABLED");
assert!(json.get("SSEType").is_none());
assert!(json.get("KMSMasterKeyArn").is_none());
}

#[test]
fn create_table_input_deserializes_on_demand_throughput() {
let json = r#"{
"TableName": "T",
"KeySchema": [{"AttributeName": "pk", "KeyType": "HASH"}],
"AttributeDefinitions": [{"AttributeName": "pk", "AttributeType": "S"}],
"OnDemandThroughput": {"MaxReadRequestUnits": 10, "MaxWriteRequestUnits": 5}
}"#;
let input: CreateTableInput = serde_json::from_str(json).unwrap();
let odt = input.on_demand_throughput.unwrap();
assert_eq!(odt.max_read_request_units, Some(10));
assert_eq!(odt.max_write_request_units, Some(5));
}
}
1 change: 1 addition & 0 deletions crates/core/src/validation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,7 @@ mod tests {
tags: None,
deletion_protection_enabled: None,
table_class: None,
on_demand_throughput: None,
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/engine/src/import_export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ fn create_table_input_from_params(tcp: &TableCreationParameters) -> CreateTableI
tags: None,
deletion_protection_enabled: None,
table_class: None,
on_demand_throughput: None,
}
}

Expand Down
2 changes: 2 additions & 0 deletions crates/engine/src/update_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ pub async fn handle_update_table(
&& input.provisioned_throughput.is_none()
&& input.deletion_protection_enabled.is_none()
&& input.stream_specification.is_none()
&& input.table_class.is_none()
&& input.on_demand_throughput.is_none()
&& !has_gsi_updates
{
return Err(DynamoDbError::ValidationException(
Expand Down
3 changes: 3 additions & 0 deletions crates/storage-postgres/migrations/001_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ CREATE TABLE IF NOT EXISTS tables (
status_transition_at TIMESTAMPTZ,
stream_label TEXT,
ttl_index_ready BOOLEAN NOT NULL DEFAULT FALSE,
table_class TEXT,
sse_specification JSONB,
on_demand_throughput JSONB,
PRIMARY KEY (account_id, table_name),
CONSTRAINT tables_table_id_unique UNIQUE (table_id)
);
Expand Down
1 change: 1 addition & 0 deletions crates/storage-postgres/src/backup_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ impl BackupEngine for PostgresEngine {
deletion_protection_enabled: None,
sse_specification: None,
table_class: None,
on_demand_throughput: None,
};

let desc = self.create_table(&account_id, create_input).await?;
Expand Down
42 changes: 37 additions & 5 deletions crates/storage-postgres/src/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use extenddb_core::types::{
BillingMode, BillingModeSummary, CreateTableInput, GsiDescription, LsiDescription,
ProvisionedThroughputDescription, TableDescription, TableStatus,
ProvisionedThroughputDescription, SseDescription, SseType, TableDescription, TableStatus,
};
use extenddb_storage::error::StorageError;
use extenddb_storage::util::{index_arn, stream_arn, table_arn};
Expand Down Expand Up @@ -45,6 +45,13 @@ impl PostgresEngine {
.transpose()
.map_err(|e| StorageError::Internal(e.to_string()))?;
let deletion_protection = input.deletion_protection_enabled.unwrap_or(false);
let sse_spec_json = input.sse_specification.as_ref().cloned();
let on_demand_json = input
.on_demand_throughput
.as_ref()
.map(serde_json::to_value)
.transpose()
.map_err(|e| StorageError::Internal(e.to_string()))?;

let mut tx = self
.pool
Expand All @@ -68,15 +75,16 @@ impl PostgresEngine {
(account_id, table_name, key_schema, attribute_definitions, billing_mode,
provisioned_throughput, stream_specification, table_status,
creation_date_time, table_arn, table_id, deletion_protection_enabled,
status_transition_at)
status_transition_at, table_class, sse_specification, on_demand_throughput)
VALUES ($1, $2, $3, $4, $5, $6, $7,
CASE WHEN (SELECT secs FROM delay) = 0
THEN 'ACTIVE' ELSE 'CREATING' END,
NOW(), $8, $9, $10,
CASE WHEN (SELECT secs FROM delay) = 0
THEN NULL
ELSE NOW() + make_interval(secs => (SELECT secs FROM delay))
END)
END,
$11, $12, $13)
RETURNING EXTRACT(EPOCH FROM creation_date_time)::FLOAT8, table_status",
)
.bind(account_id)
Expand All @@ -89,6 +97,9 @@ impl PostgresEngine {
.bind(&table_arn)
.bind(&table_id)
.bind(deletion_protection)
.bind(&input.table_class)
.bind(&sse_spec_json)
.bind(&on_demand_json)
.fetch_one(&mut *tx)
.await
.map_err(|e| match &e {
Expand Down Expand Up @@ -386,8 +397,29 @@ impl PostgresEngine {
latest_stream_arn,
latest_stream_label: stream_label,
deletion_protection_enabled: input.deletion_protection_enabled.unwrap_or(false),
sse_description: None,
table_class_summary: None,
sse_description: input.sse_specification.as_ref().and_then(|spec| {
let enabled = spec
.get("Enabled")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if enabled {
Some(SseDescription {
status: "ENABLED".to_string(),
sse_type: Some(SseType::KMS),
kms_master_key_arn: Some(format!(
"arn:aws:kms:{}:{}:key/default",
self.region, account_id
)),
})
} else {
None
}
}),
table_class_summary: input
.table_class
.as_ref()
.map(|tc| serde_json::json!({ "TableClass": tc })),
on_demand_throughput: input.on_demand_throughput,
})
}
}
3 changes: 2 additions & 1 deletion crates/storage-postgres/src/delete_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ impl PostgresEngine {
provisioned_throughput, stream_specification, table_status,
EXTRACT(EPOCH FROM creation_date_time)::FLOAT8 as creation_epoch,
table_size_bytes, item_count, table_arn, table_id,
deletion_protection_enabled, stream_label
deletion_protection_enabled, stream_label,
table_class, sse_specification, on_demand_throughput
FROM tables WHERE account_id = $1 AND table_name = $2 AND table_status IN ('ACTIVE', 'CREATING')
FOR UPDATE",
)
Expand Down
36 changes: 32 additions & 4 deletions crates/storage-postgres/src/table_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

use extenddb_core::types::{
AttributeDefinition, BillingMode, BillingModeSummary, GsiDescription, KeySchemaElement,
LsiDescription, Projection, ProvisionedThroughputDescription, TableDescription, TableStatus,
LsiDescription, Projection, ProvisionedThroughputDescription, SseDescription, SseType,
TableDescription, TableStatus,
};
use extenddb_storage::error::StorageError;
use extenddb_storage::util::{index_arn, stream_arn};
Expand All @@ -30,6 +31,9 @@ pub(crate) struct TableRow {
pub table_id: String,
pub deletion_protection_enabled: bool,
pub stream_label: Option<String>,
pub table_class: Option<String>,
pub sse_specification: Option<serde_json::Value>,
pub on_demand_throughput: Option<serde_json::Value>,
}

/// Row type for index metadata queries.
Expand Down Expand Up @@ -149,7 +153,8 @@ impl PostgresEngine {
provisioned_throughput, stream_specification, table_status,
EXTRACT(EPOCH FROM creation_date_time)::FLOAT8 as creation_epoch,
table_size_bytes, item_count, table_arn, table_id,
deletion_protection_enabled, stream_label
deletion_protection_enabled, stream_label,
table_class, sse_specification, on_demand_throughput
FROM tables WHERE account_id = $1 AND table_name = $2",
)
.bind(account_id)
Expand Down Expand Up @@ -322,8 +327,31 @@ impl PostgresEngine {
latest_stream_arn,
latest_stream_label: row.stream_label,
deletion_protection_enabled: row.deletion_protection_enabled,
sse_description: None,
table_class_summary: None,
sse_description: row.sse_specification.as_ref().and_then(|spec| {
let enabled = spec
.get("Enabled")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if enabled {
Some(SseDescription {
status: "ENABLED".to_string(),
sse_type: Some(SseType::KMS),
kms_master_key_arn: Some(format!(
"arn:aws:kms:{}:{}:key/default",
self.region, account_id
)),
})
} else {
None
}
}),
table_class_summary: row
.table_class
.as_ref()
.map(|tc| serde_json::json!({ "TableClass": tc })),
on_demand_throughput: row
.on_demand_throughput
.and_then(|v| serde_json::from_value(v).ok()),
})
}
}
29 changes: 29 additions & 0 deletions crates/storage-postgres/src/update_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,35 @@ impl PostgresEngine {
}
}

// Apply table class change.
if let Some(tc) = &input.table_class {
sqlx::query(
"UPDATE tables SET table_class = $1 WHERE account_id = $2 AND table_name = $3",
)
.bind(tc)
.bind(account_id)
.bind(&input.table_name)
.execute(&mut *tx)
.await
.map_err(|e| StorageError::Internal(e.to_string()))?;
}

// Apply on-demand throughput change.
if let Some(odt) = &input.on_demand_throughput {
let odt_json =
serde_json::to_value(odt).map_err(|e| StorageError::Internal(e.to_string()))?;
sqlx::query(
"UPDATE tables SET on_demand_throughput = $1 \
WHERE account_id = $2 AND table_name = $3",
)
.bind(&odt_json)
.bind(account_id)
.bind(&input.table_name)
.execute(&mut *tx)
.await
.map_err(|e| StorageError::Internal(e.to_string()))?;
}

// Apply GSI updates (create/delete).
let mut created_index_ids: Vec<String> = Vec::new();
let mut deleted_index_ids: Vec<String> = Vec::new();
Expand Down
Loading
Loading