Skip to content

Commit 48ed8f0

Browse files
committed
chore: automate release info updates
1 parent 15c1a5f commit 48ed8f0

2 files changed

Lines changed: 349 additions & 0 deletions

File tree

scripts/update_release_info.py

Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
"""Update ATT&CK release hash metadata in mitreattack/release_info.py."""
2+
3+
from __future__ import annotations
4+
5+
import argparse
6+
import ast
7+
import json
8+
import pprint
9+
import re
10+
import subprocess
11+
import sys
12+
import urllib.error
13+
import urllib.parse
14+
import urllib.request
15+
from dataclasses import dataclass
16+
from pathlib import Path
17+
from typing import Any
18+
19+
DOMAINS = ("enterprise", "mobile", "ics")
20+
RELEASE_INFO_PATH = Path("mitreattack/release_info.py")
21+
REQUIRED_ASSIGNMENTS = ("LATEST_VERSION", "STIX20", "STIX21")
22+
23+
24+
@dataclass(frozen=True)
25+
class ReleaseSource:
26+
"""GitHub release source for an ATT&CK STIX version."""
27+
28+
stix_version: str
29+
owner: str
30+
repo: str
31+
tag_prefix: str
32+
assignment_name: str
33+
34+
35+
RELEASE_SOURCES = (
36+
ReleaseSource(
37+
stix_version="2.0",
38+
owner="mitre",
39+
repo="cti",
40+
tag_prefix="ATT&CK-v",
41+
assignment_name="STIX20",
42+
),
43+
ReleaseSource(
44+
stix_version="2.1",
45+
owner="mitre-attack",
46+
repo="attack-stix-data",
47+
tag_prefix="v",
48+
assignment_name="STIX21",
49+
),
50+
)
51+
52+
53+
def main() -> None:
54+
"""Parse arguments and update release_info.py."""
55+
parser = argparse.ArgumentParser(description=__doc__)
56+
parser.add_argument("version", nargs="?", help="ATT&CK release version, for example 19.1")
57+
parser.add_argument(
58+
"--release-info",
59+
type=Path,
60+
default=RELEASE_INFO_PATH,
61+
help=f"Path to release_info.py. Defaults to {RELEASE_INFO_PATH}",
62+
)
63+
parser.add_argument("--token", default=None, help="GitHub token. Defaults to GITHUB_TOKEN if set.")
64+
parser.add_argument("--dry-run", action="store_true", help="Print the updated file instead of writing it.")
65+
parser.add_argument("--no-format", action="store_true", help="Skip running ruff format after writing.")
66+
args = parser.parse_args()
67+
68+
version = args.version or fetch_latest_common_version(token=args.token)
69+
hashes = fetch_release_hashes(version=version, token=args.token)
70+
updated = update_release_info_source(args.release_info.read_text(), version=version, release_hashes=hashes)
71+
72+
if args.dry_run:
73+
print(updated)
74+
return
75+
76+
args.release_info.write_text(updated)
77+
if not args.no_format:
78+
subprocess.run(["uv", "run", "--extra", "dev", "ruff", "format", str(args.release_info)], check=True)
79+
80+
print(f"Updated {args.release_info} for ATT&CK v{version}")
81+
82+
83+
def fetch_latest_common_version(token: str | None = None) -> str:
84+
"""Fetch the latest non-prerelease version present in both STIX release repos."""
85+
latest_versions = {source.stix_version: fetch_latest_version(source, token=token) for source in RELEASE_SOURCES}
86+
if latest_versions["2.0"] != latest_versions["2.1"]:
87+
raise SystemExit(
88+
"Latest STIX release versions do not match: "
89+
f"STIX 2.0={latest_versions['2.0']}, STIX 2.1={latest_versions['2.1']}. "
90+
"Pass the desired ATT&CK version explicitly."
91+
)
92+
return latest_versions["2.0"]
93+
94+
95+
def fetch_latest_version(source: ReleaseSource, token: str | None = None) -> str:
96+
"""Fetch the latest GitHub release version for one STIX source."""
97+
release = github_json(f"https://api.github.com/repos/{source.owner}/{source.repo}/releases/latest", token=token)
98+
return version_from_tag(release["tag_name"], source.tag_prefix)
99+
100+
101+
def fetch_release_hashes(version: str, token: str | None = None) -> dict[str, dict[str, str]]:
102+
"""Fetch SHA256 hashes for every required STIX source and domain."""
103+
release_hashes: dict[str, dict[str, str]] = {}
104+
for source in RELEASE_SOURCES:
105+
release_hashes[source.assignment_name] = fetch_source_hashes(source, version=version, token=token)
106+
return release_hashes
107+
108+
109+
def fetch_source_hashes(source: ReleaseSource, version: str, token: str | None = None) -> dict[str, str]:
110+
"""Fetch SHA256 hashes for one STIX release source."""
111+
tag = f"{source.tag_prefix}{version}"
112+
url = f"https://api.github.com/repos/{source.owner}/{source.repo}/releases/tags/{quote_tag(tag)}"
113+
release = github_json(url, token=token)
114+
assets = release.get("assets", [])
115+
hashes: dict[str, str] = {}
116+
117+
for domain in DOMAINS:
118+
asset = find_domain_asset(assets, domain=domain, version=version)
119+
digest = asset.get("digest")
120+
if isinstance(digest, str) and digest.startswith("sha256:"):
121+
hashes[domain] = digest.removeprefix("sha256:")
122+
continue
123+
124+
browser_download_url = asset.get("browser_download_url")
125+
if not isinstance(browser_download_url, str):
126+
raise SystemExit(f"Missing browser_download_url for {source.owner}/{source.repo} {tag} {asset.get('name')}")
127+
hashes[domain] = fetch_sha256(browser_download_url, token=token)
128+
129+
return hashes
130+
131+
132+
def update_release_info_source(source: str, version: str, release_hashes: dict[str, dict[str, str]]) -> str:
133+
"""Return release_info.py source updated with the given ATT&CK version and hashes."""
134+
tree = ast.parse(source)
135+
assignments = find_assignments(tree)
136+
137+
stix20 = ast.literal_eval(assignments["STIX20"].value)
138+
stix21 = ast.literal_eval(assignments["STIX21"].value)
139+
140+
for assignment_name, stix_hashes in (("STIX20", stix20), ("STIX21", stix21)):
141+
for domain in DOMAINS:
142+
stix_hashes[domain][version] = release_hashes[assignment_name][domain]
143+
144+
replacements = {
145+
"LATEST_VERSION": f'LATEST_VERSION = "{version}"',
146+
"STIX20": format_assignment("STIX20", stix20),
147+
"STIX21": format_assignment("STIX21", stix21),
148+
}
149+
150+
return replace_assignments(source, assignments, replacements)
151+
152+
153+
def find_assignments(tree: ast.Module) -> dict[str, ast.Assign]:
154+
"""Find required top-level assignment nodes."""
155+
assignments: dict[str, ast.Assign] = {}
156+
for node in tree.body:
157+
if not isinstance(node, ast.Assign):
158+
continue
159+
for target in node.targets:
160+
if isinstance(target, ast.Name) and target.id in REQUIRED_ASSIGNMENTS:
161+
assignments[target.id] = node
162+
163+
missing = sorted(set(REQUIRED_ASSIGNMENTS) - set(assignments))
164+
if missing:
165+
raise SystemExit(f"Missing required assignment(s): {', '.join(missing)}")
166+
return assignments
167+
168+
169+
def replace_assignments(source: str, assignments: dict[str, ast.Assign], replacements: dict[str, str]) -> str:
170+
"""Replace assignment source ranges using AST line numbers."""
171+
lines = source.splitlines()
172+
for name, replacement in sorted(replacements.items(), key=lambda item: assignments[item[0]].lineno, reverse=True):
173+
node = assignments[name]
174+
if node.end_lineno is None:
175+
raise SystemExit(f"Could not determine source range for {name}")
176+
lines[node.lineno - 1 : node.end_lineno] = replacement.splitlines()
177+
return "\n".join(lines) + "\n"
178+
179+
180+
def format_assignment(name: str, value: Any) -> str:
181+
"""Format a Python assignment for release hash data."""
182+
return f"{name} = {pprint.pformat(value, width=120, sort_dicts=False)}"
183+
184+
185+
def find_domain_asset(assets: list[dict[str, Any]], domain: str, version: str) -> dict[str, Any]:
186+
"""Find the GitHub release asset for a domain."""
187+
candidate_names = {
188+
f"{domain}-attack.json",
189+
f"{domain}-attack-{version}.json",
190+
}
191+
for asset in assets:
192+
if asset.get("name") in candidate_names:
193+
return asset
194+
raise SystemExit(
195+
f"Could not find release asset for {domain}. Expected one of: {', '.join(sorted(candidate_names))}"
196+
)
197+
198+
199+
def github_json(url: str, token: str | None = None) -> Any:
200+
"""Fetch JSON from the GitHub API."""
201+
request = urllib.request.Request(url, headers=github_headers(token))
202+
try:
203+
with urllib.request.urlopen(request) as response:
204+
return json.loads(response.read().decode("utf-8"))
205+
except urllib.error.HTTPError as error:
206+
raise SystemExit(f"GitHub API request failed for {url}: HTTP {error.code}") from error
207+
except urllib.error.URLError as error:
208+
raise SystemExit(f"GitHub API request failed for {url}: {error.reason}") from error
209+
210+
211+
def fetch_sha256(url: str, token: str | None = None) -> str:
212+
"""Download an asset and return its SHA256 hash."""
213+
import hashlib
214+
215+
request = urllib.request.Request(url, headers=github_headers(token))
216+
sha256_hash = hashlib.sha256()
217+
try:
218+
with urllib.request.urlopen(request) as response:
219+
while chunk := response.read(1024 * 1024):
220+
sha256_hash.update(chunk)
221+
except urllib.error.HTTPError as error:
222+
raise SystemExit(f"Asset download failed for {url}: HTTP {error.code}") from error
223+
except urllib.error.URLError as error:
224+
raise SystemExit(f"Asset download failed for {url}: {error.reason}") from error
225+
return sha256_hash.hexdigest()
226+
227+
228+
def github_headers(token: str | None = None) -> dict[str, str]:
229+
"""Build GitHub request headers."""
230+
headers = {
231+
"Accept": "application/vnd.github+json",
232+
"User-Agent": "mitreattack-python-release-info-updater",
233+
"X-GitHub-Api-Version": "2022-11-28",
234+
}
235+
token = token or env_github_token()
236+
if token:
237+
headers["Authorization"] = f"Bearer {token}"
238+
return headers
239+
240+
241+
def env_github_token() -> str | None:
242+
"""Return GITHUB_TOKEN from the environment without importing os at module import time."""
243+
import os
244+
245+
return os.environ.get("GITHUB_TOKEN")
246+
247+
248+
def version_from_tag(tag: str, tag_prefix: str) -> str:
249+
"""Extract an ATT&CK version from a release tag."""
250+
if not tag.startswith(tag_prefix):
251+
raise SystemExit(f"Release tag {tag!r} does not start with expected prefix {tag_prefix!r}")
252+
version = tag.removeprefix(tag_prefix)
253+
if not re.fullmatch(r"\d+\.\d+(?:[-.][A-Za-z0-9]+)?", version):
254+
raise SystemExit(f"Could not parse ATT&CK release version from tag {tag!r}")
255+
return version
256+
257+
258+
def quote_tag(tag: str) -> str:
259+
"""URL-quote a release tag for GitHub API paths."""
260+
return urllib.parse.quote(tag, safe="")
261+
262+
263+
if __name__ == "__main__":
264+
try:
265+
main()
266+
except KeyboardInterrupt:
267+
sys.exit(130)

tests/test_release_info_updater.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
"""Tests for the release_info.py updater script."""
2+
3+
from __future__ import annotations
4+
5+
import importlib.util
6+
import sys
7+
from pathlib import Path
8+
9+
SCRIPT_PATH = Path(__file__).parents[1] / "scripts" / "update_release_info.py"
10+
11+
12+
def load_updater():
13+
"""Load the updater script as a test module."""
14+
spec = importlib.util.spec_from_file_location("update_release_info", SCRIPT_PATH)
15+
assert spec
16+
assert spec.loader
17+
module = importlib.util.module_from_spec(spec)
18+
sys.modules["update_release_info"] = module
19+
spec.loader.exec_module(module)
20+
return module
21+
22+
23+
def test_update_release_info_source_updates_required_values():
24+
"""Updater rewrites release constants while preserving unrelated code."""
25+
updater = load_updater()
26+
source = '''"""Release info."""
27+
28+
LATEST_VERSION = "1.0"
29+
30+
STIX20 = {
31+
"enterprise": {"1.0": "old-enterprise-20"},
32+
"mobile": {"1.0": "old-mobile-20"},
33+
"ics": {"1.0": "old-ics-20"},
34+
"pre": {"1.0": "old-pre-20"},
35+
}
36+
37+
STIX21 = {
38+
"enterprise": {"1.0": "old-enterprise-21"},
39+
"mobile": {"1.0": "old-mobile-21"},
40+
"ics": {"1.0": "old-ics-21"},
41+
}
42+
43+
44+
def keep_me():
45+
return "unchanged"
46+
'''
47+
release_hashes = {
48+
"STIX20": {
49+
"enterprise": "new-enterprise-20",
50+
"mobile": "new-mobile-20",
51+
"ics": "new-ics-20",
52+
},
53+
"STIX21": {
54+
"enterprise": "new-enterprise-21",
55+
"mobile": "new-mobile-21",
56+
"ics": "new-ics-21",
57+
},
58+
}
59+
60+
updated = updater.update_release_info_source(source, version="2.0", release_hashes=release_hashes)
61+
62+
assert 'LATEST_VERSION = "2.0"' in updated
63+
assert "'2.0': 'new-enterprise-20'" in updated
64+
assert "'2.0': 'new-mobile-20'" in updated
65+
assert "'2.0': 'new-ics-20'" in updated
66+
assert "'2.0': 'new-enterprise-21'" in updated
67+
assert "'2.0': 'new-mobile-21'" in updated
68+
assert "'2.0': 'new-ics-21'" in updated
69+
assert "'pre': {'1.0': 'old-pre-20'}" in updated
70+
assert 'return "unchanged"' in updated
71+
72+
73+
def test_find_domain_asset_accepts_current_and_versioned_names():
74+
"""Asset lookup accepts release asset naming variants from ATT&CK data repos."""
75+
updater = load_updater()
76+
assets = [
77+
{"name": "enterprise-attack-2.0.json"},
78+
{"name": "mobile-attack.json"},
79+
]
80+
81+
assert updater.find_domain_asset(assets, domain="enterprise", version="2.0")["name"] == "enterprise-attack-2.0.json"
82+
assert updater.find_domain_asset(assets, domain="mobile", version="2.0")["name"] == "mobile-attack.json"

0 commit comments

Comments
 (0)