Skip to content

Commit 9e45fa8

Browse files
committed
refactor(databases): stream /files upload via SDK upload_stream
1 parent faeef94 commit 9e45fa8

5 files changed

Lines changed: 252 additions & 90 deletions

File tree

Cargo.lock

Lines changed: 14 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,23 @@ path = "src/main.rs"
1616
# behind a shared multi-thread tokio runtime. Pinned to the rev that adds the
1717
# CLI-consumption surface (attribution client_id, async submit_query, streaming
1818
# upload_stream) — merged via hotdata-dev/sdk-rust#32.
19-
hotdata = { git = "https://github.com/hotdata-dev/sdk-rust", rev = "8d4018fb899ba52228db44eaffa6caa0eb5b603f", features = ["arrow"] }
19+
# TEMP: local path dep while sdk-rust#35 (content_length on upload_stream) is in
20+
# review. Flip back to a `git + rev` pin once that PR squash-merges to main.
21+
hotdata = { path = "/tmp/sdk-rust-133", features = ["arrow"] }
2022
# Shared multi-thread runtime for the sync wrapper; block_on is called
21-
# concurrently from rayon worker threads (see src/indexes.rs).
22-
tokio = { version = "1", features = ["rt-multi-thread"] }
23+
# concurrently from rayon worker threads (see src/indexes.rs). `sync` backs the
24+
# mpsc channel that bridges the blocking upload reader into an async byte stream.
25+
tokio = { version = "1", features = ["rt-multi-thread", "sync"] }
2326
# CliTokenProvider implements the SDK's #[async_trait] BearerTokenProvider.
2427
async-trait = "0.1"
28+
# Bridge the progress-wrapped blocking upload reader into the async
29+
# `Stream<Item = Result<Bytes, _>>` the SDK's `upload_stream` consumes: a
30+
# spawn_blocking task feeds chunks through a tokio mpsc channel, wrapped as a
31+
# Stream by tokio-stream's ReceiverStream. `futures-core` names the Stream
32+
# bound; `bytes` matches the SDK's chunk type.
33+
bytes = "1"
34+
futures-core = "0.3"
35+
tokio-stream = "0.1"
2536
anstyle = "1.0.13"
2637
clap = { version = "4", features = ["derive"] }
2738
directories = "6"

src/databases.rs

Lines changed: 11 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -327,59 +327,19 @@ fn finish_upload(
327327
size: Option<u64>,
328328
pb: &ProgressBar,
329329
) -> String {
330-
// The streaming `/files` upload stays on the slim raw-HTTP helper: it
331-
// needs no request timeout (a 10 GB+ parquet far outlives the seam's
332-
// 300s default), is one-shot (no 401-retry — the body is consumed on the
333-
// first send), and the SDK's `uploads().upload` is `PathBuf`-only with no
334-
// progress hook or `--url` source. We still carry the same
335-
// `Authorization: Bearer <jwt>` (resolved through the seam's installed
336-
// token provider) and `X-Workspace-Id` header every other call uses.
337-
let upload_client = crate::raw_http::build_upload_client();
338-
let url = format!("{}/files", api.api_url);
339-
let mut req = upload_client
340-
.post(&url)
341-
.header("Content-Type", "application/octet-stream");
342-
if let Some(bearer) = api.current_bearer() {
343-
req = req.header("Authorization", format!("Bearer {bearer}"));
344-
}
345-
if let Some(ws) = api.workspace_id() {
346-
req = req.header("X-Workspace-Id", ws);
347-
}
348-
if let Some(len) = size {
349-
req = req.header("Content-Length", len);
350-
}
351-
let req = req.body(reqwest::blocking::Body::new(reader));
352-
353-
// Body is an opaque stream, so pass `None` for logging; headers
354-
// (including the masked Authorization) still log.
355-
let (status, resp_body) = match crate::util::send_debug(&upload_client, req, None) {
356-
Ok(pair) => pair,
357-
Err(e) => {
358-
eprintln!("error connecting to API: {e}");
359-
std::process::exit(1);
360-
}
361-
};
330+
// Stream the body to `POST /v1/files` through the SDK seam, which drives the
331+
// SDK's `upload_stream` on a dedicated no-timeout client (a 10 GB+ parquet
332+
// far outlives the shared 300s request timeout) and bridges this blocking,
333+
// progress-wrapped `reader` into the async byte stream the SDK consumes.
334+
// `size` becomes the `Content-Length` so the server fast-fails an oversized
335+
// upload before writing bytes; the `--url` source may not know it, hence
336+
// `Option`. Carries the same auth + scope headers as every other SDK call.
337+
let result = api.upload_stream(reader, size, "application/octet-stream");
362338
pb.finish_and_clear();
363339

364-
if !status.is_success() {
365-
use crossterm::style::Stylize;
366-
eprintln!("{}", crate::util::api_error(resp_body).red());
367-
std::process::exit(1);
368-
}
369-
370-
let body: serde_json::Value = match serde_json::from_str(&resp_body) {
371-
Ok(v) => v,
372-
Err(e) => {
373-
eprintln!("error parsing upload response: {e}");
374-
std::process::exit(1);
375-
}
376-
};
377-
match body["id"].as_str() {
378-
Some(id) => id.to_string(),
379-
None => {
380-
eprintln!("error: upload response missing id");
381-
std::process::exit(1);
382-
}
340+
match result {
341+
Ok(id) => id,
342+
Err(e) => e.exit(),
383343
}
384344
}
385345

src/raw_http.rs

Lines changed: 8 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,16 @@
88
//! * the session-token mints (`/v1/auth/database`, `/v1/auth/sandbox`) — a
99
//! distinct grant on distinct endpoints (`database_session.rs` /
1010
//! `sandbox_session.rs`);
11-
//! * the streaming `/files` upload (10 GB+, `--url` source, progress bar, no
12-
//! request timeout, no 401-retry) — the SDK's `uploads().upload` is
13-
//! `PathBuf`-only;
1411
//! * `skill.rs`'s arbitrary-URL markdown fetch.
1512
//!
16-
//! This module owns the two blocking client builders (one timeout-bounded, one
17-
//! no-timeout for uploads) and a thin bearer/header request builder. It does
18-
//! NOT carry the old `ApiClient`'s 401-retry loop: token freshness is now the
19-
//! `CliTokenProvider`'s job (proactive refresh at the 30s leeway), and the
20-
//! upload path was always one-shot anyway.
13+
//! (The streaming `/files` upload moved onto the SDK seam's
14+
//! [`Api::upload_stream`](crate::sdk::Api::upload_stream), which owns its own
15+
//! no-timeout client.)
16+
//!
17+
//! This module owns the timeout-bounded blocking client builder and a thin
18+
//! bearer/header request builder. It does NOT carry the old `ApiClient`'s
19+
//! 401-retry loop: token freshness is now the `CliTokenProvider`'s job
20+
//! (proactive refresh at the 30s leeway).
2121
2222
// Consumers (jwt.rs token mints, session mints, the streaming upload,
2323
// skill.rs) are migrated to this helper incrementally; the allow keeps the
@@ -50,19 +50,6 @@ pub fn build_http_client() -> reqwest::blocking::Client {
5050
.expect("reqwest blocking client should always build with these defaults")
5151
}
5252

53-
/// Client used only for streaming file uploads. Deliberately has **no** request
54-
/// timeout: an upload's duration scales with file size and uplink (a 10 GB
55-
/// parquet takes far longer than `HTTP_REQUEST_TIMEOUT`, which is sized for
56-
/// slow server-side work), so a wall-clock cap would abort healthy-but-slow
57-
/// transfers. TCP keepalive is kept so a genuinely dead peer is still reaped by
58-
/// the OS; a live-but-slow upload runs to completion and the user can Ctrl-C.
59-
pub fn build_upload_client() -> reqwest::blocking::Client {
60-
reqwest::blocking::Client::builder()
61-
.tcp_keepalive(TCP_KEEPALIVE_INTERVAL)
62-
.build()
63-
.expect("reqwest blocking client should always build with these defaults")
64-
}
65-
6653
#[cfg(test)]
6754
mod tests {
6855
use super::*;
@@ -72,11 +59,6 @@ mod tests {
7259
let _ = build_http_client();
7360
}
7461

75-
#[test]
76-
fn upload_client_builds() {
77-
let _ = build_upload_client();
78-
}
79-
8062
#[test]
8163
fn redact_keys_cover_token_fields() {
8264
// Guards against silently dropping a sensitive key from debug logs.

0 commit comments

Comments
 (0)