
Commit 2973343

Keep track of in-progress Uploads (#764)
1 parent d07dcfb commit 2973343

10 files changed

Lines changed: 232 additions & 37 deletions


conftest.py

Lines changed: 0 additions & 1 deletion
@@ -134,7 +134,6 @@ def mock_configuration(mocker):
             "secret_access_key": "codecov-default-secret",
             "verify_ssl": False,
         },
-        "redis_url": "redis://redis:@localhost:6379/",
         "smtp": {
             "host": "mailhog",
             "port": 1025,

services/processing/__init__.py

Whitespace-only changes.

services/processing/state.py

Lines changed: 119 additions & 0 deletions
@@ -0,0 +1,119 @@
+"""
+This abstracts the "processing state" for a commit.
+
+It takes care that each upload for a specific commit is going through the following
+states:
+
+- "processing": when an upload was received and is being parsed/processed.
+- "processed": the upload has been processed and an "intermediate report" has been stored,
+  the upload is now waiting to be merged into the "master report".
+- "merged": the upload was fully merged into the "master report".
+
+The logic in this file also makes sure that processing and merging happens in an "optimal" way
+meaning that:
+
+- "postprocessing", which means triggering notifications and other followup work
+  only happens once for a commit.
+- merging should happen in batches, as that involves loading a bunch of "intermediate report"s
+  into memory, which should be bounded.
+- (ideally in the future) an upload that has been processed into an "intermediate report"
+  should be merged directly into the "master report" without doing a storage roundtrip for that
+  "intermediate report".
+"""
+
+from dataclasses import dataclass
+
+from shared.metrics import Counter
+
+from services.redis import get_redis_connection
+
+MERGE_BATCH_SIZE = 5
+
+CLEARED_UPLOADS = Counter(
+    "worker_processing_cleared_uploads",
+    "Number of uploads cleared from queue because of errors",
+)
+
+
+@dataclass
+class UploadNumbers:
+    processing: int
+    """
+    The number of uploads currently being processed.
+    """
+
+    processed: int
+    """
+    The number of uploads that have been processed,
+    and are waiting on being merged into the "master report".
+    """
+
+
+def should_perform_merge(uploads: UploadNumbers) -> bool:
+    """
+    Determines whether a merge should be performed.
+
+    This is the case when no more uploads are expected,
+    or we reached the desired batch size for merging.
+    """
+    return uploads.processing == 0 or uploads.processed >= MERGE_BATCH_SIZE
+
+
+def should_trigger_postprocessing(uploads: UploadNumbers) -> bool:
+    """
+    Determines whether post-processing steps, such as notifications, etc,
+    should be performed.
+
+    This is the case when no more uploads are expected,
+    and all the processed uploads have been merged into the "master report".
+    """
+    return uploads.processing == 0 and uploads.processed == 0
+
+
+class ProcessingState:
+    def __init__(self, repoid: int, commitsha: str) -> None:
+        self._redis = get_redis_connection()
+        self.repoid = repoid
+        self.commitsha = commitsha
+
+    def get_upload_numbers(self):
+        processing = self._redis.scard(self._redis_key("processing"))
+        processed = self._redis.scard(self._redis_key("processed"))
+        return UploadNumbers(processing, processed)
+
+    def mark_uploads_as_processing(self, upload_ids: list[int]):
+        self._redis.sadd(self._redis_key("processing"), *upload_ids)
+
+    def clear_in_progress_uploads(self, upload_ids: list[int]):
+        removed_uploads = self._redis.srem(self._redis_key("processing"), *upload_ids)
+        if removed_uploads > 0:
+            # the normal flow would move the uploads from the "processing" set
+            # to the "processed" set via `mark_upload_as_processed`.
+            # this function here is only called in the error case and we don't expect
+            # this to be triggered often, if at all.
+            CLEARED_UPLOADS.inc(removed_uploads)
+
+    def mark_upload_as_processed(self, upload_id: int):
+        res = self._redis.smove(
+            self._redis_key("processing"), self._redis_key("processed"), upload_id
+        )
+        if not res:
+            # this can happen when `upload_id` was never in the source set,
+            # which probably is the case during initial deployment as
+            # the code adding this to the initial set was not deployed yet
+            # TODO: make sure to remove this code after a grace period
+            self._redis.sadd(self._redis_key("processed"), upload_id)
+
+    def mark_uploads_as_merged(self, upload_ids: list[int]):
+        self._redis.srem(self._redis_key("processed"), *upload_ids)
+
+    def get_uploads_for_merging(self) -> set[int]:
+        return set(
+            int(id)
+            for id in self._redis.srandmember(
+                self._redis_key("processed"), MERGE_BATCH_SIZE
+            )
+        )
+
+    def _redis_key(self, state: str) -> str:
+        return f"upload-processing-state/{self.repoid}/{self.commitsha}/{state}"
Lines changed: 71 additions & 0 deletions
@@ -0,0 +1,71 @@
+from uuid import uuid4
+
+from services.processing.state import (
+    ProcessingState,
+    should_perform_merge,
+    should_trigger_postprocessing,
+)
+
+
+def test_single_upload():
+    state = ProcessingState(1234, uuid4().hex)
+    state.mark_uploads_as_processing([1])
+
+    state.mark_upload_as_processed(1)
+
+    # this is the only in-progress upload, nothing more to expect
+    assert should_perform_merge(state.get_upload_numbers())
+
+    assert state.get_uploads_for_merging() == {1}
+    state.mark_uploads_as_merged([1])
+
+    assert should_trigger_postprocessing(state.get_upload_numbers())
+
+
+def test_concurrent_uploads():
+    state = ProcessingState(1234, uuid4().hex)
+    state.mark_uploads_as_processing([1])
+
+    state.mark_upload_as_processed(1)
+    # meanwhile, another upload comes in:
+    state.mark_uploads_as_processing([2])
+
+    # not merging/postprocessing yet, as that will be debounced with the second upload
+    assert not should_perform_merge(state.get_upload_numbers())
+
+    state.mark_upload_as_processed(2)
+
+    assert should_perform_merge(state.get_upload_numbers())
+
+    assert state.get_uploads_for_merging() == {1, 2}
+    state.mark_uploads_as_merged([1, 2])
+
+    assert should_trigger_postprocessing(state.get_upload_numbers())
+
+
+def test_batch_merging_many_uploads():
+    state = ProcessingState(1234, uuid4().hex)
+
+    state.mark_uploads_as_processing([1, 2, 3, 4, 5, 6, 7, 8, 9])
+
+    for id in range(1, 9):
+        state.mark_upload_as_processed(id)
+
+    # we have only processed 8 out of 9. we want to do a batched merge
+    assert should_perform_merge(state.get_upload_numbers())
+    merging = state.get_uploads_for_merging()
+    assert len(merging) == 5  # = MERGE_BATCH_SIZE
+    state.mark_uploads_as_merged(merging)
+
+    # but no notifications yet
+    assert not should_trigger_postprocessing(state.get_upload_numbers())
+
+    state.mark_upload_as_processed(9)
+
+    # with the last upload being processed, we do another merge, and then trigger notifications
+    assert should_perform_merge(state.get_upload_numbers())
+    merging = state.get_uploads_for_merging()
+    assert len(merging) == 4
+    state.mark_uploads_as_merged(merging)
+
+    assert should_trigger_postprocessing(state.get_upload_numbers())

services/tests/test_redis.py

Lines changed: 5 additions & 7 deletions
@@ -1,10 +1,8 @@
 from services.redis import get_redis_connection
-from test_utils.base import BaseTestCase


-class TestRedis(BaseTestCase):
-    def test_get_redis_connection(self, mocker, mock_configuration):
-        mocked = mocker.patch("services.redis.Redis.from_url")
-        res = get_redis_connection()
-        assert res is not None
-        mocked.assert_called_with("redis://redis:@localhost:6379/")
+def test_get_redis_connection(mocker):
+    mocked = mocker.patch("services.redis.Redis.from_url")
+    res = get_redis_connection()
+    assert res is not None
+    mocked.assert_called_with("redis://redis:6379")

tasks/tests/unit/test_send_email_task.py

Lines changed: 0 additions & 1 deletion
@@ -30,7 +30,6 @@ def mock_configuration_no_smtp(mocker):
             "secret_access_key": "codecov-default-secret",
             "verify_ssl": False,
         },
-        "redis_url": "redis://redis:@localhost:6379/",
     },
     "setup": {
        "codecov_url": "https://codecov.io",

tasks/tests/unit/test_upload_processing_task.py

Lines changed: 1 addition & 26 deletions
@@ -51,7 +51,6 @@ def test_upload_processor_task_call(
         dbsession,
         codecov_vcr,
         mock_storage,
-        mock_redis,
         celery_app,
     ):
         mocker.patch.object(
@@ -149,11 +148,6 @@ def test_upload_processor_task_call(
         }

         mocked_1.assert_called_with(commit.commitid, None)
-        mock_redis.lock.assert_called_with(
-            f"upload_processing_lock_{commit.repoid}_{commit.commitid}",
-            blocking_timeout=5,
-            timeout=300,
-        )

     @pytest.mark.integration
     @pytest.mark.django_db(databases={"default"})
@@ -164,7 +158,6 @@ def test_upload_processor_task_call_should_delete(
         dbsession,
         codecov_vcr,
         mock_storage,
-        mock_redis,
         celery_app,
     ):
         mocker.patch.object(
@@ -272,15 +265,10 @@ def test_upload_processor_task_call_should_delete(
         }

         mocked_1.assert_called_with(commit.commitid, None)
-        mock_redis.lock.assert_called_with(
-            f"upload_processing_lock_{commit.repoid}_{commit.commitid}",
-            blocking_timeout=5,
-            timeout=300,
-        )

     @pytest.mark.django_db(databases={"default"})
     def test_upload_processor_call_with_upload_obj(
-        self, mocker, mock_configuration, dbsession, mock_storage
+        self, mocker, dbsession, mock_storage
     ):
         mocker.patch.object(
             USE_LABEL_INDEX_IN_REPORT_PROCESSING_BY_REPO_ID,
@@ -400,7 +388,6 @@ def test_upload_task_call_existing_chunks(
         dbsession,
         codecov_vcr,
         mock_storage,
-        mock_redis,
         celery_app,
     ):
         mocker.patch.object(
@@ -462,11 +449,6 @@ def test_upload_task_call_existing_chunks(
         assert expected_result == result
         assert commit.message == "dsidsahdsahdsa"
         mocked_1.assert_called_with(commit.commitid, None)
-        mock_redis.lock.assert_called_with(
-            f"upload_processing_lock_{commit.repoid}_{commit.commitid}",
-            blocking_timeout=5,
-            timeout=300,
-        )

     @pytest.mark.django_db(databases={"default"})
     def test_upload_task_call_exception_within_individual_upload(
@@ -476,7 +458,6 @@ def test_upload_task_call_exception_within_individual_upload(
         dbsession,
         codecov_vcr,
         mock_storage,
-        mock_redis,
         celery_app,
     ):
         commit = CommitFactory.create(
@@ -607,7 +588,6 @@ def test_upload_task_call_with_expired_report(
         dbsession,
         mock_repo_provider,
         mock_storage,
-        mock_redis,
         celery_app,
     ):
         mocked_1 = mocker.patch.object(ArchiveService, "read_chunks")
@@ -697,7 +677,6 @@ def test_upload_task_process_individual_report_with_notfound_report(
         dbsession,
         mock_repo_provider,
         mock_storage,
-        mock_redis,
     ):
         mocked_1 = mocker.patch.object(ArchiveService, "read_chunks")
         mocked_1.return_value = None
@@ -765,7 +744,6 @@ def test_upload_task_call_with_empty_report(
         dbsession,
         mock_repo_provider,
         mock_storage,
-        mock_redis,
         celery_app,
     ):
         mocked_1 = mocker.patch.object(ArchiveService, "read_chunks")
@@ -860,7 +838,6 @@ def test_upload_task_call_no_successful_report(
         dbsession,
         mock_repo_provider,
         mock_storage,
-        mock_redis,
         celery_app,
     ):
         mocked_1 = mocker.patch.object(ArchiveService, "read_chunks")
@@ -951,7 +928,6 @@ def test_upload_task_call_softtimelimit(
         dbsession,
         mock_repo_provider,
         mock_storage,
-        mock_redis,
         celery_app,
     ):
         mocked_1 = mocker.patch.object(ArchiveService, "read_chunks")
@@ -996,7 +972,6 @@ def test_upload_task_call_celeryerror(
         dbsession,
         mock_repo_provider,
         mock_storage,
-        mock_redis,
         celery_app,
     ):
         mocked_1 = mocker.patch.object(ArchiveService, "read_chunks")

tasks/upload.py

Lines changed: 6 additions & 0 deletions
@@ -37,6 +37,7 @@
 from helpers.save_commit_error import save_commit_error
 from services.archive import ArchiveService
 from services.bundle_analysis.report import BundleAnalysisReportService
+from services.processing.state import ProcessingState
 from services.redis import download_archive_from_redis, get_redis_connection
 from services.report import (
     BaseReportService,
@@ -671,6 +672,11 @@ def create_parallel_tasks(
         checkpoints: CheckpointLogger,
         run_fully_parallel: bool,
     ):
+        state = ProcessingState(commit.repoid, commit.commitid)
+        state.mark_uploads_as_processing(
+            [int(upload["upload_pk"]) for upload in argument_list]
+        )
+
         parallel_processing_tasks = [
             upload_processor_task.s(
                 repoid=commit.repoid,
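With this hunk, create_parallel_tasks registers every queued upload as "processing" before the processor tasks fan out. The counterpart calls on the processor/finisher side are not part of this commit; a hedged sketch of that bookkeeping (handle_upload and process_single_upload are hypothetical placeholders, not functions from this codebase) might look like:

# Hypothetical sketch only: how a processor task could hand results back to
# ProcessingState. `process_single_upload` is a stand-in, not real worker code.
from services.processing.state import ProcessingState


def process_single_upload(upload_id: int) -> None:
    ...  # placeholder for the actual report parsing/processing


def handle_upload(repoid: int, commitsha: str, upload_id: int) -> None:
    state = ProcessingState(repoid, commitsha)
    try:
        process_single_upload(upload_id)
    except Exception:
        # error path: drop the upload from the "processing" set so it cannot
        # block merging/postprocessing forever (see clear_in_progress_uploads)
        state.clear_in_progress_uploads([upload_id])
        raise
    # success path: move the upload into the "processed" set, where it waits
    # to be merged into the "master report"
    state.mark_upload_as_processed(upload_id)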
