Skip to content
This repository was archived by the owner on May 27, 2026. It is now read-only.

Commit 5e73f20

Browse files
committed
fix: prevent HTTP client connection leaks in embedders
The AsyncOpenAI, Ollama, and VoyageAI embedders were creating HTTP clients that were never explicitly closed, causing connections to accumulate in CLOSE_WAIT state and eventually exhausting file descriptors. Changes: - Add cleanup() method to Embedder base class - Implement cleanup() in OpenAI, Ollama, and VoyageAI embedders to close underlying HTTP clients - Call cleanup() in Executor.run() finally block to ensure resources are released regardless of how the embedding loop exits - Reuse client instances instead of creating new ones for each request (Ollama, VoyageAI) This fixes a file descriptor leak that could cause 'Too many open files' errors after prolonged operation.
1 parent 3e05485 commit 5e73f20

5 files changed

Lines changed: 65 additions & 17 deletions

File tree

projects/pgai/pgai/vectorizer/embedders/ollama.py

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,24 @@ class Ollama(BaseModel, BaseURLMixin, Embedder):
7070
model: str
7171
options: OllamaOptions | None = None
7272
keep_alive: str | None = None # this is only `str` because of the SQL API
73+
_client: "ollama.AsyncClient | None" = None
74+
75+
def _get_client(self) -> "ollama.AsyncClient":
76+
# Note: deferred import to avoid import overhead
77+
import ollama
78+
79+
if self._client is None:
80+
self._client = ollama.AsyncClient(host=self.base_url)
81+
return self._client
82+
83+
@override
84+
async def cleanup(self) -> None:
85+
"""Close the underlying HTTP client to prevent connection leaks."""
86+
if self._client is not None:
87+
# Ollama's AsyncClient uses httpx internally
88+
if hasattr(self._client, "_client") and self._client._client is not None:
89+
await self._client._client.aclose()
90+
self._client = None
7391

7492
@override
7593
async def embed(
@@ -101,7 +119,7 @@ async def setup(self):
101119
# Note: deferred import to avoid import overhead
102120
import ollama
103121

104-
client = ollama.AsyncClient(host=self.base_url)
122+
client = self._get_client()
105123
try:
106124
await client.show(self.model)
107125
except ollama.ResponseError as e:
@@ -113,10 +131,7 @@ async def setup(self):
113131

114132
@override
115133
async def call_embed_api(self, documents: list[str]) -> EmbeddingResponse:
116-
# Note: deferred import to avoid import overhead
117-
import ollama
118-
119-
response = await ollama.AsyncClient(host=self.base_url).embed(
134+
response = await self._get_client().embed(
120135
model=self.model,
121136
input=documents,
122137
options=self.options,
@@ -132,10 +147,7 @@ async def _context_length(self) -> int | None:
132147
"""
133148
Gets the context_length of the configured model, if available
134149
"""
135-
# Note: deferred import to avoid import overhead
136-
import ollama
137-
138-
model = await ollama.AsyncClient(host=self.base_url).show(self.model)
150+
model = await self._get_client().show(self.model)
139151
architecture = model["model_info"].get("general.architecture", None)
140152
if architecture is None:
141153
logger.warn(f"unable to determine architecture for model '{self.model}'")

projects/pgai/pgai/vectorizer/embedders/openai.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,24 @@ def _openai_user(self) -> "str | openai.NotGiven":
9090

9191
return self.user if self.user is not None else openai.NOT_GIVEN
9292

93-
@cached_property
93+
_client: "openai.AsyncOpenAI | None" = None
94+
95+
@property
9496
def _embedder(self) -> "resources.AsyncEmbeddingsWithStreamingResponse":
9597
import openai
9698

97-
return openai.AsyncOpenAI(
98-
base_url=self.base_url, api_key=self._api_key, max_retries=3
99-
).embeddings.with_streaming_response
99+
if self._client is None:
100+
self._client = openai.AsyncOpenAI(
101+
base_url=self.base_url, api_key=self._api_key, max_retries=3
102+
)
103+
return self._client.embeddings.with_streaming_response
104+
105+
@override
106+
async def cleanup(self) -> None:
107+
"""Close the underlying HTTP client to prevent connection leaks."""
108+
if self._client is not None:
109+
await self._client.close()
110+
self._client = None
100111

101112
@override
102113
def _max_chunks_per_batch(self) -> int:

projects/pgai/pgai/vectorizer/embedders/voyageai.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,24 @@ class VoyageAI(ApiKeyMixin, BaseModel, Embedder):
7070
input_type: Literal["document"] | Literal["query"] | None = None
7171
output_dimension: int | None = None
7272
output_dtype: str | None = None
73+
_client: "voyageai.AsyncClient | None" = None
74+
75+
def _get_client(self) -> "voyageai.AsyncClient":
76+
# Note: deferred import to avoid import overhead
77+
import voyageai
78+
79+
if self._client is None:
80+
self._client = voyageai.AsyncClient(api_key=self._api_key)
81+
return self._client
82+
83+
@override
84+
async def cleanup(self) -> None:
85+
"""Close the underlying HTTP client to prevent connection leaks."""
86+
if self._client is not None:
87+
# VoyageAI's AsyncClient uses httpx internally
88+
if hasattr(self._client, "_client") and self._client._client is not None:
89+
await self._client._client.aclose()
90+
self._client = None
7391

7492
@override
7593
async def embed(
@@ -107,9 +125,6 @@ def _token_counter(self) -> Callable[[str], int] | None:
107125

108126
@override
109127
async def call_embed_api(self, documents: list[str]) -> EmbeddingResponse:
110-
# Note: deferred import to avoid import overhead
111-
import voyageai
112-
113128
# Build API call parameters
114129
params: dict[str, Any] = {
115130
"model": self.model,
@@ -120,7 +135,7 @@ async def call_embed_api(self, documents: list[str]) -> EmbeddingResponse:
120135
if self.output_dtype is not None:
121136
params["output_dtype"] = self.output_dtype
122137

123-
response = await voyageai.AsyncClient(api_key=self._api_key).embed(
138+
response = await self._get_client().embed(
124139
documents,
125140
**params,
126141
)

projects/pgai/pgai/vectorizer/embeddings.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,12 @@ async def setup(self) -> None: # noqa: B027 empty on purpose
118118
Setup the embedder
119119
"""
120120

121+
async def cleanup(self) -> None: # noqa: B027 empty on purpose
122+
"""
123+
Cleanup resources used by the embedder (e.g., close HTTP clients).
124+
Should be called when the embedder is no longer needed.
125+
"""
126+
121127
@abstractmethod
122128
async def call_embed_api(self, documents: list[str]) -> EmbeddingResponse:
123129
"""

projects/pgai/pgai/vectorizer/vectorizer.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -839,6 +839,10 @@ async def run(self) -> int:
839839
),
840840
)
841841
raise e
842+
finally:
843+
# Clean up embedder resources (e.g., close HTTP clients)
844+
# to prevent connection leaks
845+
await self.vectorizer.config.embedding.cleanup()
842846

843847
async def _should_continue_processing(
844848
self, conn: AsyncConnection, loops: int, res: int

0 commit comments

Comments
 (0)