Skip to content

Commit da5556a

Browse files
committed
✨(backend) add async_mode flag
The command must be killable. This adds an `async_mode` flag to preserve the async feature while allowing synchronous execution. Signed-off-by: charles <charles.englebert@protonmail.com>
1 parent 6149223 commit da5556a

3 files changed

Lines changed: 122 additions & 29 deletions

File tree

src/backend/core/admin.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ def run_indexing_view(request: HttpRequest):
249249
upper_time_bound=convert_to_isoformat(
250250
request.POST.get("upper_time_bound")
251251
),
252+
async_mode=True,
252253
)
253254
messages.success(request, _("Indexing triggered!"))
254255
else:

src/backend/core/management/commands/index.py

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
"""
44

55
import logging
6+
import time
67
from datetime import datetime
78

89
from django.conf import settings
910
from django.core.management.base import BaseCommand, CommandError
1011

12+
from core import models
1113
from core.services.search_indexers import get_document_indexer
1214
from core.tasks.search import batch_document_indexer_task
1315

@@ -45,6 +47,13 @@ def add_arguments(self, parser):
4547
default=None,
4648
help="DateTime in ISO format. Only documents updated before this date will be indexed",
4749
)
50+
parser.add_argument(
51+
"--async",
52+
action="store_true",
53+
dest="async_mode",
54+
default=False,
55+
help="Whether to execute indexing asynchronously in a Celery task (default: False)",
56+
)
4857

4958
def handle(self, *args, **options):
5059
"""Launch and log search index generation."""
@@ -53,15 +62,37 @@ def handle(self, *args, **options):
5362
if not indexer:
5463
raise CommandError("The indexer is not enabled or properly configured.")
5564

56-
batch_document_indexer_task.apply_async(
57-
kwargs={
58-
"lower_time_bound": options["lower_time_bound"],
59-
"upper_time_bound": options["upper_time_bound"],
60-
"batch_size": options["batch_size"],
61-
"crash_safe_mode": True,
62-
},
63-
)
65+
if options["async_mode"]:
66+
batch_document_indexer_task.apply_async(
67+
kwargs={
68+
"lower_time_bound": options["lower_time_bound"],
69+
"upper_time_bound": options["upper_time_bound"],
70+
"batch_size": options["batch_size"],
71+
"crash_safe_mode": True,
72+
},
73+
)
74+
logger.info(
75+
"Document indexing task sent to worker",
76+
)
77+
else:
78+
logger.info("Starting to regenerate Find index...")
79+
start = time.perf_counter()
6480

65-
logger.info(
66-
"Document indexing task sent to worker",
67-
)
81+
try:
82+
count = indexer.index(
83+
queryset=models.Document.objects.time_filter(
84+
lower_time_bound=options["lower_time_bound"],
85+
upper_time_bound=options["upper_time_bound"],
86+
),
87+
batch_size=options["batch_size"],
88+
crash_safe_mode=True,
89+
)
90+
except Exception as err:
91+
raise CommandError("Unable to regenerate index") from err
92+
93+
duration = time.perf_counter() - start
94+
logger.info(
95+
"Search index regenerated from %d document(s) in %.2f seconds.",
96+
count,
97+
duration,
98+
)

src/backend/core/tests/commands/test_index.py

Lines changed: 79 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -83,17 +83,17 @@ def test_index_with_both_bounds_success():
8383
lower_time_bound=lower_time_bound.isoformat(),
8484
upper_time_bound=upper_time_bound.isoformat(),
8585
)
86-
all_push_call_args = [
86+
pushed_document_ids = [
8787
document["id"]
8888
for call_arg_list in mock_push.call_args_list
8989
for document in call_arg_list.args[0]
9090
]
9191

9292
# Only documents in window should be indexed
93-
assert str(document_too_early.id) not in all_push_call_args
94-
assert str(document_in_window_1.id) in all_push_call_args
95-
assert str(document_in_window_2.id) in all_push_call_args
96-
assert str(document_too_late.id) not in all_push_call_args
93+
assert str(document_too_early.id) not in pushed_document_ids
94+
assert str(document_in_window_1.id) in pushed_document_ids
95+
assert str(document_in_window_2.id) in pushed_document_ids
96+
assert str(document_too_late.id) not in pushed_document_ids
9797

9898
# Checkpoint should be set to last indexed document's updated_at
9999
assert (
@@ -150,13 +150,14 @@ def push_with_failure_on_batch_2(data):
150150
# First run: simulate crash on batch 3
151151
with mock.patch.object(FindDocumentIndexer, "push") as mock_push:
152152
mock_push.side_effect = push_with_failure_on_batch_2
153-
call_command(
154-
"index",
155-
batch_size=batch_size,
156-
lower_time_bound=lower_time_bound.isoformat(),
157-
upper_time_bound=upper_time_bound.isoformat(),
158-
)
159-
all_push_call_args = [
153+
with pytest.raises(CommandError):
154+
call_command(
155+
"index",
156+
batch_size=batch_size,
157+
lower_time_bound=lower_time_bound.isoformat(),
158+
upper_time_bound=upper_time_bound.isoformat(),
159+
)
160+
pushed_document_ids = [
160161
document["id"]
161162
for call_arg_list in mock_push.call_args_list
162163
for document in call_arg_list.args[0]
@@ -167,13 +168,13 @@ def push_with_failure_on_batch_2(data):
167168
assert checkpoint == documents[3].updated_at.isoformat()
168169
# first 2 batches should be indexed successfully
169170
for i in range(0, 4):
170-
assert str(documents[i].id) in all_push_call_args
171+
assert str(documents[i].id) in pushed_document_ids
171172
# next batch should have been attempted but failed
172173
for i in range(4, 6):
173-
assert str(documents[i].id) in all_push_call_args
174+
assert str(documents[i].id) in pushed_document_ids
174175
# last batches indexing should not have been attempted
175176
for i in range(6, 8):
176-
assert str(documents[i].id) not in all_push_call_args
177+
assert str(documents[i].id) not in pushed_document_ids
177178

178179
# Second run: resume from checkpoint
179180
with mock.patch.object(FindDocumentIndexer, "push") as mock_push:
@@ -183,7 +184,7 @@ def push_with_failure_on_batch_2(data):
183184
lower_time_bound=checkpoint,
184185
upper_time_bound=upper_time_bound.isoformat(),
185186
)
186-
all_push_call_args = [
187+
pushed_document_ids = [
187188
document["id"]
188189
for call_arg_list in mock_push.call_args_list
189190
for document in call_arg_list.args[0]
@@ -193,12 +194,12 @@ def push_with_failure_on_batch_2(data):
193194
# except the last document of the last batch which is on the checkpoint boundary
194195
# -> doc 0, 1 and 2
195196
for i in range(0, 3):
196-
assert str(documents[i].id) not in all_push_call_args
197+
assert str(documents[i].id) not in pushed_document_ids
197198
# next batches should be indexed including the document at the checkpoint boundary
198199
# which has already been indexed and is re-indexed
199200
# -> doc 3 to the end
200201
for i in range(3, 8):
201-
assert str(documents[i].id) in all_push_call_args
202+
assert str(documents[i].id) in pushed_document_ids
202203

203204

204205
@pytest.mark.django_db
@@ -211,3 +212,63 @@ def test_index_improperly_configured(indexer_settings):
211212
call_command("index")
212213

213214
assert str(err.value) == "The indexer is not enabled or properly configured."
215+
216+
217+
@pytest.mark.django_db
218+
@pytest.mark.usefixtures("indexer_settings")
219+
def test_index_with_async_flag(settings):
220+
"""Test the command `index` with --async=True runs task asynchronously."""
221+
cache.clear()
222+
lower_time_bound = datetime(2024, 2, 1, tzinfo=timezone.utc)
223+
224+
with mock.patch(
225+
"core.management.commands.index.batch_document_indexer_task"
226+
) as mock_task:
227+
with mock.patch.object(FindDocumentIndexer, "push") as mock_push:
228+
call_command(
229+
"index", async_mode=True, lower_time_bound=lower_time_bound.isoformat()
230+
)
231+
# push should not be called synchronously
232+
mock_push.assert_not_called()
233+
# task called asynchronously
234+
mock_task.apply_async.assert_called_once_with(
235+
kwargs={
236+
"lower_time_bound": lower_time_bound.isoformat(),
237+
"upper_time_bound": None,
238+
"batch_size": settings.SEARCH_INDEXER_BATCH_SIZE,
239+
"crash_safe_mode": True,
240+
}
241+
)
242+
243+
assert cache.get(BULK_INDEXER_CHECKPOINT) is None
244+
245+
246+
@pytest.mark.django_db
247+
@pytest.mark.usefixtures("indexer_settings")
248+
def test_index_without_async_flag():
249+
"""Test the command `index` with --async=False runs synchronously."""
250+
cache.clear()
251+
lower_time_bound = datetime(2024, 2, 1, tzinfo=timezone.utc)
252+
253+
document = create_document_with_updated_at(
254+
updated_at=lower_time_bound + timedelta(days=10)
255+
)
256+
257+
with mock.patch(
258+
"core.management.commands.index.batch_document_indexer_task"
259+
) as mock_task:
260+
with mock.patch.object(FindDocumentIndexer, "push") as mock_push:
261+
call_command(
262+
"index", async_mode=False, lower_time_bound=lower_time_bound.isoformat()
263+
)
264+
# push is called synchronously to index the document
265+
pushed_document_ids = [
266+
document["id"]
267+
for call_arg_list in mock_push.call_args_list
268+
for document in call_arg_list.args[0]
269+
]
270+
assert str(document.id) in pushed_document_ids
271+
# async task not called
272+
mock_task.apply_async.assert_not_called()
273+
274+
assert cache.get(BULK_INDEXER_CHECKPOINT) == document.updated_at.isoformat()

0 commit comments

Comments
 (0)