Skip to content

Commit 5dbae8f

Browse files
committed
Improve Antfly VectorDBBench client
1 parent 2d5437b commit 5dbae8f

2 files changed

Lines changed: 55 additions & 17 deletions

File tree

vectordb_bench/backend/clients/antfly/antfly.py

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,19 @@ def _httpx_host(host: str) -> str:
2929
return "127.0.0.1" if host == "localhost" else host
3030

3131

32+
def _pack_dense_f32(values: list[float]) -> str:
33+
return base64.b64encode(struct.pack(f"<{len(values)}f", *values)).decode("ascii")
34+
35+
36+
def _make_client(base_url: str, timeout: float) -> httpx.Client:
37+
return httpx.Client(
38+
base_url=base_url,
39+
timeout=timeout,
40+
headers={"Connection": "close"},
41+
limits=httpx.Limits(max_keepalive_connections=0, max_connections=1),
42+
)
43+
44+
3245
class Antfly(VectorDB):
3346
def __init__(
3447
self,
@@ -44,7 +57,9 @@ def __init__(
4457
self.collection_name = collection_name
4558
self.dim = dim
4659

47-
self._metadata_base_url = f"http://{_httpx_host(db_config['host'])}:{db_config['port']}/api/v1"
60+
self._metadata_base_url = (
61+
f"http://{_httpx_host(db_config['host'])}:{db_config['port']}/api/v1"
62+
)
4863
self._store_host = _httpx_host(db_config.get("store_host") or db_config["host"])
4964
self._store_port = db_config.get("store_port")
5065
self._use_direct_store_search = bool(db_config.get("use_direct_store_search"))
@@ -53,17 +68,21 @@ def __init__(
5368
num_shards = db_config.get("num_shards", 1)
5469

5570
if self._use_direct_store_search and not self._store_port:
56-
raise ValueError("Antfly direct store search requires store_port to be configured")
71+
raise ValueError(
72+
"Antfly direct store search requires store_port to be configured"
73+
)
5774

58-
client = httpx.Client(base_url=self._metadata_base_url, timeout=60)
75+
client = _make_client(self._metadata_base_url, 60)
5976
try:
6077
if drop_old:
6178
r = client.delete(f"/tables/{self.collection_name}")
6279
log.info(f"Drop table response: {r.status_code}")
6380

6481
table = self._get_table_status_or_none(client)
6582
if table is None:
66-
r = client.post(f"/tables/{self.collection_name}", json={"num_shards": num_shards})
83+
r = client.post(
84+
f"/tables/{self.collection_name}", json={"num_shards": num_shards}
85+
)
6786
log.info(f"Create table response: {r.status_code}")
6887
r.raise_for_status()
6988
else:
@@ -111,7 +130,10 @@ def _wait_for_shard_ready(self, client: httpx.Client):
111130
try:
112131
r = client.post(
113132
f"/tables/{self.collection_name}/batch",
114-
json={"inserts": {"_healthcheck": {"_probe": True}}, "sync_level": "write"},
133+
json={
134+
"inserts": {"_healthcheck": {"_probe": True}},
135+
"sync_level": "write",
136+
},
115137
)
116138
if r.status_code < 500:
117139
client.post(
@@ -123,7 +145,9 @@ def _wait_for_shard_ready(self, client: httpx.Client):
123145
except Exception as exc:
124146
log.debug("Shard readiness probe failed", exc_info=exc)
125147
time.sleep(TABLE_READY_POLL_INTERVAL)
126-
log.warning(f"Shard readiness timeout after {TABLE_READY_TIMEOUT}s, proceeding anyway")
148+
log.warning(
149+
f"Shard readiness timeout after {TABLE_READY_TIMEOUT}s, proceeding anyway"
150+
)
127151

128152
def _get_index_status(self, client: httpx.Client) -> dict | None:
129153
r = client.get(f"/tables/{self.collection_name}/indexes/{INDEX_NAME}")
@@ -174,7 +198,9 @@ def _index_status_is_ready(
174198
return False
175199
return expected_total is None or total_indexed >= expected_total
176200

177-
def _wait_for_index_ready(self, client: httpx.Client, expected_total: int | None = None):
201+
def _wait_for_index_ready(
202+
self, client: httpx.Client, expected_total: int | None = None
203+
):
178204
deadline = time.monotonic() + INDEX_READY_TIMEOUT
179205
last_status = None
180206

@@ -199,11 +225,11 @@ def _wait_for_index_ready(self, client: httpx.Client, expected_total: int | None
199225

200226
@contextmanager
201227
def init(self):
202-
self.client = httpx.Client(base_url=self._metadata_base_url, timeout=120)
228+
self.client = _make_client(self._metadata_base_url, 120)
203229
self.store_client = None
204230
try:
205231
if self._use_direct_store_search:
206-
self.store_client = httpx.Client(base_url=self._store_base_url, timeout=120)
232+
self.store_client = _make_client(self._store_base_url, 120)
207233
yield
208234
finally:
209235
self.client.close()
@@ -215,7 +241,9 @@ def init(self):
215241
@property
216242
def _store_base_url(self) -> str:
217243
if self._store_port is None:
218-
raise ValueError("Antfly store_base_url requested without store_port configured")
244+
raise ValueError(
245+
"Antfly store_base_url requested without store_port configured"
246+
)
219247
return f"http://{self._store_host}:{self._store_port}"
220248

221249
def need_normalize_cosine(self) -> bool:
@@ -295,16 +323,20 @@ def _parse_store_hits(self, data: dict) -> list[int]:
295323
def ready_to_search(self) -> bool:
296324
if getattr(self, "client", None) is not None:
297325
payload = self._get_index_status(self.client)
298-
return self._index_status_is_ready(payload, payload.get("status") if payload else None)
299-
with httpx.Client(base_url=self._metadata_base_url, timeout=120) as client:
326+
return self._index_status_is_ready(
327+
payload, payload.get("status") if payload else None
328+
)
329+
with _make_client(self._metadata_base_url, 120) as client:
300330
payload = self._get_index_status(client)
301-
return self._index_status_is_ready(payload, payload.get("status") if payload else None)
331+
return self._index_status_is_ready(
332+
payload, payload.get("status") if payload else None
333+
)
302334

303335
def optimize(self, data_size: int | None = None):
304336
if getattr(self, "client", None) is not None:
305337
self._wait_for_index_ready(self.client, expected_total=data_size)
306338
return
307-
with httpx.Client(base_url=self._metadata_base_url, timeout=120) as client:
339+
with _make_client(self._metadata_base_url, 120) as client:
308340
self._wait_for_index_ready(client, expected_total=data_size)
309341

310342
def insert_embeddings(
@@ -332,7 +364,9 @@ def insert_embeddings(
332364
"_embeddings": {"vec": serialized_embedding},
333365
}
334366
payload = {"inserts": inserts, "sync_level": "write"}
335-
r = self.client.post(f"/tables/{self.collection_name}/batch", json=payload)
367+
r = self.client.post(
368+
f"/tables/{self.collection_name}/batch", json=payload
369+
)
336370
r.raise_for_status()
337371
except Exception as e:
338372
log.warning(f"Antfly insert error: {e}")

vectordb_bench/backend/clients/antfly/cli.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,13 @@ class AntflyTypedDict(TypedDict):
4747
),
4848
]
4949
search_effort: Annotated[
50-
float,
50+
float | None,
5151
click.option(
52-
"--search-effort", type=float, help="Search effort 0.0-1.0 (higher=better recall, slower)", default=None
52+
"--search-effort",
53+
type=float,
54+
help="Search effort 0.0-1.0 (higher=better recall, slower)",
55+
default=None,
56+
show_default=True,
5357
),
5458
]
5559

0 commit comments

Comments
 (0)