Skip to content

Commit dbfac67

Browse files
authored
feat: add replicate configuration APIs (#564)
Signed-off-by: ryjiang <jiangruiyi@gmail.com>
1 parent 4fe3d3f commit dbfac67

3 files changed

Lines changed: 266 additions & 0 deletions

File tree

milvus/grpc/Resource.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,69 @@ import {
1111
UpdateRresourceGroupReq,
1212
TransferNodeReq,
1313
TransferReplicaReq,
14+
UpdateReplicateConfigurationReq,
15+
GetReplicateConfigurationReq,
16+
GetReplicateConfigurationResponse,
1417
promisify,
1518
} from '../';
1619

1720
export class Resource extends Partition {
21+
/**
22+
* Update cross-cluster replication configuration.
23+
*
24+
* @param {UpdateReplicateConfigurationReq} data - The request parameters.
25+
* @param {ReplicateCluster[]} data.clusters - Cluster configurations.
26+
* @param {CrossClusterTopology[]} [data.cross_cluster_topology] - Replication topology between clusters.
27+
* @param {boolean} [data.force_promote] - Whether to force promote the current cluster to primary.
28+
* @param {number} [data.timeout] - An optional duration of time in milliseconds to allow for the RPC.
29+
*
30+
* @returns {Promise<ResStatus>} The response status of the operation.
31+
*/
32+
async updateReplicateConfiguration(
33+
data: UpdateReplicateConfigurationReq
34+
): Promise<ResStatus> {
35+
if (!data || !data.clusters) {
36+
throw new Error('The `clusters` property is missing.');
37+
}
38+
39+
const { clusters, cross_cluster_topology, force_promote, timeout } = data;
40+
return await promisify(
41+
this.channelPool,
42+
'UpdateReplicateConfiguration',
43+
{
44+
replicate_configuration: {
45+
clusters,
46+
cross_cluster_topology: cross_cluster_topology || [],
47+
},
48+
force_promote: force_promote || false,
49+
},
50+
timeout || this.timeout
51+
);
52+
}
53+
54+
/**
55+
* Get cross-cluster replication configuration.
56+
*
57+
* @param {GetReplicateConfigurationReq} [data] - Optional request parameters.
58+
* @param {number} [data.timeout] - An optional duration of time in milliseconds to allow for the RPC.
59+
*
60+
* @returns {Promise<GetReplicateConfigurationResponse>} The replication configuration response.
61+
*/
62+
async getReplicateConfiguration(
63+
data: GetReplicateConfigurationReq = {}
64+
): Promise<GetReplicateConfigurationResponse> {
65+
return await promisify(
66+
this.channelPool,
67+
'GetReplicateConfiguration',
68+
{},
69+
data.timeout || this.timeout
70+
);
71+
}
72+
73+
// aliases
74+
update_replicate_configuration = this.updateReplicateConfiguration;
75+
get_replicate_configuration = this.getReplicateConfiguration;
76+
1877
/**
1978
* Creates a resource group.
2079
*

milvus/types/Resource.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,36 @@ export interface ListResourceGroupsResponse extends resStatusResponse {
6262
export interface DescribeResourceGroupResponse extends resStatusResponse {
6363
resource_group: ResourceGroup;
6464
}
65+
66+
export interface ReplicateConnectionParam {
67+
uri: string;
68+
token?: string;
69+
}
70+
71+
export interface ReplicateCluster {
72+
cluster_id: string;
73+
connection_param: ReplicateConnectionParam;
74+
pchannels?: string[];
75+
}
76+
77+
export interface CrossClusterTopology {
78+
source_cluster_id: string;
79+
target_cluster_id: string;
80+
}
81+
82+
export interface ReplicateConfiguration {
83+
clusters: ReplicateCluster[];
84+
cross_cluster_topology?: CrossClusterTopology[];
85+
}
86+
87+
export interface UpdateReplicateConfigurationReq extends GrpcTimeOut {
88+
clusters: ReplicateCluster[];
89+
cross_cluster_topology?: CrossClusterTopology[];
90+
force_promote?: boolean;
91+
}
92+
93+
export interface GetReplicateConfigurationReq extends GrpcTimeOut {}
94+
95+
export interface GetReplicateConfigurationResponse extends resStatusResponse {
96+
configuration: ReplicateConfiguration;
97+
}
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
import { ErrorCode, MilvusClient } from '../../milvus';
2+
3+
describe('Replicate configuration API', () => {
4+
const createClient = () => {
5+
const client = new MilvusClient({
6+
address: 'localhost:19530',
7+
__SKIP_CONNECT__: true,
8+
});
9+
const calls: { method: string; params: any; options: any }[] = [];
10+
(client as any).channelPool = {
11+
acquire: jest.fn().mockResolvedValue({
12+
UpdateReplicateConfiguration: (params: any, options: any, cb: any) => {
13+
calls.push({
14+
method: 'UpdateReplicateConfiguration',
15+
params,
16+
options,
17+
});
18+
cb(null, { error_code: ErrorCode.SUCCESS, reason: '' });
19+
},
20+
GetReplicateConfiguration: (params: any, options: any, cb: any) => {
21+
calls.push({ method: 'GetReplicateConfiguration', params, options });
22+
cb(null, {
23+
status: { error_code: ErrorCode.SUCCESS, reason: '' },
24+
configuration: {
25+
clusters: [
26+
{
27+
cluster_id: 'source-cluster',
28+
connection_param: {
29+
uri: 'http://source:19530',
30+
token: 'source-token',
31+
},
32+
pchannels: ['source-pchannel'],
33+
},
34+
],
35+
cross_cluster_topology: [
36+
{
37+
source_cluster_id: 'source-cluster',
38+
target_cluster_id: 'target-cluster',
39+
},
40+
],
41+
},
42+
});
43+
},
44+
}),
45+
release: jest.fn(),
46+
};
47+
48+
return { client, calls };
49+
};
50+
51+
it('should update replicate configuration', async () => {
52+
const { client, calls } = createClient();
53+
54+
const res = await client.updateReplicateConfiguration({
55+
clusters: [
56+
{
57+
cluster_id: 'source-cluster',
58+
connection_param: {
59+
uri: 'http://source:19530',
60+
token: 'source-token',
61+
},
62+
pchannels: ['source-pchannel'],
63+
},
64+
{
65+
cluster_id: 'target-cluster',
66+
connection_param: {
67+
uri: 'http://target:19530',
68+
token: 'target-token',
69+
},
70+
},
71+
],
72+
cross_cluster_topology: [
73+
{
74+
source_cluster_id: 'source-cluster',
75+
target_cluster_id: 'target-cluster',
76+
},
77+
],
78+
force_promote: true,
79+
timeout: 1000,
80+
});
81+
82+
expect(res.error_code).toEqual(ErrorCode.SUCCESS);
83+
expect(calls[0]).toEqual(
84+
expect.objectContaining({
85+
method: 'UpdateReplicateConfiguration',
86+
params: {
87+
replicate_configuration: {
88+
clusters: [
89+
{
90+
cluster_id: 'source-cluster',
91+
connection_param: {
92+
uri: 'http://source:19530',
93+
token: 'source-token',
94+
},
95+
pchannels: ['source-pchannel'],
96+
},
97+
{
98+
cluster_id: 'target-cluster',
99+
connection_param: {
100+
uri: 'http://target:19530',
101+
token: 'target-token',
102+
},
103+
},
104+
],
105+
cross_cluster_topology: [
106+
{
107+
source_cluster_id: 'source-cluster',
108+
target_cluster_id: 'target-cluster',
109+
},
110+
],
111+
},
112+
force_promote: true,
113+
},
114+
})
115+
);
116+
expect(calls[0].options.deadline).toBeInstanceOf(Date);
117+
});
118+
119+
it('should default optional update fields', async () => {
120+
const { client, calls } = createClient();
121+
122+
await client.update_replicate_configuration({
123+
clusters: [
124+
{
125+
cluster_id: 'source-cluster',
126+
connection_param: { uri: 'http://source:19530' },
127+
},
128+
],
129+
});
130+
131+
expect(calls[0].params).toEqual({
132+
replicate_configuration: {
133+
clusters: [
134+
{
135+
cluster_id: 'source-cluster',
136+
connection_param: { uri: 'http://source:19530' },
137+
},
138+
],
139+
cross_cluster_topology: [],
140+
},
141+
force_promote: false,
142+
});
143+
});
144+
145+
it('should reject update without clusters', async () => {
146+
const { client } = createClient();
147+
148+
await expect(
149+
client.updateReplicateConfiguration({} as any)
150+
).rejects.toThrow('The `clusters` property is missing.');
151+
});
152+
153+
it('should get replicate configuration', async () => {
154+
const { client, calls } = createClient();
155+
156+
const res = await client.getReplicateConfiguration({ timeout: 1000 });
157+
158+
expect(res.status.error_code).toEqual(ErrorCode.SUCCESS);
159+
expect(res.configuration.clusters[0]).toEqual({
160+
cluster_id: 'source-cluster',
161+
connection_param: {
162+
uri: 'http://source:19530',
163+
token: 'source-token',
164+
},
165+
pchannels: ['source-pchannel'],
166+
});
167+
expect(calls[0]).toEqual(
168+
expect.objectContaining({
169+
method: 'GetReplicateConfiguration',
170+
params: {},
171+
})
172+
);
173+
});
174+
});

0 commit comments

Comments
 (0)