Skip to content

Commit 1c6c264

Browse files
committed
set pragma per connect
1 parent 9d31eb2 commit 1c6c264

File tree

1 file changed

+31
-16
lines changed

1 file changed

+31
-16
lines changed

src/crawlee/storage_clients/_sql/_storage_client.py

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55
from pathlib import Path
66
from typing import TYPE_CHECKING, Any, ClassVar
77

8+
from sqlalchemy import event
89
from sqlalchemy.exc import IntegrityError, OperationalError
910
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
10-
from sqlalchemy.sql import insert, select, text
11+
from sqlalchemy.sql import insert, select
1112
from typing_extensions import override
1213

1314
from crawlee._utils.docs import docs_group
@@ -22,7 +23,9 @@
2223
if TYPE_CHECKING:
2324
from types import TracebackType
2425

26+
from sqlalchemy.engine.interfaces import DBAPIConnection
2527
from sqlalchemy.ext.asyncio import AsyncSession
28+
from sqlalchemy.pool import ConnectionPoolEntry
2629

2730

2831
logger = getLogger(__name__)
@@ -72,10 +75,7 @@ def __init__(
7275
self._initialized = False
7376
self.session_maker: None | async_sessionmaker[AsyncSession] = None
7477

75-
# Flag needed to apply optimizations only for SQLite database
76-
self._optimization_flag = self._engine is None and (
77-
self._connection_string is None or self._connection_string.startswith('sqlite')
78-
)
78+
self._listeners_registered = False
7979
self._dialect_name: str | None = None
8080

8181
# Call the notification only once
@@ -117,9 +117,10 @@ async def initialize(self, configuration: Configuration) -> None:
117117
"""
118118
if not self._initialized:
119119
engine = self._get_or_create_engine(configuration)
120-
async with engine.begin() as conn:
121-
self._dialect_name = engine.dialect.name
122120

121+
self._dialect_name = engine.dialect.name
122+
123+
async with engine.begin() as conn:
123124
if self._dialect_name not in self._SUPPORTED_DIALECTS:
124125
raise ValueError(
125126
f'Unsupported database dialect: {self._dialect_name}. Supported: '
@@ -130,16 +131,8 @@ async def initialize(self, configuration: Configuration) -> None:
130131
# Rollback the transaction when an exception occurs.
131132
# This is likely an attempt to create a database from several parallel processes.
132133
try:
133-
# Set SQLite pragmas for performance and consistency
134-
if self._optimization_flag:
135-
await conn.execute(text('PRAGMA journal_mode=WAL')) # Better concurrency
136-
await conn.execute(text('PRAGMA synchronous=NORMAL')) # Balanced safety/speed
137-
await conn.execute(text('PRAGMA cache_size=100000')) # 100MB cache
138-
await conn.execute(text('PRAGMA temp_store=MEMORY')) # Memory temp storage
139-
await conn.execute(text('PRAGMA mmap_size=268435456')) # 256MB memory mapping
140-
await conn.execute(text('PRAGMA foreign_keys=ON')) # Enforce constraints
141-
await conn.execute(text('PRAGMA busy_timeout=30000')) # 30s busy timeout
142134
await conn.run_sync(Base.metadata.create_all, checkfirst=True)
135+
143136
from crawlee import __version__ # Noqa: PLC0415
144137

145138
db_version = (await conn.execute(select(VersionDb))).scalar_one_or_none()
@@ -155,6 +148,7 @@ async def initialize(self, configuration: Configuration) -> None:
155148
)
156149
elif not db_version:
157150
await conn.execute(insert(VersionDb).values(version=__version__))
151+
158152
except (IntegrityError, OperationalError):
159153
await conn.rollback()
160154

@@ -163,6 +157,10 @@ async def initialize(self, configuration: Configuration) -> None:
163157
async def close(self) -> None:
164158
"""Close the database connection pool."""
165159
if self._engine is not None:
160+
if self._listeners_registered:
161+
event.remove(self._engine.sync_engine, 'connect', self._on_connect)
162+
self._listeners_registered = False
163+
166164
await self._engine.dispose()
167165
self._engine = None
168166

@@ -287,4 +285,21 @@ def _get_or_create_engine(self, configuration: Configuration) -> AsyncEngine:
287285
connect_args=connect_args,
288286
**kwargs,
289287
)
288+
289+
event.listen(self._engine.sync_engine, 'connect', self._on_connect)
290+
self._listeners_registered = True
291+
290292
return self._engine
293+
294+
def _on_connect(self, dbapi_conn: DBAPIConnection, _connection_record: ConnectionPoolEntry) -> None:
    """Apply SQLite pragmas to a newly opened DBAPI connection.

    This method is attached as a `connect` event listener on the engine, so the pragmas are applied
    to each connection as it is opened. It does nothing unless the detected dialect is `sqlite`.

    Args:
        dbapi_conn: The raw DBAPI connection that has just been opened.
        _connection_record: The pool entry that owns the connection. Unused.
    """
    if self._dialect_name != 'sqlite':
        return

    pragmas = (
        'PRAGMA journal_mode=WAL',  # Better concurrency
        'PRAGMA synchronous=NORMAL',  # Balanced safety/speed
        # NOTE(review): a positive cache_size is a page count, not a byte size, so this is
        # roughly 400 MB with 4 KiB pages rather than 100 MB - confirm the intended value.
        'PRAGMA cache_size=100000',
        'PRAGMA temp_store=MEMORY',  # Memory temp storage
        'PRAGMA mmap_size=268435456',  # 256MB memory mapping
        'PRAGMA foreign_keys=ON',  # Enforce constraints
        'PRAGMA busy_timeout=30000',  # 30s busy timeout
    )

    cursor = dbapi_conn.cursor()
    try:
        for pragma in pragmas:
            cursor.execute(pragma)
    finally:
        # Close the cursor even when a pragma fails, so an error does not leak it.
        cursor.close()

0 commit comments

Comments
 (0)