Skip to content

Commit 15bf193

Browse files
committed
Simplify code
1 parent 68907dc commit 15bf193

3 files changed

Lines changed: 59 additions & 94 deletions

File tree

.github/workflows/continuous-benchmark-transfer.yaml

Lines changed: 26 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -55,22 +55,30 @@ jobs:
5555
- name: Generate machine matrix
5656
id: generate
5757
run: |
58-
NODES=${{ env.CLUSTER_NODES }}
5958
RUN_ID=${{ github.run_id }}
6059
61-
MACHINES='['
62-
NODE_NAMES=''
63-
for i in $(seq 0 $((NODES - 1))); do
64-
[ $i -gt 0 ] && MACHINES+=','
65-
MACHINES+="{\"name\":\"node-${i}\",\"suffix\":\"node-${i}\",\"type\":\"SERVER_TYPE\"}"
66-
[ -n "$NODE_NAMES" ] && NODE_NAMES+=','
67-
NODE_NAMES+="transfer-bench-node-${i}-${RUN_ID}"
68-
done
69-
MACHINES+=",{\"name\":\"client\",\"suffix\":\"client\",\"type\":\"CLIENT_TYPE\"}]"
70-
71-
echo "matrix={\"machine\":${MACHINES}}" >> $GITHUB_OUTPUT
72-
echo "node_names=${NODE_NAMES}" >> $GITHUB_OUTPUT
73-
echo "client_name=transfer-bench-client-${RUN_ID}" >> $GITHUB_OUTPUT
60+
python >> $GITHUB_OUTPUT << "EOF"
61+
import json
62+
import os
63+
64+
NODES = int(os.environ["CLUSTER_NODES"])
65+
RUN_ID = ${{ github.run_id }}
66+
67+
machines = []
68+
node_names = []
69+
for i in range(NODES):
70+
machines.append({
71+
"name": f"node-{i}", "suffix": f"node-{i}", "type": "SERVER_TYPE"
72+
})
73+
node_names.append(f"transfer-bench-node-{i}-{RUN_ID}")
74+
machines.append({
75+
"name": "client", "suffix": "client", "type": "CLIENT_TYPE"
76+
})
77+
78+
print(f"matrix={json.dumps({'machine': machines})}")
79+
print(f'node_names={" ".join(node_names)}')
80+
print(f"client_name=transfer-bench-client-{RUN_ID}")
81+
EOF
7482
7583
setupCluster:
7684
name: Setup ${{ matrix.machine.name }}
@@ -159,41 +167,8 @@ jobs:
159167
env:
160168
HCLOUD_TOKEN: ${{ secrets.HCLOUD_TOKEN }}
161169
- name: Delete machines
162-
run: |
163-
NODE_NAMES="${{ needs.generateMatrix.outputs.node_names }}"
164-
CLIENT_NAME="${{ needs.generateMatrix.outputs.client_name }}"
165-
166-
ALL_MACHINES=()
167-
IFS=',' read -ra NODES <<< "$NODE_NAMES"
168-
for NODE in "${NODES[@]}"; do
169-
[ -n "$NODE" ] && ALL_MACHINES+=("$NODE")
170-
done
171-
[ -n "$CLIENT_NAME" ] && ALL_MACHINES+=("$CLIENT_NAME")
172-
173-
echo "Deleting ${#ALL_MACHINES[@]} machines in parallel..."
174-
175-
PIDS=()
176-
for MACHINE in "${ALL_MACHINES[@]}"; do
177-
(
178-
echo "Deleting $MACHINE..."
179-
bash tools/hetzner/remove_server.sh "$MACHINE"
180-
) &
181-
PIDS+=($!)
182-
done
183-
184-
FAILED=0
185-
for i in "${!PIDS[@]}"; do
186-
if wait ${PIDS[$i]}; then
187-
echo "Deleted ${ALL_MACHINES[$i]}"
188-
else
189-
echo "Failed to delete ${ALL_MACHINES[$i]}"
190-
FAILED=$((FAILED + 1))
191-
fi
192-
done
193-
194-
if [ $FAILED -gt 0 ]; then
195-
echo "Warning: $FAILED deletion(s) failed"
196-
exit 1
197-
fi
198170
env:
199-
HCLOUD_TOKEN: ${{ secrets.HCLOUD_TOKEN }}
171+
NODE_NAMES: ${{ needs.generateMatrix.outputs.node_names }}
172+
CLIENT_NAME: ${{ needs.generateMatrix.outputs.client_name }}
173+
HCLOUD_TOKEN: ${{ secrets.HCLOUD_TOKEN }}
174+
run: hcloud server delete $NODE_NAMES $CLIENT_NAME

ansible/README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
Add inventory.ini in [ansible/playbooks/](playbooks) with the following content:
99
```ini
1010
[remote_machines]
11-
;note that machine's name should be benchmark-machine
12-
node-0 ansible_host=${YOUR_SERVER_IP} ansible_user=${YOUR_USER}
11+
;note that the machine's name should match node-*
12+
node-0 ansible_host=${YOUR_SERVER_IP} private_ip=${YOUR_SERVER_PRIVATE_IP} ansible_user=${YOUR_USER}
13+
;for multi-node benchmarks (playbook-transfer-speed.yml), add more machines here, e.g.:
14+
;node-1 ansible_host=x.x.x.x private_ip=x.x.x.x ansible_user=root
1315

1416
;optional, only required for some benchmarks, e.g. for playbook-transfer-speed.yml
1517
[client_machines]

ansible/playbooks/roles/run-transfer-speed/files/shard_transfer.py

Lines changed: 29 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
1+
"""Benchmark shard transfer speed between Qdrant cluster nodes.
2+
3+
Uploads a dataset to a Qdrant cluster, then repeatedly replicates a shard
4+
between two peers to measure transfer throughput (points/s and MB/s).
5+
"""
6+
17
import json
28
import os
39
import statistics
410
import sys
511
import time
12+
from collections import Counter
613
from pathlib import Path
714

815
import numpy as np
@@ -28,12 +35,14 @@ def __init__(self, uris: list[str]):
2835
}
2936
self.primary = self.clients[uris[0]]
3037

31-
def cluster_info(self, client=None):
32-
return (
38+
def cluster_info(self, client=None) -> models.CollectionClusterInfo:
39+
result = (
3340
(client or self.primary)
3441
.http.distributed_api.collection_cluster_info(COLLECTION)
35-
.dict()["result"]
42+
.result
3643
)
44+
assert result
45+
return result
3746

3847
def setup(self, dims: int):
3948
try:
@@ -81,46 +90,25 @@ def wait_green(self, timeout=1800):
8190
return
8291
raise TimeoutError(f"Collection not green after {timeout}s")
8392

84-
def storage_types(self, uri: str) -> dict:
93+
def storage_types(self, uri: str) -> Counter:
8594
try:
86-
r = requests.get(f"{uri}/telemetry?details_level=6", timeout=10)
87-
if not r.ok:
88-
print(f" Telemetry request failed: {r.status_code}")
89-
return {}
90-
data = r.json()
91-
collections = (
92-
data.get("result", {}).get("collections", {}).get("collections", [])
93-
)
94-
if not collections:
95-
print(
96-
f" No collections in telemetry, keys: {list(data.get('result', {}).keys())}"
97-
)
98-
return {}
99-
for coll in collections:
100-
if coll.get("id") == COLLECTION:
101-
types = {}
102-
for shard in coll.get("shards", []):
103-
local = shard.get("local")
104-
if not local:
105-
continue
106-
for seg in local.get("segments", []):
107-
for vec in (
108-
seg.get("config", {}).get("vector_data", {}).values()
109-
):
110-
st = vec.get("storage_type", "unknown")
111-
types[st] = types.get(st, 0) + 1
112-
return types
113-
print(
114-
f" Collection '{COLLECTION}' not found, available: {[c.get('id') for c in collections]}"
95+
data = requests.get(f"{uri}/telemetry?details_level=6", timeout=10).json()
96+
return Counter(
97+
vec["storage_type"]
98+
for coll in data["result"]["collections"]["collections"]
99+
if coll.get("id") == COLLECTION
100+
for shard in coll["shards"]
101+
for seg in (shard["local"] or {}).get("segments", [])
102+
for vec in seg["config"]["vector_data"].values()
115103
)
116104
except Exception as e:
117105
print(f" Telemetry error: {e}")
118-
return {}
106+
return Counter()
119107

120108
def wait_mmap(self, uri: str, timeout=180):
121109
print("Waiting for Mmap segments...")
122110
start = time.time()
123-
types = {}
111+
types = Counter()
124112
while time.time() - start < timeout:
125113
types = self.storage_types(uri)
126114
mmap = types.get("Mmap", 0)
@@ -134,7 +122,7 @@ def wait_mmap(self, uri: str, timeout=180):
134122
def wait_transfer(self, timeout=600):
135123
start = time.time()
136124
while time.time() - start < timeout:
137-
if not self.cluster_info().get("shard_transfers"):
125+
if not self.cluster_info().shard_transfers:
138126
return
139127
time.sleep(0.5)
140128
raise TimeoutError("Transfer timeout")
@@ -165,15 +153,15 @@ def run(self, vectors: np.ndarray, runs: int) -> dict:
165153
self.wait_mmap(list(self.clients.keys())[0])
166154

167155
info = self.cluster_info()
168-
from_peer = info["peer_id"]
169-
shard_id = info["local_shards"][0]["shard_id"]
156+
from_peer = info.peer_id
157+
shard_id = info.local_shards[0].shard_id
170158

171159
to_peer = None
172160
for client in self.clients.values():
173161
node = self.cluster_info(client)
174-
if node["peer_id"] != from_peer:
175-
to_peer = node["peer_id"]
176-
if shard_id in {s["shard_id"] for s in node.get("local_shards", [])}:
162+
if node.peer_id != from_peer:
163+
to_peer = node.peer_id
164+
if shard_id in {s.shard_id for s in node.local_shards}:
177165
self.drop_replica(shard_id, to_peer)
178166
time.sleep(2)
179167
break

0 commit comments

Comments
 (0)