Skip to content

Commit b76b850

Browse files
fix: offload sync Pydantic serialization to thread in async write path
AsyncPointsApi methods for `upsert_points`, `batch_update`, and `update_vectors` call `_build_for_*` which runs `jsonable_encoder` (-> `model_dump_json`) synchronously before the first `await`. This blocks the event loop for the entire duration of Pydantic serialization, which scales linearly with point count and vector dimensionality. Move the sync `_build_for_*` call into `run_in_executor` for these three methods, whose body size is unbounded. The `_build_for_*` methods return an unawaited coroutine object (from the async api_client.request), so the executor runs the serialization in a thread and we await the resulting coroutine back on the event loop. Fixes #1175
1 parent c7a5bef commit b76b850

1 file changed

Lines changed: 51 additions & 18 deletions

File tree

qdrant_client/http/api/points_api.py

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
# flake8: noqa E501
2+
import asyncio
3+
import functools
24
from typing import TYPE_CHECKING, Any, Dict, Set, TypeVar, Union
35

46
from pydantic import BaseModel
@@ -562,13 +564,26 @@ async def batch_update(
562564
"""
563565
Apply a series of update operations for points, vectors and payloads
564566
"""
565-
return await self._build_for_batch_update(
566-
collection_name=collection_name,
567-
wait=wait,
568-
ordering=ordering,
569-
timeout=timeout,
570-
update_operations=update_operations,
567+
# _build_for_batch_update is a sync method that:
568+
# 1. serializes the body with jsonable_encoder (CPU-bound, can be slow)
569+
# 2. calls self.api_client.request(...) which, for AsyncApiClient, is
570+
# async def — so calling it merely *creates* a coroutine object
571+
# without executing it
572+
# We run the whole sync method in an executor to move the serialization
573+
# off the event loop, then await the returned coroutine here.
574+
loop = asyncio.get_running_loop()
575+
coro = await loop.run_in_executor(
576+
None,
577+
functools.partial(
578+
self._build_for_batch_update,
579+
collection_name=collection_name,
580+
wait=wait,
581+
ordering=ordering,
582+
timeout=timeout,
583+
update_operations=update_operations,
584+
),
571585
)
586+
return await coro
572587

573588
async def clear_payload(
574589
self,
@@ -778,13 +793,22 @@ async def update_vectors(
778793
"""
779794
Update specified named vectors on points, keep unspecified vectors intact.
780795
"""
781-
return await self._build_for_update_vectors(
782-
collection_name=collection_name,
783-
wait=wait,
784-
ordering=ordering,
785-
timeout=timeout,
786-
update_vectors=update_vectors,
796+
# See batch_update for rationale: _build_for_update_vectors is sync,
797+
# returns an unawaited coroutine, and its jsonable_encoder call is the
798+
# expensive part we need off the event loop.
799+
loop = asyncio.get_running_loop()
800+
coro = await loop.run_in_executor(
801+
None,
802+
functools.partial(
803+
self._build_for_update_vectors,
804+
collection_name=collection_name,
805+
wait=wait,
806+
ordering=ordering,
807+
timeout=timeout,
808+
update_vectors=update_vectors,
809+
),
787810
)
811+
return await coro
788812

789813
async def upsert_points(
790814
self,
@@ -797,13 +821,22 @@ async def upsert_points(
797821
"""
798822
Perform insert + updates on points. If point with given ID already exists - it will be overwritten.
799823
"""
800-
return await self._build_for_upsert_points(
801-
collection_name=collection_name,
802-
wait=wait,
803-
ordering=ordering,
804-
timeout=timeout,
805-
point_insert_operations=point_insert_operations,
824+
# See batch_update for rationale: _build_for_upsert_points is sync,
825+
# returns an unawaited coroutine, and its jsonable_encoder call is the
826+
# expensive part we need off the event loop.
827+
loop = asyncio.get_running_loop()
828+
coro = await loop.run_in_executor(
829+
None,
830+
functools.partial(
831+
self._build_for_upsert_points,
832+
collection_name=collection_name,
833+
wait=wait,
834+
ordering=ordering,
835+
timeout=timeout,
836+
point_insert_operations=point_insert_operations,
837+
),
806838
)
839+
return await coro
807840

808841

809842
class SyncPointsApi(_PointsApi):

0 commit comments

Comments
 (0)