11from opensearchpy import NotFoundError , OpenSearch
22
33from benchmark .dataset import Dataset
4- from engine .base_client import IncompatibilityError
54from engine .base_client .configure import BaseConfigurator
65from engine .base_client .distances import Distance
76from engine .clients .opensearch .config import (
109 OPENSEARCH_PORT ,
1110 OPENSEARCH_USER ,
1211)
12+ from engine .clients .opensearch .utils import get_index_thread_qty
1313
1414
1515class OpenSearchConfigurator (BaseConfigurator ):
@@ -40,28 +40,37 @@ def __init__(self, host, collection_params: dict, connection_params: dict):
4040 )
4141
def clean(self):
    """Delete the benchmark index if it is already present.

    The index named by ``OPENSEARCH_INDEX`` is looked up first and the
    delete request is only issued when the lookup reports it as existing,
    so a missing index is simply a no-op.
    """
    index_exists = self.client.indices.exists(
        index=OPENSEARCH_INDEX,
        params={
            "timeout": 300,
        },
    )
    if not index_exists:
        return

    print(f"Deleting index: {OPENSEARCH_INDEX}, as it is already present")
    self.client.indices.delete(
        index=OPENSEARCH_INDEX,
        params={
            "timeout": 300,
        },
    )
55+
5256
5357 def recreate (self , dataset : Dataset , collection_params ):
54- if dataset .config .distance == Distance .DOT :
55- raise IncompatibilityError
56- if dataset .config .vector_size > 1024 :
57- raise IncompatibilityError
58+ self ._update_cluster_settings ()
59+ distance = self .DISTANCE_MAPPING [dataset .config .distance ]
60+ if dataset .config .distance == Distance .COSINE :
61+ distance = self .DISTANCE_MAPPING [Distance .DOT ]
62+ print (f"Using distance type: { distance } as dataset distance is : { dataset .config .distance } " )
5863
5964 self .client .indices .create (
6065 index = OPENSEARCH_INDEX ,
6166 body = {
6267 "settings" : {
6368 "index" : {
6469 "knn" : True ,
70+ "refresh_interval" : - 1 ,
71+ "number_of_replicas" : 0 ,
72+ "number_of_shards" : 1 ,
73+ "knn.advanced.approximate_threshold" : "-1"
6574 }
6675 },
6776 "mappings" : {
@@ -72,18 +81,13 @@ def recreate(self, dataset: Dataset, collection_params):
7281 "method" : {
7382 ** {
7483 "name" : "hnsw" ,
75- "engine" : "lucene" ,
76- "space_type" : self .DISTANCE_MAPPING [
77- dataset .config .distance
78- ],
79- "parameters" : {
80- "m" : 16 ,
81- "ef_construction" : 100 ,
82- },
84+ "engine" : "faiss" ,
85+ "space_type" : distance ,
86+ ** collection_params .get ("method" )
8387 },
84- ** collection_params .get ("method" ),
8588 },
8689 },
90+ # This doesn't work for nmslib; we still need to decide what to do here, maybe remove them.
8791 ** self ._prepare_fields_config (dataset ),
8892 }
8993 },
@@ -94,6 +98,16 @@ def recreate(self, dataset: Dataset, collection_params):
9498 cluster_manager_timeout = "5m" ,
9599 )
96100
def _update_cluster_settings(self):
    """Push persistent k-NN settings to the cluster.

    Sets the k-NN memory circuit-breaker limit and the k-NN index thread
    quantity via the cluster settings API. The thread quantity comes from
    ``get_index_thread_qty(self.client)``; how that helper derives the
    value is not visible here.
    """
    index_thread_qty = get_index_thread_qty(self.client)
    cluster_settings_body = {
        "persistent": {
            # Putting a higher value to ensure that even with a small
            # cluster the latencies for vector search are good.
            "knn.memory.circuit_breaker.limit": "75%",
            "knn.algo_param.index_thread_qty": index_thread_qty,
        }
    }
    self.client.cluster.put_settings(body=cluster_settings_body)
110+
97111 def _prepare_fields_config (self , dataset : Dataset ):
98112 return {
99113 field_name : {
@@ -104,3 +118,9 @@ def _prepare_fields_config(self, dataset: Dataset):
104118 }
105119 for field_name , field_type in dataset .config .schema .items ()
106120 }
121+
def execution_params(self, distance, vector_size) -> dict:
    """Return extra execution parameters for the given distance.

    Args:
        distance: Distance type of the dataset; compared against
            ``Distance.COSINE``.
        vector_size: Accepted for interface compatibility; not used here.

    Returns:
        ``{"normalize": "true"}`` when ``distance`` is cosine, so the
        vectors get normalized; otherwise an empty dict.
    """
    # Normalize the vectors if cosine similarity is requested.
    if distance == Distance.COSINE:
        return {"normalize": "true"}
    return {}
0 commit comments