Skip to content

Commit 6996cb3

Browse files
committed
enh: allow to programmatically abort download jobs
1 parent 6d3b420 commit 6996cb3

3 files changed

Lines changed: 42 additions & 1 deletion

File tree

CHANGELOG

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
- feat: add circle archiving in the BagIt (RFC 8493) format
33
- fix: handle resource downloads whose target directory got deleted
44
- fix: check for existence of persistent job list in upload
5+
- enh: allow to programmatically abort download jobs
56
- ref: move setup wizard to `wizard_init` submodule
67
1.0.0
78
- fix: do not attempt to updates non-existent eternal jobs on exit

dcoraid/download/job.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import logging
44
import pathlib
55
import shutil
6+
import threading
67
import traceback
78

89
import time
@@ -357,7 +358,7 @@ def set_state(self, state):
357358
logger.error(f"{self.traceback}")
358359
self.state = state
359360

360-
def task_download_resource(self):
361+
def task_download_resource(self, abort_event: threading.Event = None):
361362
"""Start the download
362363
363364
The progress of the download is monitored and written
@@ -434,6 +435,12 @@ def task_download_resource(self):
434435
# for condensed data.
435436
and not self.condensed):
436437
hasher.update(chunk)
438+
if (abort_event is not None
439+
and abort_event.is_set()):
440+
logger.warning(f"Aborting job {self}")
441+
self.set_state("abort")
442+
return
443+
437444
self.sha256sum_dl = hasher.hexdigest()
438445
self.end_time = time.perf_counter()
439446
self.set_state("downloaded")

tests/test_download_job.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import shutil
22
import tempfile
3+
import threading
4+
import time
35
from unittest import mock
46
import uuid
57

@@ -22,6 +24,37 @@ def test_initialize():
2224
assert dj.state == "init"
2325

2426

27+
def test_download_abort(tmp_path):
28+
# leukocytes.rtdc
29+
res_id = "f7fa778f-6abd-1b53-ae5f-9ce12601d6f8"
30+
event_abort = threading.Event()
31+
api = common.get_api()
32+
dj = job.DownloadJob(api=api,
33+
resource_id=res_id,
34+
download_path=tmp_path,
35+
)
36+
assert dj.state == "init"
37+
38+
thread = threading.Thread(target=lambda: dj.task_download_resource(event_abort))
39+
thread.start()
40+
41+
for ii in range(100):
42+
bytes_dl = dj.get_status()["bytes downloaded"]
43+
if bytes_dl > 1024*1024:
44+
break
45+
time.sleep(.1)
46+
47+
event_abort.set()
48+
thread.join(timeout=10)
49+
50+
assert dj.get_status()["state"] == "abort"
51+
52+
# There should be an incomplete temporary download file
53+
assert not dj.path.exists()
54+
assert dj.path_temp.exists()
55+
assert dj.path_temp.stat().st_size < dj.get_status()["bytes total"] - 1
56+
57+
2558
def test_download_deleted_directory():
2659
api = common.get_api()
2760
td = tempfile.mkdtemp(prefix="test-download")

0 commit comments

Comments
 (0)