Skip to content
This repository was archived by the owner on May 27, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions projects/pgai/pgai/vectorizer/embedders/ollama.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,24 @@ class Ollama(BaseModel, BaseURLMixin, Embedder):
model: str
options: OllamaOptions | None = None
keep_alive: str | None = None # this is only `str` because of the SQL API
_client: "ollama.AsyncClient | None" = None

def _get_client(self) -> "ollama.AsyncClient":
# Note: deferred import to avoid import overhead
import ollama

if self._client is None:
self._client = ollama.AsyncClient(host=self.base_url)
return self._client

@override
async def cleanup(self) -> None:
"""Close the underlying HTTP client to prevent connection leaks."""
if self._client is not None:
# Ollama's AsyncClient uses httpx internally
if hasattr(self._client, "_client") and self._client._client is not None:
await self._client._client.aclose()
self._client = None

@override
async def embed(
Expand Down Expand Up @@ -101,7 +119,7 @@ async def setup(self):
# Note: deferred import to avoid import overhead
import ollama

client = ollama.AsyncClient(host=self.base_url)
client = self._get_client()
try:
await client.show(self.model)
except ollama.ResponseError as e:
Expand All @@ -113,10 +131,7 @@ async def setup(self):

@override
async def call_embed_api(self, documents: list[str]) -> EmbeddingResponse:
# Note: deferred import to avoid import overhead
import ollama

response = await ollama.AsyncClient(host=self.base_url).embed(
response = await self._get_client().embed(
model=self.model,
input=documents,
options=self.options,
Expand All @@ -132,10 +147,7 @@ async def _context_length(self) -> int | None:
"""
Gets the context_length of the configured model, if available
"""
# Note: deferred import to avoid import overhead
import ollama

model = await ollama.AsyncClient(host=self.base_url).show(self.model)
model = await self._get_client().show(self.model)
architecture = model["model_info"].get("general.architecture", None)
if architecture is None:
logger.warn(f"unable to determine architecture for model '{self.model}'")
Expand Down
19 changes: 15 additions & 4 deletions projects/pgai/pgai/vectorizer/embedders/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,24 @@ def _openai_user(self) -> "str | openai.NotGiven":

return self.user if self.user is not None else openai.NOT_GIVEN

@cached_property
_client: "openai.AsyncOpenAI | None" = None

@property
def _embedder(self) -> "resources.AsyncEmbeddingsWithStreamingResponse":
import openai

return openai.AsyncOpenAI(
base_url=self.base_url, api_key=self._api_key, max_retries=3
).embeddings.with_streaming_response
if self._client is None:
self._client = openai.AsyncOpenAI(
base_url=self.base_url, api_key=self._api_key, max_retries=3
)
return self._client.embeddings.with_streaming_response

@override
async def cleanup(self) -> None:
"""Close the underlying HTTP client to prevent connection leaks."""
if self._client is not None:
await self._client.close()
self._client = None

@override
def _max_chunks_per_batch(self) -> int:
Expand Down
23 changes: 19 additions & 4 deletions projects/pgai/pgai/vectorizer/embedders/voyageai.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,24 @@ class VoyageAI(ApiKeyMixin, BaseModel, Embedder):
input_type: Literal["document"] | Literal["query"] | None = None
output_dimension: int | None = None
output_dtype: str | None = None
_client: "voyageai.AsyncClient | None" = None

def _get_client(self) -> "voyageai.AsyncClient":
# Note: deferred import to avoid import overhead
import voyageai

if self._client is None:
self._client = voyageai.AsyncClient(api_key=self._api_key)
return self._client

@override
async def cleanup(self) -> None:
"""Close the underlying HTTP client to prevent connection leaks."""
if self._client is not None:
# VoyageAI's AsyncClient uses httpx internally
if hasattr(self._client, "_client") and self._client._client is not None:
await self._client._client.aclose()
self._client = None

@override
async def embed(
Expand Down Expand Up @@ -107,9 +125,6 @@ def _token_counter(self) -> Callable[[str], int] | None:

@override
async def call_embed_api(self, documents: list[str]) -> EmbeddingResponse:
# Note: deferred import to avoid import overhead
import voyageai

# Build API call parameters
params: dict[str, Any] = {
"model": self.model,
Expand All @@ -120,7 +135,7 @@ async def call_embed_api(self, documents: list[str]) -> EmbeddingResponse:
if self.output_dtype is not None:
params["output_dtype"] = self.output_dtype

response = await voyageai.AsyncClient(api_key=self._api_key).embed(
response = await self._get_client().embed(
documents,
**params,
)
Expand Down
6 changes: 6 additions & 0 deletions projects/pgai/pgai/vectorizer/embeddings.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ async def setup(self) -> None: # noqa: B027 empty on purpose
Setup the embedder
"""

async def cleanup(self) -> None: # noqa: B027 empty on purpose
"""
Cleanup resources used by the embedder (e.g., close HTTP clients).
Should be called when the embedder is no longer needed.
"""

@abstractmethod
async def call_embed_api(self, documents: list[str]) -> EmbeddingResponse:
"""
Expand Down
4 changes: 4 additions & 0 deletions projects/pgai/pgai/vectorizer/vectorizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,10 @@ async def run(self) -> int:
),
)
raise e
finally:
# Clean up embedder resources (e.g., close HTTP clients)
# to prevent connection leaks
await self.vectorizer.config.embedding.cleanup()

async def _should_continue_processing(
self, conn: AsyncConnection, loops: int, res: int
Expand Down