Skip to content

Commit 88351f3

Browse files
authored
CreateCollection with indexes (#402)
Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent e1d088e commit 88351f3

15 files changed

Lines changed: 301 additions & 67 deletions

examples/src/v2/general.cpp

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -87,26 +87,41 @@ main(int argc, char* argv[]) {
8787
collection_schema->AddField(
8888
milvus::FieldSchema(field_face, milvus::DataType::FLOAT_VECTOR, "face signature").WithDimension(dimension));
8989

90+
// define indexes
91+
milvus::IndexDesc index_vector(field_face, "", milvus::IndexType::IVF_FLAT, milvus::MetricType::COSINE);
92+
index_vector.AddExtraParam(milvus::NLIST, "100");
93+
milvus::IndexDesc index_sort(field_age, "", milvus::IndexType::STL_SORT);
94+
milvus::IndexDesc index_varchar(field_name, "", milvus::IndexType::TRIE);
95+
96+
// drop collection if it exists, the CreateCollectionRequest with indexes will automatically create indexes
97+
// for this collection and load the collection
9098
status = client->DropCollection(
9199
milvus::DropCollectionRequest().WithCollectionName(collection_name).WithDatabaseName(db_name));
92100
status = client->CreateCollection(milvus::CreateCollectionRequest()
93101
.WithCollectionSchema(collection_schema)
94102
.WithDatabaseName(db_name)
103+
.AddIndex(std::move(index_vector))
104+
.AddIndex(std::move(index_sort))
105+
.AddIndex(std::move(index_varchar))
106+
.AddProperty("my_prop", "dummy") // add a customized property
107+
.AddProperty("collection.ttl.seconds", "60") // configure a built-in property
95108
.WithConsistencyLevel(milvus::ConsistencyLevel::STRONG));
96109
util::CheckStatus("create collection: " + collection_name + " in database: " + db_name, status);
97110

98-
// create index
99-
milvus::IndexDesc index_vector(field_face, "", milvus::IndexType::IVF_FLAT, milvus::MetricType::COSINE);
100-
index_vector.AddExtraParam(milvus::NLIST, "100");
101-
milvus::IndexDesc index_sort(field_age, "", milvus::IndexType::STL_SORT);
102-
milvus::IndexDesc index_varchar(field_name, "", milvus::IndexType::TRIE);
103-
status = client->CreateIndex(milvus::CreateIndexRequest()
104-
.WithCollectionName(collection_name)
105-
.WithDatabaseName(db_name)
106-
.AddIndex(std::move(index_vector))
107-
.AddIndex(std::move(index_sort))
108-
.AddIndex(std::move(index_varchar)));
109-
util::CheckStatus("create indexes on collection", status);
111+
{
112+
// describe the collection
113+
milvus::DescribeCollectionResponse desc_response;
114+
status = client->DescribeCollection(
115+
milvus::DescribeCollectionRequest().WithCollectionName(collection_name).WithDatabaseName(db_name),
116+
desc_response);
117+
util::CheckStatus("describe collection: " + collection_name, status);
118+
119+
std::cout << "\tCollection ID: " << desc_response.Desc().ID() << std::endl;
120+
auto properties = desc_response.Desc().Properties();
121+
std::map<std::string, std::string> temp_properties(properties.begin(), properties.end());
122+
std::cout << "\tCollection properties: ";
123+
util::PrintMap(temp_properties);
124+
}
110125

111126
// create a partition
112127
std::string partition_name = "Year_2022";
@@ -116,13 +131,6 @@ main(int argc, char* argv[]) {
116131
.WithPartitionName(partition_name));
117132
util::CheckStatus("create partition: " + partition_name, status);
118133

119-
// tell server prepare to load collection
120-
status = client->LoadCollection(milvus::LoadCollectionRequest()
121-
.WithDatabaseName(db_name)
122-
.WithCollectionName(collection_name)
123-
.WithReplicaNum(1));
124-
util::CheckStatus("load collection: " + collection_name, status);
125-
126134
// list collections
127135
milvus::ListCollectionsResponse resp_list_coll;
128136
status = client->ListCollections(milvus::ListCollectionsRequest().WithDatabaseName(db_name), resp_list_coll);
@@ -234,6 +242,31 @@ main(int argc, char* argv[]) {
234242
std::cout << "partition count(*) = " << response.Results().GetRowCount() << std::endl;
235243
}
236244

245+
{
246+
// call flush() here just to persist the data so that indexnode can build index on a new segment
247+
// Note: in practice, no need to call flush() manually since milvus automatically trigger flush actions
248+
status = client->Flush(milvus::FlushRequest().AddCollectionName(collection_name));
249+
util::CheckStatus("flush collection", status);
250+
251+
// describe an index
252+
milvus::DescribeIndexResponse desc_response;
253+
status = client->DescribeIndex(milvus::DescribeIndexRequest()
254+
.WithCollectionName(collection_name)
255+
.WithDatabaseName(db_name)
256+
.WithFieldName(field_face),
257+
desc_response);
258+
util::CheckStatus("describe index on field: " + field_face, status);
259+
260+
for (const auto& desc : desc_response.Descs()) {
261+
std::cout << "\tIndexName: " << desc.IndexName() << std::endl;
262+
std::cout << "\tIndexType: " << std::to_string(desc.IndexType()) << std::endl;
263+
std::cout << "\tMetricType: " << std::to_string(desc.MetricType()) << std::endl;
264+
std::cout << "\tTotalRows: " << std::to_string(desc.TotalRows()) << std::endl;
265+
std::cout << "\tIndexedRows: " << std::to_string(desc.IndexedRows()) << std::endl;
266+
std::cout << "\tPendingRows: " << std::to_string(desc.PendingRows()) << std::endl;
267+
}
268+
}
269+
237270
{
238271
// query the deleted item and some other item, the returned result will not contain the deleted item
239272
auto request = milvus::QueryRequest()

src/impl/MilvusClientV2Impl.cpp

Lines changed: 116 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,33 @@ MilvusClientV2Impl::CreateCollection(const CreateCollectionRequest& request) {
152152
return Status::OK();
153153
};
154154

155+
auto post = [this, &request, &schema](const proto::common::Status& rpc_response) {
156+
if (request.Indexes().empty()) {
157+
return Status::OK();
158+
}
159+
160+
// if user has defined indexes, create indexes immediately after collection is created.
161+
// note that Sync is false since the new collection empty, no need to wait index.
162+
const auto& descs = request.Indexes();
163+
for (const auto& desc : descs) {
164+
auto status = createIndex(request.DatabaseName(), schema.Name(), desc, false, 0);
165+
if (!status.IsOk()) {
166+
return status;
167+
}
168+
}
169+
170+
// load collection automatically
171+
LoadCollectionRequest load_req =
172+
LoadCollectionRequest()
173+
.WithDatabaseName(request.DatabaseName())
174+
.WithCollectionName(schema.Name())
175+
.WithSync(false); // set sync to false since no need to wait loading progress
176+
177+
return LoadCollection(load_req);
178+
};
179+
155180
return connection_.Invoke<proto::milvus::CreateCollectionRequest, proto::common::Status>(
156-
validate, pre, &MilvusConnection::CreateCollection, nullptr);
181+
validate, pre, &MilvusConnection::CreateCollection, post);
157182
}
158183

159184
Status
@@ -185,11 +210,9 @@ MilvusClientV2Impl::DropCollection(const DropCollectionRequest& request) {
185210
auto post = [this, &request](const proto::common::Status& status) {
186211
if (status.error_code() == proto::common::ErrorCode::Success && status.code() == 0) {
187212
// compile warning at this line since proto deprecates this method error_code()
188-
// TODO: if the parameters provides db_name in future, we need to set the correct
189-
// db_name to RemoveCollectionTs()
190213
auto db_name = connection_.CurrentDbName(request.DatabaseName());
191214
auto collection_name = request.CollectionName();
192-
GtsDict::GetInstance().RemoveCollectionTs(connection_.CurrentDbName(db_name), collection_name);
215+
GtsDict::GetInstance().RemoveCollectionTs(db_name, collection_name);
193216
removeCollectionDesc(db_name, collection_name);
194217
}
195218
return Status::OK();
@@ -222,8 +245,15 @@ MilvusClientV2Impl::LoadCollection(const LoadCollectionRequest& request) {
222245
pre, &MilvusConnection::LoadCollection);
223246
}
224247

225-
// TODO: check timeout value in sync mode
248+
// wait loading progress, check load state in interval 500ms, until the time cost exceeds request.TimeoutMs()
249+
// ProgressMonitor timeout unit is second, it is a history problem.
250+
// request.TimeoutMs() 0ms is treated as 0 second, which means "forever".
251+
// request.TimeoutMs() in [1, 1000] is treated as 1 second, request.
252+
// request.TimeoutMs() in [1001, 2000] is treated as 2 seconds, etc.
226253
ProgressMonitor progress_monitor = ProgressMonitor::Forever();
254+
if (request.TimeoutMs() > 0) {
255+
progress_monitor = ProgressMonitor{static_cast<uint32_t>(request.TimeoutMs() + 999) / 1000};
256+
}
227257
auto wait_for_status = [this, &request, &progress_monitor](const proto::common::Status&) {
228258
return ConnectionHandler::WaitForStatus(
229259
[&request, this](Progress& progress) -> Status {
@@ -274,9 +304,14 @@ MilvusClientV2Impl::DescribeCollection(const DescribeCollectionRequest& request,
274304
aliases.insert(aliases.end(), rpc_response.aliases().begin(), rpc_response.aliases().end());
275305
collection_desc.SetAlias(std::move(aliases));
276306

277-
response.SetDesc(std::move(collection_desc));
307+
std::unordered_map<std::string, std::string> properties;
308+
for (int i = 0; i < rpc_response.properties_size(); i++) {
309+
const auto& prop = rpc_response.properties(i);
310+
properties[prop.key()] = prop.value();
311+
}
312+
collection_desc.SetProperties(std::move(properties));
278313

279-
// TODO: set properties
314+
response.SetDesc(std::move(collection_desc));
280315
return Status::OK();
281316
};
282317

@@ -356,10 +391,21 @@ MilvusClientV2Impl::GetLoadState(const GetLoadStateRequest& request, GetLoadStat
356391
return Status::OK();
357392
};
358393

359-
auto post = [&response](const proto::milvus::GetLoadStateResponse& rpc_response) {
360-
response.SetState(LoadStateCast(rpc_response.state()));
394+
auto post = [this, &request, &response](const proto::milvus::GetLoadStateResponse& rpc_response) {
395+
auto state = rpc_response.state();
396+
response.SetState(LoadStateCast(state));
361397

362-
// TODO: set progress percent if state is LoadStateLoading
398+
if (state == proto::common::LoadState::LoadStateLoading) {
399+
uint32_t progress = 0;
400+
auto status = connection_.GetLoadingProgress(request.DatabaseName(), request.CollectionName(),
401+
request.PartitionNames(), progress);
402+
if (!status.IsOk()) {
403+
return status;
404+
}
405+
response.SetProgress(progress);
406+
} else if (state == proto::common::LoadState::LoadStateLoaded) {
407+
response.SetProgress(100);
408+
}
363409
return Status::OK();
364410
};
365411

@@ -503,8 +549,15 @@ MilvusClientV2Impl::LoadPartitions(const LoadPartitionsRequest& request) {
503549
pre, &MilvusConnection::LoadPartitions);
504550
}
505551

506-
// TODO: check timeout value in sync mode
552+
// wait loading progress, check load state in interval 500ms, until the time cost exceeds request.TimeoutMs()
553+
// ProgressMonitor timeout unit is second, it is a history problem.
554+
// request.TimeoutMs() 0ms is treated as 0 second, which means "forever".
555+
// request.TimeoutMs() in [1, 1000] is treated as 1 second, request.
556+
// request.TimeoutMs() in [1001, 2000] is treated as 2 seconds, etc.
507557
ProgressMonitor progress_monitor = ProgressMonitor::Forever();
558+
if (request.TimeoutMs() > 0) {
559+
progress_monitor = ProgressMonitor{static_cast<uint32_t>(request.TimeoutMs() + 999) / 1000};
560+
}
508561
auto wait_for_status = [this, &request, &progress_monitor](const proto::common::Status&) {
509562
return ConnectionHandler::WaitForStatus(
510563
[&request, this](Progress& progress) -> Status {
@@ -782,7 +835,7 @@ MilvusClientV2Impl::DescribeDatabase(const DescribeDatabaseRequest& request, Des
782835
const auto& prop = rpc_response.properties(i);
783836
properties[prop.key()] = prop.value();
784837
}
785-
db_desc.SetProperties(properties);
838+
db_desc.SetProperties(std::move(properties));
786839

787840
response.SetDesc(std::move(db_desc));
788841
return Status::OK();
@@ -796,13 +849,13 @@ Status
796849
MilvusClientV2Impl::CreateIndex(const CreateIndexRequest& request) {
797850
const auto& descs = request.Indexes();
798851
for (const auto& desc : descs) {
799-
auto status = createIndex(request.DatabaseName(), request.CollectionName(), desc, request.Sync());
852+
auto status =
853+
createIndex(request.DatabaseName(), request.CollectionName(), desc, request.Sync(), request.TimeoutMs());
800854
if (!status.IsOk()) {
801855
return status;
802856
}
803-
804-
// TODO: check timeout value in sync mode
805857
}
858+
806859
return Status::OK();
807860
}
808861

@@ -822,9 +875,15 @@ MilvusClientV2Impl::DescribeIndex(const DescribeIndexRequest& request, DescribeI
822875
return Status{StatusCode::SERVER_FAILED, "Index not found:" + request.FieldName()};
823876
}
824877

878+
// althought we have specified the field_name, the server returns all the indexes of the collection,
879+
// pick the correct index from the list.
825880
std::vector<IndexDesc> descs;
826881
for (auto i = 0; i < count; i++) {
827-
auto rpc_desc = rpc_response.index_descriptions(0);
882+
auto rpc_desc = rpc_response.index_descriptions(i);
883+
if (rpc_desc.field_name() != request.FieldName()) {
884+
continue;
885+
}
886+
828887
IndexDesc index_desc;
829888
index_desc.SetFieldName(rpc_desc.field_name());
830889
index_desc.SetIndexName(rpc_desc.index_name());
@@ -1428,8 +1487,15 @@ MilvusClientV2Impl::Flush(const FlushRequest& request) {
14281487
return Status::OK();
14291488
};
14301489

1431-
// TODO: check timeout value in sync mode
1490+
// wait flush progress, check flush state in interval 500ms, until the time cost exceeds request.WaitFlushedMs()
1491+
// ProgressMonitor timeout unit is second, it is a history problem.
1492+
// request.WaitFlushedMs() 0ms is treated as 0 second, which means "forever".
1493+
// request.WaitFlushedMs() in [1, 1000] is treated as 1 second, request.
1494+
// request.WaitFlushedMs() in [1001, 2000] is treated as 2 seconds, etc.
14321495
ProgressMonitor progress_monitor = ProgressMonitor::Forever();
1496+
if (request.WaitFlushedMs() > 0) {
1497+
progress_monitor = ProgressMonitor{static_cast<uint32_t>(request.WaitFlushedMs() + 999) / 1000};
1498+
}
14331499
auto wait_for_status = [this, &progress_monitor](const proto::milvus::FlushResponse& response) {
14341500
std::map<std::string, std::vector<int64_t>> flush_segments;
14351501
for (const auto& iter : response.coll_segids()) {
@@ -2014,7 +2080,7 @@ MilvusClientV2Impl::RemovePrivilegesFromGroup(const RemovePrivilegesFromGroupReq
20142080
// internal used methods
20152081
Status
20162082
MilvusClientV2Impl::createIndex(const std::string& db_name, const std::string& collection_name, const IndexDesc& desc,
2017-
bool sync) {
2083+
bool sync, int64_t timeout_ms) {
20182084
auto pre = [&db_name, &collection_name, &desc](proto::milvus::CreateIndexRequest& rpc_request) {
20192085
rpc_request.set_db_name(db_name);
20202086
rpc_request.set_collection_name(collection_name);
@@ -2046,7 +2112,16 @@ MilvusClientV2Impl::createIndex(const std::string& db_name, const std::string& c
20462112
pre, &MilvusConnection::CreateIndex);
20472113
}
20482114

2115+
// wait index progress, check index state in interval 500ms, until the time cost exceeds timeout_ms
2116+
// ProgressMonitor timeout unit is second, it is a history problem.
2117+
// timeout_ms 0ms is treated as 0 second, which means "forever".
2118+
// timeout_ms in [1, 1000] is treated as 1 second, request.
2119+
// timeout_ms in [1001, 2000] is treated as 2 seconds, etc.
2120+
// Note: wait timeout_ms for each index, means N indexes will wait N * timeout_ms.
20492121
ProgressMonitor progress_monitor = ProgressMonitor::Forever();
2122+
if (timeout_ms > 0) {
2123+
progress_monitor = ProgressMonitor{static_cast<uint32_t>(timeout_ms + 999) / 1000};
2124+
}
20502125
auto wait_for_status = [&db_name, &collection_name, &desc, &progress_monitor, this](const proto::common::Status&) {
20512126
return ConnectionHandler::WaitForStatus(
20522127
[&db_name, &collection_name, &desc, this](Progress& progress) -> Status {
@@ -2112,6 +2187,16 @@ combineDbCollectionName(const std::string& db_name, const std::string& collectio
21122187
Status
21132188
MilvusClientV2Impl::getCollectionDesc(const std::string& db_name, const std::string& collection_name, bool force_update,
21142189
CollectionDescPtr& desc_ptr) {
2190+
// if connection is connected to "", equals "default" db, the input db_name is "", actual_db is "default"
2191+
// if connection is connected to "default", the input db_name is "" or "default", actual_db is "default"
2192+
// if connection is connected to "A" but the input db_name is "B", actual_db is "B"
2193+
// if connection is connected to "A" but the input db_name is "", actual_db is "A"
2194+
// if connection is connected to "A" but the input db_name is "A", actual_db is "A"
2195+
auto actual_db = connection_.CurrentDbName(db_name);
2196+
if (actual_db.empty()) {
2197+
actual_db = "default";
2198+
}
2199+
21152200
// this lock locks the entire section, including the call of DescribeCollection()
21162201
// the reason is: describeCollection() could be limited by server-side(DDL request throttling is enabled)
21172202
// we don't intend to allow too many threads run into describeCollection() in this method
@@ -2125,12 +2210,12 @@ MilvusClientV2Impl::getCollectionDesc(const std::string& db_name, const std::str
21252210
}
21262211

21272212
DescribeCollectionRequest rquest =
2128-
DescribeCollectionRequest().WithDatabaseName(db_name).WithCollectionName(collection_name);
2213+
DescribeCollectionRequest().WithDatabaseName(actual_db).WithCollectionName(collection_name);
21292214
DescribeCollectionResponse response;
21302215
auto status = DescribeCollection(rquest, response);
21312216
if (status.IsOk()) {
21322217
desc_ptr = std::make_shared<CollectionDesc>(response.Desc());
2133-
auto name = combineDbCollectionName(db_name, collection_name);
2218+
auto name = combineDbCollectionName(actual_db, collection_name);
21342219
collection_desc_cache_[name] = desc_ptr;
21352220
return status;
21362221
}
@@ -2145,7 +2230,17 @@ MilvusClientV2Impl::cleanCollectionDescCache() {
21452230

21462231
void
21472232
MilvusClientV2Impl::removeCollectionDesc(const std::string& db_name, const std::string& collection_name) {
2148-
auto name = combineDbCollectionName(db_name, collection_name);
2233+
// if connection is connected to "", equals "default" db, the input db_name is "", actual_db is "default"
2234+
// if connection is connected to "default", the input db_name is "" or "default", actual_db is "default"
2235+
// if connection is connected to "A" but the input db_name is "B", actual_db is "B"
2236+
// if connection is connected to "A" but the input db_name is "", actual_db is "A"
2237+
// if connection is connected to "A" but the input db_name is "A", actual_db is "A"
2238+
auto actual_db = connection_.CurrentDbName(db_name);
2239+
if (actual_db.empty()) {
2240+
actual_db = "default";
2241+
}
2242+
2243+
auto name = combineDbCollectionName(actual_db, collection_name);
21492244
std::lock_guard<std::mutex> lock(collection_desc_cache_mtx_);
21502245
collection_desc_cache_.erase(name);
21512246
}

src/impl/MilvusClientV2Impl.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,8 @@ class MilvusClientV2Impl : public MilvusClientV2 {
294294

295295
private:
296296
Status
297-
createIndex(const std::string& db_name, const std::string& collection_name, const IndexDesc& desc, bool sync);
297+
createIndex(const std::string& db_name, const std::string& collection_name, const IndexDesc& desc, bool sync,
298+
int64_t timeout_ms);
298299

299300
Status
300301
getFlushState(const std::vector<int64_t>& segments, bool& flushed);

0 commit comments

Comments
 (0)