@@ -828,20 +828,28 @@ def process_instance(instance):
828828 MERGE (c)-[:APPEARS]->(s)
829829 """
830830
831- try :
832- client ._execute_cypher (query , params = instance )
833- except Exception as e :
834- # AGE creates label tables on first use - parallel threads may race
835- if "already exists" in str (e ):
836- # Extract label name from error message
837- import re
838- match = re .search (r'relation "(\w+)" already exists' , str (e ))
839- if match :
840- Console .info (f" Initializing AGE label: { match .group (1 )} " )
841- # Retry - label table now exists
831+ import time
832+ max_retries = 5
833+ for attempt in range (max_retries + 1 ):
834+ try :
842835 client ._execute_cypher (query , params = instance )
843- else :
844- raise
836+ break
837+ except Exception as e :
838+ error_str = str (e )
839+ if "already exists" in error_str and attempt < max_retries :
840+ # AGE creates label tables on first use - parallel threads may race
841+ import re
842+ match = re .search (r'relation "(\w+)" already exists' , error_str )
843+ if match :
844+ Console .info (f" Initializing AGE label: { match .group (1 )} " )
845+ time .sleep (0.1 * (attempt + 1 ))
846+ continue
847+ elif "Entity failed to be updated" in error_str and attempt < max_retries :
848+ # AGE MVCC conflict from concurrent MERGE on same node
849+ time .sleep (0.1 * (attempt + 1 ))
850+ continue
851+ else :
852+ raise
845853
846854 # Thread-safe progress tracking
847855 with progress_lock :
@@ -880,48 +888,64 @@ def process_relationship(rel):
880888 """Process single relationship"""
881889 # Dynamic relationship type (IMPLIES, SUPPORTS, etc.)
882890 # Use OPTIONAL MATCH to handle missing nodes gracefully
883- query = f"""
884- OPTIONAL MATCH (c1:Concept {{concept_id: $from_id}})
885- OPTIONAL MATCH (c2:Concept {{concept_id: $to_id}})
886- WITH c1, c2
887- WHERE c1 IS NOT NULL AND c2 IS NOT NULL
888- MERGE (c1)-[r: { rel [ 'type' ] } ]->(c2)
889- SET r = $properties
890- RETURN count(r) as created
891- """
892-
893- try :
894- result = client . _execute_cypher ( query , params = {
891+ props = rel . get ( "properties" ) or {}
892+ if props :
893+ query = f"""
894+ OPTIONAL MATCH (c1:Concept {{concept_id: $from_id}})
895+ OPTIONAL MATCH (c2:Concept {{concept_id: $to_id}})
896+ WITH c1, c2
897+ WHERE c1 IS NOT NULL AND c2 IS NOT NULL
898+ MERGE (c1)-[r: { rel [ 'type' ] } ]->(c2)
899+ SET r = $properties
900+ RETURN count(r) as created
901+ """
902+ params = {
895903 "from_id" : rel ["from" ],
896904 "to_id" : rel ["to" ],
897- "properties" : rel ["properties" ]
898- }, fetch_one = True )
899- except Exception as e :
900- # AGE concurrency issues with parallel processing
901- error_str = str (e )
902-
903- if "already exists" in error_str :
904- # AGE creates edge type tables on first use - parallel threads may race
905- import re
906- match = re .search (r'relation "(\w+)" already exists' , error_str )
907- if match :
908- Console .info (f" Initializing edge type: { match .group (1 )} " )
909- # Retry - edge type table now exists
910- result = client ._execute_cypher (query , params = {
911- "from_id" : rel ["from" ],
912- "to_id" : rel ["to" ],
913- "properties" : rel ["properties" ]
914- }, fetch_one = True )
915- elif "Entity failed to be updated" in error_str :
916- # AGE concurrency: Multiple threads updating same relationship
917- # Retry once - conflict should be resolved
918- result = client ._execute_cypher (query , params = {
919- "from_id" : rel ["from" ],
920- "to_id" : rel ["to" ],
921- "properties" : rel ["properties" ]
922- }, fetch_one = True )
923- else :
924- raise
905+ "properties" : props
906+ }
907+ else :
908+ # Skip SET for empty properties — AGE rejects SET r = {} with
909+ # "SET clause expects a map"
910+ query = f"""
911+ OPTIONAL MATCH (c1:Concept {{concept_id: $from_id}})
912+ OPTIONAL MATCH (c2:Concept {{concept_id: $to_id}})
913+ WITH c1, c2
914+ WHERE c1 IS NOT NULL AND c2 IS NOT NULL
915+ MERGE (c1)-[r:{ rel ['type' ]} ]->(c2)
916+ RETURN count(r) as created
917+ """
918+ params = {
919+ "from_id" : rel ["from" ],
920+ "to_id" : rel ["to" ],
921+ }
922+
923+ import time
924+ max_retries = 5
925+ for attempt in range (max_retries + 1 ):
926+ try :
927+ result = client ._execute_cypher (query , params = params , fetch_one = True )
928+ break
929+ except Exception as e :
930+ error_str = str (e )
931+
932+ if "already exists" in error_str and attempt < max_retries :
933+ # AGE creates edge type tables on first use - parallel threads may race
934+ import re
935+ match = re .search (r'relation "(\w+)" already exists' , error_str )
936+ if match :
937+ Console .info (f" Initializing edge type: { match .group (1 )} " )
938+ time .sleep (0.1 * (attempt + 1 ))
939+ continue
940+ elif "Entity failed to be updated" in error_str and attempt < max_retries :
941+ # AGE MVCC conflict from concurrent MERGE on same edge
942+ time .sleep (0.1 * (attempt + 1 ))
943+ continue
944+ elif "SET clause expects a map" in error_str and attempt < max_retries :
945+ time .sleep (0.1 * (attempt + 1 ))
946+ continue
947+ else :
948+ raise
925949
926950 created = 0
927951 if result and int (str (result .get ("created" , 0 ))) > 0 :
0 commit comments