Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ hotdata databases list [-w <id>] [-o table|json|yaml]
hotdata databases create --name <name> [--table <table> ...] [--schema public] [-o table|json|yaml]
hotdata databases <name_or_id> [-o table|json|yaml]
hotdata databases delete <name_or_id>
hotdata databases run [--database <id>] [--description <label>] [--schema public] [--table <table> ...] [--expires-at <duration|timestamp>] <cmd> [args...]
hotdata databases <id> run <cmd> [args...]

hotdata databases tables list <database> [--schema <name>] [-o table|json|yaml]
hotdata databases tables load <database> <table> --file ./data.parquet [--schema public]
Expand All @@ -146,6 +148,7 @@ hotdata databases tables delete <database> <table> [--schema public]

- `create` registers a managed connection (`source_type: managed`) with no external credentials. Use `--table` to declare tables up front (required before `tables load` on the current API).
- `tables load` uploads a **parquet** file (or uses a staged `upload_id` from `POST /v1/files`) and publishes it as the table generation (`replace` mode).
- `run` mints a database-scoped JWT and execs `<cmd>` with `HOTDATA_DATABASE_TOKEN`, `HOTDATA_DATABASE_REFRESH_TOKEN`, `HOTDATA_DATABASE`, `HOTDATA_WORKSPACE`, and `HOTDATA_API_URL` injected into its environment. Pass a database id (group-positional `<id>` like `sandbox run`, or `--database <id>`) to scope an existing database; omit both to auto-create a scratch one using `--description` / `--schema` / `--table` / `--expires-at`. Useful for launching an agent or child process whose API access is restricted to a single database.
- For CSV/JSON uploads without a managed database, use `hotdata datasets create` instead (`datasets.main.*`).

Example:
Expand Down
3 changes: 3 additions & 0 deletions skills/hotdata/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ hotdata databases create [--description <label>] [--table <table> ...] [--schema
hotdata databases set <id_or_description>
hotdata databases <id_or_description> [--workspace-id <workspace_id>] [--output table|json|yaml]
hotdata databases delete <id_or_description> [--workspace-id <workspace_id>]
hotdata databases run [--database <id>] [--description <label>] [--schema public] [--table <table> ...] [--expires-at <duration|timestamp>] [--workspace-id <workspace_id>] <cmd> [args...]
hotdata databases <id> run <cmd> [args...]

# Dot-notation shorthand for load: database.table or database.schema.table
hotdata databases load <database.table> [--file ./data.parquet] [--url <url>] [--upload-id <id>] [--workspace-id <workspace_id>]
Expand All @@ -209,6 +211,7 @@ hotdata databases tables delete <table> [--database <id_or_desc>] [--schema publ
- `tables list` — lists tables with `TABLE` (`<database_id>.<schema>.<table>`), `SYNCED`, `LAST_SYNC`. Uses active database when `--database` is omitted.
- `tables load` — uploads a local parquet file (`--file`), a remote parquet URL (`--url`), or a pre-staged upload (`--upload-id`) and publishes with **replace** mode.
- `tables delete` — drops a table from the managed database.
- `run` — mints a database-scoped JWT (via `POST /v1/auth/database`) and execs `<cmd>` with `HOTDATA_DATABASE_TOKEN`, `HOTDATA_DATABASE_REFRESH_TOKEN`, `HOTDATA_DATABASE`, `HOTDATA_WORKSPACE`, and `HOTDATA_API_URL` injected. Pass a database id as a group positional (`hotdata databases <id> run ...`, sandbox-style) or via `--database <id>`; omit both to auto-create a scratch database using `--description` / `--schema` / `--table` / `--expires-at`. Use this to launch an agent or child process whose API access is scoped to a single database. The minted JWT carries `database`, `workspaces`, `permissions:["read","write"]`, `source:"database_token"`. The session is persisted at `~/.hotdata/database_session.json` (mode `0600`); the child's exit code is propagated.

Example:

Expand Down
28 changes: 28 additions & 0 deletions src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,34 @@ pub enum DatabasesCommands {
#[command(subcommand)]
command: Option<DatabaseTablesCommands>,
},

/// Run a command with a database-scoped token. Creates a new database unless --database is given.
Run {
/// Existing database id to scope the token to (omit to auto-create a database)
#[arg(long)]
database: Option<String>,

/// Description for the auto-created database (only used when --database is omitted)
#[arg(long)]
Comment on lines +642 to +643
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: this user-facing flag is --description but databases create (line 571) uses --name for the same underlying JSON field — both end up serialized as "name" by create_database_request. Two different flag names for the same field on sibling subcommands will trip people up. Picking one (and aligning README/SKILL.md, which currently disagree about which is canonical) would be worth a follow-up. (not blocking)

description: Option<String>,

/// Schema for tables declared in the auto-created database (default: public)
#[arg(long, default_value = "public")]
schema: String,

/// Table to declare in the auto-created database (repeatable)
#[arg(long = "table")]
tables: Vec<String>,

/// When the auto-created database expires. Accepts a relative duration
/// (e.g. 24h, 7d, 90m) or an RFC 3339 timestamp. Defaults to 24h when omitted.
#[arg(long)]
expires_at: Option<String>,

/// Command to execute (everything after `--`)
#[arg(trailing_var_arg = true, required = true)]
cmd: Vec<String>,
},
}

#[derive(Subcommand)]
Expand Down
218 changes: 218 additions & 0 deletions src/database_session.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
//! Persisted database-scoped JWT session.
//!
//! Minted by `POST /v1/auth/database` (grant_type=existing_database +
//! database_id), refreshed via the same endpoint with
//! grant_type=refresh_token. Bound to a single database + workspace;
//! the JWT carries workspace + database read/write scope. The server
//! does not rotate the refresh token.
//!
//! Stored at `~/.hotdata/database_session.json` (mode 0600).

use crate::config;
use crate::util;
use serde::{Deserialize, Serialize};
use std::fs;
use std::io::Write;
use std::path::PathBuf;
use std::time::{SystemTime, UNIX_EPOCH};

// The refresh path below (REFRESH_LEEWAY_SECONDS, now_unix, MintResponse,
// redact, refresh, session_from_response) mirrors sandbox_session.rs and is
// covered by tests, but has no production caller yet: it's reserved for when
// a child of `databases run` re-mints an expiring HOTDATA_DATABASE_TOKEN
// (the child-side ApiClient consumption is not wired up yet). Annotated
// #[allow(dead_code)] until that lands so the build stays warning-clean.
#[allow(dead_code)]
const REFRESH_LEEWAY_SECONDS: u64 = 60;

#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct DatabaseSession {
pub access_token: String,
pub refresh_token: String,
pub database_id: String,
pub workspace_id: String,
pub access_expires_at: u64,
pub refresh_expires_at: u64,
}

pub fn session_path() -> Option<PathBuf> {
config::config_dir().ok().map(|d| d.join("database_session.json"))
}

#[allow(dead_code)] // Reserved for flows that re-use a cached database session.
pub fn load() -> Option<DatabaseSession> {
let path = session_path()?;
let raw = fs::read_to_string(&path).ok()?;
serde_json::from_str(&raw).ok()
}

pub fn save(session: &DatabaseSession) -> Result<(), String> {
let path = session_path().ok_or_else(|| "no database session path available".to_string())?;
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).map_err(|e| format!("mkdir failed: {e}"))?;
}
let json = serde_json::to_string_pretty(session)
.map_err(|e| format!("serialize failed: {e}"))?;

use std::os::unix::fs::OpenOptionsExt;
let mut f = fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.mode(0o600)
.open(&path)
.map_err(|e| format!("open failed: {e}"))?;
f.write_all(json.as_bytes())
.map_err(|e| format!("write failed: {e}"))?;
Ok(())
}

#[allow(dead_code)] // Reserved for flows that re-use a cached database session.
pub fn clear() {
if let Some(path) = session_path() {
let _ = fs::remove_file(path);
}
}

Comment thread
pthurlow marked this conversation as resolved.
#[allow(dead_code)] // Part of the reserved refresh path (see REFRESH_LEEWAY_SECONDS).
fn now_unix() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0)
}

#[allow(dead_code)] // Part of the reserved refresh path (see REFRESH_LEEWAY_SECONDS).
#[derive(Deserialize)]
pub(crate) struct MintResponse {
token: String,
refresh_token: String,
database_id: String,
expires_in: u64,
refresh_expires_in: u64,
}

#[allow(dead_code)] // Part of the reserved refresh path (see REFRESH_LEEWAY_SECONDS).
fn redact(s: &str) -> String {
util::mask_credential(s)
}

/// Trade a refresh token for a fresh database JWT (no rotation). Same
/// endpoint as the new-mint path: `POST /v1/auth/database` with
/// grant_type=refresh_token.
#[allow(dead_code)] // Part of the reserved refresh path (see REFRESH_LEEWAY_SECONDS).
pub fn refresh(api_url: &str, refresh_token: &str) -> Result<DatabaseSession, String> {
let url = format!("{}/auth/database", api_url.trim_end_matches('/'));
let body = serde_json::json!({
"grant_type": "refresh_token",
"refresh_token": refresh_token,
});
let body_log = serde_json::json!({
"grant_type": "refresh_token",
"refresh_token": redact(refresh_token),
});

let client = reqwest::blocking::Client::new();
let req = client.post(&url).json(&body);
let (status, body_text) = util::send_debug_with_redaction(
&client,
req,
Some(&body_log),
&["token", "refresh_token"],
)
.map_err(|e| format!("connection error: {e}"))?;
if !status.is_success() {
return Err(format!("database refresh failed: HTTP {status}: {body_text}"));
}
let resp: MintResponse = serde_json::from_str(&body_text)
.map_err(|e| format!("malformed refresh response: {e}"))?;
Ok(session_from_response(resp, String::new()))
}

/// Build a [`DatabaseSession`] from a mint/refresh response. The mint
/// response doesn't carry the workspace public_id, so the caller passes
/// it in (it's what the JWT's `workspaces` claim restricts the bearer
/// to). For refresh, `workspace_id` is left blank — the caller fills it
/// from the prior session, since the database-id ↔ workspace mapping is
/// invariant across refreshes.
#[allow(dead_code)] // Part of the reserved refresh path (see REFRESH_LEEWAY_SECONDS).
pub(crate) fn session_from_response(resp: MintResponse, workspace_id: String) -> DatabaseSession {
let now = now_unix();
DatabaseSession {
access_token: resp.token,
refresh_token: resp.refresh_token,
database_id: resp.database_id,
workspace_id,
access_expires_at: now + resp.expires_in,
refresh_expires_at: now + resp.refresh_expires_in,
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::config::test_helpers::with_temp_config_dir;

fn mk_session(access_offset: i64, refresh_offset: i64) -> DatabaseSession {
let now = now_unix() as i64;
DatabaseSession {
access_token: "cached".into(),
refresh_token: "cached-refresh".into(),
database_id: "dbid_abc".into(),
workspace_id: "work_xyz".into(),
access_expires_at: (now + access_offset).max(0) as u64,
refresh_expires_at: (now + refresh_offset).max(0) as u64,
}
}

#[test]
fn round_trip() {
let (_tmp, _guard) = with_temp_config_dir();
let s = mk_session(3600, 86400);
save(&s).unwrap();
let loaded = load().unwrap();
assert_eq!(loaded.access_token, "cached");
assert_eq!(loaded.database_id, "dbid_abc");
assert_eq!(loaded.workspace_id, "work_xyz");
}

#[test]
fn file_is_mode_0600() {
use std::os::unix::fs::PermissionsExt;
let (_tmp, _guard) = with_temp_config_dir();
save(&mk_session(60, 60)).unwrap();
let mode = fs::metadata(session_path().unwrap()).unwrap().permissions().mode() & 0o777;
assert_eq!(mode, 0o600);
}

#[test]
fn refresh_posts_grant_type_to_database_endpoint() {
let mut server = mockito::Server::new();
let m = server
.mock("POST", "/auth/database")
.match_body(mockito::Matcher::JsonString(
r#"{"grant_type":"refresh_token","refresh_token":"stable-refresh"}"#.to_string(),
))
.with_status(200)
.with_header("content-type", "application/json")
.with_body(
r#"{"ok":true,"token":"new-jwt","refresh_token":"stable-refresh","database_id":"dbid_abc","expires_in":300,"refresh_expires_in":259200}"#,
)
.create();

let s = refresh(&server.url(), "stable-refresh").unwrap();
m.assert();
assert_eq!(s.access_token, "new-jwt");
assert_eq!(s.refresh_token, "stable-refresh");
assert_eq!(s.database_id, "dbid_abc");
}

#[test]
fn refresh_http_error() {
let mut server = mockito::Server::new();
let m = server.mock("POST", "/auth/database").with_status(401).create();
let err = refresh(&server.url(), "x").unwrap_err();
m.assert();
assert!(err.contains("401"));
}
}
Loading
Loading