From 43911a45b77051342d46028f5c51e6f87f127737 Mon Sep 17 00:00:00 2001 From: Max R Date: Thu, 16 Apr 2026 23:32:20 +0200 Subject: [PATCH] Add quiet CLI mode --- README.md | 1 + sqlite_export_for_ynab/_main.py | 96 +++++++++++++++++++++++++-------- tests/_main_test.py | 64 +++++++++++++++++++++- 3 files changed, 136 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 5e2ea9d..6e85f7e 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,7 @@ $ sqlite-export-for-ynab ``` Running it again will pull only data that changed since the last pull (this is done with [Delta Requests](https://api.ynab.com/#deltas)). If you want to wipe the DB and pull all data again use the `--full-refresh` flag. +Pass `--quiet` to suppress all CLI output, including progress bars. You can specify the DB path with the following options 1. The `--db` flag. diff --git a/sqlite_export_for_ynab/_main.py b/sqlite_export_for_ynab/_main.py index 6eefc2c..2007e49 100644 --- a/sqlite_export_for_ynab/_main.py +++ b/sqlite_export_for_ynab/_main.py @@ -78,14 +78,20 @@ async def async_main( parser.add_argument( "--version", action="version", version=f"%(prog)s {version(_PACKAGE)}" ) + parser.add_argument( + "--quiet", + action="store_true", + help="Suppress all CLI output, including progress bars.", + ) args = parser.parse_args(argv) db: Path = args.db full_refresh: bool = args.full_refresh + quiet: bool = args.quiet token = resolve_token(token_override) - await sync(token, db, full_refresh) + await sync(token, db, full_refresh, quiet=quiet) return 0 @@ -102,7 +108,14 @@ def default_db_path() -> Path: ) -async def sync(token: str, db: Path, full_refresh: bool) -> None: +def _print(message: str, *, quiet: bool) -> None: + if not quiet: + print(message) + + +async def sync( + token: str, db: Path, full_refresh: bool, *, quiet: bool = False +) -> None: async with aiohttp.ClientSession() as session: plans = (await YnabClient(token, session)("plans"))["plans"] @@ -116,22 +129,22 @@ async def sync(token: str, db: Path, full_refresh: bool) -> None: cur = con.cursor() if full_refresh: - print("Dropping relations...") + _print("Dropping relations...", quiet=quiet) cur.executescript(contents("drop-relations.sql")) con.commit() - print("Done") + _print("Done", quiet=quiet) relations = get_relations(cur) if relations != _ALL_RELATIONS: - print("Recreating relations...") + _print("Recreating relations...", quiet=quiet) cur.executescript(contents("create-relations.sql")) con.commit() - print("Done") + _print("Done", quiet=quiet) - print("Fetching plan data...") + _print("Fetching plan data...", quiet=quiet) lkos = get_last_knowledge_of_server(cur) async with aiohttp.ClientSession() as session: - with tldm(desc="Plan Data", total=len(plans) * 5) as pbar: + with tldm(desc="Plan Data", total=len(plans) * 5, disable=quiet) as pbar: yc = ProgressYnabClient(YnabClient(token, session), pbar) account_jobs = jobs(yc, "accounts", plan_ids, lkos) @@ -161,7 +174,7 @@ async def sync(token: str, db: Path, full_refresh: bool) -> None: plan_ids, all_txn_data, strict=True ) } - print("Done") + _print("Done", quiet=quiet) if ( not any(t["accounts"] for t in all_account_data) @@ -170,25 +183,27 @@ async def sync(token: str, db: Path, full_refresh: bool) -> None: and not any(t["transactions"] for t in all_txn_data) and not any(s["scheduled_transactions"] for s in all_sched_txn_data) ): - print("No new data fetched") + _print("No new data fetched", quiet=quiet) else: - print("Inserting plan data...") + _print("Inserting plan data...", quiet=quiet) insert_plans(cur, plans, new_lkos) for plan_id, account_data in zip(plan_ids, all_account_data, strict=True): - insert_accounts(cur, plan_id, account_data["accounts"]) + insert_accounts(cur, plan_id, account_data["accounts"], quiet=quiet) for plan_id, cat_data in zip(plan_ids, all_cat_data, strict=True): - insert_category_groups(cur, plan_id, cat_data["category_groups"]) + insert_category_groups( + cur, plan_id, cat_data["category_groups"], quiet=quiet + ) for plan_id, payee_data in zip(plan_ids, all_payee_data, strict=True): - insert_payees(cur, plan_id, payee_data["payees"]) + insert_payees(cur, plan_id, payee_data["payees"], quiet=quiet) for plan_id, txn_data in zip(plan_ids, all_txn_data, strict=True): - insert_transactions(cur, plan_id, txn_data["transactions"]) + insert_transactions(cur, plan_id, txn_data["transactions"], quiet=quiet) for plan_id, sched_txn_data in zip( plan_ids, all_sched_txn_data, strict=True ): insert_scheduled_transactions( - cur, plan_id, sched_txn_data["scheduled_transactions"] + cur, plan_id, sched_txn_data["scheduled_transactions"], quiet=quiet ) - print("Done") + _print("Done", quiet=quiet) def contents(filename: str) -> str: @@ -255,7 +270,11 @@ def insert_plans( def insert_accounts( - cur: sqlite3.Cursor, plan_id: str, accounts: list[dict[str, Any]] + cur: sqlite3.Cursor, + plan_id: str, + accounts: list[dict[str, Any]], + *, + quiet: bool = False, ) -> None: # YNAB's LoanAccountPeriodValues are untyped dicts so we need to turn them into a more standard sub-entry view updated_accounts = [ @@ -283,11 +302,16 @@ def insert_accounts( "accounts", "account_periodic_values", "account_periodic_values", + quiet=quiet, ) def insert_category_groups( - cur: sqlite3.Cursor, plan_id: str, category_groups: list[dict[str, Any]] + cur: sqlite3.Cursor, + plan_id: str, + category_groups: list[dict[str, Any]], + *, + quiet: bool = False, ) -> None: return insert_nested_entries( cur, @@ -297,21 +321,30 @@ def insert_category_groups( "category_groups", "categories", "categories", + quiet=quiet, ) def insert_payees( - cur: sqlite3.Cursor, plan_id: str, payees: list[dict[str, Any]] + cur: sqlite3.Cursor, + plan_id: str, + payees: list[dict[str, Any]], + *, + quiet: bool = False, ) -> None: if not payees: return - for payee in tldm(payees, desc="Payees"): + for payee in tldm(payees, desc="Payees", disable=quiet): insert_entry(cur, "payees", plan_id, payee) def insert_transactions( - cur: sqlite3.Cursor, plan_id: str, transactions: list[dict[str, Any]] + cur: sqlite3.Cursor, + plan_id: str, + transactions: list[dict[str, Any]], + *, + quiet: bool = False, ) -> None: return insert_nested_entries( cur, @@ -321,11 +354,16 @@ def insert_transactions( "transactions", "subtransactions", "subtransactions", + quiet=quiet, ) def insert_scheduled_transactions( - cur: sqlite3.Cursor, plan_id: str, scheduled_transactions: list[dict[str, Any]] + cur: sqlite3.Cursor, + plan_id: str, + scheduled_transactions: list[dict[str, Any]], + *, + quiet: bool = False, ) -> None: return insert_nested_entries( cur, @@ -335,6 +373,7 @@ def insert_scheduled_transactions( "scheduled_transactions", "subtransactions", "scheduled_subtransactions", + quiet=quiet, ) @@ -347,6 +386,8 @@ def insert_nested_entries( entries_name: Literal["accounts"], subentries_name: Literal["account_periodic_values"], subentries_table_name: Literal["account_periodic_values"], + *, + quiet: bool = False, ) -> None: ... @@ -359,6 +400,8 @@ def insert_nested_entries( entries_name: Literal["category_groups"], subentries_name: Literal["categories"], subentries_table_name: Literal["categories"], + *, + quiet: bool = False, ) -> None: ... @@ -371,6 +414,8 @@ def insert_nested_entries( entries_name: Literal["transactions"], subentries_name: Literal["subtransactions"], subentries_table_name: Literal["subtransactions"], + *, + quiet: bool = False, ) -> None: ... @@ -383,6 +428,8 @@ def insert_nested_entries( entries_name: Literal["scheduled_transactions"], subentries_name: Literal["subtransactions"], subentries_table_name: Literal["scheduled_subtransactions"], + *, + quiet: bool = False, ) -> None: ... @@ -413,6 +460,8 @@ def insert_nested_entries( | Literal["subtransactions"] | Literal["scheduled_subtransactions"] ), + *, + quiet: bool = False, ) -> None: if not entries: return @@ -420,6 +469,7 @@ def insert_nested_entries( with tldm( total=sum(1 + len(e[subentries_name]) for e in entries), desc=desc, + disable=quiet, ) as pbar: for entry in entries: insert_entry( diff --git a/tests/_main_test.py b/tests/_main_test.py index 14aa8ef..1c31670 100644 --- a/tests/_main_test.py +++ b/tests/_main_test.py @@ -576,7 +576,7 @@ def test_main_ok(sync, tmp_path, monkeypatch): monkeypatch.setenv(_ENV_TOKEN, TOKEN) ret = main(("--db", str(tmp_path / "db.sqlite"))) - sync.assert_called() + sync.assert_called_once_with(TOKEN, tmp_path / "db.sqlite", False, quiet=False) assert ret == 0 @@ -593,7 +593,19 @@ def test_main_uses_token_override(sync, tmp_path, monkeypatch): ret = main(("--db", str(tmp_path / "db.sqlite")), token_override="override-token") - sync.assert_called_once_with("override-token", tmp_path / "db.sqlite", False) + sync.assert_called_once_with( + "override-token", tmp_path / "db.sqlite", False, quiet=False + ) + assert ret == 0 + + +@patch("sqlite_export_for_ynab._main.sync") +def test_main_quiet(sync, tmp_path, monkeypatch): + monkeypatch.setenv(_ENV_TOKEN, TOKEN) + + ret = main(("--db", str(tmp_path / "db.sqlite"), "--quiet")) + + sync.assert_called_once_with(TOKEN, tmp_path / "db.sqlite", False, quiet=True) assert ret == 0 @@ -654,6 +666,54 @@ async def test_sync_no_data(tmp_path, mock_aioresponses): await sync(TOKEN, db, False) +@pytest.mark.asyncio +@pytest.mark.usefixtures(mock_aioresponses.__name__) +async def test_sync_no_data_quiet(tmp_path, mock_aioresponses, capsys): + mock_aioresponses.get( + PLANS_ENDPOINT_RE, body=json.dumps({"data": {"plans": PLANS}}) + ) + mock_aioresponses.get( + ACCOUNTS_ENDPOINT_RE, + body=json.dumps({"data": {"accounts": []}}), + repeat=True, + ) + mock_aioresponses.get( + CATEGORIES_ENDPOINT_RE, + body=json.dumps({"data": {"category_groups": []}}), + repeat=True, + ) + mock_aioresponses.get( + PAYEES_ENDPOINT_RE, body=json.dumps({"data": {"payees": []}}), repeat=True + ) + mock_aioresponses.get( + TRANSACTIONS_ENDPOINT_RE, + body=json.dumps( + { + "data": { + "transactions": [], + "server_knowledge": SERVER_KNOWLEDGE_1, + } + } + ), + repeat=True, + ) + mock_aioresponses.get( + SCHEDULED_TRANSACTIONS_ENDPOINT_RE, + body=json.dumps({"data": {"scheduled_transactions": []}}), + repeat=True, + ) + + db = tmp_path / "db.sqlite" + with sqlite3.connect(db) as con: + con.executescript(contents("create-relations.sql")) + + await sync(TOKEN, db, False, quiet=True) + + out, err = capsys.readouterr() + assert out == "" + assert err == "" + + @pytest.mark.asyncio @pytest.mark.usefixtures(mock_aioresponses.__name__) async def test_sync(tmp_path, mock_aioresponses):