Skip to content

Commit 20215a6

Browse files
committed
better
1 parent f6c0a41 commit 20215a6

1 file changed

Lines changed: 107 additions & 14 deletions

File tree

uc-catalog/src/committer.rs

Lines changed: 107 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -81,20 +81,24 @@ impl<C: UCCommitClient + 'static> Committer for UCCommitter<C> {
8181
})
8282
.transpose()?,
8383
);
84+
let commit_version = commit_metadata.version();
8485
let handle = tokio::runtime::Handle::try_current().map_err(|_| {
8586
DeltaError::generic("UCCommitter may only be used within a tokio runtime")
8687
})?;
87-
tokio::task::block_in_place(|| {
88-
handle.block_on(async move {
89-
self.commits_client
90-
.commit(commit_req)
91-
.await
92-
.map_err(|e| DeltaError::Generic(format!("UC commit error: {e}")))
93-
})
94-
})?;
95-
Ok(CommitResponse::Committed {
96-
file_meta: committed,
97-
})
88+
let result = tokio::task::block_in_place(|| {
89+
handle.block_on(async move { self.commits_client.commit(commit_req).await })
90+
});
91+
match result {
92+
Ok(()) => Ok(CommitResponse::Committed {
93+
file_meta: committed,
94+
}),
95+
Err(uc_client::error::Error::ApiError { status: 409, .. }) => {
96+
Ok(CommitResponse::Conflict {
97+
version: commit_version,
98+
})
99+
}
100+
Err(e) => Err(DeltaError::Generic(format!("UC commit error: {e}"))),
101+
}
98102
}
99103

100104
fn is_catalog_committer(&self) -> bool {
@@ -130,11 +134,31 @@ mod tests {
130134
use std::fs;
131135
use uc_client::error::Result;
132136

133-
struct MockCommitsClient;
137+
struct MockCommitsClient {
138+
commit_result: std::sync::Mutex<Option<Result<()>>>,
139+
}
140+
141+
impl MockCommitsClient {
142+
fn always_unimplemented() -> Self {
143+
Self {
144+
commit_result: std::sync::Mutex::new(None),
145+
}
146+
}
147+
148+
fn with_commit_result(result: Result<()>) -> Self {
149+
Self {
150+
commit_result: std::sync::Mutex::new(Some(result)),
151+
}
152+
}
153+
}
134154

135155
impl UCCommitClient for MockCommitsClient {
136156
async fn commit(&self, _: CommitRequest) -> Result<()> {
137-
unimplemented!()
157+
self.commit_result
158+
.lock()
159+
.unwrap()
160+
.take()
161+
.expect("MockCommitsClient::commit called but no result was configured")
138162
}
139163
}
140164

@@ -153,6 +177,74 @@ mod tests {
153177
.unwrap()
154178
}
155179

180+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
181+
async fn test_commit_conflict_on_409() {
182+
use delta_kernel::schema::{DataType, StructField, StructType};
183+
use delta_kernel::transaction::create_table::create_table;
184+
use delta_kernel::transaction::CommitResult;
185+
186+
let tmp_dir = tempfile::tempdir().unwrap();
187+
let table_path = tmp_dir.path().to_str().unwrap();
188+
189+
let mock_client = MockCommitsClient::with_commit_result(Err(
190+
uc_client::error::Error::ApiError {
191+
status: 409,
192+
message: "Commit version already accepted. Current table version is 0".into(),
193+
},
194+
));
195+
let committer = UCCommitter::new(Arc::new(mock_client), "test_table_id");
196+
let engine = DefaultEngine::builder(Arc::new(LocalFileSystem::new())).build();
197+
198+
let schema = Arc::new(
199+
StructType::try_new(vec![StructField::new("id", DataType::INTEGER, false)]).unwrap(),
200+
);
201+
let result = create_table(table_path, schema, "test_engine")
202+
.build(&engine, Box::new(committer))
203+
.unwrap()
204+
.commit(&engine)
205+
.unwrap();
206+
207+
assert!(
208+
matches!(result, CommitResult::ConflictedTransaction(_)),
209+
"Expected ConflictedTransaction, got: {:?}",
210+
result
211+
);
212+
}
213+
214+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
215+
async fn test_commit_non_conflict_error_propagates() {
216+
use delta_kernel::schema::{DataType, StructField, StructType};
217+
use delta_kernel::transaction::create_table::create_table;
218+
219+
let tmp_dir = tempfile::tempdir().unwrap();
220+
let table_path = tmp_dir.path().to_str().unwrap();
221+
222+
let mock_client = MockCommitsClient::with_commit_result(Err(
223+
uc_client::error::Error::ApiError {
224+
status: 500,
225+
message: "Internal server error".into(),
226+
},
227+
));
228+
let committer = UCCommitter::new(Arc::new(mock_client), "test_table_id");
229+
let engine = DefaultEngine::builder(Arc::new(LocalFileSystem::new())).build();
230+
231+
let schema = Arc::new(
232+
StructType::try_new(vec![StructField::new("id", DataType::INTEGER, false)]).unwrap(),
233+
);
234+
let result = create_table(table_path, schema, "test_engine")
235+
.build(&engine, Box::new(committer))
236+
.unwrap()
237+
.commit(&engine);
238+
239+
assert!(result.is_err(), "Expected error for non-409 API error");
240+
let err_msg = result.unwrap_err().to_string();
241+
assert!(
242+
err_msg.contains("UC commit error"),
243+
"Error should contain 'UC commit error', got: {}",
244+
err_msg
245+
);
246+
}
247+
156248
#[tokio::test]
157249
async fn test_publish() {
158250
let tmp_dir = tempfile::tempdir().unwrap();
@@ -189,7 +281,8 @@ mod tests {
189281

190282
// ===== WHEN =====
191283
let publish_metadata = PublishMetadata::try_new(12, catalog_commits).unwrap();
192-
let committer = UCCommitter::new(Arc::new(MockCommitsClient), "testUcTableId");
284+
let committer =
285+
UCCommitter::new(Arc::new(MockCommitsClient::always_unimplemented()), "test_table_id");
193286
let engine = DefaultEngine::builder(Arc::new(LocalFileSystem::new())).build();
194287
committer.publish(&engine, publish_metadata).unwrap();
195288

0 commit comments

Comments
 (0)