Skip to content

Commit 5d968c3

Browse files
committed
Merge branch 'fork/mobuild-io/SQLAlchemySubscription'
# Conflicts: # poetry.lock
2 parents b1527a6 + f1f8f2f commit 5d968c3

7 files changed

Lines changed: 641 additions & 12 deletions

File tree

eventsourcing_sqlalchemy/datastore.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@
22
from __future__ import annotations
33

44
import sqlite3
5+
from contextlib import contextmanager
56
from contextvars import ContextVar, Token
67
from threading import Lock, Semaphore
7-
from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar, Union, cast
8+
from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, Union, cast
89

10+
import psycopg
11+
import psycopg2
912
import sqlalchemy.exc
1013
from eventsourcing.persistence import (
1114
DatabaseError,
@@ -23,6 +26,7 @@
2326
from sqlalchemy.future import create_engine
2427
from sqlalchemy.orm import Session, scoped_session, sessionmaker
2528
from sqlalchemy.pool import StaticPool
29+
from typing_extensions import TypeVar
2630

2731
from eventsourcing_sqlalchemy.models import ( # type: ignore
2832
EventRecord,
@@ -263,3 +267,30 @@ def define_record_class(
263267
)
264268
cls.record_classes[record_classes_key] = (record_class, base_cls)
265269
return cast(Type[TEventRecord], record_class)
270+
271+
@contextmanager
272+
def get_connection(self) -> Iterator[Connection]:
273+
try:
274+
assert self.engine
275+
conn = self.engine.connect()
276+
yield conn
277+
except (psycopg.InterfaceError, psycopg2.InterfaceError) as e:
278+
raise InterfaceError(str(e)) from e
279+
except (psycopg.OperationalError, psycopg2.OperationalError) as e:
280+
raise OperationalError(str(e)) from e
281+
except (psycopg.DataError, psycopg2.DataError) as e:
282+
raise DataError(str(e)) from e
283+
except (psycopg.IntegrityError, psycopg2.IntegrityError) as e:
284+
raise IntegrityError(str(e)) from e
285+
except (psycopg.InternalError, psycopg2.InternalError) as e:
286+
raise InternalError(str(e)) from e
287+
except (psycopg.ProgrammingError, psycopg2.ProgrammingError) as e:
288+
raise ProgrammingError(str(e)) from e
289+
except (psycopg.NotSupportedError, psycopg2.NotSupportedError) as e:
290+
raise NotSupportedError(str(e)) from e
291+
except (psycopg.DatabaseError, psycopg2.DatabaseError) as e:
292+
raise DatabaseError(str(e)) from e
293+
except (psycopg.Error, psycopg2.Error) as e:
294+
raise PersistenceError(str(e)) from e
295+
except Exception:
296+
raise

eventsourcing_sqlalchemy/recorders.py

Lines changed: 96 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
# -*- coding: utf-8 -*-
22
from __future__ import annotations
33

4-
from typing import Any, List, Optional, Sequence, Type, cast
4+
import select
5+
import time
6+
from threading import Thread
7+
from typing import Any, Callable, List, Optional, Sequence, Type, cast
58
from uuid import UUID
69

710
from eventsourcing.persistence import (
811
AggregateRecorder,
912
ApplicationRecorder,
1013
IntegrityError,
14+
ListenNotifySubscription,
1115
Notification,
1216
ProcessRecorder,
17+
ProgrammingError,
1318
StoredEvent,
1419
Subscription,
1520
Tracking,
@@ -29,6 +34,8 @@
2934
class SQLAlchemyRecorder:
3035
"""Base class for recorders that use SQLAlchemy."""
3136

37+
POSTGRES_MAX_IDENTIFIER_LEN = 63
38+
3239
def __init__(
3340
self,
3441
datastore: SQLAlchemyDatastore,
@@ -38,6 +45,13 @@ def __init__(
3845
self.schema_name = schema_name
3946
self.tables: List[Table] = []
4047

48+
def check_identifier_length(self, table_name: str) -> None:
49+
assert self.datastore.engine is not None
50+
if self.datastore.engine.dialect.name == "postgresql":
51+
if len(table_name) > SQLAlchemyRecorder.POSTGRES_MAX_IDENTIFIER_LEN:
52+
msg = f"Identifier too long: {table_name}"
53+
raise ProgrammingError(msg)
54+
4155
def create_table(self) -> None:
4256
assert self.datastore.engine is not None
4357
for table in self.tables:
@@ -234,12 +248,12 @@ def _insert_stored_events(
234248
session.add(record)
235249
if self._has_autoincrementing_ids:
236250
session.flush() # We want the autoincremented IDs now.
251+
self._notify_channel(session)
237252
return [cast(StoredEventRecord, r).id for r in records]
238253

239254
def max_notification_id(self) -> int | None:
240255
try:
241256
with self.transaction(commit=False) as session:
242-
# record_class = cast(Type[StoredEventRecord], self.events_record_cls)
243257
record_class = self.events_record_cls
244258
q = session.query(record_class)
245259
q = q.order_by(record_class.id.desc())
@@ -258,7 +272,6 @@ def select_notifications(
258272
inclusive_of_start: bool = True,
259273
) -> list[Notification]:
260274
with self.transaction(commit=False) as session:
261-
# record_class = cast(Type[StoredEventRecord], self.events_record_cls)
262275
record_class = self.events_record_cls
263276
q = session.query(record_class)
264277
if start is not None:
@@ -290,8 +303,86 @@ def select_notifications(
290303
def subscribe(
291304
self, gt: int | None = None, topics: Sequence[str] = ()
292305
) -> Subscription[ApplicationRecorder]:
293-
msg = "SQLAlchemyApplicationRecorder.subscribe() is not implemented"
294-
raise NotImplementedError(msg)
306+
assert self.datastore.engine
307+
if self.datastore.engine.dialect.name == "postgresql":
308+
return SQLAlchemySubscription(recorder=self, gt=gt, topics=topics)
309+
else:
310+
msg = "SQLAlchemyApplicationRecorder.subscribe() is not implemented for"
311+
msg += f"{self.datastore.engine.dialect}"
312+
raise NotImplementedError(msg)
313+
314+
def _notify_channel(self, session: Session) -> None:
315+
"""
316+
Send a NOTIFY on the channel using a SQLAlchemy connection.
317+
"""
318+
assert self.datastore.engine
319+
if self.datastore.engine.dialect.name == "postgresql":
320+
# Get the raw psycopg connection
321+
cursor = session.connection().connection.cursor()
322+
cursor.execute(f"NOTIFY {self.channel_name};")
323+
324+
325+
class SQLAlchemySubscription(ListenNotifySubscription[SQLAlchemyApplicationRecorder]):
326+
def __init__(
327+
self,
328+
recorder: SQLAlchemyApplicationRecorder,
329+
gt: int | None = None,
330+
topics: Sequence[str] = (),
331+
) -> None:
332+
assert isinstance(recorder, SQLAlchemyApplicationRecorder)
333+
super().__init__(recorder=recorder, gt=gt, topics=topics)
334+
self._listen_thread = Thread(target=self._listen)
335+
self._listen_thread.start()
336+
337+
def __exit__(self, *args: object, **kwargs: Any) -> None:
338+
super().__exit__(*args, **kwargs)
339+
self._listen_thread.join()
340+
341+
def _listen(self) -> None:
342+
assert self._recorder.datastore.engine
343+
assert self._recorder.datastore.engine.dialect.name == "postgresql"
344+
notification_handler = self.__get_notification_handler()
345+
346+
try:
347+
with self._recorder.datastore.get_connection() as sa_conn:
348+
sa_conn.execution_options(isolation_level="AUTOCOMMIT")
349+
raw_conn = sa_conn.connection
350+
351+
cur = raw_conn.cursor()
352+
cur.execute(f"LISTEN {self._recorder.channel_name};")
353+
354+
while not self._has_been_stopped and not self._thread_error:
355+
if select.select([raw_conn], [], [], 0.1)[0]:
356+
notification_handler(raw_conn)
357+
else:
358+
time.sleep(0.1)
359+
360+
except BaseException as e: # noqa: B036
361+
if self._thread_error is None:
362+
self._thread_error = e
363+
self.stop()
364+
365+
def __get_notification_handler(self) -> Callable[[Any], None]:
366+
assert self._recorder.datastore.engine
367+
driver_name = self._recorder.datastore.engine.dialect.driver
368+
handlers = {
369+
"psycopg": self.__handle_psycopg_notification,
370+
"psycopg2": self.__handle_psycopg2_notification,
371+
}
372+
try:
373+
return handlers[driver_name]
374+
except KeyError as e:
375+
raise NotImplementedError(f"Unsupported driver: {driver_name}") from e
376+
377+
def __handle_psycopg_notification(self, raw_conn: Any) -> None:
378+
next(raw_conn.notifies())
379+
self._has_been_notified.set()
380+
381+
def __handle_psycopg2_notification(self, raw_conn: Any) -> None:
382+
raw_conn.poll()
383+
if raw_conn.notifies:
384+
raw_conn.notifies.pop(0)
385+
self._has_been_notified.set()
295386

296387

297388
class SQLAlchemyTrackingRecorder(SQLAlchemyRecorder, TrackingRecorder):

poetry.lock

Lines changed: 74 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ python = "^3.10.0"
2727
#eventsourcing = { path = "../eventsourcing/", extras = ["crypto"], develop = true }
2828
#eventsourcing = { git = "https://github.com/pyeventsourcing/eventsourcing.git", branch = "main", extras = ["crypto"]}
2929
SQLAlchemy-Utils = ">=0.38.2"
30-
eventsourcing = "^9.5.2"
30+
eventsourcing = "^9.5.3"
3131
sqlalchemy = ">=1.4.26, <2.1"
3232

3333
[tool.poetry.group.dev.dependencies]
@@ -40,10 +40,11 @@ flake8-coding = "*"
4040
flake8-isort = "*"
4141
flake8-tidy-imports = "*"
4242
isort = "*"
43+
msgspec = "*"
4344
mypy = "*"
4445
pre-commit = "*"
4546
pre-commit-hooks = "*"
46-
eventsourcing = { version = "^9.5.2", extras = ["crypto"] }
47+
eventsourcing = { version = "^9.5.3", extras = ["crypto"] }
4748
psycopg = { version = "*", extras = ["binary", "pool"] }
4849
psycopg2-binary = "*"
4950
pytest = "*"

0 commit comments

Comments
 (0)