Hello,
I was looking for a library that allows a thread-safe single connection to SQLite for a side project I've been working on, and came across your project here. I like the functionality, but I got curious about the internals when I noticed the `execute` / `fetch_results` pattern, which generates a token and later uses that token to fetch the result set once the query has executed.
On a hunch, I asked an AI to review the file, and based on its findings it does indeed look like generated tokens are retained forever: they are never expunged from `_select_events`, so memory use keeps growing with every query and never shrinks. The prompt I used was:
> do a code review of https://raw.githubusercontent.com/roshanlam/sqlite-worker/refs/heads/main/sqlite_worker/main.py
>
> i'd like you to analyze in particular for memory leaks and thread safety issues
You should be able to run this and get similar results.
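To make the growth concrete, here's a simplified, hypothetical model of a token-based result registry. It is not sqlite-worker's actual code: `TokenRegistry`, `fetch_leaky` and `fetch_and_expunge` are names I made up for illustration. If entries are only ever added, both dicts gain one entry per query; popping them on fetch fixes that, but only for tokens someone actually fetches.

```python
import threading
import uuid
from typing import Dict, List


class TokenRegistry:
    """Hypothetical token/fetch_results model, NOT sqlite-worker's real code."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._events: Dict[str, threading.Event] = {}
        self._results: Dict[str, List[tuple]] = {}

    def submit(self) -> str:
        # execute() side: hand the caller a token for a result that arrives later
        token = uuid.uuid4().hex
        with self._lock:
            self._events[token] = threading.Event()
        return token

    def complete(self, token: str, rows: List[tuple]) -> None:
        # Worker-thread side: store the rows and wake up whoever is waiting
        with self._lock:
            self._results[token] = rows
            event = self._events[token]
        event.set()

    def _wait(self, token: str) -> None:
        with self._lock:
            event = self._events[token]
        event.wait()

    def fetch_leaky(self, token: str) -> List[tuple]:
        # Returns the result but leaves both entries behind forever
        self._wait(token)
        with self._lock:
            return self._results[token]

    def fetch_and_expunge(self, token: str) -> List[tuple]:
        # Returns the result and removes both entries for this token
        self._wait(token)
        with self._lock:
            del self._events[token]
            return self._results.pop(token)


if __name__ == "__main__":
    for fetch in ("fetch_leaky", "fetch_and_expunge"):
        reg = TokenRegistry()
        for _ in range(1000):
            token = reg.submit()
            reg.complete(token, [(1,)])
            getattr(reg, fetch)(token)
        print(fetch, len(reg._events), len(reg._results))
    # fetch_leaky 1000 1000
    # fetch_and_expunge 0 0
```

Even the expunging variant in this model keeps entries for any token that is never fetched (say, the caller raises before calling fetch). A Future avoids that structurally, because the result lives on an object only the caller references.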
I'm curious whether you've ever considered an approach based on Future objects. With futures, each caller holds its own handle to its result, so there is no shared token table that can leak. Here's an example implementation that also provides full async support:
```python
import asyncio
import queue
import sqlite3
import threading
from typing import Any, Optional


class AsyncSqliteWorker:
    """Minimal async reimplementation of the original SqliteWorker.

    Only execute() + execution thread. No tokens, no leaks.
    """

    def __init__(self, db_path: str, max_queue_size: int = 100):
        self.db_path = db_path
        self._queue: queue.Queue = queue.Queue(maxsize=max_queue_size)
        self._close_event = threading.Event()

        # Start the execution thread (exactly like the original)
        self._thread = threading.Thread(target=self._worker, daemon=True)
        self._thread.start()

    def _is_select_query(self, query: str) -> bool:
        """Same detection logic as the original library."""
        return query.lower().lstrip().startswith("select")

    def _worker(self):
        """The original execution thread, now resolving asyncio futures."""
        conn = sqlite3.connect(
            self.db_path,
            check_same_thread=False,
            detect_types=sqlite3.PARSE_DECLTYPES,
            timeout=30.0,
        )
        try:
            # Run until the shutdown sentinel arrives, so every task queued
            # before close() still gets a result instead of hanging forever.
            while True:
                task = self._queue.get()
                if task is None:  # shutdown sentinel
                    break
                future, query, values = task
                # Each future knows the event loop it was created on.
                loop = future.get_loop()
                try:
                    cursor = conn.cursor()
                    cursor.execute(query, values or [])
                    # Return a sensible result (unlike the original, which always did fetchall)
                    if self._is_select_query(query):
                        result: Any = cursor.fetchall()
                    else:
                        result = cursor.rowcount
                        conn.commit()  # simple auto-commit for DML
                    # Safe cross-thread result delivery; skip futures already cancelled
                    loop.call_soon_threadsafe(
                        lambda f, r: f.set_result(r) if not f.done() else None,
                        future, result,
                    )
                except Exception as exc:
                    loop.call_soon_threadsafe(
                        lambda f, e: f.set_exception(e) if not f.done() else None,
                        future, exc,
                    )
        finally:
            conn.close()

    async def execute(self, query: str, values: Optional[list] = None) -> Any:
        """Fully async version of the original execute().

        - No token
        - No fetch_results()
        - Just await it → get rows or rowcount (or exception)
        """
        if self._close_event.is_set():
            raise RuntimeError("Worker is closed")
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        # Same queue pattern as the original library. Note: put() blocks the
        # event loop while the queue is full (up to 5 s, then queue.Full).
        self._queue.put((future, query, values or []), timeout=5)
        return await future

    def close(self):
        """Simple shutdown (for completeness in a POC)."""
        self._close_event.set()
        self._queue.put(None, timeout=5)  # sentinel
        self._thread.join(timeout=5)


# ====================== Tiny usage example ======================
async def demo():
    db = AsyncSqliteWorker("test.db")
    # CREATE
    await db.execute("CREATE TABLE IF NOT EXISTS test (id INTEGER PRIMARY KEY, name TEXT)")
    # INSERT
    await db.execute("INSERT INTO test (name) VALUES (?)", ["hello async"])
    # SELECT
    rows = await db.execute("SELECT * FROM test")
    print("Rows:", rows)
    db.close()


if __name__ == "__main__":
    asyncio.run(demo())
```
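For code that doesn't run an asyncio event loop, the same idea should work with `concurrent.futures.Future` from the standard library. Below is a rough sketch; `SyncSqliteWorker` and its `submit()` / `execute()` / `close()` methods are hypothetical names, not sqlite-worker's API. It also checks `cursor.description` (which is `None` for statements that return no rows) instead of looking for a leading `SELECT`, so queries such as `WITH ... SELECT` or `PRAGMA table_info(...)` still return rows.

```python
import concurrent.futures
import queue
import sqlite3
import threading
from typing import Any, Optional, Sequence


class SyncSqliteWorker:
    """Hypothetical sketch: one connection, one thread, one Future per call."""

    def __init__(self, db_path: str, max_queue_size: int = 100) -> None:
        self._queue: "queue.Queue[Optional[tuple]]" = queue.Queue(maxsize=max_queue_size)
        self._closed = threading.Event()
        self._thread = threading.Thread(target=self._worker, args=(db_path,), daemon=True)
        self._thread.start()

    def _worker(self, db_path: str) -> None:
        # The connection is created and used only on this thread.
        conn = sqlite3.connect(db_path)
        try:
            while True:
                task = self._queue.get()
                if task is None:  # sentinel: everything queued before it has run
                    break
                future, query, values = task
                if not future.set_running_or_notify_cancel():
                    continue  # the caller cancelled before we started
                try:
                    cursor = conn.execute(query, values)
                    # description is None for statements that return no rows
                    if cursor.description is not None:
                        result: Any = cursor.fetchall()
                    else:
                        result = cursor.rowcount
                    conn.commit()
                except Exception as exc:
                    conn.rollback()
                    future.set_exception(exc)
                else:
                    future.set_result(result)
        finally:
            conn.close()

    def submit(self, query: str, values: Sequence[Any] = ()) -> "concurrent.futures.Future[Any]":
        # Non-blocking: returns a Future the caller can wait on later.
        if self._closed.is_set():
            raise RuntimeError("Worker is closed")
        future: "concurrent.futures.Future[Any]" = concurrent.futures.Future()
        self._queue.put((future, query, tuple(values)))
        return future

    def execute(self, query: str, values: Sequence[Any] = (), timeout: Optional[float] = None) -> Any:
        # Blocks the calling thread until the worker has run the statement.
        return self.submit(query, values).result(timeout=timeout)

    def close(self) -> None:
        self._closed.set()
        self._queue.put(None)
        self._thread.join()


if __name__ == "__main__":
    worker = SyncSqliteWorker(":memory:")
    worker.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)")
    worker.execute("INSERT INTO test (name) VALUES (?)", ["hello sync"])
    print(worker.execute("SELECT * FROM test"))  # [(1, 'hello sync')]
    worker.close()
```

As with the async version, the worker keeps no long-lived table of pending results: each `Future` belongs to the call that created it, and exceptions raised on the worker thread resurface in the caller through `Future.result()`.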