Skip to content

Commit bef1283

Browse files
zxqfd555 authored and
Manul from Pathway committed
gracefully handle pathway and milvusdb simultaneous start (#10472)
GitOrigin-RevId: 208e4833bb6b0b2419e691fbf7a9caffa929ab26
1 parent 5c9d6b6 commit bef1283

3 files changed

Lines changed: 87 additions & 6 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
1010
- `pw.io.postgres.write` now streams each batch into PostgreSQL through the binary `COPY` protocol instead of issuing one `INSERT` per row, giving a large throughput improvement (up to ~100x) on bulk writes. Both output modes use it: stream-of-changes copies straight into the target, while snapshot mode stages each batch in a temporary table and merges it with a single set-based upsert/delete.
1111

1212
### Fixed
13+
- `pw.io.milvus.write` no longer intermittently fails with a "server unavailable" / "connect failed" error when pointed at a local `.db` file. The embedded local Milvus server reports itself as started before it actually accepts connections, so under load the first connection could lose the race against the server coming up; the connector now retries the initial connection until the local server is ready.
1314
- Improved concurrent write handling in `pw.io.sqlite.write` for SQLite databases. Writes to the same database file now produce deterministic output in multi-worker and multi-table setups.
1415
- `pw.io.elasticsearch.write` no longer fails when a minibatch is big enough that its Elasticsearch `_bulk` request would exceed a server-side limit. The connector reads both the cluster's `http.max_content_length` (the `413 Request Entity Too Large` limit) and `indexing_pressure.memory.limit` (the `429 Too Many Requests` limit, which on a small-heap node trips well below 100 MB) at start-up, and splits the buffered documents across as many bulk requests as needed to stay under whichever is hit first — so large batches are still written in as few requests as possible instead of being rejected. (Both limits fall back to a conservative default if they cannot be read.)
1516
- `pw.io.elasticsearch.write` now retries transient bulk failures with backoff instead of failing the run on the first hiccup. A whole-request rejection or an individual document failing with `429`/`503` (back-pressure / temporary unavailability) is retried — resending only the documents the server reports as not yet applied, so a retry never duplicates data — while deterministic per-document failures (e.g. a type-mismatched value rejected with `400`) are now logged and skipped rather than silently dropped.

integration_tests/db_connectors/utils.py

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -847,15 +847,53 @@ def get_table_contents(
847847
MILVUS_VECTOR_DIM = 3
848848

849849

850+
def _is_milvus_transient_connect_error(e: Exception) -> bool:
851+
"""Whether ``e`` is milvus-lite's embedded-server connection race.
852+
853+
milvus-lite reports its local server as started as soon as the server
854+
process is alive, *before* the server's local socket actually accepts
855+
connections. The first client therefore races the socket coming up, and a
856+
client that loses the race fails with a ``server unavailable`` /
857+
``connect failed`` error. Under heavy test parallelism the socket can take
858+
a few seconds to appear, so this is retried in
859+
:func:`_connect_milvus_client_with_retry`.
860+
"""
861+
text = str(e)
862+
return (
863+
"Fail connecting to server" in text
864+
or "server unavailable" in text
865+
or "failed to connect to all addresses" in text
866+
or "No such file or directory" in text
867+
)
868+
869+
870+
def _connect_milvus_client_with_retry(uri: str, *, timeout: float = 30.0):
    """Build a ``MilvusClient`` for ``uri``, polling until the server answers.

    A freshly started milvus-lite embedded server may not have its local
    socket up yet (see :func:`_is_milvus_transient_connect_error`), so
    construction is re-attempted every 0.25 seconds while the failure is
    classified as transient and ``timeout`` seconds have not yet elapsed.
    Any other error, or a transient one seen once the deadline has passed,
    is re-raised unchanged.
    """
    from pymilvus import MilvusClient

    give_up_at = time.monotonic() + timeout
    while True:
        try:
            client = MilvusClient(uri)
        except Exception as exc:
            retryable = _is_milvus_transient_connect_error(exc)
            # Propagate the original exception: either it is not the start-up
            # race, or we have already waited for the full timeout.
            if not retryable or time.monotonic() >= give_up_at:
                raise
        else:
            return client
        time.sleep(0.25)
888+
889+
850890
def _make_milvus_client(uri: str):
851891
"""Create a MilvusClient for the given URI.
852892
853893
For local ``.db`` files, works around a pymilvus 2.6.x bug where the
854894
Unix-domain-socket address is not forwarded to the gRPC handler. See
855895
``pathway.io.milvus._make_client`` for the full explanation.
856896
"""
857-
from pymilvus import MilvusClient
858-
859897
if uri.endswith(".db"):
860898
try:
861899
from milvus_lite.server_manager import server_manager_instance
@@ -865,10 +903,10 @@ def _make_milvus_client(uri: str):
865903
raise RuntimeError(
866904
f"milvus-lite failed to start a local server for: {uri}"
867905
)
868-
return MilvusClient(uds_uri)
906+
return _connect_milvus_client_with_retry(uds_uri)
869907
except ImportError:
870908
pass
871-
return MilvusClient(uri)
909+
return _connect_milvus_client_with_retry(uri)
872910

873911

874912
def _is_milvus_transient_init_error(e: Exception) -> bool:

python/pathway/io/milvus/__init__.py

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from __future__ import annotations
44

5+
import time
56
from typing import Iterable
67

78
import numpy as np
@@ -52,6 +53,44 @@ def _prepare_row(row: dict) -> dict:
5253
return result
5354

5455

56+
def _is_milvus_transient_connect_error(e: Exception) -> bool:
57+
"""Whether ``e`` is a local milvus-lite embedded-server connection race.
58+
59+
milvus-lite reports its local server as started as soon as the server
60+
process is alive, before the server's local socket actually accepts
61+
connections. The first client therefore races the socket coming up, and a
62+
client that loses the race fails with a ``server unavailable`` /
63+
``connect failed`` error. The socket can take a few seconds to appear when
64+
many local databases start at once, so this is retried in
65+
:func:`_connect_with_retry`.
66+
"""
67+
text = str(e)
68+
return (
69+
"Fail connecting to server" in text
70+
or "server unavailable" in text
71+
or "failed to connect to all addresses" in text
72+
or "No such file or directory" in text
73+
)
74+
75+
76+
def _connect_with_retry(MilvusClient, uri: str, *, timeout: float = 30.0):
77+
"""Create a ``MilvusClient`` for ``uri``, retrying while a freshly started
78+
local milvus-lite server brings its socket up (see
79+
:func:`_is_milvus_transient_connect_error`). Non-transient errors are raised
80+
immediately."""
81+
deadline = time.monotonic() + timeout
82+
while True:
83+
try:
84+
return MilvusClient(uri)
85+
except Exception as e:
86+
if (
87+
not _is_milvus_transient_connect_error(e)
88+
or time.monotonic() >= deadline
89+
):
90+
raise
91+
time.sleep(0.25)
92+
93+
5594
def _make_client(MilvusClient, uri: str):
5695
"""Create a MilvusClient, working around a pymilvus 2.6.x bug.
5796
@@ -63,6 +102,9 @@ def _make_client(MilvusClient, uri: str):
63102
connection hangs. Passing ``address=`` as an explicit keyword argument
64103
causes it to flow through ``handler_kwargs`` directly into
65104
``GrpcHandler.__init__``, where ``kwargs.get("address")`` picks it up.
105+
106+
A freshly started local server may not be accepting connections yet, so the
107+
client is created through :func:`_connect_with_retry`.
66108
"""
67109
if uri.endswith(".db"):
68110
with optional_imports("milvus"):
@@ -73,9 +115,9 @@ def _make_client(MilvusClient, uri: str):
73115
raise RuntimeError(
74116
f"milvus-lite failed to start a local server for: {uri}"
75117
)
76-
return MilvusClient(uds_uri)
118+
return _connect_with_retry(MilvusClient, uds_uri)
77119

78-
return MilvusClient(uri)
120+
return _connect_with_retry(MilvusClient, uri)
79121

80122

81123
@check_arg_types

0 commit comments

Comments
 (0)