-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgraph_builder.py
More file actions
46 lines (40 loc) · 1.6 KB
/
graph_builder.py
File metadata and controls
46 lines (40 loc) · 1.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import networkx as nx
from typing import List, Dict, Callable
def build_graph(edge_records, node_info):
G = nx.DiGraph()
for nid, meta in node_info.items():
G.add_node(nid, **meta)
for rec in edge_records:
src = rec['source']
dst = rec['target']
attrs = {k:v for k,v in rec.items() if k not in ('source','target')}
G.add_edge(src, dst, **attrs)
return G
def make_cost_fn(selected_cost_columns: List[str]):
if not selected_cost_columns:
# fallback default cost columns if present
def cost(edge_data):
return float(edge_data.get('TransitionCost', 0)) + float(edge_data.get('WaitingTime', 0))
return cost
def cost(edge_data):
total = 0.0
for c in selected_cost_columns:
total += float(edge_data.get(c, 0) or 0)
return total
return cost
def make_heuristic_fn(G, selected_column: str):
if not selected_column:
return lambda n: 0
# Node-based heuristic: use average of outgoing edge attribute if not present as node attribute
def h(n):
node_attr = G.nodes[n]
if selected_column in node_attr and isinstance(node_attr[selected_column], (int, float)):
return float(node_attr[selected_column])
# fallback: average of edges
edges = list(G.out_edges(n, data=True))
if not edges:
return 0.0
vals = [e[2].get(selected_column, 0) for e in edges]
nums = [float(v) for v in vals if isinstance(v, (int, float)) or str(v).replace('.','',1).isdigit()]
return sum(nums)/len(nums) if nums else 0.0
return h