Skip to content

Commit 89b654f

Browse files
authored
Add GetReplicateInfo interface (#501)
Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent 9f58b5c commit 89b654f

18 files changed

Lines changed: 596 additions & 0 deletions

src/impl/MilvusClientV2Impl.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2458,6 +2458,27 @@ MilvusClientV2Impl::UpdateReplicateConfiguration(const UpdateReplicateConfigurat
24582458
pre, &MilvusConnection::UpdateReplicateConfiguration, nullptr);
24592459
}
24602460

2461+
Status
2462+
MilvusClientV2Impl::GetReplicateInfo(const GetReplicateInfoRequest& request, GetReplicateInfoResponse& response) {
2463+
auto pre = [&request](proto::milvus::GetReplicateInfoRequest& rpc_request) {
2464+
rpc_request.set_source_cluster_id(request.SourceClusterID());
2465+
rpc_request.set_target_pchannel(request.TargetPChannel());
2466+
return Status::OK();
2467+
};
2468+
2469+
auto post = [&response](const proto::milvus::GetReplicateInfoResponse& rpc_response) {
2470+
ReplicateCheckpoint checkpoint;
2471+
if (rpc_response.has_checkpoint()) {
2472+
ConvertReplicateCheckpoint(rpc_response.checkpoint(), checkpoint);
2473+
}
2474+
response.SetCheckpoint(std::move(checkpoint));
2475+
return Status::OK();
2476+
};
2477+
2478+
return connection_.Invoke<proto::milvus::GetReplicateInfoRequest, proto::milvus::GetReplicateInfoResponse>(
2479+
pre, &MilvusConnection::GetReplicateInfo, post);
2480+
}
2481+
24612482
Status
24622483
MilvusClientV2Impl::CreateResourceGroup(const CreateResourceGroupRequest& request) {
24632484
auto pre = [&request](proto::milvus::CreateResourceGroupRequest& rpc_request) {

src/impl/MilvusClientV2Impl.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,9 @@ class MilvusClientV2Impl : public MilvusClientV2, public std::enable_shared_from
265265
Status
266266
UpdateReplicateConfiguration(const UpdateReplicateConfigurationRequest& request) final;
267267

268+
Status
269+
GetReplicateInfo(const GetReplicateInfoRequest& request, GetReplicateInfoResponse& response) final;
270+
268271
Status
269272
CreateResourceGroup(const CreateResourceGroupRequest& request) final;
270273

src/impl/MilvusConnection.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,12 @@ MilvusConnection::StatusByProtoResponse(const proto::common::Status& status) {
7373
return Status::OK();
7474
}
7575

76+
Status
77+
MilvusConnection::StatusByProtoResponse(const proto::milvus::GetReplicateInfoResponse& response) {
78+
(void)response;
79+
return Status::OK();
80+
}
81+
7682
template <typename Response>
7783
Status
7884
MilvusConnection::StatusByProtoResponse(const Response& response) {
@@ -670,6 +676,13 @@ MilvusConnection::UpdateReplicateConfiguration(const proto::milvus::UpdateReplic
670676
return grpcCall("UpdateReplicateConfiguration", &Stub::UpdateReplicateConfiguration, request, response, options);
671677
}
672678

679+
Status
680+
MilvusConnection::GetReplicateInfo(const proto::milvus::GetReplicateInfoRequest& request,
681+
proto::milvus::GetReplicateInfoResponse& response,
682+
const GrpcContextOptions& options) {
683+
return grpcCall("GetReplicateInfo", &Stub::GetReplicateInfo, request, response, options);
684+
}
685+
673686
Status
674687
MilvusConnection::SelectUser(const proto::milvus::SelectUserRequest& request,
675688
proto::milvus::SelectUserResponse& response, const GrpcContextOptions& options) {

src/impl/MilvusConnection.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,10 @@ class MilvusConnection {
373373
UpdateReplicateConfiguration(const proto::milvus::UpdateReplicateConfigurationRequest& request,
374374
proto::common::Status& response, const GrpcContextOptions& options);
375375

376+
Status
377+
GetReplicateInfo(const proto::milvus::GetReplicateInfoRequest& request,
378+
proto::milvus::GetReplicateInfoResponse& response, const GrpcContextOptions& options);
379+
376380
Status
377381
SelectUser(const proto::milvus::SelectUserRequest& request, proto::milvus::SelectUserResponse& response,
378382
const GrpcContextOptions& options);
@@ -426,6 +430,9 @@ class MilvusConnection {
426430
static Status
427431
StatusByProtoResponse(const proto::common::Status& status);
428432

433+
static Status
434+
StatusByProtoResponse(const proto::milvus::GetReplicateInfoResponse& response);
435+
429436
template <typename Response>
430437
static Status
431438
StatusByProtoResponse(const Response& response);
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// Licensed to the LF AI & Data foundation under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
#include "milvus/request/cdc/GetReplicateInfoRequest.h"
18+
19+
namespace milvus {
20+
21+
const std::string&
22+
GetReplicateInfoRequest::SourceClusterID() const {
23+
return source_cluster_id_;
24+
}
25+
26+
void
27+
GetReplicateInfoRequest::SetSourceClusterID(const std::string& source_cluster_id) {
28+
source_cluster_id_ = source_cluster_id;
29+
}
30+
31+
GetReplicateInfoRequest&
32+
GetReplicateInfoRequest::WithSourceClusterID(const std::string& source_cluster_id) {
33+
SetSourceClusterID(source_cluster_id);
34+
return *this;
35+
}
36+
37+
const std::string&
38+
GetReplicateInfoRequest::TargetPChannel() const {
39+
return target_pchannel_;
40+
}
41+
42+
void
43+
GetReplicateInfoRequest::SetTargetPChannel(const std::string& target_pchannel) {
44+
target_pchannel_ = target_pchannel;
45+
}
46+
47+
GetReplicateInfoRequest&
48+
GetReplicateInfoRequest::WithTargetPChannel(const std::string& target_pchannel) {
49+
SetTargetPChannel(target_pchannel);
50+
return *this;
51+
}
52+
53+
} // namespace milvus
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Licensed to the LF AI & Data foundation under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
#include "milvus/response/cdc/GetReplicateInfoResponse.h"
18+
19+
#include <utility>
20+
21+
namespace milvus {
22+
23+
const ReplicateCheckpoint&
24+
GetReplicateInfoResponse::Checkpoint() const {
25+
return checkpoint_;
26+
}
27+
28+
void
29+
GetReplicateInfoResponse::SetCheckpoint(ReplicateCheckpoint&& checkpoint) {
30+
checkpoint_ = std::move(checkpoint);
31+
}
32+
33+
} // namespace milvus

src/impl/types/ReplicateConfiguration.cpp

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,102 @@ CrossClusterTopology::WithTargetClusterID(const std::string& cluster_id) {
122122
return *this;
123123
}
124124

125+
const std::string&
126+
ReplicateMessageID::ID() const {
127+
return id_;
128+
}
129+
130+
void
131+
ReplicateMessageID::SetID(const std::string& id) {
132+
id_ = id;
133+
}
134+
135+
ReplicateMessageID&
136+
ReplicateMessageID::WithID(const std::string& id) {
137+
SetID(id);
138+
return *this;
139+
}
140+
141+
const std::string&
142+
ReplicateMessageID::WalName() const {
143+
return wal_name_;
144+
}
145+
146+
void
147+
ReplicateMessageID::SetWalName(const std::string& wal_name) {
148+
wal_name_ = wal_name;
149+
}
150+
151+
ReplicateMessageID&
152+
ReplicateMessageID::WithWalName(const std::string& wal_name) {
153+
SetWalName(wal_name);
154+
return *this;
155+
}
156+
157+
const std::string&
158+
ReplicateCheckpoint::ClusterID() const {
159+
return cluster_id_;
160+
}
161+
162+
void
163+
ReplicateCheckpoint::SetClusterID(const std::string& cluster_id) {
164+
cluster_id_ = cluster_id;
165+
}
166+
167+
ReplicateCheckpoint&
168+
ReplicateCheckpoint::WithClusterID(const std::string& cluster_id) {
169+
SetClusterID(cluster_id);
170+
return *this;
171+
}
172+
173+
const std::string&
174+
ReplicateCheckpoint::PChannel() const {
175+
return pchannel_;
176+
}
177+
178+
void
179+
ReplicateCheckpoint::SetPChannel(const std::string& pchannel) {
180+
pchannel_ = pchannel;
181+
}
182+
183+
ReplicateCheckpoint&
184+
ReplicateCheckpoint::WithPChannel(const std::string& pchannel) {
185+
SetPChannel(pchannel);
186+
return *this;
187+
}
188+
189+
const ReplicateMessageID&
190+
ReplicateCheckpoint::MessageID() const {
191+
return message_id_;
192+
}
193+
194+
void
195+
ReplicateCheckpoint::SetMessageID(ReplicateMessageID&& message_id) {
196+
message_id_ = std::move(message_id);
197+
}
198+
199+
ReplicateCheckpoint&
200+
ReplicateCheckpoint::WithMessageID(ReplicateMessageID&& message_id) {
201+
SetMessageID(std::move(message_id));
202+
return *this;
203+
}
204+
205+
uint64_t
206+
ReplicateCheckpoint::TimeTick() const {
207+
return time_tick_;
208+
}
209+
210+
void
211+
ReplicateCheckpoint::SetTimeTick(uint64_t time_tick) {
212+
time_tick_ = time_tick;
213+
}
214+
215+
ReplicateCheckpoint&
216+
ReplicateCheckpoint::WithTimeTick(uint64_t time_tick) {
217+
SetTimeTick(time_tick);
218+
return *this;
219+
}
220+
125221
const std::vector<MilvusCluster>&
126222
ReplicateConfiguration::Clusters() const {
127223
return clusters_;

src/impl/utils/TypeUtils.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -886,6 +886,21 @@ ConvertReplicateConfiguration(const proto::common::ReplicateConfiguration& rpc_c
886886
configuration.SetCrossClusterTopologies(std::move(topologies));
887887
}
888888

889+
void
890+
ConvertReplicateCheckpoint(const proto::common::ReplicateCheckpoint& rpc_checkpoint,
891+
ReplicateCheckpoint& checkpoint) {
892+
checkpoint.SetClusterID(rpc_checkpoint.cluster_id());
893+
checkpoint.SetPChannel(rpc_checkpoint.pchannel());
894+
checkpoint.SetTimeTick(rpc_checkpoint.time_tick());
895+
896+
ReplicateMessageID message_id;
897+
if (rpc_checkpoint.has_message_id()) {
898+
message_id.SetID(rpc_checkpoint.message_id().id());
899+
message_id.SetWalName(proto::common::WALName_Name(rpc_checkpoint.message_id().wal_name()));
900+
}
901+
checkpoint.SetMessageID(std::move(message_id));
902+
}
903+
889904
bool
890905
IsValidTemplate(const nlohmann::json& filter_template) {
891906
return filter_template.is_boolean() || filter_template.is_number() || filter_template.is_string();

src/impl/utils/TypeUtils.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,10 @@ void
136136
ConvertReplicateConfiguration(const proto::common::ReplicateConfiguration& rpc_config,
137137
ReplicateConfiguration& configuration);
138138

139+
void
140+
ConvertReplicateCheckpoint(const proto::common::ReplicateCheckpoint& rpc_checkpoint,
141+
ReplicateCheckpoint& checkpoint);
142+
139143
bool
140144
IsValidTemplate(const nlohmann::json& filter_template);
141145

src/include/milvus/MilvusClientV2.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
#include "request/collection/RenameCollectionRequest.h"
4848
#include "request/collection/TruncateCollectionRequest.h"
4949
#include "request/cdc/GetReplicateConfigurationRequest.h"
50+
#include "request/cdc/GetReplicateInfoRequest.h"
5051
#include "request/cdc/UpdateReplicateConfigurationRequest.h"
5152
#include "request/database/AlterDatabasePropertiesRequest.h"
5253
#include "request/database/CreateDatabaseRequest.h"
@@ -107,6 +108,7 @@
107108
#include "response/alias/DescribeAliasResponse.h"
108109
#include "response/alias/ListAliasesResponse.h"
109110
#include "response/cdc/GetReplicateConfigurationResponse.h"
111+
#include "response/cdc/GetReplicateInfoResponse.h"
110112
#include "response/collection/BatchDescribeCollectionsResponse.h"
111113
#include "response/collection/DescribeCollectionResponse.h"
112114
#include "response/collection/DescribeReplicasResponse.h"
@@ -932,6 +934,16 @@ class MILVUS_SDK_API MilvusClientV2 {
932934
virtual Status
933935
UpdateReplicateConfiguration(const UpdateReplicateConfigurationRequest& request) = 0;
934936

937+
/**
938+
* @brief Get replicate information for a channel.
939+
*
940+
* @param [in] request input parameters
941+
* @param [out] response output results
942+
* @return Status operation successfully or not
943+
*/
944+
virtual Status
945+
GetReplicateInfo(const GetReplicateInfoRequest& request, GetReplicateInfoResponse& response) = 0;
946+
935947
/**
936948
* @brief Create a resource group. A resource group to physically isolate certain query nodes from others.
937949
* Read the doc for more info: https://milvus.io/docs/resource_group.md#Manage-Resource-Groups

0 commit comments

Comments
 (0)