Skip to content

Commit e95a162

Browse files
authored
Merge pull request #100 from UiPath/fix/update_runtime_0_3
fix: reusable sqlite connection
2 parents d6f0e24 + 75aaefc commit e95a162

6 files changed

Lines changed: 536 additions & 315 deletions

File tree

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
"""Async SQLite connection manager with automatic serialization."""
2+
3+
from __future__ import annotations
4+
5+
import asyncio
6+
from collections.abc import Iterable
7+
from contextlib import asynccontextmanager
8+
from sqlite3 import Row
9+
from typing import Any, AsyncIterator
10+
11+
import aiosqlite
12+
13+
14+
class AsyncSqlite:
15+
"""Async SQLite wrapper with automatic serialization via locks.
16+
17+
Provides thread-safe access to a SQLite database using asyncio locks
18+
to serialize operations. Maintains a single connection and ensures
19+
proper WAL mode configuration.
20+
"""
21+
22+
def __init__(self, db_path: str, timeout: float = 30.0):
23+
"""
24+
Initialize AsyncSQLite manager.
25+
26+
Args:
27+
db_path: Path to the SQLite database file
28+
timeout: Database connection timeout in seconds
29+
"""
30+
self.db_path = db_path
31+
self.timeout = timeout
32+
self.conn: aiosqlite.Connection | None = None
33+
self.lock = asyncio.Lock()
34+
self.is_setup = False
35+
36+
async def __aenter__(self) -> AsyncSqlite:
37+
"""Async context manager entry."""
38+
await self.connect()
39+
return self
40+
41+
async def __aexit__(self, *args) -> None:
42+
"""Async context manager exit."""
43+
await self.close()
44+
45+
async def connect(self) -> None:
46+
"""Establish database connection and apply initial pragmas."""
47+
if self.conn is not None:
48+
return
49+
50+
self.conn = await aiosqlite.connect(self.db_path, timeout=self.timeout)
51+
await self._apply_connection_pragmas()
52+
53+
# WAL mode is persistent, set once
54+
await self.conn.execute("PRAGMA journal_mode=WAL")
55+
await self.conn.commit()
56+
57+
async def close(self) -> None:
58+
"""Close database connection."""
59+
if self.conn:
60+
await self.conn.close()
61+
self.conn = None
62+
self.is_setup = False
63+
64+
async def execute(
65+
self, query: str, parameters: tuple[Any, ...] | None = None
66+
) -> aiosqlite.Cursor:
67+
"""
68+
Execute a single query with automatic locking.
69+
70+
Args:
71+
query: SQL query to execute
72+
parameters: Query parameters
73+
74+
Returns:
75+
Cursor with query results
76+
"""
77+
if self.conn is None:
78+
await self.connect()
79+
80+
assert self.conn is not None
81+
82+
async with self.lock:
83+
return await self.conn.execute(query, parameters or ())
84+
85+
async def executemany(
86+
self, query: str, parameters_list: list[tuple[Any, ...]]
87+
) -> None:
88+
"""
89+
Execute a query multiple times with different parameters.
90+
91+
Args:
92+
query: SQL query to execute
93+
parameters_list: List of parameter tuples
94+
"""
95+
if self.conn is None:
96+
await self.connect()
97+
98+
assert self.conn is not None
99+
100+
async with self.lock:
101+
await self.conn.executemany(query, parameters_list)
102+
await self.conn.commit()
103+
104+
async def executescript(self, script: str) -> None:
105+
"""
106+
Execute a SQL script (multiple statements).
107+
108+
Args:
109+
script: SQL script to execute
110+
"""
111+
if self.conn is None:
112+
await self.connect()
113+
114+
assert self.conn is not None
115+
116+
async with self.lock:
117+
await self.conn.executescript(script)
118+
await self.conn.commit()
119+
120+
async def commit(self) -> None:
121+
"""Commit the current transaction."""
122+
if self.conn is None:
123+
return
124+
125+
assert self.conn is not None
126+
127+
async with self.lock:
128+
await self.conn.commit()
129+
130+
@asynccontextmanager
131+
async def cursor(self) -> AsyncIterator[aiosqlite.Cursor]:
132+
"""
133+
Get a cursor with automatic locking.
134+
135+
Yields:
136+
Database cursor
137+
"""
138+
if self.conn is None:
139+
await self.connect()
140+
141+
assert self.conn is not None
142+
143+
async with self.lock:
144+
cursor = await self.conn.cursor()
145+
try:
146+
yield cursor
147+
finally:
148+
await cursor.close()
149+
150+
async def fetchone(
151+
self, query: str, parameters: tuple[Any, ...] | None = None
152+
) -> Row | None:
153+
"""
154+
Execute query and fetch one result.
155+
156+
Args:
157+
query: SQL query to execute
158+
parameters: Query parameters
159+
160+
Returns:
161+
Single row or None
162+
"""
163+
cursor = await self.execute(query, parameters)
164+
return await cursor.fetchone()
165+
166+
async def fetchall(
167+
self, query: str, parameters: tuple[Any, ...] | None = None
168+
) -> Iterable[Row]:
169+
"""
170+
Execute query and fetch all results.
171+
172+
Args:
173+
query: SQL query to execute
174+
parameters: Query parameters
175+
176+
Returns:
177+
List of rows
178+
"""
179+
cursor = await self.execute(query, parameters)
180+
return await cursor.fetchall()
181+
182+
async def _apply_connection_pragmas(self) -> None:
183+
"""Apply per-connection PRAGMA settings for optimal concurrency."""
184+
if self.conn is None:
185+
return
186+
187+
await self.conn.execute(f"PRAGMA busy_timeout={int(self.timeout * 1000)}")
188+
await self.conn.execute("PRAGMA synchronous=NORMAL")
189+
await self.conn.execute("PRAGMA cache_size=10000")
190+
await self.conn.execute("PRAGMA temp_store=MEMORY")

src/uipath_llamaindex/runtime/factory.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
UiPathLlamaIndexRuntimeError,
2727
)
2828
from uipath_llamaindex.runtime.runtime import UiPathLlamaIndexRuntime
29-
from uipath_llamaindex.runtime.storage import SQLiteResumableStorage
29+
from uipath_llamaindex.runtime.storage import SqliteResumableStorage
3030
from uipath_llamaindex.runtime.workflow import LlamaIndexWorkflowLoader
3131

3232

@@ -51,7 +51,7 @@ def __init__(
5151
self._workflow_lock = asyncio.Lock()
5252

5353
self._storage_lock = asyncio.Lock()
54-
self._storage: SQLiteResumableStorage | None = None
54+
self._storage: SqliteResumableStorage | None = None
5555

5656
self._setup_instrumentation(self.context.trace_manager)
5757

@@ -80,7 +80,7 @@ def _get_storage_path(self) -> str:
8080
os.makedirs(os.path.dirname(default_path), exist_ok=True)
8181
return default_path
8282

83-
async def _get_storage(self) -> SQLiteResumableStorage:
83+
async def _get_storage(self) -> SqliteResumableStorage:
8484
"""Get or create the shared storage instance."""
8585
if self._storage is not None:
8686
return self._storage
@@ -90,7 +90,7 @@ async def _get_storage(self) -> SQLiteResumableStorage:
9090
return self._storage
9191

9292
storage_path = self._get_storage_path()
93-
self._storage = SQLiteResumableStorage(storage_path)
93+
self._storage = SqliteResumableStorage(storage_path)
9494
await self._storage.setup()
9595
return self._storage
9696

@@ -291,3 +291,7 @@ async def dispose(self) -> None:
291291

292292
self._workflow_loaders.clear()
293293
self._workflow_cache.clear()
294+
295+
if self._storage:
296+
await self._storage.dispose()
297+
self._storage = None

src/uipath_llamaindex/runtime/runtime.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
UiPathLlamaIndexRuntimeError,
4848
)
4949
from uipath_llamaindex.runtime.schema import get_entrypoints_schema, get_workflow_schema
50-
from uipath_llamaindex.runtime.storage import SQLiteResumableStorage
50+
from uipath_llamaindex.runtime.storage import SqliteResumableStorage
5151

5252
from ._serialize import serialize_output
5353

@@ -62,7 +62,7 @@ def __init__(
6262
workflow: Workflow,
6363
runtime_id: str | None = None,
6464
entrypoint: str | None = None,
65-
storage: SQLiteResumableStorage | None = None,
65+
storage: SqliteResumableStorage | None = None,
6666
debug_mode: bool = False,
6767
):
6868
"""
@@ -76,7 +76,7 @@ def __init__(
7676
self.workflow: Workflow = workflow
7777
self.runtime_id: str = runtime_id or "default"
7878
self.entrypoint: str | None = entrypoint
79-
self.storage: SQLiteResumableStorage | None = storage
79+
self.storage: SqliteResumableStorage | None = storage
8080
self.debug_mode: bool = debug_mode
8181
self._context: Context | None = None
8282

0 commit comments

Comments
 (0)