Skip to content

Commit a024281

Browse files
authored
Merge pull request #62 from Basekick-Labs/release/25.11.2
Release/25.11.2
2 parents e9dccb5 + f43713e commit a024281

5 files changed

Lines changed: 303 additions & 21 deletions

File tree

api/duckdb_engine.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -373,16 +373,21 @@ def _execute_with_pool(self, sql: str) -> Dict[str, Any]:
373373
with self.connection_pool.get_connection(timeout=5.0) as conn:
374374
# Execute query
375375
query_start = time.time()
376-
rows = conn.execute(sql).fetchall()
377-
columns = [desc[0] for desc in conn.description] if conn.description else []
376+
377+
# CRITICAL: Keep reference to cursor to close it properly
378+
cursor = conn.execute(sql)
379+
rows = cursor.fetchall()
380+
columns = [desc[0] for desc in cursor.description] if cursor.description else []
378381
query_time = time.time() - query_start
379382

380383
# Convert rows to list immediately
381384
data = [list(row) for row in rows]
382385
row_count = len(data)
383386

384-
# CRITICAL: Delete rows reference immediately to free DuckDB result memory
387+
# CRITICAL: Delete ALL references to free DuckDB memory
385388
del rows
389+
cursor.close() # Close cursor to release DuckDB internal buffers
390+
del cursor
386391

387392
total_time = time.time() - start_time
388393

api/duckdb_pool_simple.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,8 @@ def get_connection(self, timeout: float = 5.0):
145145
# CRITICAL: Always return and clean connection
146146
if conn is not None:
147147
try:
148-
# Reset connection state (clears DuckDB internal caches)
149-
# Use a simple query to clear any cached results
148+
# Test if connection is still valid
149+
# If the connection is closed, this will raise an exception
150150
result = conn.execute("SELECT 1").fetchall()
151151
del result
152152

@@ -158,13 +158,21 @@ def get_connection(self, timeout: float = 5.0):
158158
if collected > 100:
159159
logger.debug(f"Connection cleanup: collected {collected} objects")
160160

161+
# Connection is valid, return to pool
162+
self.pool.put(conn)
163+
161164
except Exception as e:
162-
# If cleanup fails, log but don't crash
163-
logger.debug(f"Connection cleanup warning: {e}")
165+
# Connection is invalid/closed - create a fresh one
166+
logger.warning(f"Connection invalid during cleanup ({e}), creating fresh connection")
167+
try:
168+
# Close the bad connection properly
169+
conn.close()
170+
except Exception:
171+
pass
164172

165-
finally:
166-
# Always return connection to pool
167-
self.pool.put(conn)
173+
# Create and return a fresh connection to the pool
174+
fresh_conn = duckdb.connect(self.db_path)
175+
self.pool.put(fresh_conn)
168176

169177
def get_metrics(self) -> Dict[str, Any]:
170178
"""

api/main.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1305,7 +1305,22 @@ async def execute_sql(request: Request, query: QueryRequest):
13051305
if should_gc:
13061306
# Run full GC cycle to reclaim memory
13071307
collected = gc.collect()
1308-
logger.info(f"Garbage collection: {gc_reason}, collected {collected} objects")
1308+
1309+
# CRITICAL: Force Python to return memory to OS (Linux only)
1310+
# Python's memory allocator keeps freed memory in arenas for reuse
1311+
# This causes RSS to keep growing even when objects are freed
1312+
try:
1313+
import ctypes
1314+
import platform
1315+
if platform.system() == 'Linux':
1316+
# Call malloc_trim(0) to return freed memory to OS
1317+
libc = ctypes.CDLL('libc.so.6')
1318+
freed = libc.malloc_trim(0)
1319+
logger.info(f"Garbage collection: {gc_reason}, collected {collected} objects, malloc_trim freed memory")
1320+
else:
1321+
logger.info(f"Garbage collection: {gc_reason}, collected {collected} objects")
1322+
except Exception as e:
1323+
logger.info(f"Garbage collection: {gc_reason}, collected {collected} objects (malloc_trim unavailable)")
13091324

13101325
return response_data
13111326

api/query_cache.py

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,11 @@ def __init__(
4242
max_result_size_mb: Max size of individual result to cache in MB (default: 10)
4343
max_total_cache_mb: Max total cache size in MB (default: 200)
4444
"""
45-
self.ttl_seconds = ttl_seconds or int(os.getenv("QUERY_CACHE_TTL", "60"))
46-
self.max_size = max_size or int(os.getenv("QUERY_CACHE_MAX_SIZE", "100"))
47-
self.max_result_size_mb = max_result_size_mb or int(os.getenv("QUERY_CACHE_MAX_RESULT_MB", "10"))
48-
self.max_total_cache_mb = max_total_cache_mb or int(os.getenv("QUERY_CACHE_MAX_TOTAL_MB", "200"))
45+
# CRITICAL: Reduced defaults for memory-constrained deployments
46+
self.ttl_seconds = ttl_seconds or int(os.getenv("QUERY_CACHE_TTL", "30")) # Was 60s, now 30s
47+
self.max_size = max_size or int(os.getenv("QUERY_CACHE_MAX_SIZE", "50")) # Was 100, now 50
48+
self.max_result_size_mb = max_result_size_mb or int(os.getenv("QUERY_CACHE_MAX_RESULT_MB", "5")) # Was 10MB, now 5MB
49+
self.max_total_cache_mb = max_total_cache_mb or int(os.getenv("QUERY_CACHE_MAX_TOTAL_MB", "100")) # Was 200MB, now 100MB
4950

5051
# Use OrderedDict for LRU eviction
5152
self.cache: OrderedDict[str, Tuple[Dict[str, Any], datetime]] = OrderedDict()
@@ -87,13 +88,13 @@ def _start_cleanup_thread(self):
8788
def _cleanup_expired(self):
8889
"""
8990
Background task to remove expired cache entries.
90-
Runs every ttl_seconds/2 to ensure memory is actually released.
91+
Runs every 10 seconds for aggressive memory cleanup.
9192
"""
9293
import time
9394
import gc
9495

95-
# Run cleanup at half the TTL interval (more aggressive)
96-
cleanup_interval = max(self.ttl_seconds // 2, 10)
96+
# CRITICAL: Run cleanup every 10 seconds for aggressive memory management
97+
cleanup_interval = 10
9798

9899
while self._cleanup_running:
99100
try:
@@ -115,6 +116,9 @@ def _cleanup_expired(self):
115116

116117
# Remove expired entries
117118
for key in expired_keys:
119+
# CRITICAL: Delete data reference first
120+
cached_data, _ = self.cache[key]
121+
del cached_data
118122
del self.cache[key]
119123
expired_count += 1
120124
self.expirations += 1
@@ -125,10 +129,23 @@ def _cleanup_expired(self):
125129
if expired_count > 0:
126130
logger.info(
127131
f"Cache cleanup: removed {expired_count} expired entries, "
128-
f"freed {freed_mb:.1f}MB"
132+
f"freed {freed_mb:.1f}MB, cache size now {self.current_cache_size_mb:.1f}MB"
129133
)
130-
# Force GC to actually release the memory
131-
gc.collect()
134+
# Force aggressive GC to actually release the memory
135+
collected = gc.collect()
136+
137+
# CRITICAL: Force Python to return memory to OS
138+
try:
139+
import ctypes
140+
import platform
141+
if platform.system() == 'Linux':
142+
libc = ctypes.CDLL('libc.so.6')
143+
libc.malloc_trim(0)
144+
logger.info(f"Cache cleanup GC: collected {collected} objects, returned memory to OS")
145+
else:
146+
logger.info(f"Cache cleanup GC: collected {collected} objects")
147+
except Exception:
148+
logger.info(f"Cache cleanup GC: collected {collected} objects")
132149

133150
except Exception as e:
134151
logger.error(f"Error in cache cleanup thread: {e}", exc_info=True)
@@ -277,6 +294,8 @@ def set(self, sql: str, limit: int, result: Dict[str, Any]) -> bool:
277294
# Remove oldest entry
278295
oldest_key, (oldest_data, _) = self.cache.popitem(last=False)
279296
oldest_size = self._estimate_size_mb(oldest_data)
297+
# CRITICAL: Delete data reference to free memory
298+
del oldest_data
280299
self.current_cache_size_mb -= oldest_size
281300
self.evictions += 1
282301
logger.debug(
@@ -289,6 +308,8 @@ def set(self, sql: str, limit: int, result: Dict[str, Any]) -> bool:
289308
# Remove oldest (first item in OrderedDict)
290309
oldest_key, (oldest_data, _) = self.cache.popitem(last=False)
291310
oldest_size = self._estimate_size_mb(oldest_data)
311+
# CRITICAL: Delete data reference to free memory
312+
del oldest_data
292313
self.current_cache_size_mb -= oldest_size
293314
self.evictions += 1
294315
logger.debug(f"Cache EVICT: Entry count limit ({self.max_size})")

0 commit comments

Comments
 (0)