Skip to content

Commit 83d4a4c

Browse files
committed
add support mariadb dialect
1 parent c7cb18b commit 83d4a4c

3 files changed

Lines changed: 11 additions & 9 deletions

File tree

src/crawlee/storage_clients/_sql/_client_mixin.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ def _build_insert_stmt_with_ignore(
228228
if dialect == 'sqlite':
229229
return lite_insert(table_model).values(insert_values).on_conflict_do_nothing()
230230

231-
if dialect == 'mysql':
231+
if dialect in {'mysql', 'mariadb'}:
232232
return mysql_insert(table_model).values(insert_values).prefix_with('IGNORE')
233233

234234
raise NotImplementedError(f'Insert with ignore not supported for dialect: {dialect}')
@@ -264,7 +264,7 @@ def _build_upsert_stmt(
264264
set_ = {col: getattr(lite_stmt.excluded, col) for col in update_columns}
265265
return lite_stmt.on_conflict_do_update(index_elements=conflict_cols, set_=set_)
266266

267-
if dialect == 'mysql':
267+
if dialect in {'mysql', 'mariadb'}:
268268
mysql_stmt = mysql_insert(table_model).values(insert_values)
269269
set_ = {col: getattr(mysql_stmt.inserted, col) for col in update_columns}
270270
return mysql_stmt.on_duplicate_key_update(**set_)
@@ -416,7 +416,7 @@ async def _try_acquire_buffer_lock(self, session: AsyncSession) -> bool:
416416
lock_until = now + self._BLOCK_BUFFER_TIME
417417
dialect = self._storage_client.get_dialect_name()
418418

419-
if dialect in ('postgresql', 'mysql'):
419+
if dialect in {'postgresql', 'mysql', 'mariadb'}:
420420
select_stmt = (
421421
select(self._METADATA_TABLE)
422422
.where(

src/crawlee/storage_clients/_sql/_request_queue_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ async def fetch_next_request(self) -> Request | None:
433433

434434
async with self.get_session(with_simple_commit=True) as session:
435435
# We use the `skip_locked` database mechanism to prevent the 'interception' of requests by another client
436-
if dialect in ('postgresql', 'mysql'):
436+
if dialect in {'postgresql', 'mysql', 'mariadb'}:
437437
stmt = stmt.with_for_update(skip_locked=True)
438438
result = await session.execute(stmt)
439439
requests_db = result.scalars().all()

src/crawlee/storage_clients/_sql/_storage_client.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,10 +116,10 @@ async def initialize(self, configuration: Configuration) -> None:
116116
async with engine.begin() as conn:
117117
self._dialect_name = engine.dialect.name
118118

119-
if self._dialect_name not in ('sqlite', 'postgresql', 'mysql'):
119+
if self._dialect_name not in {'sqlite', 'postgresql', 'mysql', 'mariadb'}:
120120
raise ValueError(
121-
f'Unsupported database dialect: {self._dialect_name}. Supported: sqlite, postgresql, mysql. '
122-
'Consider using a different database.',
121+
f'Unsupported database dialect: {self._dialect_name}. Supported: sqlite, postgresql, mysql, '
122+
'mariadb. Consider using a different database.',
123123
)
124124

125125
# Create tables if they don't exist.
@@ -260,14 +260,16 @@ def _get_or_create_engine(self, configuration: Configuration) -> AsyncEngine:
260260
('sqlite' not in connection_string)
261261
and ('postgresql' not in connection_string)
262262
and ('mysql' not in connection_string)
263+
and ('mariadb' not in connection_string)
263264
):
264265
raise ValueError(
265-
'Unsupported database. Supported: sqlite, postgresql, mysql. Consider using a different database.'
266+
'Unsupported database. Supported: sqlite, postgresql, mysql, mariadb. Consider using a different '
267+
'database.'
266268
)
267269

268270
connect_args: dict[str, Any]
269271
kwargs: dict[str, Any] = {}
270-
if 'mysql' in connection_string:
272+
if 'mysql' in connection_string or 'mariadb' in connection_string:
271273
connect_args: dict[str, Any] = {'connect_timeout': 30}
272274
kwargs['isolation_level'] = 'READ COMMITTED'
273275
else:

0 commit comments

Comments
 (0)