Skip to content

Commit 2d07ea6

Browse files
rust(feat): parquet scpr/mcpr imports (#584)
1 parent 4962f03 commit 2d07ea6

9 files changed

Lines changed: 688 additions & 33 deletions

File tree

rust/crates/sift_cli/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ path = "src/main.rs"
1717

1818
[dependencies]
1919
anyhow = { workspace = true }
20+
arrow-array = { workspace = true }
2021
arrow-schema = { workspace = true }
2122
chrono = { workspace = true }
2223
clap = { workspace = true }
@@ -42,5 +43,4 @@ zip = { workspace = true }
4243

4344
[dev-dependencies]
4445
indoc = { workspace = true }
45-
arrow-array = { workspace = true }
4646
bytes = { workspace = true }

rust/crates/sift_cli/src/cli/mod.rs

Lines changed: 78 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use clap::{Parser, Subcommand, crate_description, crate_version};
22
use clap_complete::Shell;
3-
use parquet::ComplexTypesMode;
3+
use parquet::{ChannelMode, ComplexTypesMode};
44
pub mod hdf5;
55
pub mod tdms;
66
use hdf5::Hdf5Schema;
@@ -343,20 +343,17 @@ pub enum ImportParquetCmd {
343343
/// A parquet file where every column is exclusive to a single channel except for the time
344344
/// column
345345
FlatDataset(FlatDatasetArgs),
346+
347+
/// A parquet file laid out single-channel-per-row, either one channel for the whole file
348+
/// (single mode) or with a name column identifying the channel for each row (multi mode).
349+
#[command(name = "cpr")]
350+
ChannelPerRow(ChannelPerRowArgs),
346351
}
347352

348353
#[derive(clap::Args)]
349354
pub struct FlatDatasetArgs {
350-
/// Path to the Parquet file to import
351-
pub path: PathBuf,
352-
353-
/// Name of the asset this data belongs to
354-
#[arg(short, long)]
355-
pub asset: String,
356-
357-
/// Optional run name to associate with this import
358-
#[arg(short, long)]
359-
pub run: Option<String>,
355+
#[command(flatten)]
356+
pub common: CommonImportArgs,
360357

361358
/// Paths of data columns to import; can be specified multiple times
362359
#[arg(short, long)]
@@ -398,14 +395,79 @@ pub struct FlatDatasetArgs {
398395
/// Strategy for handling complex types (maps, lists, structs)
399396
#[arg(short = 'm', long, default_value_t = ComplexTypesMode::default())]
400397
pub complex_types_mode: ComplexTypesMode,
398+
}
401399

402-
/// Wait until the import finishes processing
403-
#[arg(short, long)]
404-
pub wait: bool,
400+
#[derive(clap::Args)]
401+
pub struct ChannelPerRowArgs {
402+
#[command(flatten)]
403+
pub common: CommonImportArgs,
405404

406-
/// Preview the parsed schema without uploading
405+
/// Channel mode: single-channel or multi-channel
406+
#[arg(long)]
407+
pub mode: ChannelMode,
408+
409+
/// Path to the time column
407410
#[arg(short, long)]
408-
pub preview: bool,
411+
pub time_path: String,
412+
413+
/// Time format used in the time column
414+
#[arg(short = 'f', long)]
415+
pub time_format: TimeFormat,
416+
417+
/// Start time (RFC3339) to use if time format is relative
418+
#[arg(short = 's', long)]
419+
pub relative_start_time: Option<String>,
420+
421+
/// Path to the column holding values (used in both modes)
422+
#[arg(long)]
423+
pub data_path: String,
424+
425+
/// Channel name for every row in the file
426+
#[arg(
427+
long,
428+
required_if_eq("mode", "single"),
429+
conflicts_with = "name_path",
430+
help_heading = "Single mode options"
431+
)]
432+
pub channel_name: Option<String>,
433+
434+
/// Data type for the channel. Use `"infer"` to have the program infer the
435+
/// data type from the parquet schema.
436+
#[arg(
437+
long,
438+
conflicts_with = "name_path",
439+
help_heading = "Single mode options"
440+
)]
441+
pub data_type: Option<DataType>,
442+
443+
/// Channel units
444+
#[arg(
445+
long,
446+
conflicts_with = "name_path",
447+
help_heading = "Single mode options"
448+
)]
449+
pub unit: Option<String>,
450+
451+
/// Channel description
452+
#[arg(
453+
short = 'n',
454+
long,
455+
conflicts_with = "name_path",
456+
help_heading = "Single mode options"
457+
)]
458+
pub description: Option<String>,
459+
460+
/// Path to the column holding channel names
461+
#[arg(
462+
long,
463+
required_if_eq("mode", "multi"),
464+
help_heading = "Multi mode options"
465+
)]
466+
pub name_path: Option<String>,
467+
468+
/// Strategy for handling complex types (maps, lists, structs)
469+
#[arg(short = 'm', long, default_value_t = ComplexTypesMode::default())]
470+
pub complex_types_mode: ComplexTypesMode,
409471
}
410472

411473
#[derive(clap::Args)]

rust/crates/sift_cli/src/cli/parquet.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,21 @@ impl Display for ComplexTypesMode {
3838
}
3939
}
4040
}
41+
42+
/// Whether the file holds a single channel or many channels keyed by a name column.
43+
#[derive(Debug, Copy, Clone, PartialEq, Eq, ValueEnum)]
44+
pub enum ChannelMode {
45+
/// File has [time, value]. All rows belong to one named channel.
46+
Single,
47+
/// File has [time, name_column, value_column]. Channels created per unique name.
48+
Multi,
49+
}
50+
51+
impl Display for ChannelMode {
52+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53+
match self {
54+
Self::Single => write!(f, "single"),
55+
Self::Multi => write!(f, "multi"),
56+
}
57+
}
58+
}
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
use std::{collections::HashSet, fs::File, io::Seek, process::ExitCode};
2+
3+
use anyhow::{Context as AnyhowContext, Result, anyhow};
4+
use crossterm::style::Stylize;
5+
use parquet::file::reader::{ChunkReader, FileReader, SerializedFileReader};
6+
use parquet::record::Field;
7+
use parquet::schema::types::Type as ParquetSchemaType;
8+
use sift_rs::{
9+
common::r#type::v1::ChannelConfig,
10+
data_imports::v2::{
11+
CreateDataImportFromUploadRequest, CreateDataImportFromUploadResponse,
12+
ParquetComplexTypesImportMode, ParquetConfig,
13+
data_import_service_client::DataImportServiceClient, parquet_config::Config,
14+
parquet_single_channel_per_row_config::Config as ChannelPerRowInnerConfig,
15+
},
16+
};
17+
18+
use crate::cli::ChannelPerRowArgs;
19+
use crate::cmd::import::parquet::detect_parquet_schema::detect_channel_per_row_config;
20+
use crate::cmd::{
21+
Context,
22+
import::{
23+
parquet::FooterMetadata, preview_import_config, utils::upload_gzipped_file,
24+
wait_for_job_completion,
25+
},
26+
};
27+
use crate::util::{api::create_grpc_channel, tty::Output};
28+
29+
pub async fn run(ctx: Context, args: ChannelPerRowArgs) -> Result<ExitCode> {
30+
let grpc_channel = create_grpc_channel(&ctx)?;
31+
let mut data_imports_client = DataImportServiceClient::new(grpc_channel.clone());
32+
let mut file = File::open(&args.common.path).context("failed to open parquet file")?;
33+
let footer_md = FooterMetadata::try_from(&mut file)?;
34+
35+
let channel_per_row_config =
36+
detect_channel_per_row_config(&file, &args).context("failed to detect parquet schema")?;
37+
38+
if args.common.preview {
39+
let run_label = args
40+
.common
41+
.run_id
42+
.as_deref()
43+
.filter(|s| !s.is_empty())
44+
.or(args.common.run.as_deref())
45+
.unwrap_or("");
46+
47+
let multi_channels: Vec<ChannelConfig> = match channel_per_row_config.config.as_ref() {
48+
Some(ChannelPerRowInnerConfig::MultiChannel(multi)) => {
49+
let data_type = channel_per_row_config
50+
.columns
51+
.iter()
52+
.find(|c| c.path == multi.data_path)
53+
.and_then(|c| c.column_config.as_ref())
54+
.map(|c| c.data_type)
55+
.unwrap_or_default();
56+
57+
discover_multi_channel_names_for_preview(file, &multi.name_path)?
58+
.into_iter()
59+
.map(|name| ChannelConfig {
60+
name,
61+
data_type,
62+
..Default::default()
63+
})
64+
.collect()
65+
}
66+
_ => Vec::new(),
67+
};
68+
69+
let preview_channels: Vec<&ChannelConfig> = match channel_per_row_config.config.as_ref() {
70+
Some(ChannelPerRowInnerConfig::SingleChannel(single)) => {
71+
single.channel.iter().collect()
72+
}
73+
Some(ChannelPerRowInnerConfig::MultiChannel(_)) => multi_channels.iter().collect(),
74+
None => Vec::new(),
75+
};
76+
77+
preview_import_config(&args.common.asset, run_label, &preview_channels);
78+
return Ok(ExitCode::SUCCESS);
79+
}
80+
81+
let parquet_config = ParquetConfig {
82+
config: Some(Config::SingleChannelPerRow(channel_per_row_config)),
83+
..Default::default()
84+
};
85+
let create_data_import_req = create_data_import_request(&args, parquet_config, footer_md)?;
86+
87+
let CreateDataImportFromUploadResponse { upload_url, .. } = data_imports_client
88+
.create_data_import_from_upload(create_data_import_req)
89+
.await
90+
.context("error creating data import")?
91+
.into_inner();
92+
93+
file.rewind()?;
94+
let job_id = upload_gzipped_file(&ctx, &upload_url, file, "application/vnd.apache.parquet")
95+
.await
96+
.context("failed to upload Parquet file")?;
97+
98+
let location = args.common.run.as_ref().map_or_else(
99+
|| format!("asset '{}'", args.common.asset.cyan()),
100+
|r| format!("run '{}'", r.clone().cyan()),
101+
);
102+
103+
if !args.common.wait {
104+
Output::new()
105+
.line(format!("{} file for processing", "Uploaded".green()))
106+
.tip(format!(
107+
"Once processing is complete the data will be available on the {location}."
108+
))
109+
.print();
110+
return Ok(ExitCode::SUCCESS);
111+
}
112+
113+
wait_for_job_completion(grpc_channel, job_id, location).await
114+
}
115+
116+
fn create_data_import_request(
117+
args: &ChannelPerRowArgs,
118+
config: ParquetConfig,
119+
footer_md: FooterMetadata,
120+
) -> Result<CreateDataImportFromUploadRequest> {
121+
Ok(CreateDataImportFromUploadRequest {
122+
parquet_config: Some(ParquetConfig {
123+
asset_name: args.common.asset.clone(),
124+
run_name: args.common.run.clone().unwrap_or_default(),
125+
run_id: args.common.run_id.clone().unwrap_or_default(),
126+
footer_offset: footer_md.offset,
127+
footer_length: u32::try_from(footer_md.length)
128+
.context("parquet footer length too large")?,
129+
complex_types_import_mode: ParquetComplexTypesImportMode::from(
130+
args.complex_types_mode.clone(),
131+
)
132+
.into(),
133+
config: config.config,
134+
}),
135+
..Default::default()
136+
})
137+
}
138+
139+
/// Scan the parquet file's name column and return the distinct channel names
140+
/// it contains (sorted, deduped). Used by multi-mode preview so the user can
141+
/// see what channels the server will create at ingest.
142+
pub(super) fn discover_multi_channel_names_for_preview<R: ChunkReader + 'static>(
143+
file: R,
144+
name_path: &str,
145+
) -> Result<Vec<String>> {
146+
let reader = SerializedFileReader::new(file).context("failed to build parquet file reader")?;
147+
148+
let file_schema = reader.metadata().file_metadata().schema();
149+
let root_name = file_schema.name().to_string();
150+
let name_field = file_schema
151+
.get_fields()
152+
.iter()
153+
.find(|t| t.name() == name_path)
154+
.with_context(|| format!("name column '{name_path}' not found in parquet schema"))?
155+
.clone();
156+
157+
let projection = ParquetSchemaType::group_type_builder(&root_name)
158+
.with_fields(vec![name_field])
159+
.build()
160+
.context("failed to build parquet projection schema")?;
161+
162+
let row_iter = reader
163+
.get_row_iter(Some(projection))
164+
.context("failed to build parquet row iterator")?;
165+
166+
let mut seen: HashSet<String> = HashSet::new();
167+
for row_result in row_iter {
168+
let row = row_result.context("failed to read parquet row")?;
169+
let (_, field) = row
170+
.get_column_iter()
171+
.next()
172+
.ok_or_else(|| anyhow!("internal: projected row missing column"))?;
173+
match field {
174+
Field::Str(s) => {
175+
seen.insert(s.clone());
176+
}
177+
Field::Null => {}
178+
other => {
179+
return Err(anyhow!(
180+
"name column '{name_path}' must be a string type; got {other:?}"
181+
));
182+
}
183+
}
184+
}
185+
186+
let mut names: Vec<String> = seen.into_iter().collect();
187+
names.sort();
188+
Ok(names)
189+
}

0 commit comments

Comments
 (0)