Skip to content

Commit 7b6a2ec

Browse files
committed
Add RefreshLoad/Optimize interface
Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent e249f46 commit 7b6a2ec

17 files changed

Lines changed: 1383 additions & 2 deletions

File tree

examples/src/v2/optimize.cpp

Lines changed: 153 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,153 @@
1+
// Licensed to the LF AI & Data foundation under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
#include <chrono>
18+
#include <cstdint>
19+
#include <iostream>
20+
#include <string>
21+
#include <thread>
22+
23+
#include "ExampleUtils.h"
24+
#include "milvus/MilvusClientV2.h"
25+
26+
namespace {
27+
const char* const collection_name = "cpp_sdk_example_optimize_v2";
28+
const char* const id_field = "id";
29+
const char* const vector_field = "vector";
30+
const uint32_t vector_dim = 512;
31+
const int64_t total_rows = 1000000;
32+
const int64_t batch_size = 10000;
33+
34+
int
35+
printSegmentInfo(milvus::MilvusClientV2Ptr& client) {
36+
milvus::ListQuerySegmentsResponse response;
37+
auto status = client->ListQuerySegments(
38+
milvus::ListQuerySegmentsRequest().WithCollectionName(collection_name), response);
39+
util::CheckStatus("list query segments", status);
40+
41+
const auto& segments = response.Result();
42+
std::cout << " Total segments: " << segments.size() << std::endl;
43+
44+
int64_t total_rows_in_segments = 0;
45+
for (const auto& segment : segments) {
46+
std::cout << " Segment " << segment.SegmentID() << ": rows=" << segment.RowCount()
47+
<< ", state=" << static_cast<int>(segment.State()) << ", index=" << segment.IndexName()
48+
<< std::endl;
49+
total_rows_in_segments += segment.RowCount();
50+
}
51+
std::cout << " Total rows across segments: " << total_rows_in_segments << std::endl;
52+
return static_cast<int>(segments.size());
53+
}
54+
55+
} // namespace
56+
57+
int
58+
main(int argc, char* argv[]) {
59+
printf("Example start...\n");
60+
61+
auto client = milvus::MilvusClientV2::Create();
62+
63+
milvus::ConnectParam connect_param{"http://localhost:19530"};
64+
auto status = client->Connect(connect_param);
65+
util::CheckStatus("connect milvus server", status);
66+
67+
std::cout << "========== Step 1: Create collection ==========" << std::endl;
68+
status = client->DropCollection(milvus::DropCollectionRequest().WithCollectionName(collection_name));
69+
70+
milvus::CollectionSchemaPtr schema = std::make_shared<milvus::CollectionSchema>();
71+
schema->AddField(milvus::FieldSchema(id_field, milvus::DataType::INT64, "", true, true));
72+
schema->AddField(milvus::FieldSchema(vector_field, milvus::DataType::FLOAT_VECTOR).WithDimension(vector_dim));
73+
74+
status = client->CreateCollection(
75+
milvus::CreateCollectionRequest().WithCollectionName(collection_name).WithCollectionSchema(schema));
76+
util::CheckStatus("create collection: " + std::string(collection_name), status);
77+
78+
std::cout << "========== Step 2: Insert 1,000,000 rows ==========" << std::endl;
79+
int64_t total_inserted = 0;
80+
for (int64_t batch = 0; batch < total_rows / batch_size; ++batch) {
81+
milvus::EntityRows rows;
82+
rows.reserve(batch_size);
83+
for (int64_t i = 0; i < batch_size; ++i) {
84+
milvus::EntityRow row;
85+
row[vector_field] = util::GenerateFloatVector(vector_dim);
86+
rows.emplace_back(std::move(row));
87+
}
88+
89+
milvus::InsertResponse insert_response;
90+
status = client->Insert(milvus::InsertRequest().WithCollectionName(collection_name).WithRowsData(std::move(rows)),
91+
insert_response);
92+
util::CheckStatus("insert", status);
93+
94+
total_inserted += insert_response.Results().InsertCount();
95+
if ((batch + 1) % 10 == 0) {
96+
std::cout << " Inserted " << total_inserted << " / " << total_rows << " rows" << std::endl;
97+
}
98+
}
99+
100+
status = client->Flush(milvus::FlushRequest().AddCollectionName(collection_name));
101+
util::CheckStatus("flush", status);
102+
std::cout << "Total inserted: " << total_inserted << " rows" << std::endl;
103+
104+
std::cout << "========== Step 3: Create IVF_FLAT index ==========" << std::endl;
105+
milvus::IndexDesc index(vector_field, "", milvus::IndexType::IVF_FLAT, milvus::MetricType::L2);
106+
index.AddExtraParam("nlist", "32");
107+
status = client->CreateIndex(milvus::CreateIndexRequest()
108+
.WithCollectionName(collection_name)
109+
.AddIndex(std::move(index))
110+
.WithTimeoutMs(100000));
111+
util::CheckStatus("create IVF_FLAT index", status);
112+
113+
std::cout << "========== Step 4: Load collection ==========" << std::endl;
114+
status = client->LoadCollection(milvus::LoadCollectionRequest().WithCollectionName(collection_name));
115+
util::CheckStatus("load collection", status);
116+
117+
std::cout << "========== Step 5: Query segment info (before optimize) ==========" << std::endl;
118+
printSegmentInfo(client);
119+
120+
std::cout << "========== Step 6: Optimize (targetSize=4GB, sync) ==========" << std::endl;
121+
const auto start_time = std::chrono::steady_clock::now();
122+
milvus::OptimizeTaskPtr task;
123+
status = client->Optimize(milvus::OptimizeRequest().WithCollectionName(collection_name).WithTargetSize("4GB"), task);
124+
util::CheckStatus("optimize", status);
125+
126+
milvus::OptimizeResponse optimize_response;
127+
status = task->GetResult(optimize_response);
128+
util::CheckStatus("get optimize result", status);
129+
130+
const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
131+
std::cout << "Optimize completed in " << elapsed.count() / 1000.0 << " seconds" << std::endl;
132+
std::cout << " Status: " << optimize_response.StatusText() << std::endl;
133+
std::cout << " Compaction ID: " << optimize_response.CompactionID() << std::endl;
134+
std::cout << " Progress:";
135+
for (const auto& progress : optimize_response.ProgressHistory()) {
136+
std::cout << " " << progress;
137+
}
138+
std::cout << std::endl;
139+
140+
std::cout << "========== Step 7: Query segment info (after optimize) ==========" << std::endl;
141+
while (true) {
142+
int segment_count = printSegmentInfo(client);
143+
if (segment_count == 1) {
144+
std::cout << "Optimization successful, only one segment remains" << std::endl;
145+
break;
146+
}
147+
std::cout << "Waiting for optimization to complete..." << std::endl;
148+
std::this_thread::sleep_for(std::chrono::seconds(1));
149+
}
150+
151+
client->Disconnect();
152+
return 0;
153+
}

0 commit comments

Comments
 (0)