Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 157 additions & 15 deletions server/mergin/sync/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
#
# SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial
from __future__ import annotations
from contextlib import contextmanager
import json
import logging
import os
import threading
import time
import uuid
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Optional, List, Dict, Set, Tuple
from dataclasses import dataclass, asdict
Expand All @@ -17,11 +19,11 @@
from flask_login import current_user
from pygeodiff import GeoDiff
from sqlalchemy import text, null, desc, nullslast, tuple_
from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, UUID, JSONB, ENUM
from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, UUID, JSONB, ENUM, insert
from sqlalchemy.types import String
from sqlalchemy.ext.hybrid import hybrid_property
from pygeodiff.geodifflib import GeoDiffLibError, GeoDiffLibConflictError
from flask import current_app
from flask import Flask, current_app

from .files import (
DeltaChangeMerged,
Expand All @@ -44,7 +46,6 @@
LOG_BASE,
Checkpoint,
generate_checksum,
Toucher,
get_chunk_location,
get_project_path,
is_supported_type,
Expand Down Expand Up @@ -1805,6 +1806,9 @@ class Upload(db.Model):
db.Integer, db.ForeignKey("user.id", ondelete="CASCADE"), nullable=True
)
created = db.Column(db.DateTime, default=datetime.utcnow)
# last ping time to determine if upload is still active
last_ping = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
transaction_id = db.Column(db.String, unique=True, nullable=False, index=True)
Comment thread
varmar05 marked this conversation as resolved.
Outdated

user = db.relationship("User")
project = db.relationship(
Expand All @@ -1822,28 +1826,166 @@ def __init__(self, project: Project, version: int, changes: dict, user_id: int):
self.version = version
self.changes = ChangesSchema().dump(changes)
self.user_id = user_id
self.transaction_id = str(uuid.uuid4())

@property
def upload_dir(self):
return os.path.join(self.project.storage.project_dir, "tmp", self.id)
@classmethod
def create_upload(
cls, project_id: str, version: int, changes: dict, user_id: int
) -> Upload | None:
"""Create upload session, it can either create a new record or handover an existing one but with new transaction id
Old transaction folder is removed and new one is created.
"""
now = datetime.now(timezone.utc)
expiration = current_app.config["LOCKFILE_EXPIRATION"]
new_tx_id = str(uuid.uuid4())

# CTE captures the existing row's transaction_id BEFORE the upsert (pre-statement snapshot)
# NULL in RETURNING means fresh INSERT, non-NULL means we took over a stale upload
existing_cte = (
db.select(Upload.transaction_id)
.where(
Upload.project_id == project_id,
Upload.version == version,
)
.cte("existing")
)

stmt = (
insert(Upload)
.values(
id=str(uuid.uuid4()),
transaction_id=new_tx_id,
project_id=project_id,
version=version,
user_id=user_id,
last_ping=now,
changes=ChangesSchema().dump(changes),
)
.add_cte(existing_cte)
)

upsert_stmt = stmt.on_conflict_do_update(
constraint="uq_upload_project_id",
set_={
"transaction_id": new_tx_id,
"user_id": user_id,
"last_ping": now,
"changes": ChangesSchema().dump(changes),
},
# ONLY update if the existing row is stale
where=(Upload.last_ping < (now - timedelta(seconds=expiration))),
)
Comment thread
varmar05 marked this conversation as resolved.
Outdated

upsert_stmt = upsert_stmt.returning(
Upload,
db.select(existing_cte.c.transaction_id)
.scalar_subquery()
.label("old_transaction_id"),
)

result = db.session.execute(upsert_stmt).fetchone()
db.session.commit()

# if nothing returned, it means the WHERE clause failed (active upload)
if not result:
return

upload = result.Upload
old_transaction_id = result.old_transaction_id
os.makedirs(upload.upload_dir)

# old_transaction_id is NULL on fresh INSERT, set to old UUID when taking over a stale upload
if old_transaction_id:
upload.project.sync_failed(
"", "push_lost", "Push artefact removed by subsequent push", user_id
)
if os.path.exists(
os.path.join(
upload.project.storage.project_dir, "tmp", old_transaction_id
)
):
move_to_tmp(
os.path.join(
upload.project.storage.project_dir, "tmp", old_transaction_id
),
old_transaction_id,
)

Comment thread
varmar05 marked this conversation as resolved.
return upload

@property
def lockfile(self):
return os.path.join(self.upload_dir, "lockfile")
def upload_dir(self):
return os.path.join(
self.project.storage.project_dir, "tmp", self.transaction_id
)

def is_active(self):
Comment thread
varmar05 marked this conversation as resolved.
Outdated
"""Check if upload is still active because there was a ping (lockfile update) from underlying process"""
return os.path.exists(self.lockfile) and (
time.time() - os.path.getmtime(self.lockfile)
< current_app.config["LOCKFILE_EXPIRATION"]
"""Check if upload is still active because there was a ping from underlying process"""
return datetime.now(tz=timezone.utc) < self.last_ping.replace(
tzinfo=timezone.utc
) + timedelta(seconds=current_app.config["LOCKFILE_EXPIRATION"])
Comment thread
varmar05 marked this conversation as resolved.
Outdated

def _heartbeat_task(self, app: Flask, stop_event: threading.Event, timeout: int):
"""
Background task: Runs as a Thread (Sync) or Greenlet (Gevent) based on worker type.
Comment thread
varmar05 marked this conversation as resolved.
Outdated
Uses a fresh engine connection to stay pool-efficient.
"""
# manual context push is required for background execution
with app.app_context():
while not stop_event.is_set():
try:
# db.engine.begin() is efficient and isolated, it immediately returns a connection to the pool
with db.engine.begin() as conn:
conn.execute(
db.text(
"UPDATE upload SET last_ping = NOW() WHERE id = :id"
),
{"id": self.id},
)
except Exception as e:
logging.exception(
f"Upload heartbeat failed for ID {self.project_id} and version {self.version}: {e}"
)

# wait for x seconds, but wake up immediately if stop_event is set
stop_event.wait(timeout)

@contextmanager
def heartbeat(self, timeout: int = 5):
"""
Context manager to be used inside a Flask route.

Example of usage:
-----------------
with upload.heartbeat(interval):
do_something_slow
"""
# we need to pass a real Flask app object to the thread
app = current_app._get_current_object()
stop_event = threading.Event()

bg = threading.Thread(
target=self._heartbeat_task, args=(app, stop_event, timeout), daemon=True
)

bg.start()
try:
yield
finally:
# signal the loop to stop
stop_event.set()

# wait for the task to finish its last SQL call.
# in Gevent, this yields to other requests (non-blocking), while in Sync, this blocks the current thread for up to 2s
# this is to protect main thread / greenlet from zombie bg processes
bg.join(timeout=2)

def clear(self):
"""Clean up pending upload.
Uploaded files and table records are removed, and another upload can start.
"""
try:
move_to_tmp(self.upload_dir, self.id)
move_to_tmp(self.upload_dir, self.transaction_id)
db.session.delete(self)
db.session.commit()
except Exception:
Expand All @@ -1864,7 +2006,7 @@ def process_chunks(
to_remove = [i.path for i in file_changes if i.change == PushChangeType.DELETE]
current_files = [f for f in self.project.files if f.path not in to_remove]

with Toucher(self.lockfile, 5):
with self.heartbeat(5):
for f in file_changes:
if f.change == PushChangeType.DELETE:
continue
Expand Down
7 changes: 3 additions & 4 deletions server/mergin/sync/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,17 +271,16 @@ def check_project_permissions(
return None


def get_upload(transaction_id):
upload = Upload.query.get_or_404(transaction_id)
def get_upload_or_fail(transaction_id: str) -> Upload:
upload = Upload.query.filter_by(transaction_id=transaction_id).first_or_404()
# upload to 'removed' projects is forbidden
if upload.project.removed_at:
abort(404)

if upload.user_id != current_user.id:
abort(403, "You do not have permissions for ongoing upload")

upload_dir = os.path.join(upload.project.storage.project_dir, "tmp", transaction_id)
return upload, upload_dir
return upload


def projects_query(permission, as_admin=True, public=True):
Expand Down
2 changes: 1 addition & 1 deletion server/mergin/sync/public_api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ paths:
- do integrity check comparing uploaded file sizes with what was expected
- move uploaded files to new version dir and applying sync changes (e.g. geodiff apply_changeset)
- bump up version in database
- remove artifacts (chunks, lockfile) by moving them to tmp directory"
- remove artifacts (chunks) by moving them to tmp directory"
operationId: push_finish
parameters:
- name: transaction_id
Expand Down
Loading
Loading