Skip to content

Commit 5b9f869

Browse files
authored
fix(namespace): align error handling with namespace spec (#6575)
## Summary

- Fix RestNamespace to parse the spec-defined flat error response format using the generated `ErrorResponse` model, instead of guessing errors from HTTP status codes
- Fix DirectoryNamespace to return correct `NamespaceError` variants per the spec's per-operation error tables
- Fix REST adapter to produce spec-compliant error responses using `ErrorResponse` model, and correct HTTP status code mappings (406 for Unsupported, 409 for NamespaceNotEmpty/InvalidTableState)
- Rename `ErrorCode::Throttled` to `ErrorCode::Throttling` to match Python/Java SDK naming
- Use `{:?}` (Debug) formatting for all source errors to preserve the full error chain
1 parent 462faf7 commit 5b9f869

5 files changed

Lines changed: 219 additions & 268 deletions

File tree

rust/lance-namespace-impls/src/dir.rs

Lines changed: 99 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -783,7 +783,7 @@ impl DirectoryNamespaceBuilder {
783783
.await
784784
.map_err(|e| {
785785
lance_core::Error::from(NamespaceError::Internal {
786-
message: format!("Failed to create object store: {}", e),
786+
message: format!("Failed to create object store: {:?}", e),
787787
})
788788
})?;
789789

@@ -927,7 +927,7 @@ impl DirectoryNamespace {
927927
.await
928928
.map_err(|e| {
929929
lance_core::Error::from(NamespaceError::Internal {
930-
message: format!("Failed to list directory: {}", e),
930+
message: format!("Failed to list directory: {:?}", e),
931931
})
932932
})?;
933933

@@ -1068,7 +1068,7 @@ impl DirectoryNamespace {
10681068
}
10691069

10701070
if status.is_deregistered {
1071-
return Err(NamespaceError::InvalidTableState {
1071+
return Err(NamespaceError::TableNotFound {
10721072
message: format!("Table is deregistered: {}", table_id),
10731073
}
10741074
.into());
@@ -1118,7 +1118,17 @@ impl DirectoryNamespace {
11181118
Ok(mut dataset) => {
11191119
// If a specific version is requested, checkout that version
11201120
if let Some(requested_version) = request.version {
1121-
dataset = dataset.checkout_version(requested_version as u64).await?;
1121+
dataset = dataset
1122+
.checkout_version(requested_version as u64)
1123+
.await
1124+
.map_err(|e| {
1125+
lance_core::Error::from(NamespaceError::TableVersionNotFound {
1126+
message: format!(
1127+
"Version {} not found for table '{}': {}",
1128+
requested_version, table_name, e
1129+
),
1130+
})
1131+
})?;
11221132
}
11231133

11241134
let version_info = dataset.version();
@@ -1523,7 +1533,7 @@ impl DirectoryNamespace {
15231533
.read_transaction_by_version(version)
15241534
.await
15251535
.map_err(|e| {
1526-
lance_core::Error::from(NamespaceError::Internal {
1536+
lance_core::Error::from(NamespaceError::TransactionNotFound {
15271537
message: format!(
15281538
"Failed to read transaction for version {}: {}",
15291539
version, e
@@ -1657,7 +1667,7 @@ impl DirectoryNamespace {
16571667
| Err(ObjectStoreError::Precondition { .. }) => {
16581668
Err(format!("{} already exists", file_description))
16591669
}
1660-
Err(e) => Err(format!("Failed to create {}: {}", file_description, e)),
1670+
Err(e) => Err(format!("Failed to create {}: {:?}", file_description, e)),
16611671
}
16621672
}
16631673

@@ -2226,7 +2236,7 @@ impl LanceNamespace for DirectoryNamespace {
22262236
}
22272237

22282238
if status.is_deregistered {
2229-
return Err(NamespaceError::InvalidTableState {
2239+
return Err(NamespaceError::TableNotFound {
22302240
message: format!("Table is deregistered: {}", table_id),
22312241
}
22322242
.into());
@@ -2250,7 +2260,7 @@ impl LanceNamespace for DirectoryNamespace {
22502260
.await
22512261
.map_err(|e| {
22522262
lance_core::Error::from(NamespaceError::Internal {
2253-
message: format!("Failed to drop table {}: {}", table_name, e),
2263+
message: format!("Failed to drop table {}: {:?}", table_name, e),
22542264
})
22552265
})?;
22562266

@@ -2284,7 +2294,7 @@ impl LanceNamespace for DirectoryNamespace {
22842294
let cursor = Cursor::new(request_data.to_vec());
22852295
let stream_reader = StreamReader::try_new(cursor, None).map_err(|e| {
22862296
lance_core::Error::from(NamespaceError::InvalidInput {
2287-
message: format!("Invalid Arrow IPC stream: {}", e),
2297+
message: format!("Invalid Arrow IPC stream: {:?}", e),
22882298
})
22892299
})?;
22902300
let arrow_schema = stream_reader.schema();
@@ -2294,7 +2304,7 @@ impl LanceNamespace for DirectoryNamespace {
22942304
for batch_result in stream_reader {
22952305
batches.push(batch_result.map_err(|e| {
22962306
lance_core::Error::from(NamespaceError::InvalidInput {
2297-
message: format!("Failed to read batch from IPC stream: {}", e),
2307+
message: format!("Failed to read batch from IPC stream: {:?}", e),
22982308
})
22992309
})?);
23002310
}
@@ -2326,9 +2336,17 @@ impl LanceNamespace for DirectoryNamespace {
23262336
Dataset::write(reader, &table_uri, Some(write_params))
23272337
.await
23282338
.map_err(|e| {
2329-
lance_core::Error::from(NamespaceError::Internal {
2330-
message: format!("Failed to create Lance dataset: {}", e),
2331-
})
2339+
let err_msg = format!("{}", e);
2340+
let ns_err = if err_msg.contains("already exists") {
2341+
NamespaceError::TableAlreadyExists {
2342+
message: format!("Table already exists at '{}': {:?}", table_uri, e),
2343+
}
2344+
} else {
2345+
NamespaceError::Internal {
2346+
message: format!("Failed to create Lance dataset: {:?}", e),
2347+
}
2348+
};
2349+
lance_core::Error::from(ns_err)
23322350
})?;
23332351

23342352
Ok(CreateTableResponse {
@@ -2466,7 +2484,7 @@ impl LanceNamespace for DirectoryNamespace {
24662484
}
24672485

24682486
if status.is_deregistered {
2469-
return Err(NamespaceError::InvalidTableState {
2487+
return Err(NamespaceError::TableNotFound {
24702488
message: format!("Table is already deregistered: {}", table_name),
24712489
}
24722490
.into());
@@ -2759,8 +2777,8 @@ impl LanceNamespace for DirectoryNamespace {
27592777
builder = builder.with_session(sess.clone());
27602778
}
27612779
let mut dataset = builder.load().await.map_err(|e| {
2762-
lance_core::Error::from(NamespaceError::Internal {
2763-
message: format!("Failed to open table at '{}': {}", table_uri, e),
2780+
lance_core::Error::from(NamespaceError::TableNotFound {
2781+
message: format!("Failed to open table at '{}': {:?}", table_uri, e),
27642782
})
27652783
})?;
27662784

@@ -2905,16 +2923,36 @@ impl LanceNamespace for DirectoryNamespace {
29052923
)
29062924
.await
29072925
.map_err(|e| {
2908-
lance_core::Error::from(NamespaceError::Internal {
2909-
message: format!(
2910-
"Failed to create {} index '{}' on column '{}' for table '{}': {}",
2911-
request.index_type,
2912-
request.name.as_deref().unwrap_or("<auto-generated>"),
2913-
request.column,
2914-
table_uri,
2915-
e
2916-
),
2917-
})
2926+
let err_msg = format!("{}", e);
2927+
let ns_err = if err_msg.contains("already exists") {
2928+
NamespaceError::TableIndexAlreadyExists {
2929+
message: format!(
2930+
"Index '{}' already exists on table '{}': {:?}",
2931+
request.name.as_deref().unwrap_or("<auto-generated>"),
2932+
table_uri,
2933+
e
2934+
),
2935+
}
2936+
} else if err_msg.contains("not found") || err_msg.contains("does not exist") {
2937+
NamespaceError::TableColumnNotFound {
2938+
message: format!(
2939+
"Column '{}' not found for table '{}': {:?}",
2940+
request.column, table_uri, e
2941+
),
2942+
}
2943+
} else {
2944+
NamespaceError::Internal {
2945+
message: format!(
2946+
"Failed to create {} index '{}' on column '{}' for table '{}': {:?}",
2947+
request.index_type,
2948+
request.name.as_deref().unwrap_or("<auto-generated>"),
2949+
request.column,
2950+
table_uri,
2951+
e
2952+
),
2953+
}
2954+
};
2955+
lance_core::Error::from(ns_err)
29182956
})?;
29192957

29202958
let transaction_id = dataset
@@ -2947,7 +2985,7 @@ impl LanceNamespace for DirectoryNamespace {
29472985
.await
29482986
.map_err(|e| {
29492987
lance_core::Error::from(NamespaceError::Internal {
2950-
message: format!("Failed to describe table indices for '{}': {}", table_uri, e),
2988+
message: format!("Failed to describe table indices for '{}': {:?}", table_uri, e),
29512989
})
29522990
})?
29532991
.into_iter()
@@ -3018,7 +3056,7 @@ impl LanceNamespace for DirectoryNamespace {
30183056
.load_indices_by_name(index_name)
30193057
.await
30203058
.map_err(|e| {
3021-
lance_core::Error::from(NamespaceError::Internal {
3059+
lance_core::Error::from(NamespaceError::TableIndexNotFound {
30223060
message: format!(
30233061
"Failed to load index '{}' metadata for table '{}': {}",
30243062
index_name, table_uri, e
@@ -3035,7 +3073,7 @@ impl LanceNamespace for DirectoryNamespace {
30353073
let stats = <Dataset as DatasetIndexExt>::index_statistics(&dataset, index_name)
30363074
.await
30373075
.map_err(|e| {
3038-
lance_core::Error::from(NamespaceError::Internal {
3076+
lance_core::Error::from(NamespaceError::TableIndexNotFound {
30393077
message: format!(
30403078
"Failed to describe index statistics for '{}' on table '{}': {}",
30413079
index_name, table_uri, e
@@ -3126,7 +3164,7 @@ impl LanceNamespace for DirectoryNamespace {
31263164
.load_indices_by_name(index_name)
31273165
.await
31283166
.map_err(|e| {
3129-
lance_core::Error::from(NamespaceError::Internal {
3167+
lance_core::Error::from(NamespaceError::TableIndexNotFound {
31303168
message: format!(
31313169
"Failed to load index '{}' before dropping it from table '{}': {}",
31323170
index_name, table_uri, e
@@ -3144,7 +3182,7 @@ impl LanceNamespace for DirectoryNamespace {
31443182
}
31453183

31463184
dataset.drop_index(index_name).await.map_err(|e| {
3147-
lance_core::Error::from(NamespaceError::Internal {
3185+
lance_core::Error::from(NamespaceError::TableIndexNotFound {
31483186
message: format!(
31493187
"Failed to drop index '{}' from table '{}': {}",
31503188
index_name, table_uri, e
@@ -3455,7 +3493,7 @@ impl LanceNamespace for DirectoryNamespace {
34553493
.count_rows(request.predicate)
34563494
.await
34573495
.map_err(|e| NamespaceError::Internal {
3458-
message: format!("Failed to count rows for table at '{}': {}", table_uri, e),
3496+
message: format!("Failed to count rows for table at '{}': {:?}", table_uri, e),
34593497
})?;
34603498

34613499
Ok(count as i64)
@@ -3480,14 +3518,14 @@ impl LanceNamespace for DirectoryNamespace {
34803518
let cursor = Cursor::new(request_data.as_ref());
34813519
let stream_reader =
34823520
StreamReader::try_new(cursor, None).map_err(|e| NamespaceError::InvalidInput {
3483-
message: format!("Invalid Arrow IPC stream: {}", e),
3521+
message: format!("Invalid Arrow IPC stream: {:?}", e),
34843522
})?;
34853523
let arrow_schema = stream_reader.schema();
34863524

34873525
let mut batches = Vec::new();
34883526
for batch_result in stream_reader {
3489-
batches.push(batch_result.map_err(|e| NamespaceError::Internal {
3490-
message: format!("Failed to read batch from IPC stream: {}", e),
3527+
batches.push(batch_result.map_err(|e| NamespaceError::InvalidInput {
3528+
message: format!("Failed to read batch from IPC stream: {:?}", e),
34913529
})?);
34923530
}
34933531

@@ -3531,8 +3569,20 @@ impl LanceNamespace for DirectoryNamespace {
35313569

35323570
Dataset::write(reader, &table_uri, Some(write_params))
35333571
.await
3534-
.map_err(|e| NamespaceError::Internal {
3535-
message: format!("Failed to insert into table at '{}': {}", table_uri, e),
3572+
.map_err(|e| {
3573+
let err_msg = format!("{}", e);
3574+
if err_msg.contains("conflict") || err_msg.contains("CommitConflict") {
3575+
NamespaceError::ConcurrentModification {
3576+
message: format!(
3577+
"Concurrent modification on table at '{}': {:?}",
3578+
table_uri, e
3579+
),
3580+
}
3581+
} else {
3582+
NamespaceError::Internal {
3583+
message: format!("Failed to insert into table at '{}': {:?}", table_uri, e),
3584+
}
3585+
}
35363586
})?;
35373587

35383588
Ok(InsertIntoTableResponse {
@@ -3600,7 +3650,7 @@ impl LanceNamespace for DirectoryNamespace {
36003650
scanner
36013651
.nearest(vector_column, &query_array, k)
36023652
.map_err(|e| NamespaceError::InvalidInput {
3603-
message: format!("Invalid vector search: {}", e),
3653+
message: format!("Invalid vector search: {:?}", e),
36043654
})?;
36053655

36063656
// Apply distance type if specified
@@ -3665,14 +3715,14 @@ impl LanceNamespace for DirectoryNamespace {
36653715
fts = fts
36663716
.with_columns(columns)
36673717
.map_err(|e| NamespaceError::InvalidInput {
3668-
message: format!("Invalid FTS columns: {}", e),
3718+
message: format!("Invalid FTS columns: {:?}", e),
36693719
})?;
36703720
}
36713721

36723722
scanner
36733723
.full_text_search(fts)
36743724
.map_err(|e| NamespaceError::InvalidInput {
3675-
message: format!("Invalid full text search: {}", e),
3725+
message: format!("Invalid full text search: {:?}", e),
36763726
})?;
36773727
}
36783728
// Note: structured_query would require more complex parsing
@@ -3687,7 +3737,7 @@ impl LanceNamespace for DirectoryNamespace {
36873737
scanner
36883738
.project(column_names)
36893739
.map_err(|e| NamespaceError::InvalidInput {
3690-
message: format!("Invalid column projection: {}", e),
3740+
message: format!("Invalid column projection: {:?}", e),
36913741
})?;
36923742
} else if let Some(ref column_aliases) = columns.column_aliases
36933743
&& !column_aliases.is_empty()
@@ -3705,7 +3755,7 @@ impl LanceNamespace for DirectoryNamespace {
37053755
.collect::<Vec<_>>(),
37063756
)
37073757
.map_err(|e| NamespaceError::InvalidInput {
3708-
message: format!("Invalid column alias expression: {}", e),
3758+
message: format!("Invalid column alias expression: {:?}", e),
37093759
})?;
37103760
}
37113761
}
@@ -3717,7 +3767,7 @@ impl LanceNamespace for DirectoryNamespace {
37173767
scanner
37183768
.filter(filter)
37193769
.map_err(|e| NamespaceError::InvalidInput {
3720-
message: format!("Invalid filter expression: {}", e),
3770+
message: format!("Invalid filter expression: {:?}", e),
37213771
})?;
37223772
}
37233773

@@ -3733,7 +3783,7 @@ impl LanceNamespace for DirectoryNamespace {
37333783
let offset = request.offset.map(|o| o as i64);
37343784
scanner.limit(Some(request.k as i64), offset).map_err(|e| {
37353785
NamespaceError::InvalidInput {
3736-
message: format!("Invalid limit/offset: {}", e),
3786+
message: format!("Invalid limit/offset: {:?}", e),
37373787
}
37383788
})?;
37393789
} else if has_vector_query && request.offset.is_some() {
@@ -3742,7 +3792,7 @@ impl LanceNamespace for DirectoryNamespace {
37423792
scanner
37433793
.limit(None, offset)
37443794
.map_err(|e| NamespaceError::InvalidInput {
3745-
message: format!("Invalid offset: {}", e),
3795+
message: format!("Invalid offset: {:?}", e),
37463796
})?;
37473797
}
37483798

@@ -3751,7 +3801,7 @@ impl LanceNamespace for DirectoryNamespace {
37513801
.try_into_batch()
37523802
.await
37533803
.map_err(|e| NamespaceError::Internal {
3754-
message: format!("Failed to execute query: {}", e),
3804+
message: format!("Failed to execute query: {:?}", e),
37553805
})?;
37563806

37573807
// Serialize to Arrow IPC file format
@@ -3760,14 +3810,14 @@ impl LanceNamespace for DirectoryNamespace {
37603810
{
37613811
let mut writer = FileWriter::try_new(&mut buffer, &schema).map_err(|e| {
37623812
NamespaceError::Internal {
3763-
message: format!("Failed to create IPC writer: {}", e),
3813+
message: format!("Failed to create IPC writer: {:?}", e),
37643814
}
37653815
})?;
37663816
writer.write(&batch).map_err(|e| NamespaceError::Internal {
3767-
message: format!("Failed to write batch to IPC: {}", e),
3817+
message: format!("Failed to write batch to IPC: {:?}", e),
37683818
})?;
37693819
writer.finish().map_err(|e| NamespaceError::Internal {
3770-
message: format!("Failed to finish IPC writer: {}", e),
3820+
message: format!("Failed to finish IPC writer: {:?}", e),
37713821
})?;
37723822
}
37733823

0 commit comments

Comments
 (0)