1111
1212
1313##############
14- # Class: Monarch KG source loader
14+ # Class: Monarch KG base loader
1515#
16- # Desc: Class that loads/parses the Monarch KG data.
16+ # Desc: Base class with shared logic for loading/parsing the Monarch KG data.
17+ # Subclasses control whether predicate/knowledge-source filtering is applied.
1718##############
18- class MonarchKGLoader (SourceDataLoader ):
19+ class MonarchKGBaseLoader (SourceDataLoader ):
1920
20- source_id : str = 'MonarchKG'
21+ source_id : str = None # overridden by subclass
2122 provenance_id : str = 'infores:monarchinitiative'
22- parsing_version : str = '1.3 '
23+ parsing_version : str = '1.5 '
2324
2425 def __init__ (self , test_mode : bool = False , source_data_dir : str = None ):
2526 """
@@ -28,35 +29,11 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None):
2829 """
2930 super ().__init__ (test_mode = test_mode , source_data_dir = source_data_dir )
3031
31- # there is a /latest/ for this url, but without a valid get_latest_source_version function,
32- # it could create a mismatch, pin to this version for now
33- self .data_url = 'https://data.monarchinitiative.org/monarch-kg-dev/latest/'
32+ self .data_url = 'https://data.monarchinitiative.org/monarch-kg/latest/'
3433 self .monarch_graph_archive = 'monarch-kg.jsonl.tar.gz'
3534 self .monarch_edge_file_archive_path = 'monarch-kg_edges.jsonl'
3635 self .data_files = [self .monarch_graph_archive ]
3736
38- self .desired_predicates = {
39- 'biolink:causes' ,
40- 'biolink:contributes_to' ,
41- 'biolink:has_phenotype' ,
42- 'biolink:expressed_in'
43- }
44-
45- self .knowledge_source_ignore_list = {
46- 'infores:ctd' ,
47- 'infores:reactome' ,
48- 'infores:goa' ,
49- 'infores:cafa' ,
50- 'infores:bhf-ucl' ,
51- 'infores:aruk-ucl' ,
52- 'infores:parkinsonsuk-ucl' ,
53- 'infores:alzheimers-university-of-toronto' ,
54- 'infores:agbase' ,
55- 'infores:dictybase' ,
56- 'infores:ntnu-sb' ,
57- 'infores:wb'
58- }
59-
6037 self .knowledge_source_mapping = {
6138 'infores:alliancegenome' : 'infores:agrkb' ,
6239 'infores:hgnc-ucl' : 'infores:hgnc' ,
@@ -69,10 +46,12 @@ def get_latest_source_version(self) -> str:
6946 """
7047 latest_version = None
7148 try :
72- metadata_yaml : requests .Response = requests .get ("https://data.monarchinitiative.org/monarch-kg-dev/latest/metadata.yaml" )
49+ metadata_yaml : requests .Response = requests .get (
50+ 'https://data.monarchinitiative.org/monarch-kg/latest/metadata.yaml'
51+ )
7352 for line in metadata_yaml .text .split ('\n ' ):
74- if " kg-version:" in line :
75- latest_version = line .replace (" kg-version:" , "" ).strip ()
53+ if ' kg-version:' in line :
54+ latest_version = line .replace (' kg-version:' , '' ).strip ()
7655 if latest_version is None :
7756 raise ValueError ("Cannot find 'kg-version' in Monarch KG metadata yaml." )
7857 except Exception as e :
@@ -85,50 +64,56 @@ def get_data(self) -> bool:
8564 data_puller .pull_via_http (source_data_url , self .data_path )
8665 return True
8766
67+ def filter_edge (self , subject_id : str , object_id : str , predicate : str ,
68+ primary_knowledge_source : str , aggregator_knowledge_sources : list ) -> bool :
69+ """
70+ Returns True if the edge should be skipped.
71+ Subclasses override this to apply filtering.
72+ """
73+ return False
74+
8875 def parse_data (self ) -> dict :
8976 """
90- Parses the data file for graph nodes/edges
77+ Parses the data file for graph nodes/edges.
9178
9279 :return: ret_val: load_metadata
9380 """
9481 record_counter = 0
9582 skipped_bad_record_counter = 0
96- skipped_ignore_knowledge_source = 0
97- skipped_undesired_predicate = 0
83+ skipped_filtered_counter = 0
84+
9885 full_tar_path = os .path .join (self .data_path , self .monarch_graph_archive )
9986 protected_edge_labels = [SUBJECT_ID , OBJECT_ID , PREDICATE , PRIMARY_KNOWLEDGE_SOURCE ,
100- AGGREGATOR_KNOWLEDGE_SOURCES , KNOWLEDGE_LEVEL , AGENT_TYPE ,
101- PUBLICATIONS , "biolink:primary_knowledge_source" , "biolink:aggregator_knowledge_source" ]
87+ AGGREGATOR_KNOWLEDGE_SOURCES , KNOWLEDGE_LEVEL , AGENT_TYPE ,
88+ PUBLICATIONS , 'biolink:primary_knowledge_source' ,
89+ 'biolink:aggregator_knowledge_source' ]
10290
10391 with tarfile .open (full_tar_path , 'r' ) as tar_files :
10492 with tar_files .extractfile (self .monarch_edge_file_archive_path ) as edges_file :
10593 for line in edges_file :
10694 monarch_edge = orjson .loads (line )
107- # normally we wouldn't use constants to read FROM a source,
108- # but in this case monarch kg is biolink compliant, so they should be the same
10995 subject_id = monarch_edge [SUBJECT_ID ]
11096 object_id = monarch_edge [OBJECT_ID ]
11197 predicate = monarch_edge [PREDICATE ]
11298 if not (subject_id and object_id and predicate ):
11399 skipped_bad_record_counter += 1
114100 continue
115101
116- if predicate not in self .desired_predicates :
117- skipped_undesired_predicate += 1
118- continue
119-
120- # get the knowledge sources, map them to something else if needed,
121- # then check if edge should be ignored due to the knowledge source
122- primary_knowledge_source = self .knowledge_source_mapping .get (monarch_edge [PRIMARY_KNOWLEDGE_SOURCE ],
123- monarch_edge [PRIMARY_KNOWLEDGE_SOURCE ])
102+ primary_knowledge_source = self .knowledge_source_mapping .get (
103+ monarch_edge [PRIMARY_KNOWLEDGE_SOURCE ],
104+ monarch_edge [PRIMARY_KNOWLEDGE_SOURCE ]
105+ )
124106 if monarch_edge .get (AGGREGATOR_KNOWLEDGE_SOURCES , False ):
125- aggregator_knowledge_sources = [self .knowledge_source_mapping .get (ks , ks )
126- for ks in monarch_edge [AGGREGATOR_KNOWLEDGE_SOURCES ]]
107+ aggregator_knowledge_sources = [
108+ self .knowledge_source_mapping .get (ks , ks )
109+ for ks in monarch_edge [AGGREGATOR_KNOWLEDGE_SOURCES ]
110+ ]
127111 else :
128112 aggregator_knowledge_sources = []
129- if primary_knowledge_source in self .knowledge_source_ignore_list or \
130- any ([ks in self .knowledge_source_ignore_list for ks in aggregator_knowledge_sources ]):
131- skipped_ignore_knowledge_source += 1
113+
114+ if self .filter_edge (subject_id , object_id , predicate ,
115+ primary_knowledge_source , aggregator_knowledge_sources ):
116+ skipped_filtered_counter += 1
132117 continue
133118
134119 edge_properties = {
@@ -142,7 +127,7 @@ def parse_data(self) -> dict:
142127 for edge_attribute in monarch_edge :
143128 if edge_attribute not in protected_edge_labels \
144129 and monarch_edge [edge_attribute ] \
145- and edge_attribute != " qualifiers" :
130+ and edge_attribute != ' qualifiers' :
146131 edge_properties [edge_attribute ] = monarch_edge [edge_attribute ]
147132
148133 output_edge = kgxedge (
@@ -157,10 +142,83 @@ def parse_data(self) -> dict:
157142 self .output_file_writer .write_node (subject_id )
158143 self .output_file_writer .write_kgx_edge (output_edge )
159144 record_counter += 1
145+
160146 load_metadata : dict = {
161147 'num_source_lines' : record_counter ,
162148 'unusable_source_lines' : skipped_bad_record_counter ,
163- 'lines_skipped_due_to_undesired_predicate' : skipped_undesired_predicate ,
164- 'lines_skipped_due_to_knowledge_source_ignore_list' : skipped_ignore_knowledge_source
149+ 'lines_skipped_due_to_filtering' : skipped_filtered_counter ,
165150 }
166151 return load_metadata
152+
153+
154+ ##############
155+ # Class: Monarch KG loader — ROBOKOP curated subset
156+ #
157+ # Desc: Filters to the predicates and knowledge sources used in the ROBOKOP graph.
158+ # Sources already ingested separately (CTD, Reactome, GOA, etc.) are excluded
159+ # to avoid duplication in the merged graph.
160+ ##############
161+ class MonarchKGLoader (MonarchKGBaseLoader ):
162+
163+ source_id : str = 'MonarchKG'
164+
165+ def __init__ (self , test_mode : bool = False , source_data_dir : str = None ):
166+ super ().__init__ (test_mode = test_mode , source_data_dir = source_data_dir )
167+
168+ self .desired_predicates = {
169+ 'biolink:causes' ,
170+ 'biolink:contributes_to' ,
171+ 'biolink:has_phenotype' ,
172+ 'biolink:expressed_in'
173+ }
174+
175+ self .knowledge_source_ignore_list = {
176+ 'infores:ctd' ,
177+ 'infores:reactome' ,
178+ 'infores:goa' ,
179+ 'infores:cafa' ,
180+ 'infores:bhf-ucl' ,
181+ 'infores:aruk-ucl' ,
182+ 'infores:parkinsonsuk-ucl' ,
183+ 'infores:alzheimers-university-of-toronto' ,
184+ 'infores:agbase' ,
185+ 'infores:dictybase' ,
186+ 'infores:ntnu-sb' ,
187+ 'infores:wb'
188+ }
189+
190+ # Curie prefixes known not to normalize — edges where subject or object
191+ # starts with any of these are discarded.
192+ self .non_normalizable_curie_prefixes = {
193+ 'ZP' , 'phenopacket.store' , 'WB' , 'CLINVAR' , 'FYPO' ,
194+ 'PomBase' , 'MMRRC' , 'WBPhenotype' , 'CAID' , 'XPO' , 'CUREID'
195+ }
196+
197+ def filter_edge (self , subject_id : str , object_id : str , predicate : str ,
198+ primary_knowledge_source : str , aggregator_knowledge_sources : list ) -> bool :
199+ if predicate not in self .desired_predicates :
200+ return True
201+ if primary_knowledge_source in self .knowledge_source_ignore_list or \
202+ any (ks in self .knowledge_source_ignore_list for ks in aggregator_knowledge_sources ):
203+ return True
204+ for curie in (subject_id , object_id ):
205+ prefix = curie .split (':' )[0 ]
206+ if prefix in self .non_normalizable_curie_prefixes :
207+ return True
208+ return False
209+
210+
211+ ##############
212+ # Class: Monarch KG Full loader — complete Monarch KG
213+ #
214+ # Desc: Loads the entire Monarch KG without predicate or knowledge-source filtering.
215+ # Intended for producing a standalone Monarch KG graph (e.g. for Automat),
216+ # rather than the curated ROBOKOP subset.
217+ ##############
218+ class MonarchKGFullLoader (MonarchKGBaseLoader ):
219+
220+ source_id : str = 'MonarchKGFull'
221+
222+ def __init__ (self , test_mode : bool = False , source_data_dir : str = None ):
223+ super ().__init__ (test_mode = test_mode , source_data_dir = source_data_dir )
224+ # No filtering — filter_edge inherits the base class no-op
0 commit comments