Skip to content

Commit 340c48f

Browse files
authored
feat(query): Async query execution by returning early from long running queries
1 parent 398f839 commit 340c48f

5 files changed

Lines changed: 156 additions & 25 deletions

File tree

README.md

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,15 @@ hotdata datasets create --url "https://example.com/data.parquet" --label "My Dat
138138
## Query
139139

140140
```sh
141-
hotdata query "<sql>" [--workspace-id <id>] [--connection <connection_id>] [--format table|json|csv]
141+
hotdata query "<sql>" [-w <id>] [--connection <connection_id>] [-o table|json|csv]
142+
hotdata query status <query_run_id> [-o table|json|csv]
142143
```
143144

144-
- Default format is `table`, which prints results with row count and execution time.
145+
- Default output is `table`, which prints results with row count and execution time.
145146
- Use `--connection` to scope the query to a specific connection.
147+
- Long-running queries automatically fall back to async execution and return a `query_run_id`.
148+
- Use `hotdata query status <query_run_id>` to poll for results.
149+
- Exit codes for `query status`: `0` = succeeded, `1` = failed, `2` = still running (poll again).
146150

147151
## Saved Queries
148152

@@ -163,13 +167,21 @@ hotdata queries run <query_id> [--format table|json|csv]
163167
## Search
164168

165169
```sh
166-
hotdata search "<query>" --table <connection.schema.table> --column <column> [--select <columns>] [--limit <n>] [--format table|json|csv]
170+
# BM25 full-text search
171+
hotdata search "query text" --table <connection.schema.table> --column <column> [--select <columns>] [--limit <n>] [-o table|json|csv]
172+
173+
# Vector search with --model (calls OpenAI to embed the query)
174+
hotdata search "query text" --table <table> --column <vector_column> --model text-embedding-3-small [--limit <n>]
175+
176+
# Vector search with piped embedding
177+
echo '[0.1, -0.2, ...]' | hotdata search --table <table> --column <vector_column> [--limit <n>]
167178
```
168179

169-
- Full-text search using BM25 across a table column.
170-
- Requires a BM25 index on the target column (see `indexes create`).
171-
- Results are ordered by relevance score (descending).
172-
- `--select` specifies which columns to return (comma-separated, defaults to all). The `score` column is automatically appended when `--select` is used.
180+
- Without `--model` and with query text: BM25 full-text search. Requires a BM25 index on the target column.
181+
- With `--model`: generates an embedding via OpenAI and performs vector search using `l2_distance`. Requires `OPENAI_API_KEY` env var.
182+
- Without query text and with piped stdin: reads a vector (raw JSON array or OpenAI embedding response) and performs vector search.
183+
- BM25 results are ordered by relevance score (descending). Vector results are ordered by distance (ascending).
184+
- `--select` specifies which columns to return (comma-separated, defaults to all).
173185

174186
## Indexes
175187

skills/hotdata-cli/SKILL.md

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -163,16 +163,21 @@ Use `hotdata datasets <dataset_id>` to look up the `table_name` before writing q
163163

164164
### Execute SQL Query
165165
```
166-
hotdata query "<sql>" [--workspace-id <workspace_id>] [--connection <connection_id>] [--format table|json|csv]
166+
hotdata query "<sql>" [-w <workspace_id>] [--connection <connection_id>] [-o table|json|csv]
167+
hotdata query status <query_run_id> [-o table|json|csv]
167168
```
168-
- Default format is `table`, which prints results with row count and execution time.
169+
- Default output is `table`, which prints results with row count and execution time.
169170
- Use `--connection` to scope the query to a specific connection.
170171
- Use `hotdata tables list` to discover tables and columns — do not query `information_schema` directly.
171172
- **Always use PostgreSQL dialect SQL.**
173+
- Long-running queries automatically fall back to async execution and return a `query_run_id`.
174+
- Use `hotdata query status <query_run_id>` to poll for results.
175+
- Exit codes for `query status`: `0` = succeeded, `1` = failed, `2` = still running (poll again).
176+
- **When a query returns a `query_run_id`, use `query status` to poll rather than re-running the query.**
172177

173178
### Get Query Result
174179
```
175-
hotdata results <result_id> [--workspace-id <workspace_id>] [--format table|json|csv]
180+
hotdata results <result_id> [-w <workspace_id>] [-o table|json|csv]
176181
```
177182
- Retrieves a previously executed query result by its result ID.
178183
- Query results include a `result-id` in the footer (e.g. `[result-id: rslt...]`).
@@ -195,23 +200,31 @@ hotdata queries run <query_id> [--format table|json|csv]
195200

196201
### Search
197202
```
198-
hotdata search "<query>" --table <connection.schema.table> --column <column> [--select <columns>] [--limit <n>] [--format table|json|csv]
203+
# BM25 full-text search
204+
hotdata search "query text" --table <connection.schema.table> --column <column> [--select <columns>] [--limit <n>] [-o table|json|csv]
205+
206+
# Vector search with --model (calls OpenAI to embed the query)
207+
hotdata search "query text" --table <table> --column <vector_column> --model text-embedding-3-small [--limit <n>]
208+
209+
# Vector search with piped embedding
210+
echo '[0.1, -0.2, ...]' | hotdata search --table <table> --column <vector_column> [--limit <n>]
199211
```
200-
- Full-text search using BM25 across a table column.
201-
- Requires a BM25 index on the target column (see `indexes create`).
202-
- Results are ordered by relevance score (descending).
203-
- `--select` specifies which columns to return (comma-separated, defaults to all). The `score` column is automatically appended when `--select` is used.
212+
- Without `--model` and with query text: BM25 full-text search. Requires a BM25 index on the target column.
213+
- With `--model`: generates an embedding via OpenAI and performs vector search using `l2_distance`. Requires `OPENAI_API_KEY` env var. Supported models: `text-embedding-3-small`, `text-embedding-3-large`.
214+
- Without query text and with piped stdin: reads a vector (raw JSON array or OpenAI embedding response) and performs vector search.
215+
- BM25 results are ordered by relevance score (descending). Vector results are ordered by distance (ascending).
216+
- `--select` specifies which columns to return (comma-separated, defaults to all).
204217
- Default limit is 10.
218+
- **For BM25 search, create a BM25 index on the target column first. For vector search, create a vector index.**
205219

206220
### Indexes
207221
```
208-
hotdata indexes list --connection-id <id> --schema <schema> --table <table> [--workspace-id <workspace_id>] [--format table|json|yaml]
209-
hotdata indexes create --connection-id <id> --schema <schema> --table <table> --name <name> --columns <cols> [--type sorted|bm25|vector] [--metric l2|cosine|dot] [--async]
222+
hotdata indexes list -c <connection_id> --schema <schema> --table <table> [-w <workspace_id>] [-o table|json|yaml]
223+
hotdata indexes create -c <connection_id> --schema <schema> --table <table> --name <name> --columns <cols> [--type sorted|bm25|vector] [--metric l2|cosine|dot] [--async]
210224
```
211225
- `list` shows indexes on a table with name, type, columns, status, and creation date.
212226
- `create` creates an index. Use `--type bm25` for full-text search, `--type vector` for vector search (requires `--metric`).
213227
- `--async` submits index creation as a background job. Use `hotdata jobs <job_id>` to check status.
214-
- **Before using `hotdata search`, create a BM25 index on the target column.**
215228

216229
### Jobs
217230
```

src/command.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ pub enum Commands {
2525
command: Option<DatasetsCommands>,
2626
},
2727

28-
/// Execute a SQL query
28+
/// Execute a SQL query, or check status of a running query
2929
Query {
30-
/// SQL query string
31-
sql: String,
30+
/// SQL query string (omit when using a subcommand)
31+
sql: Option<String>,
3232

3333
/// Workspace ID (defaults to first workspace from login)
3434
#[arg(long, short = 'w')]
@@ -41,6 +41,9 @@ pub enum Commands {
4141
/// Output format
4242
#[arg(long = "output", short = 'o', default_value = "table", value_parser = ["table", "json", "csv"])]
4343
output: String,
44+
45+
#[command(subcommand)]
46+
command: Option<QueryCommands>,
4447
},
4548

4649
/// Manage workspaces
@@ -187,6 +190,16 @@ impl From<ShellChoice> for clap_complete::Shell {
187190
}
188191
}
189192

193+
#[derive(Subcommand)]
194+
pub enum QueryCommands {
195+
/// Check the status of a running query and retrieve results.
196+
/// Exit codes: 0 = succeeded, 1 = failed, 2 = still running (poll again)
197+
Status {
198+
/// Query run ID
199+
id: String,
200+
},
201+
}
202+
190203
#[derive(Subcommand)]
191204
pub enum AuthCommands {
192205
/// Remove authentication for a profile

src/main.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ mod workspace;
1919

2020
use anstyle::AnsiColor;
2121
use clap::{Parser, builder::Styles};
22-
use command::{AuthCommands, Commands, ConnectionsCommands, ConnectionsCreateCommands, DatasetsCommands, IndexesCommands, JobsCommands, QueriesCommands, ResultsCommands, SkillCommands, TablesCommands, WorkspaceCommands};
22+
use command::{AuthCommands, Commands, ConnectionsCommands, ConnectionsCreateCommands, DatasetsCommands, IndexesCommands, JobsCommands, QueriesCommands, QueryCommands, ResultsCommands, SkillCommands, TablesCommands, WorkspaceCommands};
2323

2424
#[derive(Parser)]
2525
#[command(name = "hotdata", version, about = concat!("Hotdata CLI - Command line interface for Hotdata (v", env!("CARGO_PKG_VERSION"), ")"), long_about = None, disable_version_flag = true)]
@@ -109,9 +109,24 @@ fn main() {
109109
}
110110
}
111111
}
112-
Commands::Query { sql, workspace_id, connection, output } => {
112+
Commands::Query { sql, workspace_id, connection, output, command } => {
113113
let workspace_id = resolve_workspace(workspace_id);
114-
query::execute(&sql, &workspace_id, connection.as_deref(), &output)
114+
match command {
115+
Some(QueryCommands::Status { id }) => {
116+
query::poll(&id, &workspace_id, &output)
117+
}
118+
None => {
119+
match sql {
120+
Some(sql) => query::execute(&sql, &workspace_id, connection.as_deref(), &output),
121+
None => {
122+
use clap::CommandFactory;
123+
let mut cmd = Cli::command();
124+
cmd.build();
125+
cmd.find_subcommand_mut("query").unwrap().print_help().unwrap();
126+
}
127+
}
128+
}
129+
}
115130
}
116131
Commands::Workspaces { command } => match command {
117132
WorkspaceCommands::List { output } => workspace::list(&output),

src/query.rs

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,21 @@ pub struct QueryResponse {
1313
pub warning: Option<String>,
1414
}
1515

16+
#[derive(Deserialize)]
17+
struct AsyncResponse {
18+
query_run_id: String,
19+
status: String,
20+
}
21+
22+
#[derive(Deserialize)]
23+
struct QueryRunResponse {
24+
id: String,
25+
status: String,
26+
result_id: Option<String>,
27+
#[serde(default)]
28+
error: Option<String>,
29+
}
30+
1631
fn value_to_string(v: &Value) -> String {
1732
match v {
1833
Value::Null => "NULL".to_string(),
@@ -33,12 +48,40 @@ fn value_to_string(v: &Value) -> String {
3348
pub fn execute(sql: &str, workspace_id: &str, connection: Option<&str>, format: &str) {
3449
let api = ApiClient::new(Some(workspace_id));
3550

36-
let mut body = serde_json::json!({ "sql": sql });
51+
let mut body = serde_json::json!({
52+
"sql": sql,
53+
"async": true,
54+
"async_after_ms": 1000,
55+
});
3756
if let Some(conn) = connection {
3857
body["connection_id"] = Value::String(conn.to_string());
3958
}
4059

60+
let spinner = indicatif::ProgressBar::new_spinner();
61+
spinner.set_style(
62+
indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
63+
.unwrap(),
64+
);
65+
spinner.set_message("running query...");
66+
spinner.enable_steady_tick(std::time::Duration::from_millis(80));
67+
4168
let (status, resp_body) = api.post_raw("/query", &body);
69+
spinner.finish_and_clear();
70+
71+
if status.as_u16() == 202 {
72+
let async_resp: AsyncResponse = match serde_json::from_str(&resp_body) {
73+
Ok(r) => r,
74+
Err(e) => {
75+
eprintln!("error parsing async response: {e}");
76+
std::process::exit(1);
77+
}
78+
};
79+
use crossterm::style::Stylize;
80+
eprintln!("{}", format!("query still running (status: {})", async_resp.status).yellow());
81+
eprintln!("query_run_id: {}", async_resp.query_run_id);
82+
eprintln!("{}", format!("Poll with: hotdata query status {}", async_resp.query_run_id).dark_grey());
83+
std::process::exit(2);
84+
}
4285

4386
if !status.is_success() {
4487
let message = serde_json::from_str::<Value>(&resp_body)
@@ -61,6 +104,41 @@ pub fn execute(sql: &str, workspace_id: &str, connection: Option<&str>, format:
61104
print_result(&result, format);
62105
}
63106

107+
/// Poll a query run by ID. If succeeded and has a result_id, fetch and display the result.
108+
pub fn poll(query_run_id: &str, workspace_id: &str, format: &str) {
109+
let api = ApiClient::new(Some(workspace_id));
110+
111+
let run: QueryRunResponse = api.get(&format!("/query-runs/{query_run_id}"));
112+
113+
match run.status.as_str() {
114+
"succeeded" => {
115+
match run.result_id {
116+
Some(ref result_id) => {
117+
let result: QueryResponse = api.get(&format!("/results/{result_id}"));
118+
print_result(&result, format);
119+
}
120+
None => {
121+
use crossterm::style::Stylize;
122+
println!("{}", "Query succeeded but no result available.".yellow());
123+
}
124+
}
125+
}
126+
"failed" => {
127+
use crossterm::style::Stylize;
128+
let err = run.error.as_deref().unwrap_or("unknown error");
129+
eprintln!("{}", format!("query failed: {err}").red());
130+
std::process::exit(1);
131+
}
132+
status => {
133+
use crossterm::style::Stylize;
134+
eprintln!("{}", format!("query status: {status}").yellow());
135+
eprintln!("query_run_id: {}", run.id);
136+
eprintln!("{}", format!("Poll again with: hotdata query status {}", run.id).dark_grey());
137+
std::process::exit(2);
138+
}
139+
}
140+
}
141+
64142
pub fn print_result(result: &QueryResponse, format: &str) {
65143
if let Some(ref warning) = result.warning {
66144
eprintln!("warning: {warning}");

0 commit comments

Comments
 (0)