
Commit 05f9e99

skip[ci]: dedup keys for results.json (the benchmark) runs
Signed-off-by: Joe Isaacs <joe.isaacs@live.co.uk>
1 parent b380c53 commit 05f9e99

3 files changed: 149 additions & 3 deletions

File tree

.github/workflows/bench.yml
.github/workflows/sql-benchmarks.yml
scripts/cat-s3.py

.github/workflows/bench.yml

Lines changed: 2 additions & 2 deletions
@@ -29,7 +29,7 @@ jobs:

          sudo apt-get update && sudo apt-get install -y jq
          bash scripts/commit-json.sh > new-commit.json
-         bash scripts/cat-s3.sh vortex-ci-benchmark-results commits.json new-commit.json
+         python3 scripts/cat-s3.py vortex-ci-benchmark-results commits.json new-commit.json

  bench:
    timeout-minutes: 120
@@ -100,7 +100,7 @@ jobs:
      - name: Upload Benchmark Results
        shell: bash
        run: |
-         bash scripts/cat-s3.sh vortex-ci-benchmark-results data.json.gz results.json
+         python3 scripts/cat-s3.py vortex-ci-benchmark-results data.json.gz results.json

      - name: Alert incident.io
        if: failure()

.github/workflows/sql-benchmarks.yml

Lines changed: 1 addition & 1 deletion
@@ -274,7 +274,7 @@ jobs:
        if: inputs.mode == 'develop'
        shell: bash
        run: |
-         bash scripts/cat-s3.sh vortex-ci-benchmark-results data.json.gz results.json
+         python3 scripts/cat-s3.py vortex-ci-benchmark-results data.json.gz results.json

      - name: Alert incident.io
        if: failure() && inputs.mode == 'develop'

scripts/cat-s3.py

Lines changed: 146 additions & 0 deletions
@@ -0,0 +1,146 @@
#!/usr/bin/env python3
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright the Vortex contributors

"""Append JSONL benchmark results to an S3 object with duplicate-commit detection and optimistic locking."""

import argparse
import gzip
import json
import subprocess
import sys
import tempfile
import time


def head_etag(bucket: str, key: str) -> str | None:
    """Return the object's current ETag, or None if it cannot be read."""
    result = subprocess.run(
        [
            "aws", "s3api", "head-object",
            "--bucket", bucket,
            "--key", key,
            "--query", "ETag",
            "--output", "text",
        ],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        return None
    etag = result.stdout.strip()
    if not etag or etag == "null":
        return None
    return etag


def get_object(bucket: str, key: str, dest: str, if_match: str) -> bool:
    """Download the object to dest; fails if the ETag no longer matches."""
    result = subprocess.run(
        [
            "aws", "s3api", "get-object",
            "--bucket", bucket,
            "--key", key,
            "--if-match", if_match,
            dest,
        ],
    )
    return result.returncode == 0


def put_object(bucket: str, key: str, body: str, if_match: str) -> bool:
    """Upload body as the object; the conditional write fails if the object changed since if_match."""
    result = subprocess.run(
        [
            "aws", "s3api", "put-object",
            "--bucket", bucket,
            "--key", key,
            "--body", body,
            "--if-match", if_match,
        ],
    )
    return result.returncode == 0


def read_jsonl(path: str) -> list[str]:
    """Read a JSONL file, returning raw non-empty lines."""
    with open(path) as f:
        return [line for line in f if line.strip()]


def extract_commit_ids(lines: list[str]) -> set[str]:
    """Extract unique commit_id values from JSONL lines."""
    ids = set()
    for line in lines:
        obj = json.loads(line)
        if "commit_id" in obj:
            ids.add(obj["commit_id"])
    return ids


def main() -> None:
    parser = argparse.ArgumentParser(description="Append JSONL to an S3 object with dedup")
    parser.add_argument("bucket", help="S3 bucket name")
    parser.add_argument("key", help="S3 object key")
    parser.add_argument("local_file", help="Local JSONL file to append")
    parser.add_argument("--max-retries", type=int, default=100)
    args = parser.parse_args()

    is_gz = args.key.endswith(".gz")
    new_lines = read_jsonl(args.local_file)
    new_commit_ids = extract_commit_ids(new_lines)

    for attempt in range(1, args.max_retries + 1):
        etag = head_etag(args.bucket, args.key)
        if etag is None:
            print("Failed to retrieve ETag.", file=sys.stderr)
            sys.exit(1)

        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            local_copy = tmp.name

        if not get_object(args.bucket, args.key, local_copy, etag):
            print(f"ETag mismatch during download (attempt {attempt}), retrying...", file=sys.stderr)
            continue

        # Read existing data.
        if is_gz:
            with gzip.open(local_copy, "rt") as f:
                existing_lines = [line for line in f if line.strip()]
        else:
            with open(local_copy) as f:
                existing_lines = [line for line in f if line.strip()]

        # Check for duplicate commits.
        existing_commit_ids = extract_commit_ids(existing_lines)
        duplicates = new_commit_ids & existing_commit_ids
        if duplicates:
            print(
                f"ERROR: commit(s) {', '.join(sorted(duplicates))} already exist in "
                f"s3://{args.bucket}/{args.key}. Refusing to append duplicate data.",
                file=sys.stderr,
            )
            sys.exit(1)

        # Concatenate, keeping every line newline-terminated so the JSONL
        # framing survives the join (a final line may lack a trailing "\n").
        combined = "".join(
            line if line.endswith("\n") else line + "\n"
            for line in existing_lines + new_lines
        )

        with tempfile.NamedTemporaryFile(delete=False, suffix=".gz" if is_gz else "") as tmp:
            output_path = tmp.name
        if is_gz:
            with gzip.open(output_path, "wt") as f:
                f.write(combined)
        else:
            with open(output_path, "w") as f:
                f.write(combined)

        if put_object(args.bucket, args.key, output_path, etag):
            print("File updated and uploaded successfully.")
            return

        print(f"ETag mismatch during upload (attempt {attempt}), retrying...", file=sys.stderr)
        time.sleep(0.1)

    print(f"Too many failures: {args.max_retries}.", file=sys.stderr)
    sys.exit(1)


if __name__ == "__main__":
    main()
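For context, the duplicate guard keys purely on the commit_id field of each JSONL record, so a re-run of an already-recorded commit fails fast instead of double-counting benchmark results. Below is a minimal sketch of how that guard behaves, exercising extract_commit_ids directly; the sample records, their values, and the importlib loading shim (needed because the hyphen in cat-s3.py prevents a plain import) are illustrative assumptions, not part of the commit:

# Illustrative only: load the script as a module and probe the dedup guard.
# Assumes it is run from a checkout containing scripts/cat-s3.py.
import importlib.util

spec = importlib.util.spec_from_file_location("cat_s3", "scripts/cat-s3.py")
cat_s3 = importlib.util.module_from_spec(spec)
spec.loader.exec_module(cat_s3)  # __name__ != "__main__", so main() does not run

existing = ['{"commit_id": "b380c53", "value": 1.0}\n']
incoming = [
    '{"commit_id": "b380c53", "value": 1.1}\n',  # re-run of a recorded commit
    '{"commit_id": "05f9e99", "value": 0.9}\n',  # genuinely new commit
]

dupes = cat_s3.extract_commit_ids(incoming) & cat_s3.extract_commit_ids(existing)
assert dupes == {"b380c53"}  # the re-run is caught before anything is uploaded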
