Skip to content

Commit 28dd3db

Browse files
committed
Add RefreshLoad/Optimize interface
Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent e249f46 commit 28dd3db

28 files changed

Lines changed: 1999 additions & 69 deletions

.github/workflows/main.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ on:
2727
- '.clang-format'
2828
- '.clang-tidy'
2929

30+
concurrency:
31+
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
32+
cancel-in-progress: true
33+
3034
jobs:
3135
# Detect whether the PR touches core SDK files (src, cmake, test,
3236
# thirdparty, scripts, workflow, lint configs) or only peripheral files

.github/workflows/release.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ on:
66
tags:
77
- v*
88

9+
concurrency:
10+
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
11+
cancel-in-progress: true
12+
913
jobs:
1014
linux:
1115
name: Release package for ${{ matrix.os.distro }} ${{ matrix.os.version }}

examples/src/v2/optimize.cpp

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
// Licensed to the LF AI & Data foundation under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
#include <chrono>
18+
#include <cstdint>
19+
#include <iostream>
20+
#include <string>
21+
#include <thread>
22+
23+
#include "ExampleUtils.h"
24+
#include "milvus/MilvusClientV2.h"
25+
26+
namespace {
27+
const char* const collection_name = "cpp_sdk_example_optimize_v2";
28+
const char* const id_field = "id";
29+
const char* const vector_field = "vector";
30+
const uint32_t vector_dim = 512;
31+
const int64_t total_rows = 1000000;
32+
const int64_t batch_size = 10000;
33+
34+
int
35+
printSegmentInfo(milvus::MilvusClientV2Ptr& client) {
36+
milvus::ListQuerySegmentsResponse response;
37+
auto status =
38+
client->ListQuerySegments(milvus::ListQuerySegmentsRequest().WithCollectionName(collection_name), response);
39+
util::CheckStatus("list query segments", status);
40+
41+
const auto& segments = response.Result();
42+
std::cout << " Total segments: " << segments.size() << std::endl;
43+
44+
int64_t total_rows_in_segments = 0;
45+
for (const auto& segment : segments) {
46+
std::cout << " Segment " << segment.SegmentID() << ": rows=" << segment.RowCount()
47+
<< ", state=" << static_cast<int>(segment.State()) << ", index=" << segment.IndexName() << std::endl;
48+
total_rows_in_segments += segment.RowCount();
49+
}
50+
std::cout << " Total rows across segments: " << total_rows_in_segments << std::endl;
51+
return static_cast<int>(segments.size());
52+
}
53+
54+
} // namespace
55+
56+
int
57+
main(int argc, char* argv[]) {
58+
printf("Example start...\n");
59+
60+
auto client = milvus::MilvusClientV2::Create();
61+
62+
milvus::ConnectParam connect_param{"http://localhost:19530"};
63+
auto status = client->Connect(connect_param);
64+
util::CheckStatus("connect milvus server", status);
65+
66+
std::cout << "========== Step 1: Create collection ==========" << std::endl;
67+
status = client->DropCollection(milvus::DropCollectionRequest().WithCollectionName(collection_name));
68+
69+
milvus::CollectionSchemaPtr schema = std::make_shared<milvus::CollectionSchema>();
70+
schema->AddField(milvus::FieldSchema(id_field, milvus::DataType::INT64, "", true, true));
71+
schema->AddField(milvus::FieldSchema(vector_field, milvus::DataType::FLOAT_VECTOR).WithDimension(vector_dim));
72+
73+
status = client->CreateCollection(
74+
milvus::CreateCollectionRequest().WithCollectionName(collection_name).WithCollectionSchema(schema));
75+
util::CheckStatus("create collection: " + std::string(collection_name), status);
76+
77+
std::cout << "========== Step 2: Insert 1,000,000 rows ==========" << std::endl;
78+
int64_t total_inserted = 0;
79+
for (int64_t batch = 0; batch < total_rows / batch_size; ++batch) {
80+
milvus::EntityRows rows;
81+
rows.reserve(batch_size);
82+
for (int64_t i = 0; i < batch_size; ++i) {
83+
milvus::EntityRow row;
84+
row[vector_field] = util::GenerateFloatVector(vector_dim);
85+
rows.emplace_back(std::move(row));
86+
}
87+
88+
milvus::InsertResponse insert_response;
89+
status = client->Insert(
90+
milvus::InsertRequest().WithCollectionName(collection_name).WithRowsData(std::move(rows)), insert_response);
91+
util::CheckStatus("insert", status);
92+
93+
total_inserted += insert_response.Results().InsertCount();
94+
if ((batch + 1) % 10 == 0) {
95+
std::cout << " Inserted " << total_inserted << " / " << total_rows << " rows" << std::endl;
96+
}
97+
}
98+
99+
status = client->Flush(milvus::FlushRequest().AddCollectionName(collection_name));
100+
util::CheckStatus("flush", status);
101+
std::cout << "Total inserted: " << total_inserted << " rows" << std::endl;
102+
103+
std::cout << "========== Step 3: Create IVF_FLAT index ==========" << std::endl;
104+
milvus::IndexDesc index(vector_field, "", milvus::IndexType::IVF_FLAT, milvus::MetricType::L2);
105+
index.AddExtraParam("nlist", "32");
106+
status = client->CreateIndex(milvus::CreateIndexRequest()
107+
.WithCollectionName(collection_name)
108+
.AddIndex(std::move(index))
109+
.WithTimeoutMs(100000));
110+
util::CheckStatus("create IVF_FLAT index", status);
111+
112+
std::cout << "========== Step 4: Load collection ==========" << std::endl;
113+
status = client->LoadCollection(milvus::LoadCollectionRequest().WithCollectionName(collection_name));
114+
util::CheckStatus("load collection", status);
115+
116+
std::cout << "========== Step 5: Query segment info (before optimize) ==========" << std::endl;
117+
printSegmentInfo(client);
118+
119+
std::cout << "========== Step 6: Optimize (targetSize=4GB, sync) ==========" << std::endl;
120+
const auto start_time = std::chrono::steady_clock::now();
121+
milvus::OptimizeTaskPtr task;
122+
status =
123+
client->Optimize(milvus::OptimizeRequest().WithCollectionName(collection_name).WithTargetSize("4GB"), task);
124+
util::CheckStatus("optimize", status);
125+
126+
milvus::OptimizeResponse optimize_response;
127+
status = task->GetResult(optimize_response);
128+
util::CheckStatus("get optimize result", status);
129+
130+
const auto elapsed =
131+
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
132+
std::cout << "Optimize completed in " << elapsed.count() / 1000.0 << " seconds" << std::endl;
133+
std::cout << " Status: " << optimize_response.StatusText() << std::endl;
134+
std::cout << " Compaction ID: " << optimize_response.CompactionID() << std::endl;
135+
std::cout << " Progress:";
136+
for (const auto& progress : optimize_response.ProgressHistory()) {
137+
std::cout << " " << progress;
138+
}
139+
std::cout << std::endl;
140+
141+
std::cout << "========== Step 7: Query segment info (after optimize) ==========" << std::endl;
142+
while (true) {
143+
int segment_count = printSegmentInfo(client);
144+
if (segment_count == 1) {
145+
std::cout << "Optimization successful, only one segment remains" << std::endl;
146+
break;
147+
}
148+
std::cout << "Waiting for optimization to complete..." << std::endl;
149+
std::this_thread::sleep_for(std::chrono::seconds(1));
150+
}
151+
152+
client->Disconnect();
153+
return 0;
154+
}

0 commit comments

Comments
 (0)