Skip to content

Commit 5cd5a17

Browse files
committed
chore(hf): migrate to new XetSession API from xet-core PR #747
Replace low-level xet-data/xet-client usage with the new high-level session API from xet_pkg (hf-xet crate, lib name `xet`): - Single hf-xet dependency replaces xet-client + xet-data - ONE cached XetSession per HfCore (write token, serves both reads and writes); removes XetTokenRefresher and TranslatorConfig handling - Writer: XetUploadCommit + XetStreamUpload replace FileUploadSession + SingleFileCleaner; no Mutex needed since oio::Write is &mut self - Reader: XetDownloadStream replaces DownloadStream; range converted from BytesRange to Option<Range<u64>> for new download_stream() API
1 parent 71e0532 commit 5cd5a17

5 files changed

Lines changed: 100 additions & 167 deletions

File tree

core/Cargo.lock

Lines changed: 25 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/services/hf/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,7 @@ reqwest = { version = "0.12", default-features = false, features = [
4545
serde = { workspace = true, features = ["derive"] }
4646
serde_json = { workspace = true }
4747
tokio = { workspace = true, features = ["sync"] }
48-
xet-client = { git = "https://github.com/huggingface/xet-core.git", rev = "101837f6" }
49-
xet-data = { git = "https://github.com/huggingface/xet-core.git", rev = "101837f6" }
48+
hf-xet = { git = "https://github.com/huggingface/xet-core.git", rev = "60a916f74df506a5d10c0656ee3e86a8411ab4f6" }
5049

5150
[dev-dependencies]
5251
opendal-core = { path = "../../core", version = "0.55.0", features = [

core/services/hf/src/core.rs

Lines changed: 16 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,7 @@ use http::Response;
2727
use http::header;
2828
use serde::Deserialize;
2929

30-
use xet_client::cas_client::auth::TokenRefresher;
31-
use xet_data::processing::FileDownloadSession;
32-
use xet_data::processing::FileUploadSession;
33-
use xet_data::processing::XetFileInfo;
34-
use xet_data::processing::configurations::TranslatorConfig;
30+
use xet::xet_session::{XetFileInfo, XetSession, XetSessionBuilder};
3531

3632
use super::error::parse_error;
3733
use super::uri::HfRepo;
@@ -182,34 +178,6 @@ pub(super) struct XetToken {
182178
pub exp: u64,
183179
}
184180

185-
pub(super) struct XetTokenRefresher {
186-
core: HfCore,
187-
token_type: &'static str,
188-
}
189-
190-
impl XetTokenRefresher {
191-
pub(super) fn new(core: &HfCore, token_type: &'static str) -> Self {
192-
Self {
193-
core: core.clone(),
194-
token_type,
195-
}
196-
}
197-
}
198-
199-
#[async_trait::async_trait]
200-
impl TokenRefresher for XetTokenRefresher {
201-
async fn refresh(
202-
&self,
203-
) -> std::result::Result<(String, u64), xet_client::cas_client::auth::AuthError> {
204-
let token = self
205-
.core
206-
.xet_token(self.token_type)
207-
.await
208-
.map_err(xet_client::cas_client::auth::AuthError::token_refresh_failure)?;
209-
Ok((token.access_token, token.exp))
210-
}
211-
}
212-
213181
// Core HuggingFace client that manages API interactions, authentication
214182
// and shared logic for reader/writer/lister.
215183

@@ -226,8 +194,7 @@ pub struct HfCore {
226194
/// inspect headers on 302 responses.
227195
pub no_redirect_client: HttpClient,
228196

229-
upload_config: Arc<OnceCell<Arc<TranslatorConfig>>>,
230-
download_config: Arc<OnceCell<Arc<TranslatorConfig>>>,
197+
xet_session: Arc<OnceCell<XetSession>>,
231198
}
232199

233200
impl Debug for HfCore {
@@ -256,8 +223,7 @@ impl HfCore {
256223
token,
257224
endpoint,
258225
no_redirect_client,
259-
upload_config: Arc::new(OnceCell::new()),
260-
download_config: Arc::new(OnceCell::new()),
226+
xet_session: Arc::new(OnceCell::new()),
261227
}
262228
}
263229

@@ -352,52 +318,22 @@ impl HfCore {
352318
Ok(token)
353319
}
354320

355-
async fn xet_config(&self, token_type: &'static str) -> Result<Arc<TranslatorConfig>> {
356-
let cas_url = self.xet_token(token_type).await?.cas_url;
357-
let refresher = Arc::new(XetTokenRefresher::new(self, token_type));
358-
xet_data::processing::data_client::default_config(
359-
cas_url,
360-
None,
361-
Some(refresher as Arc<dyn TokenRefresher>),
362-
None,
363-
)
364-
.map(Arc::new)
365-
.map_err(|err| {
366-
Error::new(ErrorKind::Unexpected, "failed to create xet config").set_source(err)
367-
})
368-
}
369-
370-
async fn upload_config(&self) -> Result<Arc<TranslatorConfig>> {
371-
let this = self.clone();
372-
self.upload_config
373-
.get_or_try_init(|| async move { this.xet_config("write").await })
374-
.await
375-
.map(Arc::clone)
376-
}
377-
378-
async fn download_config(&self) -> Result<Arc<TranslatorConfig>> {
321+
pub(super) async fn xet_session(&self) -> Result<XetSession> {
379322
let this = self.clone();
380-
self.download_config
381-
.get_or_try_init(|| async move { this.xet_config("read").await })
382-
.await
383-
.map(Arc::clone)
384-
}
385-
386-
pub(super) async fn xet_upload_session(&self) -> Result<Arc<FileUploadSession>> {
387-
FileUploadSession::new(self.upload_config().await?)
388-
.await
389-
.map_err(|err| {
390-
Error::new(ErrorKind::Unexpected, "failed to create upload session").set_source(err)
323+
self.xet_session
324+
.get_or_try_init(|| async move {
325+
let token = this.xet_token("write").await?;
326+
XetSessionBuilder::new()
327+
.with_endpoint(token.cas_url)
328+
.with_token_info(token.access_token, token.exp)
329+
.build()
330+
.map_err(|err| {
331+
Error::new(ErrorKind::Unexpected, "failed to create xet session")
332+
.set_source(err)
333+
})
391334
})
392-
}
393-
394-
pub(super) async fn xet_download_session(&self) -> Result<Arc<FileDownloadSession>> {
395-
FileDownloadSession::new(self.download_config().await?)
396335
.await
397-
.map_err(|err| {
398-
Error::new(ErrorKind::Unexpected, "failed to create download session")
399-
.set_source(err)
400-
})
336+
.cloned()
401337
}
402338

403339
/// Issue a HEAD request and extract XET file info (hash and size).

core/services/hf/src/reader.rs

Lines changed: 18 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,7 @@ use http::Response;
1919
use http::StatusCode;
2020
use http::header;
2121

22-
use xet_client::ClientError;
23-
use xet_data::file_reconstruction::FileReconstructionError;
24-
use xet_data::processing::DownloadStream;
25-
use xet_data::processing::XetFileInfo;
22+
use xet::xet_session::{SessionError, XetDownloadStream, XetFileInfo};
2623

2724
use super::core::HfCore;
2825
use super::uri::RepoType;
@@ -31,7 +28,7 @@ use opendal_core::*;
3128

3229
pub enum HfReader {
3330
Http(HttpBody),
34-
Xet(DownloadStream),
31+
Xet(XetDownloadStream),
3532
}
3633

3734
impl HfReader {
@@ -86,32 +83,28 @@ impl HfReader {
8683
file_info: &XetFileInfo,
8784
range: BytesRange,
8885
) -> Result<Self> {
89-
let session = core.xet_download_session().await?;
90-
91-
let (_id, stream) = session
92-
.download_stream_range(file_info, range.to_range())
86+
let session = core.xet_session().await?;
87+
let xet_range = if range.is_full() {
88+
None
89+
} else {
90+
let start = range.offset();
91+
let end = range.size().map(|s| start + s).unwrap_or(u64::MAX);
92+
Some(start..end)
93+
};
94+
let mut stream = session
95+
.download_stream(file_info.clone(), xet_range)
9396
.await
9497
.map_err(|err| {
95-
Error::new(
96-
ErrorKind::Unexpected,
97-
"failed to create xet download stream",
98-
)
99-
.set_source(err)
98+
Error::new(ErrorKind::Unexpected, "failed to create xet download stream")
99+
.set_source(err)
100100
})?;
101+
stream.start();
101102
Ok(Self::Xet(stream))
102103
}
103104
}
104105

105-
fn map_reconstruction_error(e: FileReconstructionError) -> Error {
106-
let kind = match &e {
107-
FileReconstructionError::ClientError(arc) => match arc.as_ref() {
108-
ClientError::FileNotFound(_) | ClientError::XORBNotFound(_) => ErrorKind::NotFound,
109-
ClientError::InvalidRange => ErrorKind::RangeNotSatisfied,
110-
_ => ErrorKind::Unexpected,
111-
},
112-
_ => ErrorKind::Unexpected,
113-
};
114-
Error::new(kind, "xet read error").set_source(e)
106+
fn map_session_error(e: SessionError) -> Error {
107+
Error::new(ErrorKind::Unexpected, "xet read error").set_source(e)
115108
}
116109

117110
impl oio::Read for HfReader {
@@ -121,7 +114,7 @@ impl oio::Read for HfReader {
121114
Self::Xet(stream) => match stream.next().await {
122115
Ok(Some(bytes)) => Ok(Buffer::from(bytes)),
123116
Ok(None) => Ok(Buffer::new()),
124-
Err(e) => Err(map_reconstruction_error(e)),
117+
Err(e) => Err(map_session_error(e)),
125118
},
126119
}
127120
}

0 commit comments

Comments
 (0)