Skip to content

Commit d304649

Browse files
committed
refactor: Extract ARC processing logic, data models, builder, and stats into dedicated modules.
1 parent 5d353ea commit d304649

9 files changed

Lines changed: 719 additions & 674 deletions

File tree

Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
"""ARC object building logic for the SQL-to-ARC conversion process."""
2+
3+
import json
4+
import logging
5+
from collections import defaultdict
6+
from typing import Any, cast
7+
8+
from arctrl import ( # type: ignore[import-untyped]
9+
ARC,
10+
ArcTable,
11+
CompositeCell,
12+
CompositeHeader,
13+
IOType,
14+
OntologyAnnotation,
15+
)
16+
17+
from middleware.sql_to_arc.mapper import (
18+
map_assay,
19+
map_contact,
20+
map_investigation,
21+
map_publication,
22+
map_study,
23+
)
24+
from middleware.sql_to_arc.models import ArcBuildData
25+
26+
logger = logging.getLogger(__name__)
27+
28+
29+
def _add_studies_to_arc(arc: ARC, study_rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Register one study per row on the ARC and index the studies by identifier.

    Every row is converted with ``map_study`` and handed to
    ``arc.AddRegisteredStudy``. The returned dict is keyed by the stringified
    ``identifier`` value of the row; if two rows share an identifier, both
    studies are registered but only the later one stays in the returned dict.
    """

    def _register(row: dict[str, Any]) -> tuple[str, Any]:
        # Map and register first, then read the identifier for the index key.
        mapped = map_study(row)
        arc.AddRegisteredStudy(mapped)
        return str(row["identifier"]), mapped

    return dict(_register(row) for row in study_rows)
37+
38+
39+
def _add_assays_to_arc(arc: ARC, assay_rows: list[dict[str, Any]], study_map: dict[str, Any]) -> dict[str, Any]:
    """Add assays to the ARC, register them on referenced studies, and index them.

    Args:
        arc: ARC that receives every mapped assay via ``AddAssay``.
        assay_rows: Assay rows. Each row needs an ``identifier`` and may carry
            a ``study_ref`` holding a JSON-encoded list of study identifiers
            (an already-decoded list is accepted as well).
        study_map: Studies keyed by their string identifier.

    Returns:
        Mapping of stringified assay identifier to the mapped assay.

    A ``study_ref`` that cannot be decoded is logged and skipped; the assay
    itself is still added to the ARC and to the returned mapping.
    """
    assay_map: dict[str, Any] = {}
    for a_row in assay_rows:
        assay = map_assay(a_row)
        arc.AddAssay(assay)
        assay_id = str(a_row["identifier"])
        assay_map[assay_id] = assay

        # Link Assay to Studies
        study_ref_json = a_row.get("study_ref")
        if not study_ref_json:
            continue

        if isinstance(study_ref_json, list):
            # The value was already decoded upstream; use it as-is.
            study_refs = study_ref_json
        else:
            # Keep the try body limited to the decode so that errors raised
            # while registering the assay are never mistaken for bad JSON.
            try:
                study_refs = json.loads(study_ref_json)
            except (json.JSONDecodeError, TypeError) as exc:
                logger.warning("Ignoring unparsable study_ref for assay %s: %s", assay_id, exc)
                continue

        if not isinstance(study_refs, list):
            continue

        for s_ref in study_refs:
            # study_map is keyed by strings, so non-string entries can never
            # match; skipping them also avoids hashing nested lists/dicts.
            if isinstance(s_ref, str) and s_ref in study_map:
                study_map[s_ref].RegisterAssay(assay.Identifier)

    return assay_map
62+
63+
64+
def _add_contacts_to_arc(
    arc: ARC,
    inv_id: str,
    contacts: list[dict[str, Any]],
    study_map: dict[str, Any],
    assay_map: dict[str, Any],
) -> None:
    """Attach contact rows to the investigation, its studies, and its assays.

    Only rows whose ``investigation_ref`` equals ``inv_id`` are considered.
    Rows with ``target_type`` ``"investigation"`` are appended to
    ``arc.Contacts``; ``"study"`` rows go to the ``Contacts`` of the study whose
    key equals ``target_ref``; ``"assay"`` rows go to the ``Performers`` of the
    assay whose key equals ``target_ref``. Each row is converted with
    ``map_contact``.
    """
    # Every target below shares the investigation filter, so apply it once.
    own_rows = [row for row in contacts if row.get("investigation_ref") == inv_id]

    # Investigation-level contacts
    for row in own_rows:
        if row.get("target_type") == "investigation":
            arc.Contacts.append(map_contact(row))

    # Study-level contacts
    for study_id, study in study_map.items():
        for row in own_rows:
            if row.get("target_type") == "study" and row.get("target_ref") == study_id:
                study.Contacts.append(map_contact(row))

    # Assay-level contacts are stored as performers
    for assay_id, assay in assay_map.items():
        for row in own_rows:
            if row.get("target_type") == "assay" and row.get("target_ref") == assay_id:
                assay.Performers.append(map_contact(row))
98+
99+
100+
def _add_publications_to_arc(
    arc: ARC, inv_id: str, publications: list[dict[str, Any]], study_map: dict[str, Any]
) -> None:
    """Attach publication rows to the investigation and to its studies.

    Only rows whose ``investigation_ref`` equals ``inv_id`` are considered.
    Rows with ``target_type`` ``"investigation"`` are appended to
    ``arc.Publications``; ``"study"`` rows are appended to the ``Publications``
    of the study whose key equals ``target_ref``. Each row is converted with
    ``map_publication``.
    """
    # Both targets share the investigation filter, so apply it once.
    own_rows = [row for row in publications if row.get("investigation_ref") == inv_id]

    # Investigation-level publications
    for row in own_rows:
        if row.get("target_type") == "investigation":
            arc.Publications.append(map_publication(row))

    # Study-level publications
    for study_id, study in study_map.items():
        for row in own_rows:
            if row.get("target_type") == "study" and row.get("target_ref") == study_id:
                study.Publications.append(map_publication(row))
120+
121+
122+
def _get_column_key(r: dict[str, Any]) -> tuple:
123+
"""Extract a unique key for a column definition."""
124+
return (
125+
r.get("column_type"),
126+
r.get("column_io_type"),
127+
r.get("column_value"),
128+
r.get("column_annotation_term"),
129+
r.get("column_annotation_uri"),
130+
r.get("column_annotation_version"),
131+
r.get("column_name"), # Fallback for simple tests
132+
)
133+
134+
135+
def _build_header(key: tuple) -> CompositeHeader | None:
136+
"""Build a CompositeHeader from a column key tuple."""
137+
c_type, c_io, c_val, c_ann_term, c_ann_uri, c_ann_ver, c_name = key
138+
header: CompositeHeader | None = None
139+
try:
140+
oa = OntologyAnnotation(c_ann_term or "", c_ann_uri or "", c_ann_ver or "")
141+
if c_type == "input":
142+
header = CompositeHeader.input(IOType.of_string(c_io or "source_name"))
143+
elif c_type == "output":
144+
header = CompositeHeader.output(IOType.of_string(c_io or "sample_name"))
145+
elif c_type == "characteristic":
146+
header = CompositeHeader.characteristic(oa)
147+
elif c_type == "factor":
148+
header = CompositeHeader.factor(oa)
149+
elif c_type == "parameter":
150+
header = CompositeHeader.parameter(oa)
151+
elif c_type == "component":
152+
header = CompositeHeader.component(oa)
153+
elif c_type == "comment":
154+
header = CompositeHeader.comment(c_val or "")
155+
elif c_type == "performer":
156+
header = CompositeHeader.performer()
157+
elif c_type == "date":
158+
header = CompositeHeader.date()
159+
elif c_name:
160+
# Fallback
161+
header = CompositeHeader.OfHeaderString(c_name)
162+
except (ValueError, TypeError, AttributeError) as e:
163+
logger.warning("Failed to create header for type %s: %s", c_type, e)
164+
return header
165+
166+
167+
def _build_column_cells(
    rows_map: dict[int, dict[str, Any]], max_row_idx: int, header: CompositeHeader
) -> list[CompositeCell]:
    """Produce one CompositeCell per row index from 0 through ``max_row_idx``.

    Indices without an entry in ``rows_map`` (or with an empty row) yield an
    empty free-text cell. For the remaining rows:

    * an annotation term together with a cell value gives a unitized cell;
    * an annotation term alone gives a term cell;
    * otherwise the cell value, or failing that the legacy ``value`` field,
      becomes a term cell when ``header.IsTermColumn`` is true and a free-text
      cell when it is not;
    * with none of these present the cell is empty free text.
    """

    def _cell_for(row: dict[str, Any] | None) -> CompositeCell:
        if not row:
            return CompositeCell.free_text("")

        raw_value = row.get("cell_value")
        term = row.get("cell_annotation_term")

        if term is not None:
            uri = row.get("cell_annotation_uri") or ""
            version = row.get("cell_annotation_version") or ""
            if raw_value is not None:
                # Value plus annotation: the annotation is used as the unit.
                return CompositeCell.unitized(str(raw_value), OntologyAnnotation(term, uri, version))
            return CompositeCell.term(OntologyAnnotation(term, uri, version))

        if raw_value is None:
            raw_value = row.get("value")  # Fallback for old/simple tests
        if raw_value is None:
            return CompositeCell.free_text("")

        if header.IsTermColumn:
            return CompositeCell.term(OntologyAnnotation(str(raw_value), "", ""))
        return CompositeCell.free_text(str(raw_value))

    return [_cell_for(rows_map.get(row_idx)) for row_idx in range(max_row_idx + 1)]
202+
203+
204+
def _build_arc_table(t_name: str, rows: list[dict[str, Any]]) -> ArcTable | None:
    """Build an ArcTable from flat database rows.

    Each row describes one cell: its column is identified by
    ``_get_column_key`` and its position by ``row_index``. A ``row_index``
    that is missing or ``None`` is treated as 0. When several rows address the
    same column and row index, the last one wins.

    Args:
        t_name: Name given to the new table.
        rows: Flat cell rows belonging to this table.

    Returns:
        The populated table, or ``None`` when ``rows`` is empty or no row has a
        non-negative index. Columns whose header cannot be built are skipped.
    """
    if not rows:
        return None

    # Group the cells by column in a single pass. A dict keeps insertion
    # order, so columns are later emitted in first-seen order without any
    # separate bookkeeping.
    col_to_rows: dict[tuple, dict[int, dict[str, Any]]] = {}
    max_row_idx = -1
    for r in rows:
        # `or 0` also covers an explicit None, which would otherwise make the
        # max() comparison raise TypeError.
        row_idx = cast(int, r.get("row_index") or 0)
        max_row_idx = max(max_row_idx, row_idx)
        col_to_rows.setdefault(_get_column_key(r), {})[row_idx] = r

    if max_row_idx < 0:
        return None

    table = ArcTable.init(t_name)
    for key, rows_map in col_to_rows.items():
        header = _build_header(key)
        if not header:
            continue

        # Every column is padded to the same length (max_row_idx + 1 cells).
        table.AddColumn(header, _build_column_cells(rows_map, max_row_idx, header))

    return table
237+
238+
239+
def _process_annotation_tables(
240+
inv_id: str, annotations: list[dict[str, Any]], study_map: dict[str, Any], assay_map: dict[str, Any]
241+
) -> None:
242+
"""Process and add annotation tables."""
243+
tables_groups = defaultdict(list)
244+
for ann in annotations:
245+
if ann.get("investigation_ref") == inv_id:
246+
key = (ann.get("target_type"), ann.get("target_ref"), ann.get("table_name"))
247+
tables_groups[key].append(ann)
248+
249+
for (t_type, t_ref, t_name), rows in tables_groups.items():
250+
if not t_name:
251+
continue
252+
253+
target = None
254+
if t_type == "study" and isinstance(t_ref, str):
255+
target = study_map.get(t_ref)
256+
elif t_type == "assay" and isinstance(t_ref, str):
257+
target = assay_map.get(t_ref)
258+
259+
if target:
260+
table = _build_arc_table(t_name, rows)
261+
if table:
262+
target.AddTable(table)
263+
264+
265+
def build_single_arc_task(data: ArcBuildData) -> ARC:
    """Assemble one ARC from the rows bundled in ``data``.

    The investigation row is mapped and wrapped into an ARC; the studies and
    assays referencing the investigation's identifier are then added, followed
    by contacts, publications, and annotation tables.

    This function is designed to run in a separate process.
    """
    investigation_row = data.investigation_row
    inv_id = str(investigation_row["identifier"])

    # Map the investigation and wrap it into an ARC
    arc = ARC.from_arc_investigation(map_investigation(investigation_row))

    def _owned_by_investigation(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
        # Keep only the rows that reference this investigation.
        return [row for row in rows if row.get("investigation_ref") == inv_id]

    own_studies = _owned_by_investigation(data.studies)
    own_assays = _owned_by_investigation(data.assays)

    # Studies go first because the assays are linked to them
    study_map = _add_studies_to_arc(arc, own_studies)
    assay_map = _add_assays_to_arc(arc, own_assays, study_map)

    # Contacts and publications
    _add_contacts_to_arc(arc, inv_id, data.contacts, study_map, assay_map)
    _add_publications_to_arc(arc, inv_id, data.publications, study_map)

    # Annotation tables
    _process_annotation_tables(inv_id, data.annotations, study_map, assay_map)

    return arc

0 commit comments

Comments
 (0)