Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ pub enum Commands {
command: Option<DatasetsCommands>,
},

/// Execute a SQL query
/// Execute a SQL query, or check status of a running query
Query {
/// SQL query string
sql: String,
/// SQL query string (omit when using a subcommand)
sql: Option<String>,

/// Workspace ID (defaults to first workspace from login)
#[arg(long, short = 'w')]
Expand All @@ -41,6 +41,9 @@ pub enum Commands {
/// Output format
#[arg(long = "output", short = 'o', default_value = "table", value_parser = ["table", "json", "csv"])]
output: String,

#[command(subcommand)]
command: Option<QueryCommands>,
},

/// Manage workspaces
Expand Down Expand Up @@ -187,6 +190,15 @@ impl From<ShellChoice> for clap_complete::Shell {
}
}

#[derive(Subcommand)]
pub enum QueryCommands {
/// Check the status of a running query and retrieve results
Status {
/// Query run ID
id: String,
},
}

#[derive(Subcommand)]
pub enum AuthCommands {
/// Remove authentication for a profile
Expand Down
21 changes: 18 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ mod workspace;

use anstyle::AnsiColor;
use clap::{Parser, builder::Styles};
use command::{AuthCommands, Commands, ConnectionsCommands, ConnectionsCreateCommands, DatasetsCommands, IndexesCommands, JobsCommands, QueriesCommands, ResultsCommands, SkillCommands, TablesCommands, WorkspaceCommands};
use command::{AuthCommands, Commands, ConnectionsCommands, ConnectionsCreateCommands, DatasetsCommands, IndexesCommands, JobsCommands, QueriesCommands, QueryCommands, ResultsCommands, SkillCommands, TablesCommands, WorkspaceCommands};

#[derive(Parser)]
#[command(name = "hotdata", version, about = concat!("Hotdata CLI - Command line interface for Hotdata (v", env!("CARGO_PKG_VERSION"), ")"), long_about = None, disable_version_flag = true)]
Expand Down Expand Up @@ -109,9 +109,24 @@ fn main() {
}
}
}
Commands::Query { sql, workspace_id, connection, output } => {
Commands::Query { sql, workspace_id, connection, output, command } => {
let workspace_id = resolve_workspace(workspace_id);
query::execute(&sql, &workspace_id, connection.as_deref(), &output)
match command {
Some(QueryCommands::Status { id }) => {
query::poll(&id, &workspace_id, &output)
}
None => {
match sql {
Some(sql) => query::execute(&sql, &workspace_id, connection.as_deref(), &output),
None => {
use clap::CommandFactory;
let mut cmd = Cli::command();
cmd.build();
cmd.find_subcommand_mut("query").unwrap().print_help().unwrap();
}
}
}
}
}
Commands::Workspaces { command } => match command {
WorkspaceCommands::List { output } => workspace::list(&output),
Expand Down
79 changes: 78 additions & 1 deletion src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,21 @@ pub struct QueryResponse {
pub warning: Option<String>,
}

#[derive(Deserialize)]
struct AsyncResponse {
query_run_id: String,
status: String,
}

#[derive(Deserialize)]
struct QueryRunResponse {
id: String,
status: String,
result_id: Option<String>,
#[serde(default)]
error: Option<String>,
}

fn value_to_string(v: &Value) -> String {
match v {
Value::Null => "NULL".to_string(),
Expand All @@ -33,12 +48,40 @@ fn value_to_string(v: &Value) -> String {
pub fn execute(sql: &str, workspace_id: &str, connection: Option<&str>, format: &str) {
let api = ApiClient::new(Some(workspace_id));

let mut body = serde_json::json!({ "sql": sql });
let mut body = serde_json::json!({
"sql": sql,
"async": true,
"async_after_ms": 1000,
});
if let Some(conn) = connection {
body["connection_id"] = Value::String(conn.to_string());
}

let spinner = indicatif::ProgressBar::new_spinner();
spinner.set_style(
indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
.unwrap(),
);
spinner.set_message("running query...");
spinner.enable_steady_tick(std::time::Duration::from_millis(80));

let (status, resp_body) = api.post_raw("/query", &body);
spinner.finish_and_clear();

if status.as_u16() == 202 {
let async_resp: AsyncResponse = match serde_json::from_str(&resp_body) {
Ok(r) => r,
Err(e) => {
eprintln!("error parsing async response: {e}");
std::process::exit(1);
}
};
use crossterm::style::Stylize;
eprintln!("{}", format!("query still running (status: {})", async_resp.status).yellow());
eprintln!("query_run_id: {}", async_resp.query_run_id);
eprintln!("{}", format!("Poll with: hotdata query status {}", async_resp.query_run_id).dark_grey());
return;
}

if !status.is_success() {
let message = serde_json::from_str::<Value>(&resp_body)
Expand All @@ -61,6 +104,40 @@ pub fn execute(sql: &str, workspace_id: &str, connection: Option<&str>, format:
print_result(&result, format);
}

/// Poll a query run by ID. If succeeded and has a result_id, fetch and display the result.
pub fn poll(query_run_id: &str, workspace_id: &str, format: &str) {
let api = ApiClient::new(Some(workspace_id));

let run: QueryRunResponse = api.get(&format!("/query-runs/{query_run_id}"));

match run.status.as_str() {
"succeeded" => {
match run.result_id {
Some(ref result_id) => {
let result: QueryResponse = api.get(&format!("/results/{result_id}"));
print_result(&result, format);
}
None => {
use crossterm::style::Stylize;
println!("{}", "Query succeeded but no result available.".yellow());
}
}
}
"failed" => {
use crossterm::style::Stylize;
let err = run.error.as_deref().unwrap_or("unknown error");
eprintln!("{}", format!("query failed: {err}").red());
std::process::exit(1);
}
status => {
use crossterm::style::Stylize;
eprintln!("{}", format!("query status: {status}").yellow());
eprintln!("query_run_id: {}", run.id);
eprintln!("{}", format!("Poll again with: hotdata query status {}", run.id).dark_grey());
}
Comment thread
pthurlow marked this conversation as resolved.
}
}

pub fn print_result(result: &QueryResponse, format: &str) {
if let Some(ref warning) = result.warning {
Comment thread
pthurlow marked this conversation as resolved.
eprintln!("warning: {warning}");
Expand Down
Loading