Skip to content
This repository was archived by the owner on May 6, 2026. It is now read-only.

Commit 1629805

Browse files
author
Chris Rossi
authored
feat: record time spent waiting on rpc calls (#472)
Two new attributes have been added to the top level context object, which can be used in looking at performance of underlying Datastore calls. `rpc_time` records the total amount of time all rpc calls made in that context have taken. `wait_time` records the total amount of time spent waiting (blocking) for rpc calls to complete. Because of parallelism, `wait_time` should be less than `rpc_time`, although in practice they're almost always quite close.
1 parent ab460fa commit 1629805

7 files changed

Lines changed: 106 additions & 44 deletions

File tree

google/cloud/ndb/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from google.cloud.ndb.context import Context
3131
from google.cloud.ndb.context import ContextOptions
3232
from google.cloud.ndb.context import get_context
33+
from google.cloud.ndb.context import get_toplevel_context
3334
from google.cloud.ndb.context import TransactionOptions
3435
from google.cloud.ndb._datastore_api import EVENTUAL
3536
from google.cloud.ndb._datastore_api import EVENTUAL_CONSISTENCY
@@ -218,6 +219,7 @@
218219
"add_flow_exception",
219220
"Future",
220221
"get_context",
222+
"get_toplevel_context",
221223
"make_context",
222224
"make_default_context",
223225
"QueueFuture",

google/cloud/ndb/_datastore_api.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ def make_call(rpc_name, request, retries=None, timeout=None):
8282

8383
@tasklets.tasklet
8484
def rpc_call():
85+
context = context_module.get_toplevel_context()
86+
8587
call = method.future(request, timeout=timeout)
8688
rpc = _remote.RemoteCall(call, "{}({})".format(rpc_name, request))
8789
log.debug(rpc)
@@ -93,6 +95,8 @@ def rpc_call():
9395
if isinstance(error, grpc.Call):
9496
error = core_exceptions.from_grpc_error(error)
9597
raise error
98+
finally:
99+
context.rpc_time += rpc.elapsed_time
96100

97101
raise tasklets.Return(result)
98102

google/cloud/ndb/_eventloop.py

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
This should handle both asynchronous ``ndb`` objects and arbitrary callbacks.
1818
"""
1919
import collections
20+
import logging
2021
import uuid
2122
import time
2223

@@ -26,23 +27,8 @@
2627
except ImportError: # pragma: NO PY3 COVER
2728
import Queue as queue
2829

29-
__all__ = [
30-
"add_idle",
31-
"call_soon",
32-
"EventLoop",
33-
"get_event_loop",
34-
"queue_call",
35-
"queue_rpc",
36-
"run",
37-
"run1",
38-
]
39-
40-
41-
def _logging_debug(*args, **kw):
42-
"""Placeholder.
43-
44-
See #6360."""
4530

31+
log = logging.getLogger(__name__)
4632

4733
_Event = collections.namedtuple(
4834
"_Event", ("when", "callback", "args", "kwargs")
@@ -149,21 +135,21 @@ def clear(self):
149135
idlers = self.idlers
150136
queue = self.queue
151137
rpcs = self.rpcs
152-
_logging_debug("Clearing stale EventLoop instance...")
138+
log.debug("Clearing stale EventLoop instance...")
153139
if current:
154-
_logging_debug(" current = %s", current)
140+
log.debug(" current = %s", current)
155141
if idlers:
156-
_logging_debug(" idlers = %s", idlers)
142+
log.debug(" idlers = %s", idlers)
157143
if queue:
158-
_logging_debug(" queue = %s", queue)
144+
log.debug(" queue = %s", queue)
159145
if rpcs:
160-
_logging_debug(" rpcs = %s", rpcs)
146+
log.debug(" rpcs = %s", rpcs)
161147
self.__init__()
162148
current.clear()
163149
idlers.clear()
164150
queue[:] = []
165151
rpcs.clear()
166-
_logging_debug("Cleared")
152+
log.debug("Cleared")
167153

168154
def insort_event_right(self, event):
169155
"""Insert event in queue with sorting.
@@ -267,12 +253,12 @@ def run_idle(self):
267253
return False
268254
idler = self.idlers.popleft()
269255
callback, args, kwargs = idler
270-
_logging_debug("idler: %s", callback.__name__)
256+
log.debug("idler: %s", callback.__name__)
271257
result = callback(*args, **kwargs)
272258

273259
# See add_idle() for meaning of callback return value.
274260
if result is None:
275-
_logging_debug("idler %s removed", callback.__name__)
261+
log.debug("idler %s removed", callback.__name__)
276262
else:
277263
if result:
278264
self.inactive = 0
@@ -292,7 +278,6 @@ def _run_current(self):
292278

293279
self.inactive = 0
294280
callback, args, kwargs = self.current.popleft()
295-
_logging_debug("nowevent: %s", callback.__name__)
296281
callback(*args, **kwargs)
297282
return True
298283

@@ -312,15 +297,24 @@ def run0(self):
312297
if delay <= 0:
313298
self.inactive = 0
314299
_, callback, args, kwargs = self.queue.pop(0)
315-
_logging_debug("event: %s", callback.__name__)
300+
log.debug("event: %s", callback.__name__)
316301
callback(*args, **kwargs)
317302
return 0
318303

319304
if self.rpcs:
305+
# Avoid circular import
306+
from google.cloud.ndb import context as context_module
307+
308+
context = context_module.get_toplevel_context()
309+
320310
# This potentially blocks, waiting for an rpc to finish and put its
321311
# result on the queue. Functionally equivalent to the ``wait_any``
322312
# call that was used here in legacy NDB.
313+
start_time = time.time()
323314
rpc_id, rpc = self.rpc_results.get()
315+
elapsed = time.time() - start_time
316+
log.debug("Blocked for {}s awaiting RPC results.".format(elapsed))
317+
context.wait_time += elapsed
324318

325319
callback = self.rpcs.pop(rpc_id)
326320
callback(rpc)

google/cloud/ndb/_remote.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
# In its own module to avoid circular import between _datastore_api and
1818
# tasklets modules.
1919
import grpc
20+
import time
2021

2122
from google.cloud.ndb import exceptions
2223

@@ -39,6 +40,13 @@ class RemoteCall(object):
3940
def __init__(self, future, info):
4041
self.future = future
4142
self.info = info
43+
self.start_time = time.time()
44+
self.elapsed_time = 0
45+
46+
def record_time(future):
47+
self.elapsed_time = time.time() - self.start_time
48+
49+
future.add_done_callback(record_time)
4250

4351
def __repr__(self):
4452
return self.info

google/cloud/ndb/context.py

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,12 @@
2626
from google.cloud.ndb import tasklets
2727

2828

29-
__all__ = [
30-
"AutoBatcher",
31-
"Context",
32-
"ContextOptions",
33-
"get_context",
34-
"TransactionOptions",
35-
]
36-
37-
3829
class _LocalState(threading.local):
3930
"""Thread local state."""
4031

4132
def __init__(self):
4233
self.context = None
34+
self.toplevel_context = None
4335

4436

4537
_state = _LocalState()
@@ -73,6 +65,40 @@ def get_context(raise_context_error=True):
7365
raise exceptions.ContextError()
7466

7567

68+
def get_toplevel_context(raise_context_error=True):
69+
"""Get the current top level context.
70+
71+
This function should be called within a context established by
72+
:meth:`google.cloud.ndb.client.Client.context`.
73+
74+
The toplevel context is the context created by the call to
75+
:meth:`google.cloud.ndb.client.Client.context`. At times, this context will
76+
be superceded by subcontexts, which are used, for example, during
77+
transactions. This function will always return the top level context
78+
regardless of whether one of these subcontexts is the current one.
79+
80+
Args:
81+
raise_context_error (bool): If set to :data:`True`, will raise an
82+
exception if called outside of a context. Set this to :data:`False`
83+
in order to have it just return :data:`None` if called outside of a
84+
context. Default: :data:`True`
85+
86+
Returns:
87+
Context: The current context.
88+
89+
Raises:
90+
.ContextError: If called outside of a context
91+
established by :meth:`google.cloud.ndb.client.Client.context` and
92+
``raise_context_error`` is :data:`True`.
93+
"""
94+
context = _state.toplevel_context
95+
if context:
96+
return context
97+
98+
if raise_context_error:
99+
raise exceptions.ContextError()
100+
101+
76102
def _default_policy(attr_name, value_type):
77103
"""Factory for producing default policies.
78104
@@ -192,6 +218,8 @@ def __new__(
192218
datastore_policy=None,
193219
on_commit_callbacks=None,
194220
legacy_data=True,
221+
rpc_time=None,
222+
wait_time=None,
195223
):
196224
# Prevent circular import in Python 2.7
197225
from google.cloud.ndb import _cache
@@ -253,11 +281,17 @@ def use(self):
253281
"""
254282
prev_context = _state.context
255283
_state.context = self
284+
if not prev_context:
285+
_state.toplevel_context = self
286+
self.rpc_time = 0
287+
self.wait_time = 0
256288
try:
257289
yield self
258290
finally:
259291
if prev_context:
260292
prev_context.cache.update(self.cache)
293+
else:
294+
_state.toplevel_context = None
261295
_state.context = prev_context
262296

263297
@tasklets.tasklet

tests/unit/test__eventloop.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,10 @@
2222
import grpc
2323
import pytest
2424

25-
from . import utils
26-
2725
from google.cloud.ndb import exceptions
2826
from google.cloud.ndb import _eventloop
2927

3028

31-
def test___all__():
32-
utils.verify___all__(_eventloop)
33-
34-
3529
def _Event(when=0, what="foo", args=(), kw={}):
3630
return _eventloop._Event(when, what, args, kw)
3731

@@ -258,6 +252,7 @@ def test_run0_next_now(self, time):
258252
assert len(loop.queue) == 1
259253
assert loop.inactive == 0
260254

255+
@pytest.mark.usefixtures("in_context")
261256
def test_run0_rpc(self):
262257
rpc = mock.Mock(spec=grpc.Future)
263258
callback = mock.Mock(spec=())

tests/unit/test_context.py

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,36 @@
2828
from google.cloud.ndb import model
2929
from google.cloud.ndb import _options
3030

31-
from . import utils
3231

32+
class Test_get_context:
33+
@staticmethod
34+
def test_in_context(in_context):
35+
assert context_module.get_context() is in_context
36+
37+
@staticmethod
38+
def test_no_context_raise():
39+
with pytest.raises(exceptions.ContextError):
40+
context_module.get_context()
41+
42+
@staticmethod
43+
def test_no_context_dont_raise():
44+
assert context_module.get_context(False) is None
3345

34-
def test___all__():
35-
utils.verify___all__(context_module)
46+
47+
class Test_get_toplevel_context:
48+
@staticmethod
49+
def test_in_context(in_context):
50+
with in_context.new().use():
51+
assert context_module.get_toplevel_context() is in_context
52+
53+
@staticmethod
54+
def test_no_context_raise():
55+
with pytest.raises(exceptions.ContextError):
56+
context_module.get_toplevel_context()
57+
58+
@staticmethod
59+
def test_no_context_dont_raise():
60+
assert context_module.get_toplevel_context(False) is None
3661

3762

3863
class TestContext:

0 commit comments

Comments
 (0)