Skip to content

Commit 71bb92f

Browse files
committed
Replace lockfile with upload last_ping db column
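This change replaces the filesystem-based Toucher/lockfile mechanism with a database-based heartbeat for tracking active uploads. Each upload row now has a `last_ping` timestamp column (added by a new migration) that a background heartbeat refreshes while chunks are being uploaded or processed; an upload is considered active as long as its last ping is younger than `LOCKFILE_EXPIRATION` seconds. This removes the NFS-specific hack (os.access() + os.utime()) that the old approach needed to bust NFS attribute caches, and it no longer relies on a shared filesystem to see upload state across server instances.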
1 parent 2166263 commit 71bb92f

8 files changed

Lines changed: 105 additions & 73 deletions

File tree

server/mergin/sync/models.py

Lines changed: 65 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@
22
#
33
# SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial
44
from __future__ import annotations
5+
from contextlib import contextmanager
56
import json
67
import logging
78
import os
9+
import threading
810
import time
911
import uuid
10-
from datetime import datetime, timedelta
12+
from datetime import datetime, timedelta, timezone
1113
from enum import Enum
1214
from typing import Optional, List, Dict, Set, Tuple
1315
from dataclasses import dataclass, asdict
@@ -21,7 +23,7 @@
2123
from sqlalchemy.types import String
2224
from sqlalchemy.ext.hybrid import hybrid_property
2325
from pygeodiff.geodifflib import GeoDiffLibError, GeoDiffLibConflictError
24-
from flask import current_app
26+
from flask import Flask, current_app
2527

2628
from .files import (
2729
DeltaChangeMerged,
@@ -44,7 +46,6 @@
4446
LOG_BASE,
4547
Checkpoint,
4648
generate_checksum,
47-
Toucher,
4849
get_chunk_location,
4950
get_project_path,
5051
is_supported_type,
@@ -1805,6 +1806,8 @@ class Upload(db.Model):
18051806
db.Integer, db.ForeignKey("user.id", ondelete="CASCADE"), nullable=True
18061807
)
18071808
created = db.Column(db.DateTime, default=datetime.utcnow)
1809+
# last ping time to determine if upload is still active
1810+
last_ping = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
18081811

18091812
user = db.relationship("User")
18101813
project = db.relationship(
@@ -1827,17 +1830,67 @@ def __init__(self, project: Project, version: int, changes: dict, user_id: int):
18271830
def upload_dir(self):
18281831
return os.path.join(self.project.storage.project_dir, "tmp", self.id)
18291832

1830-
@property
1831-
def lockfile(self):
1832-
return os.path.join(self.upload_dir, "lockfile")
1833-
18341833
def is_active(self):
    """Tell whether the upload is still considered running.

    The upload counts as active while its most recent ping is younger than
    the ``LOCKFILE_EXPIRATION`` setting (in seconds) of the current app.

    :return: True if the last ping has not expired yet, False otherwise
    :rtype: bool
    """
    now = datetime.now(tz=timezone.utc)
    # last_ping is stored without tzinfo; it is read here as a UTC timestamp
    last_ping_utc = self.last_ping.replace(tzinfo=timezone.utc)
    expiration = timedelta(seconds=current_app.config["LOCKFILE_EXPIRATION"])
    return now < last_ping_utc + expiration
1838+
1839+
def _heartbeat_task(
    self,
    app: Flask,
    stop_event: threading.Event,
    timeout: int,
    upload_id: Optional[str] = None,
    project_id=None,
    version: Optional[int] = None,
):
    """
    Background task: Runs as a Thread (Sync) or Greenlet (Gevent) based on worker type.
    Uses a fresh engine connection to stay pool-efficient.

    Periodically sets ``last_ping`` of the upload row until ``stop_event`` is set.
    A failed ping is logged and retried on the next iteration; it never stops the loop.

    :param app: real Flask application object (not the ``current_app`` proxy)
    :param stop_event: event which terminates the loop once it is set
    :param timeout: number of seconds to wait between two pings
    :param upload_id: id of the upload row to ping; read from ``self`` when omitted
    :param project_id: project id, used only in the failure log message; read from ``self`` when omitted
    :param version: upload version, used only in the failure log message; read from ``self`` when omitted
    """
    # The ORM instance belongs to the session of the thread which created it.
    # Resolve the identifiers once, up front, so the loop (and its error handler)
    # never touches ORM attributes again - an expired attribute would otherwise be
    # lazy-loaded through a session owned by another thread.
    if upload_id is None:
        upload_id = self.id
    if project_id is None:
        project_id = self.project_id
    if version is None:
        version = self.version

    # manual context push is required for background execution
    with app.app_context():
        while not stop_event.is_set():
            try:
                # db.engine.begin() is efficient and isolated, it immediately returns a connection to the pool
                with db.engine.begin() as conn:
                    conn.execute(
                        db.text(
                            # last_ping has no time zone and is_active() reads it as UTC,
                            # so convert explicitly instead of relying on the DB session time zone
                            "UPDATE upload SET last_ping = NOW() AT TIME ZONE 'UTC' WHERE id = :id"
                        ),
                        {"id": upload_id},
                    )
            except Exception as e:
                logging.exception(
                    "Upload heartbeat failed for ID %s and version %s: %s",
                    project_id,
                    version,
                    e,
                )

            # wait for x seconds, but wake up immediately if stop_event is set
            stop_event.wait(timeout)


@contextmanager
def heartbeat(self, timeout: int = 5):
    """
    Context manager to be used inside a Flask route.

    While the wrapped code runs, a background task keeps refreshing ``last_ping``
    of this upload so that ``is_active()`` keeps reporting it as active.

    :param timeout: interval in seconds between two consecutive pings

    Example of usage:
    -----------------
    with upload.heartbeat(interval):
        do_something_slow
    """
    # we need to pass a real Flask app object to the thread
    app = current_app._get_current_object()
    stop_event = threading.Event()

    # read identifiers here, in the thread which owns the ORM session, and hand
    # plain values to the background task
    identifiers = {
        "upload_id": self.id,
        "project_id": self.project_id,
        "version": self.version,
    }

    bg = threading.Thread(
        target=self._heartbeat_task,
        args=(app, stop_event, timeout),
        kwargs=identifiers,
        daemon=True,
    )

    bg.start()
    try:
        yield
    finally:
        # signal the loop to stop
        stop_event.set()

        # wait for the task to finish its last SQL call.
        # in Gevent, this yields to other requests (non-blocking), while in Sync, this blocks the current thread for up to 2s
        # this is to protect main thread / greenlet from zombie bg processes
        bg.join(timeout=2)
        if bg.is_alive():
            # the task is a daemon so it cannot block shutdown, but make the leak visible
            logging.warning(
                "Upload heartbeat for ID %s and version %s did not stop within 2s",
                identifiers["project_id"],
                identifiers["version"],
            )
1893+
18411894
def clear(self):
18421895
"""Clean up pending upload.
18431896
Uploaded files and table records are removed, and another upload can start.
@@ -1864,7 +1917,7 @@ def process_chunks(
18641917
to_remove = [i.path for i in file_changes if i.change == PushChangeType.DELETE]
18651918
current_files = [f for f in self.project.files if f.path not in to_remove]
18661919

1867-
with Toucher(self.lockfile, 5):
1920+
with self.heartbeat(5):
18681921
for f in file_changes:
18691922
if f.change == PushChangeType.DELETE:
18701923
continue

server/mergin/sync/public_api.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -699,7 +699,7 @@ paths:
699699
- do integrity check comparing uploaded file sizes with what was expected
700700
- move uploaded files to new version dir and applying sync changes (e.g. geodiff apply_changeset)
701701
- bump up version in database
702-
- remove artifacts (chunks, lockfile) by moving them to tmp directory"
702+
- remove artifacts (chunks) by moving them to tmp directory"
703703
operationId: push_finish
704704
parameters:
705705
- name: transaction_id

server/mergin/sync/public_api_controller.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@
7575
)
7676
from .utils import (
7777
generate_checksum,
78-
Toucher,
7978
get_ip,
8079
get_user_agent,
8180
generate_location,
@@ -849,11 +848,10 @@ def project_push(namespace, project_name):
849848
logging.error(f"Failed to create upload session: {str(err)}")
850849
abort(422, "Failed to create upload session. Please try later.")
851850

852-
# Create transaction folder and lockfile
851+
# Create transaction folder
853852
os.makedirs(upload.upload_dir)
854-
open(upload.lockfile, "w").close()
855853

856-
# Update immediately without uploading of new/modified files and remove transaction/lockfile after successful commit
854+
# Update immediately without uploading of new/modified files and remove transaction after successful commit
857855
if not (changes["added"] or changes["updated"]):
858856
next_version = version + 1
859857
file_changes = files_changes_from_upload(
@@ -920,7 +918,7 @@ def chunk_upload(transaction_id, chunk_id):
920918
abort(404)
921919

922920
dest = os.path.join(upload_dir, "chunks", chunk_id)
923-
with Toucher(upload.lockfile, 30):
921+
with upload.heartbeat(30):
924922
try:
925923
# we could have used request.data here, but it could eventually cause OOM issue
926924
save_to_file(request.stream, dest, current_app.config["MAX_CHUNK_SIZE"])
@@ -945,7 +943,7 @@ def push_finish(transaction_id):
945943
- do integrity check comparing uploaded file sizes with what was expected
946944
- move uploaded files to new version dir and applying sync changes (e.g. geodiff apply_changeset)
947945
- bump up version in database
948-
- remove artifacts (chunks, lockfile) by moving them to tmp directory
946+
- remove artifacts (chunks) by moving them to tmp directory
949947
950948
:param transaction_id: Transaction id.
951949
:type transaction_id: str

server/mergin/sync/public_api_v2_controller.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -327,9 +327,8 @@ def create_project_version(id):
327327
logging.error(f"Failed to create upload session: {str(err)}")
328328
return AnotherUploadRunning().response(409)
329329

330-
# Create transaction folder and lockfile
330+
# Create transaction folder
331331
os.makedirs(upload.upload_dir)
332-
open(upload.lockfile, "w").close()
333332

334333
file_changes, errors = upload.process_chunks(use_shared_chunk_dir=True)
335334
# files consistency or geodiff related issues, project push would never succeed, whole upload is aborted

server/mergin/sync/utils.py

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -57,53 +57,6 @@ def generate_checksum(file, chunk_size=4096):
5757
checksum.update(chunk)
5858

5959

60-
class Toucher:
61-
"""
62-
Helper class to periodically update modification time of file during
63-
execution of longer lasting task.
64-
65-
Example of usage:
66-
-----------------
67-
with Toucher(file, interval):
68-
do_something_slow
69-
70-
"""
71-
72-
def __init__(self, lockfile, interval):
73-
self.lockfile = lockfile
74-
self.interval = interval
75-
self.running = False
76-
self.timer = None
77-
78-
def __enter__(self):
79-
self.acquire()
80-
81-
def __exit__(self, type, value, tb): # pylint: disable=W0612,W0622
82-
self.release()
83-
84-
def release(self):
85-
self.running = False
86-
if self.timer:
87-
self.timer.cancel()
88-
self.timer = None
89-
90-
def acquire(self):
91-
self.running = True
92-
self.touch_lockfile()
93-
94-
def touch_lockfile(self):
95-
# do an NFS ACCESS procedure request to clear the attribute cache (for various pods to actually see the file)
96-
# https://docs.aws.amazon.com/efs/latest/ug/troubleshooting-efs-general.html#custom-nfs-settings-write-delays
97-
os.access(self.lockfile, os.W_OK)
98-
with open(self.lockfile, "a"):
99-
os.utime(self.lockfile, None)
100-
101-
sleep(0) # to unblock greenlet
102-
if self.running:
103-
self.timer = Timer(self.interval, self.touch_lockfile)
104-
self.timer.start()
105-
106-
10760
def is_qgis(path: str) -> bool:
10861
"""
10962
Check if file is a QGIS project file.

server/mergin/tests/test_project_controller.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1294,7 +1294,6 @@ def create_transaction(username, changes, version=1):
12941294
db.session.commit()
12951295
upload_dir = os.path.join(upload.project.storage.project_dir, "tmp", upload.id)
12961296
os.makedirs(upload_dir)
1297-
open(os.path.join(upload_dir, "lockfile"), "w").close()
12981297
return upload, upload_dir
12991298

13001299

@@ -1320,6 +1319,7 @@ def test_chunk_upload(client, app):
13201319
resp = client.post(url, data=data, headers=headers)
13211320
assert resp.status_code == 200
13221321
assert resp.json["checksum"] == checksum.hexdigest()
1322+
assert os.path.exists(os.path.join(upload_dir, "chunks", chunk_id))
13231323

13241324
# tests to send bigger chunk than allowed
13251325
app.config["MAX_CHUNK_SIZE"] = 10 * CHUNK_SIZE
@@ -1332,6 +1332,8 @@ def test_chunk_upload(client, app):
13321332
failure = SyncFailuresHistory.query.filter_by(project_id=upload.project.id).first()
13331333
assert failure.error_type == "chunk_upload"
13341334
assert failure.error_details == "Too big chunk"
1335+
# residual after upload was removed
1336+
assert not os.path.exists(os.path.join(upload_dir, "chunks", chunk_id))
13351337

13361338
# tests with transaction with no uploads expected
13371339
changes = _get_changes(test_project_dir)
@@ -1342,9 +1344,8 @@ def test_chunk_upload(client, app):
13421344
resp2 = client.post(url, data=data, headers=headers)
13431345
assert resp2.status_code == 404
13441346
assert SyncFailuresHistory.query.count() == 1
1345-
1346-
# cleanup
1347-
shutil.rmtree(upload_dir)
1347+
# we do not have any chunks, so parent dir was removed as well
1348+
assert not os.path.exists(os.path.join(upload_dir))
13481349

13491350

13501351
def upload_chunks(upload_dir, changes, src_dir=test_project_dir):

server/mergin/tests/test_public_api_v2.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1035,7 +1035,6 @@ def test_create_version_failures(client):
10351035
db.session.add(upload)
10361036
db.session.commit()
10371037
os.makedirs(upload.upload_dir)
1038-
open(upload.lockfile, "w").close()
10391038

10401039
response = client.post(f"v2/projects/{project.id}/versions", json=data)
10411040
assert response.status_code == 409
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
"""Add last_ping to upload
2+
3+
Revision ID: e3a7f2b1c94d
4+
Revises: 4b4648483770
5+
Create Date: 2026-04-14 00:00:00.000000
6+
7+
"""
8+
9+
from alembic import op
10+
import sqlalchemy as sa
11+
12+
13+
# revision identifiers, used by Alembic.
14+
revision = "e3a7f2b1c94d"
15+
down_revision = "4b4648483770"
16+
branch_labels = None
17+
depends_on = None
18+
19+
20+
def upgrade():
    """Add the mandatory ``last_ping`` column to the ``upload`` table.

    The column is created as nullable first so that existing rows can be
    backfilled, and only then switched to NOT NULL.
    """
    op.add_column("upload", sa.Column("last_ping", sa.DateTime(), nullable=True))
    # backfill existing rows before adding NOT NULL constraint;
    # the column carries no time zone and the Upload model reads it as UTC, so the
    # value is converted explicitly rather than taken in the DB session time zone
    op.execute(
        "UPDATE upload SET last_ping = NOW() AT TIME ZONE 'UTC' WHERE last_ping IS NULL"
    )
    op.alter_column("upload", "last_ping", nullable=False)
25+
26+
27+
def downgrade():
    """Remove the ``last_ping`` column from the ``upload`` table."""
    # Code at the previous revision tracks upload activity through lockfiles,
    # which do not exist for uploads created at this revision - make sure all
    # uploads are gone before downgrading.
    op.drop_column(table_name="upload", column_name="last_ping")

0 commit comments

Comments
 (0)