-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdemo_api_main.py
More file actions
179 lines (137 loc) · 5.45 KB
/
demo_api_main.py
File metadata and controls
179 lines (137 loc) · 5.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
"""
Demo API Mock for the FAIRagro SQL-to-ARC converter.
This module provides a lightweight FastAPI server that simulates the Middleware API.
It receives ARC RO-Crate payloads, deserializes them using the arctrl library,
and writes the resulting ARC directory structure to the local file system.
"""
import json
import os
import re
import traceback
from datetime import UTC, datetime
from pathlib import Path
from arctrl import ARC
from arctrl.py.fable_modules.fable_library.async_ import start_as_task # type: ignore[import-untyped]
from fastapi import FastAPI, Request
# ASGI application object; the route handlers in this module are registered on it.
app = FastAPI()
# Root directory under which the reconstructed ARC directories and the raw
# payload dumps (*.payload.json) are written. Errors are printed to stdout by
# _handle_error; they are not stored below this directory.
OUTPUT_ROOT = Path("/data/arcs")
def _get_target_owner() -> tuple[int, int] | None:
uid_value = os.environ.get("LOCAL_UID")
gid_value = os.environ.get("LOCAL_GID")
if uid_value is None or gid_value is None:
return None
try:
return int(uid_value), int(gid_value)
except ValueError:
print(f"Invalid LOCAL_UID/LOCAL_GID: {uid_value}/{gid_value}")
return None
def _chown_tree(path: Path) -> None:
    """Recursively assign *path* to the owner configured in the environment.

    Does nothing when ``_get_target_owner()`` returns ``None`` or when *path*
    does not exist. A plain file is chowned on its own; a directory is chowned
    together with everything ``os.walk`` finds beneath it.

    Args:
        path: File or directory whose ownership should be changed.

    Raises:
        OSError: Errors from ``os.chown`` are not caught here and propagate to
            the caller.
    """
    owner = _get_target_owner()
    if owner is None or not path.exists():
        return
    uid, gid = owner

    os.chown(path, uid, gid)
    if not path.is_dir():
        return

    # os.walk reports every sub-directory in its parent's ``dirs`` list, so
    # chowning the children of each visited directory reaches every entry
    # exactly once. Chowning ``root`` as well (as the previous version did)
    # only repeated the call for each directory.
    for root, dirs, files in os.walk(path):
        root_path = Path(root)
        for name in (*dirs, *files):
            os.chown(root_path / name, uid, gid)
def _handle_error(arc_dir: Path, rdi: str, arc_id: str, exc: Exception) -> None:
    """Print a failure report for an ARC that could not be written.

    The report names the RDI, the ARC id, the target directory and the
    exception, followed by the output of ``traceback.format_exc()``.

    Args:
        arc_dir: Directory the ARC was supposed to be written to.
        rdi: RDI value associated with the request.
        arc_id: Identifier of the ARC being processed.
        exc: The exception that was caught.
    """
    stack = traceback.format_exc()
    report = f"Error writing ARC for {rdi}/{arc_id} (dir={arc_dir}): {exc}\n{stack}"
    print(report)
# Pre-compiled allow-list for ARC directory names: 1 to 64 characters drawn
# from ASCII letters, digits, "_", "." and "-". Path separators are not in the
# set, so a matching name cannot add further path components. Note that the
# bare names "." and ".." do match and have to be rejected separately.
_SAFE_NAME_PATTERN = re.compile(r"^[A-Za-z0-9_.-]{1,64}$")
def _generate_random_arc_id() -> str:
    """Return a random ARC identifier: ``arc_`` followed by 8 hex digits."""
    random_suffix = os.urandom(4).hex()
    return f"arc_{random_suffix}"
def _derive_safe_arc_id(base_dir: Path, raw_id: object) -> tuple[str, Path]:
    """Derive a safe ARC identifier and the directory path that belongs to it.

    The candidate name is the final path component of the stripped *raw_id*.
    It is accepted only if it matches ``_SAFE_NAME_PATTERN``, is not ``.`` or
    ``..``, and its resolved location lies strictly inside the resolved
    *base_dir*. In every other case an identifier from
    ``_generate_random_arc_id()`` is used instead.

    Args:
        base_dir: Directory that must contain the returned path.
        raw_id: Untrusted identifier; anything that is not a non-blank string
            triggers the random fallback.

    Returns:
        ``(arc_id, path)`` where *path* is located below the resolved
        *base_dir*.
    """
    # Resolve symlinks on the base directory once so all comparisons are stable.
    base_real = Path(os.path.realpath(base_dir))
    # Containment prefix with exactly one trailing separator. Joining with ""
    # does not double the separator when base_real is a filesystem root, where
    # ``str(base_real) + os.sep`` would give "//" and reject every candidate.
    base_prefix = os.path.join(str(base_real), "")

    def _fallback() -> tuple[str, Path]:
        rid = _generate_random_arc_id()
        return rid, base_real / rid

    if not isinstance(raw_id, str) or not raw_id.strip():
        return _fallback()

    # Keep only the last path component so directory parts of raw_id are dropped.
    safe_name = os.path.normpath(Path(raw_id.strip()).name)
    if not safe_name or safe_name in {".", ".."} or not _SAFE_NAME_PATTERN.match(safe_name):
        return _fallback()

    # Normalize with realpath and verify containment *before* returning the path:
    # construct → realpath → startswith-check. The explicit inequality keeps the
    # check strict (the base directory itself is never a valid ARC directory).
    candidate_real = Path(os.path.realpath(base_real / safe_name))
    if candidate_real == base_real or not str(candidate_real).startswith(base_prefix):
        return _fallback()
    return safe_name, candidate_real
def _rejected_response(rdi: str | None, now: str) -> dict[str, str | dict[str, str]]:
return {
"arc_id": "invalid",
"status": "error",
"metadata": {
"rdi": rdi or "unknown",
"arc_hash": "demo_hash",
"status": "REJECTED",
"first_seen": now,
"last_seen": now,
},
}
@app.post("/v3/arcs")
async def upload_arc(request: Request) -> dict[str, str | dict[str, str]]:
    """Handle the submission of an ARC RO-Crate.

    Reads the JSON request body, takes the ARC payload from its ``arc`` key
    (or the whole body when that key is absent), stores the raw payload as
    ``<name>.payload.json`` below ``OUTPUT_ROOT`` and uses the arctrl library
    to write the reconstructed ARC directory next to it.

    The RDI is taken from the ``rdi`` query parameter, falling back to the
    ``rdi`` key of the body and finally to ``"unknown"``.

    Args:
        request: Incoming request carrying the RO-Crate JSON-LD payload.

    Returns:
        dict: A "created"/"ACTIVE" description of the stored ARC, or the
        ``_rejected_response`` body when the request body is not valid JSON or
        the payload is not a JSON object.

    NOTE(review): a failure while reconstructing the ARC is only printed via
    ``_handle_error``; the response still reports ``"created"``. Confirm this
    is the intended behaviour for the demo mock.
    """
    now = datetime.now(UTC).isoformat()
    rdi = request.query_params.get("rdi")

    try:
        data = await request.json()
    except ValueError:
        # json.JSONDecodeError (and UnicodeDecodeError) derive from ValueError.
        return _rejected_response(rdi, now)
    if not isinstance(data, dict):
        return _rejected_response(rdi, now)

    if rdi is None:
        body_rdi = data.get("rdi", "unknown")
        # Keep the response consistent with its declared str-valued shape.
        rdi = body_rdi if isinstance(body_rdi, str) else "unknown"

    arc_payload = data.get("arc", data)
    if not isinstance(arc_payload, dict):
        return _rejected_response(rdi, now)

    output_path = OUTPUT_ROOT
    output_path.mkdir(parents=True, exist_ok=True)
    _chown_tree(output_path)

    arc_id, arc_dir = _derive_safe_arc_id(output_path, arc_payload.get("identifier"))

    # Append to the full directory name. with_suffix() would replace the last
    # dotted part of an id, so "a.v1" and "a.v2" would share one payload file.
    payload_path = arc_dir.with_name(f"{arc_dir.name}.payload.json")
    with open(payload_path, "w", encoding="utf-8") as handle:
        json.dump(arc_payload, handle, indent=2)
    _chown_tree(payload_path)

    try:
        arc = ARC.from_rocrate_json_string(json.dumps(arc_payload))
        await start_as_task(arc.WriteAsync(str(arc_dir)))
        _chown_tree(arc_dir)
        print(f"Saved ARC structure for {rdi} as {arc_id} using arctrl")
    except Exception as exc:  # noqa: BLE001
        # Best effort: the raw payload is already on disk, so only report.
        _handle_error(arc_dir, rdi, arc_id, exc)

    return {
        "arc_id": arc_id,
        "status": "created",
        "metadata": {
            "rdi": rdi,
            "arc_hash": "demo_hash",
            "status": "ACTIVE",
            "first_seen": now,
            "last_seen": now,
        },
    }
@app.get("/live")
def live() -> dict[str, str]:
    """
    Report that the demo API process is up (liveness probe).

    Returns:
        dict: Always ``{"status": "ok"}``.
    """
    liveness = {"status": "ok"}
    return liveness