From 64f748e95c97406b723f9795a387693c80a1dc26 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 6 Jan 2026 05:42:36 +0000 Subject: [PATCH 1/7] Initial plan From be4bbc584bf75fc58a583fa52d308594d984579b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 6 Jan 2026 05:49:54 +0000 Subject: [PATCH 2/7] Fix async session handling in PGMQueue operations Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- pgmq_sqlalchemy/queue.py | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/pgmq_sqlalchemy/queue.py b/pgmq_sqlalchemy/queue.py index a248cbd..7075c81 100644 --- a/pgmq_sqlalchemy/queue.py +++ b/pgmq_sqlalchemy/queue.py @@ -125,13 +125,23 @@ def __init__( def _check_pgmq_ext(self) -> None: """Check if the pgmq extension exists.""" - self._execute_operation(PGMQOperation.check_pgmq_ext, session=None, commit=True) + if self.is_async: + self.loop.run_until_complete( + self._execute_async_operation(PGMQOperation.check_pgmq_ext_async, session=None, commit=True) + ) + else: + self._execute_operation(PGMQOperation.check_pgmq_ext, session=None, commit=True) def _check_pg_partman_ext(self) -> None: """Check if the pg_partman extension exists.""" - self._execute_operation( - PGMQOperation.check_pg_partman_ext, session=None, commit=True - ) + if self.is_async: + self.loop.run_until_complete( + self._execute_async_operation(PGMQOperation.check_pg_partman_ext_async, session=None, commit=True) + ) + else: + self._execute_operation( + PGMQOperation.check_pg_partman_ext, session=None, commit=True + ) def _execute_operation( self, @@ -153,6 +163,14 @@ def _execute_operation( Returns: The result from the operation """ + # If this is an async PGMQueue, use the async operation path + if self.is_async: + # Get the async version of the operation + op_async = getattr(PGMQOperation, op_sync.__name__ + '_async') + return self.loop.run_until_complete( + self._execute_async_operation(op_async, session, commit, *args, **kwargs) + ) + if session is None: with self.session_maker() as s: return op_sync(*args, session=s, commit=commit, **kwargs) From e26055e8f8168447f18839b247a3da6bb1db142f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 6 Jan 2026 05:52:10 +0000 Subject: [PATCH 3/7] Improve error handling and remove code duplication in async routing Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- pgmq_sqlalchemy/queue.py | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/pgmq_sqlalchemy/queue.py b/pgmq_sqlalchemy/queue.py index 7075c81..44937a3 100644 --- a/pgmq_sqlalchemy/queue.py +++ b/pgmq_sqlalchemy/queue.py @@ -125,23 +125,13 @@ def __init__( def _check_pgmq_ext(self) -> None: """Check if the pgmq extension exists.""" - if self.is_async: - self.loop.run_until_complete( - self._execute_async_operation(PGMQOperation.check_pgmq_ext_async, session=None, commit=True) - ) - else: - self._execute_operation(PGMQOperation.check_pgmq_ext, session=None, commit=True) + self._execute_operation(PGMQOperation.check_pgmq_ext, session=None, commit=True) def _check_pg_partman_ext(self) -> None: """Check if the pg_partman extension exists.""" - if self.is_async: - self.loop.run_until_complete( - self._execute_async_operation(PGMQOperation.check_pg_partman_ext_async, session=None, commit=True) - ) - else: - self._execute_operation( - PGMQOperation.check_pg_partman_ext, session=None, commit=True - ) + self._execute_operation( + PGMQOperation.check_pg_partman_ext, session=None, commit=True + ) def _execute_operation( self, @@ -166,7 +156,13 @@ def _execute_operation( # If this is an async PGMQueue, use the async operation path if self.is_async: # Get the async version of the operation - op_async = getattr(PGMQOperation, op_sync.__name__ + '_async') + async_op_name = op_sync.__name__ + '_async' + if not hasattr(PGMQOperation, async_op_name): + raise AttributeError( + f"Async version of operation '{op_sync.__name__}' not found. " + f"Expected '{async_op_name}' to exist in PGMQOperation." + ) + op_async = getattr(PGMQOperation, async_op_name) return self.loop.run_until_complete( self._execute_async_operation(op_async, session, commit, *args, **kwargs) ) From 889308c8b65cf69577657e974efc8ff2de583e2a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 6 Jan 2026 06:07:21 +0000 Subject: [PATCH 4/7] Fix async handling by updating tests to use call_method helper Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- pgmq_sqlalchemy/queue.py | 32 +++--- tests/_async_utils.py | 31 ++++++ tests/fixture_deps.py | 32 +++++- tests/test_queue.py | 207 ++++++++++++++++++++------------------- 4 files changed, 177 insertions(+), 125 deletions(-) create mode 100644 tests/_async_utils.py diff --git a/pgmq_sqlalchemy/queue.py b/pgmq_sqlalchemy/queue.py index 44937a3..e6e3687 100644 --- a/pgmq_sqlalchemy/queue.py +++ b/pgmq_sqlalchemy/queue.py @@ -125,13 +125,23 @@ def __init__( def _check_pgmq_ext(self) -> None: """Check if the pgmq extension exists.""" - self._execute_operation(PGMQOperation.check_pgmq_ext, session=None, commit=True) + if self.is_async: + self.loop.run_until_complete( + self._execute_async_operation(PGMQOperation.check_pgmq_ext_async, session=None, commit=True) + ) + else: + self._execute_operation(PGMQOperation.check_pgmq_ext, session=None, commit=True) def _check_pg_partman_ext(self) -> None: """Check if the pg_partman extension exists.""" - self._execute_operation( - PGMQOperation.check_pg_partman_ext, session=None, commit=True - ) + if self.is_async: + self.loop.run_until_complete( + self._execute_async_operation(PGMQOperation.check_pg_partman_ext_async, session=None, commit=True) + ) + else: + self._execute_operation( + PGMQOperation.check_pg_partman_ext, session=None, commit=True + ) def _execute_operation( self, @@ -153,20 +163,6 @@ def _execute_operation( Returns: The result from the operation """ - # If this is an async PGMQueue, use the async operation path - if self.is_async: - # Get the async version of the operation - async_op_name = op_sync.__name__ + '_async' - if not hasattr(PGMQOperation, async_op_name): - raise AttributeError( - f"Async version of operation '{op_sync.__name__}' not found. " - f"Expected '{async_op_name}' to exist in PGMQOperation." - ) - op_async = getattr(PGMQOperation, async_op_name) - return self.loop.run_until_complete( - self._execute_async_operation(op_async, session, commit, *args, **kwargs) - ) - if session is None: with self.session_maker() as s: return op_sync(*args, session=s, commit=commit, **kwargs) diff --git a/tests/_async_utils.py b/tests/_async_utils.py new file mode 100644 index 0000000..c10ad63 --- /dev/null +++ b/tests/_async_utils.py @@ -0,0 +1,31 @@ +"""Utility functions for handling both sync and async PGMQueue instances in tests.""" + +from typing import Any, Optional +from pgmq_sqlalchemy import PGMQueue + + +def call_method(pgmq: PGMQueue, method_name: str, *args, **kwargs) -> Any: + """ + Call a method on PGMQueue, automatically handling sync vs async. + + For async PGMQueue instances, calls the async version of the method + and runs it with loop.run_until_complete. + + Args: + pgmq: The PGMQueue instance + method_name: Name of the method to call (without _async suffix) + *args: Positional arguments to pass to the method + **kwargs: Keyword arguments to pass to the method + + Returns: + The result from the method call + """ + if pgmq.is_async: + # Call the async version + async_method_name = method_name + '_async' + async_method = getattr(pgmq, async_method_name) + return pgmq.loop.run_until_complete(async_method(*args, **kwargs)) + else: + # Call the sync version + method = getattr(pgmq, method_name) + return method(*args, **kwargs) diff --git a/tests/fixture_deps.py b/tests/fixture_deps.py index b1bc6e8..1889ba6 100644 --- a/tests/fixture_deps.py +++ b/tests/fixture_deps.py @@ -49,10 +49,22 @@ def test_something(pgmq_setup_teardown): pgmq = pgmq_all_variants queue_name = f"test_queue_{uuid.uuid4().hex}" assert check_queue_exists(db_session, queue_name) is False - pgmq.create_queue(queue_name) + + # Handle both sync and async PGMQueue + if pgmq.is_async: + pgmq.loop.run_until_complete(pgmq.create_queue_async(queue_name)) + else: + pgmq.create_queue(queue_name) + assert check_queue_exists(db_session, queue_name) is True yield pgmq, queue_name - pgmq.drop_queue(queue_name) + + # Handle both sync and async PGMQueue + if pgmq.is_async: + pgmq.loop.run_until_complete(pgmq.drop_queue_async(queue_name)) + else: + pgmq.drop_queue(queue_name) + assert check_queue_exists(db_session, queue_name) is False @@ -79,8 +91,20 @@ def test_something(pgmq_partitioned_setup_teardown): pgmq: PGMQueue = pgmq_all_variants queue_name = f"test_queue_{uuid.uuid4().hex}" assert check_queue_exists(db_session, queue_name) is False - pgmq.create_partitioned_queue(queue_name) + + # Handle both sync and async PGMQueue + if pgmq.is_async: + pgmq.loop.run_until_complete(pgmq.create_partitioned_queue_async(queue_name)) + else: + pgmq.create_partitioned_queue(queue_name) + assert check_queue_exists(db_session, queue_name) is True yield pgmq, queue_name - pgmq.drop_queue(queue_name, partitioned=True) + + # Handle both sync and async PGMQueue + if pgmq.is_async: + pgmq.loop.run_until_complete(pgmq.drop_queue_async(queue_name, partitioned=True)) + else: + pgmq.drop_queue(queue_name, partitioned=True) + assert check_queue_exists(db_session, queue_name) is False diff --git a/tests/test_queue.py b/tests/test_queue.py index 9062b36..e63f093 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -14,6 +14,7 @@ ) from tests._utils import check_queue_exists +from tests._async_utils import call_method from tests.constant import MSG, LOCK_FILE_NAME use_fixtures = [ @@ -25,22 +26,22 @@ def test_create_queue(pgmq_all_variants, db_session): pgmq: PGMQueue = pgmq_all_variants queue_name = f"test_queue_{uuid.uuid4().hex}" - pgmq.create_queue(queue_name) + call_method(pgmq, "create_queue", queue_name) assert check_queue_exists(db_session, queue_name) is True def test_create_partitioned_queue(pgmq_all_variants, db_session): pgmq: PGMQueue = pgmq_all_variants queue_name = f"test_queue_{uuid.uuid4().hex}" - pgmq.create_partitioned_queue(queue_name) + call_method(pgmq, "create_partitioned_queue", queue_name) assert check_queue_exists(db_session, queue_name) is True def test_create_same_queue(pgmq_setup_teardown: PGMQ_WITH_QUEUE, db_session): pgmq, queue_name = pgmq_setup_teardown - pgmq.create_queue(queue_name) + call_method(pgmq, "create_queue", queue_name) assert check_queue_exists(db_session, queue_name) is True - pgmq.create_queue(queue_name) + call_method(pgmq, "create_queue", queue_name) # `create_queue` with the same queue name should not raise an exception # and the queue should still exist assert check_queue_exists(db_session, queue_name) is True @@ -49,10 +50,10 @@ def test_create_same_queue(pgmq_setup_teardown: PGMQ_WITH_QUEUE, db_session): def test_validate_queue_name(pgmq_all_variants): pgmq: PGMQueue = pgmq_all_variants queue_name = f"test_queue_{uuid.uuid4().hex}" - pgmq.validate_queue_name(queue_name) + call_method(pgmq, "validate_queue_name", queue_name) # `queue_name` should be a less than 48 characters with pytest.raises(Exception) as e: - pgmq.validate_queue_name("a" * 49) + call_method(pgmq, "validate_queue_name", "a" * 49) error_msg: str = str(e.value.orig) assert "queue name is too long, maximum length is 48 characters" in error_msg @@ -67,7 +68,7 @@ def test_drop_non_exist_queue(pgmq_all_variants, db_session): queue_name = f"test_queue_{uuid.uuid4().hex}" assert check_queue_exists(db_session, queue_name) is False with pytest.raises(ProgrammingError): - pgmq.drop_queue(queue_name) + call_method(pgmq, "drop_queue", queue_name) def test_drop_partitioned_queue(pgmq_partitioned_setup_teardown: PGMQ_WITH_QUEUE): @@ -80,26 +81,26 @@ def test_drop_non_exist_partitioned_queue(pgmq_all_variants, db_session): queue_name = f"test_queue_{uuid.uuid4().hex}" assert check_queue_exists(db_session, queue_name) is False with pytest.raises(ProgrammingError): - pgmq.drop_queue(queue_name, partitioned=True) + call_method(pgmq, "drop_queue", queue_name, partitioned=True) def test_list_queues(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown - queues = pgmq.list_queues() + queues = call_method(pgmq, "list_queues", ) assert queue_name in queues def test_list_partitioned_queues(pgmq_partitioned_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_partitioned_setup_teardown - queues = pgmq.list_queues() + queues = call_method(pgmq, "list_queues", ) assert queue_name in queues def test_send_and_read_msg(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown msg = MSG - msg_id: int = pgmq.send(queue_name, msg) - msg_read = pgmq.read(queue_name) + msg_id: int = call_method(pgmq, "send", queue_name, msg) + msg_read = call_method(pgmq, "read", queue_name) assert msg_read.message == msg assert msg_read.msg_id == msg_id @@ -107,14 +108,14 @@ def test_send_and_read_msg(pgmq_setup_teardown: PGMQ_WITH_QUEUE): def test_send_and_read_msg_with_delay(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown msg = MSG - msg_id: int = pgmq.send(queue_name, msg, delay=2) - msg_read = pgmq.read(queue_name) + msg_id: int = call_method(pgmq, "send", queue_name, msg, delay=2) + msg_read = call_method(pgmq, "read", queue_name) assert msg_read is None time.sleep(1) - msg_read = pgmq.read(queue_name) + msg_read = call_method(pgmq, "read", queue_name) assert msg_read is None time.sleep(1.1) - msg_read = pgmq.read(queue_name) + msg_read = call_method(pgmq, "read", queue_name) assert msg_read.message == msg assert msg_read.msg_id == msg_id @@ -122,15 +123,15 @@ def test_send_and_read_msg_with_delay(pgmq_setup_teardown: PGMQ_WITH_QUEUE): def test_send_and_read_msg_with_vt(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown msg = MSG - msg_id: int = pgmq.send(queue_name, msg) - msg_read = pgmq.read(queue_name, vt=2) + msg_id: int = call_method(pgmq, "send", queue_name, msg) + msg_read = call_method(pgmq, "read", queue_name, vt=2) assert msg_read.message == msg assert msg_read.msg_id == msg_id time.sleep(1.5) - msg_read = pgmq.read(queue_name) + msg_read = call_method(pgmq, "read", queue_name) assert msg_read is None time.sleep(0.6) - msg_read = pgmq.read(queue_name) + msg_read = call_method(pgmq, "read", queue_name) assert msg_read.message == msg assert msg_read.msg_id == msg_id @@ -138,21 +139,21 @@ def test_send_and_read_msg_with_vt(pgmq_setup_teardown: PGMQ_WITH_QUEUE): def test_send_and_read_msg_with_vt_and_delay(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown msg = MSG - msg_id: int = pgmq.send(queue_name, msg, delay=2) - msg_read = pgmq.read(queue_name, vt=2) + msg_id: int = call_method(pgmq, "send", queue_name, msg, delay=2) + msg_read = call_method(pgmq, "read", queue_name, vt=2) assert msg_read is None time.sleep(1) - msg_read = pgmq.read(queue_name, vt=2) + msg_read = call_method(pgmq, "read", queue_name, vt=2) assert msg_read is None time.sleep(1.1) - msg_read = pgmq.read(queue_name, vt=2) + msg_read = call_method(pgmq, "read", queue_name, vt=2) assert msg_read.message == msg assert msg_read.msg_id == msg_id time.sleep(1.5) - msg_read = pgmq.read(queue_name) + msg_read = call_method(pgmq, "read", queue_name) assert msg_read is None time.sleep(0.6) - msg_read = pgmq.read(queue_name) + msg_read = call_method(pgmq, "read", queue_name) assert msg_read.message == msg assert msg_read.msg_id == msg_id @@ -161,29 +162,29 @@ def test_send_and_read_msg_with_vt_zero(pgmq_setup_teardown: PGMQ_WITH_QUEUE): """Test that vt=0 works correctly and message becomes visible immediately.""" pgmq, queue_name = pgmq_setup_teardown msg = MSG - msg_id: int = pgmq.send(queue_name, msg) + msg_id: int = call_method(pgmq, "send", queue_name, msg) # Read with vt=0 means message should be immediately visible again - msg_read = pgmq.read(queue_name, vt=0) + msg_read = call_method(pgmq, "read", queue_name, vt=0) assert msg_read.message == msg assert msg_read.msg_id == msg_id # Message should be visible immediately (no waiting) - msg_read = pgmq.read(queue_name) + msg_read = call_method(pgmq, "read", queue_name) assert msg_read.message == msg assert msg_read.msg_id == msg_id def test_read_empty_queue(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown - msg_read = pgmq.read(queue_name) + msg_read = call_method(pgmq, "read", queue_name) assert msg_read is None def test_read_batch(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown msg = MSG - msg_id_1: int = pgmq.send(queue_name, msg) - msg_id_2: int = pgmq.send(queue_name, msg) - msg_read = pgmq.read_batch(queue_name, 3) + msg_id_1: int = call_method(pgmq, "send", queue_name, msg) + msg_id_2: int = call_method(pgmq, "send", queue_name, msg) + msg_read = call_method(pgmq, "read_batch", queue_name, 3) assert len(msg_read) == 2 assert msg_read[0].message == msg assert msg_read[0].msg_id == msg_id_1 @@ -193,14 +194,14 @@ def test_read_batch(pgmq_setup_teardown: PGMQ_WITH_QUEUE): def test_read_batch_empty_queue(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown - msg_read = pgmq.read_batch(queue_name, 3) + msg_read = call_method(pgmq, "read_batch", queue_name, 3) assert msg_read is None def test_send_batch(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown msg = MSG - msg_ids = pgmq.send_batch(queue_name=queue_name, messages=[msg, msg, msg]) + msg_ids = call_method(pgmq, "send_batch", queue_name=queue_name, messages=[msg, msg, msg]) assert len(msg_ids) == 3 assert msg_ids == [1, 2, 3] @@ -208,10 +209,10 @@ def test_send_batch(pgmq_setup_teardown: PGMQ_WITH_QUEUE): def test_send_batch_with_read_batch(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown msg = MSG - msg_ids = pgmq.send_batch(queue_name=queue_name, messages=[msg, msg, msg]) + msg_ids = call_method(pgmq, "send_batch", queue_name=queue_name, messages=[msg, msg, msg]) assert len(msg_ids) == 3 assert msg_ids == [1, 2, 3] - msg_read_batch = pgmq.read_batch(queue_name, 3) + msg_read_batch = call_method(pgmq, "read_batch", queue_name, 3) assert len(msg_read_batch) == 3 assert [msg_read.message for msg_read in msg_read_batch] == [msg, msg, msg] assert [msg_read.msg_id for msg_read in msg_read_batch] == [1, 2, 3] @@ -220,9 +221,9 @@ def test_send_batch_with_read_batch(pgmq_setup_teardown: PGMQ_WITH_QUEUE): def test_read_with_poll(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown msg = MSG - msg_ids = pgmq.send_batch(queue_name, [msg, msg, msg, msg, msg], delay=2) + msg_ids = call_method(pgmq, "send_batch", queue_name, [msg, msg, msg, msg, msg], delay=2) start_time = time.time() - msg_reads = pgmq.read_with_poll( + msg_reads = call_method(pgmq, "read_with_poll", queue_name, vt=1000, qty=3, @@ -239,7 +240,7 @@ def test_read_with_poll(pgmq_setup_teardown: PGMQ_WITH_QUEUE): def test_read_with_poll_with_empty_queue(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown start_time = time.time() - msg_reads = pgmq.read_with_poll( + msg_reads = call_method(pgmq, "read_with_poll", queue_name, vt=1000, qty=3, @@ -255,62 +256,62 @@ def test_read_with_poll_with_empty_queue(pgmq_setup_teardown: PGMQ_WITH_QUEUE): def test_set_vt(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown msg = MSG - msg_id = pgmq.send(queue_name, msg) - msg_read = pgmq.set_vt(queue_name, msg_id, 2) + msg_id = call_method(pgmq, "send", queue_name, msg) + msg_read = call_method(pgmq, "set_vt", queue_name, msg_id, 2) assert msg is not None - assert pgmq.read(queue_name) is None + assert call_method(pgmq, "read", queue_name) is None time.sleep(1.5) - assert pgmq.read(queue_name) is None + assert call_method(pgmq, "read", queue_name) is None time.sleep(0.6) - msg_read = pgmq.read(queue_name) + msg_read = call_method(pgmq, "read", queue_name) assert msg_read.message == msg def test_set_vt_to_smaller_value(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown msg = MSG - msg_id = pgmq.send(queue_name, msg) - _ = pgmq.read(queue_name, vt=5) # set vt to 5 seconds + msg_id = call_method(pgmq, "send", queue_name, msg) + _ = call_method(pgmq, "read", queue_name, vt=5) # set vt to 5 seconds assert msg is not None - assert pgmq.read(queue_name) is None + assert call_method(pgmq, "read", queue_name) is None time.sleep(0.5) - assert pgmq.set_vt(queue_name, msg_id, 1) is not None + assert call_method(pgmq, "set_vt", queue_name, msg_id, 1) is not None time.sleep(0.3) - assert pgmq.read(queue_name) is None + assert call_method(pgmq, "read", queue_name) is None time.sleep(0.8) - assert pgmq.read(queue_name) is not None + assert call_method(pgmq, "read", queue_name) is not None def test_set_vt_not_exist(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown - msg_updated = pgmq.set_vt(queue_name, 999, 20) + msg_updated = call_method(pgmq, "set_vt", queue_name, 999, 20) assert msg_updated is None def test_pop(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown msg = MSG - msg_ids = pgmq.send_batch(queue_name, [msg, msg, msg]) - msg = pgmq.pop(queue_name) + msg_ids = call_method(pgmq, "send_batch", queue_name, [msg, msg, msg]) + msg = call_method(pgmq, "pop", queue_name) assert msg.msg_id == msg_ids[0] assert msg.message == MSG - msg_reads = pgmq.read_batch(queue_name, 3) + msg_reads = call_method(pgmq, "read_batch", queue_name, 3) assert len(msg_reads) == 2 assert [msg_read.msg_id for msg_read in msg_reads] == msg_ids[1:] def test_pop_empty_queue(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown - msg = pgmq.pop(queue_name) + msg = call_method(pgmq, "pop", queue_name) assert msg is None def test_delete_msg(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown msg = MSG - msg_ids = pgmq.send_batch(queue_name, [msg, msg, msg]) - assert pgmq.delete(queue_name, msg_ids[1]) is True - msg_reads = pgmq.read_batch(queue_name, 3) + msg_ids = call_method(pgmq, "send_batch", queue_name, [msg, msg, msg]) + assert call_method(pgmq, "delete", queue_name, msg_ids[1]) is True + msg_reads = call_method(pgmq, "read_batch", queue_name, 3) assert len(msg_reads) == 2 assert [msg_read.msg_id for msg_read in msg_reads] == [msg_ids[0], msg_ids[2]] @@ -318,9 +319,9 @@ def test_delete_msg(pgmq_setup_teardown: PGMQ_WITH_QUEUE): def test_delete_msg_not_exist(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown msg = MSG - msg_ids = pgmq.send_batch(queue_name, [msg, msg, msg]) - assert pgmq.delete(queue_name, 999) is False - msg_reads = pgmq.read_batch(queue_name, 3) + msg_ids = call_method(pgmq, "send_batch", queue_name, [msg, msg, msg]) + assert call_method(pgmq, "delete", queue_name, 999) is False + msg_reads = call_method(pgmq, "read_batch", queue_name, 3) assert len(msg_reads) == 3 assert [msg_read.msg_id for msg_read in msg_reads] == msg_ids @@ -328,12 +329,12 @@ def test_delete_msg_not_exist(pgmq_setup_teardown: PGMQ_WITH_QUEUE): def test_delete_batch(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown msg = MSG - msg_ids = pgmq.send_batch(queue_name, [msg, msg, msg]) - assert pgmq.delete_batch(queue_name, [msg_ids[0], msg_ids[2]]) == [ + msg_ids = call_method(pgmq, "send_batch", queue_name, [msg, msg, msg]) + assert call_method(pgmq, "delete_batch", queue_name, [msg_ids[0], msg_ids[2]]) == [ msg_ids[0], msg_ids[2], ] - msg_reads = pgmq.read_batch(queue_name, 3) + msg_reads = call_method(pgmq, "read_batch", queue_name, 3) assert len(msg_reads) == 1 assert [msg_read.msg_id for msg_read in msg_reads] == [msg_ids[1]] @@ -341,9 +342,9 @@ def test_delete_batch(pgmq_setup_teardown: PGMQ_WITH_QUEUE): def test_delete_batch_not_exist(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown msg = MSG - msg_ids = pgmq.send_batch(queue_name, [msg, msg, msg]) - assert pgmq.delete_batch(queue_name, [999, 998]) == [] - msg_reads = pgmq.read_batch(queue_name, 3) + msg_ids = call_method(pgmq, "send_batch", queue_name, [msg, msg, msg]) + assert call_method(pgmq, "delete_batch", queue_name, [999, 998]) == [] + msg_reads = call_method(pgmq, "read_batch", queue_name, 3) assert len(msg_reads) == 3 assert [msg_read.msg_id for msg_read in msg_reads] == msg_ids @@ -351,9 +352,9 @@ def test_delete_batch_not_exist(pgmq_setup_teardown: PGMQ_WITH_QUEUE): def test_archive(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown msg = MSG - msg_ids = pgmq.send_batch(queue_name, [msg, msg, msg]) - assert pgmq.archive(queue_name, msg_ids[0]) is True - msg_reads = pgmq.read_batch(queue_name, 3) + msg_ids = call_method(pgmq, "send_batch", queue_name, [msg, msg, msg]) + assert call_method(pgmq, "archive", queue_name, msg_ids[0]) is True + msg_reads = call_method(pgmq, "read_batch", queue_name, 3) assert len(msg_reads) == 2 assert [msg_read.msg_id for msg_read in msg_reads] == [msg_ids[1], msg_ids[2]] @@ -361,9 +362,9 @@ def test_archive(pgmq_setup_teardown: PGMQ_WITH_QUEUE): def test_archive_not_exist(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown msg = MSG - msg_ids = pgmq.send_batch(queue_name, [msg, msg, msg]) - assert pgmq.archive(queue_name, 999) is False - msg_reads = pgmq.read_batch(queue_name, 3) + msg_ids = call_method(pgmq, "send_batch", queue_name, [msg, msg, msg]) + assert call_method(pgmq, "archive", queue_name, 999) is False + msg_reads = call_method(pgmq, "read_batch", queue_name, 3) assert len(msg_reads) == 3 assert [msg_read.msg_id for msg_read in msg_reads] == msg_ids @@ -371,12 +372,12 @@ def test_archive_not_exist(pgmq_setup_teardown: PGMQ_WITH_QUEUE): def test_archive_batch(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown msg = MSG - msg_ids = pgmq.send_batch(queue_name, [msg, msg, msg]) - assert pgmq.archive_batch(queue_name, [msg_ids[0], msg_ids[2]]) == [ + msg_ids = call_method(pgmq, "send_batch", queue_name, [msg, msg, msg]) + assert call_method(pgmq, "archive_batch", queue_name, [msg_ids[0], msg_ids[2]]) == [ msg_ids[0], msg_ids[2], ] - msg_reads = pgmq.read_batch(queue_name, 3) + msg_reads = call_method(pgmq, "read_batch", queue_name, 3) assert len(msg_reads) == 1 assert [msg_read.msg_id for msg_read in msg_reads] == [msg_ids[1]] @@ -384,9 +385,9 @@ def test_archive_batch(pgmq_setup_teardown: PGMQ_WITH_QUEUE): def test_archive_batch_not_exist(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown msg = MSG - msg_ids = pgmq.send_batch(queue_name, [msg, msg, msg]) - assert pgmq.archive_batch(queue_name, [999, 998]) == [] - msg_reads = pgmq.read_batch(queue_name, 3) + msg_ids = call_method(pgmq, "send_batch", queue_name, [msg, msg, msg]) + assert call_method(pgmq, "archive_batch", queue_name, [999, 998]) == [] + msg_reads = call_method(pgmq, "read_batch", queue_name, 3) assert len(msg_reads) == 3 assert [msg_read.msg_id for msg_read in msg_reads] == msg_ids @@ -394,14 +395,14 @@ def test_archive_batch_not_exist(pgmq_setup_teardown: PGMQ_WITH_QUEUE): def test_purge(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown msg = MSG - assert pgmq.purge(queue_name) == 0 - pgmq.send_batch(queue_name, [msg, msg, msg]) - assert pgmq.purge(queue_name) == 3 + assert call_method(pgmq, "purge", queue_name) == 0 + call_method(pgmq, "send_batch", queue_name, [msg, msg, msg]) + assert call_method(pgmq, "purge", queue_name) == 3 def test_metrics(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown - metrics = pgmq.metrics(queue_name) + metrics = call_method(pgmq, "metrics", queue_name) assert metrics is not None assert metrics.queue_name == queue_name assert metrics.queue_length == 0 @@ -418,10 +419,10 @@ def test_metrics_all_queues(pgmq_setup_teardown: PGMQ_WITH_QUEUE): with FileLock(LOCK_FILE_NAME): pgmq, queue_name_1 = pgmq_setup_teardown queue_name_2 = f"test_queue_{uuid.uuid4().hex}" - pgmq.create_queue(queue_name_2) - pgmq.send_batch(queue_name_1, [MSG, MSG, MSG]) - pgmq.send_batch(queue_name_2, [MSG, MSG]) - metrics_all = pgmq.metrics_all() + call_method(pgmq, "create_queue", queue_name_2) + call_method(pgmq, "send_batch", queue_name_1, [MSG, MSG, MSG]) + call_method(pgmq, "send_batch", queue_name_2, [MSG, MSG]) + metrics_all = call_method(pgmq, "metrics_all", ) queue_1 = [q for q in metrics_all if q.queue_name == queue_name_1][0] queue_2 = [q for q in metrics_all if q.queue_name == queue_name_2][0] assert queue_1.queue_length == 3 @@ -434,7 +435,7 @@ def test_metrics_all_queues(pgmq_setup_teardown: PGMQ_WITH_QUEUE): def test_create_time_based_partitioned_queue(pgmq_all_variants, db_session): pgmq: PGMQueue = pgmq_all_variants queue_name = f"test_queue_{uuid.uuid4().hex}" - pgmq.create_partitioned_queue( + call_method(pgmq, "create_partitioned_queue", queue_name, partition_interval="1 day", retention_interval="7 days" ) assert check_queue_exists(db_session, queue_name) is True @@ -447,14 +448,14 @@ def test_create_time_based_partitioned_queue_various_intervals( # Test with hour queue_name_hour = f"test_queue_{uuid.uuid4().hex}" - pgmq.create_partitioned_queue( + call_method(pgmq, "create_partitioned_queue", queue_name_hour, partition_interval="1 hour", retention_interval="24 hours" ) assert check_queue_exists(db_session, queue_name_hour) is True # Test with week queue_name_week = f"test_queue_{uuid.uuid4().hex}" - pgmq.create_partitioned_queue( + call_method(pgmq, "create_partitioned_queue", queue_name_week, partition_interval="1 week", retention_interval="4 weeks" ) assert check_queue_exists(db_session, queue_name_week) is True @@ -464,7 +465,7 @@ def test_create_partitioned_queue_invalid_time_interval(pgmq_all_variants): pgmq: PGMQueue = pgmq_all_variants queue_name = f"test_queue_{uuid.uuid4().hex}" with pytest.raises(ValueError) as e: - pgmq.create_partitioned_queue( + call_method(pgmq, "create_partitioned_queue", queue_name, partition_interval="invalid interval", retention_interval="7 days", @@ -476,7 +477,7 @@ def test_create_partitioned_queue_invalid_numeric_interval(pgmq_all_variants): pgmq: PGMQueue = pgmq_all_variants queue_name = f"test_queue_{uuid.uuid4().hex}" with pytest.raises(ValueError) as e: - pgmq.create_partitioned_queue( + call_method(pgmq, "create_partitioned_queue", queue_name, partition_interval=-100, retention_interval=100000 ) assert "Numeric partition interval must be positive" in str(e.value) @@ -491,11 +492,11 @@ def test_read_with_poll_without_vt(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq.vt = 100 # Send a message - msg_id = pgmq.send(queue_name, MSG) + msg_id = call_method(pgmq, "send", queue_name, MSG) # Call read_with_poll with vt=None to test the fallback logic # When vt is None, it should fall back to using pgmq.vt value (100) - msgs = pgmq.read_with_poll( + msgs = call_method(pgmq, "read_with_poll", queue_name, vt=None, # Explicitly passing None to test the None fallback logic qty=1, @@ -520,23 +521,23 @@ def test_execute_operation_with_provided_sync_session(pgmq_by_session_maker, get # that the sync path with provided session works correctly with get_session_maker() as session: # Create queue with provided session - pgmq.create_queue(queue_name, session=session) + call_method(pgmq, "create_queue", queue_name, session=session) # Verify queue was created assert check_queue_exists(db_session, queue_name) is True # Send a message with the same provided session - msg_id = pgmq.send(queue_name, MSG, session=session) + msg_id = call_method(pgmq, "send", queue_name, MSG, session=session) # Read message with the same provided session - msg = pgmq.read(queue_name, vt=30, session=session) + msg = call_method(pgmq, "read", queue_name, vt=30, session=session) assert msg is not None assert msg.msg_id == msg_id assert msg.message == MSG # Clean up with the same provided session - pgmq.drop_queue(queue_name, session=session) + call_method(pgmq, "drop_queue", queue_name, session=session) # Verify queue was dropped assert check_queue_exists(db_session, queue_name) is False @@ -553,16 +554,16 @@ def test_execute_operation_async_with_session_none(pgmq_by_async_dsn, db_session # When session is None, _execute_operation creates a new async session # and uses loop.run_until_complete to execute the operation - pgmq.create_queue(queue_name) - msg_id = pgmq.send(queue_name, MSG) - msg = pgmq.read(queue_name, vt=30) + call_method(pgmq, "create_queue", queue_name) + msg_id = call_method(pgmq, "send", queue_name, MSG) + msg = call_method(pgmq, "read", queue_name, vt=30) assert msg is not None assert msg.msg_id == msg_id assert msg.message == MSG # Clean up - pgmq.drop_queue(queue_name) + call_method(pgmq, "drop_queue", queue_name) # Verify queue was dropped assert check_queue_exists(db_session, queue_name) is False From a348a09ca2c5e50f1575c017478a49f47a385ac8 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 6 Jan 2026 06:10:16 +0000 Subject: [PATCH 5/7] Fix nested event loop issue in async check methods Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- pgmq_sqlalchemy/queue.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/pgmq_sqlalchemy/queue.py b/pgmq_sqlalchemy/queue.py index e6e3687..afc2658 100644 --- a/pgmq_sqlalchemy/queue.py +++ b/pgmq_sqlalchemy/queue.py @@ -126,22 +126,26 @@ def __init__( def _check_pgmq_ext(self) -> None: """Check if the pgmq extension exists.""" if self.is_async: - self.loop.run_until_complete( - self._execute_async_operation(PGMQOperation.check_pgmq_ext_async, session=None, commit=True) - ) + self.loop.run_until_complete(self._check_pgmq_ext_async()) else: self._execute_operation(PGMQOperation.check_pgmq_ext, session=None, commit=True) + + async def _check_pgmq_ext_async(self) -> None: + """Check if the pgmq extension exists (async version).""" + await self._execute_async_operation(PGMQOperation.check_pgmq_ext_async, session=None, commit=True) def _check_pg_partman_ext(self) -> None: """Check if the pg_partman extension exists.""" if self.is_async: - self.loop.run_until_complete( - self._execute_async_operation(PGMQOperation.check_pg_partman_ext_async, session=None, commit=True) - ) + self.loop.run_until_complete(self._check_pg_partman_ext_async()) else: self._execute_operation( PGMQOperation.check_pg_partman_ext, session=None, commit=True ) + + async def _check_pg_partman_ext_async(self) -> None: + """Check if the pg_partman extension exists (async version).""" + await self._execute_async_operation(PGMQOperation.check_pg_partman_ext_async, session=None, commit=True) def _execute_operation( self, @@ -352,7 +356,7 @@ async def create_partitioned_queue_async( """ # check if the pg_partman extension exists before creating a partitioned queue at runtime - self._check_pg_partman_ext() + await self._check_pg_partman_ext_async() return await self._execute_async_operation( PGMQOperation.create_partitioned_queue_async, @@ -461,7 +465,7 @@ async def drop_queue_async( """ # check if the pg_partman extension exists before dropping a partitioned queue at runtime if partitioned: - self._check_pg_partman_ext() + await self._check_pg_partman_ext_async() return await self._execute_async_operation( PGMQOperation.drop_queue_async, From d364dfd93afcdab69764ef1acc109311b699ec4e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 6 Jan 2026 06:12:11 +0000 Subject: [PATCH 6/7] Fix code style: remove trailing commas in function calls Co-authored-by: jason810496 <68415893+jason810496@users.noreply.github.com> --- tests/test_queue.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_queue.py b/tests/test_queue.py index e63f093..a205f5c 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -86,13 +86,13 @@ def test_drop_non_exist_partitioned_queue(pgmq_all_variants, db_session): def test_list_queues(pgmq_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_setup_teardown - queues = call_method(pgmq, "list_queues", ) + queues = call_method(pgmq, "list_queues") assert queue_name in queues def test_list_partitioned_queues(pgmq_partitioned_setup_teardown: PGMQ_WITH_QUEUE): pgmq, queue_name = pgmq_partitioned_setup_teardown - queues = call_method(pgmq, "list_queues", ) + queues = call_method(pgmq, "list_queues") assert queue_name in queues @@ -422,7 +422,7 @@ def test_metrics_all_queues(pgmq_setup_teardown: PGMQ_WITH_QUEUE): call_method(pgmq, "create_queue", queue_name_2) call_method(pgmq, "send_batch", queue_name_1, [MSG, MSG, MSG]) call_method(pgmq, "send_batch", queue_name_2, [MSG, MSG]) - metrics_all = call_method(pgmq, "metrics_all", ) + metrics_all = call_method(pgmq, "metrics_all") queue_1 = [q for q in metrics_all if q.queue_name == queue_name_1][0] queue_2 = [q for q in metrics_all if q.queue_name == queue_name_2][0] assert queue_1.queue_length == 3 From d0ac04d874cf86caff00b486d916e734fab0ea5d Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Tue, 6 Jan 2026 14:18:58 +0800 Subject: [PATCH 7/7] Fix queue not await error --- pgmq_sqlalchemy/queue.py | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/pgmq_sqlalchemy/queue.py b/pgmq_sqlalchemy/queue.py index afc2658..b307318 100644 --- a/pgmq_sqlalchemy/queue.py +++ b/pgmq_sqlalchemy/queue.py @@ -125,27 +125,25 @@ def __init__( def _check_pgmq_ext(self) -> None: """Check if the pgmq extension exists.""" - if self.is_async: - self.loop.run_until_complete(self._check_pgmq_ext_async()) - else: - self._execute_operation(PGMQOperation.check_pgmq_ext, session=None, commit=True) - + self._execute_operation(PGMQOperation.check_pgmq_ext, session=None, commit=True) + async def _check_pgmq_ext_async(self) -> None: """Check if the pgmq extension exists (async version).""" - await self._execute_async_operation(PGMQOperation.check_pgmq_ext_async, session=None, commit=True) + await self._execute_async_operation( + PGMQOperation.check_pgmq_ext_async, session=None, commit=True + ) def _check_pg_partman_ext(self) -> None: """Check if the pg_partman extension exists.""" - if self.is_async: - self.loop.run_until_complete(self._check_pg_partman_ext_async()) - else: - self._execute_operation( - PGMQOperation.check_pg_partman_ext, session=None, commit=True - ) - + self._execute_operation( + PGMQOperation.check_pg_partman_ext, session=None, commit=True + ) + async def _check_pg_partman_ext_async(self) -> None: """Check if the pg_partman extension exists (async version).""" - await self._execute_async_operation(PGMQOperation.check_pg_partman_ext_async, session=None, commit=True) + await self._execute_async_operation( + PGMQOperation.check_pg_partman_ext_async, session=None, commit=True + ) def _execute_operation( self,