Skip to content

Commit 2b82c99

Browse files
committed
Add RefreshLoad/Optimize interface
Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent e249f46 commit 2b82c99

31 files changed

Lines changed: 2160 additions & 90 deletions

.github/workflows/main.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ on:
2727
- '.clang-format'
2828
- '.clang-tidy'
2929

30+
concurrency:
31+
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
32+
cancel-in-progress: ${{ github.event_name == 'pull_request' }}
33+
3034
jobs:
3135
# Detect whether the PR touches core SDK files (src, cmake, test,
3236
# thirdparty, scripts, workflow, lint configs) or only peripheral files

.github/workflows/release.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ on:
66
tags:
77
- v*
88

9+
concurrency:
10+
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
11+
cancel-in-progress: true
12+
913
jobs:
1014
linux:
1115
name: Release package for ${{ matrix.os.distro }} ${{ matrix.os.version }}

examples/src/v2/optimize.cpp

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
// Licensed to the LF AI & Data foundation under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
#include <chrono>
18+
#include <cstdint>
19+
#include <iostream>
20+
#include <string>
21+
#include <thread>
22+
23+
#include "ExampleUtils.h"
24+
#include "milvus/MilvusClientV2.h"
25+
26+
namespace {
27+
const char* const collection_name = "cpp_sdk_example_optimize_v2";
28+
const char* const id_field = "id";
29+
const char* const vector_field = "vector";
30+
const uint32_t vector_dim = 512;
31+
const int64_t total_rows = 1000000;
32+
const int64_t batch_size = 10000;
33+
34+
int
35+
printSegmentInfo(milvus::MilvusClientV2Ptr& client) {
36+
milvus::ListQuerySegmentsResponse response;
37+
auto status =
38+
client->ListQuerySegments(milvus::ListQuerySegmentsRequest().WithCollectionName(collection_name), response);
39+
util::CheckStatus("list query segments", status);
40+
41+
const auto& segments = response.Result();
42+
std::cout << " Total segments: " << segments.size() << std::endl;
43+
44+
int64_t total_rows_in_segments = 0;
45+
for (const auto& segment : segments) {
46+
std::cout << " Segment " << segment.SegmentID() << ": rows=" << segment.RowCount()
47+
<< ", state=" << static_cast<int>(segment.State()) << ", index=" << segment.IndexName() << std::endl;
48+
total_rows_in_segments += segment.RowCount();
49+
}
50+
std::cout << " Total rows across segments: " << total_rows_in_segments << std::endl;
51+
return static_cast<int>(segments.size());
52+
}
53+
54+
} // namespace
55+
56+
int
57+
main(int argc, char* argv[]) {
58+
printf("Example start...\n");
59+
60+
auto client = milvus::MilvusClientV2::Create();
61+
62+
milvus::ConnectParam connect_param{"http://localhost:19530"};
63+
auto status = client->Connect(connect_param);
64+
util::CheckStatus("connect milvus server", status);
65+
66+
std::cout << "========== Step 1: Create collection ==========" << std::endl;
67+
status = client->DropCollection(milvus::DropCollectionRequest().WithCollectionName(collection_name));
68+
69+
milvus::CollectionSchemaPtr schema = std::make_shared<milvus::CollectionSchema>();
70+
schema->AddField(milvus::FieldSchema(id_field, milvus::DataType::INT64, "", true, true));
71+
schema->AddField(milvus::FieldSchema(vector_field, milvus::DataType::FLOAT_VECTOR).WithDimension(vector_dim));
72+
73+
status = client->CreateCollection(
74+
milvus::CreateCollectionRequest().WithCollectionName(collection_name).WithCollectionSchema(schema));
75+
util::CheckStatus("create collection: " + std::string(collection_name), status);
76+
77+
std::cout << "========== Step 2: Insert 1,000,000 rows ==========" << std::endl;
78+
int64_t total_inserted = 0;
79+
for (int64_t batch = 0; batch < total_rows / batch_size; ++batch) {
80+
milvus::EntityRows rows;
81+
rows.reserve(batch_size);
82+
for (int64_t i = 0; i < batch_size; ++i) {
83+
milvus::EntityRow row;
84+
row[vector_field] = util::GenerateFloatVector(vector_dim);
85+
rows.emplace_back(std::move(row));
86+
}
87+
88+
milvus::InsertResponse insert_response;
89+
status = client->Insert(
90+
milvus::InsertRequest().WithCollectionName(collection_name).WithRowsData(std::move(rows)), insert_response);
91+
util::CheckStatus("insert", status);
92+
93+
total_inserted += insert_response.Results().InsertCount();
94+
if ((batch + 1) % 10 == 0) {
95+
std::cout << " Inserted " << total_inserted << " / " << total_rows << " rows" << std::endl;
96+
}
97+
}
98+
99+
status = client->Flush(milvus::FlushRequest().AddCollectionName(collection_name));
100+
util::CheckStatus("flush", status);
101+
std::cout << "Total inserted: " << total_inserted << " rows" << std::endl;
102+
103+
std::cout << "========== Step 3: Create IVF_FLAT index ==========" << std::endl;
104+
milvus::IndexDesc index(vector_field, "", milvus::IndexType::IVF_FLAT, milvus::MetricType::L2);
105+
index.AddExtraParam("nlist", "32");
106+
status = client->CreateIndex(milvus::CreateIndexRequest()
107+
.WithCollectionName(collection_name)
108+
.AddIndex(std::move(index))
109+
.WithTimeoutMs(100000));
110+
util::CheckStatus("create IVF_FLAT index", status);
111+
112+
std::cout << "========== Step 4: Load collection ==========" << std::endl;
113+
status = client->LoadCollection(milvus::LoadCollectionRequest().WithCollectionName(collection_name));
114+
util::CheckStatus("load collection", status);
115+
116+
std::cout << "========== Step 5: Query segment info (before optimize) ==========" << std::endl;
117+
printSegmentInfo(client);
118+
119+
std::cout << "========== Step 6: Optimize (targetSize=4GB, async) ==========" << std::endl;
120+
const auto start_time = std::chrono::steady_clock::now();
121+
milvus::OptimizeTaskPtr task;
122+
status = client->Optimize(
123+
milvus::OptimizeRequest().WithCollectionName(collection_name).WithTargetSize("4GB").WithAsync(true), task);
124+
util::CheckStatus("optimize", status);
125+
126+
std::string last_progress;
127+
while (!task->IsDone()) {
128+
auto progress = task->CurrentProgress();
129+
if (!progress.empty() && progress != last_progress) {
130+
const auto elapsed =
131+
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start_time);
132+
std::cout << " Optimize progress [" << elapsed.count() << "s]: " << progress << std::endl;
133+
last_progress = progress;
134+
}
135+
std::this_thread::sleep_for(std::chrono::seconds(1));
136+
}
137+
138+
milvus::OptimizeResponse optimize_response;
139+
status = task->GetResult(optimize_response);
140+
util::CheckStatus("get optimize result", status);
141+
142+
const auto elapsed =
143+
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
144+
std::cout << "Optimize completed in " << elapsed.count() / 1000.0 << " seconds" << std::endl;
145+
std::cout << " Status: " << optimize_response.StatusText() << std::endl;
146+
std::cout << " Compaction ID: " << optimize_response.CompactionID() << std::endl;
147+
std::cout << " Progress:";
148+
for (const auto& progress : optimize_response.ProgressHistory()) {
149+
std::cout << " " << progress;
150+
}
151+
std::cout << std::endl;
152+
153+
std::cout << "========== Step 7: Query segment info (after optimize) ==========" << std::endl;
154+
const auto step7_start_time = std::chrono::steady_clock::now();
155+
while (true) {
156+
int segment_count = printSegmentInfo(client);
157+
if (segment_count == 1) {
158+
std::cout << "Optimization successful, only one segment remains" << std::endl;
159+
break;
160+
}
161+
std::cout << "Waiting for optimization to complete..." << std::endl;
162+
std::this_thread::sleep_for(std::chrono::seconds(1));
163+
}
164+
const auto step7_elapsed =
165+
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - step7_start_time);
166+
std::cout << "Step 7 completed in " << step7_elapsed.count() / 1000.0 << " seconds" << std::endl;
167+
168+
client->Disconnect();
169+
return 0;
170+
}

0 commit comments

Comments
 (0)