Skip to content

Commit 4efc137

Browse files
committed
chore(hf): migrate to new XetSession API from xet-core PR #747
Replace low-level xet-data/xet-client usage with the new high-level session API from xet_pkg (hf-xet crate, lib name `xet`): - Single hf-xet dependency replaces xet-client + xet-data - ONE cached XetSession per HfCore (write token, serves both reads and writes); removes XetTokenRefresher and TranslatorConfig handling - Writer: XetUploadCommit + XetStreamUpload replace FileUploadSession + SingleFileCleaner; no Mutex needed since oio::Write is &mut self - Reader: XetDownloadStream replaces DownloadStream; range converted from BytesRange to Option<Range<u64>> for new download_stream() API
1 parent 9d27503 commit 4efc137

4 files changed

Lines changed: 75 additions & 161 deletions

File tree

core/services/hf/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,7 @@ reqwest = { version = "0.12", default-features = false, features = [
4545
serde = { workspace = true, features = ["derive"] }
4646
serde_json = { workspace = true }
4747
tokio = { workspace = true, features = ["sync"] }
48-
xet-client = { git = "https://github.com/huggingface/xet-core.git", rev = "101837f6" }
49-
xet-data = { git = "https://github.com/huggingface/xet-core.git", rev = "101837f6" }
48+
hf-xet = { git = "https://github.com/huggingface/xet-core.git", rev = "60a916f74df506a5d10c0656ee3e86a8411ab4f6" }
5049

5150
[dev-dependencies]
5251
opendal-core = { path = "../../core", version = "0.55.0", features = [

core/services/hf/src/core.rs

Lines changed: 16 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,7 @@ use http::Response;
2727
use http::header;
2828
use serde::Deserialize;
2929

30-
use xet_client::cas_client::auth::TokenRefresher;
31-
use xet_data::processing::FileDownloadSession;
32-
use xet_data::processing::FileUploadSession;
33-
use xet_data::processing::XetFileInfo;
34-
use xet_data::processing::configurations::TranslatorConfig;
30+
use xet::xet_session::{XetFileInfo, XetSession, XetSessionBuilder};
3531

3632
use super::error::parse_error;
3733
use super::uri::HfRepo;
@@ -182,34 +178,6 @@ pub(super) struct XetToken {
182178
pub exp: u64,
183179
}
184180

185-
pub(super) struct XetTokenRefresher {
186-
core: HfCore,
187-
token_type: &'static str,
188-
}
189-
190-
impl XetTokenRefresher {
191-
pub(super) fn new(core: &HfCore, token_type: &'static str) -> Self {
192-
Self {
193-
core: core.clone(),
194-
token_type,
195-
}
196-
}
197-
}
198-
199-
#[async_trait::async_trait]
200-
impl TokenRefresher for XetTokenRefresher {
201-
async fn refresh(
202-
&self,
203-
) -> std::result::Result<(String, u64), xet_client::cas_client::auth::AuthError> {
204-
let token = self
205-
.core
206-
.xet_token(self.token_type)
207-
.await
208-
.map_err(xet_client::cas_client::auth::AuthError::token_refresh_failure)?;
209-
Ok((token.access_token, token.exp))
210-
}
211-
}
212-
213181
// Core HuggingFace client that manages API interactions, authentication
214182
// and shared logic for reader/writer/lister.
215183

@@ -226,8 +194,7 @@ pub struct HfCore {
226194
/// inspect headers on 302 responses.
227195
pub no_redirect_client: HttpClient,
228196

229-
upload_config: Arc<OnceCell<Arc<TranslatorConfig>>>,
230-
download_config: Arc<OnceCell<Arc<TranslatorConfig>>>,
197+
xet_session: Arc<OnceCell<XetSession>>,
231198
}
232199

233200
impl Debug for HfCore {
@@ -256,8 +223,7 @@ impl HfCore {
256223
token,
257224
endpoint,
258225
no_redirect_client,
259-
upload_config: Arc::new(OnceCell::new()),
260-
download_config: Arc::new(OnceCell::new()),
226+
xet_session: Arc::new(OnceCell::new()),
261227
}
262228
}
263229

@@ -352,52 +318,22 @@ impl HfCore {
352318
Ok(token)
353319
}
354320

355-
async fn xet_config(&self, token_type: &'static str) -> Result<Arc<TranslatorConfig>> {
356-
let cas_url = self.xet_token(token_type).await?.cas_url;
357-
let refresher = Arc::new(XetTokenRefresher::new(self, token_type));
358-
xet_data::processing::data_client::default_config(
359-
cas_url,
360-
None,
361-
Some(refresher as Arc<dyn TokenRefresher>),
362-
None,
363-
)
364-
.map(Arc::new)
365-
.map_err(|err| {
366-
Error::new(ErrorKind::Unexpected, "failed to create xet config").set_source(err)
367-
})
368-
}
369-
370-
async fn upload_config(&self) -> Result<Arc<TranslatorConfig>> {
371-
let this = self.clone();
372-
self.upload_config
373-
.get_or_try_init(|| async move { this.xet_config("write").await })
374-
.await
375-
.map(Arc::clone)
376-
}
377-
378-
async fn download_config(&self) -> Result<Arc<TranslatorConfig>> {
321+
pub(super) async fn xet_session(&self) -> Result<XetSession> {
379322
let this = self.clone();
380-
self.download_config
381-
.get_or_try_init(|| async move { this.xet_config("read").await })
382-
.await
383-
.map(Arc::clone)
384-
}
385-
386-
pub(super) async fn xet_upload_session(&self) -> Result<Arc<FileUploadSession>> {
387-
FileUploadSession::new(self.upload_config().await?)
388-
.await
389-
.map_err(|err| {
390-
Error::new(ErrorKind::Unexpected, "failed to create upload session").set_source(err)
323+
self.xet_session
324+
.get_or_try_init(|| async move {
325+
let token = this.xet_token("write").await?;
326+
XetSessionBuilder::new()
327+
.with_endpoint(token.cas_url)
328+
.with_token_info(token.access_token, token.exp)
329+
.build()
330+
.map_err(|err| {
331+
Error::new(ErrorKind::Unexpected, "failed to create xet session")
332+
.set_source(err)
333+
})
391334
})
392-
}
393-
394-
pub(super) async fn xet_download_session(&self) -> Result<Arc<FileDownloadSession>> {
395-
FileDownloadSession::new(self.download_config().await?)
396335
.await
397-
.map_err(|err| {
398-
Error::new(ErrorKind::Unexpected, "failed to create download session")
399-
.set_source(err)
400-
})
336+
.cloned()
401337
}
402338

403339
/// Issue a HEAD request and extract XET file info (hash and size).

core/services/hf/src/reader.rs

Lines changed: 18 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,7 @@ use http::Response;
1919
use http::StatusCode;
2020
use http::header;
2121

22-
use xet_client::ClientError;
23-
use xet_data::file_reconstruction::FileReconstructionError;
24-
use xet_data::processing::DownloadStream;
25-
use xet_data::processing::XetFileInfo;
22+
use xet::xet_session::{SessionError, XetDownloadStream, XetFileInfo};
2623

2724
use super::core::HfCore;
2825
use super::uri::RepoType;
@@ -31,7 +28,7 @@ use opendal_core::*;
3128

3229
pub enum HfReader {
3330
Http(HttpBody),
34-
Xet(DownloadStream),
31+
Xet(XetDownloadStream),
3532
}
3633

3734
impl HfReader {
@@ -86,32 +83,28 @@ impl HfReader {
8683
file_info: &XetFileInfo,
8784
range: BytesRange,
8885
) -> Result<Self> {
89-
let session = core.xet_download_session().await?;
90-
91-
let (_id, stream) = session
92-
.download_stream_range(file_info, range.to_range())
86+
let session = core.xet_session().await?;
87+
let xet_range = if range.is_full() {
88+
None
89+
} else {
90+
let start = range.offset();
91+
let end = range.size().map(|s| start + s).unwrap_or(u64::MAX);
92+
Some(start..end)
93+
};
94+
let mut stream = session
95+
.download_stream(file_info.clone(), xet_range)
9396
.await
9497
.map_err(|err| {
95-
Error::new(
96-
ErrorKind::Unexpected,
97-
"failed to create xet download stream",
98-
)
99-
.set_source(err)
98+
Error::new(ErrorKind::Unexpected, "failed to create xet download stream")
99+
.set_source(err)
100100
})?;
101+
stream.start();
101102
Ok(Self::Xet(stream))
102103
}
103104
}
104105

105-
fn map_reconstruction_error(e: FileReconstructionError) -> Error {
106-
let kind = match &e {
107-
FileReconstructionError::ClientError(arc) => match arc.as_ref() {
108-
ClientError::FileNotFound(_) | ClientError::XORBNotFound(_) => ErrorKind::NotFound,
109-
ClientError::InvalidRange => ErrorKind::RangeNotSatisfied,
110-
_ => ErrorKind::Unexpected,
111-
},
112-
_ => ErrorKind::Unexpected,
113-
};
114-
Error::new(kind, "xet read error").set_source(e)
106+
fn map_session_error(e: SessionError) -> Error {
107+
Error::new(ErrorKind::Unexpected, "xet read error").set_source(e)
115108
}
116109

117110
impl oio::Read for HfReader {
@@ -121,7 +114,7 @@ impl oio::Read for HfReader {
121114
Self::Xet(stream) => match stream.next().await {
122115
Ok(Some(bytes)) => Ok(Buffer::from(bytes)),
123116
Ok(None) => Ok(Buffer::new()),
124-
Err(e) => Err(map_reconstruction_error(e)),
117+
Err(e) => Err(map_session_error(e)),
125118
},
126119
}
127120
}

core/services/hf/src/writer.rs

Lines changed: 40 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,14 @@
1616
// under the License.
1717

1818
use std::sync::Arc;
19-
use std::sync::Mutex;
2019

2120
use base64::Engine;
2221

2322
use super::core::{BucketOperation, CommitFile, HfCore, LfsFile};
2423
use super::uri::RepoType;
2524
use opendal_core::raw::*;
2625
use opendal_core::*;
27-
use xet_data::processing::FileUploadSession;
28-
use xet_data::processing::Sha256Policy;
29-
use xet_data::processing::SingleFileCleaner;
26+
use xet::xet_session::{Sha256Policy, XetStreamUpload, XetUploadCommit};
3027

3128
/// Writer that handles both regular (small) and XET (large) file uploads.
3229
pub enum HfWriter {
@@ -40,8 +37,8 @@ pub enum HfWriter {
4037
Xet {
4138
core: Arc<HfCore>,
4239
path: String,
43-
session: Arc<FileUploadSession>,
44-
cleaner: Mutex<Option<SingleFileCleaner>>,
40+
commit: XetUploadCommit,
41+
stream: Option<XetStreamUpload>,
4542
},
4643
}
4744

@@ -54,17 +51,23 @@ impl HfWriter {
5451
|| core.determine_upload_mode(&path).await? == "lfs";
5552

5653
let writer = if use_xet {
57-
let session = core.xet_upload_session().await?;
58-
let (_id, cleaner) = session
59-
.start_clean(None, None, Sha256Policy::Compute)
54+
let session = core.xet_session().await?;
55+
let commit = session.new_upload_commit().await.map_err(|err| {
56+
Error::new(ErrorKind::Unexpected, "failed to create xet upload commit")
57+
.set_source(err)
58+
})?;
59+
let stream = commit
60+
.upload_stream(None, Sha256Policy::Compute)
61+
.await
6062
.map_err(|err| {
61-
Error::new(ErrorKind::Unexpected, "failed to start xet upload").set_source(err)
63+
Error::new(ErrorKind::Unexpected, "failed to start xet upload stream")
64+
.set_source(err)
6265
})?;
6366
HfWriter::Xet {
6467
core,
6568
path,
66-
session,
67-
cleaner: Mutex::new(Some(cleaner)),
69+
commit,
70+
stream: Some(stream),
6871
}
6972
} else {
7073
HfWriter::Regular {
@@ -93,14 +96,9 @@ impl oio::Write for HfWriter {
9396
buf.push(bs);
9497
Ok(())
9598
}
96-
HfWriter::Xet { cleaner, .. } => {
97-
// get_mut() required here: lock() would hold a MutexGuard across .await
98-
let cleaner = cleaner
99-
.get_mut()
100-
.expect("mutex poisoned")
101-
.as_mut()
102-
.expect("cleaner missing");
103-
cleaner.add_data(&bs.to_bytes()).await.map_err(|err| {
99+
HfWriter::Xet { stream, .. } => {
100+
let stream = stream.as_ref().expect("stream missing");
101+
stream.write(bs.to_bytes()).await.map_err(|err| {
104102
Error::new(ErrorKind::Unexpected, "failed to write xet chunk").set_source(err)
105103
})
106104
}
@@ -130,65 +128,51 @@ impl oio::Write for HfWriter {
130128
HfWriter::Xet {
131129
core,
132130
path,
133-
session,
134-
cleaner,
131+
commit,
132+
stream,
135133
} => {
136-
let cleaner = cleaner
137-
.get_mut()
138-
.expect("mutex poisoned")
134+
let stream = stream
139135
.take()
140-
.ok_or_else(|| {
141-
Error::new(ErrorKind::Unexpected, "xet writer already closed")
142-
})?;
136+
.ok_or_else(|| Error::new(ErrorKind::Unexpected, "xet writer already closed"))?;
143137

144-
let (file_info, _metrics) = cleaner.finish().await.map_err(|err| {
138+
let file_meta = stream.finish().await.map_err(|err| {
145139
Error::new(ErrorKind::Unexpected, "failed to finish xet upload").set_source(err)
146140
})?;
147-
141+
let file_info = file_meta.xet_info;
148142
let content_length = file_info
149143
.file_size()
150144
.expect("file_size must be set after finish()");
151145
let meta = Metadata::default().with_content_length(content_length);
152146

147+
commit.commit().await.map_err(|err| {
148+
Error::new(ErrorKind::Unexpected, "failed to commit xet upload")
149+
.set_source(err)
150+
})?;
151+
153152
if core.repo.repo_type == RepoType::Bucket {
154153
let xet_hash = file_info.hash().to_string();
155-
session.clone().finalize().await.map_err(|err| {
156-
Error::new(
157-
ErrorKind::Unexpected,
158-
"failed to finalize xet upload session",
159-
)
160-
.set_source(err)
161-
})?;
162-
let operation = BucketOperation::AddFile {
154+
core.bucket_batch(vec![BucketOperation::AddFile {
163155
path: path.clone(),
164156
xet_hash,
165-
};
166-
core.bucket_batch(vec![operation]).await?;
157+
}])
158+
.await?;
167159
Ok(meta)
168160
} else {
169161
let sha256 = file_info.sha256().ok_or_else(|| {
170162
Error::new(ErrorKind::Unexpected, "xet upload returned no sha256")
171163
})?;
172-
session.clone().finalize().await.map_err(|err| {
173-
Error::new(
174-
ErrorKind::Unexpected,
175-
"failed to finalize xet upload session",
176-
)
177-
.set_source(err)
178-
})?;
179164
let lfs_file = LfsFile {
180165
path: path.clone(),
181166
oid: sha256.to_string(),
182167
algo: "sha256".to_string(),
183168
size: content_length,
184169
};
185170
let resp = core.commit_files(vec![], vec![lfs_file], vec![]).await?;
186-
187-
if let Some(commit_oid) = resp.commit_oid {
188-
Ok(meta.with_version(commit_oid))
171+
Ok(if let Some(oid) = resp.commit_oid {
172+
meta.with_version(oid)
189173
} else {
190-
Ok(meta)
191-
}
174+
meta
175+
})
192176
}
193177
}
194178
}
@@ -199,8 +183,10 @@ impl oio::Write for HfWriter {
199183
HfWriter::Regular { buf, .. } => {
200184
buf.clear();
201185
}
202-
HfWriter::Xet { cleaner, .. } => {
203-
cleaner.get_mut().expect("mutex poisoned").take();
186+
HfWriter::Xet { stream, .. } => {
187+
if let Some(s) = stream.take() {
188+
s.abort();
189+
}
204190
}
205191
}
206192
Ok(())

0 commit comments

Comments
 (0)