Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions milvus/grpc/Resource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,69 @@ import {
UpdateRresourceGroupReq,
TransferNodeReq,
TransferReplicaReq,
UpdateReplicateConfigurationReq,
GetReplicateConfigurationReq,
GetReplicateConfigurationResponse,
promisify,
} from '../';

export class Resource extends Partition {
/**
* Update cross-cluster replication configuration.
*
* @param {UpdateReplicateConfigurationReq} data - The request parameters.
* @param {ReplicateCluster[]} data.clusters - Cluster configurations.
* @param {CrossClusterTopology[]} [data.cross_cluster_topology] - Replication topology between clusters.
* @param {boolean} [data.force_promote] - Whether to force promote the current cluster to primary.
* @param {number} [data.timeout] - An optional duration of time in milliseconds to allow for the RPC.
*
* @returns {Promise<ResStatus>} The response status of the operation.
*/
async updateReplicateConfiguration(
data: UpdateReplicateConfigurationReq
): Promise<ResStatus> {
if (!data || !data.clusters) {
throw new Error('The `clusters` property is missing.');
}

const { clusters, cross_cluster_topology, force_promote, timeout } = data;
return await promisify(
this.channelPool,
'UpdateReplicateConfiguration',
{
replicate_configuration: {
clusters,
cross_cluster_topology: cross_cluster_topology || [],
},
force_promote: force_promote || false,
},
timeout || this.timeout
);
}

/**
* Get cross-cluster replication configuration.
*
* @param {GetReplicateConfigurationReq} [data] - Optional request parameters.
* @param {number} [data.timeout] - An optional duration of time in milliseconds to allow for the RPC.
*
* @returns {Promise<GetReplicateConfigurationResponse>} The replication configuration response.
*/
async getReplicateConfiguration(
data: GetReplicateConfigurationReq = {}
): Promise<GetReplicateConfigurationResponse> {
return await promisify(
this.channelPool,
'GetReplicateConfiguration',
{},
data.timeout || this.timeout
);
}

// aliases
update_replicate_configuration = this.updateReplicateConfiguration;
get_replicate_configuration = this.getReplicateConfiguration;

/**
* Creates a resource group.
*
Expand Down
33 changes: 33 additions & 0 deletions milvus/types/Resource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,36 @@ export interface ListResourceGroupsResponse extends resStatusResponse {
export interface DescribeResourceGroupResponse extends resStatusResponse {
resource_group: ResourceGroup;
}

export interface ReplicateConnectionParam {
uri: string;
token?: string;
}

export interface ReplicateCluster {
cluster_id: string;
connection_param: ReplicateConnectionParam;
pchannels?: string[];
}

export interface CrossClusterTopology {
source_cluster_id: string;
target_cluster_id: string;
}

export interface ReplicateConfiguration {
clusters: ReplicateCluster[];
cross_cluster_topology?: CrossClusterTopology[];
}

export interface UpdateReplicateConfigurationReq extends GrpcTimeOut {
clusters: ReplicateCluster[];
cross_cluster_topology?: CrossClusterTopology[];
force_promote?: boolean;
}

export interface GetReplicateConfigurationReq extends GrpcTimeOut {}

export interface GetReplicateConfigurationResponse extends resStatusResponse {
configuration: ReplicateConfiguration;
}
174 changes: 174 additions & 0 deletions test/utils/ReplicateConfiguration.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import { ErrorCode, MilvusClient } from '../../milvus';

describe('Replicate configuration API', () => {
const createClient = () => {
const client = new MilvusClient({
address: 'localhost:19530',
__SKIP_CONNECT__: true,
});
const calls: { method: string; params: any; options: any }[] = [];
(client as any).channelPool = {
acquire: jest.fn().mockResolvedValue({
UpdateReplicateConfiguration: (params: any, options: any, cb: any) => {
calls.push({
method: 'UpdateReplicateConfiguration',
params,
options,
});
cb(null, { error_code: ErrorCode.SUCCESS, reason: '' });
},
GetReplicateConfiguration: (params: any, options: any, cb: any) => {
calls.push({ method: 'GetReplicateConfiguration', params, options });
cb(null, {
status: { error_code: ErrorCode.SUCCESS, reason: '' },
configuration: {
clusters: [
{
cluster_id: 'source-cluster',
connection_param: {
uri: 'http://source:19530',
token: 'source-token',
},
pchannels: ['source-pchannel'],
},
],
cross_cluster_topology: [
{
source_cluster_id: 'source-cluster',
target_cluster_id: 'target-cluster',
},
],
},
});
},
}),
release: jest.fn(),
};

return { client, calls };
};

it('should update replicate configuration', async () => {
const { client, calls } = createClient();

const res = await client.updateReplicateConfiguration({
clusters: [
{
cluster_id: 'source-cluster',
connection_param: {
uri: 'http://source:19530',
token: 'source-token',
},
pchannels: ['source-pchannel'],
},
{
cluster_id: 'target-cluster',
connection_param: {
uri: 'http://target:19530',
token: 'target-token',
},
},
],
cross_cluster_topology: [
{
source_cluster_id: 'source-cluster',
target_cluster_id: 'target-cluster',
},
],
force_promote: true,
timeout: 1000,
});

expect(res.error_code).toEqual(ErrorCode.SUCCESS);
expect(calls[0]).toEqual(
expect.objectContaining({
method: 'UpdateReplicateConfiguration',
params: {
replicate_configuration: {
clusters: [
{
cluster_id: 'source-cluster',
connection_param: {
uri: 'http://source:19530',
token: 'source-token',
},
pchannels: ['source-pchannel'],
},
{
cluster_id: 'target-cluster',
connection_param: {
uri: 'http://target:19530',
token: 'target-token',
},
},
],
cross_cluster_topology: [
{
source_cluster_id: 'source-cluster',
target_cluster_id: 'target-cluster',
},
],
},
force_promote: true,
},
})
);
expect(calls[0].options.deadline).toBeInstanceOf(Date);
});

it('should default optional update fields', async () => {
const { client, calls } = createClient();

await client.update_replicate_configuration({
clusters: [
{
cluster_id: 'source-cluster',
connection_param: { uri: 'http://source:19530' },
},
],
});

expect(calls[0].params).toEqual({
replicate_configuration: {
clusters: [
{
cluster_id: 'source-cluster',
connection_param: { uri: 'http://source:19530' },
},
],
cross_cluster_topology: [],
},
force_promote: false,
});
});

it('should reject update without clusters', async () => {
const { client } = createClient();

await expect(
client.updateReplicateConfiguration({} as any)
).rejects.toThrow('The `clusters` property is missing.');
});

it('should get replicate configuration', async () => {
const { client, calls } = createClient();

const res = await client.getReplicateConfiguration({ timeout: 1000 });

expect(res.status.error_code).toEqual(ErrorCode.SUCCESS);
expect(res.configuration.clusters[0]).toEqual({
cluster_id: 'source-cluster',
connection_param: {
uri: 'http://source:19530',
token: 'source-token',
},
pchannels: ['source-pchannel'],
});
expect(calls[0]).toEqual(
expect.objectContaining({
method: 'GetReplicateConfiguration',
params: {},
})
);
});
});