Skip to content

Commit 21db4e5

Browse files
1owkeymem.bahov
andauthored
Inject AMQP Publisher into AMQP MessageBroker (#20)
* Inject AMQP Publisher into AMQP MessageBroker * Update version * Provide adapters protocol --------- Co-authored-by: m.bahov <m.bahov@vkteam.ru>
1 parent 1a94c9f commit 21db4e5

7 files changed

Lines changed: 89 additions & 48 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ maintainers = [{name = "Vadim Kozyrevskiy", email = "vadikko2@mail.ru"}]
2929
name = "python-cqrs"
3030
readme = "README.md"
3131
requires-python = ">=3.10"
32-
version = "3.1.1"
32+
version = "4.0.0"
3333

3434
[project.optional-dependencies]
3535
dev = [

src/cqrs/adapters/amqp.py

Lines changed: 50 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
import typing
33
from functools import partial
44

5+
from cqrs.adapters import protocol
6+
57
import aio_pika
68
from aio_pika import abc, pool
79

@@ -10,46 +12,28 @@ async def connection_pool_factory(url: str) -> abc.AbstractRobustConnection:
1012
return await aio_pika.connect_robust(url=url)
1113

1214

13-
async def channel_pool_factory(connection_pool: pool.Pool) -> aio_pika.Channel:
15+
async def channel_pool_factory(
16+
connection_pool: pool.Pool[aio_pika.abc.AbstractConnection],
17+
) -> aio_pika.abc.AbstractChannel:
1418
async with connection_pool.acquire() as connection:
1519
return await connection.channel()
1620

1721

18-
class AMQPPublisher:
19-
def __init__(self, url: str, max_connection_pool_size=2, max_channel_pool_size=10):
20-
self.url = url
21-
self.max_connection_pool_size = max_connection_pool_size
22-
self.max_channel_pool_size = max_channel_pool_size
23-
self.connection_pool: pool.Pool = pool.Pool(
24-
partial(connection_pool_factory, url=url),
25-
max_size=self.max_connection_pool_size,
26-
)
27-
self.channel_pool: pool.Pool = pool.Pool(
28-
partial(channel_pool_factory, connection_pool=self.connection_pool),
29-
max_size=self.max_channel_pool_size,
30-
)
22+
class AMQPPublisher(protocol.AMQPPublisher):
23+
def __init__(self, channel_pool: pool.Pool[aio_pika.abc.AbstractChannel]):
24+
self.channel_pool = channel_pool
3125

3226
async def publish(self, message: abc.AbstractMessage, queue_name: str, exchange_name: str) -> None:
3327
async with self.channel_pool.acquire() as channel:
34-
queue: aio_pika.Queue = await channel.declare_queue(queue_name)
35-
exchange: aio_pika.Exchange = await channel.declare_exchange(exchange_name, type="direct", auto_delete=True)
28+
queue = await channel.declare_queue(queue_name)
29+
exchange = await channel.declare_exchange(exchange_name, type="direct", auto_delete=True)
3630
await queue.bind(exchange=exchange, routing_key=queue_name)
3731
await exchange.publish(message=message, routing_key=queue_name)
3832

3933

40-
class AMQPConsumer:
41-
def __init__(self, url: str, max_connection_pool_size=2, max_channel_pool_size=10):
42-
self.url = url
43-
self.max_connection_pool_size = max_connection_pool_size
44-
self.max_channel_pool_size = max_channel_pool_size
45-
self.connection_pool: pool.Pool = pool.Pool(
46-
partial(connection_pool_factory, url=url),
47-
max_size=self.max_connection_pool_size,
48-
)
49-
self.channel_pool: pool.Pool = pool.Pool(
50-
partial(channel_pool_factory, connection_pool=self.connection_pool),
51-
max_size=self.max_channel_pool_size,
52-
)
34+
class AMQPConsumer(protocol.AMQPConsumer):
35+
def __init__(self, channel_pool: pool.Pool[aio_pika.abc.AbstractChannel]):
36+
self.channel_pool = channel_pool
5337

5438
async def consume(
5539
self,
@@ -59,5 +43,41 @@ async def consume(
5943
async with self.channel_pool.acquire() as channel:
6044
await channel.set_qos(prefetch_count=1)
6145
queue = await channel.declare_queue(queue_name)
62-
await queue._consume(handler)
46+
await queue.consume(handler)
6347
await asyncio.Future()
48+
49+
50+
def amqp_publisher_factory(
51+
url: typing.Text,
52+
max_connection_pool_size: int = 2,
53+
max_channel_pool_size: int = 10,
54+
) -> AMQPPublisher:
55+
max_connection_pool_size = max_connection_pool_size
56+
max_channel_pool_size = max_channel_pool_size
57+
connection_pool = pool.Pool(
58+
partial(connection_pool_factory, url=url),
59+
max_size=max_connection_pool_size,
60+
)
61+
channel_pool = pool.Pool(
62+
partial(channel_pool_factory, connection_pool=connection_pool),
63+
max_size=max_channel_pool_size,
64+
)
65+
return AMQPPublisher(channel_pool=channel_pool)
66+
67+
68+
def amqp_consumer_factory(
69+
url: typing.Text,
70+
max_connection_pool_size: int = 2,
71+
max_channel_pool_size: int = 10,
72+
) -> AMQPConsumer:
73+
max_connection_pool_size = max_connection_pool_size
74+
max_channel_pool_size = max_channel_pool_size
75+
connection_pool = pool.Pool(
76+
partial(connection_pool_factory, url=url),
77+
max_size=max_connection_pool_size,
78+
)
79+
channel_pool = pool.Pool(
80+
partial(channel_pool_factory, connection_pool=connection_pool),
81+
max_size=max_channel_pool_size,
82+
)
83+
return AMQPConsumer(channel_pool=channel_pool)

src/cqrs/adapters/kafka.py

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44
import ssl
55
import typing
66

7+
from cqrs.adapters import protocol
8+
from cqrs.serializers import default
9+
710
import aiokafka
811
import retry_async
912
from aiokafka import errors
1013

11-
from cqrs.serializers import default
1214

1315
__all__ = (
1416
"KafkaProducer",
@@ -45,16 +47,7 @@
4547
Serializer = typing.Callable[[typing.Any], typing.ByteString | None]
4648

4749

48-
class _Singleton(type):
49-
_instances = {}
50-
51-
def __call__(cls, *args, **kwargs):
52-
if cls not in cls._instances:
53-
cls._instances[cls] = super(_Singleton, cls).__call__(*args, **kwargs)
54-
return cls._instances[cls]
55-
56-
57-
class KafkaProducer(metaclass=_Singleton):
50+
class KafkaProducer(protocol.KafkaProducer):
5851
def __init__(
5952
self,
6053
producer: aiokafka.AIOKafkaProducer,

src/cqrs/adapters/protocol.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import typing
2+
3+
import aio_pika
4+
5+
6+
class KafkaProducer(typing.Protocol):
7+
async def produce(
8+
self,
9+
topic: typing.Text,
10+
message: typing.Any,
11+
) -> None: ...
12+
13+
14+
class AMQPPublisher(typing.Protocol):
15+
async def publish(
16+
self,
17+
message: aio_pika.abc.AbstractMessage,
18+
queue_name: str,
19+
exchange_name: str,
20+
) -> None: ...
21+
22+
23+
class AMQPConsumer(typing.Protocol):
24+
async def consume(
25+
self,
26+
handler: typing.Callable[[aio_pika.abc.AbstractIncomingMessage], typing.Awaitable[None]],
27+
queue_name: str,
28+
) -> None: ...

src/cqrs/message_brokers/amqp.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@
33
import aio_pika
44
import orjson
55

6-
from cqrs.adapters import amqp
6+
from cqrs.adapters import protocol as adapters_protocol
77
from cqrs.message_brokers import protocol
88

99

1010
class AMQPMessageBroker(protocol.MessageBroker):
11-
def __init__(self, dsn: str, exchange_name: str, pika_log_level: str = "ERROR"):
12-
self.publisher = amqp.AMQPPublisher(url=dsn)
11+
def __init__(self, publisher: adapters_protocol.AMQPPublisher, exchange_name: str, pika_log_level: str = "ERROR"):
12+
self.publisher = publisher
1313
self.exchange_name = exchange_name
1414
logging.getLogger("aiormq").setLevel(pika_log_level)
1515
logging.getLogger("aio_pika").setLevel(pika_log_level)

src/cqrs/message_brokers/kafka.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
import logging
22
import typing
33

4-
from cqrs.adapters import kafka
4+
from cqrs.adapters import protocol as adapters_protocol
55
from cqrs.message_brokers import protocol
66

77

88
class KafkaMessageBroker(protocol.MessageBroker):
99
def __init__(
1010
self,
11-
producer: kafka.KafkaProducer,
11+
producer: adapters_protocol.KafkaProducer,
1212
aiokafka_log_level: typing.Text = "ERROR",
1313
):
1414
self._producer = producer

src/cqrs/producer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ async def send_message(self, event: repository_protocol.OutboxedEvent):
3737
message_name=event.event.event_name,
3838
message_id=event.event.event_id,
3939
topic=event.topic,
40-
payload=event.event,
40+
payload=event.event.model_dump(),
4141
),
4242
)
4343
except Exception as error:

0 commit comments

Comments
 (0)