Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions pgmq_sqlalchemy/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,24 @@ def _check_pgmq_ext(self) -> None:
"""Check if the pgmq extension exists."""
self._execute_operation(PGMQOperation.check_pgmq_ext, session=None, commit=True)

async def _check_pgmq_ext_async(self) -> None:
"""Check if the pgmq extension exists (async version)."""
await self._execute_async_operation(
PGMQOperation.check_pgmq_ext_async, session=None, commit=True
)

def _check_pg_partman_ext(self) -> None:
"""Check if the pg_partman extension exists."""
self._execute_operation(
PGMQOperation.check_pg_partman_ext, session=None, commit=True
)

async def _check_pg_partman_ext_async(self) -> None:
"""Check if the pg_partman extension exists (async version)."""
await self._execute_async_operation(
PGMQOperation.check_pg_partman_ext_async, session=None, commit=True
)

def _execute_operation(
self,
op_sync,
Expand Down Expand Up @@ -342,7 +354,7 @@ async def create_partitioned_queue_async(

"""
# check if the pg_partman extension exists before creating a partitioned queue at runtime
self._check_pg_partman_ext()
await self._check_pg_partman_ext_async()

return await self._execute_async_operation(
PGMQOperation.create_partitioned_queue_async,
Expand Down Expand Up @@ -451,7 +463,7 @@ async def drop_queue_async(
"""
# check if the pg_partman extension exists before dropping a partitioned queue at runtime
if partitioned:
self._check_pg_partman_ext()
await self._check_pg_partman_ext_async()

return await self._execute_async_operation(
PGMQOperation.drop_queue_async,
Expand Down
31 changes: 31 additions & 0 deletions tests/_async_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""Utility functions for handling both sync and async PGMQueue instances in tests."""

from typing import Any, Optional
from pgmq_sqlalchemy import PGMQueue


def call_method(pgmq: PGMQueue, method_name: str, *args, **kwargs) -> Any:
"""
Call a method on PGMQueue, automatically handling sync vs async.

For async PGMQueue instances, calls the async version of the method
and runs it with loop.run_until_complete.

Args:
pgmq: The PGMQueue instance
method_name: Name of the method to call (without _async suffix)
*args: Positional arguments to pass to the method
**kwargs: Keyword arguments to pass to the method

Returns:
The result from the method call
"""
if pgmq.is_async:
# Call the async version
async_method_name = method_name + '_async'
async_method = getattr(pgmq, async_method_name)
return pgmq.loop.run_until_complete(async_method(*args, **kwargs))
else:
# Call the sync version
method = getattr(pgmq, method_name)
return method(*args, **kwargs)
32 changes: 28 additions & 4 deletions tests/fixture_deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,22 @@ def test_something(pgmq_setup_teardown):
pgmq = pgmq_all_variants
queue_name = f"test_queue_{uuid.uuid4().hex}"
assert check_queue_exists(db_session, queue_name) is False
pgmq.create_queue(queue_name)

# Handle both sync and async PGMQueue
if pgmq.is_async:
pgmq.loop.run_until_complete(pgmq.create_queue_async(queue_name))
else:
pgmq.create_queue(queue_name)

assert check_queue_exists(db_session, queue_name) is True
yield pgmq, queue_name
pgmq.drop_queue(queue_name)

# Handle both sync and async PGMQueue
if pgmq.is_async:
pgmq.loop.run_until_complete(pgmq.drop_queue_async(queue_name))
else:
pgmq.drop_queue(queue_name)

assert check_queue_exists(db_session, queue_name) is False


Expand All @@ -79,8 +91,20 @@ def test_something(pgmq_partitioned_setup_teardown):
pgmq: PGMQueue = pgmq_all_variants
queue_name = f"test_queue_{uuid.uuid4().hex}"
assert check_queue_exists(db_session, queue_name) is False
pgmq.create_partitioned_queue(queue_name)

# Handle both sync and async PGMQueue
if pgmq.is_async:
pgmq.loop.run_until_complete(pgmq.create_partitioned_queue_async(queue_name))
else:
pgmq.create_partitioned_queue(queue_name)

assert check_queue_exists(db_session, queue_name) is True
yield pgmq, queue_name
pgmq.drop_queue(queue_name, partitioned=True)

# Handle both sync and async PGMQueue
if pgmq.is_async:
pgmq.loop.run_until_complete(pgmq.drop_queue_async(queue_name, partitioned=True))
else:
pgmq.drop_queue(queue_name, partitioned=True)

assert check_queue_exists(db_session, queue_name) is False
Loading