Skip to content
This repository was archived by the owner on May 5, 2025. It is now read-only.

Commit 7840d9f

Browse files
feat: add version metadata in ta_cache_rollups (#1192)
1 parent 8bf6551 commit 7840d9f

5 files changed

Lines changed: 70 additions & 12 deletions

services/test_analytics/ta_cache_rollups.py

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from datetime import UTC
22
from io import BytesIO
3+
from typing import cast
34

45
import polars as pl
56
import shared.storage
@@ -24,8 +25,33 @@ def rollup_blob_path(repoid: int, branch: str | None = None) -> str:
2425
)
2526

2627

27-
POLARS_SCHEMA = [
28+
# version number that the cache rollup task will be writing to GCS
29+
# if you're creating a new version of the schema, increment this
30+
VERSION = "1"
31+
32+
# list of schemas; leave the old ones here as a reference for now
33+
# old schemas effectively expire after 60 days, since files written with
34+
# them contain no relevant data after that period
35+
36+
# so from the time you deprecate an old schema, you only have to keep handling it
37+
# for 60 days
38+
NO_VERSION_POLARS_SCHEMA = [
39+
"computed_name",
40+
("flags", pl.List(pl.String)),
41+
"failing_commits",
42+
"last_duration",
43+
"avg_duration",
44+
"pass_count",
45+
"fail_count",
46+
"flaky_fail_count",
47+
"skip_count",
48+
("updated_at", pl.Datetime(time_zone=UTC)),
49+
"timestamp_bin",
50+
]
51+
52+
V1_POLARS_SCHEMA = [
2853
"computed_name",
54+
"testsuite",
2955
("flags", pl.List(pl.String)),
3056
"failing_commits",
3157
"last_duration",
@@ -40,7 +66,6 @@ def rollup_blob_path(repoid: int, branch: str | None = None) -> str:
4066

4167

4268
def cache_rollups(repoid: int, branch: str | None = None):
43-
storage_service = shared.storage.get_appropriate_storage_service(repoid)
4469
serialized_table: BytesIO
4570

4671
with read_rollups_from_db_summary.labels("new").time():
@@ -55,6 +80,7 @@ def cache_rollups(repoid: int, branch: str | None = None):
5580
data = [
5681
{
5782
"computed_name": summary.computed_name,
83+
"testsuite": summary.testsuite,
5884
"flags": summary.flags,
5985
"failing_commits": summary.failing_commits,
6086
"last_duration": summary.last_duration_seconds,
@@ -69,15 +95,20 @@ def cache_rollups(repoid: int, branch: str | None = None):
6995
for summary in summaries
7096
]
7197

72-
serialized_table = pl.DataFrame(
98+
df = pl.DataFrame(
7399
data,
74-
POLARS_SCHEMA,
100+
V1_POLARS_SCHEMA,
75101
orient="row",
76-
).write_ipc(None)
102+
)
103+
serialized_table = df.write_ipc(None)
77104

78105
serialized_table.seek(0)
79106

107+
storage_service = shared.storage.get_appropriate_storage_service(repoid)
80108
storage_service.write_file(
81-
settings.GCS_BUCKET_NAME, rollup_blob_path(repoid, branch), serialized_table
109+
cast(str, settings.GCS_BUCKET_NAME),
110+
rollup_blob_path(repoid, branch),
111+
serialized_table,
112+
metadata={"version": VERSION},
82113
)
83114
rollup_size_summary.labels("new").observe(serialized_table.tell())

services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups__0.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
"computed_name2",
44
"computed_name"
55
],
6+
"testsuite": [
7+
"testsuite2",
8+
"testsuite"
9+
],
610
"flags": [
711
[
812
"test-rollups2"

services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups_use_timeseries_branch__0.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
"computed_name",
44
"computed_name2"
55
],
6+
"testsuite": [
7+
"testsuite",
8+
"testsuite2"
9+
],
610
"flags": [
711
[
812
"test-rollups"

services/test_analytics/tests/snapshots/ta_cache_rollups__cache_test_rollups_use_timeseries_main__0.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
"computed_name2",
44
"computed_name"
55
],
6+
"testsuite": [
7+
"testsuite2",
8+
"testsuite"
9+
],
610
"flags": [
711
[
812
"test-rollups2"

services/test_analytics/tests/test_ta_cache_rollups.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import datetime as dt
2+
from typing import cast
23

34
import polars as pl
45
import pytest
@@ -8,14 +9,22 @@
89
TestrunBranchSummary,
910
TestrunSummary,
1011
)
12+
from shared.storage.minio import MinioStorageService
1113

14+
from services.test_analytics.ta_cache_rollups import VERSION
1215
from services.test_analytics.utils import calc_test_id
1316
from tasks.cache_test_rollups import CacheTestRollupsTask
1417

1518

16-
def read_table(storage, storage_path: str):
19+
def read_table(
20+
storage: MinioStorageService,
21+
storage_path: str,
22+
meta_container: dict[str, str] | None = None,
23+
):
1724
decompressed_table: bytes = storage.read_file(
18-
get_config("services", "minio", "bucket", default="archive"), storage_path
25+
cast(str, get_config("services", "minio", "bucket", default="archive")),
26+
storage_path,
27+
metadata_container=meta_container,
1928
)
2029
return pl.read_ipc(decompressed_table)
2130

@@ -82,8 +91,11 @@ def test_cache_test_rollups(storage, snapshot):
8291
branch=None,
8392
impl_type="new",
8493
)
85-
86-
table = read_table(storage, "test_analytics/repo_rollups/1.arrow")
94+
meta = {}
95+
table = read_table(
96+
storage, "test_analytics/repo_rollups/1.arrow", meta_container=meta
97+
)
98+
assert meta["version"] == VERSION
8799
table_dict = table.to_dict(as_series=False)
88100
del table_dict["timestamp_bin"]
89101
del table_dict["updated_at"]
@@ -174,8 +186,11 @@ def test_cache_test_rollups_use_timeseries_main(storage, snapshot):
174186
branch="main",
175187
impl_type="new",
176188
)
177-
178-
table = read_table(storage, "test_analytics/branch_rollups/1/main.arrow")
189+
meta = {}
190+
table = read_table(
191+
storage, "test_analytics/branch_rollups/1/main.arrow", meta_container=meta
192+
)
193+
assert meta["version"] == VERSION
179194
table_dict = table.to_dict(as_series=False)
180195
del table_dict["timestamp_bin"]
181196
del table_dict["updated_at"]

0 commit comments

Comments
 (0)