Skip to content

Commit c93b50f

Browse files
scotlunsjondricek
andauthored
chore: automate release info updates (#233)
Co-authored-by: Jared Ondricek <jondricek@mitre.org>
1 parent 227b12e commit c93b50f

4 files changed

Lines changed: 377 additions & 9 deletions

File tree

justfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ default:
55
# Install development dependencies
66
install:
77
uv sync --all-extras
8+
uv run pre-commit install
9+
uv run pre-commit install --hook-type commit-msg
810

911
# Upgrade all uv dependencies
1012
upgrade:

scripts/update_release_info.py

Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
"""Update ATT&CK release hash metadata in mitreattack/release_info.py."""
2+
3+
from __future__ import annotations
4+
5+
import argparse
6+
import ast
7+
import json
8+
import pprint
9+
import re
10+
import subprocess
11+
import sys
12+
import urllib.error
13+
import urllib.parse
14+
import urllib.request
15+
from dataclasses import dataclass
16+
from pathlib import Path
17+
from typing import Any
18+
19+
DOMAINS = ("enterprise", "mobile", "ics")
20+
SCRIPT_PATH = Path(__file__).resolve()
21+
REPO_ROOT = SCRIPT_PATH.parents[1]
22+
RELEASE_INFO_DISPLAY_PATH = Path("mitreattack/release_info.py")
23+
RELEASE_INFO_PATH = REPO_ROOT / RELEASE_INFO_DISPLAY_PATH
24+
REQUIRED_ASSIGNMENTS = ("LATEST_VERSION", "STIX20", "STIX21")
25+
26+
27+
@dataclass(frozen=True)
28+
class ReleaseSource:
29+
"""GitHub release source for an ATT&CK STIX version."""
30+
31+
stix_version: str
32+
owner: str
33+
repo: str
34+
tag_prefix: str
35+
assignment_name: str
36+
37+
38+
RELEASE_SOURCES = (
39+
ReleaseSource(
40+
stix_version="2.0",
41+
owner="mitre",
42+
repo="cti",
43+
tag_prefix="ATT&CK-v",
44+
assignment_name="STIX20",
45+
),
46+
ReleaseSource(
47+
stix_version="2.1",
48+
owner="mitre-attack",
49+
repo="attack-stix-data",
50+
tag_prefix="v",
51+
assignment_name="STIX21",
52+
),
53+
)
54+
55+
56+
def main() -> None:
57+
"""Parse arguments and update release_info.py."""
58+
parser = argparse.ArgumentParser(description=__doc__)
59+
parser.add_argument("version", nargs="?", help="ATT&CK release version, for example 19.1")
60+
parser.add_argument("--dry-run", action="store_true", help="Print a summary of updates instead of writing.")
61+
args = parser.parse_args()
62+
63+
version = args.version or fetch_latest_common_version()
64+
hashes = fetch_release_hashes(version=version)
65+
source = RELEASE_INFO_PATH.read_text()
66+
updated = update_release_info_source(source, version=version, release_hashes=hashes)
67+
68+
if args.dry_run:
69+
print(format_dry_run_summary(source, version=version, release_hashes=hashes))
70+
return
71+
72+
RELEASE_INFO_PATH.write_text(updated)
73+
subprocess.run(["uv", "run", "--extra", "dev", "ruff", "format", str(RELEASE_INFO_PATH)], check=True, cwd=REPO_ROOT)
74+
75+
print(f"Updated {RELEASE_INFO_DISPLAY_PATH} for ATT&CK v{version}")
76+
77+
78+
def fetch_latest_common_version() -> str:
79+
"""Fetch the latest non-prerelease version present in both STIX release repos."""
80+
latest_versions = {source.stix_version: fetch_latest_version(source) for source in RELEASE_SOURCES}
81+
if latest_versions["2.0"] != latest_versions["2.1"]:
82+
raise SystemExit(
83+
"Latest STIX release versions do not match: "
84+
f"STIX 2.0={latest_versions['2.0']}, STIX 2.1={latest_versions['2.1']}. "
85+
"Pass the desired ATT&CK version explicitly."
86+
)
87+
return latest_versions["2.0"]
88+
89+
90+
def fetch_latest_version(source: ReleaseSource) -> str:
91+
"""Fetch the latest GitHub release version for one STIX source."""
92+
release = github_json(f"https://api.github.com/repos/{source.owner}/{source.repo}/releases/latest")
93+
return version_from_tag(release["tag_name"], source.tag_prefix)
94+
95+
96+
def fetch_release_hashes(version: str) -> dict[str, dict[str, str]]:
97+
"""Fetch SHA256 hashes for every required STIX source and domain."""
98+
release_hashes: dict[str, dict[str, str]] = {}
99+
for source in RELEASE_SOURCES:
100+
release_hashes[source.assignment_name] = fetch_source_hashes(source, version=version)
101+
return release_hashes
102+
103+
104+
def fetch_source_hashes(source: ReleaseSource, version: str) -> dict[str, str]:
105+
"""Fetch SHA256 hashes for one STIX release source."""
106+
tag = f"{source.tag_prefix}{version}"
107+
url = f"https://api.github.com/repos/{source.owner}/{source.repo}/releases/tags/{quote_tag(tag)}"
108+
release = github_json(url)
109+
assets = release.get("assets", [])
110+
hashes: dict[str, str] = {}
111+
112+
for domain in DOMAINS:
113+
asset = find_domain_asset(assets, domain=domain, version=version)
114+
digest = asset.get("digest")
115+
if isinstance(digest, str) and digest.startswith("sha256:"):
116+
hashes[domain] = digest.removeprefix("sha256:")
117+
continue
118+
119+
browser_download_url = asset.get("browser_download_url")
120+
if not isinstance(browser_download_url, str):
121+
raise SystemExit(f"Missing browser_download_url for {source.owner}/{source.repo} {tag} {asset.get('name')}")
122+
hashes[domain] = fetch_sha256(browser_download_url)
123+
124+
return hashes
125+
126+
127+
def update_release_info_source(source: str, version: str, release_hashes: dict[str, dict[str, str]]) -> str:
128+
"""Return release_info.py source updated with the given ATT&CK version and hashes."""
129+
tree = ast.parse(source)
130+
assignments = find_assignments(tree)
131+
132+
stix20 = ast.literal_eval(assignments["STIX20"].value)
133+
stix21 = ast.literal_eval(assignments["STIX21"].value)
134+
135+
for assignment_name, stix_hashes in (("STIX20", stix20), ("STIX21", stix21)):
136+
for domain in DOMAINS:
137+
stix_hashes[domain][version] = release_hashes[assignment_name][domain]
138+
139+
replacements = {
140+
"LATEST_VERSION": f'LATEST_VERSION = "{version}"',
141+
"STIX20": format_assignment("STIX20", stix20),
142+
"STIX21": format_assignment("STIX21", stix21),
143+
}
144+
145+
return replace_assignments(source, assignments, replacements)
146+
147+
148+
def format_dry_run_summary(source: str, version: str, release_hashes: dict[str, dict[str, str]]) -> str:
149+
"""Return a targeted summary of release_info.py changes without printing the full file."""
150+
tree = ast.parse(source)
151+
assignments = find_assignments(tree)
152+
latest_version = ast.literal_eval(assignments["LATEST_VERSION"].value)
153+
stix_values = {
154+
"STIX20": ast.literal_eval(assignments["STIX20"].value),
155+
"STIX21": ast.literal_eval(assignments["STIX21"].value),
156+
}
157+
158+
lines = [f"Would update {RELEASE_INFO_DISPLAY_PATH} for ATT&CK v{version}", "LATEST_VERSION:"]
159+
if latest_version == version:
160+
lines.append(f' unchanged LATEST_VERSION = "{version}"')
161+
else:
162+
lines.append(f'- LATEST_VERSION = "{latest_version}"')
163+
lines.append(f'+ LATEST_VERSION = "{version}"')
164+
165+
for assignment_name, domain_hashes in release_hashes.items():
166+
lines.append(f"{assignment_name}:")
167+
for domain in DOMAINS:
168+
current_hash = stix_values[assignment_name].get(domain, {}).get(version)
169+
new_hash = domain_hashes[domain]
170+
if current_hash is None:
171+
lines.append(f"+ {assignment_name}.{domain}[{version!r}] = {new_hash!r}")
172+
elif current_hash == new_hash:
173+
lines.append(f" unchanged {assignment_name}.{domain}[{version!r}] = {new_hash!r}")
174+
else:
175+
lines.append(f"- {assignment_name}.{domain}[{version!r}] = {current_hash!r}")
176+
lines.append(f"+ {assignment_name}.{domain}[{version!r}] = {new_hash!r}")
177+
178+
return "\n".join(lines)
179+
180+
181+
def find_assignments(tree: ast.Module) -> dict[str, ast.Assign]:
182+
"""Find required top-level assignment nodes."""
183+
assignments: dict[str, ast.Assign] = {}
184+
for node in tree.body:
185+
if not isinstance(node, ast.Assign):
186+
continue
187+
for target in node.targets:
188+
if isinstance(target, ast.Name) and target.id in REQUIRED_ASSIGNMENTS:
189+
assignments[target.id] = node
190+
191+
missing = sorted(set(REQUIRED_ASSIGNMENTS) - set(assignments))
192+
if missing:
193+
raise SystemExit(f"Missing required assignment(s): {', '.join(missing)}")
194+
return assignments
195+
196+
197+
def replace_assignments(source: str, assignments: dict[str, ast.Assign], replacements: dict[str, str]) -> str:
198+
"""Replace assignment source ranges using AST line numbers."""
199+
lines = source.splitlines()
200+
for name, replacement in sorted(replacements.items(), key=lambda item: assignments[item[0]].lineno, reverse=True):
201+
node = assignments[name]
202+
if node.end_lineno is None:
203+
raise SystemExit(f"Could not determine source range for {name}")
204+
lines[node.lineno - 1 : node.end_lineno] = replacement.splitlines()
205+
return "\n".join(lines) + "\n"
206+
207+
208+
def format_assignment(name: str, value: Any) -> str:
209+
"""Format a Python assignment for release hash data."""
210+
return f"{name} = {pprint.pformat(value, width=120, sort_dicts=False)}"
211+
212+
213+
def find_domain_asset(assets: list[dict[str, Any]], domain: str, version: str) -> dict[str, Any]:
214+
"""Find the GitHub release asset for a domain."""
215+
candidate_names = {
216+
f"{domain}-attack.json",
217+
f"{domain}-attack-{version}.json",
218+
}
219+
for asset in assets:
220+
if asset.get("name") in candidate_names:
221+
return asset
222+
raise SystemExit(
223+
f"Could not find release asset for {domain}. Expected one of: {', '.join(sorted(candidate_names))}"
224+
)
225+
226+
227+
def github_json(url: str) -> Any:
228+
"""Fetch JSON from the GitHub API."""
229+
request = urllib.request.Request(url, headers=github_headers())
230+
try:
231+
with urllib.request.urlopen(request) as response:
232+
return json.loads(response.read().decode("utf-8"))
233+
except urllib.error.HTTPError as error:
234+
raise SystemExit(f"GitHub API request failed for {url}: HTTP {error.code}") from error
235+
except urllib.error.URLError as error:
236+
raise SystemExit(f"GitHub API request failed for {url}: {error.reason}") from error
237+
238+
239+
def fetch_sha256(url: str) -> str:
240+
"""Download an asset and return its SHA256 hash."""
241+
import hashlib
242+
243+
request = urllib.request.Request(url, headers=github_headers())
244+
sha256_hash = hashlib.sha256()
245+
try:
246+
with urllib.request.urlopen(request) as response:
247+
while chunk := response.read(1024 * 1024):
248+
sha256_hash.update(chunk)
249+
except urllib.error.HTTPError as error:
250+
raise SystemExit(f"Asset download failed for {url}: HTTP {error.code}") from error
251+
except urllib.error.URLError as error:
252+
raise SystemExit(f"Asset download failed for {url}: {error.reason}") from error
253+
return sha256_hash.hexdigest()
254+
255+
256+
def github_headers() -> dict[str, str]:
257+
"""Build GitHub request headers."""
258+
return {
259+
"Accept": "application/vnd.github+json",
260+
"User-Agent": "mitreattack-python-release-info-updater",
261+
"X-GitHub-Api-Version": "2022-11-28",
262+
}
263+
264+
265+
def version_from_tag(tag: str, tag_prefix: str) -> str:
266+
"""Extract an ATT&CK version from a release tag."""
267+
if not tag.startswith(tag_prefix):
268+
raise SystemExit(f"Release tag {tag!r} does not start with expected prefix {tag_prefix!r}")
269+
version = tag.removeprefix(tag_prefix)
270+
if not re.fullmatch(r"\d+\.\d+(?:[-.][A-Za-z0-9]+)?", version):
271+
raise SystemExit(f"Could not parse ATT&CK release version from tag {tag!r}")
272+
return version
273+
274+
275+
def quote_tag(tag: str) -> str:
276+
"""URL-quote a release tag for GitHub API paths."""
277+
return urllib.parse.quote(tag, safe="")
278+
279+
280+
if __name__ == "__main__":
281+
try:
282+
main()
283+
except KeyboardInterrupt:
284+
sys.exit(130)

tests/test_release_info_updater.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
"""Tests for the release_info.py updater script."""
2+
3+
from __future__ import annotations
4+
5+
import importlib.util
6+
import sys
7+
from pathlib import Path
8+
9+
SCRIPT_PATH = Path(__file__).parents[1] / "scripts" / "update_release_info.py"
10+
11+
12+
def load_updater():
13+
"""Load the updater script as a test module."""
14+
spec = importlib.util.spec_from_file_location("update_release_info", SCRIPT_PATH)
15+
assert spec
16+
assert spec.loader
17+
module = importlib.util.module_from_spec(spec)
18+
sys.modules["update_release_info"] = module
19+
spec.loader.exec_module(module)
20+
return module
21+
22+
23+
def test_update_release_info_source_updates_required_values():
24+
"""Updater rewrites release constants while preserving unrelated code."""
25+
updater = load_updater()
26+
source = '''"""Release info."""
27+
28+
LATEST_VERSION = "1.0"
29+
30+
STIX20 = {
31+
"enterprise": {"1.0": "old-enterprise-20"},
32+
"mobile": {"1.0": "old-mobile-20"},
33+
"ics": {"1.0": "old-ics-20"},
34+
"pre": {"1.0": "old-pre-20"},
35+
}
36+
37+
STIX21 = {
38+
"enterprise": {"1.0": "old-enterprise-21"},
39+
"mobile": {"1.0": "old-mobile-21"},
40+
"ics": {"1.0": "old-ics-21"},
41+
}
42+
43+
44+
def keep_me():
45+
return "unchanged"
46+
'''
47+
release_hashes = {
48+
"STIX20": {
49+
"enterprise": "new-enterprise-20",
50+
"mobile": "new-mobile-20",
51+
"ics": "new-ics-20",
52+
},
53+
"STIX21": {
54+
"enterprise": "new-enterprise-21",
55+
"mobile": "new-mobile-21",
56+
"ics": "new-ics-21",
57+
},
58+
}
59+
60+
updated = updater.update_release_info_source(source, version="2.0", release_hashes=release_hashes)
61+
62+
assert 'LATEST_VERSION = "2.0"' in updated
63+
assert "'2.0': 'new-enterprise-20'" in updated
64+
assert "'2.0': 'new-mobile-20'" in updated
65+
assert "'2.0': 'new-ics-20'" in updated
66+
assert "'2.0': 'new-enterprise-21'" in updated
67+
assert "'2.0': 'new-mobile-21'" in updated
68+
assert "'2.0': 'new-ics-21'" in updated
69+
assert "'pre': {'1.0': 'old-pre-20'}" in updated
70+
assert 'return "unchanged"' in updated
71+
72+
73+
def test_find_domain_asset_accepts_current_and_versioned_names():
74+
"""Asset lookup accepts release asset naming variants from ATT&CK data repos."""
75+
updater = load_updater()
76+
assets = [
77+
{"name": "enterprise-attack-2.0.json"},
78+
{"name": "mobile-attack.json"},
79+
]
80+
81+
assert updater.find_domain_asset(assets, domain="enterprise", version="2.0")["name"] == "enterprise-attack-2.0.json"
82+
assert updater.find_domain_asset(assets, domain="mobile", version="2.0")["name"] == "mobile-attack.json"

0 commit comments

Comments
 (0)