Skip to content
This repository was archived by the owner on May 6, 2026. It is now read-only.

Commit f3aa027

Browse files
authored
feat: Transaction propagation using ndb.TransactionOptions (#537)
1 parent 49be23b commit f3aa027

5 files changed

Lines changed: 422 additions & 15 deletions

File tree

google/cloud/ndb/_transaction.py

Lines changed: 141 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,118 @@
2323
log = logging.getLogger(__name__)
2424

2525

26+
class _Propagation(object):
27+
"""This class aims to emulate the same behaviour as was provided by the old
28+
Datastore RPC library.
29+
30+
https://cloud.google.com/appengine/docs/standard/python/ndb/functions#context_options
31+
32+
It provides limited support for transactions within transactions. It has a
33+
single public method func:`handle_propagation`.
34+
35+
Args:
36+
propagation (int): The desired `propagation` option, corresponding
37+
to a class:`TransactionOptions` option.
38+
join (:obj:`bool`, optional): If the provided join argument must be
39+
changed to conform to the requested propagation option then a
40+
warning will be emitted. If it is not provided, it will be set
41+
according to the propagation option but no warning is emitted.
42+
"""
43+
44+
def __init__(self, propagation, join=None):
45+
# Avoid circular import in Python 2.7
46+
from google.cloud.ndb import context as context_module
47+
48+
propagation_options = context_module.TransactionOptions._PROPAGATION
49+
if propagation is None or propagation in propagation_options:
50+
self.propagation = propagation
51+
else:
52+
raise ValueError(
53+
"Unexpected value for propagation. Got: {}. Expected one of: "
54+
"{}".format(propagation, propagation_options)
55+
)
56+
57+
propagation_names = context_module.TransactionOptions._INT_TO_NAME
58+
self.propagation_name = propagation_names.get(self.propagation)
59+
60+
self.join = join
61+
joinable_options = context_module.TransactionOptions._JOINABLE
62+
self.joinable = propagation in joinable_options
63+
64+
def _handle_nested(self):
65+
"""The NESTED propagation policy would commit all changes in the outer
66+
and inner transactions together when the outer policy commits. However,
67+
if an exception is thrown in the inner transaction all changes there
68+
would get thrown out but allow the outer transaction to optionally
69+
recover and continue. The NESTED policy is not supported. If you use
70+
this policy, your code will throw a BadRequestError exception.
71+
"""
72+
raise exceptions.BadRequestError("Nested transactions are not supported.")
73+
74+
def _handle_mandatory(self):
75+
"""Always propagate an existing transaction; throw an exception if
76+
there is no existing transaction. If a function that uses this policy
77+
throws an exception, it's probably not safe to catch the exception and
78+
commit the outer transaction; the function may have left the outer
79+
transaction in a bad state.
80+
"""
81+
if not in_transaction():
82+
raise exceptions.BadRequestError("Requires an existing transaction.")
83+
84+
def _handle_allowed(self):
85+
"""If there is an existing transaction, propagate it. If a function
86+
that uses this policy throws an exception, it's probably not safe to
87+
catch the exception and commit the outer transaction; the function may
88+
have left the outer transaction in a bad state.
89+
"""
90+
# no special handling needed.
91+
pass
92+
93+
def _handle_independent(self):
94+
"""Always use a new transaction, "pausing" any existing transactions.
95+
A function that uses this policy should not return any entities read in
96+
the new transaction, as the entities are not transactionally consistent
97+
with the caller's transaction.
98+
"""
99+
if in_transaction():
100+
# Avoid circular import in Python 2.7
101+
from google.cloud.ndb import context as context_module
102+
103+
context = context_module.get_context()
104+
new_context = context.new(transaction=None)
105+
return new_context
106+
107+
def _handle_join(self):
108+
change_to = self.joinable
109+
if self.join != change_to:
110+
if self.join is not None:
111+
logging.warning(
112+
"Modifying join behaviour to maintain old NDB behaviour. "
113+
"Setting join to {} for propagation value: {} ({})".format(
114+
change_to, self.propagation, self.propagation_name
115+
)
116+
)
117+
self.join = change_to
118+
119+
def handle_propagation(self):
120+
"""Ensure the conditions needed to maintain legacy NDB behaviour are
121+
met.
122+
123+
Returns:
124+
Context: A new :class:`Context` instance that should be
125+
used to run the transaction in or :data:`None` if the
126+
transaction should run in the existing :class:`Context`.
127+
bool: :data:`True` if the new transaction is to be joined to an
128+
existing one otherwise :data:`False`.
129+
"""
130+
context = None
131+
if self.propagation:
132+
# ensure we use the correct joining method.
133+
context = getattr(self, "_handle_{}".format(self.propagation_name))()
134+
self._handle_join()
135+
return context, self.join
136+
137+
26138
def in_transaction():
27139
"""Determine if there is a currently active transaction.
28140
@@ -58,9 +170,10 @@ def transaction(
58170
xg (bool): Enable cross-group transactions. This argument is included
59171
for backwards compatibility reasons and is ignored. All Datastore
60172
transactions are cross-group, up to 25 entity groups, all the time.
61-
propagation (Any): Deprecated, will raise `NotImplementedError` if
62-
passed. Transaction propagation was a feature of the old Datastore
63-
RPC library and is no longer available.
173+
propagation (int): An element from :class:`ndb.TransactionOptions`.
174+
This parameter controls what happens if you try to start a new
175+
transaction within an existing transaction. If this argument is
176+
provided, the `join` argument will be ignored.
64177
"""
65178
future = transaction_async(
66179
callback,
@@ -80,6 +193,25 @@ def transaction_async(
80193
join=False,
81194
xg=True,
82195
propagation=None,
196+
):
197+
new_context, join = _Propagation(propagation, join).handle_propagation()
198+
args = (callback, retries, read_only, join, xg, None)
199+
if new_context is None:
200+
transaction_return_value = transaction_async_(*args)
201+
else:
202+
with new_context.use() as context:
203+
transaction_return_value = transaction_async_(*args)
204+
context.flush()
205+
return transaction_return_value
206+
207+
208+
def transaction_async_(
209+
callback,
210+
retries=_retry._DEFAULT_RETRIES,
211+
read_only=False,
212+
join=False,
213+
xg=True,
214+
propagation=None,
83215
):
84216
"""Run a callback in a transaction.
85217
@@ -321,17 +453,18 @@ def non_transactional(allow_existing=True):
321453
def non_transactional_wrapper(wrapped):
322454
@functools.wraps(wrapped)
323455
def non_transactional_inner_wrapper(*args, **kwargs):
324-
from . import context
456+
# Avoid circular import in Python 2.7
457+
from google.cloud.ndb import context as context_module
325458

326-
ctx = context.get_context()
327-
if not ctx.in_transaction():
459+
context = context_module.get_context()
460+
if not context.in_transaction():
328461
return wrapped(*args, **kwargs)
329462
if not allow_existing:
330463
raise exceptions.BadRequestError(
331464
"{} cannot be called within a transaction".format(wrapped.__name__)
332465
)
333-
new_ctx = ctx.new(transaction=None)
334-
with new_ctx.use():
466+
new_context = context.new(transaction=None)
467+
with new_context.use():
335468
return wrapped(*args, **kwargs)
336469

337470
return non_transactional_inner_wrapper

google/cloud/ndb/context.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -625,8 +625,19 @@ def __init__(self, *args, **kwargs):
625625

626626

627627
class TransactionOptions(object):
628-
def __init__(self, *args, **kwargs):
629-
raise exceptions.NoLongerImplementedError()
628+
NESTED = 1 # join=False
629+
MANDATORY = 2 # join=True
630+
ALLOWED = 3 # join=True
631+
INDEPENDENT = 4 # join=False
632+
633+
_PROPAGATION = frozenset((NESTED, MANDATORY, ALLOWED, INDEPENDENT))
634+
_JOINABLE = frozenset((MANDATORY, ALLOWED))
635+
_INT_TO_NAME = {
636+
NESTED: "nested",
637+
MANDATORY: "mandatory",
638+
ALLOWED: "allowed",
639+
INDEPENDENT: "independent",
640+
}
630641

631642

632643
class AutoBatcher(object):

tests/unit/test__datastore_query.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1489,6 +1489,21 @@ def test__compare_no_order_by():
14891489
with pytest.raises(NotImplementedError):
14901490
result._compare("other")
14911491

1492+
@staticmethod
1493+
def test__compare_with_order_by():
1494+
result = _datastore_query._Result(
1495+
None,
1496+
mock.Mock(
1497+
cursor=b"123",
1498+
spec=("cursor",),
1499+
),
1500+
[
1501+
query_module.PropertyOrder("foo"),
1502+
query_module.PropertyOrder("bar", reverse=True),
1503+
],
1504+
)
1505+
assert result._compare("other") == NotImplemented
1506+
14921507
@staticmethod
14931508
@mock.patch("google.cloud.ndb._datastore_query.model")
14941509
def test_entity_unsupported_result_type(model):

0 commit comments

Comments
 (0)