Skip to content

Commit 7ef9f36

Browse files
committed
refactor: Reimplement SQL-to-ARC conversion with dedicated modules for builder, processor, database, models, and stats tracking.
1 parent d49d4d6 commit 7ef9f36

21 files changed

Lines changed: 2368 additions & 1200 deletions

middleware/sql_to_arc/README.md

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ The `sql_to_arc` package converts data from a PostgreSQL database schema into FA
44

55
## Features
66

7-
- Async PostgreSQL access via `psycopg` (v3)
8-
- Mapping of Investigations, Studies, Assays to ARCtrl models
7+
- Async database access via `sqlalchemy` (asyncio drivers such as `asyncpg` or `aiosqlite`)
8+
- SQL View-based mapping of data to ARCtrl models
99
- Batch upload to the Middleware API using `ApiClient`
10-
- Pydantic-based configuration
10+
- Pydantic-based configuration with generic connection-string support
1111

1212
## Requirements
1313

@@ -39,10 +39,7 @@ Configuration is defined by `middleware.sql_to_arc.config.Config` and can be pro
3939

4040
```python
4141
config = Config.from_data({
42-
"db_name": "edaphobase",
43-
"db_user": "postgres",
44-
"db_password": "postgres",
45-
"db_host": "localhost",
42+
"connection_string": "postgresql+asyncpg://user:pass@localhost:5432/edaphobase",
4643
"rdi": "edaphobase",
4744
"api_client": {
4845
"api_url": "http://localhost:8000",

middleware/sql_to_arc/config.example.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ db_port: 5432
2424
# This is used to tag or namespace the converted ARCs
2525
rdi: "edaphobase"
2626

27+
2728
# API Client Configuration
2829
# -----------------------
2930
# Settings for connecting to the Middleware API to upload ARCs

middleware/sql_to_arc/pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ readme = "README.md"
66
requires-python = ">=3.12"
77
dependencies = [
88
"arctrl>=3.0.0b15",
9-
"psycopg[binary]>=3.3.2",
9+
"sqlalchemy>=2.0.45",
1010
"pydantic>=2.12.5",
1111
"shared>=0.0.1",
1212
"api_client>=0.0.1",
13-
"opentelemetry-api>=1.39.1",
13+
"opentelemetry-api>=1.30.0",
1414
]
1515

1616
[tool.uv.sources]
Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
"""ARC object building logic for the SQL-to-ARC conversion process."""
2+
3+
import json
4+
import logging
5+
from collections import defaultdict
6+
from typing import Any, cast
7+
8+
from arctrl import ( # type: ignore[import-untyped]
9+
ARC,
10+
ArcTable,
11+
CompositeCell,
12+
CompositeHeader,
13+
IOType,
14+
OntologyAnnotation,
15+
)
16+
17+
from middleware.sql_to_arc.mapper import (
18+
map_assay,
19+
map_contact,
20+
map_investigation,
21+
map_publication,
22+
map_study,
23+
)
24+
from middleware.sql_to_arc.models import ArcBuildData
25+
26+
logger = logging.getLogger(__name__)
27+
28+
29+
def _add_studies_to_arc(arc: ARC, study_rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Register every study row on the ARC and index the mapped studies.

    Args:
        arc: ARC that receives each study via ``AddRegisteredStudy``.
        study_rows: Database rows; each is converted with ``map_study`` and
            must carry an ``identifier`` key.

    Returns:
        Mapping of the stringified row identifier to the mapped study object.
    """
    studies_by_id: dict[str, Any] = {}
    for row in study_rows:
        mapped_study = map_study(row)
        arc.AddRegisteredStudy(mapped_study)
        # Keys are always strings so later reference lookups compare like with like.
        studies_by_id[str(row["identifier"])] = mapped_study
    return studies_by_id
37+
38+
39+
def _add_assays_to_arc(arc: ARC, assay_rows: list[dict[str, Any]], study_map: dict[str, Any]) -> dict[str, Any]:
    """Add assays to the ARC, register them on their studies, and index them.

    Each row's ``study_ref`` is expected to be a JSON-encoded list of study
    identifiers; an already-decoded list is accepted as well. References are
    normalised with ``str()`` before the lookup because ``study_map`` keys are
    stringified identifiers. Malformed or non-list references are skipped
    (best effort), but a malformed JSON payload is logged instead of being
    dropped silently.

    Args:
        arc: ARC that receives each assay via ``AddAssay``.
        assay_rows: Database rows; each is converted with ``map_assay`` and
            must carry an ``identifier`` key.
        study_map: Mapping of stringified study identifier to study object.

    Returns:
        Mapping of the stringified row identifier to the mapped assay object.
    """
    assay_map: dict[str, Any] = {}
    for a_row in assay_rows:
        assay = map_assay(a_row)
        arc.AddAssay(assay)
        assay_id = str(a_row["identifier"])
        assay_map[assay_id] = assay

        # Link Assay to Studies
        study_refs = a_row.get("study_ref")
        if not study_refs:
            continue

        if isinstance(study_refs, (str, bytes, bytearray)):
            try:
                study_refs = json.loads(study_refs)
            except (json.JSONDecodeError, UnicodeDecodeError) as exc:
                logger.warning("Ignoring malformed study_ref for assay %s: %s", assay_id, exc)
                continue

        if not isinstance(study_refs, list):
            continue

        for s_ref in study_refs:
            if s_ref is None:
                continue
            # str() mirrors how study_map keys are built and keeps unhashable
            # JSON values (nested lists/objects) from raising in the lookup.
            study = study_map.get(str(s_ref))
            if study is not None:
                study.RegisterAssay(assay.Identifier)

    return assay_map
62+
63+
64+
def _add_contacts_to_arc(
    arc: ARC,
    inv_id: str,
    contacts: list[dict[str, Any]],
    study_map: dict[str, Any],
    assay_map: dict[str, Any],
) -> None:
    """Attach contacts to the investigation, its studies, and its assays.

    The contact rows are walked once and dispatched on ``target_type``,
    instead of being rescanned for every study and every assay. Rows whose
    ``investigation_ref`` differs from ``inv_id`` are ignored. Within each
    target the contacts keep the order in which they appear in ``contacts``.

    Args:
        arc: ARC whose ``Contacts`` list receives investigation-level contacts.
        inv_id: Identifier of the investigation being built.
        contacts: Contact rows carrying ``investigation_ref``, ``target_type``
            and (for studies/assays) ``target_ref``.
        study_map: Mapping of stringified study identifier to study object;
            matching contacts are appended to ``study.Contacts``.
        assay_map: Mapping of stringified assay identifier to assay object;
            matching contacts are appended to ``assay.Performers``.
    """
    for c_row in contacts:
        if c_row.get("investigation_ref") != inv_id:
            continue

        target_type = c_row.get("target_type")
        if target_type == "investigation":
            arc.Contacts.append(map_contact(c_row))
            continue

        target_ref = c_row.get("target_ref")
        if not isinstance(target_ref, str):
            # Map keys are stringified identifiers, so only string refs can match.
            continue

        if target_type == "study":
            study = study_map.get(target_ref)
            if study is not None:
                study.Contacts.append(map_contact(c_row))
        elif target_type == "assay":
            assay = assay_map.get(target_ref)
            if assay is not None:
                assay.Performers.append(map_contact(c_row))
98+
99+
100+
def _add_publications_to_arc(
    arc: ARC, inv_id: str, publications: list[dict[str, Any]], study_map: dict[str, Any]
) -> None:
    """Attach publications to the investigation and to its studies.

    The publication rows are walked once and dispatched on ``target_type``,
    instead of being rescanned for every study. Rows whose
    ``investigation_ref`` differs from ``inv_id`` are ignored. Within each
    target the publications keep the order in which they appear in
    ``publications``.

    Args:
        arc: ARC whose ``Publications`` list receives investigation-level rows.
        inv_id: Identifier of the investigation being built.
        publications: Publication rows carrying ``investigation_ref``,
            ``target_type`` and (for studies) ``target_ref``.
        study_map: Mapping of stringified study identifier to study object;
            matching publications are appended to ``study.Publications``.
    """
    for p_row in publications:
        if p_row.get("investigation_ref") != inv_id:
            continue

        target_type = p_row.get("target_type")
        if target_type == "investigation":
            arc.Publications.append(map_publication(p_row))
        elif target_type == "study":
            target_ref = p_row.get("target_ref")
            # Map keys are stringified identifiers, so only string refs can match.
            study = study_map.get(target_ref) if isinstance(target_ref, str) else None
            if study is not None:
                study.Publications.append(map_publication(p_row))
120+
121+
122+
def _get_column_key(r: dict[str, Any]) -> tuple:
123+
"""Extract a unique key for a column definition."""
124+
return (
125+
r.get("column_type"),
126+
r.get("column_io_type"),
127+
r.get("column_value"),
128+
r.get("column_annotation_term"),
129+
r.get("column_annotation_uri"),
130+
r.get("column_annotation_version"),
131+
r.get("column_name"), # Fallback for simple tests
132+
)
133+
134+
135+
def _build_header(key: tuple) -> CompositeHeader | None:
    """Create the CompositeHeader described by a column key tuple.

    Args:
        key: Seven-element tuple as produced by ``_get_column_key``:
            type, IO type, value, annotation term, annotation URI,
            annotation version, and column name.

    Returns:
        The header for a known column type, a header parsed from the column
        name when the type is not recognised but a name is present, or
        ``None`` when neither applies or construction fails (failures are
        logged as warnings).
    """
    c_type, c_io, c_val, c_ann_term, c_ann_uri, c_ann_ver, c_name = key
    try:
        annotation = OntologyAnnotation(c_ann_term or "", c_ann_uri or "", c_ann_ver or "")

        # IO columns fall back to a default IO type when none is given
        if c_type == "input":
            return CompositeHeader.input(IOType.of_string(c_io or "source_name"))
        if c_type == "output":
            return CompositeHeader.output(IOType.of_string(c_io or "sample_name"))

        # Term columns carry the ontology annotation
        if c_type == "characteristic":
            return CompositeHeader.characteristic(annotation)
        if c_type == "factor":
            return CompositeHeader.factor(annotation)
        if c_type == "parameter":
            return CompositeHeader.parameter(annotation)
        if c_type == "component":
            return CompositeHeader.component(annotation)

        if c_type == "comment":
            return CompositeHeader.comment(c_val or "")
        if c_type == "performer":
            return CompositeHeader.performer()
        if c_type == "date":
            return CompositeHeader.date()

        if c_name:
            # Fallback for simple/untyped headers
            return CompositeHeader.OfHeaderString(c_name)

    except (ValueError, TypeError, AttributeError) as e:
        logger.warning("Failed to create header for type %s: %s", c_type, e)
    return None
163+
164+
165+
def _build_single_cell(cell_row: dict[str, Any], header: CompositeHeader) -> CompositeCell:
    """Build a single CompositeCell from a database row.

    Resolution order:
      1. ``cell_value`` and ``cell_annotation_term`` both set -> unitized cell.
      2. Only ``cell_annotation_term`` set -> term cell.
      3. Otherwise the text from ``cell_value`` (or the legacy ``value`` key)
         is used; it is wrapped in a term cell when the header is a term
         column and emitted as free text otherwise.

    When no value is available at all, the empty cell follows the same
    header rule as step 3 (empty term for term columns, empty free text
    otherwise), so the empty fallback agrees with the non-empty text case.

    Args:
        cell_row: Row with optional ``cell_value``, ``cell_annotation_term``,
            ``cell_annotation_uri``, ``cell_annotation_version`` and ``value``.
        header: Header of the column the cell belongs to.

    Returns:
        The cell representing the row.
    """
    cell_value = cell_row.get("cell_value")
    term = cell_row.get("cell_annotation_term")
    uri = cell_row.get("cell_annotation_uri") or ""
    version = cell_row.get("cell_annotation_version") or ""

    if term is not None:
        annotation = OntologyAnnotation(term, uri, version)
        if cell_value is not None:
            # Unitized cell (value + ontology term)
            return CompositeCell.unitized(str(cell_value), annotation)
        # Term cell (ontology term only)
        return CompositeCell.term(annotation)

    # Text value: new schema 'cell_value' first, then legacy 'value' fallback
    raw_text = cell_value if cell_value is not None else cell_row.get("value")
    text = "" if raw_text is None else str(raw_text)

    if header.IsTermColumn:
        # The column expects a term, so wrap the (possibly empty) text in an annotation
        return CompositeCell.term(OntologyAnnotation(text, "", ""))
    return CompositeCell.free_text(text)
190+
191+
192+
def _build_column_cells(
    rows_map: dict[int, dict[str, Any]], max_row_idx: int, header: CompositeHeader
) -> list[CompositeCell]:
    """Build the cells of one column, padding rows that have no data.

    Produces exactly ``max_row_idx + 1`` cells. A row index without an entry
    in ``rows_map`` (or with an empty row) gets an empty placeholder whose
    kind follows the header: an empty term cell for term columns, an empty
    free-text cell otherwise. This keeps padded cells the same kind as the
    text cells ``_build_single_cell`` creates for that header.

    Args:
        rows_map: Row index -> database row for this column.
        max_row_idx: Highest row index of the whole table.
        header: Header of the column being built.

    Returns:
        Cells ordered by row index.
    """
    is_term_column = header.IsTermColumn
    col_cells: list[CompositeCell] = []
    for idx in range(max_row_idx + 1):
        cell_row = rows_map.get(idx)
        if cell_row:
            col_cells.append(_build_single_cell(cell_row, header))
        elif is_term_column:
            col_cells.append(CompositeCell.term(OntologyAnnotation("", "", "")))
        else:
            col_cells.append(CompositeCell.free_text(""))
    return col_cells
204+
205+
206+
def _build_arc_table(t_name: str, rows: list[dict[str, Any]]) -> ArcTable | None:
    """Build an ArcTable from flat database rows.

    Each row describes one cell: its column is identified by
    ``_get_column_key`` and its position by ``row_index``. Columns are added
    in the order in which they first appear in ``rows``; columns whose header
    cannot be built are skipped.

    ``row_index`` is coerced with ``int()`` and a missing or ``None`` index
    is treated as 0, so NULLs or numeric strings from the database do not
    break the index comparison.

    Args:
        t_name: Name of the table to create.
        rows: Flat cell rows belonging to this table.

    Returns:
        The populated table, or ``None`` when there are no rows or no row has
        a non-negative index.
    """
    if not rows:
        return None

    table = ArcTable.init(t_name)

    # Group rows per column. Dicts preserve insertion order, so iterating the
    # mapping later yields the columns in first-seen order.
    col_to_rows: dict[tuple, dict[int, dict[str, Any]]] = defaultdict(dict)
    max_row_idx = -1
    for r in rows:
        raw_idx = r.get("row_index")
        row_idx = 0 if raw_idx is None else int(raw_idx)
        max_row_idx = max(max_row_idx, row_idx)
        col_to_rows[_get_column_key(r)][row_idx] = r

    if max_row_idx < 0:
        return None

    for key, rows_map in col_to_rows.items():
        header = _build_header(key)
        if not header:
            continue

        # Build Cells for this column
        col_cells = _build_column_cells(rows_map, max_row_idx, header)
        table.AddColumn(header, col_cells)

    return table
239+
240+
241+
def _process_annotation_tables(
242+
inv_id: str, annotations: list[dict[str, Any]], study_map: dict[str, Any], assay_map: dict[str, Any]
243+
) -> None:
244+
"""Process and add annotation tables."""
245+
tables_groups = defaultdict(list)
246+
for ann in annotations:
247+
if ann.get("investigation_ref") == inv_id:
248+
key = (ann.get("target_type"), ann.get("target_ref"), ann.get("table_name"))
249+
tables_groups[key].append(ann)
250+
251+
for (t_type, t_ref, t_name), rows in tables_groups.items():
252+
if not t_name:
253+
continue
254+
255+
target = None
256+
if t_type == "study" and isinstance(t_ref, str):
257+
target = study_map.get(t_ref)
258+
elif t_type == "assay" and isinstance(t_ref, str):
259+
target = assay_map.get(t_ref)
260+
261+
if target:
262+
table = _build_arc_table(t_name, rows)
263+
if table:
264+
target.AddTable(table)
265+
266+
267+
def build_single_arc_task(data: ArcBuildData) -> ARC:
    """Assemble one ARC from the pre-fetched rows in ``data``.

    This function is designed to run in a separate process.

    The investigation row becomes the ARC itself; studies and assays that
    reference the investigation are added next, followed by contacts,
    publications, and annotation tables.
    """
    investigation_row = data.investigation_row
    inv_id = str(investigation_row["identifier"])

    # The investigation is the root object of the ARC
    arc = ARC.from_arc_investigation(map_investigation(investigation_row))

    # Keep only the rows that belong to this investigation
    own_studies = [row for row in data.studies if row.get("investigation_ref") == inv_id]
    own_assays = [row for row in data.assays if row.get("investigation_ref") == inv_id]

    # Studies first: assays are linked to them while being added
    study_map = _add_studies_to_arc(arc, own_studies)
    assay_map = _add_assays_to_arc(arc, own_assays, study_map)

    # People and literature
    _add_contacts_to_arc(arc, inv_id, data.contacts, study_map, assay_map)
    _add_publications_to_arc(arc, inv_id, data.publications, study_map)

    # Annotation tables for studies and assays
    _process_annotation_tables(inv_id, data.annotations, study_map, assay_map)

    return arc

0 commit comments

Comments
 (0)