diff --git a/milvus/grpc/Resource.ts b/milvus/grpc/Resource.ts index 3485cdc3..2e8417f3 100644 --- a/milvus/grpc/Resource.ts +++ b/milvus/grpc/Resource.ts @@ -11,10 +11,69 @@ import { UpdateRresourceGroupReq, TransferNodeReq, TransferReplicaReq, + UpdateReplicateConfigurationReq, + GetReplicateConfigurationReq, + GetReplicateConfigurationResponse, promisify, } from '../'; export class Resource extends Partition { + /** + * Update cross-cluster replication configuration. + * + * @param {UpdateReplicateConfigurationReq} data - The request parameters. + * @param {ReplicateCluster[]} data.clusters - Cluster configurations. + * @param {CrossClusterTopology[]} [data.cross_cluster_topology] - Replication topology between clusters. + * @param {boolean} [data.force_promote] - Whether to force promote the current cluster to primary. + * @param {number} [data.timeout] - An optional duration of time in milliseconds to allow for the RPC. + * + * @returns {Promise} The response status of the operation. + */ + async updateReplicateConfiguration( + data: UpdateReplicateConfigurationReq + ): Promise { + if (!data || !data.clusters) { + throw new Error('The `clusters` property is missing.'); + } + + const { clusters, cross_cluster_topology, force_promote, timeout } = data; + return await promisify( + this.channelPool, + 'UpdateReplicateConfiguration', + { + replicate_configuration: { + clusters, + cross_cluster_topology: cross_cluster_topology || [], + }, + force_promote: force_promote || false, + }, + timeout || this.timeout + ); + } + + /** + * Get cross-cluster replication configuration. + * + * @param {GetReplicateConfigurationReq} [data] - Optional request parameters. + * @param {number} [data.timeout] - An optional duration of time in milliseconds to allow for the RPC. + * + * @returns {Promise} The replication configuration response. + */ + async getReplicateConfiguration( + data: GetReplicateConfigurationReq = {} + ): Promise { + return await promisify( + this.channelPool, + 'GetReplicateConfiguration', + {}, + data.timeout || this.timeout + ); + } + + // aliases + update_replicate_configuration = this.updateReplicateConfiguration; + get_replicate_configuration = this.getReplicateConfiguration; + /** * Creates a resource group. * diff --git a/milvus/types/Resource.ts b/milvus/types/Resource.ts index 4425113c..13ac9e39 100644 --- a/milvus/types/Resource.ts +++ b/milvus/types/Resource.ts @@ -62,3 +62,36 @@ export interface ListResourceGroupsResponse extends resStatusResponse { export interface DescribeResourceGroupResponse extends resStatusResponse { resource_group: ResourceGroup; } + +export interface ReplicateConnectionParam { + uri: string; + token?: string; +} + +export interface ReplicateCluster { + cluster_id: string; + connection_param: ReplicateConnectionParam; + pchannels?: string[]; +} + +export interface CrossClusterTopology { + source_cluster_id: string; + target_cluster_id: string; +} + +export interface ReplicateConfiguration { + clusters: ReplicateCluster[]; + cross_cluster_topology?: CrossClusterTopology[]; +} + +export interface UpdateReplicateConfigurationReq extends GrpcTimeOut { + clusters: ReplicateCluster[]; + cross_cluster_topology?: CrossClusterTopology[]; + force_promote?: boolean; +} + +export interface GetReplicateConfigurationReq extends GrpcTimeOut {} + +export interface GetReplicateConfigurationResponse extends resStatusResponse { + configuration: ReplicateConfiguration; +} diff --git a/test/utils/ReplicateConfiguration.spec.ts b/test/utils/ReplicateConfiguration.spec.ts new file mode 100644 index 00000000..09636386 --- /dev/null +++ b/test/utils/ReplicateConfiguration.spec.ts @@ -0,0 +1,174 @@ +import { ErrorCode, MilvusClient } from '../../milvus'; + +describe('Replicate configuration API', () => { + const createClient = () => { + const client = new MilvusClient({ + address: 'localhost:19530', + __SKIP_CONNECT__: true, + }); + const calls: { method: string; params: any; options: any }[] = []; + (client as any).channelPool = { + acquire: jest.fn().mockResolvedValue({ + UpdateReplicateConfiguration: (params: any, options: any, cb: any) => { + calls.push({ + method: 'UpdateReplicateConfiguration', + params, + options, + }); + cb(null, { error_code: ErrorCode.SUCCESS, reason: '' }); + }, + GetReplicateConfiguration: (params: any, options: any, cb: any) => { + calls.push({ method: 'GetReplicateConfiguration', params, options }); + cb(null, { + status: { error_code: ErrorCode.SUCCESS, reason: '' }, + configuration: { + clusters: [ + { + cluster_id: 'source-cluster', + connection_param: { + uri: 'http://source:19530', + token: 'source-token', + }, + pchannels: ['source-pchannel'], + }, + ], + cross_cluster_topology: [ + { + source_cluster_id: 'source-cluster', + target_cluster_id: 'target-cluster', + }, + ], + }, + }); + }, + }), + release: jest.fn(), + }; + + return { client, calls }; + }; + + it('should update replicate configuration', async () => { + const { client, calls } = createClient(); + + const res = await client.updateReplicateConfiguration({ + clusters: [ + { + cluster_id: 'source-cluster', + connection_param: { + uri: 'http://source:19530', + token: 'source-token', + }, + pchannels: ['source-pchannel'], + }, + { + cluster_id: 'target-cluster', + connection_param: { + uri: 'http://target:19530', + token: 'target-token', + }, + }, + ], + cross_cluster_topology: [ + { + source_cluster_id: 'source-cluster', + target_cluster_id: 'target-cluster', + }, + ], + force_promote: true, + timeout: 1000, + }); + + expect(res.error_code).toEqual(ErrorCode.SUCCESS); + expect(calls[0]).toEqual( + expect.objectContaining({ + method: 'UpdateReplicateConfiguration', + params: { + replicate_configuration: { + clusters: [ + { + cluster_id: 'source-cluster', + connection_param: { + uri: 'http://source:19530', + token: 'source-token', + }, + pchannels: ['source-pchannel'], + }, + { + cluster_id: 'target-cluster', + connection_param: { + uri: 'http://target:19530', + token: 'target-token', + }, + }, + ], + cross_cluster_topology: [ + { + source_cluster_id: 'source-cluster', + target_cluster_id: 'target-cluster', + }, + ], + }, + force_promote: true, + }, + }) + ); + expect(calls[0].options.deadline).toBeInstanceOf(Date); + }); + + it('should default optional update fields', async () => { + const { client, calls } = createClient(); + + await client.update_replicate_configuration({ + clusters: [ + { + cluster_id: 'source-cluster', + connection_param: { uri: 'http://source:19530' }, + }, + ], + }); + + expect(calls[0].params).toEqual({ + replicate_configuration: { + clusters: [ + { + cluster_id: 'source-cluster', + connection_param: { uri: 'http://source:19530' }, + }, + ], + cross_cluster_topology: [], + }, + force_promote: false, + }); + }); + + it('should reject update without clusters', async () => { + const { client } = createClient(); + + await expect( + client.updateReplicateConfiguration({} as any) + ).rejects.toThrow('The `clusters` property is missing.'); + }); + + it('should get replicate configuration', async () => { + const { client, calls } = createClient(); + + const res = await client.getReplicateConfiguration({ timeout: 1000 }); + + expect(res.status.error_code).toEqual(ErrorCode.SUCCESS); + expect(res.configuration.clusters[0]).toEqual({ + cluster_id: 'source-cluster', + connection_param: { + uri: 'http://source:19530', + token: 'source-token', + }, + pchannels: ['source-pchannel'], + }); + expect(calls[0]).toEqual( + expect.objectContaining({ + method: 'GetReplicateConfiguration', + params: {}, + }) + ); + }); +});