Skip to content

Commit a988668

Browse files
authored
Add getReplicateInfo() (#1871)
Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent 9a11573 commit a988668

7 files changed

Lines changed: 431 additions & 0 deletions

File tree

sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@
3232
import io.milvus.orm.iterator.SearchIterator;
3333
import io.milvus.orm.iterator.SearchIteratorV2;
3434
import io.milvus.v2.service.cdc.CDCService;
35+
import io.milvus.v2.service.cdc.request.GetReplicateInfoReq;
3536
import io.milvus.v2.service.cdc.request.UpdateReplicateConfigurationReq;
3637
import io.milvus.v2.service.cdc.response.GetReplicateConfigurationResp;
38+
import io.milvus.v2.service.cdc.response.GetReplicateInfoResp;
3739
import io.milvus.v2.service.cdc.response.UpdateReplicateConfigurationResp;
3840
import io.milvus.v2.service.collection.CollectionService;
3941
import io.milvus.v2.service.collection.request.*;
@@ -1648,6 +1650,10 @@ public CheckHealthResp checkHealth() {
16481650
return rpcUtils.retry(() -> utilityService.checkHealth(this.getRpcStub()));
16491651
}
16501652

1653+
public GetReplicateInfoResp getReplicateInfo(GetReplicateInfoReq request) {
1654+
return rpcUtils.retry(() -> cdcService.getReplicateInfo(this.getRpcStub(), request));
1655+
}
1656+
16511657
public GetReplicateConfigurationResp getReplicateConfiguration() {
16521658
return rpcUtils.retry(() -> cdcService.getReplicateConfiguration(this.getRpcStub()));
16531659
}

sdk-core/src/main/java/io/milvus/v2/service/cdc/CDCService.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,43 @@
2121

2222
import io.milvus.grpc.GetReplicateConfigurationRequest;
2323
import io.milvus.grpc.GetReplicateConfigurationResponse;
24+
import io.milvus.grpc.GetReplicateInfoRequest;
25+
import io.milvus.grpc.GetReplicateInfoResponse;
2426
import io.milvus.grpc.MilvusServiceGrpc;
2527
import io.milvus.grpc.Status;
2628
import io.milvus.grpc.UpdateReplicateConfigurationRequest;
29+
import io.milvus.v2.exception.ErrorCode;
30+
import io.milvus.v2.exception.MilvusClientException;
2731
import io.milvus.v2.service.BaseService;
32+
import io.milvus.v2.service.cdc.request.GetReplicateInfoReq;
2833
import io.milvus.v2.service.cdc.request.ReplicateConfiguration;
2934
import io.milvus.v2.service.cdc.request.UpdateReplicateConfigurationReq;
3035
import io.milvus.v2.service.cdc.response.GetReplicateConfigurationResp;
36+
import io.milvus.v2.service.cdc.response.GetReplicateInfoResp;
3137
import io.milvus.v2.service.cdc.response.UpdateReplicateConfigurationResp;
38+
import org.apache.commons.lang3.StringUtils;
3239

3340
public class CDCService extends BaseService {
41+
public GetReplicateInfoResp getReplicateInfo(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, GetReplicateInfoReq requestParam) {
42+
if (StringUtils.isEmpty(requestParam.getSourceClusterId())) {
43+
throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "sourceClusterId cannot be null or empty");
44+
}
45+
if (StringUtils.isEmpty(requestParam.getTargetPchannel())) {
46+
throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "targetPchannel cannot be null or empty");
47+
}
48+
49+
GetReplicateInfoRequest request = GetReplicateInfoRequest.newBuilder()
50+
.setSourceClusterId(requestParam.getSourceClusterId())
51+
.setTargetPchannel(requestParam.getTargetPchannel())
52+
.build();
53+
54+
GetReplicateInfoResponse response = blockingStub.getReplicateInfo(request);
55+
return GetReplicateInfoResp.builder()
56+
.checkpoint(response.hasCheckpoint() ? GetReplicateInfoResp.ReplicateCheckpoint.fromGRPC(response.getCheckpoint()) : null)
57+
.salvageCheckpoint(response.hasSalvageCheckpoint() ? GetReplicateInfoResp.ReplicateCheckpoint.fromGRPC(response.getSalvageCheckpoint()) : null)
58+
.build();
59+
}
60+
3461
public GetReplicateConfigurationResp getReplicateConfiguration(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub) {
3562
GetReplicateConfigurationRequest request = GetReplicateConfigurationRequest.newBuilder().build();
3663

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package io.milvus.v2.service.cdc.request;
21+
22+
public class GetReplicateInfoReq {
23+
private String sourceClusterId;
24+
private String targetPchannel;
25+
26+
public static GetReplicateInfoReqBuilder builder() {
27+
return new GetReplicateInfoReqBuilder();
28+
}
29+
30+
private GetReplicateInfoReq(GetReplicateInfoReqBuilder builder) {
31+
this.sourceClusterId = builder.sourceClusterId;
32+
this.targetPchannel = builder.targetPchannel;
33+
}
34+
35+
public String getSourceClusterId() {
36+
return sourceClusterId;
37+
}
38+
39+
public void setSourceClusterId(String sourceClusterId) {
40+
this.sourceClusterId = sourceClusterId;
41+
}
42+
43+
public String getTargetPchannel() {
44+
return targetPchannel;
45+
}
46+
47+
public void setTargetPchannel(String targetPchannel) {
48+
this.targetPchannel = targetPchannel;
49+
}
50+
51+
@Override
52+
public String toString() {
53+
return "GetReplicateInfoReq{" +
54+
"sourceClusterId='" + sourceClusterId + '\'' +
55+
", targetPchannel='" + targetPchannel + '\'' +
56+
'}';
57+
}
58+
59+
public static class GetReplicateInfoReqBuilder {
60+
private String sourceClusterId;
61+
private String targetPchannel;
62+
63+
public GetReplicateInfoReqBuilder sourceClusterId(String sourceClusterId) {
64+
this.sourceClusterId = sourceClusterId;
65+
return this;
66+
}
67+
68+
public GetReplicateInfoReqBuilder targetPchannel(String targetPchannel) {
69+
this.targetPchannel = targetPchannel;
70+
return this;
71+
}
72+
73+
public GetReplicateInfoReq build() {
74+
return new GetReplicateInfoReq(this);
75+
}
76+
}
77+
}
Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package io.milvus.v2.service.cdc.response;
21+
22+
public class GetReplicateInfoResp {
23+
private ReplicateCheckpoint checkpoint;
24+
private ReplicateCheckpoint salvageCheckpoint;
25+
26+
private GetReplicateInfoResp(GetReplicateInfoRespBuilder builder) {
27+
this.checkpoint = builder.checkpoint;
28+
this.salvageCheckpoint = builder.salvageCheckpoint;
29+
}
30+
31+
public static GetReplicateInfoRespBuilder builder() {
32+
return new GetReplicateInfoRespBuilder();
33+
}
34+
35+
public ReplicateCheckpoint getCheckpoint() {
36+
return checkpoint;
37+
}
38+
39+
public void setCheckpoint(ReplicateCheckpoint checkpoint) {
40+
this.checkpoint = checkpoint;
41+
}
42+
43+
public ReplicateCheckpoint getSalvageCheckpoint() {
44+
return salvageCheckpoint;
45+
}
46+
47+
public void setSalvageCheckpoint(ReplicateCheckpoint salvageCheckpoint) {
48+
this.salvageCheckpoint = salvageCheckpoint;
49+
}
50+
51+
@Override
52+
public String toString() {
53+
return "GetReplicateInfoResp{" +
54+
"checkpoint=" + checkpoint +
55+
", salvageCheckpoint=" + salvageCheckpoint +
56+
'}';
57+
}
58+
59+
public static class GetReplicateInfoRespBuilder {
60+
private ReplicateCheckpoint checkpoint;
61+
private ReplicateCheckpoint salvageCheckpoint;
62+
63+
public GetReplicateInfoRespBuilder checkpoint(ReplicateCheckpoint checkpoint) {
64+
this.checkpoint = checkpoint;
65+
return this;
66+
}
67+
68+
public GetReplicateInfoRespBuilder salvageCheckpoint(ReplicateCheckpoint salvageCheckpoint) {
69+
this.salvageCheckpoint = salvageCheckpoint;
70+
return this;
71+
}
72+
73+
public GetReplicateInfoResp build() {
74+
return new GetReplicateInfoResp(this);
75+
}
76+
}
77+
78+
public static class ReplicateCheckpoint {
79+
private String clusterId;
80+
private String pchannel;
81+
private MessageID messageID;
82+
private Long timeTick;
83+
84+
public static ReplicateCheckpoint fromGRPC(io.milvus.grpc.ReplicateCheckpoint checkpoint) {
85+
return ReplicateCheckpoint.builder()
86+
.clusterId(checkpoint.getClusterId())
87+
.pchannel(checkpoint.getPchannel())
88+
.messageID(checkpoint.hasMessageId() ? MessageID.fromGRPC(checkpoint.getMessageId()) : null)
89+
.timeTick(checkpoint.getTimeTick())
90+
.build();
91+
}
92+
93+
private ReplicateCheckpoint(ReplicateCheckpointBuilder builder) {
94+
this.clusterId = builder.clusterId;
95+
this.pchannel = builder.pchannel;
96+
this.messageID = builder.messageID;
97+
this.timeTick = builder.timeTick;
98+
}
99+
100+
public static ReplicateCheckpointBuilder builder() {
101+
return new ReplicateCheckpointBuilder();
102+
}
103+
104+
public String getClusterId() {
105+
return clusterId;
106+
}
107+
108+
public void setClusterId(String clusterId) {
109+
this.clusterId = clusterId;
110+
}
111+
112+
public String getPchannel() {
113+
return pchannel;
114+
}
115+
116+
public void setPchannel(String pchannel) {
117+
this.pchannel = pchannel;
118+
}
119+
120+
public MessageID getMessageID() {
121+
return messageID;
122+
}
123+
124+
public void setMessageID(MessageID messageID) {
125+
this.messageID = messageID;
126+
}
127+
128+
public Long getTimeTick() {
129+
return timeTick;
130+
}
131+
132+
public void setTimeTick(Long timeTick) {
133+
this.timeTick = timeTick;
134+
}
135+
136+
@Override
137+
public String toString() {
138+
return "ReplicateCheckpoint{" +
139+
"clusterId='" + clusterId + '\'' +
140+
", pchannel='" + pchannel + '\'' +
141+
", messageID=" + messageID +
142+
", timeTick=" + timeTick +
143+
'}';
144+
}
145+
146+
public static class ReplicateCheckpointBuilder {
147+
private String clusterId;
148+
private String pchannel;
149+
private MessageID messageID;
150+
private Long timeTick;
151+
152+
public ReplicateCheckpointBuilder clusterId(String clusterId) {
153+
this.clusterId = clusterId;
154+
return this;
155+
}
156+
157+
public ReplicateCheckpointBuilder pchannel(String pchannel) {
158+
this.pchannel = pchannel;
159+
return this;
160+
}
161+
162+
public ReplicateCheckpointBuilder messageID(MessageID messageID) {
163+
this.messageID = messageID;
164+
return this;
165+
}
166+
167+
public ReplicateCheckpointBuilder timeTick(Long timeTick) {
168+
this.timeTick = timeTick;
169+
return this;
170+
}
171+
172+
public ReplicateCheckpoint build() {
173+
return new ReplicateCheckpoint(this);
174+
}
175+
}
176+
}
177+
178+
public static class MessageID {
179+
private String id;
180+
private String walName;
181+
182+
public static MessageID fromGRPC(io.milvus.grpc.MessageID messageID) {
183+
return MessageID.builder()
184+
.id(messageID.getId())
185+
.walName(messageID.getWALName().name())
186+
.build();
187+
}
188+
189+
private MessageID(MessageIDBuilder builder) {
190+
this.id = builder.id;
191+
this.walName = builder.walName;
192+
}
193+
194+
public static MessageIDBuilder builder() {
195+
return new MessageIDBuilder();
196+
}
197+
198+
public String getId() {
199+
return id;
200+
}
201+
202+
public void setId(String id) {
203+
this.id = id;
204+
}
205+
206+
public String getWalName() {
207+
return walName;
208+
}
209+
210+
public void setWalName(String walName) {
211+
this.walName = walName;
212+
}
213+
214+
@Override
215+
public String toString() {
216+
return "MessageID{" +
217+
"id='" + id + '\'' +
218+
", walName='" + walName + '\'' +
219+
'}';
220+
}
221+
222+
public static class MessageIDBuilder {
223+
private String id;
224+
private String walName;
225+
226+
public MessageIDBuilder id(String id) {
227+
this.id = id;
228+
return this;
229+
}
230+
231+
public MessageIDBuilder walName(String walName) {
232+
this.walName = walName;
233+
return this;
234+
}
235+
236+
public MessageID build() {
237+
return new MessageID(this);
238+
}
239+
}
240+
}
241+
}

0 commit comments

Comments
 (0)