Skip to content

Commit 2311398

Browse files
committed
use job ID from upload response
1 parent ab47756 commit 2311398

10 files changed

Lines changed: 38 additions & 112 deletions

File tree

rust/crates/sift_cli/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ indicatif = { workspace = true }
3030
parquet = { workspace = true }
3131
pbjson-types = { workspace = true }
3232
reqwest = { workspace = true }
33+
serde_json = { workspace = true }
3334
sift_pbfs = { workspace = true }
3435
tdms = { workspace = true }
3536
hdf5 = { workspace = true }

rust/crates/sift_cli/src/cmd/import/csv.rs

Lines changed: 6 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use anyhow::{Context as AnyhowContext, Result, anyhow};
99
use chrono::DateTime;
1010
use crossterm::style::Stylize;
1111
use pbjson_types::Timestamp;
12-
use reqwest::header::{CONTENT_ENCODING, CONTENT_TYPE};
1312
use sift_rs::{
1413
common::r#type::v1::{ChannelConfig, ChannelDataType},
1514
data_imports::v2::{
@@ -25,15 +24,12 @@ use crate::{
2524
Context,
2625
import::utils::{try_parse_bit_field_config, try_parse_enum_config},
2726
},
28-
util::{
29-
api::{create_grpc_channel, create_rest_client},
30-
tty::Output,
31-
},
27+
util::{api::create_grpc_channel, tty::Output},
3228
};
3329

3430
use super::{
3531
preview_import_config,
36-
utils::{gzip_file, validate_time_format},
32+
utils::{upload_gzipped_file, validate_time_format},
3733
wait_for_job_completion,
3834
};
3935

@@ -75,29 +71,10 @@ pub async fn run(ctx: Context, args: ImportCsvArgs) -> Result<ExitCode> {
7571
.into_inner();
7672

7773
csv_file.rewind()?;
78-
let compressed_data = gzip_file(csv_file)?;
79-
80-
let rest_client = create_rest_client(&ctx)?;
81-
let res = rest_client
82-
.post(upload_url)
83-
.header(CONTENT_ENCODING, "gzip")
84-
.header(CONTENT_TYPE, "text/csv")
85-
.body(compressed_data)
86-
.send()
74+
let job_id = upload_gzipped_file(&ctx, &upload_url, csv_file, "text/csv")
8775
.await
8876
.context("failed to upload CSV file")?;
8977

90-
if !res.status().is_success() {
91-
let status = res.status();
92-
let text = res
93-
.text()
94-
.await
95-
.unwrap_or_else(|_| "<failed to read body>".into());
96-
return Err(anyhow!(
97-
"failed to upload CSV with http status {status}: {text}"
98-
));
99-
}
100-
10178
let location = args.run.as_ref().map_or_else(
10279
|| format!("asset '{}'", args.asset.cyan()),
10380
|r| format!("run '{}'", r.clone().cyan()),
@@ -113,7 +90,7 @@ pub async fn run(ctx: Context, args: ImportCsvArgs) -> Result<ExitCode> {
11390

11491
return Ok(ExitCode::SUCCESS);
11592
}
116-
wait_for_job_completion(grpc_channel, location).await
93+
wait_for_job_completion(grpc_channel, job_id, location).await
11794
}
11895

11996
fn create_data_import_request<R: io::Read>(
@@ -300,14 +277,14 @@ fn create_data_import_request<R: io::Read>(
300277
let mut enum_configs = Vec::new();
301278
let mut bit_field_configs = Vec::new();
302279

303-
if data_type == ChannelDataType::Enum.into() {
280+
if data_type == i32::from(ChannelDataType::Enum) {
304281
let Some(configs) = enum_configs_iter.next() else {
305282
return Err(anyhow!(
306283
"'{name}' was declared as type enum but --enum-config was not specified"
307284
));
308285
};
309286
enum_configs = configs;
310-
} else if data_type == ChannelDataType::BitField.into() {
287+
} else if data_type == i32::from(ChannelDataType::BitField) {
311288
let Some(configs) = bit_field_configs_iter.next() else {
312289
return Err(anyhow!(
313290
"'{name}' was declared as type bit-field but --bit-field-config was not specified"

rust/crates/sift_cli/src/cmd/import/hdf5/import.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ pub async fn run(ctx: Context, args: ImportHdf5Args) -> Result<ExitCode> {
7979
.context("error creating data import for hdf5")?
8080
.into_inner();
8181

82-
upload_gzipped_file(&ctx, &upload_url, file, "application/x-hdf5")
82+
let job_id = upload_gzipped_file(&ctx, &upload_url, file, "application/x-hdf5")
8383
.await
8484
.context("failed to upload hdf5 file")?;
8585

@@ -99,7 +99,7 @@ pub async fn run(ctx: Context, args: ImportHdf5Args) -> Result<ExitCode> {
9999
return Ok(ExitCode::SUCCESS);
100100
}
101101

102-
wait_for_job_completion(grpc_channel, location).await
102+
wait_for_job_completion(grpc_channel, job_id, location).await
103103
}
104104

105105
pub fn build_hdf5_config(args: &ImportHdf5Args) -> Result<Hdf5Config> {

rust/crates/sift_cli/src/cmd/import/mod.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,9 @@ use crossterm::style::Stylize;
33
use std::{process::ExitCode, time::Duration};
44
use tokio::time::sleep;
55

6-
use sift_rs::{
7-
SiftChannel,
8-
common::r#type::v1::ChannelConfig,
9-
jobs::v1::{JobStatus, JobType},
10-
};
6+
use sift_rs::{SiftChannel, common::r#type::v1::ChannelConfig, jobs::v1::JobStatus};
117

12-
use crate::util::{job::JobServiceWrapper, progress::Spinner, tty::Output, user::get_user_id};
8+
use crate::util::{job::JobServiceWrapper, progress::Spinner, tty::Output};
139

1410
pub mod backup;
1511
pub mod csv;
@@ -25,18 +21,15 @@ const INDENT_4: &str = " ";
2521

2622
pub async fn wait_for_job_completion(
2723
grpc_channel: SiftChannel,
24+
job_id: String,
2825
import_output_location: String,
2926
) -> Result<ExitCode> {
3027
let spinner = Spinner::new();
3128
spinner.set_message(format!("{} file for processing", "Uploaded".green()));
3229

33-
let user_id = get_user_id(grpc_channel.clone()).await?;
3430
let mut job_service = JobServiceWrapper::new(grpc_channel.clone());
3531

36-
let Some(mut job) = job_service
37-
.get_latest_job_for_user(&user_id, JobType::DataImport)
38-
.await?
39-
else {
32+
let Some(mut job) = job_service.get_job(&job_id).await? else {
4033
spinner.finish_and_clear();
4134

4235
Output::new()

rust/crates/sift_cli/src/cmd/import/parquet/flat_dataset.rs

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use std::{collections::HashMap, fs::File, io::Seek, process::ExitCode};
22

33
use anyhow::{Context as AnyhowContext, Result, anyhow};
44
use crossterm::style::Stylize;
5-
use reqwest::header::{CONTENT_ENCODING, CONTENT_TYPE};
65
use sift_rs::{
76
common::r#type::v1::{ChannelConfig, ChannelDataType},
87
data_imports::v2::{
@@ -20,14 +19,11 @@ use crate::{
2019
import::{
2120
parquet::FooterMetadata,
2221
preview_import_config,
23-
utils::{gzip_file, try_parse_bit_field_config, try_parse_enum_config},
22+
utils::{try_parse_bit_field_config, try_parse_enum_config, upload_gzipped_file},
2423
wait_for_job_completion,
2524
},
2625
},
27-
util::{
28-
api::{create_grpc_channel, create_rest_client},
29-
tty::Output,
30-
},
26+
util::{api::create_grpc_channel, tty::Output},
3127
};
3228

3329
pub async fn run(ctx: Context, args: FlatDatasetArgs) -> Result<ExitCode> {
@@ -78,29 +74,10 @@ pub async fn run(ctx: Context, args: FlatDatasetArgs) -> Result<ExitCode> {
7874
.into_inner();
7975

8076
file.rewind()?;
81-
let compressed_data = gzip_file(file)?;
82-
83-
let rest_client = create_rest_client(&ctx)?;
84-
let res = rest_client
85-
.post(upload_url)
86-
.header(CONTENT_ENCODING, "gzip")
87-
.header(CONTENT_TYPE, "application/vnd.apache.parquet")
88-
.body(compressed_data)
89-
.send()
77+
let job_id = upload_gzipped_file(&ctx, &upload_url, file, "application/vnd.apache.parquet")
9078
.await
9179
.context("failed to upload Parquet file")?;
9280

93-
if !res.status().is_success() {
94-
let status = res.status();
95-
let text = res
96-
.text()
97-
.await
98-
.unwrap_or_else(|_| "<failed to read body>".into());
99-
return Err(anyhow!(
100-
"failed to upload Parquet with http status {status}: {text}"
101-
));
102-
}
103-
10481
let location = args.run.as_ref().map_or_else(
10582
|| format!("asset '{}'", args.asset.cyan()),
10683
|r| format!("run '{}'", r.clone().cyan()),
@@ -116,7 +93,7 @@ pub async fn run(ctx: Context, args: FlatDatasetArgs) -> Result<ExitCode> {
11693

11794
return Ok(ExitCode::SUCCESS);
11895
}
119-
wait_for_job_completion(grpc_channel, location).await
96+
wait_for_job_completion(grpc_channel, job_id, location).await
12097
}
12198

12299
fn update_config_with_overrides(

rust/crates/sift_cli/src/cmd/import/tdms/detect_tdms_config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ pub async fn run(ctx: Context, args: ImportTdmsArgs) -> Result<ExitCode> {
6969
.context("error creating data import for tdms")?
7070
.into_inner();
7171

72-
upload_gzipped_file(&ctx, &upload_url, file, "application/octet-stream")
72+
let job_id = upload_gzipped_file(&ctx, &upload_url, file, "application/octet-stream")
7373
.await
7474
.context("failed to upload tdms file")?;
7575

@@ -89,7 +89,7 @@ pub async fn run(ctx: Context, args: ImportTdmsArgs) -> Result<ExitCode> {
8989
return Ok(ExitCode::SUCCESS);
9090
}
9191

92-
wait_for_job_completion(grpc_channel, location).await
92+
wait_for_job_completion(grpc_channel, job_id, location).await
9393
}
9494

9595
pub fn build_tdms_config(args: &ImportTdmsArgs) -> Result<TdmsConfig> {

rust/crates/sift_cli/src/cmd/import/utils.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,14 @@ use sift_rs::common::r#type::v1::{ChannelBitFieldElement, ChannelEnumType};
1111
use crate::{cli::time::TimeFormat, cmd::Context as CmdContext, util::api::create_rest_client};
1212

1313
/// Gzip and upload a file to a pre-signed upload URL with the given content type.
14-
/// Reads from the file's current cursor position.
14+
/// Reads from the file's current cursor position. Returns the job ID from the
15+
/// upload response.
1516
pub async fn upload_gzipped_file(
1617
ctx: &CmdContext,
1718
upload_url: &str,
1819
file: File,
1920
content_type: &str,
20-
) -> Result<()> {
21+
) -> Result<String> {
2122
let compressed_data = gzip_file(file)?;
2223
let rest_client = create_rest_client(ctx).context("failed to create rest client")?;
2324

@@ -38,7 +39,21 @@ pub async fn upload_gzipped_file(
3839
.unwrap_or_else(|_| "<failed to read body>".into());
3940
return Err(anyhow!("upload failed with http status {status}: {text}"));
4041
}
41-
Ok(())
42+
extract_job_id(res).await
43+
}
44+
45+
/// Parses the `jobId` from a successful upload response.
46+
async fn extract_job_id(res: reqwest::Response) -> Result<String> {
47+
let body_text = res
48+
.text()
49+
.await
50+
.context("failed to read upload response body")?;
51+
let body: serde_json::Value =
52+
serde_json::from_str(&body_text).context("failed to parse upload response as JSON")?;
53+
body.get("jobId")
54+
.and_then(|v| v.as_str())
55+
.map(String::from)
56+
.ok_or_else(|| anyhow!("upload response did not include jobId: {body_text}"))
4257
}
4358

4459
/// Be sure that the file's cursor is rewinded to the start before hand.

rust/crates/sift_cli/src/util/job.rs

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::ops::{Deref, DerefMut};
33
use anyhow::{Context, Result};
44
use sift_rs::{
55
SiftChannel,
6-
jobs::v1::{Job, JobType, ListJobsRequest, job_service_client::JobServiceClient},
6+
jobs::v1::{Job, ListJobsRequest, job_service_client::JobServiceClient},
77
};
88

99
pub struct JobServiceWrapper(JobServiceClient<SiftChannel>);
@@ -28,27 +28,6 @@ impl JobServiceWrapper {
2828
JobServiceWrapper(job_service)
2929
}
3030

31-
pub async fn get_latest_job_for_user(
32-
&mut self,
33-
user_id: &str,
34-
job_type: JobType,
35-
) -> Result<Option<Job>> {
36-
let jt = job_type.as_str_name();
37-
38-
let res = self
39-
.list_jobs(ListJobsRequest {
40-
page_size: 1,
41-
filter: format!("job_type == '{jt}' && created_by_user_id == '{user_id}'"),
42-
order_by: "created_date desc".into(),
43-
..Default::default()
44-
})
45-
.await
46-
.context("failed to retrieve latest user job")?
47-
.into_inner();
48-
49-
Ok(res.jobs.first().cloned())
50-
}
51-
5231
pub async fn get_job(&mut self, job_id: &str) -> Result<Option<Job>> {
5332
let res = self
5433
.list_jobs(ListJobsRequest {

rust/crates/sift_cli/src/util/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,3 @@ pub mod channel;
44
pub mod job;
55
pub mod progress;
66
pub mod tty;
7-
pub mod user;

rust/crates/sift_cli/src/util/user.rs

Lines changed: 0 additions & 15 deletions
This file was deleted.

0 commit comments

Comments
 (0)