Skip to content

Commit 71e0532

Browse files
committed
refactor(hf): cache TranslatorConfig instead of sessions, serialize integration tests
- Cache Arc<TranslatorConfig> per read/write direction; create a fresh FileUploadSession/FileDownloadSession per operation so that finalize() on one write doesn't poison the next - Factor repeated config construction into xet_config(token_type) - Mark all write/delete integration tests with #[serial] to prevent 412 "commit has happened since" races on the shared test repo - Add serial_test dev-dependency
1 parent c8e8c1a commit 71e0532

5 files changed

Lines changed: 114 additions & 54 deletions

File tree

core/Cargo.lock

Lines changed: 42 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/services/hf/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,5 @@ opendal-core = { path = "../../core", version = "0.55.0", features = [
5353
"reqwest-rustls-tls",
5454
] }
5555
serde_json = { workspace = true }
56+
serial_test = "3"
5657
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }

core/services/hf/src/core.rs

Lines changed: 45 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use xet_client::cas_client::auth::TokenRefresher;
3131
use xet_data::processing::FileDownloadSession;
3232
use xet_data::processing::FileUploadSession;
3333
use xet_data::processing::XetFileInfo;
34+
use xet_data::processing::configurations::TranslatorConfig;
3435

3536
use super::error::parse_error;
3637
use super::uri::HfRepo;
@@ -225,8 +226,8 @@ pub struct HfCore {
225226
/// inspect headers on 302 responses.
226227
pub no_redirect_client: HttpClient,
227228

228-
download_session: Arc<OnceCell<Arc<FileDownloadSession>>>,
229-
upload_session: Arc<OnceCell<Arc<FileUploadSession>>>,
229+
upload_config: Arc<OnceCell<Arc<TranslatorConfig>>>,
230+
download_config: Arc<OnceCell<Arc<TranslatorConfig>>>,
230231
}
231232

232233
impl Debug for HfCore {
@@ -255,8 +256,8 @@ impl HfCore {
255256
token,
256257
endpoint,
257258
no_redirect_client,
258-
download_session: Arc::new(OnceCell::new()),
259-
upload_session: Arc::new(OnceCell::new()),
259+
upload_config: Arc::new(OnceCell::new()),
260+
download_config: Arc::new(OnceCell::new()),
260261
}
261262
}
262263

@@ -351,58 +352,54 @@ impl HfCore {
351352
Ok(token)
352353
}
353354

354-
pub(super) async fn xet_upload_session(&self) -> Result<Arc<FileUploadSession>> {
355-
self.upload_session
356-
.get_or_try_init(|| async {
357-
let cas_url = self.xet_token("write").await?.cas_url;
358-
let refresher = Arc::new(XetTokenRefresher::new(self, "write"));
359-
let config = xet_data::processing::data_client::default_config(
360-
cas_url,
361-
None,
362-
Some(refresher as Arc<dyn TokenRefresher>),
363-
None,
364-
)
365-
.map_err(|err| {
366-
Error::new(ErrorKind::Unexpected, "failed to create upload config")
367-
.set_source(err)
368-
})?;
369-
FileUploadSession::new(Arc::new(config))
370-
.await
371-
.map_err(|err| {
372-
Error::new(ErrorKind::Unexpected, "failed to create upload session")
373-
.set_source(err)
374-
})
375-
})
355+
async fn xet_config(&self, token_type: &'static str) -> Result<Arc<TranslatorConfig>> {
356+
let cas_url = self.xet_token(token_type).await?.cas_url;
357+
let refresher = Arc::new(XetTokenRefresher::new(self, token_type));
358+
xet_data::processing::data_client::default_config(
359+
cas_url,
360+
None,
361+
Some(refresher as Arc<dyn TokenRefresher>),
362+
None,
363+
)
364+
.map(Arc::new)
365+
.map_err(|err| {
366+
Error::new(ErrorKind::Unexpected, "failed to create xet config").set_source(err)
367+
})
368+
}
369+
370+
async fn upload_config(&self) -> Result<Arc<TranslatorConfig>> {
371+
let this = self.clone();
372+
self.upload_config
373+
.get_or_try_init(|| async move { this.xet_config("write").await })
376374
.await
377375
.map(Arc::clone)
378376
}
379377

380-
pub(super) async fn xet_download_session(&self) -> Result<Arc<FileDownloadSession>> {
381-
self.download_session
382-
.get_or_try_init(|| async {
383-
let cas_url = self.xet_token("read").await?.cas_url;
384-
let refresher = Arc::new(XetTokenRefresher::new(self, "read"));
385-
let config = xet_data::processing::data_client::default_config(
386-
cas_url,
387-
None,
388-
Some(refresher as Arc<dyn TokenRefresher>),
389-
None,
390-
)
391-
.map_err(|err| {
392-
Error::new(ErrorKind::Unexpected, "failed to create download config")
393-
.set_source(err)
394-
})?;
395-
FileDownloadSession::new(Arc::new(config))
396-
.await
397-
.map_err(|err| {
398-
Error::new(ErrorKind::Unexpected, "failed to create download session")
399-
.set_source(err)
400-
})
401-
})
378+
async fn download_config(&self) -> Result<Arc<TranslatorConfig>> {
379+
let this = self.clone();
380+
self.download_config
381+
.get_or_try_init(|| async move { this.xet_config("read").await })
402382
.await
403383
.map(Arc::clone)
404384
}
405385

386+
pub(super) async fn xet_upload_session(&self) -> Result<Arc<FileUploadSession>> {
387+
FileUploadSession::new(self.upload_config().await?)
388+
.await
389+
.map_err(|err| {
390+
Error::new(ErrorKind::Unexpected, "failed to create upload session").set_source(err)
391+
})
392+
}
393+
394+
pub(super) async fn xet_download_session(&self) -> Result<Arc<FileDownloadSession>> {
395+
FileDownloadSession::new(self.download_config().await?)
396+
.await
397+
.map_err(|err| {
398+
Error::new(ErrorKind::Unexpected, "failed to create download session")
399+
.set_source(err)
400+
})
401+
}
402+
406403
/// Issue a HEAD request and extract XET file info (hash and size).
407404
///
408405
/// Returns `None` if the `X-Xet-Hash` header is absent or empty.

core/services/hf/src/deleter.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,11 @@ impl oio::BatchDelete for HfDeleter {
6363
mod tests {
6464
use super::super::backend::test_utils::testing_operator;
6565
use opendal_core::*;
66+
use serial_test::serial;
6667

6768
#[tokio::test]
6869
#[ignore]
70+
#[serial]
6971
async fn test_delete_nonexistent() {
7072
let op = testing_operator();
7173

@@ -76,6 +78,7 @@ mod tests {
7678

7779
#[tokio::test]
7880
#[ignore]
81+
#[serial]
7982
async fn test_delete_then_read() {
8083
let op = testing_operator();
8184
let path = "tests/delete-then-read.txt";
@@ -95,6 +98,7 @@ mod tests {
9598

9699
#[tokio::test]
97100
#[ignore]
101+
#[serial]
98102
async fn test_batch_delete() {
99103
let op = testing_operator();
100104
let paths = [

core/services/hf/src/writer.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,13 @@ impl oio::Write for HfWriter {
9595
}
9696
HfWriter::Xet { cleaner, .. } => {
9797
// get_mut() required here: lock() would hold a MutexGuard across .await
98-
let cleaner = cleaner.get_mut().expect("mutex poisoned")
99-
.as_mut().expect("cleaner missing");
98+
let cleaner = cleaner
99+
.get_mut()
100+
.expect("mutex poisoned")
101+
.as_mut()
102+
.expect("cleaner missing");
100103
cleaner.add_data(&bs.to_bytes()).await.map_err(|err| {
101-
Error::new(ErrorKind::Unexpected, "failed to write xet chunk")
102-
.set_source(err)
104+
Error::new(ErrorKind::Unexpected, "failed to write xet chunk").set_source(err)
103105
})
104106
}
105107
}
@@ -136,8 +138,8 @@ impl oio::Write for HfWriter {
136138
.expect("mutex poisoned")
137139
.take()
138140
.ok_or_else(|| {
139-
Error::new(ErrorKind::Unexpected, "xet writer already closed")
140-
})?;
141+
Error::new(ErrorKind::Unexpected, "xet writer already closed")
142+
})?;
141143

142144
let (file_info, _metrics) = cleaner.finish().await.map_err(|err| {
143145
Error::new(ErrorKind::Unexpected, "failed to finish xet upload").set_source(err)
@@ -211,6 +213,7 @@ mod tests {
211213
use super::super::backend::test_utils::testing_operator;
212214
use super::*;
213215
use base64::Engine;
216+
use serial_test::serial;
214217

215218
// --- Unit tests (no network required) ---
216219

@@ -243,6 +246,7 @@ mod tests {
243246

244247
#[tokio::test]
245248
#[ignore]
249+
#[serial]
246250
async fn test_write_http() {
247251
let op = testing_operator();
248252
op.write("test-file.txt", b"Hello, HuggingFace!".as_slice())
@@ -252,6 +256,7 @@ mod tests {
252256

253257
#[tokio::test]
254258
#[ignore]
259+
#[serial]
255260
async fn test_write_http_with_content_type() {
256261
let op = testing_operator();
257262
op.write_with("test.json", br#"{"test": "data"}"#.as_slice())
@@ -262,6 +267,7 @@ mod tests {
262267

263268
#[tokio::test]
264269
#[ignore]
270+
#[serial]
265271
async fn test_write_xet() {
266272
let op = testing_operator();
267273
op.write("test-xet.bin", b"Binary data for XET test".as_slice())
@@ -273,6 +279,7 @@ mod tests {
273279
/// in commit) and verify the content roundtrips correctly.
274280
#[tokio::test]
275281
#[ignore]
282+
#[serial]
276283
async fn test_write_regular_roundtrip() {
277284
let op = testing_operator();
278285
let path = "tests/regular-roundtrip.txt";
@@ -292,6 +299,7 @@ mod tests {
292299
/// Verify stat returns correct metadata after writing.
293300
#[tokio::test]
294301
#[ignore]
302+
#[serial]
295303
async fn test_write_and_stat() {
296304
let op = testing_operator();
297305
let path = "tests/stat-after-write.txt";
@@ -310,6 +318,7 @@ mod tests {
310318
/// Overwriting an existing file should replace its content.
311319
#[tokio::test]
312320
#[ignore]
321+
#[serial]
313322
async fn test_write_overwrite() {
314323
let op = testing_operator();
315324
let path = "tests/overwrite-test.txt";
@@ -331,6 +340,7 @@ mod tests {
331340
/// Full lifecycle: write → stat → read → delete → confirm gone.
332341
#[tokio::test]
333342
#[ignore]
343+
#[serial]
334344
async fn test_write_delete_lifecycle() {
335345
let op = testing_operator();
336346
let path = "tests/lifecycle-test.txt";
@@ -351,6 +361,7 @@ mod tests {
351361
/// Write an empty (0-byte) file and verify roundtrip.
352362
#[tokio::test]
353363
#[ignore]
364+
#[serial]
354365
async fn test_write_empty_file_roundtrip() {
355366
let op = testing_operator();
356367
let path = "tests/empty-file.txt";
@@ -372,6 +383,7 @@ mod tests {
372383
/// HuggingFace creates intermediate directories implicitly.
373384
#[tokio::test]
374385
#[ignore]
386+
#[serial]
375387
async fn test_write_nested_directory() {
376388
let op = testing_operator();
377389
let path = "tests/deep/nested/dir/file.txt";
@@ -390,6 +402,7 @@ mod tests {
390402
/// Write a file with special characters in the path.
391403
#[tokio::test]
392404
#[ignore]
405+
#[serial]
393406
async fn test_write_special_characters_in_path() {
394407
let op = testing_operator();
395408
let path = "tests/special chars (1).txt";
@@ -409,6 +422,7 @@ mod tests {
409422

410423
#[tokio::test]
411424
#[ignore]
425+
#[serial]
412426
async fn test_bucket_write() {
413427
let op = testing_bucket_operator();
414428
let path = "test-bucket-file.txt";
@@ -426,6 +440,7 @@ mod tests {
426440

427441
#[tokio::test]
428442
#[ignore]
443+
#[serial]
429444
async fn test_bucket_write_roundtrip() {
430445
let op = testing_bucket_operator();
431446
let path = "tests/bucket-roundtrip.bin";
@@ -446,6 +461,7 @@ mod tests {
446461

447462
#[tokio::test]
448463
#[ignore]
464+
#[serial]
449465
async fn test_bucket_overwrite() {
450466
let op = testing_bucket_operator();
451467
let path = "tests/bucket-overwrite.txt";

0 commit comments

Comments
 (0)