Skip to content

Commit e9dccb5

Browse files
authored
Merge pull request #61 from Basekick-Labs/release/25.11.2
fix: Properly fix memory leak in query result handling (v25.11.2)
2 parents: fb91d3f + 6442be6 — commit e9dccb5

3 files changed

Lines changed: 52 additions & 23 deletions

File tree

api/duckdb_engine.py

Lines changed: 9 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -377,13 +377,20 @@ def _execute_with_pool(self, sql: str) -> Dict[str, Any]:
377377
columns = [desc[0] for desc in conn.description] if conn.description else []
378378
query_time = time.time() - query_start
379379

380+
# Convert rows to list immediately
381+
data = [list(row) for row in rows]
382+
row_count = len(data)
383+
384+
# CRITICAL: Delete rows reference immediately to free DuckDB result memory
385+
del rows
386+
380387
total_time = time.time() - start_time
381388

382389
return {
383390
"success": True,
384-
"data": [list(row) for row in rows],
391+
"data": data,
385392
"columns": columns,
386-
"row_count": len(rows),
393+
"row_count": row_count,
387394
"execution_time_ms": round(query_time * 1000, 2),
388395
"wait_time_ms": round((total_time - query_time) * 1000, 2)
389396
}

api/duckdb_pool_simple.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,11 +146,17 @@ def get_connection(self, timeout: float = 5.0):
146146
if conn is not None:
147147
try:
148148
# Reset connection state (clears DuckDB internal caches)
149-
conn.execute("SELECT NULL").fetchall()
149+
# Use a simple query to clear any cached results
150+
result = conn.execute("SELECT 1").fetchall()
151+
del result
150152

151-
# Force garbage collection to release memory
152-
# This is critical for preventing memory leaks
153-
gc.collect()
153+
# Force aggressive garbage collection to release memory
154+
# This is critical for preventing memory leaks from DuckDB result sets
155+
collected = gc.collect()
156+
157+
# Log GC only for significant collections (reduces log noise)
158+
if collected > 100:
159+
logger.debug(f"Connection cleanup: collected {collected} objects")
154160

155161
except Exception as e:
156162
# If cleanup fails, log but don't crash

api/main.py

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1254,42 +1254,58 @@ async def execute_sql(request: Request, query: QueryRequest):
12541254
elif row_count > 10000:
12551255
warning_message = f"Moderate result: {row_count:,} rows returned. Consider using 'LIMIT' if you don't need all rows."
12561256

1257-
# Extract data for response before potential cleanup
1257+
# Build response object
12581258
response_data = QueryResponse(
12591259
success=True,
12601260
columns=result.get("columns", []),
12611261
data=result.get("data", []),
1262-
row_count=result.get("row_count", 0),
1262+
row_count=row_count,
12631263
execution_time_ms=result.get("execution_time_ms", 0.0),
12641264
timestamp=datetime.now(),
1265-
error=warning_message # Use error field for educational warnings
1265+
error=warning_message
12661266
)
12671267

1268-
# CRITICAL MEMORY FIX: Aggressively free memory after ALL queries
1269-
# Small queries accumulate over time causing memory leaks in long-running services
1270-
del result # Always delete result dict to free references
1268+
# CRITICAL MEMORY FIX: Aggressively free memory BEFORE returning
1269+
# Delete result dict immediately - the data is now owned by response_data
1270+
del result
12711271

1272+
# Trigger aggressive garbage collection based on query size
12721273
import gc
1273-
# Trigger GC based on query size OR periodically for small queries
1274-
if row_count > 1000:
1275-
# Large queries: immediate GC to release DuckDB memory
1276-
gc.collect()
1277-
logger.debug(f"Garbage collection after {row_count:,} row query")
1278-
elif not hasattr(app.state, '_query_counter'):
1279-
# Initialize query counter on first query
1274+
1275+
# Track GC invocations for monitoring
1276+
if not hasattr(app.state, '_query_counter'):
12801277
app.state._query_counter = 0
12811278
app.state._last_gc_time = time.time()
1279+
1280+
# Determine if we should run GC now
1281+
should_gc = False
1282+
gc_reason = ""
1283+
1284+
if row_count > 1000:
1285+
# Large queries: ALWAYS run immediate GC
1286+
should_gc = True
1287+
gc_reason = f"large query ({row_count:,} rows)"
12821288
else:
1283-
# Small queries: GC every 100 queries OR every 60 seconds
1289+
# Small queries: GC every 50 queries OR every 30 seconds
12841290
app.state._query_counter += 1
12851291
current_time = time.time()
12861292
time_since_gc = current_time - app.state._last_gc_time
12871293

1288-
if app.state._query_counter >= 100 or time_since_gc >= 60:
1289-
gc.collect()
1290-
logger.debug(f"Periodic garbage collection: {app.state._query_counter} queries, {time_since_gc:.1f}s since last GC")
1294+
if app.state._query_counter >= 50:
1295+
should_gc = True
1296+
gc_reason = f"periodic (50 queries)"
12911297
app.state._query_counter = 0
12921298
app.state._last_gc_time = current_time
1299+
elif time_since_gc >= 30:
1300+
should_gc = True
1301+
gc_reason = f"periodic ({time_since_gc:.1f}s elapsed)"
1302+
app.state._query_counter = 0
1303+
app.state._last_gc_time = current_time
1304+
1305+
if should_gc:
1306+
# Run full GC cycle to reclaim memory
1307+
collected = gc.collect()
1308+
logger.info(f"Garbage collection: {gc_reason}, collected {collected} objects")
12931309

12941310
return response_data
12951311

0 commit comments

Comments
 (0)