Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions src/impl/MilvusClientV2Impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1905,6 +1905,69 @@ MilvusClientV2Impl::Flush(const FlushRequest& request) {
nullptr, pre, &MilvusConnection::Flush, wait_for_status, nullptr);
}

Status
MilvusClientV2Impl::FlushAll(const FlushAllRequest& request, FlushAllResponse& response) {
auto pre = [&request](proto::milvus::FlushAllRequest& rpc_request) {
rpc_request.set_db_name(request.DatabaseName());
return Status::OK();
};

ProgressMonitor progress_monitor = ProgressMonitor::Forever();
if (request.WaitFlushedMs() > 0) {
progress_monitor = ProgressMonitor{static_cast<uint32_t>(request.WaitFlushedMs() + 999) / 1000};
}

auto wait_for_status = [this, &request, &progress_monitor](const proto::milvus::FlushAllResponse& rpc_response) {
GetFlushAllStateRequest state_request = GetFlushAllStateRequest()
.WithDatabaseName(request.DatabaseName())
.WithFlushAllTs(rpc_response.flush_all_ts());
return ConnectionHandler::WaitForStatus(
[this, &state_request](Progress& p) -> Status {
p.total_ = 1;
GetFlushAllStateResponse state_response;
auto status = getFlushAllState(state_request, state_response);
if (!status.IsOk()) {
return status;
}
p.finished_ = state_response.Flushed() ? 1 : 0;
return Status::OK();
},
progress_monitor);
};

auto post = [&response](const proto::milvus::FlushAllResponse& rpc_response) {
response.SetFlushAllTs(rpc_response.flush_all_ts());
return Status::OK();
};

return connection_.Invoke<proto::milvus::FlushAllRequest, proto::milvus::FlushAllResponse>(
nullptr, pre, &MilvusConnection::FlushAll, wait_for_status, post);
}

Status
MilvusClientV2Impl::GetFlushAllState(const GetFlushAllStateRequest& request, GetFlushAllStateResponse& response) {
return getFlushAllState(request, response);
}

Status
MilvusClientV2Impl::getFlushAllState(const GetFlushAllStateRequest& request, GetFlushAllStateResponse& response,
uint64_t rpc_timeout_ms) {
auto pre = [&request](proto::milvus::GetFlushAllStateRequest& rpc_request) {
rpc_request.set_db_name(request.DatabaseName());
rpc_request.set_flush_all_ts(request.FlushAllTs());
return Status::OK();
};

auto post = [&response](const proto::milvus::GetFlushAllStateResponse& rpc_response) {
response.SetFlushed(rpc_response.flushed());
return Status::OK();
};

return connection_
.InvokeWithRpcTimeout<proto::milvus::GetFlushAllStateRequest, proto::milvus::GetFlushAllStateResponse>(
rpc_timeout_ms, pre, &MilvusConnection::GetFlushAllState, post);
}

Status
MilvusClientV2Impl::ListPersistentSegments(const ListPersistentSegmentsRequest& request,
ListPersistentSegmentsResponse& response) {
Expand Down
10 changes: 10 additions & 0 deletions src/impl/MilvusClientV2Impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,12 @@ class MilvusClientV2Impl : public MilvusClientV2, public std::enable_shared_from
Status
Flush(const FlushRequest& request) final;

Status
FlushAll(const FlushAllRequest& request, FlushAllResponse& response) final;

Status
GetFlushAllState(const GetFlushAllStateRequest& request, GetFlushAllStateResponse& response) final;

Status
ListPersistentSegments(const ListPersistentSegmentsRequest& request,
ListPersistentSegmentsResponse& response) final;
Expand Down Expand Up @@ -342,6 +348,10 @@ class MilvusClientV2Impl : public MilvusClientV2, public std::enable_shared_from
Status
getFlushState(const std::vector<int64_t>& segments, bool& flushed);

Status
getFlushAllState(const GetFlushAllStateRequest& request, GetFlushAllStateResponse& response,
uint64_t rpc_timeout_ms = 0);

Status
describeCollection(const DescribeCollectionRequest& request, DescribeCollectionResponse& response,
uint64_t rpc_timeout_ms = 0);
Expand Down
13 changes: 13 additions & 0 deletions src/impl/MilvusConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,19 @@ MilvusConnection::GetFlushState(const proto::milvus::GetFlushStateRequest& reque
return grpcCall("GetFlushState", &Stub::GetFlushState, request, response, options);
}

Status
MilvusConnection::FlushAll(const proto::milvus::FlushAllRequest& request, proto::milvus::FlushAllResponse& response,
const GrpcContextOptions& options) {
return grpcCall("FlushAll", &Stub::FlushAll, request, response, options);
}

Status
MilvusConnection::GetFlushAllState(const proto::milvus::GetFlushAllStateRequest& request,
proto::milvus::GetFlushAllStateResponse& response,
const GrpcContextOptions& options) {
return grpcCall("GetFlushAllState", &Stub::GetFlushAllState, request, response, options);
}

Status
MilvusConnection::GetPersistentSegmentInfo(const proto::milvus::GetPersistentSegmentInfoRequest& request,
proto::milvus::GetPersistentSegmentInfoResponse& response,
Expand Down
8 changes: 8 additions & 0 deletions src/impl/MilvusConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,14 @@ class MilvusConnection {
GetFlushState(const proto::milvus::GetFlushStateRequest& request, proto::milvus::GetFlushStateResponse& response,
const GrpcContextOptions& options);

Status
FlushAll(const proto::milvus::FlushAllRequest& request, proto::milvus::FlushAllResponse& response,
const GrpcContextOptions& options);

Status
GetFlushAllState(const proto::milvus::GetFlushAllStateRequest& request,
proto::milvus::GetFlushAllStateResponse& response, const GrpcContextOptions& options);

Status
GetPersistentSegmentInfo(const proto::milvus::GetPersistentSegmentInfoRequest& request,
proto::milvus::GetPersistentSegmentInfoResponse& response,
Expand Down
53 changes: 53 additions & 0 deletions src/impl/request/utility/FlushAllRequest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "milvus/request/utility/FlushAllRequest.h"

namespace milvus {

const std::string&
FlushAllRequest::DatabaseName() const {
return db_name_;
}

void
FlushAllRequest::SetDatabaseName(const std::string& db_name) {
db_name_ = db_name;
}

FlushAllRequest&
FlushAllRequest::WithDatabaseName(const std::string& db_name) {
SetDatabaseName(db_name);
return *this;
}

int64_t
FlushAllRequest::WaitFlushedMs() const {
return wait_flushed_ms_;
}

void
FlushAllRequest::SetWaitFlushedMs(int64_t ms) {
wait_flushed_ms_ = ms;
}

FlushAllRequest&
FlushAllRequest::WithWaitFlushedMs(int64_t ms) {
SetWaitFlushedMs(ms);
return *this;
}

} // namespace milvus
53 changes: 53 additions & 0 deletions src/impl/request/utility/GetFlushAllStateRequest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "milvus/request/utility/GetFlushAllStateRequest.h"

namespace milvus {

const std::string&
GetFlushAllStateRequest::DatabaseName() const {
return db_name_;
}

void
GetFlushAllStateRequest::SetDatabaseName(const std::string& db_name) {
db_name_ = db_name;
}

GetFlushAllStateRequest&
GetFlushAllStateRequest::WithDatabaseName(const std::string& db_name) {
SetDatabaseName(db_name);
return *this;
}

uint64_t
GetFlushAllStateRequest::FlushAllTs() const {
return flush_all_ts_;
}

void
GetFlushAllStateRequest::SetFlushAllTs(uint64_t flush_all_ts) {
flush_all_ts_ = flush_all_ts;
}

GetFlushAllStateRequest&
GetFlushAllStateRequest::WithFlushAllTs(uint64_t flush_all_ts) {
SetFlushAllTs(flush_all_ts);
return *this;
}

} // namespace milvus
31 changes: 31 additions & 0 deletions src/impl/response/utility/FlushAllResponse.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "milvus/response/utility/FlushAllResponse.h"

namespace milvus {

uint64_t
FlushAllResponse::FlushAllTs() const {
return flush_all_ts_;
}

void
FlushAllResponse::SetFlushAllTs(uint64_t flush_all_ts) {
flush_all_ts_ = flush_all_ts;
}

} // namespace milvus
31 changes: 31 additions & 0 deletions src/impl/response/utility/GetFlushAllStateResponse.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "milvus/response/utility/GetFlushAllStateResponse.h"

namespace milvus {

bool
GetFlushAllStateResponse::Flushed() const {
return flushed_;
}

void
GetFlushAllStateResponse::SetFlushed(bool flushed) {
flushed_ = flushed;
}

} // namespace milvus
26 changes: 26 additions & 0 deletions src/include/milvus/MilvusClientV2.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,10 @@
#include "request/resourcegroup/UpdateResourceGroupsRequest.h"
#include "request/utility/CheckHealthRequest.h"
#include "request/utility/CompactRequest.h"
#include "request/utility/FlushAllRequest.h"
#include "request/utility/FlushRequest.h"
#include "request/utility/GetCompactionRequest.h"
#include "request/utility/GetFlushAllStateRequest.h"
#include "request/utility/ListSegmentsRequest.h"
#include "request/utility/OptimizeRequest.h"
#include "request/utility/RunAnalyzerRequest.h"
Expand Down Expand Up @@ -131,8 +133,10 @@
#include "response/resourcegroup/ListResourceGroupsResponse.h"
#include "response/utility/CheckHealthResponse.h"
#include "response/utility/CompactResponse.h"
#include "response/utility/FlushAllResponse.h"
#include "response/utility/GetCompactionPlansResponse.h"
#include "response/utility/GetCompactionStateResponse.h"
#include "response/utility/GetFlushAllStateResponse.h"
#include "response/utility/ListSegmentsResponse.h"
#include "response/utility/OptimizeResponse.h"
#include "response/utility/RunAnalyzerResponse.h"
Expand Down Expand Up @@ -824,6 +828,28 @@ class MILVUS_SDK_API MilvusClientV2 {
virtual Status
Flush(const FlushRequest& request) = 0;

/**
* @brief Flush all insert buffer data into storage.
* It will check flush-all state in a loop to make sure the data persisted successfully.
* FlushAllRequest.WaitFlushedMs controls the wait timeout; zero means forever.
*
* @param [in] request input parameters
* @param [out] response output results
* @return Status operation successfully or not
*/
virtual Status
FlushAll(const FlushAllRequest& request, FlushAllResponse& response) = 0;

/**
* @brief Get flush-all action state.
*
* @param [in] request input parameters
* @param [out] response output results
* @return Status operation successfully or not
*/
virtual Status
GetFlushAllState(const GetFlushAllStateRequest& request, GetFlushAllStateResponse& response) = 0;

/**
* @brief Retrieve information of persisted segments from data nodes.
*
Expand Down
Loading
Loading