diff --git a/rust/crates/sift_cli/Cargo.toml b/rust/crates/sift_cli/Cargo.toml index fc04b8919..d3bcb2756 100644 --- a/rust/crates/sift_cli/Cargo.toml +++ b/rust/crates/sift_cli/Cargo.toml @@ -30,6 +30,7 @@ indicatif = { workspace = true } parquet = { workspace = true } pbjson-types = { workspace = true } reqwest = { workspace = true } +serde_json = { workspace = true } sift_pbfs = { workspace = true } tdms = { workspace = true } hdf5 = { workspace = true } diff --git a/rust/crates/sift_cli/src/cmd/import/csv.rs b/rust/crates/sift_cli/src/cmd/import/csv.rs index 627556efd..eade97445 100644 --- a/rust/crates/sift_cli/src/cmd/import/csv.rs +++ b/rust/crates/sift_cli/src/cmd/import/csv.rs @@ -9,7 +9,6 @@ use anyhow::{Context as AnyhowContext, Result, anyhow}; use chrono::DateTime; use crossterm::style::Stylize; use pbjson_types::Timestamp; -use reqwest::header::{CONTENT_ENCODING, CONTENT_TYPE}; use sift_rs::{ common::r#type::v1::{ChannelConfig, ChannelDataType}, data_imports::v2::{ @@ -25,15 +24,12 @@ use crate::{ Context, import::utils::{try_parse_bit_field_config, try_parse_enum_config}, }, - util::{ - api::{create_grpc_channel, create_rest_client}, - tty::Output, - }, + util::{api::create_grpc_channel, tty::Output}, }; use super::{ preview_import_config, - utils::{gzip_file, validate_time_format}, + utils::{upload_gzipped_file, validate_time_format}, wait_for_job_completion, }; @@ -75,29 +71,10 @@ pub async fn run(ctx: Context, args: ImportCsvArgs) -> Result { .into_inner(); csv_file.rewind()?; - let compressed_data = gzip_file(csv_file)?; - - let rest_client = create_rest_client(&ctx)?; - let res = rest_client - .post(upload_url) - .header(CONTENT_ENCODING, "gzip") - .header(CONTENT_TYPE, "text/csv") - .body(compressed_data) - .send() + let job_id = upload_gzipped_file(&ctx, &upload_url, csv_file, "text/csv") .await .context("failed to upload CSV file")?; - if !res.status().is_success() { - let status = res.status(); - let text = res - .text() - .await - .unwrap_or_else(|_| "".into()); - return Err(anyhow!( - "failed to upload CSV with http status {status}: {text}" - )); - } - let location = args.run.as_ref().map_or_else( || format!("asset '{}'", args.asset.cyan()), |r| format!("run '{}'", r.clone().cyan()), @@ -113,7 +90,7 @@ pub async fn run(ctx: Context, args: ImportCsvArgs) -> Result { return Ok(ExitCode::SUCCESS); } - wait_for_job_completion(grpc_channel, location).await + wait_for_job_completion(grpc_channel, job_id, location).await } fn create_data_import_request( @@ -300,14 +277,14 @@ fn create_data_import_request( let mut enum_configs = Vec::new(); let mut bit_field_configs = Vec::new(); - if data_type == ChannelDataType::Enum.into() { + if data_type == i32::from(ChannelDataType::Enum) { let Some(configs) = enum_configs_iter.next() else { return Err(anyhow!( "'{name}' was declared as type enum but --enum-config was not specified" )); }; enum_configs = configs; - } else if data_type == ChannelDataType::BitField.into() { + } else if data_type == i32::from(ChannelDataType::BitField) { let Some(configs) = bit_field_configs_iter.next() else { return Err(anyhow!( "'{name}' was declared as type bit-field but --bit-field-config was not specified" diff --git a/rust/crates/sift_cli/src/cmd/import/hdf5/import.rs b/rust/crates/sift_cli/src/cmd/import/hdf5/import.rs index c6e3674ab..ba1f91f93 100644 --- a/rust/crates/sift_cli/src/cmd/import/hdf5/import.rs +++ b/rust/crates/sift_cli/src/cmd/import/hdf5/import.rs @@ -79,7 +79,7 @@ pub async fn run(ctx: Context, args: ImportHdf5Args) -> Result { .context("error creating data import for hdf5")? .into_inner(); - upload_gzipped_file(&ctx, &upload_url, file, "application/x-hdf5") + let job_id = upload_gzipped_file(&ctx, &upload_url, file, "application/x-hdf5") .await .context("failed to upload hdf5 file")?; @@ -99,7 +99,7 @@ pub async fn run(ctx: Context, args: ImportHdf5Args) -> Result { return Ok(ExitCode::SUCCESS); } - wait_for_job_completion(grpc_channel, location).await + wait_for_job_completion(grpc_channel, job_id, location).await } pub fn build_hdf5_config(args: &ImportHdf5Args) -> Result { diff --git a/rust/crates/sift_cli/src/cmd/import/mod.rs b/rust/crates/sift_cli/src/cmd/import/mod.rs index 40e5c3b2e..c223242a0 100644 --- a/rust/crates/sift_cli/src/cmd/import/mod.rs +++ b/rust/crates/sift_cli/src/cmd/import/mod.rs @@ -3,13 +3,9 @@ use crossterm::style::Stylize; use std::{process::ExitCode, time::Duration}; use tokio::time::sleep; -use sift_rs::{ - SiftChannel, - common::r#type::v1::ChannelConfig, - jobs::v1::{JobStatus, JobType}, -}; +use sift_rs::{SiftChannel, common::r#type::v1::ChannelConfig, jobs::v1::JobStatus}; -use crate::util::{job::JobServiceWrapper, progress::Spinner, tty::Output, user::get_user_id}; +use crate::util::{job::JobServiceWrapper, progress::Spinner, tty::Output}; pub mod backup; pub mod csv; @@ -25,18 +21,15 @@ const INDENT_4: &str = " "; pub async fn wait_for_job_completion( grpc_channel: SiftChannel, + job_id: String, import_output_location: String, ) -> Result { let spinner = Spinner::new(); spinner.set_message(format!("{} file for processing", "Uploaded".green())); - let user_id = get_user_id(grpc_channel.clone()).await?; let mut job_service = JobServiceWrapper::new(grpc_channel.clone()); - let Some(mut job) = job_service - .get_latest_job_for_user(&user_id, JobType::DataImport) - .await? - else { + let Some(mut job) = job_service.get_job(&job_id).await? else { spinner.finish_and_clear(); Output::new() diff --git a/rust/crates/sift_cli/src/cmd/import/parquet/flat_dataset.rs b/rust/crates/sift_cli/src/cmd/import/parquet/flat_dataset.rs index 72f3bca8f..80aba690b 100644 --- a/rust/crates/sift_cli/src/cmd/import/parquet/flat_dataset.rs +++ b/rust/crates/sift_cli/src/cmd/import/parquet/flat_dataset.rs @@ -2,7 +2,6 @@ use std::{collections::HashMap, fs::File, io::Seek, process::ExitCode}; use anyhow::{Context as AnyhowContext, Result, anyhow}; use crossterm::style::Stylize; -use reqwest::header::{CONTENT_ENCODING, CONTENT_TYPE}; use sift_rs::{ common::r#type::v1::{ChannelConfig, ChannelDataType}, data_imports::v2::{ @@ -20,14 +19,11 @@ use crate::{ import::{ parquet::FooterMetadata, preview_import_config, - utils::{gzip_file, try_parse_bit_field_config, try_parse_enum_config}, + utils::{try_parse_bit_field_config, try_parse_enum_config, upload_gzipped_file}, wait_for_job_completion, }, }, - util::{ - api::{create_grpc_channel, create_rest_client}, - tty::Output, - }, + util::{api::create_grpc_channel, tty::Output}, }; pub async fn run(ctx: Context, args: FlatDatasetArgs) -> Result { @@ -78,29 +74,10 @@ pub async fn run(ctx: Context, args: FlatDatasetArgs) -> Result { .into_inner(); file.rewind()?; - let compressed_data = gzip_file(file)?; - - let rest_client = create_rest_client(&ctx)?; - let res = rest_client - .post(upload_url) - .header(CONTENT_ENCODING, "gzip") - .header(CONTENT_TYPE, "application/vnd.apache.parquet") - .body(compressed_data) - .send() + let job_id = upload_gzipped_file(&ctx, &upload_url, file, "application/vnd.apache.parquet") .await .context("failed to upload Parquet file")?; - if !res.status().is_success() { - let status = res.status(); - let text = res - .text() - .await - .unwrap_or_else(|_| "".into()); - return Err(anyhow!( - "failed to upload Parquet with http status {status}: {text}" - )); - } - let location = args.run.as_ref().map_or_else( || format!("asset '{}'", args.asset.cyan()), |r| format!("run '{}'", r.clone().cyan()), @@ -116,7 +93,7 @@ pub async fn run(ctx: Context, args: FlatDatasetArgs) -> Result { return Ok(ExitCode::SUCCESS); } - wait_for_job_completion(grpc_channel, location).await + wait_for_job_completion(grpc_channel, job_id, location).await } fn update_config_with_overrides( diff --git a/rust/crates/sift_cli/src/cmd/import/tdms/detect_tdms_config.rs b/rust/crates/sift_cli/src/cmd/import/tdms/detect_tdms_config.rs index 96cfe52d5..f2fe3cc81 100644 --- a/rust/crates/sift_cli/src/cmd/import/tdms/detect_tdms_config.rs +++ b/rust/crates/sift_cli/src/cmd/import/tdms/detect_tdms_config.rs @@ -69,7 +69,7 @@ pub async fn run(ctx: Context, args: ImportTdmsArgs) -> Result { .context("error creating data import for tdms")? .into_inner(); - upload_gzipped_file(&ctx, &upload_url, file, "application/octet-stream") + let job_id = upload_gzipped_file(&ctx, &upload_url, file, "application/octet-stream") .await .context("failed to upload tdms file")?; @@ -89,7 +89,7 @@ pub async fn run(ctx: Context, args: ImportTdmsArgs) -> Result { return Ok(ExitCode::SUCCESS); } - wait_for_job_completion(grpc_channel, location).await + wait_for_job_completion(grpc_channel, job_id, location).await } pub fn build_tdms_config(args: &ImportTdmsArgs) -> Result { diff --git a/rust/crates/sift_cli/src/cmd/import/utils.rs b/rust/crates/sift_cli/src/cmd/import/utils.rs index 8fbc73830..af5f03dea 100644 --- a/rust/crates/sift_cli/src/cmd/import/utils.rs +++ b/rust/crates/sift_cli/src/cmd/import/utils.rs @@ -11,13 +11,14 @@ use sift_rs::common::r#type::v1::{ChannelBitFieldElement, ChannelEnumType}; use crate::{cli::time::TimeFormat, cmd::Context as CmdContext, util::api::create_rest_client}; /// Gzip and upload a file to a pre-signed upload URL with the given content type. -/// Reads from the file's current cursor position. +/// Reads from the file's current cursor position. Returns the job ID from the +/// upload response. pub async fn upload_gzipped_file( ctx: &CmdContext, upload_url: &str, file: File, content_type: &str, -) -> Result<()> { +) -> Result { let compressed_data = gzip_file(file)?; let rest_client = create_rest_client(ctx).context("failed to create rest client")?; @@ -38,7 +39,22 @@ pub async fn upload_gzipped_file( .unwrap_or_else(|_| "".into()); return Err(anyhow!("upload failed with http status {status}: {text}")); } - Ok(()) + extract_job_id(res).await +} + +/// Parses the `jobId` from a successful upload response. +async fn extract_job_id(res: reqwest::Response) -> Result { + let body_text = res + .text() + .await + .context("failed to read upload response body")?; + let body: serde_json::Value = + serde_json::from_str(&body_text).context("failed to parse upload response as JSON")?; + body.get("jobId") + .and_then(|v| v.as_str()) + .filter(|s| !s.is_empty()) + .map(String::from) + .ok_or_else(|| anyhow!("upload response did not include jobId: {body_text}")) } /// Be sure that the file's cursor is rewinded to the start before hand. diff --git a/rust/crates/sift_cli/src/util/job.rs b/rust/crates/sift_cli/src/util/job.rs index e24fccc63..0bb8ff581 100644 --- a/rust/crates/sift_cli/src/util/job.rs +++ b/rust/crates/sift_cli/src/util/job.rs @@ -3,7 +3,7 @@ use std::ops::{Deref, DerefMut}; use anyhow::{Context, Result}; use sift_rs::{ SiftChannel, - jobs::v1::{Job, JobType, ListJobsRequest, job_service_client::JobServiceClient}, + jobs::v1::{Job, ListJobsRequest, job_service_client::JobServiceClient}, }; pub struct JobServiceWrapper(JobServiceClient); @@ -28,27 +28,6 @@ impl JobServiceWrapper { JobServiceWrapper(job_service) } - pub async fn get_latest_job_for_user( - &mut self, - user_id: &str, - job_type: JobType, - ) -> Result> { - let jt = job_type.as_str_name(); - - let res = self - .list_jobs(ListJobsRequest { - page_size: 1, - filter: format!("job_type == '{jt}' && created_by_user_id == '{user_id}'"), - order_by: "created_date desc".into(), - ..Default::default() - }) - .await - .context("failed to retrieve latest user job")? - .into_inner(); - - Ok(res.jobs.first().cloned()) - } - pub async fn get_job(&mut self, job_id: &str) -> Result> { let res = self .list_jobs(ListJobsRequest { diff --git a/rust/crates/sift_cli/src/util/mod.rs b/rust/crates/sift_cli/src/util/mod.rs index e1e3fe2f3..634ba6379 100644 --- a/rust/crates/sift_cli/src/util/mod.rs +++ b/rust/crates/sift_cli/src/util/mod.rs @@ -4,4 +4,3 @@ pub mod channel; pub mod job; pub mod progress; pub mod tty; -pub mod user; diff --git a/rust/crates/sift_cli/src/util/user.rs b/rust/crates/sift_cli/src/util/user.rs deleted file mode 100644 index 6331cb7d8..000000000 --- a/rust/crates/sift_cli/src/util/user.rs +++ /dev/null @@ -1,15 +0,0 @@ -use anyhow::{Context, Result}; -use sift_rs::{ - SiftChannel, - me::v2::{GetMeRequest, me_service_client::MeServiceClient}, -}; - -pub async fn get_user_id(grpc_channel: SiftChannel) -> Result { - let mut me_service = MeServiceClient::new(grpc_channel); - let res = me_service - .get_me(GetMeRequest::default()) - .await - .context("failed to retrieve user info from provided --profile")? - .into_inner(); - Ok(res.user_id) -}