|
12 | 12 | """ |
13 | 13 | from __future__ import annotations |
14 | 14 |
|
| 15 | +import base64 |
15 | 16 | import dataclasses |
16 | 17 | import datetime as dt |
| 18 | +import hashlib |
| 19 | +import hmac |
| 20 | +import json |
17 | 21 | import logging |
18 | 22 | import os |
| 23 | +import re |
| 24 | +import secrets |
19 | 25 | import subprocess |
20 | 26 | import sys |
21 | 27 | import tempfile |
@@ -59,6 +65,10 @@ class Config: |
59 | 65 | ckan_namespace: str |
60 | 66 | ckan_deployment: str |
61 | 67 |
|
| 68 | + # DataPusher token rotation |
| 69 | + ckan_jwt_secret: str |
| 70 | + ckan_ini_secret_name: str |
| 71 | + |
62 | 72 | # Slack |
63 | 73 | slack_webhook: str | None |
64 | 74 |
|
@@ -87,6 +97,8 @@ def req(name: str) -> str: |
87 | 97 | auth0_client_secret=req("AUTH0_PROD_CLIENT_SECRET"), |
88 | 98 | ckan_namespace=os.environ.get("CKAN_NAMESPACE", "adr-s"), |
89 | 99 | ckan_deployment=os.environ.get("CKAN_DEPLOYMENT", "deploy/ckan"), |
| 100 | + ckan_jwt_secret=req("CKAN_JWT_SECRET"), |
| 101 | + ckan_ini_secret_name=os.environ.get("CKAN_INI_SECRET_NAME", "ckan-ini-secrets"), |
90 | 102 | slack_webhook=os.environ.get("SLACK_WEBHOOK_URL"), |
91 | 103 | ) |
92 | 104 |
|
@@ -441,6 +453,80 @@ def ckan_reindex(cfg: Config) -> None: |
441 | 453 | log.warning("ckan search-index rebuild exited %d — check logs for per-dataset failures", result.returncode) |
442 | 454 |
|
443 | 455 |
|
| 456 | +# ---------- DataPusher token rotation ---------- |
| 457 | + |
def _b64url(raw: bytes) -> str:
    """Return *raw* base64url-encoded with the trailing ``=`` padding removed.

    JWT segments use the URL-safe alphabet and omit padding.
    """
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


def _make_api_token(jwt_secret: str) -> tuple[str, str]:
    """Generate a CKAN-compatible HS256 JWT API token.

    Replicates what `ckan user token add` produces: header.payload.sig with
    no expiry claim. The jti is the DB row ID; the token string is what goes
    in ckan.datapusher.api_token.

    Args:
        jwt_secret: The HMAC key, used verbatim (UTF-8 encoded) to sign the
            token. NOTE(review): CKAN's ini usually writes this setting as
            ``string:<value>``; this function presumably expects the bare
            ``<value>`` -- confirm against how CKAN_JWT_SECRET is provisioned.

    Returns:
        (jti, token_string).

    Raises:
        ValueError: If *jwt_secret* is empty. Signing with an empty key would
            still "work" here but yield a token that can never validate.
    """
    if not jwt_secret:
        raise ValueError("jwt_secret must not be empty")

    jti = secrets.token_urlsafe(48)

    # Compact separators keep the encoded segments free of whitespace.
    header = _b64url(
        json.dumps({"alg": "HS256", "typ": "JWT"}, separators=(",", ":")).encode()
    )
    payload = _b64url(
        json.dumps({"jti": jti, "iat": int(time.time())}, separators=(",", ":")).encode()
    )

    # The signature covers the ASCII "<header>.<payload>" string.
    signing_input = f"{header}.{payload}"
    sig = hmac.new(jwt_secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return jti, f"{signing_input}.{_b64url(sig)}"
| 478 | + |
| 479 | + |
def refresh_datapusher_token(cfg: Config) -> None:
    """Regenerate the DataPusher API token after a DB restore.

    The nightly sync replaces staging's DB with a prod snapshot. Any token
    previously stored only in staging's api_token table is wiped, breaking
    DataPusher's ability to call back to CKAN. This function:

      1. Reads secrets.ini from the Kubernetes Secret and checks that it has
         a ``ckan.datapusher.api_token`` line (otherwise: warn and return
         without touching the DB).
      2. Generates a fresh JWT using the same secret CKAN uses.
      3. Replaces the ``datapusher_staging`` row in the restored staging DB,
         atomically, failing loudly if the owning user does not exist.
      4. Patches the Secret with the new token value.

    Called inside the scale-to-0 window, before CKAN scales back up, so the
    pod starts with a token that already exists in the DB.

    Raises:
        subprocess.CalledProcessError: If reading the Secret fails.
        Whatever ``run`` raises when psql or kubectl exits non-zero.
    """
    # Fetch and validate the ini before mutating anything, so a missing key
    # does not leave an orphan api_token row behind.
    result = subprocess.run(
        ["kubectl", "get", "secret", cfg.ckan_ini_secret_name,
         "-n", cfg.ckan_namespace, "-o", "jsonpath={.data.secrets\\.ini}"],
        capture_output=True, text=True, check=True,
    )
    current_ini = base64.b64decode(result.stdout.strip()).decode()

    jti, token = _make_api_token(cfg.ckan_jwt_secret)

    # A callable replacement avoids any backslash/group-reference processing
    # of the token text; the count tells us directly whether the key existed.
    new_ini, replaced = re.subn(
        r"(?m)^ckan\.datapusher\.api_token\s*=.*$",
        lambda _match: f"ckan.datapusher.api_token = {token}",
        current_ini,
    )
    if not replaced:
        log.warning("refresh_datapusher_token: ckan.datapusher.api_token not found in secrets.ini — skipping patch")
        return

    # DELETE + INSERT run in one DO block (a single transaction). INSERT ...
    # SELECT inserts zero rows without error when the user is missing, so
    # check FOUND and raise; with ON_ERROR_STOP=1 psql then exits non-zero.
    # jti comes from secrets.token_urlsafe (URL-safe alphabet only), so
    # embedding it in the SQL literal is safe.
    # NOTE(review): created_at is not set here; confirm the column has a
    # server-side default or that a NULL value is acceptable to CKAN.
    rotate_sql = (
        "DO $$ BEGIN "
        "DELETE FROM api_token WHERE name = 'datapusher_staging'; "
        "INSERT INTO api_token (id, name, user_id) "
        f"SELECT '{jti}', 'datapusher_staging', id "
        "FROM public.user WHERE name = 'admin' LIMIT 1; "
        "IF NOT FOUND THEN "
        "RAISE EXCEPTION 'cannot create datapusher token: user \"admin\" not found'; "
        "END IF; "
        "END $$;"
    )
    run(
        ["psql", "-v", "ON_ERROR_STOP=1", "-c", rotate_sql],
        env=pg_env(cfg.staging_ckan_url),
    )

    # NOTE(review): the patch (containing the whole base64 secrets.ini) is
    # passed on the command line; if `run` logs its argv, consider switching
    # to `kubectl patch --patch-file` to keep it out of logs and `ps`.
    new_b64 = base64.b64encode(new_ini.encode()).decode()
    run(
        ["kubectl", "patch", "secret", cfg.ckan_ini_secret_name,
         "-n", cfg.ckan_namespace, "--type=merge",
         "-p", json.dumps({"data": {"secrets.ini": new_b64}})],
    )
    log.info("datapusher api token refreshed (jti=%s…)", jti[:16])
| 528 | + |
| 529 | + |
444 | 530 | # ---------- main ---------- |
445 | 531 |
|
446 | 532 | def main() -> int: |
@@ -472,6 +558,7 @@ def main() -> int: |
472 | 558 | _scale(cfg, scaled, 0) |
473 | 559 | pg_restore_from_blob(cfg, ckan_dump, cfg.staging_ckan_url, "ckan") |
474 | 560 | pg_restore_from_blob(cfg, ds_dump, cfg.staging_ckan_url, "datastore") |
| 561 | + refresh_datapusher_token(cfg) |
475 | 562 | finally: |
476 | 563 | _scale(cfg, scaled, 1) |
477 | 564 | azcopy_sync( |
|
0 commit comments