Skip to content

Commit 22631e5

Browse files
committed
Updated to work with eventsourcing 9.4 (albeit not implemented separate tracking recorders yet, or subscriptions).
1 parent d3bab34 commit 22631e5

10 files changed

Lines changed: 124 additions & 77 deletions

File tree

Makefile

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,11 @@ fmt: fmt-black fmt-isort
103103

104104
.PHONY: test
105105
test:
106-
$(POETRY) run python -m pytest $(opts) $(call tests,.)
106+
$(POETRY) run python -m unittest discover . -v
107+
108+
.PHONY: pytest
109+
pytest:
110+
$(POETRY) run pytest . -v --durations 10
107111

108112
.PHONY: build
109113
build:

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ Define an adapter for a `scoped_session` object and configure the event-sourced
189189
application using the environment variable `SQLALCHEMY_SCOPED_SESSION_TOPIC`.
190190

191191
```python
192-
from eventsourcing.application import AggregateNotFound
192+
from eventsourcing.application import AggregateNotFoundError
193193
from eventsourcing.utils import get_topic
194194
from sqlalchemy import create_engine
195195
from sqlalchemy.orm import sessionmaker, scoped_session
@@ -242,7 +242,7 @@ session.remove()
242242
try:
243243
# forgot to commit
244244
app.repository.get(aggregate.id)
245-
except AggregateNotFound:
245+
except AggregateNotFoundError:
246246
pass
247247
else:
248248
raise Exception("Expected aggregate not found")
@@ -379,7 +379,7 @@ else:
379379
with db(commit_on_exit=True):
380380
try:
381381
es_app.repository.get(aggregate.id)
382-
except AggregateNotFound:
382+
except AggregateNotFoundError:
383383
pass
384384
else:
385385
raise Exception("Expected aggregate not found")
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
from .factory import Factory # noqa
1+
from .factory import SQLAlchemyFactory # noqa

eventsourcing_sqlalchemy/factory.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
# -*- coding: utf-8 -*-
2+
from __future__ import annotations
3+
24
from typing import Optional
35

46
from eventsourcing.persistence import (
57
AggregateRecorder,
68
ApplicationRecorder,
79
InfrastructureFactory,
810
ProcessRecorder,
11+
TrackingRecorder,
912
)
1013
from eventsourcing.utils import Environment, resolve_topic, strtobool
1114
from sqlalchemy.orm import scoped_session
@@ -18,7 +21,7 @@
1821
)
1922

2023

21-
class Factory(InfrastructureFactory):
24+
class SQLAlchemyFactory(InfrastructureFactory[TrackingRecorder]):
2225
SQLALCHEMY_URL = "SQLALCHEMY_URL"
2326
SQLALCHEMY_AUTOFLUSH = "SQLALCHEMY_AUTOFLUSH"
2427
SQLALCHEMY_CONNECTION_CREATOR_TOPIC = "SQLALCHEMY_CONNECTION_CREATOR_TOPIC"
@@ -102,6 +105,14 @@ def process_recorder(self) -> ProcessRecorder:
102105
recorder.create_table()
103106
return recorder
104107

108+
def tracking_recorder(
109+
self, tracking_recorder_class: type[TrackingRecorder] | None = None
110+
) -> TrackingRecorder:
111+
raise NotImplementedError
112+
105113
def env_create_table(self) -> bool:
106114
default = "yes"
107115
return bool(strtobool(self.env.get(self.CREATE_TABLE) or default))
116+
117+
118+
Factory = SQLAlchemyFactory

eventsourcing_sqlalchemy/recorders.py

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
Notification,
1111
ProcessRecorder,
1212
StoredEvent,
13+
Subscription,
1314
Tracking,
1415
)
1516
from sqlalchemy import Column, Table, text
@@ -173,7 +174,7 @@ def insert_events(
173174
notification_ids = self._insert_events(session, stored_events, **kwargs)
174175
return notification_ids
175176

176-
def max_notification_id(self) -> int:
177+
def max_notification_id(self) -> int | None:
177178
try:
178179
with self.transaction(commit=False) as session:
179180
# record_class = cast(Type[StoredEventRecord], self.events_record_cls)
@@ -183,20 +184,26 @@ def max_notification_id(self) -> int:
183184
records = q[0:1]
184185
return records[0].id
185186
except (IndexError, AssertionError):
186-
return 0
187+
return None
187188

188189
def select_notifications(
189190
self,
190-
start: int,
191+
start: int | None,
191192
limit: int,
192-
stop: Optional[int] = None,
193+
stop: int | None = None,
193194
topics: Sequence[str] = (),
194-
) -> List[Notification]:
195+
*,
196+
inclusive_of_start: bool = True,
197+
) -> list[Notification]:
195198
with self.transaction(commit=False) as session:
196199
# record_class = cast(Type[StoredEventRecord], self.events_record_cls)
197200
record_class = self.events_record_cls
198201
q = session.query(record_class)
199-
q = q.filter(record_class.id >= start)
202+
if start is not None:
203+
if inclusive_of_start:
204+
q = q.filter(record_class.id >= start)
205+
else:
206+
q = q.filter(record_class.id > start)
200207
if stop is not None:
201208
q = q.filter(record_class.id <= stop)
202209
if topics:
@@ -218,6 +225,12 @@ def select_notifications(
218225
]
219226
return notifications
220227

228+
def subscribe(
229+
self, gt: int | None = None, topics: Sequence[str] = ()
230+
) -> Subscription[ApplicationRecorder]:
231+
msg = "SQLAlchemyApplicationRecorder.subscribe() is not implemented"
232+
raise NotImplementedError(msg)
233+
221234

222235
class SQLAlchemyProcessRecorder(SQLAlchemyApplicationRecorder, ProcessRecorder):
223236
def __init__(
@@ -260,20 +273,27 @@ def _insert_events(
260273
session.add(record)
261274
return notification_ids
262275

263-
def max_tracking_id(self, application_name: str) -> int:
276+
def max_tracking_id(self, application_name: str) -> int | None:
264277
with self.transaction(commit=False) as session:
265278
q = session.query(self.tracking_record_cls)
266279
q = q.filter(self.tracking_record_cls.application_name == application_name)
267280
q = q.order_by(self.tracking_record_cls.notification_id.desc())
268281
try:
269282
max_id = q[0].notification_id
270283
except IndexError:
271-
max_id = 0
284+
max_id = None
272285
return max_id
273286

274-
def has_tracking_id(self, application_name: str, notification_id: int) -> bool:
287+
def has_tracking_id(
288+
self, application_name: str, notification_id: int | None
289+
) -> bool:
290+
if notification_id is None:
291+
return True
275292
with self.transaction(commit=False) as session:
276293
q = session.query(self.tracking_record_cls)
277294
q = q.filter(self.tracking_record_cls.application_name == application_name)
278295
q = q.filter(self.tracking_record_cls.notification_id == notification_id)
279296
return bool(q.count())
297+
298+
def insert_tracking(self, tracking: Tracking) -> None:
299+
raise NotImplementedError

poetry.lock

Lines changed: 42 additions & 44 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ python = "^3.9"
2929
#eventsourcing = { path = "../eventsourcing/", extras = ["crypto"], develop = true }
3030
#eventsourcing = { git = "https://github.com/pyeventsourcing/eventsourcing.git", branch = "main", extras = ["crypto"]}
3131
SQLAlchemy-Utils = ">=0.38.2"
32-
eventsourcing = "~9.3"
32+
eventsourcing = "~9.4"
3333
sqlalchemy = ">=1.4.26, <2.1"
3434

3535
[tool.poetry.group.dev.dependencies]
@@ -45,7 +45,7 @@ isort = "*"
4545
mypy = "*"
4646
pre-commit = "*"
4747
pre-commit-hooks = "*"
48-
eventsourcing = { version = "~9.3", extras = ["crypto"] }
48+
eventsourcing = { version = "~9.4", extras = ["crypto"] }
4949
psycopg = { version = "*", extras = ["binary", "pool"] }
5050
psycopg2-binary = "*"
5151
pytest = "*"

0 commit comments

Comments
 (0)