Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion resources/cli-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ To regenerate this schema from existing code, use the following command:
**Subcommands:**

* `add` — Add a new dataset or modify an existing one
* `apply` — Add a new dataset or modify an existing one
* `completions` — Generate tab-completion scripts for your shell
* `config` — Get or set configuration options
* `delete [rm]` — Delete a dataset
Expand Down Expand Up @@ -66,11 +67,12 @@ Add a new dataset or modify an existing one

**Options:**

* `--dry-run` — Show the changes to be applied without actually doing them
* `-r`, `--recursive` — Recursively search for all manifests in the specified directory
* `--replace` — Delete and re-add datasets that already exist
* `--stdin` — Read manifests from standard input
* `--name <N>` — Overrides the name in a loaded manifest
* `--visibility <VIS>` — Changing the visibility of the added dataset
* `--visibility <VIS>` — Visibility of the added dataset

Possible values: `private`, `public`

Expand Down Expand Up @@ -100,6 +102,39 @@ To add dataset from a repository see `kamu pull` command.



## `kamu apply`

Add a new dataset or modify an existing one

**Usage:** `kamu apply [OPTIONS] [MANIFEST]...`

**Arguments:**

* `<MANIFEST>` — Dataset manifest reference(s) (path or URL)

**Options:**

* `--dry-run` — Show the changes to be applied without actually doing them
* `-r`, `--recursive` — Recursively search for all manifests in the specified directory
* `--stdin` — Read manifests from standard input
* `--visibility <VIS>` — Visibility of the added dataset

Possible values: `private`, `public`


**Examples:**

Compare the state of a dataset to a manifest:

kamu apply --dry-run org.example.data.yaml

Synchronize all objects with the state described in manifests found in the current directory:

kamu apply --recursive .




## `kamu completions`

Generate tab-completion scripts for your shell
Expand Down
44 changes: 43 additions & 1 deletion src/app/cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ pub enum PasswordHashingMode {
#[derive(Debug, clap::Subcommand)]
pub enum Command {
Add(Add),
Apply(Apply),
Complete(Complete),
Completions(Completions),
Config(Config),
Expand Down Expand Up @@ -169,6 +170,10 @@ Add a dataset from manifest hosted externally (e.g. on GitHub):
To add dataset from a repository see `kamu pull` command.
"#)]
pub struct Add {
/// Show the changes to be applied without actually doing them
#[arg(long)]
pub dry_run: bool,

/// Recursively search for all manifest in the specified directory
#[arg(long, short = 'r')]
pub recursive: bool,
Expand All @@ -185,7 +190,44 @@ pub struct Add {
#[arg(long, value_name = "N")]
pub name: Option<odf::DatasetAlias>,

/// Changing the visibility of the added dataset
/// Visibility of the added dataset
#[arg(long, value_name = "VIS", value_enum)]
pub visibility: Option<parsers::DatasetVisibility>,

/// Dataset manifest reference(s) (path, or URL)
#[arg()]
pub manifest: Vec<String>,
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

/// Add a new dataset or modify an existing one
#[derive(Debug, clap::Args)]
#[command(after_help = r#"
**Examples:**

Compare the state of a dataset to a manifest:

kamu apply --dry-run org.example.data.yaml

Synchronize all objects with the state described in manifests found in the current directory:

kamu apply --recursive .
"#)]
pub struct Apply {
/// Show the changes to be applied without actually doing them
#[arg(long)]
pub dry_run: bool,

/// Recursively search for all manifest in the specified directory
#[arg(long, short = 'r')]
pub recursive: bool,

/// Read manifests from standard input
#[arg(long)]
pub stdin: bool,

/// Visibility of the added dataset
#[arg(long, value_name = "VIS", value_enum)]
pub visibility: Option<parsers::DatasetVisibility>,

Expand Down
14 changes: 14 additions & 0 deletions src/app/cli/src/cli_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub fn get_command(
AddCommand::builder(
c.manifest,
c.name,
c.dry_run,
c.recursive,
c.replace,
c.stdin,
Expand All @@ -44,6 +45,19 @@ pub fn get_command(
.cast(),
),

cli::Command::Apply(c) => Box::new(
ApplyCommand::builder(
c.manifest,
c.dry_run,
c.recursive,
c.stdin,
c.visibility
.map(Into::into)
.unwrap_or(tenancy_config.default_dataset_visibility()),
)
.cast(),
),

cli::Command::Complete(c) => Box::new(CompleteCommand::builder(c.input, c.current).cast()),

cli::Command::Completions(c) => Box::new(CompletionsCommand::builder(c.shell).cast()),
Expand Down
177 changes: 74 additions & 103 deletions src/app/cli/src/commands/add_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::{HashSet, LinkedList};
use std::sync::Arc;

use internal_error::*;
use kamu::domain::*;
use kamu_datasets::{
CreateDatasetFromSnapshotError,
CreateDatasetFromSnapshotUseCase,
CreateDatasetResult,
CreateDatasetUseCaseOptions,
CreateDatasetsFromSnapshotsPlanningError,
CreateDatasetsFromSnapshotsPlanningResult,
DeleteDatasetUseCase,
};

Expand All @@ -40,6 +41,9 @@ pub struct AddCommand {
#[dill::component(explicit)]
name: Option<odf::DatasetAlias>,

#[dill::component(explicit)]
dry_run: bool,

#[dill::component(explicit)]
recursive: bool,

Expand Down Expand Up @@ -129,70 +133,6 @@ impl AddCommand {

Ok(false)
}

/// Creates one dataset per snapshot, processing them one at a time in
/// dependency order.
///
/// The snapshots are first reordered by `sort_snapshots_in_dependency_order`
/// and then passed, one by one, to the `create_dataset_from_snapshot` use
/// case together with `create_options`.
///
/// Returns one `(alias, result)` pair per snapshot, in the order the
/// creations were attempted. A failed creation is recorded in its pair and
/// does not stop the remaining snapshots from being processed.
pub async fn create_datasets_from_snapshots(
    &self,
    snapshots: Vec<odf::DatasetSnapshot>,
    create_options: CreateDatasetUseCaseOptions,
) -> Vec<(
    odf::DatasetAlias,
    Result<CreateDatasetResult, CreateDatasetFromSnapshotError>,
)> {
    // One outcome is produced for every incoming snapshot.
    let mut outcomes = Vec::with_capacity(snapshots.len());

    let creation_order =
        self.sort_snapshots_in_dependency_order(snapshots.into_iter().collect());

    for snapshot in creation_order {
        // The alias is captured first because `execute` consumes the snapshot.
        let outcome = (
            snapshot.name.clone(),
            self.create_dataset_from_snapshot
                .execute(snapshot, create_options)
                .await,
        );
        outcomes.push(outcome);
    }

    outcomes
}

/// Orders snapshots so that every snapshot comes after the snapshots from
/// the same batch that its `SetTransform` inputs refer to.
///
/// Only dependencies on datasets that are part of `snapshots` are taken
/// into account; inputs that reference anything else never delay a
/// snapshot. An input that refers to the snapshot's own name is ignored.
///
/// If the remaining snapshots depend on each other in a cycle, no valid
/// order exists. Instead of looping forever, the remaining snapshots are
/// appended in their current queue order so the caller always gets back
/// every snapshot it passed in.
// The parameter type is kept as `LinkedList` so existing callers keep working.
#[allow(clippy::linkedlist)]
fn sort_snapshots_in_dependency_order(
    &self,
    mut snapshots: LinkedList<odf::DatasetSnapshot>,
) -> Vec<odf::DatasetSnapshot> {
    use odf::metadata::EnumWithVariants;

    let mut ordered = Vec::with_capacity(snapshots.len());

    // Names of the snapshots that have not been placed into `ordered` yet.
    let mut pending: HashSet<odf::DatasetRef> =
        snapshots.iter().map(|s| s.name.clone().into()).collect();

    // How many snapshots were re-queued in a row without any being emitted.
    let mut deferred_in_a_row = 0;

    while let Some(snapshot) = snapshots.pop_front() {
        let transform = snapshot
            .metadata
            .iter()
            .find_map(|e| e.as_variant::<odf::metadata::SetTransform>());

        let has_pending_deps = if let Some(transform) = transform {
            transform.inputs.iter().any(|input| {
                pending.contains(&input.dataset_ref)
                    && snapshot.name.as_local_ref() != input.dataset_ref // Ignore self-reference
            })
        } else {
            false
        };

        if !has_pending_deps {
            pending.remove(&snapshot.name.clone().into());
            ordered.push(snapshot);
            deferred_in_a_row = 0;
            continue;
        }

        snapshots.push_back(snapshot);
        deferred_in_a_row += 1;

        // Every queued snapshot was inspected against the same `pending` set
        // and all of them are blocked: this is a dependency cycle. Emit the
        // rest as-is; emptying the queue also terminates the loop.
        if deferred_in_a_row >= snapshots.len() {
            ordered.extend(std::mem::take(&mut snapshots));
        }
    }

    ordered
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -252,6 +192,7 @@ impl Command for AddCommand {
}

// Delete existing datasets if we are replacing
// TODO: Move into use case?
if self.replace {
let mut already_exist = Vec::new();
for s in &snapshots {
Expand All @@ -277,61 +218,91 @@ impl Command for AddCommand {
}
};

let create_options = CreateDatasetUseCaseOptions {
dataset_visibility: self.dataset_visibility,
};
let mut add_results = self
.create_datasets_from_snapshots(snapshots, create_options)
.await;

add_results.sort_by(|(id_a, _), (id_b, _)| id_a.cmp(id_b));

let mut num_added = 0;
let CreateDatasetsFromSnapshotsPlanningResult { plan, errors } = match self
.create_dataset_from_snapshot
.prepare(
snapshots,
CreateDatasetUseCaseOptions {
dataset_visibility: self.dataset_visibility,
},
)
.await
{
Ok(res) => Ok(res),
Err(CreateDatasetsFromSnapshotsPlanningError::CyclicDependency(_)) => {
Err(CLIError::usage_error("Aborted on cyclic dependency"))
}
Err(CreateDatasetsFromSnapshotsPlanningError::Internal(err)) => Err(err.into()),
}?;

let mut errors_with_contexts = Vec::new();

for (id, res) in add_results {
match res {
Ok(_) => {
num_added += 1;

if !self.output_config.quiet {
eprintln!("{}: {}", console::style("Added").green(), id);
}
}
Err(CreateDatasetFromSnapshotError::NameCollision(_)) => {
for (snaphot, err) in errors {
match err {
CreateDatasetFromSnapshotError::NameCollision(_) => {
if !self.output_config.quiet {
eprintln!(
"{}: {}: Already exists",
console::style("Skipped").yellow(),
id
snaphot.name,
);
}
}
Err(err) => {
errors_with_contexts.push((err, format!("Failed to add dataset {id}")));
err => {
errors_with_contexts
.push((err, format!("Failed to add dataset {}", snaphot.name)));
}
}
}

if errors_with_contexts.is_empty() {
if !self.output_config.quiet {
if !errors_with_contexts.is_empty() {
return Err(BatchError::new(
format!("Failed to add {} manifest(s)", errors_with_contexts.len()),
errors_with_contexts,
)
.into());
}

if self.dry_run {
let plan = serde_yaml::to_string(&plan).int_err()?;
eprintln!(
"{}\n{}\n{}",
console::style("Execution plan:").yellow().bold(),
console::style(&plan).dim(),
console::style("Exiting early due to --dry-run")
.yellow()
.bold()
);
return Ok(());
}

let mut add_results = self
.create_dataset_from_snapshot
.apply(plan)
.await
.int_err()?;

add_results.sort_by(|a, b| a.dataset_handle.alias.cmp(&b.dataset_handle.alias));

if !self.output_config.quiet {
for res in &add_results {
eprintln!(
"{}",
console::style(format!("Added {num_added} dataset(s)"))
.green()
.bold()
"{}: {}",
console::style("Added").green(),
res.dataset_handle.alias
);
}
}

Ok(())
} else {
Err(BatchError::new(
format!("Failed to load {} manifest(s)", errors_with_contexts.len()),
errors_with_contexts,
)
.into())
if !self.output_config.quiet {
eprintln!(
"{}",
console::style(format!("Added {} dataset(s)", add_results.len()))
.green()
.bold()
);
}

Ok(())
}
}

Expand Down
Loading