
Commit ecbc970

Merge pull request #237 from fairagro/feature/review_harvesting_api
Feature/review harvesting api
2 parents 69f96cd + 6f35911 commit ecbc970

26 files changed

Lines changed: 883 additions & 368 deletions

.agents/skills/create-specifica-feature/SKILL.md

Lines changed: 13 additions & 8 deletions
@@ -76,14 +76,19 @@ condition and the expected output or side-effect.
 - One behaviour per checkbox — if you need "and" it is two requirements.
 - State the outcome, not the implementation (`→ return 404` not `→ use Flask abort()`).
 - Every edge case ends with a concrete outcome — no open-ended statements.
-- **Avoid naming concrete classes, methods, or modules in `spec.md`** unless it
-  is a deliberate act. Naming a class in a requirement couples that requirement to
-  a design decision — which is fine when the decision is intentional and should be
-  locked in by the spec, but must be recognised as such. Class names, method names,
-  and module paths that are not yet decided belong in `design.md`, not `spec.md`.
-  *Exception:* simple classes whose name fully describes their behaviour — in
-  particular exception classes (e.g. `InvalidJsonSemanticError`) — may be
-  referenced freely in `spec.md` without implying a design constraint.
+- **`spec.md` must not reference function names, method names, or variable names
+  unless that element was previously explicitly specified earlier in the same spec.**
+  Naming an implementation detail in a requirement couples it to a design decision —
+  such decisions belong in `design.md`. Class names and module paths that are not yet
+  decided also belong in `design.md`, not `spec.md`.
+  *Exemptions:*
+  - Exception classes whose name fully describes their behaviour (e.g.
+    `InvalidJsonSemanticError`, `DocumentConflictError`) may be referenced freely.
+  - String or numeric literals that belong to an external protocol (e.g. CouchDB's
+    `_rev`, `_id`) are not implementation names and may be used freely.
+  - Domain-level identifiers that also appear as code names (e.g. `arc_id`,
+    `harvest_id`) may be used when they function as domain terms rather than as
+    references to a specific code symbol.
 
 ---

middleware/api/spec/document-store/spec.md

Lines changed: 12 additions & 5 deletions
@@ -22,8 +22,9 @@ prefix, not by separate databases.
   exist if they do not yet exist.
 - [ ] Handle `412 Precondition Failed` (database already exists) as a success
   during initialization — parallel service startups must not cause crashes.
-- [ ] Store ARC documents keyed by `arc_id`; return `is_new` and `has_changes`
-  flags based on content hash comparison.
+- [ ] Store ARC documents keyed by `arc_id`; return flags indicating whether the
+  document was newly created and whether its content changed, based on content
+  hash comparison.
 - [ ] Support harvest run lifecycle operations: create, retrieve, increment
   statistics counters, and finalize a harvest run.
 - [ ] Append event records to an ARC document's event log.
@@ -34,7 +35,13 @@ prefix, not by separate databases.
 Parallel service startup (two containers connecting simultaneously) → both attempt
 database creation; `412 Precondition Failed` is treated as success, not an error.
 
-ARC document already exists with identical content → `has_changes = False`; no
-CouchDB write performed for the body; only timestamp fields may be updated.
+ARC document already exists with identical content → the content-changed flag is
+`false`; no CouchDB write performed for the body; only timestamp fields may be updated.
 
-`get_harvest` for unknown ID → returns `None`; callers raise `ResourceNotFoundError`.
+Concurrent writes to the same ARC document (e.g. two harvest workers submitting the
+same ARC simultaneously) → the store strips the stale `_rev` from the payload,
+re-fetches the current revision on each attempt, and retries up to a configurable
+maximum (default 3) on `ConflictError` before raising `DocumentConflictError`.
+
+Fetching a harvest by an unknown ID → the store returns nothing; callers raise
+`ResourceNotFoundError`.
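The concurrent-write edge case above can be sketched roughly as follows. This is a minimal synchronous illustration, not the store's actual code: `InMemoryDb`, `save_with_retry`, and the integer revisions are hypothetical stand-ins (CouchDB revisions are opaque strings, and the real store is presumably async), and "maximum (default 3)" is read here as a cap on total attempts, which the spec does not state explicitly.

```python
class ConflictError(Exception):
    """Stand-in for the low-level revision conflict raised on a stale write."""


class DocumentConflictError(Exception):
    """Raised once every attempt has ended in a conflict."""


class InMemoryDb:
    """Hypothetical stand-in for CouchDB: rejects writes whose _rev is stale."""

    def __init__(self):
        self.docs = {}

    def get_rev(self, doc_id):
        doc = self.docs.get(doc_id)
        return doc["_rev"] if doc else None

    def put(self, doc_id, payload):
        current = self.get_rev(doc_id)
        if payload.get("_rev") != current:
            raise ConflictError(doc_id)
        new_rev = (current or 0) + 1
        self.docs[doc_id] = {**payload, "_rev": new_rev}
        return new_rev


def save_with_retry(db, doc_id, payload, max_attempts=3):
    # Strip whatever _rev the caller sent; it may already be stale.
    body = {key: value for key, value in payload.items() if key != "_rev"}
    for _ in range(max_attempts):
        # Re-fetch the current revision on every attempt.
        rev = db.get_rev(doc_id)
        attempt = dict(body)
        if rev is not None:
            attempt["_rev"] = rev
        try:
            return db.put(doc_id, attempt)
        except ConflictError:
            continue  # another writer won the race; try again with a fresh _rev
    raise DocumentConflictError(f"gave up on {doc_id} after {max_attempts} attempts")
```

Because the revision is re-read inside the loop, a caller that submits an outdated `_rev` still succeeds on the first attempt as long as nobody else writes in between.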

middleware/api/spec/harvest-manager/design.md

Lines changed: 24 additions & 0 deletions
@@ -11,6 +11,7 @@ API endpoint
   └─→ HarvestManager
         ├─→ DocumentStore.create_harvest
         ├─→ DocumentStore.get_harvest
+        ├─→ DocumentStore.update_harvest (transition_harvest)
         ├─→ DocumentStore.increment_harvest_statistics
         └─→ DocumentStore.finalize_harvest
 ```
@@ -32,3 +33,26 @@ API endpoint
    `HarvestConfig` (a Pydantic model). Application code reads them from the
    config object rather than hardcoding values, making them overridable via
    environment variables or YAML without a code change.
+
+4. **`transition_harvest` accepts a pre-fetched `HarvestDocument`, not a `harvest_id`**
+   — The router always fetches the harvest before calling into the service layer
+   (for RDI auth and existence checks). Passing the already-fetched document
+   avoids a redundant round-trip to CouchDB and removes the dead
+   `pre_fetched or await get_harvest()` fallback path. Responsibility for
+   handling a missing harvest therefore stays in the router (HTTP 404), while
+   `transition_harvest` focuses solely on ownership validation, the RUNNING
+   guard, and the DB write.
+
+5. **Single `transition_harvest` replaces separate `complete_harvest` / `cancel_harvest` / `fail_harvest` methods**
+   — All three terminal transitions share the same shape: ownership check →
+   RUNNING guard → `DocumentStore.update_harvest`. A single generic method
+   parameterised over `target_status` avoids duplicating that logic three
+   times. The only special case is `COMPLETED`: it also persists the current
+   statistics snapshot (same behaviour as the old `complete_harvest`).
+
+6. **`PATCH /v3/harvests/{harvest_id}` as the canonical state-transition endpoint**
+   — A single PATCH endpoint with `{"status": "..."}` in the body covers all
+   terminal transitions. The legacy `DELETE /{harvest_id}` (cancel) and
+   `POST /{harvest_id}/complete` endpoints are kept for backward compatibility
+   but are not the preferred path for new clients. The API client's
+   `cancel_harvest()` and `fail_harvest()` methods both call PATCH internally.
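Design decisions 4 and 5 describe one shape: ownership check, then RUNNING guard, then write. A minimal sketch of that shape is given below, under several assumptions: the dataclass fields, the `HarvestStatus` enum, the `current_statistics` parameter, and the use of `AccessDeniedError` for the ownership failure are all illustrative guesses, and the real method is presumably async and persists through `DocumentStore.update_harvest` rather than returning a copy.

```python
from dataclasses import dataclass, replace
from enum import Enum
from typing import Optional


class HarvestStatus(str, Enum):
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    CANCELLED = "CANCELLED"
    FAILED = "FAILED"


class AccessDeniedError(Exception):
    """Assumed error type for a caller that does not own the harvest."""


class ConflictError(Exception):
    """The request conflicts with the current state of the harvest."""


@dataclass(frozen=True)
class HarvestDocument:
    # Hypothetical, simplified shape of the stored harvest document.
    harvest_id: str
    client_id: str
    status: HarvestStatus
    statistics: Optional[dict] = None


def transition_harvest(harvest, target_status, *, client_id, current_statistics=None):
    # 1. Ownership check: only the creating client may end the run.
    if harvest.client_id != client_id:
        raise AccessDeniedError(f"harvest {harvest.harvest_id} belongs to another client")
    # 2. RUNNING guard: terminal states are final.
    if harvest.status is not HarvestStatus.RUNNING:
        raise ConflictError(
            f"harvest {harvest.harvest_id} is {harvest.status.value}, not RUNNING"
        )
    # 3. "DB write", modelled here as returning an updated copy.
    updates = {"status": target_status}
    if target_status is HarvestStatus.COMPLETED:
        # Only COMPLETED also persists the statistics snapshot.
        updates["statistics"] = current_statistics
    return replace(harvest, **updates)
```

Taking the document instead of an ID keeps the "not found" case out of this function entirely, which matches the split described in decision 4: the router answers 404, the service layer answers 409.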

middleware/api/spec/harvest-manager/spec.md

Lines changed: 6 additions & 0 deletions
@@ -15,6 +15,10 @@ finalization — and enforce client ownership of harvest resources.
 - [ ] Increment per-harvest statistics (new-ARC counter and changed-ARC counter)
   via `increment_harvest_statistics` for each processed ARC.
 - [ ] Finalize a harvest run, marking it complete and recording the final statistics.
+- [ ] Transition a harvest run to a target terminal status (`COMPLETED`, `CANCELLED`,
+  or `FAILED`) via an explicit state-transition operation.
+- [ ] Enforce the transition guard: only a `RUNNING` harvest may transition to a
+  terminal status; raise `ConflictError` otherwise.
 
 ## Edge Cases
 
@@ -25,3 +29,5 @@ finalization — and enforce client ownership of harvest resources.
 `expected_datasets` not provided → harvest document is created without a progress denominator; progress reporting shows raw counts only.
 
 Finalize called before all ARCs arrive → harvest is marked complete with whatever statistics are current; no enforcement of `expected_datasets`.
+
+Transition called on a non-`RUNNING` harvest → raise `ConflictError` with the current status in the message.

middleware/api/src/middleware/api/api/fastapi_app.py

Lines changed: 25 additions & 5 deletions
@@ -58,7 +58,6 @@
 __version__ = "0.0.0"
 
 
-loaded_config = None
 if "pytest" in sys.modules:
     # pytest is executing this file during a test discovery run.
     # No config file is available, so we create a dummy config so that pytest does not fail.
@@ -88,9 +87,27 @@
         sys.exit(1)
 
 logging.basicConfig(
-    level=getattr(logging, loaded_config.log_level), format="%(asctime)s %(levelname)s %(name)s: %(message)s"
+    level=getattr(logging, loaded_config.log_level),
+    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
+    force=True,
 )
 
+
+def _configure_uvicorn_loggers(log_level: str) -> None:
+    """Ensure uvicorn loggers use the same format as the middleware logs."""
+    root_handlers = logging.root.handlers[:]
+    if not root_handlers:
+        return
+
+    for logger_name in ("uvicorn.access", "uvicorn.error"):
+        uvicorn_logger = logging.getLogger(logger_name)
+        uvicorn_logger.handlers = root_handlers[:]
+        uvicorn_logger.setLevel(getattr(logging, log_level))
+        uvicorn_logger.propagate = False
+
+
+_configure_uvicorn_loggers(loaded_config.log_level)
+
 logger = logging.getLogger("middleware_api")
 
 
@@ -151,11 +168,14 @@ def __init__(self, app_config: Config) -> None:
 
         logger.debug("API configuration: %s", self._config.model_dump())
 
-        # Apply polling log filter to uvicorn access logger
-        logging.getLogger("uvicorn.access").addFilter(PollingLogFilter())
-
         @asynccontextmanager
         async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
+            # Re-apply our log format after uvicorn has finished its own log setup.
+            # uvicorn.main() installs its own handlers on uvicorn.access/uvicorn.error
+            # *after* this module is imported, so we must reconfigure here.
+            _configure_uvicorn_loggers(self._config.log_level)
+            logging.getLogger("uvicorn.access").addFilter(PollingLogFilter())
+
             # Initialize business logic and its stores
             try:
                 try:

middleware/api/src/middleware/api/api/v3/harvests.py

Lines changed: 47 additions & 8 deletions
@@ -13,7 +13,8 @@
     get_common_deps,
     get_content_type,
 )
-from middleware.api.business_logic import BusinessLogic
+from middleware.api.business_logic import BusinessLogic, ConflictError
+from middleware.api.business_logic.exceptions import DuplicateArcInHarvestError
 from middleware.api.document_store.harvest_document import HarvestDocument
 from middleware.shared.api_models.v3 import models as v3_models
 
@@ -88,42 +89,77 @@ async def get_harvest(
     return _map_harvest(harvest)
 
 
-@router.post("/{harvest_id}/complete", response_model=v3_models.HarvestResponse)
+@router.post("/{harvest_id}/complete", response_model=v3_models.HarvestResponse, deprecated=True)
 async def complete_harvest(  # noqa: PLR0913, PLR0917
     request: Request,
     harvest_id: str,
     bl: Annotated[BusinessLogic, Depends(get_business_logic)],
     deps: Annotated[CommonApiDependencies, Depends(get_common_deps)],
     client_id: Annotated[str | None, Depends(get_client_id)],
 ) -> v3_models.HarvestResponse:
-    """Mark a harvest as completed."""
+    """Mark a harvest as completed. [DEPRECATED].
+
+    Use ``PATCH /v3/harvests/{harvest_id}`` with ``status=COMPLETED`` instead.
+    """
     # Fetch once — used for both RDI auth and passed to complete_harvest (C2).
     harvest = await bl.harvest_manager.get_harvest(harvest_id)
     if not harvest:
         raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail="Harvest not found")
 
     await deps.validate_rdi_authorized(harvest.rdi, request)
 
-    harvest = await bl.harvest_manager.complete_harvest(harvest_id, client_id=client_id, pre_fetched=harvest)
+    harvest = await bl.harvest_manager.complete_harvest(harvest, client_id=client_id)
     return _map_harvest(harvest)
 
 
-@router.delete("/{harvest_id}", status_code=HTTPStatus.NO_CONTENT)
+@router.patch("/{harvest_id}", response_model=v3_models.HarvestResponse)
+async def patch_harvest_status(  # noqa: PLR0913, PLR0917
+    request: Request,
+    harvest_id: str,
+    request_body: v3_models.PatchHarvestRequest,
+    bl: Annotated[BusinessLogic, Depends(get_business_logic)],
+    deps: Annotated[CommonApiDependencies, Depends(get_common_deps)],
+    client_id: Annotated[str | None, Depends(get_client_id)],
+    _content_type: Annotated[None, Depends(get_content_type)],
+) -> v3_models.HarvestResponse:
+    """Transition a harvest run to a terminal status (COMPLETED, CANCELLED, or FAILED).
+
+    Only a ``RUNNING`` harvest may be transitioned; any other current status returns
+    409 Conflict.
+    """
+    harvest = await bl.harvest_manager.get_harvest(harvest_id)
+    if not harvest:
+        raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail="Harvest not found")
+
+    await deps.validate_rdi_authorized(harvest.rdi, request)
+
+    try:
+        harvest = await bl.harvest_manager.transition_harvest(harvest, request_body.status, client_id=client_id)
+    except ConflictError as exc:
+        raise HTTPException(status_code=HTTPStatus.CONFLICT, detail=str(exc)) from exc
+
+    return _map_harvest(harvest)
+
+
+@router.delete("/{harvest_id}", status_code=HTTPStatus.NO_CONTENT, deprecated=True)
 async def cancel_harvest(
     request: Request,
     harvest_id: str,
     bl: Annotated[BusinessLogic, Depends(get_business_logic)],
     deps: Annotated[CommonApiDependencies, Depends(get_common_deps)],
     client_id: Annotated[str | None, Depends(get_client_id)],
 ) -> None:
-    """Cancel a harvest run."""
+    """Cancel a harvest run. [DEPRECATED].
+
+    Use ``PATCH /v3/harvests/{harvest_id}`` with ``status=CANCELLED`` instead.
+    """
     # Fetch once — used for both RDI auth and passed to cancel_harvest (C2).
     harvest = await bl.harvest_manager.get_harvest(harvest_id)
     if not harvest:
         raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail="Harvest not found")
 
     await deps.validate_rdi_authorized(harvest.rdi, request)
-    await bl.harvest_manager.cancel_harvest(harvest_id, client_id=client_id, pre_fetched=harvest)
+    await bl.harvest_manager.cancel_harvest(harvest, client_id=client_id)
 
 
 @router.post("/{harvest_id}/arcs", response_model=v3_models.ArcResponse)
@@ -146,7 +182,10 @@ async def submit_arc_in_harvest(  # noqa: PLR0913, PLR0917
     rdi = harvest.rdi
     await deps.validate_rdi_authorized(rdi, request)
 
-    result = await bl.create_or_update_arc(rdi, request_body.arc, client_id, harvest_id=harvest_id)
+    try:
+        result = await bl.create_or_update_arc(rdi, request_body.arc, client_id, harvest_id=harvest_id)
+    except DuplicateArcInHarvestError as exc:
+        raise HTTPException(status_code=HTTPStatus.CONFLICT, detail=str(exc)) from exc
 
     arc_id = result.arc.id
     metadata = await bl.get_metadata(arc_id)

middleware/api/src/middleware/api/business_logic/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -9,6 +9,7 @@
     AccessDeniedError,
     BusinessLogicError,
     ConflictError,
+    DuplicateArcInHarvestError,
     InvalidJsonSemanticError,
     ResourceNotFoundError,
     SetupError,
@@ -23,6 +24,7 @@
     "BusinessLogic",
     "BusinessLogicError",
     "ConflictError",
+    "DuplicateArcInHarvestError",
     "InvalidJsonSemanticError",
     "ResourceNotFoundError",
     "SetupError",

middleware/api/src/middleware/api/business_logic/arc_manager.py

Lines changed: 4 additions & 9 deletions
@@ -9,12 +9,12 @@
 from opentelemetry import trace
 
 from middleware.api.arc_store import ArcStore, ArcStoreTransientError
-from middleware.api.document_store import DocumentStore
+from middleware.api.document_store import DocumentStore, DuplicateArcError
 from middleware.api.document_store.arc_document import ArcEvent, ArcEventType
 from middleware.api.utils import calculate_arc_id, extract_identifier
 from middleware.shared.api_models.common.models import ArcOperationResult, ArcResponse, ArcStatus
 
-from .exceptions import BusinessLogicError, InvalidJsonSemanticError, TransientError
+from .exceptions import BusinessLogicError, DuplicateArcInHarvestError, InvalidJsonSemanticError, TransientError
 from .ports import TaskDispatcher
 from .task_payloads import ArcSyncTask
 
@@ -101,13 +101,6 @@ async def create_or_update_arc(
             has_changes = doc_result.has_changes
             should_trigger_git = is_new or has_changes
 
-            if harvest_id:
-                await self._doc_store.increment_harvest_statistics(
-                    harvest_id,
-                    is_new=is_new,
-                    has_changes=has_changes,
-                )
-
             logger.info(
                 "[%s] Stored ARC %s in CouchDB: is_new=%s, has_changes=%s, trigger_git=%s",
                 client_id,
@@ -147,6 +140,8 @@ async def create_or_update_arc(
                 raise
             if isinstance(e, BusinessLogicError):
                 raise
+            if isinstance(e, DuplicateArcError):
+                raise DuplicateArcInHarvestError(str(e)) from e
             raise BusinessLogicError(f"unexpected error encountered: {str(e)}") from e
 
     async def sync_to_gitlab(self, rdi: str, arc: dict[str, Any]) -> None:

middleware/api/src/middleware/api/business_logic/exceptions.py

Lines changed: 4 additions & 0 deletions
@@ -17,6 +17,10 @@ class ConflictError(BusinessLogicError):
     """Arises when the request conflicts with the current resource state."""
 
 
+class DuplicateArcInHarvestError(ConflictError):
+    """Arises when the same ARC is submitted more than once within a harvest run."""
+
+
 class InvalidJsonSemanticError(BusinessLogicError):
     """Arises when the ARC JSON syntax is valid but semantically incorrect.
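Taken together, the `arc_manager.py` and `exceptions.py` changes form a small exception-translation chain: a store-level duplicate error becomes a business-logic error that is also a `ConflictError`, which the router turns into HTTP 409. The sketch below illustrates that chain in isolation. The class hierarchy follows the diff, but `DuplicateArcError`'s base class, the `seen` set, and the `store_arc`, `submit_arc`, and `to_status` helpers are assumptions made for the example.

```python
from http import HTTPStatus


class BusinessLogicError(Exception):
    pass


class ConflictError(BusinessLogicError):
    pass


class DuplicateArcInHarvestError(ConflictError):
    pass


class DuplicateArcError(Exception):
    """Store-level error; its real base class is not shown in the diff."""


def store_arc(seen, arc_id):
    # Hypothetical stand-in for the document store's duplicate detection.
    if arc_id in seen:
        raise DuplicateArcError(f"ARC {arc_id} already submitted in this harvest")
    seen.add(arc_id)


def submit_arc(seen, arc_id):
    # Business-logic layer: translate the store error and keep the cause chained.
    try:
        store_arc(seen, arc_id)
    except DuplicateArcError as exc:
        raise DuplicateArcInHarvestError(str(exc)) from exc


def to_status(exc):
    # Router layer: any ConflictError, including the subclass, maps to 409.
    if isinstance(exc, ConflictError):
        return HTTPStatus.CONFLICT
    return HTTPStatus.INTERNAL_SERVER_ERROR
```

Deriving the new exception from `ConflictError` means handlers that already catch `ConflictError` also cover duplicate submissions, while a handler that names `DuplicateArcInHarvestError` can still treat that case separately.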
