55from typing import Any , Final
66
77from opensearchpy import OpenSearch
8+ from packaging .version import Version
9+ from packaging .version import parse as parse_version
810
911from vectordb_bench .backend .filter import Filter , FilterOp
1012
1719WAITING_FOR_FORCE_MERGE_SEC : Final [int ] = 30
1820SECONDS_WAITING_FOR_REPLICAS_TO_BE_ENABLED_SEC : Final [int ] = 30
1921
22+ # Central registry for version-dependent OpenSearch index settings.
23+ # Add new rules here to automatically support future versions.
24+ VERSION_SPECIFIC_SETTING_RULES = [
25+ {
26+ "name" : "knn.advanced.approximate_threshold" ,
27+ "applies" : lambda version , _ : version >= Version ("3.0" ),
28+ "value" : lambda _ : "-1" ,
29+ },
30+ ]
31+
2032
2133class OpenSearchError (Exception ):
2234 """Custom exception for OpenSearch operations."""
@@ -216,10 +228,39 @@ def need_normalize_cosine(self) -> bool:
216228 """Whether this database needs to normalize dataset to support COSINE metric."""
217229 return True
218230
231+ def _get_cluster_version (self , client : OpenSearch ) -> Version :
232+ """
233+ Return the OpenSearch cluster version as a comparable Version object.
234+ Raises an exception if the version cannot be determined.
235+ """
236+ try :
237+ info = client .info ()
238+ raw_version_str = info .get ("version" , {}).get ("number" , "" )
239+ if not raw_version_str :
240+ raise ValueError ("Received empty version string from OpenSearch" ) # noqa: TRY301
241+ cluster_version = parse_version (raw_version_str )
242+ log .debug (f"Detected OpenSearch version: { cluster_version } " )
243+ return cluster_version # noqa: TRY300
244+ except Exception :
245+ log .exception ("Failed to determine OpenSearch version" )
246+ raise
247+
219248 def _get_settings_manager (self , client : OpenSearch ) -> OpenSearchSettingsManager :
220249 """Get settings manager for the given client."""
221250 return OpenSearchSettingsManager (client , self .index_name )
222251
252+ def _get_version_specific_settings (self , cluster_version : Version ) -> dict :
253+ """
254+ Builds and returns a dictionary of applicable version-specific settings.
255+ """
256+ version_specific_settings = {}
257+ for setting in VERSION_SPECIFIC_SETTING_RULES :
258+ if setting ["applies" ](cluster_version , self .case_config ):
259+ name = setting ["name" ]
260+ value = setting ["value" ](self .case_config )
261+ version_specific_settings [name ] = value
262+ return version_specific_settings
263+
223264 def _get_bulk_manager (self , client : OpenSearch ) -> BulkInsertManager :
224265 """Get bulk insert manager for the given client."""
225266 return BulkInsertManager (client , self .index_name , self .case_config )
@@ -241,18 +282,24 @@ def _create_index(self, client: OpenSearch) -> None:
241282 settings_manager .apply_cluster_settings (
242283 cluster_settings , "Successfully updated cluster settings for index creation"
243284 )
285+ # Base settings that are safe for all versions
244286 settings = {
245287 "index" : {
246288 "knn" : True ,
247289 "number_of_shards" : self .case_config .number_of_shards ,
248290 "number_of_replicas" : self .case_config .number_of_replicas ,
249291 "translog.flush_threshold_size" : self .case_config .flush_threshold_size ,
250- "knn.advanced.approximate_threshold" : "-1" ,
251292 "replication.type" : self .case_config .replication_type ,
252293 },
253294 "refresh_interval" : self .case_config .refresh_interval ,
254295 }
255296 settings ["index" ]["knn.algo_param.ef_search" ] = ef_search_value
297+
298+ version_specific_settings = self ._get_version_specific_settings (self ._get_cluster_version (client ))
299+ if version_specific_settings :
300+ log .info (f"Applying version-dependent settings: { version_specific_settings } " )
301+ settings ["index" ].update (version_specific_settings )
302+
256303 # Build properties mapping, excluding _id which is automatically handled by OpenSearch
257304 properties = {}
258305
0 commit comments