Skip to content

Commit dde191f

Browse files
committed
Rework concurrent upload using upsert strategy
Replace the old try/except IntegrityError + cleanup-loop pattern with an atomic upsert in Upload.create_upload(). Decouple the upload directory name from the DB primary key via transaction_id. The upload directory is now created at this stage, so there is no need to take care of it later.

With the upsert logic, the ObjectDeletedError scenario (which happened because a concurrent request could delete a stale upload row mid-operation) is eliminated:
- During push_finish, the heartbeat context manager continuously updates last_ping, keeping the upload fresh throughout the operation.
- A concurrent request can only take over an upload whose last_ping has expired.
- Since the heartbeat prevents expiry, no other request can claim the row while push_finish is running.
- The upload object therefore stays valid for the full lifetime of the request, so ObjectDeletedError becomes impossible.
1 parent 71bb92f commit dde191f

9 files changed

Lines changed: 308 additions & 251 deletions

server/mergin/sync/models.py

Lines changed: 92 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from flask_login import current_user
2020
from pygeodiff import GeoDiff
2121
from sqlalchemy import text, null, desc, nullslast, tuple_
22-
from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, UUID, JSONB, ENUM
22+
from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, UUID, JSONB, ENUM, insert
2323
from sqlalchemy.types import String
2424
from sqlalchemy.ext.hybrid import hybrid_property
2525
from pygeodiff.geodifflib import GeoDiffLibError, GeoDiffLibConflictError
@@ -1808,6 +1808,7 @@ class Upload(db.Model):
18081808
created = db.Column(db.DateTime, default=datetime.utcnow)
18091809
# last ping time to determine if upload is still active
18101810
last_ping = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
1811+
transaction_id = db.Column(db.String, unique=True, nullable=False, index=True)
18111812

18121813
user = db.relationship("User")
18131814
project = db.relationship(
@@ -1825,10 +1826,98 @@ def __init__(self, project: Project, version: int, changes: dict, user_id: int):
18251826
self.version = version
18261827
self.changes = ChangesSchema().dump(changes)
18271828
self.user_id = user_id
1829+
self.transaction_id = str(uuid.uuid4())
1830+
1831+
@classmethod
def create_upload(
    cls, project_id: str, version: int, changes: dict, user_id: int
) -> Upload | None:
    """Atomically create an upload session or take over a stale one.

    A single ``INSERT ... ON CONFLICT DO UPDATE`` either inserts a new row or,
    when a row already exists for the conflicting key and its ``last_ping`` is
    older than ``LOCKFILE_EXPIRATION`` seconds, hands that row over to the
    caller under a freshly generated ``transaction_id``.

    On success the new transaction directory is created. When a stale upload
    was taken over, the project is notified about the lost push and the old
    transaction directory (if it still exists) is passed to ``move_to_tmp``.

    :param project_id: id of the project the upload belongs to
    :param version: project version the upload is based on
    :param changes: upload changes, serialised with ``ChangesSchema``
    :param user_id: id of the user starting the upload
    :return: the created / taken-over upload, or ``None`` when an existing
        upload for the same key is still active
    """
    # Naive UTC on purpose: the ``created`` / ``last_ping`` columns are plain
    # DateTime columns with naive ``datetime.utcnow`` defaults, so an aware
    # value here would make the comparison (and the stored ping) depend on the
    # database session time zone.
    # NOTE(review): assumes the heartbeat also writes naive UTC - confirm.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    stale_before = now - timedelta(
        seconds=current_app.config["LOCKFILE_EXPIRATION"]
    )
    new_tx_id = str(uuid.uuid4())
    # serialise once, the same payload is used for both INSERT and UPDATE paths
    dumped_changes = ChangesSchema().dump(changes)

    # CTE captures the existing row's transaction_id BEFORE the upsert
    # (pre-statement snapshot). NULL in RETURNING means a fresh INSERT,
    # non-NULL means a stale upload was taken over.
    existing_cte = (
        db.select(Upload.transaction_id)
        .where(
            Upload.project_id == project_id,
            Upload.version == version,
        )
        .cte("existing")
    )

    upsert_stmt = (
        insert(Upload)
        .values(
            id=str(uuid.uuid4()),
            transaction_id=new_tx_id,
            project_id=project_id,
            version=version,
            user_id=user_id,
            last_ping=now,
            changes=dumped_changes,
        )
        .add_cte(existing_cte)
        .on_conflict_do_update(
            # presumably the unique constraint on (project_id, version) - verify
            constraint="uq_upload_project_id",
            set_={
                "transaction_id": new_tx_id,
                "user_id": user_id,
                "last_ping": now,
                "changes": dumped_changes,
            },
            # ONLY update if the existing row is stale
            where=(Upload.last_ping < stale_before),
        )
        .returning(
            Upload,
            db.select(existing_cte.c.transaction_id)
            .scalar_subquery()
            .label("old_transaction_id"),
        )
    )

    result = db.session.execute(upsert_stmt).fetchone()
    db.session.commit()

    # no row returned means the WHERE clause failed, i.e. the existing upload
    # is still active and must not be taken over
    if not result:
        return None

    upload = result.Upload
    old_transaction_id = result.old_transaction_id
    os.makedirs(upload.upload_dir)

    # old_transaction_id is NULL on a fresh INSERT and holds the previous UUID
    # when a stale upload was taken over
    if old_transaction_id:
        upload.project.sync_failed(
            "", "push_lost", "Push artefact removed by subsequent push", user_id
        )
        old_upload_dir = os.path.join(
            upload.project.storage.project_dir, "tmp", old_transaction_id
        )
        if os.path.exists(old_upload_dir):
            move_to_tmp(old_upload_dir, old_transaction_id)

    return upload
18281915

18291916
@property
18301917
def upload_dir(self):
1831-
return os.path.join(self.project.storage.project_dir, "tmp", self.id)
1918+
return os.path.join(
1919+
self.project.storage.project_dir, "tmp", self.transaction_id
1920+
)
18321921

18331922
def is_active(self):
18341923
"""Check if upload is still active because there was a ping from underlying process"""
@@ -1896,7 +1985,7 @@ def clear(self):
18961985
Uploaded files and table records are removed, and another upload can start.
18971986
"""
18981987
try:
1899-
move_to_tmp(self.upload_dir, self.id)
1988+
move_to_tmp(self.upload_dir, self.transaction_id)
19001989
db.session.delete(self)
19011990
db.session.commit()
19021991
except Exception:

server/mergin/sync/permissions.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -271,17 +271,16 @@ def check_project_permissions(
271271
return None
272272

273273

274-
def get_upload(transaction_id):
275-
upload = Upload.query.get_or_404(transaction_id)
274+
def get_upload_or_fail(transaction_id: str) -> Upload:
275+
upload = Upload.query.filter_by(transaction_id=transaction_id).first_or_404()
276276
# upload to 'removed' projects is forbidden
277277
if upload.project.removed_at:
278278
abort(404)
279279

280280
if upload.user_id != current_user.id:
281281
abort(403, "You do not have permissions for ongoing upload")
282282

283-
upload_dir = os.path.join(upload.project.storage.project_dir, "tmp", transaction_id)
284-
return upload, upload_dir
283+
return upload
285284

286285

287286
def projects_query(permission, as_admin=True, public=True):

server/mergin/sync/public_api_controller.py

Lines changed: 44 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from pygeodiff import GeoDiffLibError
2626
from flask_login import current_user
2727
from sqlalchemy import and_, desc, asc
28-
from sqlalchemy.exc import IntegrityError
28+
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
2929
from gevent import sleep
3030
import base64
3131
from werkzeug.exceptions import HTTPException, Conflict
@@ -70,7 +70,7 @@
7070
require_project,
7171
projects_query,
7272
ProjectPermissions,
73-
get_upload,
73+
get_upload_or_fail,
7474
require_project_by_uuid,
7575
)
7676
from .utils import (
@@ -774,13 +774,6 @@ def project_push(namespace, project_name):
774774
if all(len(changes[key]) == 0 for key in changes.keys()):
775775
abort(400, "No changes")
776776

777-
# reject upload early if there is another one already running
778-
pending_upload = Upload.query.filter_by(
779-
project_id=project.id, version=version
780-
).first()
781-
if pending_upload and pending_upload.is_active():
782-
abort(400, "Another process is running. Please try later.")
783-
784777
try:
785778
ChangesSchema().validate(changes)
786779
upload_changes = ChangesSchema().dump(changes)
@@ -812,44 +805,20 @@ def project_push(namespace, project_name):
812805
if requested_storage > ws.storage:
813806
return StorageLimitHit(current_usage, ws.storage).response(422)
814807

815-
upload = Upload(project, version, upload_changes, current_user.id)
816-
db.session.add(upload)
817808
try:
818-
# Creating upload transaction with different project's version is possible.
819-
db.session.commit()
809+
upload = Upload.create_upload(
810+
project.id, version, upload_changes, current_user.id
811+
)
812+
if not upload:
813+
abort(400, "Another process is running. Please try later.")
814+
820815
logging.info(
821-
f"Upload transaction {upload.id} created for project: {project.id}, version: {version}"
816+
f"Upload transaction {upload.transaction_id} created for project: {project.id}, version: {version}"
822817
)
823-
except IntegrityError:
818+
except (IntegrityError, SQLAlchemyError) as err:
824819
db.session.rollback()
825-
# check and clean dangling uploads or abort
826-
for current_upload in project.uploads.all():
827-
if current_upload.is_active():
828-
abort(400, "Another process is running. Please try later.")
829-
db.session.delete(current_upload)
830-
db.session.commit()
831-
# previous push attempt is definitely lost
832-
project.sync_failed(
833-
"",
834-
"push_lost",
835-
"Push artefact removed by subsequent push",
836-
current_user.id,
837-
)
838-
839-
# Try again after cleanup
840-
db.session.add(upload)
841-
try:
842-
db.session.commit()
843-
logging.info(
844-
f"Upload transaction {upload.id} created for project: {project.id}, version: {version}"
845-
)
846-
move_to_tmp(upload.upload_dir)
847-
except IntegrityError as err:
848-
logging.error(f"Failed to create upload session: {str(err)}")
849-
abort(422, "Failed to create upload session. Please try later.")
850-
851-
# Create transaction folder
852-
os.makedirs(upload.upload_dir)
820+
logging.exception(f"Failed to create upload: {str(err)}")
821+
abort(422, "Failed to create upload session. Please try later.")
853822

854823
# Update immediately without uploading of new/modified files and remove transaction after successful commit
855824
if not (changes["added"] or changes["updated"]):
@@ -874,15 +843,15 @@ def project_push(namespace, project_name):
874843
db.session.commit()
875844
logging.info(
876845
f"A project version {ProjectVersion.to_v_name(next_version)} for project: {project.id} created. "
877-
f"Transaction id: {upload.id}. No upload."
846+
f"Transaction id: {upload.transaction_id}. No upload."
878847
)
879848
project_version_created.send(pv)
880849
push_finished.send(pv)
881850
return jsonify(ProjectSchema().dump(project)), 200
882851
except IntegrityError as err:
883852
db.session.rollback()
884853
logging.exception(
885-
f"Failed to upload a new project version using transaction id: {upload.id}: {str(err)}"
854+
f"Failed to upload a new project version using transaction id: {upload.transaction_id}: {str(err)}"
886855
)
887856
abort(422, "Failed to upload a new project version. Please try later.")
888857
except gevent.timeout.Timeout:
@@ -891,7 +860,7 @@ def project_push(namespace, project_name):
891860
finally:
892861
upload.clear()
893862

894-
return {"transaction": upload.id}, 200
863+
return {"transaction": upload.transaction_id}, 200
895864

896865

897866
@auth_required
@@ -908,7 +877,7 @@ def chunk_upload(transaction_id, chunk_id):
908877
909878
:rtype: Dict
910879
"""
911-
upload, upload_dir = get_upload(transaction_id)
880+
upload = get_upload_or_fail(transaction_id)
912881
request.view_args["project"] = upload.project
913882
chunks = []
914883
for file in upload.changes["added"] + upload.changes["updated"]:
@@ -917,7 +886,7 @@ def chunk_upload(transaction_id, chunk_id):
917886
if chunk_id not in chunks:
918887
abort(404)
919888

920-
dest = os.path.join(upload_dir, "chunks", chunk_id)
889+
dest = os.path.join(upload.upload_dir, "chunks", chunk_id)
921890
with upload.heartbeat(30):
922891
try:
923892
# we could have used request.data here, but it could eventually cause OOM issue
@@ -950,7 +919,7 @@ def push_finish(transaction_id):
950919
951920
:rtype: None
952921
"""
953-
upload, upload_dir = get_upload(transaction_id)
922+
upload = get_upload_or_fail(transaction_id)
954923
request.view_args["project"] = upload.project
955924
project = upload.project
956925
next_version = project.next_version()
@@ -989,7 +958,7 @@ def push_finish(transaction_id):
989958

990959
abort(422, f"Failed to create new version: {msg}")
991960

992-
files_dir = os.path.join(upload_dir, "files", v_next_version)
961+
files_dir = os.path.join(upload.upload_dir, "files", v_next_version)
993962
target_dir = os.path.join(project.storage.project_dir, v_next_version)
994963
if os.path.exists(target_dir):
995964
pv = ProjectVersion.query.filter_by(
@@ -1007,29 +976,31 @@ def push_finish(transaction_id):
1007976
move_to_tmp(target_dir)
1008977

1009978
try:
1010-
user_agent = get_user_agent(request)
1011-
device_id = get_device_id(request)
1012-
pv = ProjectVersion(
1013-
project,
1014-
next_version,
1015-
current_user.id,
1016-
file_changes,
1017-
get_ip(request),
1018-
user_agent,
1019-
device_id,
1020-
)
1021-
db.session.add(pv)
1022-
db.session.add(project)
1023-
db.session.commit()
979+
# let's keep upload alive until all work is done so no one else can claim it
980+
with upload.heartbeat(5):
981+
user_agent = get_user_agent(request)
982+
device_id = get_device_id(request)
983+
pv = ProjectVersion(
984+
project,
985+
next_version,
986+
current_user.id,
987+
file_changes,
988+
get_ip(request),
989+
user_agent,
990+
device_id,
991+
)
992+
db.session.add(pv)
993+
db.session.add(project)
994+
db.session.commit()
1024995

1025-
# let's move uploaded files where they are expected to be
1026-
os.renames(files_dir, version_dir)
996+
# let's move uploaded files where they are expected to be
997+
os.renames(files_dir, version_dir)
1027998

1028-
logging.info(
1029-
f"Push finished for project: {project.id}, project version: {v_next_version}, transaction id: {transaction_id}."
1030-
)
1031-
project_version_created.send(pv)
1032-
push_finished.send(pv)
999+
logging.info(
1000+
f"Push finished for project: {project.id}, project version: {v_next_version}, transaction id: {transaction_id}."
1001+
)
1002+
project_version_created.send(pv)
1003+
push_finished.send(pv)
10331004
except (psycopg2.Error, FileNotFoundError, IntegrityError) as err:
10341005
db.session.rollback()
10351006
logging.exception(
@@ -1059,10 +1030,8 @@ def push_cancel(transaction_id):
10591030
10601031
:rtype: None
10611032
"""
1062-
upload, upload_dir = get_upload(transaction_id)
1063-
db.session.delete(upload)
1064-
db.session.commit()
1065-
move_to_tmp(upload_dir)
1033+
upload = get_upload_or_fail(transaction_id)
1034+
upload.clear()
10661035
return NoContent, 200
10671036

10681037

0 commit comments

Comments
 (0)