Skip to content

Commit 16b8dad

Browse files
authored
Add flushAll/getFlushAllState (#499)
Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent 478ccc6 commit 16b8dad

17 files changed

Lines changed: 768 additions & 0 deletions

src/impl/MilvusClientV2Impl.cpp

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1905,6 +1905,69 @@ MilvusClientV2Impl::Flush(const FlushRequest& request) {
19051905
nullptr, pre, &MilvusConnection::Flush, wait_for_status, nullptr);
19061906
}
19071907

1908+
Status
1909+
MilvusClientV2Impl::FlushAll(const FlushAllRequest& request, FlushAllResponse& response) {
1910+
auto pre = [&request](proto::milvus::FlushAllRequest& rpc_request) {
1911+
rpc_request.set_db_name(request.DatabaseName());
1912+
return Status::OK();
1913+
};
1914+
1915+
ProgressMonitor progress_monitor = ProgressMonitor::Forever();
1916+
if (request.WaitFlushedMs() > 0) {
1917+
progress_monitor = ProgressMonitor{static_cast<uint32_t>(request.WaitFlushedMs() + 999) / 1000};
1918+
}
1919+
1920+
auto wait_for_status = [this, &request, &progress_monitor](const proto::milvus::FlushAllResponse& rpc_response) {
1921+
GetFlushAllStateRequest state_request = GetFlushAllStateRequest()
1922+
.WithDatabaseName(request.DatabaseName())
1923+
.WithFlushAllTs(rpc_response.flush_all_ts());
1924+
return ConnectionHandler::WaitForStatus(
1925+
[this, &state_request](Progress& p) -> Status {
1926+
p.total_ = 1;
1927+
GetFlushAllStateResponse state_response;
1928+
auto status = getFlushAllState(state_request, state_response);
1929+
if (!status.IsOk()) {
1930+
return status;
1931+
}
1932+
p.finished_ = state_response.Flushed() ? 1 : 0;
1933+
return Status::OK();
1934+
},
1935+
progress_monitor);
1936+
};
1937+
1938+
auto post = [&response](const proto::milvus::FlushAllResponse& rpc_response) {
1939+
response.SetFlushAllTs(rpc_response.flush_all_ts());
1940+
return Status::OK();
1941+
};
1942+
1943+
return connection_.Invoke<proto::milvus::FlushAllRequest, proto::milvus::FlushAllResponse>(
1944+
nullptr, pre, &MilvusConnection::FlushAll, wait_for_status, post);
1945+
}
1946+
1947+
Status
1948+
MilvusClientV2Impl::GetFlushAllState(const GetFlushAllStateRequest& request, GetFlushAllStateResponse& response) {
1949+
return getFlushAllState(request, response);
1950+
}
1951+
1952+
Status
1953+
MilvusClientV2Impl::getFlushAllState(const GetFlushAllStateRequest& request, GetFlushAllStateResponse& response,
1954+
uint64_t rpc_timeout_ms) {
1955+
auto pre = [&request](proto::milvus::GetFlushAllStateRequest& rpc_request) {
1956+
rpc_request.set_db_name(request.DatabaseName());
1957+
rpc_request.set_flush_all_ts(request.FlushAllTs());
1958+
return Status::OK();
1959+
};
1960+
1961+
auto post = [&response](const proto::milvus::GetFlushAllStateResponse& rpc_response) {
1962+
response.SetFlushed(rpc_response.flushed());
1963+
return Status::OK();
1964+
};
1965+
1966+
return connection_
1967+
.InvokeWithRpcTimeout<proto::milvus::GetFlushAllStateRequest, proto::milvus::GetFlushAllStateResponse>(
1968+
rpc_timeout_ms, pre, &MilvusConnection::GetFlushAllState, post);
1969+
}
1970+
19081971
Status
19091972
MilvusClientV2Impl::ListPersistentSegments(const ListPersistentSegmentsRequest& request,
19101973
ListPersistentSegmentsResponse& response) {

src/impl/MilvusClientV2Impl.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,12 @@ class MilvusClientV2Impl : public MilvusClientV2, public std::enable_shared_from
233233
Status
234234
Flush(const FlushRequest& request) final;
235235

236+
Status
237+
FlushAll(const FlushAllRequest& request, FlushAllResponse& response) final;
238+
239+
Status
240+
GetFlushAllState(const GetFlushAllStateRequest& request, GetFlushAllStateResponse& response) final;
241+
236242
Status
237243
ListPersistentSegments(const ListPersistentSegmentsRequest& request,
238244
ListPersistentSegmentsResponse& response) final;
@@ -342,6 +348,10 @@ class MilvusClientV2Impl : public MilvusClientV2, public std::enable_shared_from
342348
Status
343349
getFlushState(const std::vector<int64_t>& segments, bool& flushed);
344350

351+
Status
352+
getFlushAllState(const GetFlushAllStateRequest& request, GetFlushAllStateResponse& response,
353+
uint64_t rpc_timeout_ms = 0);
354+
345355
Status
346356
describeCollection(const DescribeCollectionRequest& request, DescribeCollectionResponse& response,
347357
uint64_t rpc_timeout_ms = 0);

src/impl/MilvusConnection.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,19 @@ MilvusConnection::GetFlushState(const proto::milvus::GetFlushStateRequest& reque
530530
return grpcCall("GetFlushState", &Stub::GetFlushState, request, response, options);
531531
}
532532

533+
Status
534+
MilvusConnection::FlushAll(const proto::milvus::FlushAllRequest& request, proto::milvus::FlushAllResponse& response,
535+
const GrpcContextOptions& options) {
536+
return grpcCall("FlushAll", &Stub::FlushAll, request, response, options);
537+
}
538+
539+
Status
540+
MilvusConnection::GetFlushAllState(const proto::milvus::GetFlushAllStateRequest& request,
541+
proto::milvus::GetFlushAllStateResponse& response,
542+
const GrpcContextOptions& options) {
543+
return grpcCall("GetFlushAllState", &Stub::GetFlushAllState, request, response, options);
544+
}
545+
533546
Status
534547
MilvusConnection::GetPersistentSegmentInfo(const proto::milvus::GetPersistentSegmentInfoRequest& request,
535548
proto::milvus::GetPersistentSegmentInfoResponse& response,

src/impl/MilvusConnection.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,14 @@ class MilvusConnection {
284284
GetFlushState(const proto::milvus::GetFlushStateRequest& request, proto::milvus::GetFlushStateResponse& response,
285285
const GrpcContextOptions& options);
286286

287+
Status
288+
FlushAll(const proto::milvus::FlushAllRequest& request, proto::milvus::FlushAllResponse& response,
289+
const GrpcContextOptions& options);
290+
291+
Status
292+
GetFlushAllState(const proto::milvus::GetFlushAllStateRequest& request,
293+
proto::milvus::GetFlushAllStateResponse& response, const GrpcContextOptions& options);
294+
287295
Status
288296
GetPersistentSegmentInfo(const proto::milvus::GetPersistentSegmentInfoRequest& request,
289297
proto::milvus::GetPersistentSegmentInfoResponse& response,
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// Licensed to the LF AI & Data foundation under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
#include "milvus/request/utility/FlushAllRequest.h"
18+
19+
namespace milvus {
20+
21+
const std::string&
22+
FlushAllRequest::DatabaseName() const {
23+
return db_name_;
24+
}
25+
26+
void
27+
FlushAllRequest::SetDatabaseName(const std::string& db_name) {
28+
db_name_ = db_name;
29+
}
30+
31+
FlushAllRequest&
32+
FlushAllRequest::WithDatabaseName(const std::string& db_name) {
33+
SetDatabaseName(db_name);
34+
return *this;
35+
}
36+
37+
int64_t
38+
FlushAllRequest::WaitFlushedMs() const {
39+
return wait_flushed_ms_;
40+
}
41+
42+
void
43+
FlushAllRequest::SetWaitFlushedMs(int64_t ms) {
44+
wait_flushed_ms_ = ms;
45+
}
46+
47+
FlushAllRequest&
48+
FlushAllRequest::WithWaitFlushedMs(int64_t ms) {
49+
SetWaitFlushedMs(ms);
50+
return *this;
51+
}
52+
53+
} // namespace milvus
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// Licensed to the LF AI & Data foundation under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
#include "milvus/request/utility/GetFlushAllStateRequest.h"
18+
19+
namespace milvus {
20+
21+
const std::string&
22+
GetFlushAllStateRequest::DatabaseName() const {
23+
return db_name_;
24+
}
25+
26+
void
27+
GetFlushAllStateRequest::SetDatabaseName(const std::string& db_name) {
28+
db_name_ = db_name;
29+
}
30+
31+
GetFlushAllStateRequest&
32+
GetFlushAllStateRequest::WithDatabaseName(const std::string& db_name) {
33+
SetDatabaseName(db_name);
34+
return *this;
35+
}
36+
37+
uint64_t
38+
GetFlushAllStateRequest::FlushAllTs() const {
39+
return flush_all_ts_;
40+
}
41+
42+
void
43+
GetFlushAllStateRequest::SetFlushAllTs(uint64_t flush_all_ts) {
44+
flush_all_ts_ = flush_all_ts;
45+
}
46+
47+
GetFlushAllStateRequest&
48+
GetFlushAllStateRequest::WithFlushAllTs(uint64_t flush_all_ts) {
49+
SetFlushAllTs(flush_all_ts);
50+
return *this;
51+
}
52+
53+
} // namespace milvus
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Licensed to the LF AI & Data foundation under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
#include "milvus/response/utility/FlushAllResponse.h"
18+
19+
namespace milvus {
20+
21+
uint64_t
22+
FlushAllResponse::FlushAllTs() const {
23+
return flush_all_ts_;
24+
}
25+
26+
void
27+
FlushAllResponse::SetFlushAllTs(uint64_t flush_all_ts) {
28+
flush_all_ts_ = flush_all_ts;
29+
}
30+
31+
} // namespace milvus
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Licensed to the LF AI & Data foundation under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
#include "milvus/response/utility/GetFlushAllStateResponse.h"
18+
19+
namespace milvus {
20+
21+
bool
22+
GetFlushAllStateResponse::Flushed() const {
23+
return flushed_;
24+
}
25+
26+
void
27+
GetFlushAllStateResponse::SetFlushed(bool flushed) {
28+
flushed_ = flushed;
29+
}
30+
31+
} // namespace milvus

src/include/milvus/MilvusClientV2.h

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,10 @@
9797
#include "request/resourcegroup/UpdateResourceGroupsRequest.h"
9898
#include "request/utility/CheckHealthRequest.h"
9999
#include "request/utility/CompactRequest.h"
100+
#include "request/utility/FlushAllRequest.h"
100101
#include "request/utility/FlushRequest.h"
101102
#include "request/utility/GetCompactionRequest.h"
103+
#include "request/utility/GetFlushAllStateRequest.h"
102104
#include "request/utility/ListSegmentsRequest.h"
103105
#include "request/utility/OptimizeRequest.h"
104106
#include "request/utility/RunAnalyzerRequest.h"
@@ -131,8 +133,10 @@
131133
#include "response/resourcegroup/ListResourceGroupsResponse.h"
132134
#include "response/utility/CheckHealthResponse.h"
133135
#include "response/utility/CompactResponse.h"
136+
#include "response/utility/FlushAllResponse.h"
134137
#include "response/utility/GetCompactionPlansResponse.h"
135138
#include "response/utility/GetCompactionStateResponse.h"
139+
#include "response/utility/GetFlushAllStateResponse.h"
136140
#include "response/utility/ListSegmentsResponse.h"
137141
#include "response/utility/OptimizeResponse.h"
138142
#include "response/utility/RunAnalyzerResponse.h"
@@ -824,6 +828,28 @@ class MILVUS_SDK_API MilvusClientV2 {
824828
virtual Status
825829
Flush(const FlushRequest& request) = 0;
826830

831+
/**
832+
* @brief Flush all insert buffer data into storage.
833+
* It will check flush-all state in a loop to make sure the data persisted successfully.
834+
* FlushAllRequest.WaitFlushedMs controls the wait timeout; zero means forever.
835+
*
836+
* @param [in] request input parameters
837+
* @param [out] response output results
838+
* @return Status operation successfully or not
839+
*/
840+
virtual Status
841+
FlushAll(const FlushAllRequest& request, FlushAllResponse& response) = 0;
842+
843+
/**
844+
* @brief Get flush-all action state.
845+
*
846+
* @param [in] request input parameters
847+
* @param [out] response output results
848+
* @return Status operation successfully or not
849+
*/
850+
virtual Status
851+
GetFlushAllState(const GetFlushAllStateRequest& request, GetFlushAllStateResponse& response) = 0;
852+
827853
/**
828854
* @brief Retrieve information of persisted segments from data nodes.
829855
*

0 commit comments

Comments
 (0)