Skip to content

Commit ffaa6a1

Browse files
committed
With a single thread, we fully build the graph in memory and only then save it, so that the grefs are set correctly.
Already ~15% faster.
1 parent a0e7cf3 commit ffaa6a1

3 files changed

Lines changed: 205 additions & 6 deletions

File tree

sql/sql_base.cc

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10215,10 +10215,17 @@ int TABLE::hlindex_read_end()
1021510215

1021610216
/*
  Prepare this table's hlindex for a bulk insert.

  Post-commit logic (the lines marked `+` in this diff): when the share
  reports hlindexes, the hlindex is opened for write if it is missing or not
  in use; if it is then present and in use, bulk_insert_active is set and
  the call is forwarded to mhnsw_bulk_insert_begin() for the key at
  key_info + s->keys.

  @param rows  row estimate, passed through to mhnsw_bulk_insert_begin()
  @return the error from open_hlindexes_for_write() or
          mhnsw_bulk_insert_begin(); 0 in every other case
*/
int TABLE::hlindexes_bulk_insert_begin(ha_rows rows)
1021710217
{
10218-
if (hlindex && hlindex->in_use)
10218+
if (s->hlindexes())
1021910219
{
10220-
hlindex->bulk_insert_active= true;
10221-
return mhnsw_bulk_insert_begin(this, key_info + s->keys, rows);
10220+
if (!hlindex || !hlindex->in_use)
10221+
if (int err= open_hlindexes_for_write())
10222+
return err;
10223+
10224+
if (hlindex && hlindex->in_use)
10225+
{
10226+
hlindex->bulk_insert_active= true;
10227+
return mhnsw_bulk_insert_begin(this, key_info + s->keys, rows);
10228+
}
1022210229
}
1022310230
return 0;
1022410231
}

sql/table.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1876,6 +1876,8 @@ struct TABLE
18761876
int hlindexes_on_update();
18771877
int hlindexes_on_delete(const uchar *buf);
18781878
int hlindexes_on_delete_all(bool truncate);
1879+
int hlindexes_bulk_insert_begin(ha_rows rows);
1880+
int hlindexes_bulk_insert_end();
18791881
int unlock_hlindexes();
18801882

18811883
void prepare_triggers_for_insert_stmt_or_event();

sql/vector_mhnsw.cc

Lines changed: 193 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ class MHNSW_Share : public Sql_alloc
510510
const uint M;
511511
metric_type metric;
512512
bool use_subdist;
513-
513+
bool bulk_active;
514514
MHNSW_Share(TABLE *t)
515515
: tref_len(t->file->ref_length), gref_len(t->hlindex->file->ref_length),
516516
M(static_cast<uint>(t->s->key_info[t->s->keys].option_struct->M)),
@@ -1012,6 +1012,8 @@ int FVectorNode::load_from_record(TABLE *graph)
10121012
FVector *vec_ptr= FVector::align_ptr(tref() + tref_len());
10131013
memcpy(vec_ptr->data(), v->ptr(), v->length());
10141014
vec_ptr->postprocess(ctx->use_subdist, ctx->vec_len);
1015+
if (ctx->metric == COSINE)
1016+
vec_ptr->abs2= 0.5f;
10151017

10161018
longlong layer= graph->field[FIELD_LAYER]->val_int();
10171019
if (layer > 100) // 10e30 nodes at M=2, more at larger M's
@@ -1266,8 +1268,9 @@ static int update_second_degree_neighbors(MHNSW_param *p, FVectorNode *node)
12661268
if (int err= select_neighbors(p, neigh, neighneighbors, node,
12671269
max_neighbors))
12681270
return err;
1269-
if (int err= neigh->save(p->graph))
1270-
return err;
1271+
if (!p->ctx->bulk_active)
1272+
if (int err= neigh->save(p->graph))
1273+
return err;
12711274
}
12721275
return 0;
12731276
}
@@ -1504,6 +1507,193 @@ int mhnsw_insert(TABLE *table, KEY *keyinfo)
15041507
}
15051508

15061509

1510+
struct MHNSW_Bulk_context : public Sql_alloc {
1511+
MHNSW_Share *ctx;
1512+
DYNAMIC_ARRAY nodes;
1513+
ha_rows estimated_rows;
1514+
uint8_t current_max_layer;
1515+
};
1516+
1517+
/*
  Open a bulk-insert session on the vector index of `table`.

  Allocates an MHNSW_Bulk_context on the THD mem_root, sizes its node array
  from the row estimate, acquires the MHNSW_Share, switches the share into
  bulk mode and publishes the session in table->hlindex->context.

  @param table    base table; table->hlindex must already be set
  @param keyinfo  the vector key (only inspected by assertions here)
  @param rows     estimated number of rows that will be inserted
  @return 0 on success, HA_ERR_OUT_OF_MEM on allocation failure, or the
          error reported by MHNSW_Share::acquire()
*/
int mhnsw_bulk_insert_begin(TABLE *table, KEY *keyinfo, ha_rows rows)
{
  TABLE *hlindex_table= table->hlindex;
  DBUG_ASSERT(hlindex_table);
  DBUG_ASSERT(keyinfo->algorithm == HA_KEY_ALG_VECTOR);
  DBUG_ASSERT(keyinfo->usable_key_parts == 1);

  MHNSW_Bulk_context *session=
    new (table->in_use->mem_root) MHNSW_Bulk_context();
  if (session == nullptr)
    return HA_ERR_OUT_OF_MEM;

  session->estimated_rows= rows;

  /* initial capacity is the estimate plus 10%, growing by `rows` per step */
  const bool array_failed=
    my_init_dynamic_array(PSI_INSTRUMENT_MEM, &session->nodes,
                          sizeof(FVectorNode*), rows + rows * 0.1, rows,
                          MYF(0));
  if (array_failed)
  {
    delete session;
    return HA_ERR_OUT_OF_MEM;
  }

  /*
    HA_ERR_END_OF_FILE and HA_ERR_KEY_NOT_FOUND are not treated as failures
    of acquire(); every other non-zero code aborts the session.
  */
  const int acquire_err= MHNSW_Share::acquire(&session->ctx, table, true);
  const bool tolerated= acquire_err == HA_ERR_END_OF_FILE ||
                        acquire_err == HA_ERR_KEY_NOT_FOUND;
  if (acquire_err && !tolerated)
  {
    delete_dynamic(&session->nodes);
    delete session;
    return acquire_err;
  }

  session->ctx->bulk_active= 1;
  session->current_max_layer= 0;
  hlindex_table->context= session;
  return 0;
}
1549+
1550+
/*
  Buffer the current row of `table` for a running bulk-insert session.

  Reads the vector from the indexed field, records the row position, draws
  a target layer for the new node and appends a freshly allocated
  FVectorNode to the session's node array. No neighbour search and no write
  to the graph table happens here.

  @param table    base table positioned on the row in table->record[0];
                  table->hlindex->context must hold the session created by
                  mhnsw_bulk_insert_begin()
  @param keyinfo  the vector key whose first key part is the vector field
  @return 0 on success, HA_ERR_CRASHED (also stored in my_errno) when the
          vector length differs from the one already recorded in the share,
          HA_ERR_OUT_OF_MEM when the node cannot be appended
*/
int mhnsw_bulk_insert_row(TABLE *table, KEY *keyinfo)
{
  TABLE *graph= table->hlindex;
  /* assert before dereferencing, so a debug build stops at the assertion */
  DBUG_ASSERT(graph);
  DBUG_ASSERT(keyinfo->algorithm == HA_KEY_ALG_VECTOR);
  DBUG_ASSERT(keyinfo->usable_key_parts == 1);

  MHNSW_Bulk_context *bulk= (MHNSW_Bulk_context*)graph->context;
  DBUG_ASSERT(bulk);
  MHNSW_Share *ctx= bulk->ctx;

  /*
    Restore the read_set on every exit path; previously the two error
    returns below left the temporary column map installed.
  */
  MY_BITMAP *old_map= dbug_tmp_use_all_columns(table, &table->read_set);
  SCOPE_EXIT([table, old_map](){
    dbug_tmp_restore_column_map(&table->read_set, old_map);
  });

  Field *vec_field= keyinfo->key_part->field;
  String buf, *res= vec_field->val_str(&buf);

  DBUG_ASSERT(vec_field->binary());
  DBUG_ASSERT(vec_field->cmp_type() == STRING_RESULT);
  DBUG_ASSERT(res); // ER_INDEX_CANNOT_HAVE_NULL
  DBUG_ASSERT(res->length() > 0 && res->length() % 4 == 0);
  DBUG_ASSERT(table->file->ref_length <= graph->field[FIELD_TREF]->field_length);

  table->file->position(table->record[0]);

  /* the first vector seen fixes the lengths stored in the share */
  if (ctx->byte_len == 0)
    ctx->set_lengths(res->length());

  if (ctx->byte_len != res->length())
    return my_errno= HA_ERR_CRASHED;

  /*
    Layer draw: floor(-ln(rnd) / ln(M)), capped at one above the highest
    layer used so far. The cap is applied while the value is still a double:
    converting inf/NaN or anything above 255 to uint8_t is undefined, and
    -ln(rnd) is unbounded as rnd approaches 0.
  */
  const double NORMALIZATION_FACTOR= 1 / std::log(ctx->M);
  const double log= -std::log(my_rnd(&table->in_use->rand)) * NORMALIZATION_FACTOR;
  const uint layer_cap= std::min<uint>(bulk->current_max_layer + 1U, UINT8_MAX);
  uint8_t target_layer= static_cast<uint8_t>(layer_cap);
  if (log < layer_cap)
    target_layer= static_cast<uint8_t>(std::floor(std::max(log, 0.0)));

  /* the very first buffered node is always placed on layer 0 */
  if (bulk->nodes.elements == 0)
    target_layer= 0;

  if (target_layer > bulk->current_max_layer)
    bulk->current_max_layer= target_layer;

  FVectorNode *node= new (ctx->alloc_node())
    FVectorNode(ctx, table->file->ref, target_layer, res->ptr());

  if (insert_dynamic(&bulk->nodes, (uchar*)&node))
    return HA_ERR_OUT_OF_MEM;

  return 0;
}
1599+
1600+
int mhnsw_bulk_insert_end(TABLE *table, KEY *keyinfo)
1601+
{
1602+
THD *thd= table->in_use;
1603+
TABLE *graph= table->hlindex;
1604+
MHNSW_Bulk_context *bulk= (MHNSW_Bulk_context*)graph->context;
1605+
1606+
DBUG_ASSERT(graph);
1607+
DBUG_ASSERT(bulk);
1608+
1609+
MHNSW_Share *ctx= bulk->ctx;
1610+
SCOPE_EXIT([ctx, bulk, table](){
1611+
delete_dynamic(&bulk->nodes);
1612+
ctx->bulk_active= 0;
1613+
ctx->release(table);
1614+
table->hlindex->context= nullptr;
1615+
});
1616+
1617+
for (uint i= 0; i < bulk->nodes.elements; i++)
1618+
{
1619+
FVectorNode *target= *(FVectorNode**)dynamic_element(&bulk->nodes, i, FVectorNode**);
1620+
1621+
if (!ctx->start)
1622+
{
1623+
ctx->start= target;
1624+
continue;
1625+
}
1626+
1627+
MEM_ROOT_SAVEPOINT memroot_sv;
1628+
root_make_savepoint(thd->mem_root, &memroot_sv);
1629+
SCOPE_EXIT([memroot_sv](){ root_free_to_savepoint(&memroot_sv); });
1630+
1631+
const uint8_t max_layer= ctx->start->max_layer;
1632+
uint8_t target_layer= target->max_layer;
1633+
1634+
MHNSW_param p(ctx, graph, max_layer);
1635+
p.acc.graph_size= 1;
1636+
1637+
const size_t max_found= ctx->max_neighbors(0);
1638+
Neighborhood candidates;
1639+
candidates.init(thd->alloc<FVectorNode*>(max_found + 7), max_found);
1640+
candidates.links[candidates.num++]= ctx->start;
1641+
1642+
for (; p.layer > target_layer; p.layer--)
1643+
{
1644+
if (int err= search_layer(&p, target->vec, NEAREST, 1, &candidates, false))
1645+
return err;
1646+
}
1647+
1648+
for (; p.layer >= 0; p.layer--)
1649+
{
1650+
uint max_neighbors= ctx->max_neighbors(p.layer);
1651+
if (int err= search_layer(&p, target->vec, NEAREST, max_neighbors,
1652+
&candidates, true))
1653+
return err;
1654+
if (int err= select_neighbors(&p, target, candidates, 0, max_neighbors))
1655+
return err;
1656+
}
1657+
1658+
ctx->add_to_stats(p.acc);
1659+
1660+
if (target_layer > max_layer)
1661+
ctx->start= target;
1662+
1663+
for (p.layer= target_layer; p.layer >= 0; p.layer--)
1664+
{
1665+
if (int err= update_second_degree_neighbors(&p, target))
1666+
return err;
1667+
}
1668+
}
1669+
1670+
graph->file->ha_start_bulk_insert(bulk->nodes.elements, 0);
1671+
1672+
for (uint i= 0; i < bulk->nodes.elements; i++)
1673+
{
1674+
FVectorNode *node= *(FVectorNode**)dynamic_element(&bulk->nodes, i, FVectorNode**);
1675+
if (int err= node->save(graph))
1676+
return err;
1677+
}
1678+
1679+
if (int err= graph->file->ha_end_bulk_insert())
1680+
return err;
1681+
1682+
if (int err= graph->file->ha_rnd_init(0))
1683+
return err;
1684+
SCOPE_EXIT([graph](){ graph->file->ha_rnd_end(); });
1685+
1686+
// fix neighbors grefs
1687+
for (uint i= 0; i < bulk->nodes.elements; i++)
1688+
{
1689+
FVectorNode *node= *(FVectorNode**)dynamic_element(&bulk->nodes, i, FVectorNode**);
1690+
if (int err= node->save(graph))
1691+
return err;
1692+
}
1693+
1694+
return 0;
1695+
}
1696+
15071697
struct Search_context: public Sql_alloc
15081698
{
15091699
Neighborhood found;

0 commit comments

Comments
 (0)