|
1 | 1 | import os |
2 | 2 | import json |
3 | 3 | import subprocess |
4 | | -from typing import Any |
| 4 | +from typing import Any, Dict, List |
5 | 5 | import boto3 |
6 | 6 | from botocore.exceptions import ClientError |
7 | 7 |
|
# --- Configuration ---
# Credentials are read from the environment; a missing variable raises
# KeyError at import time, which is the desired fail-fast behavior in CI.
ACCOUNT_ID = os.environ["R2_ACCOUNT_ID"]
ACCESS_KEY_ID = os.environ["R2_ACCESS_KEY_ID"]
SECRET_ACCESS_KEY = os.environ["R2_SECRET_ACCESS_KEY"]
|
ENDPOINT_URL = f"https://{ACCOUNT_ID}.r2.cloudflarestorage.com"
MANIFEST_FILE = "manifest.json"

# --- Boto3 S3 Client ---
# Cloudflare R2 is S3-compatible, so the standard S3 client works when
# pointed at the account-specific R2 endpoint.
client = boto3.client(
    "s3",
    endpoint_url=ENDPOINT_URL,
    aws_access_key_id=ACCESS_KEY_ID,
    aws_secret_access_key=SECRET_ACCESS_KEY,
)
25 | 24 |
|
def get_commit_details() -> Dict[str, str]:
    """Gets the hash and subject of the latest commit affecting the manifest.

    Returns:
        A dict with keys "hash" (abbreviated commit hash, %h) and
        "subject" (commit subject line, %s).
    """

    def _git_log(pretty_format: str) -> str:
        # "-1" limits output to the newest commit; "--" scopes the log to
        # commits that touched MANIFEST_FILE.
        raw = subprocess.check_output(
            ["git", "log", "-1", f"--pretty={pretty_format}", "--", MANIFEST_FILE]
        )
        return raw.decode().strip()

    return {"hash": _git_log("%h"), "subject": _git_log("%s")}
85 | 43 |
|
def finalize_manifest(updated_data: List[Dict[str, Any]], commit_message: str) -> None:
    """Writes the updated manifest, commits, and pushes the changes.

    Args:
        updated_data: The full manifest structure to serialize to disk.
        commit_message: Subject line for the finalizing git commit.

    Raises:
        subprocess.CalledProcessError: If any git step fails (check=True).
            Previously return codes were ignored, so a failed commit or push
            would still print the success message; now CI fails loudly.
    """
    print("\nFinalizing manifest file...")
    with open(MANIFEST_FILE, "w") as f:
        json.dump(updated_data, f, indent=2, ensure_ascii=False)
        f.write("\n")  # trailing newline for linters

    print("Committing and pushing finalized manifest...")
    subprocess.run(["git", "config", "user.name", "github-actions[bot]"], check=True)
    subprocess.run(
        ["git", "config", "user.email", "github-actions[bot]@users.noreply.github.com"],
        check=True,
    )
    subprocess.run(["git", "add", MANIFEST_FILE], check=True)
    subprocess.run(["git", "commit", "-m", commit_message], check=True)
    subprocess.run(["git", "push"], check=True)
    print("✅ Manifest finalized.")
106 | 61 |
|
def handle_deletions(manifest_data: List[Dict[str, Any]]) -> bool:
    """
    Scans for and processes all pending deletions.

    Whole datasets with status "pending-deletion" are dropped from the
    manifest, and individual history entries flagged the same way are pruned.
    All backing R2 objects are deleted in batches, then the pruned manifest
    is committed and pushed.

    Args:
        manifest_data: Parsed manifest (list of dataset dicts).

    Returns:
        True if any deletions were processed.
    """
    print("--- Phase 1: Checking for pending deletions ---")
    datasets_to_keep: List[Dict[str, Any]] = []
    objects_to_delete_from_r2: List[Dict[str, str]] = []
    processed_deletion = False

    for dataset in manifest_data:
        if dataset.get("status") == "pending-deletion":
            # Whole dataset removed: queue every version's object, keep nothing.
            processed_deletion = True
            print(f"Found dataset marked for full deletion: {dataset['fileName']}")
            for entry in dataset.get("history", []):
                if "r2_object_key" in entry:
                    objects_to_delete_from_r2.append({"Key": entry["r2_object_key"]})
        else:
            versions_to_keep = []
            for entry in dataset.get("history", []):
                if entry.get("status") == "pending-deletion":
                    processed_deletion = True
                    print(
                        f"Found version marked for deletion: {dataset['fileName']} v{entry['version']}"
                    )
                    if "r2_object_key" in entry:
                        objects_to_delete_from_r2.append(
                            {"Key": entry["r2_object_key"]}
                        )
                else:
                    versions_to_keep.append(entry)
            dataset["history"] = versions_to_keep
            datasets_to_keep.append(dataset)

    if not processed_deletion:
        print("No pending deletions found.")
        return False

    if objects_to_delete_from_r2:
        print(
            f"\nDeleting {len(objects_to_delete_from_r2)} objects from production R2 bucket..."
        )
        # S3 DeleteObjects accepts at most 1000 keys per request, hence the chunking.
        for i in range(0, len(objects_to_delete_from_r2), 1000):
            chunk = objects_to_delete_from_r2[i : i + 1000]
            response = client.delete_objects(
                Bucket=PROD_BUCKET, Delete={"Objects": chunk, "Quiet": True}
            )
            if response.get("Errors"):
                print(" ❌ ERROR during batch deletion:", response["Errors"])
                # sys.exit instead of the builtin exit(): the latter is a
                # site-module convenience not guaranteed in all interpreters.
                sys.exit(1)
        print("✅ Successfully deleted objects from R2.")

    finalize_manifest(datasets_to_keep, "ci: Finalize manifest after data deletion")
    return True
| 118 | + |
def handle_publications(manifest_data: List[Dict[str, Any]]) -> bool:
    """
    Scans for and processes one pending publication or rollback.

    A history entry whose commit is the "pending-merge" placeholder gets the
    real commit hash/subject filled in; if it carries a staging_key, the
    object is promoted from the staging bucket to production via a
    server-side copy. Only one publication is processed per run.

    Args:
        manifest_data: Parsed manifest (list of dataset dicts); mutated in place.

    Returns:
        True if a publication was processed.
    """
    print("\n--- Phase 2: Checking for pending publications ---")
    for dataset in manifest_data:
        # `entry` is the dict stored in the list, so in-place mutation is
        # enough — no index reassignment needed.
        for entry in dataset["history"]:
            if entry.get("commit") != "pending-merge":
                continue

            commit_details = get_commit_details()
            entry["commit"] = commit_details["hash"]
            # Only overwrite description if it's the placeholder
            if entry.get("description") == "pending-merge":
                entry["description"] = commit_details["subject"]

            if entry.get("staging_key"):
                staging_key = entry.pop("staging_key")
                final_key = entry["r2_object_key"]
                print(f"Publishing: {dataset['fileName']} v{entry['version']}")
                print(f" Description: {entry['description']}")
                try:
                    # Server-side copy avoids downloading/re-uploading the object.
                    copy_source: Any = {
                        "Bucket": STAGING_BUCKET,
                        "Key": staging_key,
                    }
                    client.copy_object(
                        CopySource=copy_source, Bucket=PROD_BUCKET, Key=final_key
                    )
                    print(" ✅ Server-side copy successful.")
                    client.delete_object(Bucket=STAGING_BUCKET, Key=staging_key)
                    print(" ✅ Staging object deleted.")
                except ClientError as e:
                    print(f" ❌ ERROR: Could not process object. Reason: {e}")
                    # sys.exit instead of the builtin exit() (site-module helper).
                    sys.exit(1)
            else:
                # No staging_key means this entry is a rollback to an
                # already-published version; nothing to copy.
                print(
                    f"Finalizing rollback: {dataset['fileName']} v{entry['version']}"
                )
                print(f" Description: {entry['description']}")

            finalize_manifest(
                manifest_data,
                f"ci: Publish {dataset['fileName']} v{entry['version']}",
            )
            return True  # Process only one publication per run

    print("No pending publications found.")
    return False
| 168 | + |
| 169 | + |
def main() -> None:
    """Main execution block: load the manifest, then run both phases."""
    print("Starting dataset publish/cleanup process...")
    with open(MANIFEST_FILE) as f:
        manifest_data = json.load(f)

    # Deletions take priority. If any were handled, stop here so the next
    # run starts from the freshly committed manifest.
    if handle_deletions(manifest_data):
        print("\nDeletions were processed. Exiting to allow for a clean next run.")
        return

    # No deletions pending — look for a publication or rollback instead.
    handle_publications(manifest_data)
    print("\nProcess complete.")


if __name__ == "__main__":
    main()
0 commit comments