From 3c2b9fa4c4f59b5af4d59bb49dfc4f13a2612918 Mon Sep 17 00:00:00 2001 From: oravi Date: Mon, 20 Apr 2026 00:02:30 +0300 Subject: [PATCH] feat: add `edr artifacts` commands for warehouse artifact queries Adds a new `edr artifacts` subcommand group that lets agents and scripts query Elementary's warehouse artifact tables directly. Twelve commands across six entities (test-results, run-results, invocations, models, sources, tests) with list and get-by-id variants. Output is a stable JSON envelope on stdout (with `-o table` for humans), errors are JSON on stderr with `{error, code, details}`, and exit codes follow 0/1/2 for success/user-error/system-error. List commands paginate via `--limit` and report `has_more`. Logs are redirected to stderr when running `edr artifacts ...` so stdout stays pipe-safe. Co-Authored-By: Claude Opus 4.7 --- elementary/artifacts/README.md | 101 ++++++++ elementary/artifacts/__init__.py | 0 elementary/artifacts/cli.py | 48 ++++ elementary/artifacts/common.py | 92 +++++++ elementary/artifacts/entities/__init__.py | 0 elementary/artifacts/entities/invocations.py | 221 +++++++++++++++++ elementary/artifacts/entities/models.py | 191 +++++++++++++++ elementary/artifacts/entities/run_results.py | 224 +++++++++++++++++ elementary/artifacts/entities/sources.py | 197 +++++++++++++++ elementary/artifacts/entities/test_results.py | 214 +++++++++++++++++ elementary/artifacts/entities/tests.py | 227 ++++++++++++++++++ elementary/artifacts/fetching.py | 74 ++++++ elementary/artifacts/output.py | 70 ++++++ elementary/artifacts/runner.py | 41 ++++ elementary/cli/cli.py | 24 +- .../macros/artifacts/_filter_helpers.sql | 67 ++++++ .../macros/artifacts/get_dbt_invocation.sql | 36 +++ .../macros/artifacts/get_dbt_invocations.sql | 63 +++++ .../macros/artifacts/get_dbt_model.sql | 32 +++ .../macros/artifacts/get_dbt_models.sql | 56 +++++ .../macros/artifacts/get_dbt_run_result.sql | 39 +++ .../macros/artifacts/get_dbt_run_results.sql | 63 +++++ .../macros/artifacts/get_dbt_source.sql | 34 +++ .../macros/artifacts/get_dbt_sources.sql | 53 ++++ .../macros/artifacts/get_dbt_test.sql | 44 ++++ .../macros/artifacts/get_dbt_tests.sql | 70 ++++++ .../artifacts/get_elementary_test_result.sql | 34 +++ .../artifacts/get_elementary_test_results.sql | 63 +++++ elementary/utils/log.py | 16 +- 29 files changed, 2384 insertions(+), 10 deletions(-) create mode 100644 elementary/artifacts/README.md create mode 100644 elementary/artifacts/__init__.py create mode 100644 elementary/artifacts/cli.py create mode 100644 elementary/artifacts/common.py create mode 100644 elementary/artifacts/entities/__init__.py create mode 100644 elementary/artifacts/entities/invocations.py create mode 100644 elementary/artifacts/entities/models.py create mode 100644 elementary/artifacts/entities/run_results.py create mode 100644 elementary/artifacts/entities/sources.py create mode 100644 elementary/artifacts/entities/test_results.py create mode 100644 elementary/artifacts/entities/tests.py create mode 100644 elementary/artifacts/fetching.py create mode 100644 elementary/artifacts/output.py create mode 100644 elementary/artifacts/runner.py create mode 100644 elementary/monitor/dbt_project/macros/artifacts/_filter_helpers.sql create mode 100644 elementary/monitor/dbt_project/macros/artifacts/get_dbt_invocation.sql create mode 100644 elementary/monitor/dbt_project/macros/artifacts/get_dbt_invocations.sql create mode 100644 elementary/monitor/dbt_project/macros/artifacts/get_dbt_model.sql create mode 100644 elementary/monitor/dbt_project/macros/artifacts/get_dbt_models.sql create mode 100644 elementary/monitor/dbt_project/macros/artifacts/get_dbt_run_result.sql create mode 100644 elementary/monitor/dbt_project/macros/artifacts/get_dbt_run_results.sql create mode 100644 elementary/monitor/dbt_project/macros/artifacts/get_dbt_source.sql create mode 100644 elementary/monitor/dbt_project/macros/artifacts/get_dbt_sources.sql create mode 100644 elementary/monitor/dbt_project/macros/artifacts/get_dbt_test.sql create mode 100644 elementary/monitor/dbt_project/macros/artifacts/get_dbt_tests.sql create mode 100644 elementary/monitor/dbt_project/macros/artifacts/get_elementary_test_result.sql create mode 100644 elementary/monitor/dbt_project/macros/artifacts/get_elementary_test_results.sql diff --git a/elementary/artifacts/README.md b/elementary/artifacts/README.md new file mode 100644 index 000000000..56799cc3a --- /dev/null +++ b/elementary/artifacts/README.md @@ -0,0 +1,101 @@ +# `edr artifacts` + +Query Elementary's warehouse artifact tables from the CLI for agent and script consumption. + +Every subcommand emits a JSON envelope on stdout by default. Logs and warnings go to stderr, so `edr artifacts ... | jq` always works without filtering. Errors are written to stderr as JSON with a stable `{error, code, details}` shape. Exit code `0` on success, `1` on user error (bad arguments, not found), `2` on system error (connection, unexpected). + +## Global options + +All commands accept: + +| Flag | Purpose | +| --- | --- | +| `-o, --output {json,table}` | Output format. `json` (default) for agents; `table` for humans. | +| `--profile NAME` | Override the profile in `profiles.yml` (default `elementary`). | +| `-t, --profile-target NAME` | Target to load from the selected profile. | +| `-p, --profiles-dir PATH` | Directory containing `profiles.yml`. Defaults to CWD then `~/.dbt/`. | +| `--project-dir PATH` | Directory containing `dbt_project.yml`. Defaults to CWD. | +| `-c, --config-dir PATH` | Directory containing edr's `config.yml`. | +| `--target-path PATH` | Where edr writes its logs. | + +List commands also accept `--limit N` (1–1000, default 200) and return `has_more: true` when the page is truncated. Fetch the next page by narrowing filters — cursors are not supported. + +## JSON envelopes + +List responses: + +```json +{"count": 10, "has_more": false, "": [...], "data": {"length": 10}} +``` + +Single-get responses: + +```json +{"": {...}} +``` + +Error responses (stderr): + +```json +{"error": "Test test.x.y.z not found.", "code": "NOT_FOUND", "details": {"unique_id": "test.x.y.z"}} +``` + +## Commands + +### Test results + +- `edr artifacts test-results` — list test execution results. +- `edr artifacts test-result ` — fetch a single test result. + +Filters: `--test-unique-id`, `--model-unique-id`, `--test-type`, `--test-sub-type`, `--test-name` (LIKE), `--status`, `--table-name` (LIKE), `--column-name`, `--database-name`, `--schema-name`, `--severity`, `--detected-after`, `--detected-before`. + +### Run results (dbt model execution) + +- `edr artifacts run-results` — list model run results. +- `edr artifacts run-result ` — fetch a single run result. + +Filters: `--unique-id`, `--invocation-id`, `--status`, `--resource-type`, `--materialization`, `--name` (LIKE), `--started-after`, `--started-before`, `--execution-time-gt`, `--execution-time-lt`, `--include-compiled-code/--no-include-compiled-code` (default: exclude). + +### Invocations + +- `edr artifacts invocations` — list dbt invocations (default window: last 7 days). +- `edr artifacts invocation ` — fetch a single invocation. + +Filters: `--invocation-id` (repeatable), `--command`, `--project-name`, `--orchestrator`, `--job-id`, `--job-run-id`, `--target-name`, `--target-schema`, `--target-profile-name`, `--full-refresh/--no-full-refresh`, `--started-after`, `--started-before`. + +### Models + +- `edr artifacts models` — list dbt model definitions. +- `edr artifacts model ` — fetch a single model definition. + +Filters: `--database-name`, `--schema-name`, `--materialization`, `--name` (LIKE over name/alias), `--package-name`, `--group-name`, `--generated-after`, `--generated-before`. + +### Sources + +- `edr artifacts sources` — list dbt source definitions. +- `edr artifacts source ` — fetch a single source. + +Filters: `--database-name`, `--schema-name`, `--source-name`, `--name` (LIKE on table name), `--identifier`, `--package-name`, `--generated-after`, `--generated-before`. + +### Tests + +- `edr artifacts tests` — list dbt test definitions. +- `edr artifacts test ` — fetch a single test definition. + +Filters: `--database-name`, `--schema-name`, `--name` (LIKE over name/short_name/alias), `--package-name`, `--test-type {generic,singular,expectation}`, `--test-namespace`, `--severity {warn,error}`, `--parent-model-unique-id`, `--quality-dimension`, `--group-name`, `--generated-after`, `--generated-before`. + +## Examples + +```bash +# Recent failed tests as JSON +edr artifacts test-results --status fail --detected-after 2026-01-01 --limit 50 + +# One invocation, human-readable +edr artifacts invocation 4a0b... -o table + +# Models in a schema, filtered by name +edr artifacts models --schema-name analytics --name orders + +# Tests covering a specific model +edr artifacts tests --parent-model-unique-id model.my_pkg.orders +``` diff --git a/elementary/artifacts/__init__.py b/elementary/artifacts/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/elementary/artifacts/cli.py b/elementary/artifacts/cli.py new file mode 100644 index 000000000..cc7370ff8 --- /dev/null +++ b/elementary/artifacts/cli.py @@ -0,0 +1,48 @@ +import sys + +import click + +from elementary.artifacts.entities import invocations as invocations_cmds +from elementary.artifacts.entities import models as models_cmds +from elementary.artifacts.entities import run_results as run_results_cmds +from elementary.artifacts.entities import sources as sources_cmds +from elementary.artifacts.entities import test_results as test_results_cmds +from elementary.artifacts.entities import tests as tests_cmds +from elementary.artifacts.fetching import ArtifactFetchError +from elementary.artifacts.output import ErrorCode, emit_error + + +@click.group("artifacts") +def artifacts(): + """Query Elementary's artifact tables for agent and script consumption. + + Every subcommand emits JSON to stdout by default (use `-o table` for + human-readable output). Errors are written to stderr as JSON with a + stable `{error, code, details}` shape. Exit code 0 on success, 1 on + user error, 2 on system error. + """ + pass + + +def _handle_fetch_error(exc: ArtifactFetchError) -> None: + code = emit_error(str(exc), exc.code, exc.details) + sys.exit(code) + + +def _handle_bad_argument(message: str, details: dict = None) -> None: + code = emit_error(message, ErrorCode.BAD_ARGUMENT, details or {}) + sys.exit(code) + + +artifacts.add_command(test_results_cmds.test_results) +artifacts.add_command(test_results_cmds.test_result) +artifacts.add_command(run_results_cmds.run_results) +artifacts.add_command(run_results_cmds.run_result) +artifacts.add_command(invocations_cmds.invocations) +artifacts.add_command(invocations_cmds.invocation) +artifacts.add_command(models_cmds.models) +artifacts.add_command(models_cmds.model) +artifacts.add_command(sources_cmds.sources) +artifacts.add_command(sources_cmds.source) +artifacts.add_command(tests_cmds.tests) +artifacts.add_command(tests_cmds.test) diff --git a/elementary/artifacts/common.py b/elementary/artifacts/common.py new file mode 100644 index 000000000..e51681bea --- /dev/null +++ b/elementary/artifacts/common.py @@ -0,0 +1,92 @@ +import sys +from typing import Callable + +import click + +from elementary.config.config import Config + + +def is_artifacts_invocation() -> bool: + """True when the current edr command is `edr artifacts ...`. + + Used at CLI bootstrap to gate stdout-polluting side effects (logo, + version-upgrade banner, info logs) so `edr artifacts` can emit pure JSON. + """ + argv = sys.argv[1:] + for arg in argv: + if arg.startswith("-"): + continue + return arg == "artifacts" + return False + + +def common_options(func: Callable) -> Callable: + func = click.option( + "--output", + "-o", + type=click.Choice(["json", "table"]), + default="json", + help="Output format. JSON is default and intended for agent/script use.", + )(func) + func = click.option( + "--target-path", + type=str, + default=Config.DEFAULT_TARGET_PATH, + help="Absolute target path for saving edr files such as logs.", + )(func) + func = click.option( + "--config-dir", + "-c", + type=click.Path(), + default=Config.DEFAULT_CONFIG_DIR, + help="Directory containing edr's config.yml.", + )(func) + func = click.option( + "--profile", + "profile_name", + type=str, + default=None, + help=( + "Override the profile name from profiles.yml (defaults to " + "'elementary'). Useful when elementary artifact tables live " + "in a warehouse configured under a different profile." + ), + )(func) + func = click.option( + "--profile-target", + "-t", + type=str, + default=None, + help="Which target to load from the selected profile.", + )(func) + func = click.option( + "--profiles-dir", + "-p", + type=click.Path(exists=True), + default=None, + help="Directory containing profiles.yml. Defaults to CWD then HOME/.dbt/.", + )(func) + func = click.option( + "--project-dir", + type=click.Path(exists=True), + default=None, + help="Directory containing dbt_project.yml. Defaults to CWD.", + )(func) + return func + + +def build_config( + config_dir: str, + profiles_dir, + project_dir, + profile_target, + target_path: str, +) -> Config: + return Config( + config_dir=config_dir, + profiles_dir=profiles_dir, + project_dir=project_dir, + profile_target=profile_target, + target_path=target_path, + quiet_logs=True, + ) diff --git a/elementary/artifacts/entities/__init__.py b/elementary/artifacts/entities/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/elementary/artifacts/entities/invocations.py b/elementary/artifacts/entities/invocations.py new file mode 100644 index 000000000..6daad0db1 --- /dev/null +++ b/elementary/artifacts/entities/invocations.py @@ -0,0 +1,221 @@ +import sys +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, List, Optional, Tuple + +import click + +from elementary.artifacts.common import build_config, common_options +from elementary.artifacts.fetching import ( + ArtifactFetchError, + apply_pagination, + run_macro, +) +from elementary.artifacts.output import ( + ErrorCode, + emit_error, + emit_json, + emit_table, +) +from elementary.artifacts.runner import create_artifacts_runner + +LIST_COLUMNS = [ + "invocation_id", + "run_started_at", + "command", + "orchestrator", + "target_name", + "target_schema", + "threads", + "full_refresh", +] + + +@click.command("invocations") +@common_options +@click.option("--command", "command_arg", default=None, help="Filter by dbt command (e.g. 'build', 'run', 'test').") +@click.option("--project-name", default=None, help="Filter by dbt project name.") +@click.option( + "--orchestrator", + default=None, + help="Filter by orchestrator (e.g. 'github_actions', 'airflow').", +) +@click.option( + "--job-id", + default=None, + help="Filter by orchestrator job ID (NOT dbt invocation ID — use --invocation-id for that).", +) +@click.option("--job-run-id", default=None, help="Filter by orchestrator job run ID.") +@click.option( + "--invocation-id", + "invocation_ids", + multiple=True, + help="Filter by dbt invocation ID. Repeatable.", +) +@click.option("--target-name", default=None, help="Filter by target name (case-insensitive exact).") +@click.option("--target-schema", default=None, help="Filter by target schema (case-insensitive exact).") +@click.option( + "--target-profile-name", + default=None, + help="Filter by target profile name (case-insensitive exact).", +) +@click.option( + "--full-refresh/--no-full-refresh", + "full_refresh", + default=None, + help="Filter by full_refresh flag.", +) +@click.option( + "--started-after", + default=None, + help="Filter: run_started_at >= (ISO 8601). Defaults to 7 days ago.", +) +@click.option( + "--started-before", + default=None, + help="Filter: run_started_at <= (ISO 8601). Defaults to now.", +) +@click.option( + "--limit", + type=click.IntRange(1, 1000), + default=200, + help="Maximum number of invocations to return (default 200, max 1000).", +) +def invocations( + output, + target_path, + config_dir, + profile_name, + profile_target, + profiles_dir, + project_dir, + command_arg, + project_name, + orchestrator, + job_id, + job_run_id, + invocation_ids, + target_name, + target_schema, + target_profile_name, + full_refresh, + started_after, + started_before, + limit, +): + """List dbt invocations. Returns command, orchestrator, target, timing.""" + try: + started_after_iso, started_before_iso = _resolve_time_window( + started_after, started_before + ) + config = build_config( + config_dir, profiles_dir, project_dir, profile_target, target_path + ) + runner = create_artifacts_runner(config, profile=profile_name) + rows = run_macro( + runner, + "elementary_cli.get_dbt_invocations", + { + "command": command_arg, + "project_name": project_name, + "orchestrator": orchestrator, + "job_id": job_id, + "job_run_id": job_run_id, + "invocation_ids": list(invocation_ids) if invocation_ids else None, + "target_name": target_name, + "target_schema": target_schema, + "target_profile_name": target_profile_name, + "full_refresh": full_refresh, + "started_after": started_after_iso, + "started_before": started_before_iso, + "limit": limit + 1, + }, + ) or [] + trimmed, has_more = apply_pagination(rows, limit) + _emit_list(trimmed, has_more, output) + except ArtifactFetchError as exc: + sys.exit(emit_error(str(exc), exc.code, exc.details)) + except Exception as exc: + sys.exit( + emit_error( + f"Unexpected error: {exc}", + ErrorCode.INTERNAL_ERROR, + {"type": type(exc).__name__}, + ) + ) + + +@click.command("invocation") +@common_options +@click.argument("invocation_id") +def invocation( + invocation_id, + output, + target_path, + config_dir, + profile_name, + profile_target, + profiles_dir, + project_dir, +): + """Get a single dbt invocation by its ID.""" + try: + config = build_config( + config_dir, profiles_dir, project_dir, profile_target, target_path + ) + runner = create_artifacts_runner(config, profile=profile_name) + rows = run_macro( + runner, + "elementary_cli.get_dbt_invocation", + {"invocation_id": invocation_id}, + ) or [] + if not rows: + sys.exit( + emit_error( + f"Invocation {invocation_id} not found.", + ErrorCode.NOT_FOUND, + {"invocation_id": invocation_id}, + ) + ) + _emit_single(rows[0], output) + except ArtifactFetchError as exc: + sys.exit(emit_error(str(exc), exc.code, exc.details)) + except Exception as exc: + sys.exit( + emit_error( + f"Unexpected error: {exc}", + ErrorCode.INTERNAL_ERROR, + {"type": type(exc).__name__}, + ) + ) + + +def _resolve_time_window( + started_after: Optional[str], started_before: Optional[str] +) -> Tuple[str, str]: + now = datetime.now(timezone.utc) + if started_after is None: + started_after = (now - timedelta(days=7)).isoformat() + if started_before is None: + started_before = now.isoformat() + return started_after, started_before + + +def _emit_list(rows: List[Dict[str, Any]], has_more: bool, output: str) -> None: + if output == "table": + emit_table(rows, LIST_COLUMNS) + return + emit_json( + { + "count": len(rows), + "has_more": has_more, + "invocations": rows, + "data": {"length": len(rows)}, + } + ) + + +def _emit_single(row: Dict[str, Any], output: str) -> None: + if output == "table": + emit_table([row], LIST_COLUMNS) + return + emit_json({"invocation": row}) diff --git a/elementary/artifacts/entities/models.py b/elementary/artifacts/entities/models.py new file mode 100644 index 000000000..569d622b5 --- /dev/null +++ b/elementary/artifacts/entities/models.py @@ -0,0 +1,191 @@ +import sys +from typing import Any, Dict, List + +import click + +from elementary.artifacts.common import build_config, common_options +from elementary.artifacts.fetching import ( + ArtifactFetchError, + apply_pagination, + parse_json_field, + run_macro, +) +from elementary.artifacts.output import ( + ErrorCode, + emit_error, + emit_json, + emit_table, +) +from elementary.artifacts.runner import create_artifacts_runner + +LIST_COLUMNS = [ + "unique_id", + "name", + "materialization", + "database_name", + "schema_name", + "package_name", + "group_name", +] + +JSON_FIELDS = ("depends_on_nodes", "tags", "owner") + + +@click.command("models") +@common_options +@click.option("--database-name", default=None, help="Filter by database (case-insensitive exact).") +@click.option("--schema-name", default=None, help="Filter by schema (case-insensitive exact).") +@click.option( + "--materialization", + default=None, + help="Filter by materialization (e.g. 'table', 'view', 'incremental').", +) +@click.option( + "--name", + default=None, + help="Search by model name or alias (case-insensitive LIKE).", +) +@click.option("--package-name", default=None, help="Filter by dbt package (case-insensitive exact).") +@click.option("--group-name", default=None, help="Filter by group (case-insensitive exact).") +@click.option( + "--generated-after", + default=None, + help="Filter: generated_at >= (ISO 8601 format).", +) +@click.option( + "--generated-before", + default=None, + help="Filter: generated_at <= (ISO 8601 format).", +) +@click.option( + "--limit", + type=click.IntRange(1, 1000), + default=200, + help="Maximum number of models to return (default 200, max 1000).", +) +def models( + output, + target_path, + config_dir, + profile_name, + profile_target, + profiles_dir, + project_dir, + database_name, + schema_name, + materialization, + name, + package_name, + group_name, + generated_after, + generated_before, + limit, +): + """List dbt models. Returns unique_id, materialization, schema, package, dependencies.""" + try: + config = build_config( + config_dir, profiles_dir, project_dir, profile_target, target_path + ) + runner = create_artifacts_runner(config, profile=profile_name) + rows = run_macro( + runner, + "elementary_cli.get_dbt_models", + { + "database_name": database_name, + "schema_name": schema_name, + "materialization": materialization, + "name": name, + "package_name": package_name, + "group_name": group_name, + "generated_after": generated_after, + "generated_before": generated_before, + "limit": limit + 1, + }, + ) or [] + trimmed, has_more = apply_pagination(rows, limit) + _hydrate_json_fields(trimmed) + _emit_list(trimmed, has_more, output) + except ArtifactFetchError as exc: + sys.exit(emit_error(str(exc), exc.code, exc.details)) + except Exception as exc: + sys.exit( + emit_error( + f"Unexpected error: {exc}", + ErrorCode.INTERNAL_ERROR, + {"type": type(exc).__name__}, + ) + ) + + +@click.command("model") +@common_options +@click.argument("unique_id") +def model( + unique_id, + output, + target_path, + config_dir, + profile_name, + profile_target, + profiles_dir, + project_dir, +): + """Get a single dbt model by its unique_id.""" + try: + config = build_config( + config_dir, profiles_dir, project_dir, profile_target, target_path + ) + runner = create_artifacts_runner(config, profile=profile_name) + rows = run_macro( + runner, + "elementary_cli.get_dbt_model", + {"unique_id": unique_id}, + ) or [] + if not rows: + sys.exit( + emit_error( + f"Model {unique_id} not found.", + ErrorCode.NOT_FOUND, + {"unique_id": unique_id}, + ) + ) + _hydrate_json_fields(rows) + _emit_single(rows[0], output) + except ArtifactFetchError as exc: + sys.exit(emit_error(str(exc), exc.code, exc.details)) + except Exception as exc: + sys.exit( + emit_error( + f"Unexpected error: {exc}", + ErrorCode.INTERNAL_ERROR, + {"type": type(exc).__name__}, + ) + ) + + +def _hydrate_json_fields(rows: List[Dict[str, Any]]) -> None: + for row in rows: + for field in JSON_FIELDS: + if field in row: + row[field] = parse_json_field(row[field]) + + +def _emit_list(rows: List[Dict[str, Any]], has_more: bool, output: str) -> None: + if output == "table": + emit_table(rows, LIST_COLUMNS) + return + emit_json( + { + "count": len(rows), + "has_more": has_more, + "models": rows, + "data": {"length": len(rows)}, + } + ) + + +def _emit_single(row: Dict[str, Any], output: str) -> None: + if output == "table": + emit_table([row], LIST_COLUMNS) + return + emit_json({"model": row}) diff --git a/elementary/artifacts/entities/run_results.py b/elementary/artifacts/entities/run_results.py new file mode 100644 index 000000000..87fc2ef08 --- /dev/null +++ b/elementary/artifacts/entities/run_results.py @@ -0,0 +1,224 @@ +import sys +from typing import Any, Dict, List + +import click + +from elementary.artifacts.common import build_config, common_options +from elementary.artifacts.fetching import ( + ArtifactFetchError, + apply_pagination, + run_macro, +) +from elementary.artifacts.output import ( + ErrorCode, + emit_error, + emit_json, + emit_table, +) +from elementary.artifacts.runner import create_artifacts_runner + +LIST_COLUMNS = [ + "model_execution_id", + "execute_started_at", + "status", + "resource_type", + "materialization", + "execution_time", + "name", + "unique_id", +] + + +@click.command("run-results") +@common_options +@click.option("--unique-id", default=None, help="Filter by asset unique_id (exact).") +@click.option( + "--invocation-id", + default=None, + help="Filter to a specific dbt invocation.", +) +@click.option( + "--status", + default=None, + help="Filter by status (e.g. 'success', 'error', 'skipped', 'warn', 'fail', 'pass').", +) +@click.option( + "--resource-type", + default=None, + help="Filter by resource type (e.g. 'model', 'test', 'snapshot', 'seed').", +) +@click.option( + "--materialization", + default=None, + help="Filter by materialization (e.g. 'table', 'view', 'incremental').", +) +@click.option( + "--name", default=None, help="Search by resource name (case-insensitive LIKE)." +) +@click.option( + "--started-after", + default=None, + help="Filter: execute_started_at >= (ISO 8601 format).", +) +@click.option( + "--started-before", + default=None, + help="Filter: execute_started_at <= (ISO 8601 format).", +) +@click.option( + "--execution-time-gt", + type=float, + default=None, + help="Filter: execution_time > value (seconds).", +) +@click.option( + "--execution-time-lt", + type=float, + default=None, + help="Filter: execution_time < value (seconds).", +) +@click.option( + "--include-compiled-code", + is_flag=True, + default=False, + help="Include compiled_code and adapter_response fields (large).", +) +@click.option( + "--limit", + type=click.IntRange(1, 1000), + default=200, + help="Maximum number of run results to return (default 200, max 1000).", +) +def run_results( + output, + target_path, + config_dir, + profile_name, + profile_target, + profiles_dir, + project_dir, + unique_id, + invocation_id, + status, + resource_type, + materialization, + name, + started_after, + started_before, + execution_time_gt, + execution_time_lt, + include_compiled_code, + limit, +): + """List dbt run results. Returns execution status, timing, materialization, details.""" + try: + config = build_config( + config_dir, profiles_dir, project_dir, profile_target, target_path + ) + runner = create_artifacts_runner(config, profile=profile_name) + rows = run_macro( + runner, + "elementary_cli.get_dbt_run_results", + { + "unique_id": unique_id, + "invocation_id": invocation_id, + "status": status, + "resource_type": resource_type, + "materialization": materialization, + "name": name, + "started_after": started_after, + "started_before": started_before, + "execution_time_gt": execution_time_gt, + "execution_time_lt": execution_time_lt, + "lightweight": not include_compiled_code, + "limit": limit + 1, + }, + ) or [] + trimmed, has_more = apply_pagination(rows, limit) + _emit_list(trimmed, has_more, output) + except ArtifactFetchError as exc: + sys.exit(emit_error(str(exc), exc.code, exc.details)) + except Exception as exc: + sys.exit( + emit_error( + f"Unexpected error: {exc}", + ErrorCode.INTERNAL_ERROR, + {"type": type(exc).__name__}, + ) + ) + + +@click.command("run-result") +@common_options +@click.option( + "--include-compiled-code", + is_flag=True, + default=False, + help="Include compiled_code and adapter_response fields (large).", +) +@click.argument("model_execution_id") +def run_result( + model_execution_id, + output, + target_path, + config_dir, + profile_name, + profile_target, + profiles_dir, + project_dir, + include_compiled_code, +): + """Get a single dbt run result by its model_execution_id.""" + try: + config = build_config( + config_dir, profiles_dir, project_dir, profile_target, target_path + ) + runner = create_artifacts_runner(config, profile=profile_name) + rows = run_macro( + runner, + "elementary_cli.get_dbt_run_result", + { + "model_execution_id": model_execution_id, + "include_compiled_code": include_compiled_code, + }, + ) or [] + if not rows: + sys.exit( + emit_error( + f"Run result {model_execution_id} not found.", + ErrorCode.NOT_FOUND, + {"model_execution_id": model_execution_id}, + ) + ) + _emit_single(rows[0], output) + except ArtifactFetchError as exc: + sys.exit(emit_error(str(exc), exc.code, exc.details)) + except Exception as exc: + sys.exit( + emit_error( + f"Unexpected error: {exc}", + ErrorCode.INTERNAL_ERROR, + {"type": type(exc).__name__}, + ) + ) + + +def _emit_list(rows: List[Dict[str, Any]], has_more: bool, output: str) -> None: + if output == "table": + emit_table(rows, LIST_COLUMNS) + return + emit_json( + { + "count": len(rows), + "has_more": has_more, + "run_results": rows, + "data": {"length": len(rows)}, + } + ) + + +def _emit_single(row: Dict[str, Any], output: str) -> None: + if output == "table": + emit_table([row], LIST_COLUMNS) + return + emit_json({"run_result": row}) diff --git a/elementary/artifacts/entities/sources.py b/elementary/artifacts/entities/sources.py new file mode 100644 index 000000000..866ef4e0d --- /dev/null +++ b/elementary/artifacts/entities/sources.py @@ -0,0 +1,197 @@ +import sys +from typing import Any, Dict, List + +import click + +from elementary.artifacts.common import build_config, common_options +from elementary.artifacts.fetching import ( + ArtifactFetchError, + apply_pagination, + parse_json_field, + run_macro, +) +from elementary.artifacts.output import ( + ErrorCode, + emit_error, + emit_json, + emit_table, +) +from elementary.artifacts.runner import create_artifacts_runner + +LIST_COLUMNS = [ + "unique_id", + "source_name", + "name", + "identifier", + "database_name", + "schema_name", + "package_name", + "loaded_at_field", +] + +JSON_FIELDS = ( + "freshness_warn_after", + "freshness_error_after", + "tags", + "owner", +) + + +@click.command("sources") +@common_options +@click.option("--database-name", default=None, help="Filter by database (case-insensitive exact).") +@click.option("--schema-name", default=None, help="Filter by schema (case-insensitive exact).") +@click.option( + "--source-name", + default=None, + help="Filter by parent source name (case-insensitive exact).", +) +@click.option("--name", default=None, help="Search by table name (case-insensitive LIKE).") +@click.option( + "--identifier", + default=None, + help="Filter by warehouse identifier (case-insensitive exact).", +) +@click.option("--package-name", default=None, help="Filter by dbt package (case-insensitive exact).") +@click.option( + "--generated-after", + default=None, + help="Filter: generated_at >= (ISO 8601 format).", +) +@click.option( + "--generated-before", + default=None, + help="Filter: generated_at <= (ISO 8601 format).", +) +@click.option( + "--limit", + type=click.IntRange(1, 1000), + default=200, + help="Maximum number of sources to return (default 200, max 1000).", +) +def sources( + output, + target_path, + config_dir, + profile_name, + profile_target, + profiles_dir, + project_dir, + database_name, + schema_name, + source_name, + name, + identifier, + package_name, + generated_after, + generated_before, + limit, +): + """List dbt sources. Returns source_name, identifier, database/schema, freshness config.""" + try: + config = build_config( + config_dir, profiles_dir, project_dir, profile_target, target_path + ) + runner = create_artifacts_runner(config, profile=profile_name) + rows = run_macro( + runner, + "elementary_cli.get_dbt_sources", + { + "database_name": database_name, + "schema_name": schema_name, + "source_name": source_name, + "name": name, + "identifier": identifier, + "package_name": package_name, + "generated_after": generated_after, + "generated_before": generated_before, + "limit": limit + 1, + }, + ) or [] + trimmed, has_more = apply_pagination(rows, limit) + _hydrate_json_fields(trimmed) + _emit_list(trimmed, has_more, output) + except ArtifactFetchError as exc: + sys.exit(emit_error(str(exc), exc.code, exc.details)) + except Exception as exc: + sys.exit( + emit_error( + f"Unexpected error: {exc}", + ErrorCode.INTERNAL_ERROR, + {"type": type(exc).__name__}, + ) + ) + + +@click.command("source") +@common_options +@click.argument("unique_id") +def source( + unique_id, + output, + target_path, + config_dir, + profile_name, + profile_target, + profiles_dir, + project_dir, +): + """Get a single dbt source by its unique_id.""" + try: + config = build_config( + config_dir, profiles_dir, project_dir, profile_target, target_path + ) + runner = create_artifacts_runner(config, profile=profile_name) + rows = run_macro( + runner, + "elementary_cli.get_dbt_source", + {"unique_id": unique_id}, + ) or [] + if not rows: + sys.exit( + emit_error( + f"Source {unique_id} not found.", + ErrorCode.NOT_FOUND, + {"unique_id": unique_id}, + ) + ) + _hydrate_json_fields(rows) + _emit_single(rows[0], output) + except ArtifactFetchError as exc: + sys.exit(emit_error(str(exc), exc.code, exc.details)) + except Exception as exc: + sys.exit( + emit_error( + f"Unexpected error: {exc}", + ErrorCode.INTERNAL_ERROR, + {"type": type(exc).__name__}, + ) + ) + + +def _hydrate_json_fields(rows: List[Dict[str, Any]]) -> None: + for row in rows: + for field in JSON_FIELDS: + if field in row: + row[field] = parse_json_field(row[field]) + + +def _emit_list(rows: List[Dict[str, Any]], has_more: bool, output: str) -> None: + if output == "table": + emit_table(rows, LIST_COLUMNS) + return + emit_json( + { + "count": len(rows), + "has_more": has_more, + "sources": rows, + "data": {"length": len(rows)}, + } + ) + + +def _emit_single(row: Dict[str, Any], output: str) -> None: + if output == "table": + emit_table([row], LIST_COLUMNS) + return + emit_json({"source": row}) diff --git a/elementary/artifacts/entities/test_results.py b/elementary/artifacts/entities/test_results.py new file mode 100644 index 000000000..8a66bdd1b --- /dev/null +++ b/elementary/artifacts/entities/test_results.py @@ -0,0 +1,214 @@ +import sys +from typing import Any, Dict, List, Optional + +import click + +from elementary.artifacts.common import build_config, common_options +from elementary.artifacts.fetching import ( + ArtifactFetchError, + apply_pagination, + run_macro, +) +from elementary.artifacts.output import ( + ErrorCode, + emit_error, + emit_json, + emit_table, +) +from elementary.artifacts.runner import create_artifacts_runner + +LIST_COLUMNS = [ + "id", + "detected_at", + "status", + "test_type", + "test_name", + "table_name", + "column_name", +] + + +@click.command("test-results") +@common_options +@click.option("--test-unique-id", default=None, help="Filter by test unique_id (exact).") +@click.option( + "--model-unique-id", default=None, help="Filter by model unique_id (exact)." +) +@click.option( + "--test-type", + default=None, + help="Filter by test type (e.g. 'dbt_test', 'anomaly_detection', 'schema_change').", +) +@click.option( + "--test-sub-type", + default=None, + help="Filter by test sub-type (exact, case-insensitive).", +) +@click.option( + "--test-name", default=None, help="Search by test name (case-insensitive LIKE)." +) +@click.option( + "--status", + default=None, + help="Filter by status (e.g. 'pass', 'fail', 'warn', 'error').", +) +@click.option( + "--table-name", default=None, help="Search by table name (case-insensitive LIKE)." +) +@click.option( + "--column-name", default=None, help="Filter by column name (exact, case-insensitive)." +) +@click.option( + "--database-name", + default=None, + help="Filter by database name (exact, case-insensitive).", +) +@click.option( + "--schema-name", + default=None, + help="Filter by schema name (exact, case-insensitive).", +) +@click.option("--severity", default=None, help="Filter by severity (e.g. 'WARN', 'ERROR').") +@click.option( + "--detected-after", + default=None, + help="Filter: detected_at >= (ISO 8601 format).", +) +@click.option( + "--detected-before", + default=None, + help="Filter: detected_at <= (ISO 8601 format).", +) +@click.option( + "--limit", + type=click.IntRange(1, 1000), + default=200, + help="Maximum number of test results to return (default 200, max 1000).", +) +def test_results( + output, + target_path, + config_dir, + profile_name, + profile_target, + profiles_dir, + project_dir, + test_unique_id, + model_unique_id, + test_type, + test_sub_type, + test_name, + status, + table_name, + column_name, + database_name, + schema_name, + severity, + detected_after, + detected_before, + limit, +): + """List Elementary test results. Returns status, detected_at, test type, table/column.""" + try: + config = build_config( + config_dir, profiles_dir, project_dir, profile_target, target_path + ) + runner = create_artifacts_runner(config, profile=profile_name) + rows = run_macro( + runner, + "elementary_cli.get_elementary_test_results", + { + "test_unique_id": test_unique_id, + "model_unique_id": model_unique_id, + "test_type": test_type, + "test_sub_type": test_sub_type, + "test_name": test_name, + "status": status, + "table_name": table_name, + "column_name": column_name, + "database_name": database_name, + "schema_name": schema_name, + "severity": severity, + "detected_after": detected_after, + "detected_before": detected_before, + "limit": limit + 1, + }, + ) or [] + trimmed, has_more = apply_pagination(rows, limit) + _emit_list(trimmed, has_more, output) + except ArtifactFetchError as exc: + sys.exit(emit_error(str(exc), exc.code, exc.details)) + except Exception as exc: + sys.exit( + emit_error( + f"Unexpected error: {exc}", + ErrorCode.INTERNAL_ERROR, + {"type": type(exc).__name__}, + ) + ) + + +@click.command("test-result") +@common_options +@click.argument("test_result_id") +def test_result( + test_result_id, + output, + target_path, + config_dir, + profile_name, + profile_target, + profiles_dir, + project_dir, +): + """Get a single Elementary test result by its ID.""" + try: + config = build_config( + config_dir, profiles_dir, project_dir, profile_target, target_path + ) + runner = create_artifacts_runner(config, profile=profile_name) + rows = run_macro( + runner, + "elementary_cli.get_elementary_test_result", + {"test_result_id": test_result_id}, + ) or [] + if not rows: + sys.exit( + emit_error( + f"Test result {test_result_id} not found.", + ErrorCode.NOT_FOUND, + {"test_result_id": test_result_id}, + ) + ) + _emit_single(rows[0], output) + except ArtifactFetchError as exc: + sys.exit(emit_error(str(exc), exc.code, exc.details)) + except Exception as exc: + sys.exit( + emit_error( + f"Unexpected error: {exc}", + ErrorCode.INTERNAL_ERROR, + {"type": type(exc).__name__}, + ) + ) + + +def _emit_list(rows: List[Dict[str, Any]], has_more: bool, output: str) -> None: + if output == "table": + emit_table(rows, LIST_COLUMNS) + return + emit_json( + { + "count": len(rows), + "has_more": has_more, + "test_results": rows, + "data": {"length": len(rows)}, + } + ) + + +def _emit_single(row: Dict[str, Any], output: str) -> None: + if output == "table": + emit_table([row], LIST_COLUMNS) + return + emit_json({"test_result": row}) diff --git a/elementary/artifacts/entities/tests.py b/elementary/artifacts/entities/tests.py new file mode 100644 index 000000000..d0af68798 --- /dev/null +++ b/elementary/artifacts/entities/tests.py @@ -0,0 +1,227 @@ +import sys +from typing import Any, Dict, List + +import click + +from elementary.artifacts.common import build_config, common_options +from elementary.artifacts.fetching import ( + ArtifactFetchError, + apply_pagination, + parse_json_field, + run_macro, +) +from elementary.artifacts.output import ( + ErrorCode, + emit_error, + emit_json, + emit_table, +) +from elementary.artifacts.runner import create_artifacts_runner + +LIST_COLUMNS = [ + "unique_id", + "short_name", + "test_column_name", + "severity", + "type", + "parent_model_unique_id", + "package_name", +] + +LIST_JSON_FIELDS = ("tags", "model_tags", "model_owners") + +DETAIL_JSON_FIELDS = ( + "test_params", + "tags", + "model_tags", + "model_owners", + "meta", + "depends_on_macros", + "depends_on_nodes", +) + + +@click.command("tests") +@common_options +@click.option("--database-name", default=None, help="Filter by database (case-insensitive exact).") +@click.option("--schema-name", default=None, help="Filter by schema (case-insensitive exact).") +@click.option( + "--name", + default=None, + help="Search by test name, short_name, or alias (case-insensitive LIKE).", +) +@click.option("--package-name", default=None, help="Filter by dbt package (case-insensitive exact).") +@click.option( + "--test-type", + type=click.Choice(["generic", "singular", "expectation"], case_sensitive=False), + default=None, + help="Filter by test type.", +) +@click.option( + "--test-namespace", + default=None, + help="Filter by test namespace/package (e.g. 'elementary', 'dbt_expectations').", +) +@click.option( + "--severity", + type=click.Choice(["warn", "error"], case_sensitive=False), + default=None, + help="Filter by severity.", +) +@click.option( + "--parent-model-unique-id", + default=None, + help="Filter by the unique_id of the tested model.", +) +@click.option("--quality-dimension", default=None, help="Filter by quality dimension (case-insensitive exact).") +@click.option("--group-name", default=None, help="Filter by group (case-insensitive exact).") +@click.option( + "--generated-after", + default=None, + help="Filter: generated_at >= (ISO 8601 format).", +) +@click.option( + "--generated-before", + default=None, + help="Filter: generated_at <= (ISO 8601 format).", +) +@click.option( + "--limit", + type=click.IntRange(1, 1000), + default=200, + help="Maximum number of tests to return (default 200, max 1000).", +) +def tests( + output, + target_path, + config_dir, + profile_name, + profile_target, + profiles_dir, + project_dir, + database_name, + schema_name, + name, + package_name, + test_type, + test_namespace, + severity, + parent_model_unique_id, + quality_dimension, + group_name, + generated_after, + generated_before, + limit, +): + """List dbt test definitions. Returns unique_id, short_name, severity, type, parent model.""" + try: + config = build_config( + config_dir, profiles_dir, project_dir, profile_target, target_path + ) + runner = create_artifacts_runner(config, profile=profile_name) + rows = run_macro( + runner, + "elementary_cli.get_dbt_tests", + { + "database_name": database_name, + "schema_name": schema_name, + "name": name, + "package_name": package_name, + "test_type": test_type, + "test_namespace": test_namespace, + "severity": severity, + "parent_model_unique_id": parent_model_unique_id, + "quality_dimension": quality_dimension, + "group_name": group_name, + "generated_after": generated_after, + "generated_before": generated_before, + "limit": limit + 1, + }, + ) or [] + trimmed, has_more = apply_pagination(rows, limit) + _hydrate_json_fields(trimmed, LIST_JSON_FIELDS) + _emit_list(trimmed, has_more, output) + except ArtifactFetchError as exc: + sys.exit(emit_error(str(exc), exc.code, exc.details)) + except Exception as exc: + sys.exit( + emit_error( + f"Unexpected error: {exc}", + ErrorCode.INTERNAL_ERROR, + {"type": type(exc).__name__}, + ) + ) + + +@click.command("test") +@common_options +@click.argument("unique_id") +def test( + unique_id, + output, + target_path, + config_dir, + profile_name, + profile_target, + profiles_dir, + project_dir, +): + """Get a single dbt test definition by its unique_id.""" + try: + config = build_config( + config_dir, profiles_dir, project_dir, profile_target, target_path + ) + runner = create_artifacts_runner(config, profile=profile_name) + rows = run_macro( + runner, + "elementary_cli.get_dbt_test", + {"unique_id": unique_id}, + ) or [] + if not rows: + sys.exit( + emit_error( + f"Test {unique_id} not found.", + ErrorCode.NOT_FOUND, + {"unique_id": unique_id}, + ) + ) + _hydrate_json_fields(rows, DETAIL_JSON_FIELDS) + _emit_single(rows[0], output) + except ArtifactFetchError as exc: + sys.exit(emit_error(str(exc), exc.code, exc.details)) + except Exception as exc: + sys.exit( + emit_error( + f"Unexpected error: {exc}", + ErrorCode.INTERNAL_ERROR, + {"type": type(exc).__name__}, + ) + ) + + +def _hydrate_json_fields(rows: List[Dict[str, Any]], fields) -> None: + for row in rows: + for field in fields: + if field in row: + row[field] = parse_json_field(row[field]) + + +def _emit_list(rows: List[Dict[str, Any]], has_more: bool, output: str) -> None: + if output == "table": + emit_table(rows, LIST_COLUMNS) + return + emit_json( + { + "count": len(rows), + "has_more": has_more, + "tests": rows, + "data": {"length": len(rows)}, + } + ) + + +def _emit_single(row: Dict[str, Any], output: str) -> None: + if output == "table": + emit_table([row], LIST_COLUMNS) + return + emit_json({"test": row}) diff --git a/elementary/artifacts/fetching.py b/elementary/artifacts/fetching.py new file mode 100644 index 000000000..e6729e8e6 --- /dev/null +++ b/elementary/artifacts/fetching.py @@ -0,0 +1,74 @@ +import json +from typing import Any, Dict, List, Optional, Tuple + +from elementary.artifacts.output import ErrorCode +from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner +from elementary.exceptions.exceptions import DbtCommandError + + +class ArtifactFetchError(Exception): + def __init__(self, message: str, code: str, details: Optional[dict] = None): + super().__init__(message) + self.code = code + self.details = details or {} + + +def run_macro( + dbt_runner: BaseDbtRunner, + macro_name: str, + macro_args: Dict[str, Any], +) -> Any: + """Run a macro in the internal dbt project, parse the JSON result. + + Raises ArtifactFetchError with a stable code on failure so callers can + emit a consistent error envelope. + """ + try: + response = dbt_runner.run_operation( + macro_name=macro_name, + macro_args=macro_args, + log_errors=True, + quiet=True, + ) + except DbtCommandError as exc: + raise ArtifactFetchError( + f"dbt command failed while running {macro_name}.", + ErrorCode.WAREHOUSE_ERROR, + {"dbt_error": str(exc)}, + ) from exc + except Exception as exc: + raise ArtifactFetchError( + f"Unexpected failure running {macro_name}: {exc}", + ErrorCode.INTERNAL_ERROR, + ) from exc + + if not response: + return None + try: + return json.loads(response[0]) + except (ValueError, IndexError) as exc: + raise ArtifactFetchError( + f"Malformed response from {macro_name}.", + ErrorCode.MALFORMED_ARTIFACTS, + {"raw": response[0] if response else None}, + ) from exc + + +def apply_pagination(rows: List[Any], limit: int) -> Tuple[List[Any], bool]: + """Given rows fetched with `limit + 1`, trim to `limit` and report truncation.""" + has_more = len(rows) > limit + return rows[:limit], has_more + + +def parse_json_field(value: Any) -> Any: + """dbt returns list/dict columns as JSON-encoded strings. Parse them.""" + if value is None: + return None + if isinstance(value, (list, dict)): + return value + if isinstance(value, str): + try: + return json.loads(value) + except ValueError: + return value + return value diff --git a/elementary/artifacts/output.py b/elementary/artifacts/output.py new file mode 100644 index 000000000..84ed7f53c --- /dev/null +++ b/elementary/artifacts/output.py @@ -0,0 +1,70 @@ +import json +import sys +from typing import Any, Dict, List, Optional, Sequence + + +class ErrorCode: + BAD_ARGUMENT = "BAD_ARGUMENT" + NOT_AUTHENTICATED = "NOT_AUTHENTICATED" + NOT_FOUND = "NOT_FOUND" + WAREHOUSE_ERROR = "WAREHOUSE_ERROR" + MALFORMED_ARTIFACTS = "MALFORMED_ARTIFACTS" + INTERNAL_ERROR = "INTERNAL_ERROR" + + +_USER_ERROR_CODES = { + ErrorCode.BAD_ARGUMENT, + ErrorCode.NOT_AUTHENTICATED, + ErrorCode.NOT_FOUND, +} + + +def exit_code_for(code: str) -> int: + return 1 if code in _USER_ERROR_CODES else 2 + + +def emit_json(payload: Dict[str, Any]) -> None: + sys.stdout.write(json.dumps(payload, default=str)) + sys.stdout.write("\n") + sys.stdout.flush() + + +def emit_table(rows: Sequence[Dict[str, Any]], columns: Sequence[str]) -> None: + if not rows: + sys.stdout.write("(no results)\n") + sys.stdout.flush() + return + widths = {c: len(c) for c in columns} + stringified: List[Dict[str, str]] = [] + for row in rows: + s = {c: _stringify(row.get(c)) for c in columns} + stringified.append(s) + for c in columns: + widths[c] = max(widths[c], len(s[c])) + header = " ".join(c.ljust(widths[c]) for c in columns) + sep = " ".join("-" * widths[c] for c in columns) + sys.stdout.write(header + "\n") + sys.stdout.write(sep + "\n") + for row in stringified: + sys.stdout.write(" ".join(row[c].ljust(widths[c]) for c in columns) + "\n") + sys.stdout.flush() + + +def _stringify(value: Any) -> str: + if value is None: + return "" + if isinstance(value, (dict, list)): + return json.dumps(value, default=str) + return str(value) + + +def emit_error( + message: str, + code: str, + details: Optional[Dict[str, Any]] = None, +) -> int: + envelope: Dict[str, Any] = {"error": message, "code": code, "details": details or {}} + sys.stderr.write(json.dumps(envelope, default=str)) + sys.stderr.write("\n") + sys.stderr.flush() + return exit_code_for(code) diff --git a/elementary/artifacts/runner.py b/elementary/artifacts/runner.py new file mode 100644 index 000000000..3beb996d3 --- /dev/null +++ b/elementary/artifacts/runner.py @@ -0,0 +1,41 @@ +import types +from typing import Optional + +from elementary.clients.dbt.command_line_dbt_runner import CommandLineDbtRunner +from elementary.clients.dbt.factory import create_dbt_runner +from elementary.config.config import Config +from elementary.monitor import dbt_project_utils + + +def create_artifacts_runner( + config: Config, profile: Optional[str] = None +) -> CommandLineDbtRunner: + runner = create_dbt_runner( + dbt_project_utils.CLI_DBT_PROJECT_PATH, + config.profiles_dir, + config.profile_target, + env_vars=config.env_vars, + run_deps_if_needed=config.run_dbt_deps_if_needed, + ) + if profile: + _inject_profile_override(runner, profile) + return runner + + +def _inject_profile_override(runner: CommandLineDbtRunner, profile: str) -> None: + """Override the profile name from the internal dbt_project.yml. + + The internal elementary_cli project hardcodes `profile: elementary`, but + artifact commands need to be runnable against any working profile in the + user's profiles.yml (e.g. `elementary_analytics`). We inject `--profile X` + into every dbt invocation by wrapping `_run_command`. + """ + original = runner._run_command + + def patched(self, command_args, *args, **kwargs): + new_args = list(command_args) + if new_args: + new_args = [new_args[0], "--profile", profile] + new_args[1:] + return original(command_args=new_args, *args, **kwargs) + + runner._run_command = types.MethodType(patched, runner) diff --git a/elementary/cli/cli.py b/elementary/cli/cli.py index 1a1329487..987209b7f 100644 --- a/elementary/cli/cli.py +++ b/elementary/cli/cli.py @@ -5,6 +5,8 @@ import elementary.cli.logo import elementary.cli.upgrade +from elementary.artifacts.cli import artifacts +from elementary.artifacts.common import is_artifacts_invocation from elementary.config.config import Config from elementary.monitor.cli import monitor, report, send_report from elementary.operations.cli import run_operation @@ -12,8 +14,11 @@ from elementary.utils import package from elementary.utils.log import get_logger, set_root_logger_handlers -elementary.cli.logo.print_elementary_logo() -elementary.cli.upgrade.recommend_version_upgrade() +_ARTIFACTS_MODE = is_artifacts_invocation() + +if not _ARTIFACTS_MODE: + elementary.cli.logo.print_elementary_logo() + elementary.cli.upgrade.recommend_version_upgrade() logger = get_logger(__name__) @@ -44,7 +49,8 @@ def get_command(self, ctx, name): def format_help(self, ctx, formatter): try: - click.echo("Loading dependencies (this might take a few seconds)") + if not _ARTIFACTS_MODE: + click.echo("Loading dependencies (this might take a few seconds)") AnonymousCommandLineTracking(config=Config()).track_cli_help() except Exception: pass @@ -55,9 +61,14 @@ def format_help(self, ctx, formatter): def invoke(self, ctx: click.Context) -> Any: files_target_path = get_log_path(ctx) - quiet_logs = get_quiet_logs(ctx) - set_root_logger_handlers("elementary", files_target_path, quiet_logs=quiet_logs) - if not quiet_logs: + quiet_logs = get_quiet_logs(ctx) or _ARTIFACTS_MODE + set_root_logger_handlers( + "elementary", + files_target_path, + quiet_logs=quiet_logs, + use_stderr=_ARTIFACTS_MODE, + ) + if not quiet_logs and not _ARTIFACTS_MODE: click.echo( "Any feedback and suggestions are welcomed! join our community here - " "https://bit.ly/slack-elementary\n" @@ -82,6 +93,7 @@ def cli(): cli.add_command(report) cli.add_command(send_report, name="send-report") cli.add_command(run_operation, name="run-operation") +cli.add_command(artifacts) if __name__ == "__main__": diff --git a/elementary/monitor/dbt_project/macros/artifacts/_filter_helpers.sql b/elementary/monitor/dbt_project/macros/artifacts/_filter_helpers.sql new file mode 100644 index 000000000..0127fb651 --- /dev/null +++ b/elementary/monitor/dbt_project/macros/artifacts/_filter_helpers.sql @@ -0,0 +1,67 @@ +{# + Helpers for building parameterized WHERE clauses in artifacts macros. + Values come from the edr CLI — we still escape single quotes for hygiene + before embedding into SQL. +#} + +{%- macro _sql_escape(value) -%} +{{ value | string | replace("'", "''") }} +{%- endmacro -%} + +{%- macro _eq_filter(column, value) -%} + {%- if value is not none -%} + and {{ column }} = '{{ elementary_cli._sql_escape(value) }}' + {%- endif -%} +{%- endmacro -%} + +{%- macro _eq_ci_filter(column, value) -%} + {%- if value is not none -%} + and lower({{ column }}) = lower('{{ elementary_cli._sql_escape(value) }}') + {%- endif -%} +{%- endmacro -%} + +{%- macro _like_ci_filter(column, value) -%} + {%- if value is not none -%} + and lower({{ column }}) like lower('%{{ elementary_cli._sql_escape(value) }}%') + {%- endif -%} +{%- endmacro -%} + +{%- macro _bool_filter(column, value) -%} + {%- if value is not none -%} + and {{ column }} = {{ 'true' if value else 'false' }} + {%- endif -%} +{%- endmacro -%} + +{%- macro _in_filter(column, values) -%} + {%- if values is not none and values | length > 0 -%} + and {{ column }} in ( + {%- for v in values -%} + '{{ elementary_cli._sql_escape(v) }}'{% if not loop.last %}, {% endif %} + {%- endfor -%} + ) + {%- endif -%} +{%- endmacro -%} + +{%- macro _gte_filter(column, value) -%} + {%- if value is not none -%} + and {{ column }} >= '{{ elementary_cli._sql_escape(value) }}' + {%- endif -%} +{%- endmacro -%} + +{%- macro _lte_filter(column, value) -%} + {%- if value is not none -%} + and {{ column }} <= '{{ elementary_cli._sql_escape(value) }}' + {%- endif -%} +{%- endmacro -%} + +{%- macro _gt_number_filter(column, value) -%} + {%- if value is not none -%} + and {{ column }} > {{ value }} + {%- endif -%} +{%- endmacro -%} + +{%- macro _lt_number_filter(column, value) -%} + {%- if value is not none -%} + and {{ column }} < {{ value }} + {%- endif -%} +{%- endmacro -%} diff --git a/elementary/monitor/dbt_project/macros/artifacts/get_dbt_invocation.sql b/elementary/monitor/dbt_project/macros/artifacts/get_dbt_invocation.sql new file mode 100644 index 000000000..53d511ff3 --- /dev/null +++ b/elementary/monitor/dbt_project/macros/artifacts/get_dbt_invocation.sql @@ -0,0 +1,36 @@ +{%- macro get_dbt_invocation(invocation_id) -%} + {% set relation = ref('elementary', 'dbt_invocations') %} + {% if not elementary.relation_exists(relation) %} + {% do return([]) %} + {% endif %} + + {% set query %} + select + invocation_id, + project_name, + run_started_at, + run_completed_at, + dbt_version, + command, + orchestrator, + job_id, + job_name, + job_url, + job_run_id, + job_run_url, + target_name, + target_database, + target_schema, + target_profile_name, + threads, + full_refresh, + selected, + invocation_vars + from {{ relation }} + where invocation_id = '{{ elementary_cli._sql_escape(invocation_id) }}' + limit 1 + {% endset %} + + {% set results = elementary.run_query(query) %} + {% do return(elementary.agate_to_dicts(results)) %} +{%- endmacro -%} diff --git a/elementary/monitor/dbt_project/macros/artifacts/get_dbt_invocations.sql b/elementary/monitor/dbt_project/macros/artifacts/get_dbt_invocations.sql new file mode 100644 index 000000000..39838f077 --- /dev/null +++ b/elementary/monitor/dbt_project/macros/artifacts/get_dbt_invocations.sql @@ -0,0 +1,63 @@ +{%- macro get_dbt_invocations( + command=none, + project_name=none, + orchestrator=none, + job_id=none, + job_run_id=none, + invocation_ids=none, + target_name=none, + target_schema=none, + target_profile_name=none, + full_refresh=none, + started_after=none, + started_before=none, + limit=200 +) -%} + {% set relation = ref('elementary', 'dbt_invocations') %} + {% if not elementary.relation_exists(relation) %} + {% do return([]) %} + {% endif %} + + {% set query %} + select + invocation_id, + project_name, + run_started_at, + run_completed_at, + dbt_version, + command, + orchestrator, + job_id, + job_name, + job_url, + job_run_id, + job_run_url, + target_name, + target_database, + target_schema, + target_profile_name, + threads, + full_refresh, + selected, + invocation_vars + from {{ relation }} + where 1=1 + {{ elementary_cli._eq_ci_filter('command', command) }} + {{ elementary_cli._eq_ci_filter('project_name', project_name) }} + {{ elementary_cli._eq_ci_filter('orchestrator', orchestrator) }} + {{ elementary_cli._eq_filter('job_id', job_id) }} + {{ elementary_cli._eq_filter('job_run_id', job_run_id) }} + {{ elementary_cli._in_filter('invocation_id', invocation_ids) }} + {{ elementary_cli._eq_ci_filter('target_name', target_name) }} + {{ elementary_cli._eq_ci_filter('target_schema', target_schema) }} + {{ elementary_cli._eq_ci_filter('target_profile_name', target_profile_name) }} + {{ elementary_cli._bool_filter('full_refresh', full_refresh) }} + {{ elementary_cli._gte_filter('run_started_at', started_after) }} + {{ elementary_cli._lte_filter('run_started_at', started_before) }} + order by run_started_at desc + limit {{ limit }} + {% endset %} + + {% set results = elementary.run_query(query) %} + {% do return(elementary.agate_to_dicts(results)) %} +{%- endmacro -%} diff --git a/elementary/monitor/dbt_project/macros/artifacts/get_dbt_model.sql b/elementary/monitor/dbt_project/macros/artifacts/get_dbt_model.sql new file mode 100644 index 000000000..23a37818b --- /dev/null +++ b/elementary/monitor/dbt_project/macros/artifacts/get_dbt_model.sql @@ -0,0 +1,32 @@ +{%- macro get_dbt_model(unique_id) -%} + {% set relation = ref('elementary', 'dbt_models') %} + {% if not elementary.relation_exists(relation) %} + {% do return([]) %} + {% endif %} + + {% set query %} + select + unique_id, + name, + database_name, + schema_name, + alias, + description, + package_name, + materialization, + path, + original_path, + patch_path, + depends_on_nodes, + group_name, + tags, + owner, + generated_at + from {{ relation }} + where unique_id = '{{ elementary_cli._sql_escape(unique_id) }}' + limit 1 + {% endset %} + + {% set results = elementary.run_query(query) %} + {% do return(elementary.agate_to_dicts(results)) %} +{%- endmacro -%} diff --git a/elementary/monitor/dbt_project/macros/artifacts/get_dbt_models.sql b/elementary/monitor/dbt_project/macros/artifacts/get_dbt_models.sql new file mode 100644 index 000000000..3103f0ee1 --- /dev/null +++ b/elementary/monitor/dbt_project/macros/artifacts/get_dbt_models.sql @@ -0,0 +1,56 @@ +{%- macro get_dbt_models( + database_name=none, + schema_name=none, + materialization=none, + name=none, + package_name=none, + group_name=none, + generated_after=none, + generated_before=none, + limit=200 +) -%} + {% set relation = ref('elementary', 'dbt_models') %} + {% if not elementary.relation_exists(relation) %} + {% do return([]) %} + {% endif %} + + {% set query %} + select + unique_id, + name, + database_name, + schema_name, + alias, + description, + package_name, + materialization, + path, + original_path, + patch_path, + depends_on_nodes, + group_name, + tags, + owner, + generated_at + from {{ relation }} + where 1=1 + {{ elementary_cli._eq_ci_filter('database_name', database_name) }} + {{ elementary_cli._eq_ci_filter('schema_name', schema_name) }} + {{ elementary_cli._eq_ci_filter('materialization', materialization) }} + {%- if name is not none %} + and ( + lower(name) like lower('%{{ elementary_cli._sql_escape(name) }}%') + or lower(alias) like lower('%{{ elementary_cli._sql_escape(name) }}%') + ) + {%- endif %} + {{ elementary_cli._eq_ci_filter('package_name', package_name) }} + {{ elementary_cli._eq_ci_filter('group_name', group_name) }} + {{ elementary_cli._gte_filter('generated_at', generated_after) }} + {{ elementary_cli._lte_filter('generated_at', generated_before) }} + order by unique_id + limit {{ limit }} + {% endset %} + + {% set results = elementary.run_query(query) %} + {% do return(elementary.agate_to_dicts(results)) %} +{%- endmacro -%} diff --git a/elementary/monitor/dbt_project/macros/artifacts/get_dbt_run_result.sql b/elementary/monitor/dbt_project/macros/artifacts/get_dbt_run_result.sql new file mode 100644 index 000000000..103fee4ca --- /dev/null +++ b/elementary/monitor/dbt_project/macros/artifacts/get_dbt_run_result.sql @@ -0,0 +1,39 @@ +{%- macro get_dbt_run_result(model_execution_id, include_compiled_code=false) -%} + {% set relation = ref('elementary', 'dbt_run_results') %} + {% if not elementary.relation_exists(relation) %} + {% do return([]) %} + {% endif %} + + {% set heavy_cols = 'compiled_code, adapter_response' %} + {% set base_cols %} + model_execution_id, + unique_id, + invocation_id, + name, + status, + resource_type, + materialization, + execution_time, + execute_started_at, + execute_completed_at, + compile_started_at, + compile_completed_at, + full_refresh, + message, + thread_id, + query_id, + created_at + {% endset %} + + {% set projection = base_cols ~ ', ' ~ heavy_cols if include_compiled_code else base_cols %} + + {% set query %} + select {{ projection }} + from {{ relation }} + where model_execution_id = '{{ elementary_cli._sql_escape(model_execution_id) }}' + limit 1 + {% endset %} + + {% set results = elementary.run_query(query) %} + {% do return(elementary.agate_to_dicts(results)) %} +{%- endmacro -%} diff --git a/elementary/monitor/dbt_project/macros/artifacts/get_dbt_run_results.sql b/elementary/monitor/dbt_project/macros/artifacts/get_dbt_run_results.sql new file mode 100644 index 000000000..18632ebb6 --- /dev/null +++ b/elementary/monitor/dbt_project/macros/artifacts/get_dbt_run_results.sql @@ -0,0 +1,63 @@ +{%- macro get_dbt_run_results( + unique_id=none, + invocation_id=none, + status=none, + resource_type=none, + materialization=none, + name=none, + started_after=none, + started_before=none, + execution_time_gt=none, + execution_time_lt=none, + lightweight=true, + limit=200 +) -%} + {% set relation = ref('elementary', 'dbt_run_results') %} + {% if not elementary.relation_exists(relation) %} + {% do return([]) %} + {% endif %} + + {% set heavy_cols = 'compiled_code, adapter_response' %} + {% set base_cols %} + model_execution_id, + unique_id, + invocation_id, + name, + status, + resource_type, + materialization, + execution_time, + execute_started_at, + execute_completed_at, + compile_started_at, + compile_completed_at, + full_refresh, + message, + thread_id, + query_id, + created_at + {% endset %} + + {% set projection = base_cols if lightweight else base_cols ~ ', ' ~ heavy_cols %} + + {% set query %} + select {{ projection }} + from {{ relation }} + where 1=1 + {{ elementary_cli._eq_filter('unique_id', unique_id) }} + {{ elementary_cli._eq_filter('invocation_id', invocation_id) }} + {{ elementary_cli._eq_ci_filter('status', status) }} + {{ elementary_cli._eq_ci_filter('resource_type', resource_type) }} + {{ elementary_cli._eq_ci_filter('materialization', materialization) }} + {{ elementary_cli._like_ci_filter('name', name) }} + {{ elementary_cli._gte_filter('execute_started_at', started_after) }} + {{ elementary_cli._lte_filter('execute_started_at', started_before) }} + {{ elementary_cli._gt_number_filter('execution_time', execution_time_gt) }} + {{ elementary_cli._lt_number_filter('execution_time', execution_time_lt) }} + order by execute_started_at desc + limit {{ limit }} + {% endset %} + + {% set results = elementary.run_query(query) %} + {% do return(elementary.agate_to_dicts(results)) %} +{%- endmacro -%} diff --git a/elementary/monitor/dbt_project/macros/artifacts/get_dbt_source.sql b/elementary/monitor/dbt_project/macros/artifacts/get_dbt_source.sql new file mode 100644 index 000000000..225e164a8 --- /dev/null +++ b/elementary/monitor/dbt_project/macros/artifacts/get_dbt_source.sql @@ -0,0 +1,34 @@ +{%- macro get_dbt_source(unique_id) -%} + {% set relation = ref('elementary', 'dbt_sources') %} + {% if not elementary.relation_exists(relation) %} + {% do return([]) %} + {% endif %} + + {% set query %} + select + unique_id, + name, + database_name, + schema_name, + source_name, + identifier, + description, + package_name, + path, + original_path, + freshness_warn_after, + freshness_error_after, + freshness_description, + loaded_at_field, + source_description, + tags, + owner, + generated_at + from {{ relation }} + where unique_id = '{{ elementary_cli._sql_escape(unique_id) }}' + limit 1 + {% endset %} + + {% set results = elementary.run_query(query) %} + {% do return(elementary.agate_to_dicts(results)) %} +{%- endmacro -%} diff --git a/elementary/monitor/dbt_project/macros/artifacts/get_dbt_sources.sql b/elementary/monitor/dbt_project/macros/artifacts/get_dbt_sources.sql new file mode 100644 index 000000000..0d0edebc5 --- /dev/null +++ b/elementary/monitor/dbt_project/macros/artifacts/get_dbt_sources.sql @@ -0,0 +1,53 @@ +{%- macro get_dbt_sources( + database_name=none, + schema_name=none, + source_name=none, + name=none, + identifier=none, + package_name=none, + generated_after=none, + generated_before=none, + limit=200 +) -%} + {% set relation = ref('elementary', 'dbt_sources') %} + {% if not elementary.relation_exists(relation) %} + {% do return([]) %} + {% endif %} + + {% set query %} + select + unique_id, + name, + database_name, + schema_name, + source_name, + identifier, + description, + package_name, + path, + original_path, + freshness_warn_after, + freshness_error_after, + freshness_description, + loaded_at_field, + source_description, + tags, + owner, + generated_at + from {{ relation }} + where 1=1 + {{ elementary_cli._eq_ci_filter('database_name', database_name) }} + {{ elementary_cli._eq_ci_filter('schema_name', schema_name) }} + {{ elementary_cli._eq_ci_filter('source_name', source_name) }} + {{ elementary_cli._like_ci_filter('name', name) }} + {{ elementary_cli._eq_ci_filter('identifier', identifier) }} + {{ elementary_cli._eq_ci_filter('package_name', package_name) }} + {{ elementary_cli._gte_filter('generated_at', generated_after) }} + {{ elementary_cli._lte_filter('generated_at', generated_before) }} + order by unique_id + limit {{ limit }} + {% endset %} + + {% set results = elementary.run_query(query) %} + {% do return(elementary.agate_to_dicts(results)) %} +{%- endmacro -%} diff --git a/elementary/monitor/dbt_project/macros/artifacts/get_dbt_test.sql b/elementary/monitor/dbt_project/macros/artifacts/get_dbt_test.sql new file mode 100644 index 000000000..81b2a5350 --- /dev/null +++ b/elementary/monitor/dbt_project/macros/artifacts/get_dbt_test.sql @@ -0,0 +1,44 @@ +{%- macro get_dbt_test(unique_id) -%} + {% set relation = ref('elementary', 'dbt_tests') %} + {% if not elementary.relation_exists(relation) %} + {% do return([]) %} + {% endif %} + + {% set query %} + select + unique_id, + name, + short_name, + alias, + database_name, + schema_name, + test_column_name, + severity, + warn_if, + error_if, + test_params, + test_namespace, + test_original_name, + type, + parent_model_unique_id, + package_name, + quality_dimension, + group_name, + tags, + model_tags, + model_owners, + meta, + depends_on_macros, + depends_on_nodes, + description, + original_path, + path, + generated_at + from {{ relation }} + where unique_id = '{{ elementary_cli._sql_escape(unique_id) }}' + limit 1 + {% endset %} + + {% set results = elementary.run_query(query) %} + {% do return(elementary.agate_to_dicts(results)) %} +{%- endmacro -%} diff --git a/elementary/monitor/dbt_project/macros/artifacts/get_dbt_tests.sql b/elementary/monitor/dbt_project/macros/artifacts/get_dbt_tests.sql new file mode 100644 index 000000000..dd2a20279 --- /dev/null +++ b/elementary/monitor/dbt_project/macros/artifacts/get_dbt_tests.sql @@ -0,0 +1,70 @@ +{%- macro get_dbt_tests( + database_name=none, + schema_name=none, + name=none, + package_name=none, + test_type=none, + test_namespace=none, + severity=none, + parent_model_unique_id=none, + quality_dimension=none, + group_name=none, + generated_after=none, + generated_before=none, + limit=200 +) -%} + {% set relation = ref('elementary', 'dbt_tests') %} + {% if not elementary.relation_exists(relation) %} + {% do return([]) %} + {% endif %} + + {% set query %} + select + unique_id, + name, + short_name, + alias, + database_name, + schema_name, + test_column_name, + severity, + type, + test_namespace, + test_original_name, + parent_model_unique_id, + package_name, + quality_dimension, + group_name, + tags, + model_tags, + model_owners, + description, + path, + generated_at + from {{ relation }} + where 1=1 + {{ elementary_cli._eq_ci_filter('database_name', database_name) }} + {{ elementary_cli._eq_ci_filter('schema_name', schema_name) }} + {%- if name is not none %} + and ( + lower(name) like lower('%{{ elementary_cli._sql_escape(name) }}%') + or lower(short_name) like lower('%{{ elementary_cli._sql_escape(name) }}%') + or lower(alias) like lower('%{{ elementary_cli._sql_escape(name) }}%') + ) + {%- endif %} + {{ elementary_cli._eq_ci_filter('package_name', package_name) }} + {{ elementary_cli._eq_ci_filter('type', test_type) }} + {{ elementary_cli._eq_ci_filter('test_namespace', test_namespace) }} + {{ elementary_cli._eq_ci_filter('severity', severity) }} + {{ elementary_cli._eq_filter('parent_model_unique_id', parent_model_unique_id) }} + {{ elementary_cli._eq_ci_filter('quality_dimension', quality_dimension) }} + {{ elementary_cli._eq_ci_filter('group_name', group_name) }} + {{ elementary_cli._gte_filter('generated_at', generated_after) }} + {{ elementary_cli._lte_filter('generated_at', generated_before) }} + order by unique_id + limit {{ limit }} + {% endset %} + + {% set results = elementary.run_query(query) %} + {% do return(elementary.agate_to_dicts(results)) %} +{%- endmacro -%} diff --git a/elementary/monitor/dbt_project/macros/artifacts/get_elementary_test_result.sql b/elementary/monitor/dbt_project/macros/artifacts/get_elementary_test_result.sql new file mode 100644 index 000000000..c137680c7 --- /dev/null +++ b/elementary/monitor/dbt_project/macros/artifacts/get_elementary_test_result.sql @@ -0,0 +1,34 @@ +{%- macro get_elementary_test_result(test_result_id) -%} + {% set relation = ref('elementary', 'elementary_test_results') %} + {% if not elementary.relation_exists(relation) %} + {% do return([]) %} + {% endif %} + + {% set query %} + select + id, + test_execution_id, + test_unique_id, + model_unique_id, + detected_at, + database_name, + schema_name, + table_name, + column_name, + test_type, + test_sub_type, + test_name, + test_short_name, + severity, + status, + test_results_description, + failures, + failed_row_count + from {{ relation }} + where id = '{{ elementary_cli._sql_escape(test_result_id) }}' + limit 1 + {% endset %} + + {% set results = elementary.run_query(query) %} + {% do return(elementary.agate_to_dicts(results)) %} +{%- endmacro -%} diff --git a/elementary/monitor/dbt_project/macros/artifacts/get_elementary_test_results.sql b/elementary/monitor/dbt_project/macros/artifacts/get_elementary_test_results.sql new file mode 100644 index 000000000..f0d830205 --- /dev/null +++ b/elementary/monitor/dbt_project/macros/artifacts/get_elementary_test_results.sql @@ -0,0 +1,63 @@ +{%- macro get_elementary_test_results( + test_unique_id=none, + model_unique_id=none, + test_type=none, + test_sub_type=none, + test_name=none, + status=none, + table_name=none, + column_name=none, + database_name=none, + schema_name=none, + severity=none, + detected_after=none, + detected_before=none, + limit=200 +) -%} + {% set relation = ref('elementary', 'elementary_test_results') %} + {% if not elementary.relation_exists(relation) %} + {% do return([]) %} + {% endif %} + + {% set query %} + select + id, + test_execution_id, + test_unique_id, + model_unique_id, + detected_at, + database_name, + schema_name, + table_name, + column_name, + test_type, + test_sub_type, + test_name, + test_short_name, + severity, + status, + test_results_description, + failures, + failed_row_count + from {{ relation }} + where 1=1 + {{ elementary_cli._eq_filter('test_unique_id', test_unique_id) }} + {{ elementary_cli._eq_filter('model_unique_id', model_unique_id) }} + {{ elementary_cli._eq_ci_filter('test_type', test_type) }} + {{ elementary_cli._eq_ci_filter('test_sub_type', test_sub_type) }} + {{ elementary_cli._like_ci_filter('test_name', test_name) }} + {{ elementary_cli._eq_ci_filter('status', status) }} + {{ elementary_cli._like_ci_filter('table_name', table_name) }} + {{ elementary_cli._eq_ci_filter('column_name', column_name) }} + {{ elementary_cli._eq_ci_filter('database_name', database_name) }} + {{ elementary_cli._eq_ci_filter('schema_name', schema_name) }} + {{ elementary_cli._eq_ci_filter('severity', severity) }} + {{ elementary_cli._gte_filter('detected_at', detected_after) }} + {{ elementary_cli._lte_filter('detected_at', detected_before) }} + order by detected_at desc + limit {{ limit }} + {% endset %} + + {% set results = elementary.run_query(query) %} + {% do return(elementary.agate_to_dicts(results)) %} +{%- endmacro -%} diff --git a/elementary/utils/log.py b/elementary/utils/log.py index 3d1397616..5b3ceadf0 100644 --- a/elementary/utils/log.py +++ b/elementary/utils/log.py @@ -30,8 +30,9 @@ def format(self, record): ROTATION_BACKUP_COUNT = 4 -def get_console_handler(quiet_logs: bool = False): - console_handler = logging.StreamHandler(sys.stdout) +def get_console_handler(quiet_logs: bool = False, use_stderr: bool = False): + stream = sys.stderr if use_stderr else sys.stdout + console_handler = logging.StreamHandler(stream) console_handler.setFormatter(FORMATTER) if is_debug(): console_handler.setLevel(logging.DEBUG) @@ -60,13 +61,20 @@ def get_logger(logger_name): return logger -def set_root_logger_handlers(logger_name, files_target_path, quiet_logs: bool = False): +def set_root_logger_handlers( + logger_name, + files_target_path, + quiet_logs: bool = False, + use_stderr: bool = False, +): logger = logging.getLogger(logger_name) # Disable propagation to root logger to avoid duplicate logs logger.propagate = False - logger.addHandler(get_console_handler(quiet_logs=quiet_logs)) + logger.addHandler( + get_console_handler(quiet_logs=quiet_logs, use_stderr=use_stderr) + ) logger.addHandler(get_file_handler(files_target_path)) # Set logger level to DEBUG so it doesn't filter messages (handler will filter)