Skip to content

Commit 5c7fc13

Browse files
committed
[Feature] Add Alibaba Cloud RDS MySQL client
* Alibaba Cloud RDS MySQL introduced vector support in version 20251031
1 parent 23dd9cf commit 5c7fc13

8 files changed

Lines changed: 484 additions & 0 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ All the database client supported
6060
| oceanbase | `pip install vectordb-bench[oceanbase]` |
6161
| hologres | `pip install vectordb-bench[hologres]` |
6262
| tencent_es | `pip install vectordb-bench[tencent_es]` |
63+
| alisql | `pip install 'vectordb-bench[alisql]'` |
6364

6465
### Run
6566

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ clickhouse = [ "clickhouse-connect" ]
9999
vespa = [ "pyvespa" ]
100100
lancedb = [ "lancedb" ]
101101
oceanbase = [ "mysql-connector-python" ]
102+
alisql = [ "mysql-connector-python" ]
102103

103104
[project.urls]
104105
"repository" = "https://github.com/zilliztech/VectorDBBench"

vectordb_bench/backend/clients/__init__.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class DB(Enum):
5252
S3Vectors = "S3Vectors"
5353
Hologres = "Alibaba Cloud Hologres"
5454
TencentElasticsearch = "TencentElasticsearch"
55+
AliSQL = "AlibabaCloudRDSMySQL"
5556

5657
@property
5758
def init_cls(self) -> type[VectorDB]: # noqa: PLR0911, PLR0912, C901, PLR0915
@@ -206,6 +207,11 @@ def init_cls(self) -> type[VectorDB]: # noqa: PLR0911, PLR0912, C901, PLR0915
206207

207208
return TencentElasticsearch
208209

210+
if self == DB.AliSQL:
211+
from .alisql.alisql import AliSQL
212+
213+
return AliSQL
214+
209215
msg = f"Unknown DB: {self.name}"
210216
raise ValueError(msg)
211217

@@ -362,6 +368,11 @@ def config_cls(self) -> type[DBConfig]: # noqa: PLR0911, PLR0912, C901, PLR0915
362368

363369
return TencentElasticsearchConfig
364370

371+
if self == DB.AliSQL:
372+
from .alisql.config import AliSQLConfig
373+
374+
return AliSQLConfig
375+
365376
msg = f"Unknown DB: {self.name}"
366377
raise ValueError(msg)
367378

@@ -493,6 +504,11 @@ def case_config_cls( # noqa: C901, PLR0911, PLR0912
493504

494505
return TencentElasticsearchIndexConfig
495506

507+
if self == DB.AliSQL:
508+
from .alisql.alisql import AliSQLIndexConfig
509+
510+
return AliSQLIndexConfig
511+
496512
# DB.Pinecone, DB.Chroma, DB.Redis
497513
return EmptyDBCaseConfig
498514

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
import logging
2+
from contextlib import contextmanager
3+
4+
import mysql.connector as mysql
5+
import numpy as np
6+
7+
from ..api import VectorDB
8+
from .config import AliSQLConfigDict, AliSQLIndexConfig
9+
10+
log = logging.getLogger(__name__)
11+
12+
13+
class AliSQL(VectorDB):
14+
def __init__(
15+
self,
16+
dim: int,
17+
db_config: AliSQLConfigDict,
18+
db_case_config: AliSQLIndexConfig,
19+
collection_name: str = "vec_collection",
20+
drop_old: bool = False,
21+
**kwargs,
22+
):
23+
self.name = "AliSQL"
24+
self.db_config = db_config
25+
self.case_config = db_case_config
26+
self.db_name = "vectordbbench"
27+
self.table_name = collection_name
28+
self.dim = dim
29+
30+
# construct basic units
31+
self.conn, self.cursor = self._create_connection()
32+
33+
if drop_old:
34+
self._drop_db()
35+
self._create_db_table(dim)
36+
37+
self.cursor.close()
38+
self.conn.close()
39+
self.cursor = None
40+
self.conn = None
41+
42+
def _create_connection(self):
43+
conn = mysql.connect(
44+
host=self.db_config["host"],
45+
user=self.db_config["user"],
46+
port=self.db_config["port"],
47+
password=self.db_config["password"],
48+
buffered=True,
49+
)
50+
cursor = conn.cursor()
51+
52+
assert conn is not None, "Connection is not initialized"
53+
assert cursor is not None, "Cursor is not initialized"
54+
55+
return conn, cursor
56+
57+
def _drop_db(self):
58+
assert self.conn is not None, "Connection is not initialized"
59+
assert self.cursor is not None, "Cursor is not initialized"
60+
log.info(f"{self.name} client drop db : {self.db_name}")
61+
62+
# flush tables before dropping database to avoid some locking issue
63+
self.cursor.execute("FLUSH TABLES")
64+
self.cursor.execute(f"DROP DATABASE IF EXISTS {self.db_name}")
65+
self.cursor.execute("COMMIT")
66+
self.cursor.execute("FLUSH TABLES")
67+
68+
def _create_db_table(self, dim: int):
69+
assert self.conn is not None, "Connection is not initialized"
70+
assert self.cursor is not None, "Cursor is not initialized"
71+
72+
index_param = self.case_config.index_param()
73+
74+
try:
75+
log.info(f"{self.name} client create database : {self.db_name}")
76+
self.cursor.execute(f"CREATE DATABASE {self.db_name}")
77+
78+
log.info(f"{self.name} client create table : {self.table_name}")
79+
self.cursor.execute(f"USE {self.db_name}")
80+
81+
self.cursor.execute(
82+
f"""
83+
CREATE TABLE {self.table_name} (
84+
id INT PRIMARY KEY,
85+
v VECTOR({self.dim}) NOT NULL
86+
)
87+
"""
88+
)
89+
self.cursor.execute("COMMIT")
90+
91+
except Exception as e:
92+
log.warning(f"Failed to create table: {self.table_name} error: {e}")
93+
raise e from None
94+
95+
@contextmanager
96+
def init(self):
97+
"""create and destory connections to database.
98+
99+
Examples:
100+
>>> with self.init():
101+
>>> self.insert_embeddings()
102+
"""
103+
self.conn, self.cursor = self._create_connection()
104+
105+
index_param = self.case_config.index_param()
106+
search_param = self.case_config.search_param()
107+
108+
# maximize allowed package size
109+
self.cursor.execute("SET GLOBAL max_allowed_packet = 1073741824")
110+
111+
if index_param["index_type"] == "HNSW":
112+
if index_param["cache_size"] is not None:
113+
self.cursor.execute(f"SET GLOBAL vidx_hnsw_cache_size = {index_param['cache_size']}")
114+
if search_param["ef_search"] is not None:
115+
self.cursor.execute(f"SET GLOBAL vidx_hnsw_ef_search = {search_param['ef_search']}")
116+
self.cursor.execute("COMMIT")
117+
118+
self.insert_sql = f"INSERT INTO {self.db_name}.{self.table_name} (id, v) VALUES (%s, %s)" # noqa: S608
119+
self.select_sql = (
120+
f"SELECT id FROM {self.db_name}.{self.table_name} " # noqa: S608
121+
f"ORDER by vec_distance_{search_param['metric_type']}(v, %s) LIMIT %s"
122+
)
123+
self.select_sql_with_filter = (
124+
f"SELECT id FROM {self.db_name}.{self.table_name} WHERE id >= %s " # noqa: S608
125+
f"ORDER by vec_distance_{search_param['metric_type']}(v, %s) LIMIT %s"
126+
)
127+
128+
try:
129+
yield
130+
finally:
131+
self.cursor.close()
132+
self.conn.close()
133+
self.cursor = None
134+
self.conn = None
135+
136+
def ready_to_load(self) -> bool:
137+
pass
138+
139+
def optimize(self, data_size: int) -> None:
140+
assert self.conn is not None, "Connection is not initialized"
141+
assert self.cursor is not None, "Cursor is not initialized"
142+
143+
index_param = self.case_config.index_param()
144+
145+
try:
146+
index_options = f"DISTANCE={index_param['metric_type']}"
147+
if index_param["index_type"] == "HNSW" and index_param["M"] is not None:
148+
index_options += f" M={index_param['M']}"
149+
150+
self.cursor.execute(
151+
f"""
152+
ALTER TABLE {self.db_name}.{self.table_name}
153+
ADD VECTOR KEY v(v) {index_options}
154+
"""
155+
)
156+
self.cursor.execute("COMMIT")
157+
158+
except Exception as e:
159+
log.warning(f"Failed to create index: {self.table_name} error: {e}")
160+
raise e from None
161+
162+
@staticmethod
163+
def vector_to_hex(v): # noqa: ANN001
164+
return np.array(v, "float32").tobytes()
165+
166+
def insert_embeddings(
167+
self,
168+
embeddings: list[list[float]],
169+
metadata: list[int],
170+
**kwargs,
171+
) -> tuple[int, Exception]:
172+
"""Insert embeddings into the database.
173+
Should call self.init() first.
174+
"""
175+
assert self.conn is not None, "Connection is not initialized"
176+
assert self.cursor is not None, "Cursor is not initialized"
177+
178+
try:
179+
metadata_arr = np.array(metadata)
180+
embeddings_arr = np.array(embeddings)
181+
182+
batch_data = []
183+
for i, row in enumerate(metadata_arr):
184+
batch_data.append((int(row), self.vector_to_hex(embeddings_arr[i])))
185+
186+
self.cursor.executemany(self.insert_sql, batch_data)
187+
self.cursor.execute("COMMIT")
188+
self.cursor.execute("FLUSH TABLES")
189+
190+
return len(metadata), None
191+
except Exception as e:
192+
log.warning(f"Failed to insert data into Vector table ({self.table_name}), error: {e}")
193+
return 0, e
194+
195+
def search_embedding(
196+
self,
197+
query: list[float],
198+
k: int = 100,
199+
filters: dict | None = None,
200+
timeout: int | None = None,
201+
**kwargs,
202+
) -> list[int]:
203+
assert self.conn is not None, "Connection is not initialized"
204+
assert self.cursor is not None, "Cursor is not initialized"
205+
206+
search_param = self.case_config.search_param() # noqa: F841
207+
208+
try:
209+
if filters:
210+
self.cursor.execute(self.select_sql_with_filter, (filters.get("id"), self.vector_to_hex(query), k))
211+
else:
212+
self.cursor.execute(self.select_sql, (self.vector_to_hex(query), k))
213+
return [row[0] for row in self.cursor.fetchall()]
214+
215+
except mysql.Error:
216+
log.exception("Failed to execute search query")
217+
raise
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
from typing import Annotated, Unpack
2+
3+
import click
4+
from pydantic import SecretStr
5+
6+
from vectordb_bench.backend.clients import DB
7+
8+
from ....cli.cli import (
9+
CommonTypedDict,
10+
cli,
11+
click_parameter_decorators_from_typed_dict,
12+
run,
13+
)
14+
15+
16+
class AliSQLTypedDict(CommonTypedDict):
17+
user_name: Annotated[
18+
str,
19+
click.option(
20+
"--username",
21+
type=str,
22+
help="Username",
23+
required=True,
24+
),
25+
]
26+
password: Annotated[
27+
str,
28+
click.option(
29+
"--password",
30+
type=str,
31+
help="Password",
32+
required=True,
33+
),
34+
]
35+
36+
host: Annotated[
37+
str,
38+
click.option(
39+
"--host",
40+
type=str,
41+
help="Db host",
42+
default="127.0.0.1",
43+
),
44+
]
45+
46+
port: Annotated[
47+
int,
48+
click.option(
49+
"--port",
50+
type=int,
51+
default=3306,
52+
help="DB Port",
53+
),
54+
]
55+
56+
57+
class AliSQLHNSWTypedDict(AliSQLTypedDict):
58+
m: Annotated[
59+
int | None,
60+
click.option(
61+
"--m",
62+
type=int,
63+
help="M parameter in HNSW vector indexing",
64+
required=False,
65+
),
66+
]
67+
68+
ef_search: Annotated[
69+
int | None,
70+
click.option(
71+
"--ef-search",
72+
type=int,
73+
help="AliSQL system variable vidx_hnsw_ef_search",
74+
required=False,
75+
),
76+
]
77+
78+
cache_size: Annotated[
79+
int | None,
80+
click.option(
81+
"--cache-size",
82+
type=int,
83+
help="AliSQL system variable vidx_hnsw_cache_size",
84+
required=False,
85+
),
86+
]
87+
88+
89+
@cli.command()
90+
@click_parameter_decorators_from_typed_dict(AliSQLHNSWTypedDict)
91+
def AliSQLHNSW(
92+
**parameters: Unpack[AliSQLHNSWTypedDict],
93+
):
94+
from .config import AliSQLConfig, AliSQLHNSWConfig
95+
96+
run(
97+
db=DB.AliSQL,
98+
db_config=AliSQLConfig(
99+
db_label=parameters["db_label"],
100+
user_name=parameters["username"],
101+
password=SecretStr(parameters["password"]),
102+
host=parameters["host"],
103+
port=parameters["port"],
104+
),
105+
db_case_config=AliSQLHNSWConfig(
106+
M=parameters["m"],
107+
ef_search=parameters["ef_search"],
108+
cache_size=parameters["cache_size"],
109+
),
110+
**parameters,
111+
)

0 commit comments

Comments
 (0)