Skip to content

Commit ee96901

Browse files
authored
0.0.38 (#42)
Filter conditions for sinks. Support across Marqo, Weaviate, Pinecone, Qdrant, Supabase and SingleStore. Fixes to issues in LanceDB. Fix to Search results to make score optional.
1 parent 773822f commit ee96901

12 files changed

Lines changed: 179 additions & 46 deletions

File tree

neumai/neumai/Pipelines/Pipeline.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from neumai.SinkConnectors.filter_utils import FilterCondition
12
from .PipelineRun import PipelineRun
23
from .TriggerSchedule import TriggerSchedule
34
from neumai.SinkConnectors.SinkConnector import SinkConnector
@@ -152,9 +153,9 @@ def run(self) -> int:
152153
except Exception as e:
153154
raise e
154155

155-
def search(self, query:str, number_of_results:int, filters:List[FilterCondition]=None) -> List[NeumSearchResult]:
    """Embed ``query`` and return the closest matches from the sink.

    Args:
        query: Text that is embedded with ``self.embed.embed_query``.
        number_of_results: Maximum number of matches requested from the sink.
        filters: Filter conditions forwarded to the sink. ``None`` (the
            default) is forwarded as an empty list, i.e. no filtering.

    Returns:
        Whatever ``self.sink.search`` returns for the embedded query.
    """
    # Fresh list per call: avoids a shared mutable default, and the previous
    # ``{}`` default did not match the ``List`` annotation.
    active_filters = [] if filters is None else filters
    vector_for_query = self.embed.embed_query(query=query)
    # Passed positionally on purpose: the sink connectors do not agree on the
    # keyword name (``filter`` vs ``filters``), but they all take
    # (vector, number_of_results, <filters>) in this order.
    return self.sink.search(vector_for_query, number_of_results, active_filters)
159160

160161
# Todo standardize the model serialization as we are mixing FE and BE concepts into the SDK

neumai/neumai/Shared/NeumSearch.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,5 @@ class NeumSearchResult(BaseModel):
66

77
id:str = Field(..., description="Search result vector ID")
88
metadata:dict = Field(...,description="Search result vector metadata")
9-
score:float = Field(..., description="Search result similarity score")
9+
score: Optional[float] = Field(None, description="Search result similarity score")
1010
vector: Optional[List[float]] = Field(None, description="Search result vector")

neumai/neumai/SinkConnectors/LanceDBSink.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
)
99
from neumai.SinkConnectors.SinkConnector import SinkConnector
1010
from typing import List, Optional
11+
from neumai.SinkConnectors.filter_utils import FilterCondition
1112
from pydantic import Field
1213

1314
import lancedb
@@ -100,7 +101,7 @@ def store(self, vectors_to_store: List[NeumVector]) -> int:
100101

101102

102103
def search(self, vector: List[float],
103-
number_of_results: int, filter: dict = {}) -> List[NeumSearchResult]:
104+
number_of_results: int, filter: List[FilterCondition] = []) -> List[NeumSearchResult]:
104105

105106
db = self._get_db_connection()
106107
tbl = db.open_table(self.table_name)

neumai/neumai/SinkConnectors/MarqoSink.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ def _get_marqo_filter(self, column: str, value: Any, operator: FilterOperator) -
177177
raise Exception(f"Operator {operator} is currently not supported")
178178

179179

180-
def _get_filter_string_from_filter_condition(self, filter_conditions):
180+
def _get_filter_string_from_filter_condition(self, filter_conditions:List[FilterCondition]):
181181

182182
_filter_string = ""
183183
for condition in filter_conditions:
@@ -192,12 +192,10 @@ def _get_filter_string_from_filter_condition(self, filter_conditions):
192192
return _filter_string
193193

194194

195-
def search(self, vector: List[float], number_of_results: int, filter: List[dict] = [{}]) -> List:
195+
def search(self, vector: List[float], number_of_results: int, filters: List[FilterCondition] = []) -> List:
196196
url = self.url
197197
api_key = self.api_key
198198
index_name = self.index_name
199-
200-
filter = dict_to_filter_condition(filter)
201199
filter_string = self._get_filter_string_from_filter_condition(filter_conditions=filter)
202200

203201
try:

neumai/neumai/SinkConnectors/PineconeSink.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
PineconeIndexInfoException,
1010
PineconeQueryException,
1111
)
12+
from neumai.SinkConnectors.filter_utils import FilterCondition, FilterOperator
1213
from pydantic import Field
1314
import pinecone
1415

@@ -102,21 +103,46 @@ def store(self, vectors_to_store:List[NeumVector]) -> int:
102103
raise PineconeInsertionException(f"Failed to store in Pinecone. Exception - {e}")
103104
return int(vectors_stored)
104105

105-
def search(self, vector: List[float], number_of_results:int, filter:dict = {}) -> List[NeumSearchResult]:
106+
def translate_to_pinecone(self, filter_conditions:List[FilterCondition]) -> dict:
    """Translate Neum filter conditions into a Pinecone metadata filter.

    Args:
        filter_conditions: Conditions to combine. Each one contributes
            ``{field: {operator: value}}``.

    Returns:
        ``{}`` when there are no conditions, otherwise
        ``{"$and": [...]}`` with one entry per condition.

    Raises:
        ValueError: If a condition uses an operator that has no mapping here.
    """
    # No conditions means no filter. Returning an empty dict avoids sending an
    # empty ``$and`` clause (presumably rejected by Pinecone - confirm).
    if not filter_conditions:
        return {}

    # Built once per call instead of once per condition.
    pinecone_operators = {
        FilterOperator.EQUAL: '$eq',
        FilterOperator.NOT_EQUAL: '$ne',
        FilterOperator.GREATER_THAN: '$gt',
        FilterOperator.GREATER_THAN_OR_EQUAL: '$gte',
        FilterOperator.LESS_THAN: '$lt',
        FilterOperator.LESS_THAN_OR_EQUAL: '$lte',
        FilterOperator.IN: '$in',
        FilterOperator.NOT_IN: '$nin',
    }

    query_parts = []
    for condition in filter_conditions:
        pinecone_operator = pinecone_operators.get(condition.operator)
        if pinecone_operator is None:
            # Dropping the condition silently would return unfiltered results.
            raise ValueError(f"Operator {condition.operator} is currently not supported")
        # NOTE(review): for $in / $nin the value is forwarded as given;
        # confirm callers pass a list rather than a comma-separated string.
        query_parts.append({condition.field: {pinecone_operator: condition.value}})

    return {"$and": query_parts}  # Combine using $and, can be changed to $or if needed
127+
128+
def search(self, vector: List[float], number_of_results:int, filter:List[FilterCondition] = []) -> List[NeumSearchResult]:
106129
import pinecone
107130
api_key = self.api_key
108131
environment = self.environment
109132
index = self.index
110133
namespace = self.namespace
111134
if environment == "gcp-starter": namespace = None # short-term fix given gcp-starter limitation
135+
136+
filters = self.translate_to_pinecone(filter)
137+
112138
try:
113139
pinecone.init(
114140
api_key=api_key,
115141
environment=environment)
116142
index = pinecone.Index(index)
117143
results = index.query(
118144
vector=vector,
119-
filter=filter,
145+
filter=filters,
120146
top_k=number_of_results,
121147
namespace=namespace,
122148
include_values=False,

neumai/neumai/SinkConnectors/QdrantSink.py

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88
)
99
from neumai.SinkConnectors.SinkConnector import SinkConnector
1010
from typing import List, Optional
11+
from neumai.SinkConnectors.filter_utils import FilterCondition, FilterOperator
1112
from qdrant_client.http.models import Distance, VectorParams
1213
from qdrant_client.http.models import PointStruct
1314
from qdrant_client.http.models import UpdateStatus
1415
from qdrant_client import QdrantClient
16+
from qdrant_client.http.models import Filter
1517
from pydantic import Field
1618

1719
class QdrantSink(SinkConnector):
@@ -85,10 +87,57 @@ def store(self, vectors_to_store:List[NeumVector]) -> int:
8587
return len(points)
8688
raise QdrantInsertionException("Qdrant storing failed. Try again later.")
8789

88-
def search(self, vector: List[float], number_of_results: int, filter:dict = {}) -> List:
90+
def filter_conditions_to_qdrant_filter(self, filters: List[FilterCondition]) -> dict:
    """Build a ``path`` / ``operator`` / ``valueText`` filter from conditions.

    The structure produced here uses "And" / "operands" nesting; the
    ``search`` method of this connector uses ``translate_to_qdrant`` instead.

    Args:
        filters: Conditions to convert.

    Returns:
        ``{}`` for no conditions, a single operand dict for exactly one
        condition, and ``{"operator": "And", "operands": [...]}`` for several.
    """
    # NOTE(review): ``operator`` is forwarded unchanged and every value is
    # emitted as ``valueText`` regardless of its type - confirm both.
    operands = [
        {
            "path": [condition.field],
            "operator": condition.operator,
            "valueText": condition.value,
        }
        for condition in filters
    ]
    if not operands:
        return {}
    if len(operands) == 1:
        return operands[0]
    return {"operator": "And", "operands": operands}
111+
112+
def translate_to_qdrant(self, filter_conditions:List[FilterCondition]):
    """Translate Neum filter conditions into a Qdrant filter dictionary.

    Equality goes into ``must`` as a ``match``; inequality goes into
    ``must_not`` as a ``match``; the four comparison operators go into
    ``must`` as a ``range`` clause.

    Args:
        filter_conditions: Conditions to translate.

    Returns:
        A dict that always has a ``"must"`` list and gains a ``"must_not"``
        list only when a NOT_EQUAL condition is present.

    Raises:
        ValueError: If a condition uses an operator that is not handled here.
    """
    qdrant_filter = {"must": []}
    if not filter_conditions:
        return qdrant_filter

    # Comparison operator -> key inside Qdrant's ``range`` clause.
    range_keys = {
        FilterOperator.LESS_THAN: "lt",
        FilterOperator.LESS_THAN_OR_EQUAL: "lte",
        FilterOperator.GREATER_THAN: "gt",
        FilterOperator.GREATER_THAN_OR_EQUAL: "gte",
    }

    for condition in filter_conditions:
        operator = condition.operator
        if operator == FilterOperator.EQUAL:
            qdrant_filter["must"].append({"key": condition.field, "match": {"value": condition.value}})
        elif operator == FilterOperator.NOT_EQUAL:
            # Qdrant doesn't have a direct "not equal" filter, so it's handled with must_not
            qdrant_filter.setdefault("must_not", []).append({"key": condition.field, "match": {"value": condition.value}})
        elif operator in range_keys:
            qdrant_filter["must"].append(
                {"key": condition.field, "range": {range_keys[operator]: condition.value}}
            )
        else:
            # Ignoring the condition silently would return unfiltered results.
            raise ValueError(f"Operator {operator} is currently not supported")

    return qdrant_filter
135+
136+
def search(self, vector: List[float], number_of_results: int, filter:List[FilterCondition]=[]) -> List:
89137
url = self.url
90138
api_key = self.api_key
91139
collection_name = self.collection_name
140+
filters = self.translate_to_qdrant(filter)
92141

93142
try:
94143
qdrant_client = QdrantClient(
@@ -100,6 +149,7 @@ def search(self, vector: List[float], number_of_results: int, filter:dict = {})
100149
query_vector=vector,
101150
with_payload= True,
102151
limit=number_of_results,
152+
query_filter=Filter(**filters)
103153
)
104154
except Exception as e:
105155
raise QdrantQueryException(f"Failed to query Qdrant. Exception - {e}")

neumai/neumai/SinkConnectors/SingleStoreSink.py

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
)
1010
from neumai.SinkConnectors.SinkConnector import SinkConnector
1111
from typing import List, Optional
12+
from neumai.SinkConnectors.filter_utils import FilterCondition, FilterOperator
1213
from pydantic import Field
1314
import singlestoredb as s2
1415

@@ -115,15 +116,39 @@ def store(self, vectors_to_store:List[NeumVector]) -> int:
115116

116117
return len(vectors_to_store), None
117118

118-
def search(self, vector: List[float], number_of_results: int, filter:dict={}) -> List[NeumSearchResult]:
119+
def translate_to_sql(self, filter_conditions:List[FilterCondition]):
    """Render filter conditions as the body of a SQL ``WHERE`` clause.

    Each condition becomes ``<field> <operator> <literal>`` and the parts are
    joined with ``AND``. Values are rendered as escaped SQL literals because
    the result is spliced into a query string by the caller.

    Args:
        filter_conditions: Conditions to render. For IN / NOT_IN the value may
            be a comma-separated string or an iterable of values.

    Returns:
        The joined condition string; ``""`` when there are no conditions.

    Raises:
        ValueError: If a field name is not a plain identifier.
    """
    import re

    def _sql_literal(value):
        # Render one Python value as a SQL literal.
        if value is None:
            return "NULL"
        if isinstance(value, bool):
            return "TRUE" if value else "FALSE"
        if isinstance(value, (int, float)):
            return str(value)
        # Assumes MySQL-style string literals (backslash is an escape
        # character and a quote is escaped by doubling) - TODO confirm.
        text = str(value).replace("\\", "\\\\").replace("'", "''")
        return f"'{text}'"

    query_parts = []
    for condition in filter_conditions:
        # The field name cannot be quoted as a value, so only plain
        # identifiers are accepted to keep it from carrying SQL.
        if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", str(condition.field)):
            raise ValueError(f"Invalid field name for SQL filter: {condition.field!r}")

        sql_operator = condition.operator.value
        # Handle special formatting for IN, NOT IN
        if condition.operator in [FilterOperator.IN, FilterOperator.NOT_IN]:
            raw_value = condition.value
            if isinstance(raw_value, str):
                items = [item.strip() for item in raw_value.split(',')]
            else:
                items = list(raw_value)
            values = '(' + ', '.join(_sql_literal(item) for item in items) + ')'
        else:
            values = _sql_literal(condition.value)

        query_parts.append(f"{condition.field} {sql_operator} {values}")

    return " AND ".join(query_parts)
133+
134+
def search(self, vector: List[float], number_of_results: int, filter:List[FilterCondition]=[]) -> List[NeumSearchResult]:
119135
url = self.url
120136
table = self.table
121137

122-
query = f"""SELECT id, text, dot_product(vector, json_array_pack('{vector}')) AS score
123-
FROM {table}
124-
ORDER BY score DESC
125-
LIMIT {number_of_results}"""
138+
if len(filter)>0:
139+
list_of_fields = ",".join([f.field for f in filter])
140+
query = f"""SELECT id, text, dot_product(vector, json_array_pack('{vector}')) AS score, {list_of_fields}
141+
FROM {table}
142+
WHERE {self.translate_to_sql(filter)}
143+
ORDER BY score DESC
144+
LIMIT {number_of_results}"""
126145

146+
else:
147+
query = f"""SELECT id, text, dot_product(vector, json_array_pack('{vector}')) AS score
148+
FROM {table}
149+
ORDER BY score DESC
150+
LIMIT {number_of_results}"""
151+
127152
try:
128153
with s2.connect(url, results_type="dict") as conn:
129154
with conn.cursor() as cur:

neumai/neumai/SinkConnectors/SinkConnector.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from abc import ABC, abstractmethod
55
from typing import List
66
from pydantic import BaseModel
7+
from neumai.SinkConnectors.filter_utils import FilterCondition
78
import json
89

910
class SinkConnector(ABC, BaseModel):
@@ -32,7 +33,7 @@ def store(self, vectors_to_store:List[NeumVector]) -> int:
3233
"""Store vectors with a given service"""
3334

3435
@abstractmethod
def search(self, vector:List[float], number_of_results:int, filters:List[FilterCondition]=None) -> List[NeumSearchResult]:
    """Search vectors for a given service.

    Args:
        vector: Query vector.
        number_of_results: Maximum number of results to return.
        filters: Filter conditions to apply. The default is ``None``
            (previously a dict, which did not match the ``List`` annotation);
            implementations should treat ``None`` or an empty list as
            "no filtering".

    Returns:
        The matching search results.
    """
3738

3839
@abstractmethod

neumai/neumai/SinkConnectors/SupabaseSink.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
SupabaseIndexInfoException,
1010
SupabaseQueryException
1111
)
12+
from neumai.SinkConnectors.filter_utils import FilterCondition, FilterOperator
1213
from pydantic import Field
1314
import vecs
1415

@@ -83,10 +84,35 @@ def store(self, vectors_to_store:List[NeumVector]) -> int:
8384
vx.disconnect()
8485
return len(vectors_to_store)
8586

86-
def search(self, vector: List[float], number_of_results:int, filter:dict={}) -> List:
87+
def translate_to_supabase(self, filter_conditions:List[FilterCondition]) -> dict:
    """Translate Neum filter conditions into a Supabase (vecs) metadata filter.

    Args:
        filter_conditions: Conditions to combine. Each one contributes
            ``{field: {operator: value}}``.

    Returns:
        ``{}`` when there are no conditions, otherwise
        ``{"$and": [...]}`` with one entry per condition.

    Raises:
        ValueError: If a condition uses an operator that has no mapping here.
    """
    # No conditions means no filter; an empty dict matches what the caller
    # passed before filter conditions were introduced.
    if not filter_conditions:
        return {}

    # Built once per call instead of once per condition.
    supabase_operators = {
        FilterOperator.EQUAL: '$eq',
        FilterOperator.NOT_EQUAL: '$ne',
        FilterOperator.GREATER_THAN: '$gt',
        FilterOperator.GREATER_THAN_OR_EQUAL: '$gte',
        FilterOperator.LESS_THAN: '$lt',
        FilterOperator.LESS_THAN_OR_EQUAL: '$lte',
        FilterOperator.IN: '$in',
    }

    query_parts = []
    for condition in filter_conditions:
        supabase_operator = supabase_operators.get(condition.operator)
        if supabase_operator is None:
            # Dropping the condition silently would return unfiltered results.
            raise ValueError(f"Operator {condition.operator} is currently not supported")
        # NOTE(review): for $in the value is forwarded as given; confirm
        # callers pass a list rather than a comma-separated string.
        query_parts.append({condition.field: {supabase_operator: condition.value}})

    return {"$and": query_parts}  # Combine using $and, can be changed to $or if needed
108+
109+
def search(self, vector: List[float], number_of_results:int, filter:List[FilterCondition]=[]) -> List:
87110
database_connection = self.database_connection
88111
vx = vecs.create_client(database_connection)
89112
collection_name = self.collection_name
113+
114+
filters = self.translate_to_supabase(filter)
115+
90116
try:
91117
db = vx.get_collection(name=collection_name)
92118
except:
@@ -99,7 +125,7 @@ def search(self, vector: List[float], number_of_results:int, filter:dict={}) ->
99125
include_metadata=True,
100126
include_value=True,
101127
limit=number_of_results,
102-
filters=filter
128+
filters=filters
103129
)
104130
except Exception as e:
105131
raise SupabaseQueryException(f"Error querying vectors from Supabase. Exception: {e}")

neumai/neumai/SinkConnectors/WeaviateSink.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
WeaviateIndexInfoException,
1010
WeaviateQueryException
1111
)
12+
from neumai.SinkConnectors.filter_utils import FilterCondition
1213
from pydantic import Field
1314
from weaviate.util import generate_uuid5, _capitalize_first_letter
1415
import weaviate
@@ -171,7 +172,29 @@ def store(self, vectors_to_store:List[NeumVector]) -> Tuple[List, dict]:
171172

172173
return len(vectors_to_store)
173174

174-
def search(self, vector: List[float], number_of_results: int, filter:dict={}) -> List[NeumSearchResult]:
175+
def filter_conditions_to_weaviate_filter(self, filters: List[FilterCondition]) -> dict:
    """Build a Weaviate ``where`` filter from Neum filter conditions.

    Args:
        filters: Conditions to convert.

    Returns:
        ``{}`` for no conditions, a single operand dict for exactly one
        condition, and ``{"operator": "And", "operands": [...]}`` for several.
    """
    # NOTE(review): ``operator`` is forwarded unchanged and every value is
    # emitted as ``valueText`` regardless of its type - confirm both against
    # what Weaviate accepts.
    operands = [
        {
            "path": [condition.field],
            "operator": condition.operator,
            "valueText": condition.value,
        }
        for condition in filters
    ]
    if not operands:
        return {}
    if len(operands) == 1:
        return operands[0]
    return {"operator": "And", "operands": operands}
196+
197+
def search(self, vector: List[float], number_of_results: int, filter:List[FilterCondition]=[]) -> List[NeumSearchResult]:
175198
api_key = self.api_key
176199
url = self.url
177200
# Weaviate requires first letter to be capitalized
@@ -200,7 +223,8 @@ def search(self, vector: List[float], number_of_results: int, filter:dict={}) ->
200223

201224
# Add .with_where(filter) only if filter is not empty
202225
if filter:
203-
client_query = client_query.with_where(filter)
226+
weaviate_filter = self.filter_conditions_to_weaviate_filter(filter)
227+
client_query = client_query.with_where(weaviate_filter)
204228

205229
# Final execution of the query
206230
search_result = client_query.do()

0 commit comments

Comments
 (0)