Skip to content

Commit 4a93a5d

Browse files
authored
Merge pull request #188 from fairagro/feature/additional_end_to_end_tests
feat: add harvest statistics tracking and update logic for ARC submis…
2 parents d41c148 + 186db80 commit 4a93a5d

14 files changed

Lines changed: 747 additions & 41 deletions

File tree

middleware/api/src/middleware/api/business_logic/arc_manager.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,13 @@ async def create_or_update_arc(
101101
has_changes = doc_result.has_changes
102102
should_trigger_git = is_new or has_changes
103103

104+
if harvest_id:
105+
await self._doc_store.increment_harvest_statistics(
106+
harvest_id,
107+
is_new=is_new,
108+
has_changes=has_changes,
109+
)
110+
104111
logger.info(
105112
"[%s] Stored ARC %s in CouchDB: is_new=%s, has_changes=%s, trigger_git=%s",
106113
client_id,

middleware/api/src/middleware/api/business_logic/harvest_manager.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from middleware.api.business_logic.config import HarvestConfig
77
from middleware.api.business_logic.exceptions import AccessDeniedError, ResourceNotFoundError
88
from middleware.api.document_store import DocumentStore
9-
from middleware.api.document_store.harvest_document import HarvestDocument
9+
from middleware.api.document_store.harvest_document import HarvestDocument, HarvestStatistics
1010
from middleware.shared.api_models.common.models import HarvestStatus
1111

1212
logger = logging.getLogger(__name__)
@@ -85,12 +85,8 @@ async def complete_harvest(
8585
)
8686
raise AccessDeniedError(f"Harvest {harvest_id} does not belong to client {client_id}")
8787

88-
# Calculate statistics server-side from stored ARCs
89-
statistics = await self._doc_store.get_harvest_statistics(harvest_id)
90-
91-
# Preserve expected_datasets if already set
92-
if harvest.statistics and harvest.statistics.expected_datasets is not None:
93-
statistics.expected_datasets = harvest.statistics.expected_datasets
88+
# Statistics are maintained incrementally during ARC submission.
89+
statistics = harvest.statistics or HarvestStatistics()
9490

9591
updates: dict[str, Any] = {
9692
"status": HarvestStatus.COMPLETED,

middleware/api/src/middleware/api/document_store/__init__.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,26 @@ async def update_harvest(self, harvest_id: str, updates: dict[str, Any]) -> Harv
157157
"""
158158
raise NotImplementedError
159159

160+
@abstractmethod
161+
async def increment_harvest_statistics(
162+
self,
163+
harvest_id: str,
164+
*,
165+
is_new: bool,
166+
has_changes: bool,
167+
) -> None:
168+
"""Atomically increment harvest counters for one submitted ARC.
169+
170+
Implementations must be safe under concurrent updates (e.g. optimistic
171+
concurrency with retry on revision conflicts).
172+
173+
Args:
174+
harvest_id: Harvest identifier.
175+
is_new: Whether the ARC was newly created.
176+
has_changes: Whether an existing ARC changed.
177+
"""
178+
raise NotImplementedError
179+
160180
@abstractmethod
161181
async def list_harvests(
162182
self,

middleware/api/src/middleware/api/document_store/config.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,15 @@ class CouchDBConfig(BaseModel):
2222
description="Default maximum number of documents returned by a Mango query",
2323
),
2424
] = 100
25+
harvest_stats_max_retries: Annotated[
26+
int,
27+
Field(
28+
default=5,
29+
ge=1,
30+
le=100,
31+
description="Maximum retry attempts for atomic harvest statistics updates on CouchDB revision conflicts",
32+
),
33+
] = 5
2534

2635
@field_validator("url")
2736
@classmethod

middleware/api/src/middleware/api/document_store/couchdb.py

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""CouchDB implementation of DocumentStore."""
22

3+
import asyncio
34
import hashlib
45
import json
56
import logging
@@ -8,7 +9,7 @@
89
from typing import Any
910

1011
from middleware.api.document_store.config import CouchDBConfig
11-
from middleware.api.document_store.couchdb_client import CouchDBClient
12+
from middleware.api.document_store.couchdb_client import CouchDBClient, DocumentConflictError
1213
from middleware.api.utils import calculate_arc_id, extract_identifier
1314
from middleware.shared.api_models.common.models import ArcEventType, ArcLifecycleStatus, HarvestStatus
1415

@@ -265,6 +266,60 @@ async def update_harvest(self, harvest_id: str, updates: dict[str, Any]) -> Harv
265266
await self._client.save_document(harvest_id, doc_data)
266267
return doc
267268

269+
async def increment_harvest_statistics(
270+
self,
271+
harvest_id: str,
272+
*,
273+
is_new: bool,
274+
has_changes: bool,
275+
) -> None:
276+
"""Atomically increment harvest counters with optimistic-concurrency retry."""
277+
max_retries = self._config.harvest_stats_max_retries
278+
279+
for attempt in range(1, max_retries + 1):
280+
doc_dict = await self._client.get_document(harvest_id)
281+
if not doc_dict:
282+
logger.warning("Harvest %s not found while incrementing statistics", harvest_id)
283+
return
284+
285+
harvest_doc = HarvestDocument.model_validate(doc_dict)
286+
if not harvest_doc.doc_rev:
287+
raise RuntimeError(f"Harvest {harvest_id} has no _rev; cannot apply atomic update")
288+
289+
stats = harvest_doc.statistics or HarvestStatistics()
290+
stats.arcs_submitted += 1
291+
292+
if is_new:
293+
stats.arcs_new += 1
294+
elif has_changes:
295+
stats.arcs_updated += 1
296+
else:
297+
stats.arcs_unchanged += 1
298+
299+
harvest_doc.statistics = stats
300+
payload = harvest_doc.model_dump(mode="json", by_alias=True, exclude_none=True)
301+
302+
try:
303+
await self._client.save_document_if_revision_matches(
304+
harvest_id,
305+
payload,
306+
expected_rev=harvest_doc.doc_rev,
307+
)
308+
return
309+
except DocumentConflictError:
310+
logger.debug(
311+
"Conflict incrementing harvest statistics for %s (attempt %d/%d)",
312+
harvest_id,
313+
attempt,
314+
max_retries,
315+
)
316+
await asyncio.sleep(0.1) # Add a small delay before retrying
317+
318+
raise RuntimeError(
319+
f"Failed to increment harvest statistics for {harvest_id} after "
320+
f"{max_retries} retries due to revision conflicts"
321+
)
322+
268323
async def list_harvests(
269324
self,
270325
rdi: str | None = None,
@@ -281,11 +336,8 @@ async def list_harvests(
281336

282337
async def get_harvest_statistics(self, harvest_id: str) -> HarvestStatistics:
283338
"""Calculate and return statistics for a specific harvest run."""
284-
# Fetch only the fields we need: event log and document type.
285-
# Excluding arc_content avoids loading potentially large RO-Crate JSON.
286-
projection_fields = ["_id", "type", "metadata.events", "metadata.last_harvest_id"]
287339
selector = {"type": "arc", "metadata.last_harvest_id": harvest_id}
288-
docs = await self._client.find(selector, fields=projection_fields)
340+
docs = await self._client.find_projected(selector, fields=["metadata.events"])
289341

290342
stats = HarvestStatistics()
291343
stats.arcs_submitted = len(docs)

middleware/api/src/middleware/api/document_store/couchdb_client.py

Lines changed: 117 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import logging
77
from http import HTTPStatus
88
from typing import Any, Self
9+
from urllib.parse import quote
910

1011
import aiohttp
1112
from aiocouch import CouchDB, Database
@@ -16,6 +17,10 @@
1617
logger = logging.getLogger(__name__)
1718

1819

20+
class DocumentConflictError(RuntimeError):
21+
"""Raised when a document update conflicts with a newer CouchDB revision."""
22+
23+
1924
class CouchDBClient:
2025
"""Async CouchDB client wrapper."""
2126

@@ -222,7 +227,6 @@ async def find(
222227
selector: dict[str, Any],
223228
limit: int | None = None,
224229
skip: int = 0,
225-
fields: list[str] | None = None,
226230
) -> list[dict[str, Any]]:
227231
"""Find documents using a Mango query selector.
228232
@@ -231,9 +235,6 @@ async def find(
231235
limit: Maximum number of results to return per call.
232236
Defaults to the instance's ``default_query_limit``.
233237
skip: Number of results to skip (for pagination)
234-
fields: Optional list of fields to include in results (projection).
235-
When set, ``arc_content`` and other large fields can be
236-
excluded to reduce memory and network usage.
237238
238239
Returns:
239240
List of matching documents
@@ -242,12 +243,7 @@ async def find(
242243
raise RuntimeError("Not connected to CouchDB")
243244

244245
effective_limit = limit if limit is not None else self._default_query_limit
245-
# aiocouch's find passes extra kwargs through to the _find body.
246-
kwargs: dict[str, Any] = {"limit": effective_limit, "skip": skip}
247-
if fields is not None:
248-
kwargs["fields"] = fields
249-
250-
result = self._db.find(selector, **kwargs)
246+
result = self._db.find(selector, limit=effective_limit, skip=skip)
251247
docs = [dict(doc) async for doc in result]
252248

253249
if len(docs) == effective_limit:
@@ -260,6 +256,117 @@ async def find(
260256

261257
return docs
262258

259+
async def find_projected(
260+
self,
261+
selector: dict[str, Any],
262+
fields: list[str],
263+
limit: int | None = None,
264+
skip: int = 0,
265+
) -> list[dict[str, Any]]:
266+
"""Find documents using CouchDB _find with explicit field projection.
267+
268+
This method uses the raw HTTP endpoint because aiocouch's ``Database.find``
269+
returns full ``Document`` objects and therefore does not support the
270+
``fields`` parameter.
271+
272+
Args:
273+
selector: Mango query selector.
274+
fields: List of fields to return (CouchDB ``fields`` projection).
275+
limit: Maximum number of results to return per call.
276+
Defaults to the instance's ``default_query_limit``.
277+
skip: Number of results to skip (for pagination).
278+
279+
Returns:
280+
List of projected documents.
281+
"""
282+
if not self._db:
283+
raise RuntimeError("Not connected to CouchDB")
284+
if not self._db_name:
285+
raise RuntimeError("Database name is not set")
286+
287+
effective_limit = limit if limit is not None else self._default_query_limit
288+
289+
payload: dict[str, Any] = {
290+
"selector": selector,
291+
"fields": fields,
292+
"limit": effective_limit,
293+
"skip": skip,
294+
}
295+
296+
url = f"{self._url}/{self._db_name}/_find"
297+
session = self._get_session()
298+
async with session.post(url, json=payload) as resp:
299+
if resp.status != HTTPStatus.OK:
300+
text = await resp.text()
301+
logger.error("CouchDB _find with projection failed: %s", text)
302+
raise RuntimeError(f"CouchDB _find failed with status {resp.status}: {text}")
303+
304+
response_data = await resp.json()
305+
306+
docs_raw = response_data.get("docs", [])
307+
docs: list[dict[str, Any]] = [dict(doc) for doc in docs_raw]
308+
309+
if len(docs) == effective_limit:
310+
logger.warning(
311+
"CouchDB find_projected() returned exactly %d documents for selector %s — "
312+
"results may be silently truncated. Use skip/limit for pagination.",
313+
effective_limit,
314+
selector,
315+
)
316+
317+
return docs
318+
319+
async def save_document_if_revision_matches(
320+
self,
321+
doc_id: str,
322+
data: dict[str, Any],
323+
*,
324+
expected_rev: str,
325+
) -> dict[str, Any]:
326+
"""Save a document only if the expected revision still matches.
327+
328+
Uses raw ``PUT /{db}/{docid}`` to allow optimistic-concurrency handling
329+
in higher layers (retry on 409 Conflict).
330+
331+
Args:
332+
doc_id: Document ID.
333+
data: Complete document payload to save.
334+
expected_rev: Revision expected by the caller.
335+
336+
Returns:
337+
Saved document payload including updated ``_rev``.
338+
339+
Raises:
340+
DocumentConflictError: If CouchDB returns 409 conflict.
341+
RuntimeError: For non-success HTTP errors.
342+
"""
343+
if not self._db:
344+
raise RuntimeError("Not connected to CouchDB")
345+
if not self._db_name:
346+
raise RuntimeError("Database name is not set")
347+
348+
payload = dict(data)
349+
payload["_id"] = doc_id
350+
payload["_rev"] = expected_rev
351+
352+
encoded_doc_id = quote(doc_id, safe="")
353+
url = f"{self._url}/{self._db_name}/{encoded_doc_id}"
354+
session = self._get_session()
355+
356+
async with session.put(url, json=payload) as resp:
357+
if resp.status in {HTTPStatus.CREATED, HTTPStatus.ACCEPTED, HTTPStatus.OK}:
358+
response_data = await resp.json()
359+
new_rev = response_data.get("rev")
360+
if isinstance(new_rev, str):
361+
payload["_rev"] = new_rev
362+
return payload
363+
364+
if resp.status == HTTPStatus.CONFLICT:
365+
raise DocumentConflictError(f"Conflict updating document {doc_id}")
366+
367+
text = await resp.text()
368+
raise RuntimeError(f"Failed to update document {doc_id}: {resp.status} {text}")
369+
263370
def _get_session(self) -> aiohttp.ClientSession:
264371
"""Return the shared aiohttp session, creating it on first call."""
265372
if self._session is None:

middleware/api/tests/system_external/conftest.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ def config(
9292
"log_level": "DEBUG",
9393
"known_rdis": list(known_rdis),
9494
"client_auth_oid": oid.dotted_string,
95+
"otel": {
96+
"endpoint": None,
97+
},
9598
"require_client_cert": False,
9699
"gitlab_api": {
97100
"url": "https://datahub-dev.ipk-gatersleben.de",
@@ -186,6 +189,9 @@ def worker_process(
186189
"""
187190
gitlab_token = os.getenv("GITLAB_API_TOKEN", "")
188191
worker_cfg: dict[str, Any] = {
192+
"otel": {
193+
"endpoint": None,
194+
},
189195
"couchdb": {
190196
"url": external_services["couchdb_url"],
191197
"user": external_services["couchdb_user"],

0 commit comments

Comments
 (0)