Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions milvus/MilvusClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,15 @@ import {
CreateCollectionReq,
ERROR_REASONS,
checkCreateCollectionCompatibility,
SearchReq,
SearchSimpleReq,
HybridSearchReq,
SearchIteratorReq,
QueryReq,
QueryIteratorReq,
GetReq,
SearchResults,
QueryResults,
DEFAULT_PRIMARY_KEY_FIELD,
DEFAULT_METRIC_TYPE,
DEFAULT_VECTOR_FIELD,
Expand Down Expand Up @@ -74,6 +83,19 @@ export class MilvusClient extends GRPCClient {
}
}

/**
* Creates a lightweight DQL session pinned to a target cluster.
* The session reuses this client's connection and injects `cluster_id` into
* search/query/get/iterator request parameters.
* @param clusterId The target cluster id.
*/
session(clusterId: string): MilvusClientSession {
if (typeof clusterId !== 'string' || clusterId.length === 0) {
throw new Error('clusterId must be a non-empty string');
}
return new MilvusClientSession(this, clusterId);
}

// High level API: align with python MilvusClient
/**
* Creates a new collection with the given parameters.
Expand Down Expand Up @@ -236,3 +258,67 @@ export class MilvusClient extends GRPCClient {
return result;
}
}

/**
* Lightweight DQL session bound to a cluster id and backed by a parent client.
* Closing the session only prevents future session calls; it does not close the
* parent client or its gRPC channel pool.
*/
export class MilvusClientSession {
private readonly parent: MilvusClient;
private readonly clusterId: string;
private closed = false;

constructor(parent: MilvusClient, clusterId: string) {
this.parent = parent;
this.clusterId = clusterId;
}

private ensureOpen() {
if (this.closed) {
throw new Error('MilvusClient session is closed');
}
}

private withClusterId<T extends { cluster_id?: string }>(params: T): T {
return { ...params, cluster_id: this.clusterId };
}

close(): void {
this.closed = true;
}

search<T extends SearchReq | SearchSimpleReq | HybridSearchReq>(
params: T
): Promise<SearchResults<T>> {
this.ensureOpen();
return this.parent.search(this.withClusterId(params));
}

hybridSearch<T extends HybridSearchReq>(
params: T
): Promise<SearchResults<T>> {
this.ensureOpen();
return this.parent.hybridSearch(this.withClusterId(params));
}

searchIterator(param: SearchIteratorReq): Promise<any> {
this.ensureOpen();
return this.parent.searchIterator(this.withClusterId(param));
}

query(data: QueryReq): Promise<QueryResults> {
this.ensureOpen();
return this.parent.query(this.withClusterId(data));
}

queryIterator(data: QueryIteratorReq): Promise<any> {
this.ensureOpen();
return this.parent.queryIterator(this.withClusterId(data));
}

get(data: GetReq): Promise<QueryResults> {
this.ensureOpen();
return this.parent.get(this.withClusterId(data));
}
}
1 change: 1 addition & 0 deletions milvus/const/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export const DEFAULT_RESOURCE_GROUP = '__default_resource_group'; // default res
export const DEFAULT_DB = 'default'; // default database name
export const DEFAULT_DYNAMIC_FIELD = '$meta'; // default dynamic field name
export const DEFAULT_COUNT_QUERY_STRING = 'count(*)'; // default count query string
export const CLUSTER_ID = 'cluster_id'; // cluster id routing parameter for DQL requests
export const DEFAULT_HTTP_TIMEOUT = 60000; // default http timeout, 60s
export const DEFAULT_HTTP_ENDPOINT_VERSION = 'v2'; // api version, default v1

Expand Down
16 changes: 15 additions & 1 deletion milvus/grpc/Data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ import {
FloatVector,
FieldPartialUpdateOpType,
FieldPartialUpdateOp,
CLUSTER_ID,
} from '../';
import { Collection } from './Collection';

Expand Down Expand Up @@ -842,11 +843,14 @@ export class Data extends Collection {
const count = await client.count({
collection_name: param.collection_name,
expr: param.expr || param.filter || '',
db_name: param.db_name,
cluster_id: param.cluster_id,
});

// get collection Info
const collectionInfo = await this.describeCollection({
collection_name: param.collection_name,
db_name: param.db_name,
});

// if limit not set, set it to count
Expand Down Expand Up @@ -874,6 +878,7 @@ export class Data extends Collection {
// search iterator special params
const params: any = {
...param.params,
...(param.cluster_id ? { [CLUSTER_ID]: param.cluster_id } : {}),
[ITERATOR_FIELD]: true,
[ITER_SEARCH_V2_KEY]: true,
[ITER_SEARCH_BATCH_SIZE_KEY]: batchSize,
Expand Down Expand Up @@ -956,6 +961,8 @@ export class Data extends Collection {
const count = await client.count({
collection_name: data.collection_name,
expr: userExpr,
db_name: data.db_name,
cluster_id: data.cluster_id,
});
// remove filter field to avoid conflict with expr in query method
const queryData = { ...data };
Expand Down Expand Up @@ -1186,6 +1193,10 @@ export class Data extends Collection {
}

// Execute the query and get the results
if (data.cluster_id) {
queryParams[CLUSTER_ID] = data.cluster_id;
}

const promise: QueryRes = await promisify(
this.channelPool,
'Query',
Expand Down Expand Up @@ -1264,13 +1275,16 @@ export class Data extends Collection {
async count(data: CountReq): Promise<CountResult> {
const req: any = {
collection_name: data.collection_name,
expr: data.expr || '',
expr: data.filter || data.expr || '',
output_fields: [DEFAULT_COUNT_QUERY_STRING],
};

if (data.db_name) {
req.db_name = data.db_name;
}
if (data.cluster_id) {
req.cluster_id = data.cluster_id;
}
const queryResult = await this.query(req);

return {
Expand Down
4 changes: 4 additions & 0 deletions milvus/proto-json/milvus.base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6513,6 +6513,10 @@ export default {
"endTime": {
"type": "int64",
"id": 8
},
"externalSpec": {
"type": "string",
"id": 9
}
}
},
Expand Down
4 changes: 4 additions & 0 deletions milvus/proto-json/milvus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6513,6 +6513,10 @@ export default {
"end_time": {
"type": "int64",
"id": 8
},
"external_spec": {
"type": "string",
"id": 9
}
}
},
Expand Down
1 change: 1 addition & 0 deletions milvus/types/Common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ export interface GrpcTimeOut {
timeout?: number;
client_request_id?: string; // optional, trace id for request tracking
'client-request-id'?: string; // optional, trace id for request tracking (alternative format)
cluster_id?: string; // optional, route DQL request to a specific cluster
}
export type PrivilegesTypes =
| CollectionPrivileges
Expand Down
3 changes: 3 additions & 0 deletions milvus/types/Data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {

export interface CountReq extends collectionNameReq {
expr?: string; // filter expression
filter?: string; // alias for expr
}

interface BaseDeleteReq extends collectionNameReq {
Expand Down Expand Up @@ -114,6 +115,8 @@ export interface GetReq extends collectionNameReq {
offset?: number; // skip how many results
limit?: number; // how many results you want
consistency_level?: ConsistencyLevelEnum; // consistency level
transformers?: OutputTransformers; // provide custom data transformer for specific data type like bf16 or f16 vectors
exprValues?: keyValueObj; // template values for filter expression, eg: {key: 'value'}
}

export interface QueryRes extends resStatusResponse {
Expand Down
16 changes: 9 additions & 7 deletions milvus/types/Search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
Int8Vector,
FieldData,
OrderByFields,
GrpcTimeOut,
} from '../';

// Highlighter types
Expand Down Expand Up @@ -132,13 +133,14 @@ export interface SearchSimpleReq extends collectionNameReq {
export type HybridSearchSingleReq = Pick<
SearchParam,
'anns_field' | 'ignore_growing' | 'group_by_field'
> & {
data: SearchData; // vector to search
expr?: string; // filter expression
exprValues?: keyValueObj; // template values for filter expression, eg: {key: 'value'}
params?: keyValueObj; // extra search parameters
transformers?: OutputTransformers; // provide custom data transformer for specific data type like bf16 or f16 vectors
};
> &
GrpcTimeOut & {
data: SearchData; // vector to search
expr?: string; // filter expression
exprValues?: keyValueObj; // template values for filter expression, eg: {key: 'value'}
params?: keyValueObj; // extra search parameters
transformers?: OutputTransformers; // provide custom data transformer for specific data type like bf16 or f16 vectors
};

export interface SearchIteratorReq extends Omit<
SearchSimpleReq,
Expand Down
12 changes: 12 additions & 0 deletions milvus/utils/Search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
keyValueObj,
FunctionObject,
FunctionScore,
CLUSTER_ID,
buildFieldDataMap,
cloneObj,
parseToKeyValue,
Expand Down Expand Up @@ -418,6 +419,13 @@ export const buildSearchRequest = (
);
}

if (!isHybridSearch && params.cluster_id) {
request.search_params = [
...(request.search_params as KeyValuePair[]),
{ key: CLUSTER_ID, value: params.cluster_id },
];
}

// if exprValues is set, add it to the request(inner)
if (userRequest.exprValues) {
request.expr_template_values = formatExprValues(userRequest.exprValues);
Expand Down Expand Up @@ -450,6 +458,9 @@ export const buildSearchRequest = (
'type' in rerank
);
const hasFunctionScore = isFunctionScore(rerank);
const clusterIdParam = params.cluster_id
? [{ key: CLUSTER_ID, value: params.cluster_id }]
: [];

// build highlighter if provided
const highlighter =
Expand Down Expand Up @@ -489,6 +500,7 @@ export const buildSearchRequest = (
key: 'offset',
value: searchSimpleReq.offset ?? 0,
},
...clusterIdParam,
],
},

Expand Down
2 changes: 1 addition & 1 deletion proto
Loading