This repository was archived by the owner on May 5, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathtest_cache_rollup_cron_task.py
More file actions
52 lines (41 loc) · 1.51 KB
/
test_cache_rollup_cron_task.py
File metadata and controls
52 lines (41 loc) · 1.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import datetime as dt
from shared.django_apps.reports.models import LastCacheRollupDate
from shared.django_apps.reports.tests.factories import LastCacheRollupDateFactory
from tasks.cache_rollup_cron_task import CacheRollupTask
from tasks.cache_test_rollups import cache_test_rollups_task_name
def test_cache_rollup_cron_task(mock_storage, transactional_db, mocker):
    """A rollup row dated yesterday triggers one cache-rollup task call.

    Replaces ``CacheRollupTask.app`` with a mock whose ``tasks`` mapping
    holds a ``MagicMock`` under ``cache_test_rollups_task_name``, runs the
    cron task, and checks that ``.s(...)`` was called exactly once on that
    mock with the row's repository id and branch and ``update_date=False``.
    """
    # Arrange: swap the task's app for one exposing only the rollup task.
    registered_tasks = {cache_test_rollups_task_name: mocker.MagicMock()}
    app_mock = mocker.patch.object(CacheRollupTask, "app", tasks=registered_tasks)

    yesterday = dt.date.today() - dt.timedelta(days=1)
    recent_rollup = LastCacheRollupDateFactory(last_rollup_date=yesterday)
    recent_rollup.save()

    # Act
    CacheRollupTask().run_cron_task(_db_session=None)

    # Assert: look the task up through the patched app, as the caller would.
    rollup_task = app_mock.tasks[cache_test_rollups_task_name]
    expected_kwargs = {
        "repo_id": recent_rollup.repository_id,
        "branch": recent_rollup.branch,
        "update_date": False,
    }
    rollup_task.s.assert_called_once_with(**expected_kwargs)
def test_cache_rollup_cron_task_delete(mock_storage, transactional_db, mocker):
    """A rollup row dated 31 days ago is removed and no task call is made.

    Replaces ``CacheRollupTask.app`` with a mock whose ``tasks`` mapping
    holds a ``MagicMock`` under ``cache_test_rollups_task_name``, runs the
    cron task, and checks that ``.s`` was never called on that mock and that
    the ``LastCacheRollupDate`` row can no longer be found by id.
    """
    # Arrange: swap the task's app for one exposing only the rollup task.
    registered_tasks = {cache_test_rollups_task_name: mocker.MagicMock()}
    app_mock = mocker.patch.object(CacheRollupTask, "app", tasks=registered_tasks)

    # NOTE(review): 31 days is presumably just past the task's staleness
    # cutoff; the threshold itself is defined in the task, not in this test.
    stale_day = dt.date.today() - dt.timedelta(days=31)
    stale_rollup = LastCacheRollupDateFactory(last_rollup_date=stale_day)

    # Act
    CacheRollupTask().run_cron_task(_db_session=None)

    # Assert: nothing was dispatched and the row is gone.
    rollup_task = app_mock.tasks[cache_test_rollups_task_name]
    rollup_task.s.assert_not_called()

    surviving_row = LastCacheRollupDate.objects.filter(id=stale_rollup.id).first()
    assert surviving_row is None