Skip to content

Commit d0b249e

Browse files
authored
Merge pull request #142 from hotdata-dev/feat/files-upload-stream
refactor(databases): stream /files upload via SDK upload_stream
2 parents faeef94 + a2e4ad3 commit d0b249e

5 files changed

Lines changed: 244 additions & 96 deletions

File tree

Cargo.lock

Lines changed: 15 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 16 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -13,15 +13,25 @@ path = "src/main.rs"
1313

1414
[dependencies]
1515
# Hotdata Rust SDK. The CLI's sync wrapper (src/sdk.rs) drives this async SDK
16-
# behind a shared multi-thread tokio runtime. Pinned to the rev that adds the
17-
# CLI-consumption surface (attribution client_id, async submit_query, streaming
18-
# upload_stream) — merged via hotdata-dev/sdk-rust#32.
19-
hotdata = { git = "https://github.com/hotdata-dev/sdk-rust", rev = "8d4018fb899ba52228db44eaffa6caa0eb5b603f", features = ["arrow"] }
16+
# behind a shared multi-thread tokio runtime. The `arrow` feature backs result
17+
# decode; the pinned rev is the first to carry `content_length` on the streaming
18+
# `upload_stream` (sized body, not chunked, so the server can fast-fail an
19+
# oversized upload — see src/sdk.rs::Api::upload_stream).
20+
hotdata = { git = "https://github.com/hotdata-dev/sdk-rust", rev = "1687ba2ffb8f816540d00d7c1fdfe165a8fbd150", features = ["arrow"] }
2021
# Shared multi-thread runtime for the sync wrapper; block_on is called
21-
# concurrently from rayon worker threads (see src/indexes.rs).
22-
tokio = { version = "1", features = ["rt-multi-thread"] }
22+
# concurrently from rayon worker threads (see src/indexes.rs). `sync` backs the
23+
# mpsc channel that bridges the blocking upload reader into an async byte stream.
24+
tokio = { version = "1", features = ["rt-multi-thread", "sync"] }
2325
# CliTokenProvider implements the SDK's #[async_trait] BearerTokenProvider.
2426
async-trait = "0.1"
27+
# Bridge the progress-wrapped blocking upload reader into the async
28+
# `Stream<Item = Result<Bytes, _>>` the SDK's `upload_stream` consumes: a
29+
# spawn_blocking task feeds chunks through a tokio mpsc channel, wrapped as a
30+
# Stream by tokio-stream's ReceiverStream. `futures-core` names the Stream
31+
# bound; `bytes` matches the SDK's chunk type.
32+
bytes = "1"
33+
futures-core = "0.3"
34+
tokio-stream = "0.1"
2535
anstyle = "1.0.13"
2636
clap = { version = "4", features = ["derive"] }
2737
directories = "6"

src/databases.rs

Lines changed: 11 additions & 51 deletions
Original file line number | Diff line number | Diff line change
@@ -327,59 +327,19 @@ fn finish_upload(
327327
size: Option<u64>,
328328
pb: &ProgressBar,
329329
) -> String {
330-
// The streaming `/files` upload stays on the slim raw-HTTP helper: it
331-
// needs no request timeout (a 10 GB+ parquet far outlives the seam's
332-
// 300s default), is one-shot (no 401-retry — the body is consumed on the
333-
// first send), and the SDK's `uploads().upload` is `PathBuf`-only with no
334-
// progress hook or `--url` source. We still carry the same
335-
// `Authorization: Bearer <jwt>` (resolved through the seam's installed
336-
// token provider) and `X-Workspace-Id` header every other call uses.
337-
let upload_client = crate::raw_http::build_upload_client();
338-
let url = format!("{}/files", api.api_url);
339-
let mut req = upload_client
340-
.post(&url)
341-
.header("Content-Type", "application/octet-stream");
342-
if let Some(bearer) = api.current_bearer() {
343-
req = req.header("Authorization", format!("Bearer {bearer}"));
344-
}
345-
if let Some(ws) = api.workspace_id() {
346-
req = req.header("X-Workspace-Id", ws);
347-
}
348-
if let Some(len) = size {
349-
req = req.header("Content-Length", len);
350-
}
351-
let req = req.body(reqwest::blocking::Body::new(reader));
352-
353-
// Body is an opaque stream, so pass `None` for logging; headers
354-
// (including the masked Authorization) still log.
355-
let (status, resp_body) = match crate::util::send_debug(&upload_client, req, None) {
356-
Ok(pair) => pair,
357-
Err(e) => {
358-
eprintln!("error connecting to API: {e}");
359-
std::process::exit(1);
360-
}
361-
};
330+
// Stream the body to `POST /v1/files` through the SDK seam, which drives the
331+
// SDK's `upload_stream` on a dedicated no-timeout client (a 10 GB+ parquet
332+
// far outlives the shared 300s request timeout) and bridges this blocking,
333+
// progress-wrapped `reader` into the async byte stream the SDK consumes.
334+
// `size` becomes the `Content-Length` so the server fast-fails an oversized
335+
// upload before writing bytes; the `--url` source may not know it, hence
336+
// `Option`. Carries the same auth + scope headers as every other SDK call.
337+
let result = api.upload_stream(reader, size);
362338
pb.finish_and_clear();
363339

364-
if !status.is_success() {
365-
use crossterm::style::Stylize;
366-
eprintln!("{}", crate::util::api_error(resp_body).red());
367-
std::process::exit(1);
368-
}
369-
370-
let body: serde_json::Value = match serde_json::from_str(&resp_body) {
371-
Ok(v) => v,
372-
Err(e) => {
373-
eprintln!("error parsing upload response: {e}");
374-
std::process::exit(1);
375-
}
376-
};
377-
match body["id"].as_str() {
378-
Some(id) => id.to_string(),
379-
None => {
380-
eprintln!("error: upload response missing id");
381-
std::process::exit(1);
382-
}
340+
match result {
341+
Ok(id) => id,
342+
Err(e) => e.exit(),
383343
}
384344
}
385345

src/raw_http.rs

Lines changed: 6 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -8,20 +8,15 @@
88
//! * the session-token mints (`/v1/auth/database`, `/v1/auth/sandbox`) — a
99
//! distinct grant on distinct endpoints (`database_session.rs` /
1010
//! `sandbox_session.rs`);
11-
//! * the streaming `/files` upload (10 GB+, `--url` source, progress bar, no
12-
//! request timeout, no 401-retry) — the SDK's `uploads().upload` is
13-
//! `PathBuf`-only;
1411
//! * `skill.rs`'s arbitrary-URL markdown fetch.
1512
//!
16-
//! This module owns the two blocking client builders (one timeout-bounded, one
17-
//! no-timeout for uploads) and a thin bearer/header request builder. It does
18-
//! NOT carry the old `ApiClient`'s 401-retry loop: token freshness is now the
19-
//! `CliTokenProvider`'s job (proactive refresh at the 30s leeway), and the
20-
//! upload path was always one-shot anyway.
13+
//! This module owns the timeout-bounded blocking client builder and a thin
14+
//! bearer/header request builder. Token freshness is the `CliTokenProvider`'s
15+
//! job (proactive refresh at the 30s leeway), so these helpers carry no
16+
//! 401-retry loop.
2117
22-
// Consumers (jwt.rs token mints, session mints, the streaming upload,
23-
// skill.rs) are migrated to this helper incrementally; the allow keeps the
24-
// build warning-free until those call sites land.
18+
// Not every helper here is wired to a call site yet; the allow keeps the build
19+
// warning-free.
2520
#![allow(dead_code)]
2621

2722
use std::time::Duration;
@@ -50,19 +45,6 @@ pub fn build_http_client() -> reqwest::blocking::Client {
5045
.expect("reqwest blocking client should always build with these defaults")
5146
}
5247

53-
/// Client used only for streaming file uploads. Deliberately has **no** request
54-
/// timeout: an upload's duration scales with file size and uplink (a 10 GB
55-
/// parquet takes far longer than `HTTP_REQUEST_TIMEOUT`, which is sized for
56-
/// slow server-side work), so a wall-clock cap would abort healthy-but-slow
57-
/// transfers. TCP keepalive is kept so a genuinely dead peer is still reaped by
58-
/// the OS; a live-but-slow upload runs to completion and the user can Ctrl-C.
59-
pub fn build_upload_client() -> reqwest::blocking::Client {
60-
reqwest::blocking::Client::builder()
61-
.tcp_keepalive(TCP_KEEPALIVE_INTERVAL)
62-
.build()
63-
.expect("reqwest blocking client should always build with these defaults")
64-
}
65-
6648
#[cfg(test)]
6749
mod tests {
6850
use super::*;
@@ -72,11 +54,6 @@ mod tests {
7254
let _ = build_http_client();
7355
}
7456

75-
#[test]
76-
fn upload_client_builds() {
77-
let _ = build_upload_client();
78-
}
79-
8057
#[test]
8158
fn redact_keys_cover_token_fields() {
8259
// Guards against silently dropping a sensitive key from debug logs.

0 commit comments

Comments
 (0)