-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtopo.py
More file actions
157 lines (136 loc) · 6.46 KB
/
Copy pathtopo.py
File metadata and controls
157 lines (136 loc) · 6.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
from qns.network.topology import Topology
from qns.utils.rnd import get_rand
from typing import List, Tuple
from qns.entity.node.node import QNode
from qns.entity.qchannel import QuantumChannel
from qns.utils.rnd import get_randint
import networkx as nx
import itertools
import numpy as np
import json
import random
class Node(QNode):
"""量子网络节点类,继承自QNode
Attributes:
id: 节点唯一标识符
pos_x: 节点的x坐标位置
pos_y: 节点的y坐标位置
capacity: 节点容量(纠缠资源数量)
"""
def __init__(self, name: str, id: int, pos_x: float = 0.0, pos_y: float = 0.0, swapping_pron: float = 0.5, capacity: int = 50):
super().__init__(name=name)
self.id = id
# 如果未指定坐标,则随机生成0-100之间的位置
self.pos_x = np.random.rand()*100 if pos_x == 0.0 else pos_x
self.pos_y = np.random.rand()*100 if pos_y == 0.0 else pos_y
self.capacity = capacity
class Channel(QuantumChannel):
"""量子信道类,用于连接两个量子节点
Attributes:
src: 信道的源节点
dest: 信道的目标节点
capacity: 信道容量(可生成的纠缠对数量)
init_fidelity: 初始保真度
pur_table: 提纯表,记录不同提纯次数后的保真度
distance: 信道距离
sigma: 标准差参数
entanglemnt_pool: 存储该信道上的纠缠对列表
"""
def __init__(self, name: str, src: Node, dest: Node, capacity: int = 50, init_fidelity: float = 0.65, sigma: float = 0.05, entanglement_pool: List[float] = [], distance: float = 0.0):
super().__init__()
self.src = src
self.dest = dest
self.name = name
self.capacity = capacity
self.init_fidelity = init_fidelity
self.pur_table = [] # 提纯表:[(保真度, 保真度提升), ...]
self.distance = distance
self.sigma = sigma
self.entanglemnt_pool = entanglement_pool # 存储该信道上的纠缠对
def get_distance(node1: Node, node2: Node) -> float:
"""Calculate Euclidean distance between two nodes."""
return np.sqrt((node1.pos_x - node2.pos_x) ** 2 + (node1.pos_y - node2.pos_y) ** 2)
class WaxManTopology(Topology):
"""基于Waxman模型的量子网络拓扑生成器
Waxman模型是一种经典的网络拓扑生成算法,能够生成具有小世界特性的随机图。
通过控制alpha和beta参数,可以调节网络的连通性和边长分布。
Attributes:
num_nodes: 网络中的节点数量
alpha: Waxman模型参数,控制边的生成概率
beta: Waxman模型参数,控制距离对边生成概率的影响
capacity: 每条链路的默认容量
"""
def __init__(self, num_nodes: int, alpha: float = 0.1, beta: float = 0.4, capacity: int = 1):
"""
初始化Waxman拓扑生成器
Args:
num_nodes (int): 拓扑中的节点数量
alpha (float): Waxman模型alpha参数,影响边的密度
beta (float): Waxman模型beta参数,影响距离衰减因子
capacity (int): 链路容量
"""
super().__init__(num_nodes)
self.num_nodes = num_nodes
self.alpha = alpha
self.beta = beta
self.capacity = capacity
def build(self) -> Tuple[List[Node], List[Channel]]:
"""构建Waxman随机拓扑结构
该方法执行以下步骤:
1. 在二维平面上随机生成节点位置
2. 计算所有节点对之间的距离
3. 根据Waxman模型的概率公式决定是否在节点间建立连接
4. 确保生成的拓扑是连通的,如果不连通则重新生成
Returns:
Tuple[List[Node], List[Channel]]: 返回节点列表和量子信道列表
"""
num = 0
while True:
num += 1
nl = [] # 节点列表
ll = [] # 信道列表
g = nx.Graph() # 用于检查连通性的图
# 创建节点:在100x100的平面内随机分布
for i in range(self.num_nodes):
pos_x = get_randint(0, 100)
pos_y = get_randint(0, 100)
node = Node(name=f"Node_{i}", id=i, pos_x=pos_x, pos_y=pos_y, capacity=0)
nl.append(node)
g.add_node(i, pos=(pos_x, pos_y))
L = 0 # 最大距离
dl = {} # 距离字典:{(node1, node2): distance}
cb = itertools.combinations(nl, 2) # 获取所有节点对组合
# 计算所有节点对之间的欧几里得距离
for (ni, nj) in cb:
d = get_distance(ni, nj)
dl[(ni, nj)] = d
L = max(L, d) # 更新最大距离
# 根据Waxman模型建立连接
for (ni, nj), d in dl.items():
# Waxman模型的概率公式:p = alpha * exp(-d / (beta * L))
# 这里简化为固定概率0.25
# p = self.alpha * np.exp(-d / (self.beta * L))
p = 0.25
if get_rand() < p: # 以概率p建立连接
# 随机生成初始保真度(0.75-0.85之间)
init_fidelity = np.random.uniform(0.75, 0.85)
capacity = int(self.capacity)
# 更新两端节点的容量
ni.capacity += capacity
nj.capacity += capacity
# 创建量子信道
channel = Channel(name=f"Channel_{ni.id}_{nj.id}", src=ni, dest=nj, capacity=capacity, init_fidelity=init_fidelity)
ll.append(channel)
# 将信道添加到两端节点
ni.add_qchannel(channel)
nj.add_qchannel(channel)
# 在图中添加边
g.add_edge(ni.id, nj.id, weight=d)
# 检查拓扑是否连通,如果连通则退出循环
if nx.is_connected(g):
break
# 如果尝试超过50次仍未生成连通拓扑,则放弃
if num >= 50:
print(f"尝试生成连通拓扑失败超过{num}次,当前节点数为{self.num_nodes},请调整参数后重试。")
break
return nl, ll