-
Notifications
You must be signed in to change notification settings - Fork 5
[Refactor] Outbox Architecture Overhaul: PostgreSQL Support, Mixin Models, and UUID v7 #33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
a0c711e
befd929
a0458f7
a595c7d
3a642e4
0ce18ec
733ff7f
c00af44
6525ec3
d0af53d
7b6db8a
90a761b
76c7dbd
6b61232
c04200c
52f67aa
608d0a9
9f3e6b1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| from cqrs.outbox.repository import OutboxedEventRepository, EventStatus | ||
| from cqrs.outbox.map import OutboxedEventMap | ||
|
|
||
| __all__ = [ | ||
| "OutboxedEventRepository", | ||
| "EventStatus", | ||
| "OutboxedEventMap", | ||
| ] | ||
|
|
||
| try: | ||
| from cqrs.outbox.sqlalchemy import ( | ||
| SqlAlchemyOutboxedEventRepository, | ||
| OutboxModelMixin | ||
| ) | ||
| __all__.extend([ | ||
| "SqlAlchemyOutboxedEventRepository", | ||
| "OutboxModelMixin" | ||
| ]) | ||
| except ImportError: | ||
| pass |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,93 +1,125 @@ | ||
| import datetime | ||
| import logging | ||
| import typing | ||
|
|
||
| import dotenv | ||
| import uuid | ||
| import orjson | ||
| import sqlalchemy | ||
| from sqlalchemy import func | ||
| from sqlalchemy.dialects import mysql | ||
| from sqlalchemy.ext.asyncio import session as sql_session | ||
| from sqlalchemy.orm import DeclarativeMeta, registry | ||
| import uuid6 | ||
|
|
||
| import cqrs | ||
| from cqrs import compressors | ||
| from cqrs.outbox import map, repository | ||
|
|
||
| Base = registry().generate_base() | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
| try: | ||
| import sqlalchemy | ||
| from sqlalchemy import func | ||
| from sqlalchemy.orm import Mapped, mapped_column, declared_attr | ||
| from sqlalchemy.ext.asyncio import session as sql_session | ||
| from sqlalchemy.dialects import postgresql | ||
| except ImportError: | ||
| raise ImportError( | ||
| "You are trying to use SQLAlchemy outbox implementation, " | ||
| "but 'sqlalchemy' is not installed. " | ||
| "Please install it using: pip install python-cqrs[sqlalchemy]" | ||
| ) | ||
|
|
||
| dotenv.load_dotenv() | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
| DEFAULT_OUTBOX_TABLE_NAME = "outbox" | ||
|
|
||
| MAX_FLUSH_COUNTER_VALUE = 5 | ||
|
|
||
|
|
||
| class OutboxModel(Base): | ||
| __tablename__ = DEFAULT_OUTBOX_TABLE_NAME | ||
|
|
||
| __table_args__ = ( | ||
| sqlalchemy.UniqueConstraint( | ||
| "event_id_bin", | ||
| "event_name", | ||
| name="event_id_unique_index", | ||
| ), | ||
| ) | ||
| id = sqlalchemy.Column( | ||
| sqlalchemy.BigInteger(), | ||
| class BinaryUUID(sqlalchemy.TypeDecorator): | ||
| """Stores the UUID as a native UUID in Postgres and as BINARY(16) in other databases (MySQL).""" | ||
| impl = sqlalchemy.BINARY(16) | ||
| cache_ok = True | ||
|
|
||
| def load_dialect_impl(self, dialect): | ||
| if dialect.name == "postgresql": | ||
| return dialect.type_descriptor(postgresql.UUID()) | ||
| else: | ||
| return dialect.type_descriptor(sqlalchemy.BINARY(16)) | ||
|
|
||
| def process_bind_param(self, value, dialect): | ||
| if value is None: | ||
| return value | ||
| if dialect.name == "postgresql": | ||
| return value # asyncpg work with uuid.UUID | ||
| if isinstance(value, uuid.UUID): | ||
| return value.bytes # For MySQL return 16 bytes | ||
| return value | ||
|
Comment on lines
+46
to
+53
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If a string UUID (e.g., 🛡️ Proposed fix: add a string-to-UUID conversion guard def process_bind_param(self, value, dialect):
if value is None:
return value
if dialect.name == "postgresql":
return value # asyncpg works with uuid.UUID
+ if isinstance(value, str):
+ value = uuid.UUID(value)
if isinstance(value, uuid.UUID):
return value.bytes # For MySQL return 16 bytes
return value🤖 Prompt for AI Agents |
||
|
|
||
| def process_result_value(self, value, dialect): | ||
| if value is None: | ||
| return value | ||
| if dialect.name == "postgresql": | ||
| return value # asyncpg return uuid.UUID | ||
| if isinstance(value, bytes): | ||
| return uuid.UUID(bytes=value) # From MySQL got bytes, make UUID | ||
| return value | ||
|
|
||
|
|
||
| class OutboxModelMixin: | ||
| @declared_attr.directive | ||
| def __tablename__(self) -> str: | ||
| return DEFAULT_OUTBOX_TABLE_NAME | ||
|
|
||
| id: Mapped[int] = mapped_column( | ||
| sqlalchemy.BigInteger, | ||
| sqlalchemy.Identity(), | ||
| primary_key=True, | ||
| nullable=False, | ||
| autoincrement=True, | ||
| comment="Identity", | ||
| ) | ||
| event_id = sqlalchemy.Column( | ||
| sqlalchemy.Uuid, | ||
| event_id: Mapped[uuid.UUID] = mapped_column( | ||
| BinaryUUID, | ||
| nullable=False, | ||
| comment="Event idempotency id", | ||
| ) | ||
| event_id_bin = sqlalchemy.Column( | ||
| sqlalchemy.BINARY(16), | ||
| nullable=False, | ||
| comment="Event idempotency id in 16 bit presentation", | ||
| ) | ||
| event_status = sqlalchemy.Column( | ||
| event_status: Mapped[repository.EventStatus] = mapped_column( | ||
| sqlalchemy.Enum(repository.EventStatus), | ||
| nullable=False, | ||
| default=repository.EventStatus.NEW, | ||
| comment="Event producing status", | ||
| ) | ||
| flush_counter = sqlalchemy.Column( | ||
| sqlalchemy.SmallInteger(), | ||
| flush_counter: Mapped[int] = mapped_column( | ||
| sqlalchemy.SmallInteger, | ||
| nullable=False, | ||
| default=0, | ||
| comment="Event producing flush counter", | ||
| ) | ||
| event_name = sqlalchemy.Column( | ||
| event_name: Mapped[typing.Text] = mapped_column( | ||
| sqlalchemy.String(255), | ||
| nullable=False, | ||
| comment="Event name", | ||
| ) | ||
| topic = sqlalchemy.Column( | ||
| topic: Mapped[typing.Text] = mapped_column( | ||
| sqlalchemy.String(255), | ||
| nullable=False, | ||
| comment="Event topic", | ||
| default="", | ||
| comment="Event topic", | ||
| ) | ||
| created_at = sqlalchemy.Column( | ||
| created_at: Mapped[datetime.datetime] = mapped_column( | ||
| sqlalchemy.DateTime, | ||
| nullable=False, | ||
| server_default=func.now(), | ||
| comment="Event creation timestamp", | ||
| ) | ||
| payload = sqlalchemy.Column( | ||
| mysql.BLOB, | ||
| payload: Mapped[bytes] = mapped_column( | ||
| sqlalchemy.LargeBinary, | ||
| nullable=False, | ||
| default={}, | ||
| comment="Event payload", | ||
| ) | ||
|
|
||
| @declared_attr | ||
| def __table_args__(self): | ||
| return ( | ||
| sqlalchemy.UniqueConstraint( | ||
| "event_id", | ||
| "event_name", | ||
| name="event_id_unique_index", | ||
| ), | ||
| ) | ||
|
|
||
| def row_to_dict(self) -> typing.Dict[typing.Text, typing.Any]: | ||
| return { | ||
| column.name: getattr(self, column.name) for column in self.__table__.columns | ||
|
|
@@ -153,10 +185,12 @@ class SqlAlchemyOutboxedEventRepository(repository.OutboxedEventRepository): | |
| def __init__( | ||
| self, | ||
| session: sql_session.AsyncSession, | ||
| outbox_model: type[OutboxModelMixin], | ||
| compressor: compressors.Compressor | None = None, | ||
| ): | ||
| self.session = session | ||
| self._compressor = compressor | ||
| self._outbox_model = outbox_model | ||
|
|
||
| def add( | ||
| self, | ||
|
|
@@ -176,17 +210,16 @@ def add( | |
| bytes_payload = self._compressor.compress(bytes_payload) | ||
|
|
||
| self.session.add( | ||
| OutboxModel( | ||
| self._outbox_model( | ||
| event_id=event.event_id, | ||
| event_id_bin=func.UUID_TO_BIN(event.event_id), | ||
| event_name=event.event_name, | ||
| created_at=event.event_timestamp, | ||
| payload=bytes_payload, | ||
| topic=event.topic, | ||
| ), | ||
| ) | ||
|
|
||
| def _process_events(self, model: OutboxModel) -> repository.OutboxedEvent | None: | ||
| def _process_events(self, model: OutboxModelMixin) -> repository.OutboxedEvent | None: | ||
| event_dict = model.row_to_dict() | ||
|
|
||
| event_model = map.OutboxedEventMap.get(event_dict["event_name"]) | ||
|
|
@@ -211,8 +244,8 @@ async def get_many( | |
| batch_size: int = 100, | ||
| topic: typing.Text | None = None, | ||
| ) -> typing.List[repository.OutboxedEvent]: | ||
| events: typing.Sequence[OutboxModel] = ( | ||
| (await self.session.execute(OutboxModel.get_batch_query(batch_size, topic))) | ||
| events: typing.Sequence[OutboxModelMixin] = ( | ||
| (await self.session.execute(self._outbox_model.get_batch_query(batch_size, topic))) | ||
| .scalars() | ||
| .all() | ||
| ) | ||
|
|
@@ -233,7 +266,7 @@ async def update_status( | |
| new_status: repository.EventStatus, | ||
| ) -> None: | ||
| await self.session.execute( | ||
| statement=OutboxModel.update_status_query(outboxed_event_id, new_status), | ||
| statement=self._outbox_model.update_status_query(outboxed_event_id, new_status), | ||
| ) | ||
|
|
||
| async def commit(self): | ||
|
|
@@ -243,6 +276,7 @@ async def rollback(self): | |
| await self.session.rollback() | ||
|
|
||
|
|
||
| @warnings.deprecated() | ||
| def rebind_outbox_model( | ||
| model: typing.Any, | ||
| new_base: DeclarativeMeta, | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.