Skip to content

Commit c7cb18b

Browse files
committed
add mysql/mariadb
1 parent 309fb38 commit c7cb18b

5 files changed

Lines changed: 162 additions & 35 deletions

File tree

pyproject.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@ sql_sqlite = [
8181
"sqlalchemy[asyncio]>=2.0.0,<3.0.0",
8282
"aiosqlite>=0.21.0",
8383
]
84+
sql_mysql = [
85+
"sqlalchemy[asyncio]>=2.0.0,<3.0.0",
86+
"aiomysql>=0.3.2",
87+
"cryptography>=46.0.5",
88+
]
8489
redis = ["redis[hiredis] >= 7.0.0"]
8590

8691
[project.scripts]

src/crawlee/storage_clients/_sql/_client_mixin.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@
88

99
from sqlalchemy import CursorResult, delete, select, text, update
1010
from sqlalchemy import func as sql_func
11+
from sqlalchemy.dialects.mysql import insert as mysql_insert
1112
from sqlalchemy.dialects.postgresql import insert as pg_insert
1213
from sqlalchemy.dialects.sqlite import insert as lite_insert
13-
from sqlalchemy.exc import SQLAlchemyError
14+
from sqlalchemy.exc import OperationalError, SQLAlchemyError
1415

1516
from crawlee._utils.crypto import crypto_random_object_id
1617

@@ -227,6 +228,9 @@ def _build_insert_stmt_with_ignore(
227228
if dialect == 'sqlite':
228229
return lite_insert(table_model).values(insert_values).on_conflict_do_nothing()
229230

231+
if dialect == 'mysql':
232+
return mysql_insert(table_model).values(insert_values).prefix_with('IGNORE')
233+
230234
raise NotImplementedError(f'Insert with ignore not supported for dialect: {dialect}')
231235

232236
def _build_upsert_stmt(
@@ -260,6 +264,11 @@ def _build_upsert_stmt(
260264
set_ = {col: getattr(lite_stmt.excluded, col) for col in update_columns}
261265
return lite_stmt.on_conflict_do_update(index_elements=conflict_cols, set_=set_)
262266

267+
if dialect == 'mysql':
268+
mysql_stmt = mysql_insert(table_model).values(insert_values)
269+
set_ = {col: getattr(mysql_stmt.inserted, col) for col in update_columns}
270+
return mysql_stmt.on_duplicate_key_update(**set_)
271+
263272
raise NotImplementedError(f'Upsert not supported for dialect: {dialect}')
264273

265274
async def _purge(self, metadata_kwargs: MetadataUpdateParams) -> None:
@@ -402,11 +411,12 @@ async def _try_acquire_buffer_lock(self, session: AsyncSession) -> bool:
402411
Returns:
403412
True if lock was acquired, False if already locked by another process.
404413
"""
414+
capture_error_code = 1020 # MariaDB error code for "Record has changed since last read"
405415
now = datetime.now(timezone.utc)
406416
lock_until = now + self._BLOCK_BUFFER_TIME
407417
dialect = self._storage_client.get_dialect_name()
408418

409-
if dialect == 'postgresql':
419+
if dialect in ('postgresql', 'mysql'):
410420
select_stmt = (
411421
select(self._METADATA_TABLE)
412422
.where(
@@ -417,7 +427,17 @@ async def _try_acquire_buffer_lock(self, session: AsyncSession) -> bool:
417427
)
418428
.with_for_update(skip_locked=True)
419429
)
420-
result = await session.execute(select_stmt)
430+
431+
try:
432+
result = await session.execute(select_stmt)
433+
except OperationalError as e:
434+
# MariaDB raises error 1020 ("Record has changed since last read") instead of
435+
# silently skipping locked rows like MySQL/PostgreSQL. Treat it as lock not acquired.
436+
error_code = getattr(e.orig, 'args', [None])[0]
437+
if error_code == capture_error_code:
438+
return False
439+
raise
440+
421441
metadata_row = result.scalar_one_or_none()
422442

423443
if metadata_row is None:

src/crawlee/storage_clients/_sql/_request_queue_client.py

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -335,33 +335,33 @@ async def add_batch_of_requests(
335335
)
336336
)
337337

338-
if insert_values:
339-
if forefront:
340-
# If the request already exists in the database, we update the sequence_number by shifting request
341-
# to the left.
342-
upsert_stmt = self._build_upsert_stmt(
343-
self._ITEM_TABLE,
344-
insert_values,
345-
update_columns=['sequence_number'],
346-
conflict_cols=['request_id', 'request_queue_id'],
347-
)
348-
result = await session.execute(upsert_stmt)
349-
else:
350-
# If the request already exists in the database, we ignore this request when inserting.
351-
insert_stmt_with_ignore = self._build_insert_stmt_with_ignore(self._ITEM_TABLE, insert_values)
352-
result = await session.execute(insert_stmt_with_ignore)
338+
try:
339+
if insert_values:
340+
if forefront:
341+
# If the request already exists in the database, we update the sequence_number
342+
# by shifting request to the left.
343+
upsert_stmt = self._build_upsert_stmt(
344+
self._ITEM_TABLE,
345+
insert_values,
346+
update_columns=['sequence_number'],
347+
conflict_cols=['request_id', 'request_queue_id'],
348+
)
349+
result = await session.execute(upsert_stmt)
350+
else:
351+
# If the request already exists in the database, we ignore this request when inserting.
352+
insert_stmt_with_ignore = self._build_insert_stmt_with_ignore(self._ITEM_TABLE, insert_values)
353+
result = await session.execute(insert_stmt_with_ignore)
353354

354-
result = cast('CursorResult', result) if not isinstance(result, CursorResult) else result
355-
approximate_new_request += result.rowcount
355+
result = cast('CursorResult', result) if not isinstance(result, CursorResult) else result
356+
approximate_new_request += result.rowcount
356357

357-
await self._add_buffer_record(
358-
session,
359-
update_modified_at=True,
360-
delta_pending_request_count=approximate_new_request,
361-
delta_total_request_count=approximate_new_request,
362-
)
358+
await self._add_buffer_record(
359+
session,
360+
update_modified_at=True,
361+
delta_pending_request_count=approximate_new_request,
362+
delta_total_request_count=approximate_new_request,
363+
)
363364

364-
try:
365365
await session.commit()
366366
processed_requests.extend(transaction_processed_requests)
367367
except SQLAlchemyError as e:
@@ -433,7 +433,7 @@ async def fetch_next_request(self) -> Request | None:
433433

434434
async with self.get_session(with_simple_commit=True) as session:
435435
# We use the `skip_locked` database mechanism to prevent the 'interception' of requests by another client
436-
if dialect == 'postgresql':
436+
if dialect in ('postgresql', 'mysql'):
437437
stmt = stmt.with_for_update(skip_locked=True)
438438
result = await session.execute(stmt)
439439
requests_db = result.scalars().all()

src/crawlee/storage_clients/_sql/_storage_client.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import warnings
44
from logging import getLogger
55
from pathlib import Path
6-
from typing import TYPE_CHECKING
6+
from typing import TYPE_CHECKING, Any
77

88
from sqlalchemy.exc import IntegrityError, OperationalError
99
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
@@ -116,9 +116,9 @@ async def initialize(self, configuration: Configuration) -> None:
116116
async with engine.begin() as conn:
117117
self._dialect_name = engine.dialect.name
118118

119-
if self._dialect_name not in ('sqlite', 'postgresql'):
119+
if self._dialect_name not in ('sqlite', 'postgresql', 'mysql'):
120120
raise ValueError(
121-
f'Unsupported database dialect: {self._dialect_name}. Supported: sqlite, postgresql. '
121+
f'Unsupported database dialect: {self._dialect_name}. Supported: sqlite, postgresql, mysql. '
122122
'Consider using a different database.',
123123
)
124124

@@ -256,11 +256,23 @@ def _get_or_create_engine(self, configuration: Configuration) -> AsyncEngine:
256256
# Create connection string with path to default database
257257
connection_string = f'sqlite+aiosqlite:///{db_path}'
258258

259-
if 'sqlite' not in connection_string and 'postgresql' not in connection_string:
259+
if (
260+
('sqlite' not in connection_string)
261+
and ('postgresql' not in connection_string)
262+
and ('mysql' not in connection_string)
263+
):
260264
raise ValueError(
261-
'Unsupported database. Supported: sqlite, postgresql. Consider using a different database.'
265+
'Unsupported database. Supported: sqlite, postgresql, mysql. Consider using a different database.'
262266
)
263267

268+
connect_args: dict[str, Any]
269+
kwargs: dict[str, Any] = {}
270+
if 'mysql' in connection_string:
271+
connect_args: dict[str, Any] = {'connect_timeout': 30}
272+
kwargs['isolation_level'] = 'READ COMMITTED'
273+
else:
274+
connect_args = {'timeout': 30}
275+
264276
self._engine = create_async_engine(
265277
connection_string,
266278
future=True,
@@ -270,6 +282,7 @@ def _get_or_create_engine(self, configuration: Configuration) -> AsyncEngine:
270282
pool_recycle=600,
271283
pool_pre_ping=True,
272284
echo=False,
273-
connect_args={'timeout': 30},
285+
connect_args=connect_args,
286+
**kwargs,
274287
)
275288
return self._engine

0 commit comments

Comments
 (0)