-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Expand file tree
/
Copy pathtest_uow.py
More file actions
160 lines (130 loc) · 5.2 KB
/
Copy pathtest_uow.py
File metadata and controls
160 lines (130 loc) · 5.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# pylint: disable=broad-except
import threading
import time
import traceback
from typing import List
import pytest
from sqlalchemy.orm.exc import StaleDataError
from allocation.domain import model
from allocation.service_layer import unit_of_work
from ..random_refs import random_batchref, random_orderid, random_sku
def insert_batch(session, ref, sku, qty, eta, product_version=1):
session.execute(
"INSERT INTO products (sku, version_number) VALUES (:sku, :version)",
dict(sku=sku, version=product_version),
)
session.execute(
"INSERT INTO batches (reference, sku, _purchased_quantity, eta)"
" VALUES (:ref, :sku, :qty, :eta)",
dict(ref=ref, sku=sku, qty=qty, eta=eta),
)
def get_allocated_batch_ref(session, orderid, sku):
[[orderlineid]] = session.execute(
"SELECT id FROM order_lines WHERE orderid=:orderid AND sku=:sku",
dict(orderid=orderid, sku=sku),
)
[[batchref]] = session.execute(
"SELECT b.reference FROM allocations JOIN batches AS b ON batch_id = b.id"
" WHERE orderline_id=:orderlineid",
dict(orderlineid=orderlineid),
)
return batchref
def test_uow_can_retrieve_a_batch_and_allocate_to_it(session_factory):
session = session_factory()
insert_batch(session, "batch1", "HIPSTER-WORKBENCH", 100, None)
session.commit()
uow = unit_of_work.SqlAlchemyUnitOfWork(session_factory)
with uow:
product = uow.products.get(sku="HIPSTER-WORKBENCH")
line = model.OrderLine("o1", "HIPSTER-WORKBENCH", 10)
product.allocate(line)
uow.commit()
[[version_after]] = session.execute(
"SELECT version_number FROM products WHERE sku=:sku",
dict(sku="HIPSTER-WORKBENCH"),
)
assert version_after == 2
batchref = get_allocated_batch_ref(session, "o1", "HIPSTER-WORKBENCH")
assert batchref == "batch1"
def test_commit_fails_with_stale_version_when_row_changed_elsewhere(session_factory):
sku = "OPTIMISTIC-SKU"
session = session_factory()
insert_batch(session, "batch1", sku, 100, None)
session.commit()
uow_a = unit_of_work.SqlAlchemyUnitOfWork(session_factory)
uow_b = unit_of_work.SqlAlchemyUnitOfWork(session_factory)
uow_a.__enter__()
uow_b.__enter__()
try:
product_a = uow_a.products.get(sku=sku)
product_b = uow_b.products.get(sku=sku)
assert product_a.version_number == product_b.version_number == 1
product_a.allocate(model.OrderLine("order-a", sku, 10))
uow_a.commit()
product_b.allocate(model.OrderLine("order-b", sku, 10))
with pytest.raises(StaleDataError):
uow_b.commit()
finally:
uow_b.__exit__(None, None, None)
uow_a.__exit__(None, None, None)
def test_rolls_back_uncommitted_work_by_default(session_factory):
uow = unit_of_work.SqlAlchemyUnitOfWork(session_factory)
with uow:
insert_batch(uow.session, "batch1", "MEDIUM-PLINTH", 100, None)
new_session = session_factory()
rows = list(new_session.execute('SELECT * FROM "batches"'))
assert rows == []
def test_rolls_back_on_error(session_factory):
class MyException(Exception):
pass
uow = unit_of_work.SqlAlchemyUnitOfWork(session_factory)
with pytest.raises(MyException):
with uow:
insert_batch(uow.session, "batch1", "LARGE-FORK", 100, None)
raise MyException()
new_session = session_factory()
rows = list(new_session.execute('SELECT * FROM "batches"'))
assert rows == []
def try_to_allocate(orderid, sku, exceptions):
line = model.OrderLine(orderid, sku, 10)
try:
with unit_of_work.SqlAlchemyUnitOfWork() as uow:
product = uow.products.get(sku=sku)
product.allocate(line)
time.sleep(0.2)
uow.commit()
except Exception as e:
print(traceback.format_exc())
exceptions.append(e)
def test_concurrent_updates_to_version_are_not_allowed(postgres_session_factory):
sku, batch = random_sku(), random_batchref()
session = postgres_session_factory()
insert_batch(session, batch, sku, 100, eta=None, product_version=1)
session.commit()
order1, order2 = random_orderid(1), random_orderid(2)
exceptions = [] # type: List[Exception]
try_to_allocate_order1 = lambda: try_to_allocate(order1, sku, exceptions)
try_to_allocate_order2 = lambda: try_to_allocate(order2, sku, exceptions)
thread1 = threading.Thread(target=try_to_allocate_order1)
thread2 = threading.Thread(target=try_to_allocate_order2)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
[[version]] = session.execute(
"SELECT version_number FROM products WHERE sku=:sku",
dict(sku=sku),
)
assert version == 2
[exception] = exceptions
assert isinstance(exception, StaleDataError)
orders = session.execute(
"SELECT orderid FROM allocations"
" JOIN batches ON allocations.batch_id = batches.id"
" JOIN order_lines ON allocations.orderline_id = order_lines.id"
" WHERE order_lines.sku=:sku",
dict(sku=sku),
)
assert orders.rowcount == 1
with unit_of_work.SqlAlchemyUnitOfWork() as uow:
uow.session.execute("select 1")