Skip to content

Commit 0f4630a

Browse files
committed
fix: remember ETag of uploaded resources (close #80)
1 parent 20503a1 commit 0f4630a

5 files changed

Lines changed: 126 additions & 18 deletions

File tree

CHANGELOG

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
0.18.3
2+
- fix: remember ETag of uploaded resources (#80)
23
- enh: allow sharing individual datasets instead of collections (#32)
34
- enh: add preview option for dataset description in upload dialog (#52)
5+
- enh: introduce write lock for eternal job updates
46
0.18.2
57
- ref: make action buttons in filter view hideable
68
0.18.2

dcoraid/upload/job.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import shutil
66
import time
77
import traceback
8+
from typing import Callable
89
import warnings
910

1011
import dclab
@@ -54,6 +55,7 @@ def __init__(self,
5455
resource_paths: list[str | pathlib.Path],
5556
resource_names: list[str] = None,
5657
resource_supplements: list[dict] = None,
58+
resource_etags: list[str] = None,
5759
collections: list[str] = None,
5860
task_id: str = None,
5961
cache_dir: str | pathlib.Path = None):
@@ -79,6 +81,8 @@ def __init__(self,
7981
on DCOR
8082
resource_supplements: list of dict
8183
Supplementary resource information
84+
resource_etags: list
85+
Any previously computed ETags of the resources
8286
collections: list of strings
8387
List of unique identifiers of collections that this
8488
dataset should be appended to
@@ -152,7 +156,7 @@ def __init__(self,
152156
for ff in self.paths]
153157
self.file_bytes_uploaded = [0] * len(self.paths)
154158
#: ETags for the files uploaded by this UploadJob instance
155-
self.etags = [None] * len(self.paths)
159+
self.etags = resource_etags or [None] * len(self.paths)
156160
#: Resource index indicating upload progress
157161
self.index = 0
158162
self.start_time = None
@@ -188,6 +192,7 @@ def __getstate__(self):
188192
"resource_paths": [str(pp) for pp in self.paths],
189193
"resource_names": self.resource_names,
190194
"resource_supplements": self.supplements,
195+
"resource_etags": self.etags,
191196
"task_id": self.task_id,
192197
}
193198
return uj_state
@@ -550,12 +555,17 @@ def task_compress_resources(self):
550555
self.file_sizes[ii] = path_out.stat().st_size
551556
self.set_state("parcel")
552557

553-
def task_upload_resources(self):
558+
def task_upload_resources(self,
559+
run_after_upload: Callable = None
560+
):
554561
"""Start the upload
555562
556563
The progress of the upload is monitored and written
557564
to attributes. The current status can be retrieved
558565
via :func:`UploadJob.get_status`.
566+
567+
``run_after_upload(self)`` is called after each resource upload.
568+
This is used to update the eternal/immortalized job on disk.
559569
"""
560570
if self.state == "parcel":
561571
# reset everything
@@ -605,6 +615,8 @@ def task_upload_resources(self):
605615
self.etags[ii] = etag
606616
self.paths_uploaded.append(path)
607617
self.wait_time += srv_time
618+
if run_after_upload is not None:
619+
run_after_upload(self)
608620
self.end_time = time.perf_counter()
609621
self.set_state("online")
610622
else:

dcoraid/upload/queue.py

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import logging
22
import pathlib
33
import time
4+
import threading
5+
import traceback
46
import traceback as tb
57
import warnings
68

@@ -27,6 +29,7 @@ def __init__(self, path):
2729
self.path_queued = self.path / "queued"
2830
self.path_completed.mkdir(parents=True, exist_ok=True)
2931
self.path_queued.mkdir(parents=True, exist_ok=True)
32+
self.write_locks = {}
3033

3134
def __contains__(self, item):
3235
if isinstance(item, UploadJob):
@@ -49,6 +52,10 @@ def get_queued_dataset_ids(self):
4952
"""Return list of DCOR dataset IDs corresponding to queued jobs"""
5053
return sorted([pp.stem for pp in self.path_queued.glob("*.json")])
5154

55+
def get_write_lock(self, dataset_id):
56+
"""Return a `threading.Lock` object for write operations"""
57+
return self.write_locks.setdefault(dataset_id, threading.Lock())
58+
5259
def is_job_done(self, dataset_id):
5360
jp = self.path_completed / (dataset_id + ".json")
5461
return jp.exists()
@@ -68,21 +75,27 @@ def immortalize_job(self, upload_job):
6875
# is done after checking for queued (above case).
6976
raise FileExistsError(f"The job '{upload_job.dataset_id}' is "
7077
f"already done!")
71-
save_task(upload_job=upload_job, path=pout)
78+
with self.get_write_lock(upload_job.dataset_id):
79+
save_task(upload_job=upload_job, path=pout)
7280

7381
def job_exists(self, dataset_id):
74-
return self.is_job_queued(dataset_id) or self.is_job_done(dataset_id)
82+
# Use a write lock here to avoid race conditions
83+
with self.get_write_lock(dataset_id):
84+
return (self.is_job_queued(dataset_id)
85+
or self.is_job_done(dataset_id))
7586

7687
def obliterate_job(self, dataset_id):
7788
"""Remove a job from the persistent queue list"""
7889
pdel = self.path_queued / (dataset_id + ".json")
79-
pdel.unlink()
90+
with self.get_write_lock(dataset_id):
91+
pdel.unlink()
8092

8193
def set_job_done(self, dataset_id):
8294
"""Move the job from the queue to the complete list"""
8395
pin = self.path_queued / (dataset_id + ".json")
8496
pout = self.path_completed / (dataset_id + ".json")
85-
pin.rename(pout)
97+
with self.get_write_lock(dataset_id):
98+
pin.rename(pout)
8699

87100
def summon_job(self, dataset_id, api, cache_dir=None):
88101
"""Instantiate job from the persistent queue list"""
@@ -91,6 +104,20 @@ def summon_job(self, dataset_id, api, cache_dir=None):
91104
assert upload_job.dataset_id == dataset_id
92105
return upload_job
93106

107+
def update_job(self, upload_job):
108+
"""Update an immortalized job dictionary
109+
110+
The job must exist. Updating a job makes sense when e.g.
111+
the ETag of a resource becomes known.
112+
"""
113+
if not self.job_exists(upload_job.dataset_id):
114+
raise ValueError(f"Cannot update non-existent job {upload_job}")
115+
with self.get_write_lock(upload_job.dataset_id):
116+
pq = self.path_queued / (upload_job.dataset_id + ".json")
117+
pc = self.path_completed / (upload_job.dataset_id + ".json")
118+
pp = pq if pq.exists() else pc
119+
save_task(upload_job=upload_job, path=pp)
120+
94121

95122
class UploadQueue:
96123
def __init__(self, api, path_persistent_job_list=None, cache_dir=None):
@@ -142,13 +169,19 @@ def __init__(self, api, path_persistent_job_list=None, cache_dir=None):
142169
else:
143170
self.jobs_eternal = None
144171
self.daemon_compress = CompressDaemon(self.jobs)
145-
self.daemon_upload = UploadDaemon(self.jobs)
172+
self.daemon_upload = UploadDaemon(self.jobs, self.jobs_eternal)
146173
self.daemon_verify = VerifyDaemon(self.jobs)
147174

148175
def __contains__(self, upload_job):
149176
return upload_job in self.jobs
150177

151178
def __del__(self):
179+
# Attempt to update the eternal jobs (important for ETags)
180+
for job in self.jobs:
181+
try:
182+
self.jobs_eternal.update_job(job)
183+
except BaseException:
184+
self.logger.error(traceback.format_exc())
152185
self.daemon_upload.shutdown_flag.set()
153186
self.daemon_verify.shutdown_flag.set()
154187
self.daemon_compress.shutdown_flag.set()
@@ -199,7 +232,7 @@ def abort_job(self, dataset_id):
199232
# the daemon to set all the other jobs to "abort".
200233
self.daemon_upload.shutdown_flag.set()
201234
self.daemon_upload.terminate()
202-
self.daemon_upload = UploadDaemon(self.jobs)
235+
self.daemon_upload = UploadDaemon(self.jobs, self.jobs_eternal)
203236
elif prev_state == "compress":
204237
self.daemon_compress.shutdown_flag.set()
205238
self.daemon_compress.terminate()
@@ -327,12 +360,20 @@ def __init__(self, jobs):
327360

328361

329362
class UploadDaemon(Daemon):
330-
def __init__(self, jobs):
363+
def __init__(self, jobs, jobs_eternal):
331364
"""Upload daemon"""
365+
if jobs_eternal is None:
366+
run_after_upload = None
367+
else:
368+
run_after_upload = jobs_eternal.update_job
332369
super(UploadDaemon, self).__init__(
333370
jobs,
334371
job_trigger_state="parcel",
335-
job_function_name="task_upload_resources")
372+
job_function_name="task_upload_resources",
373+
job_function_kwargs={
374+
"run_after_upload": run_after_upload
375+
}
376+
)
336377

337378

338379
class VerifyDaemon(Daemon):

dcoraid/worker/daemon.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,17 @@
1111

1212

1313
class Daemon(KThread):
14-
def __init__(self, queue, job_trigger_state, job_function_name):
14+
def __init__(self,
15+
jobs,
16+
job_trigger_state: str,
17+
job_function_name: str,
18+
job_function_kwargs: dict = None,
19+
):
1520
"""Daemon base class for running uploads/downloads in the background"""
16-
self.queue = queue
21+
self.jobs = jobs
1722
self.job_trigger_state = job_trigger_state
1823
self.job_function_name = job_function_name
24+
self.job_function_kwargs = job_function_kwargs
1925
super(Daemon, self).__init__()
2026
self.daemon = True # We don't have to worry about ending this thread
2127

@@ -31,7 +37,7 @@ def run(self):
3137
try:
3238
while not self.shutdown_flag.is_set():
3339
# Get the first job that is in the trigger state
34-
for job in self.queue:
40+
for job in self.jobs:
3541
if job.state == self.job_trigger_state:
3642
break
3743
else:
@@ -42,7 +48,10 @@ def run(self):
4248
task = getattr(job, self.job_function_name)
4349
logger = logging.getLogger(__name__)
4450
try:
45-
task()
51+
if self.job_function_kwargs is None:
52+
task()
53+
else:
54+
task(**self.job_function_kwargs)
4655
except ConnectionTimeoutErrors:
4756
# Set the job to the error state for 10s (so the user
4857
# sees it in the UI) and then go back to the initial

tests/test_upload_queue.py

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
import pathlib
23
import tempfile
34
import time
@@ -87,6 +88,24 @@ def test_queue_create_dataset_with_resource():
8788
common.wait_for_job(joblist, data["id"])
8889

8990

91+
def test_queue_create_dataset_with_resource_write_etag(tmp_path):
92+
api = common.get_api()
93+
# create some metadata
94+
dataset_dict = common.make_dataset_dict(hint="create-with-resource")
95+
# post dataset creation request
96+
data = dataset_create(dataset_dict=dataset_dict, api=api)
97+
joblist = UploadQueue(api=api,
98+
path_persistent_job_list=tmp_path)
99+
joblist.new_job(dataset_id=data["id"],
100+
paths=[dpath])
101+
pujl = joblist.jobs_eternal
102+
common.wait_for_job(joblist, data["id"])
103+
104+
# Make sure the resource etag is stored in the uploaded job.
105+
uj_d = json.loads((pujl.path_completed / f"{data["id"]}.json").read_text())
106+
assert len(uj_d["upload_job"]["resource_etags"][0]) == 32
107+
108+
90109
def test_queue_find_zombie_caches():
91110
api = common.get_api()
92111
# create some metadata
@@ -245,6 +264,20 @@ def test_persistent_upload_joblist_error_exists():
245264
pujl.immortalize_job(uj)
246265

247266

267+
def test_persistent_upload_joblist_error_exists_done():
268+
"""test things when a job is done"""
269+
api = common.get_api()
270+
td = pathlib.Path(tempfile.mkdtemp(prefix="persistent_uj_list_"))
271+
pujl_path = td / "joblistdir"
272+
task_path = common.make_upload_task()
273+
pujl = PersistentUploadJobList(pujl_path)
274+
uj = load_task(task_path, api=api)
275+
pujl.immortalize_job(uj)
276+
pujl.set_job_done(uj.dataset_id)
277+
with pytest.raises(FileExistsError, match="is already done"):
278+
pujl.immortalize_job(uj)
279+
280+
248281
def test_persistent_upload_joblist_skip_finished_resources():
249282
api = common.get_api()
250283
td = pathlib.Path(tempfile.mkdtemp(prefix="persistent_uj_list_"))
@@ -340,7 +373,7 @@ def test_persistent_upload_joblist_skip_queued_resources():
340373
assert uq.jobs_eternal.num_queued == 1
341374

342375

343-
def test_persistent_upload_joblist_error_exists_done():
376+
def test_persistent_upload_joblist_update():
344377
"""test things when a job is done"""
345378
api = common.get_api()
346379
td = pathlib.Path(tempfile.mkdtemp(prefix="persistent_uj_list_"))
@@ -349,9 +382,20 @@ def test_persistent_upload_joblist_error_exists_done():
349382
pujl = PersistentUploadJobList(pujl_path)
350383
uj = load_task(task_path, api=api)
351384
pujl.immortalize_job(uj)
352-
pujl.set_job_done(uj.dataset_id)
353-
with pytest.raises(FileExistsError, match="is already done"):
354-
pujl.immortalize_job(uj)
385+
386+
uj.etags[0] = etag = str(uuid.uuid4())
387+
pujl.update_job(uj)
388+
389+
assert pujl.job_exists(uj.dataset_id)
390+
assert pujl.is_job_queued(uj.dataset_id)
391+
assert not pujl.is_job_done(uj.dataset_id)
392+
393+
ids = pujl.get_queued_dataset_ids()
394+
assert uj.dataset_id in ids
395+
396+
# make sure the etag was stored
397+
uj_d = json.loads((pujl.path_queued / f"{uj.dataset_id}.json").read_text())
398+
assert uj_d["upload_job"]["resource_etags"][0] == etag
355399

356400

357401
def test_persistent_upload_joblist_warning_dataset_deleted_on_server():

0 commit comments

Comments
 (0)