-
-
Notifications
You must be signed in to change notification settings - Fork 398
Expand file tree
/
Copy path_latency.py
More file actions
86 lines (69 loc) · 2.59 KB
/
_latency.py
File metadata and controls
86 lines (69 loc) · 2.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
from zarr.abc.store import ByteRequest, Store
from zarr.storage._wrapper import WrapperStore
if TYPE_CHECKING:
from zarr.core.buffer import Buffer, BufferPrototype
class LatencyStore(WrapperStore[Store]):
    """
    Store wrapper that delays every ``get`` and ``set`` call by a fixed amount.

    An existing store is handed to the constructor and kept as the wrapped
    store; this class overrides only ``get`` and ``set``, each of which sleeps
    for a configurable number of seconds before forwarding to the wrapped
    store. It is intended for performance testing.

    This is mainly of use when testing downstream applications that will talk
    to a high-latency zarr store, such as one that reads from or writes to
    remote object storage. Wrapping a ``MemoryStore`` instance, for example,
    gives a (crude) simulation of S3 read/write latency without touching the
    network or running a mock such as MinIO.

    Parameters
    ----------
    cls : Store
        The store to wrap. Despite the name, this is a store instance and
        not a class.
    get_latency : float
        Delay, in seconds, inserted before each ``get`` call. Default is 0.
    set_latency : float
        Delay, in seconds, inserted before each ``set`` call. Default is 0.
    """

    get_latency: float
    set_latency: float

    def __init__(self, cls: Store, *, get_latency: float = 0, set_latency: float = 0) -> None:
        # Coerce eagerly so that ints (or anything else ``float()`` accepts)
        # are stored as plain floats.
        self.get_latency = float(get_latency)
        self.set_latency = float(set_latency)
        # ``cls`` is the wrapped store itself; ``get`` and ``set`` forward to it.
        self._store = cls

    async def set(self, key: str, value: Buffer) -> None:
        """
        Delay, then write ``value`` under ``key`` in the wrapped store.

        Awaits ``asyncio.sleep(self.set_latency)`` and afterwards awaits the
        wrapped store's ``set`` method with the same arguments.

        Parameters
        ----------
        key : str
            The key to set
        value : Buffer
            The value to set

        Returns
        -------
        None
        """
        delay = self.set_latency
        await asyncio.sleep(delay)
        wrapped = self._store
        await wrapped.set(key, value)

    async def get(
        self, key: str, prototype: BufferPrototype, byte_range: ByteRequest | None = None
    ) -> Buffer | None:
        """
        Delay, then read ``key`` from the wrapped store.

        Awaits ``asyncio.sleep(self.get_latency)`` and afterwards returns
        whatever the wrapped store's ``get`` method returns for the same
        arguments.

        Parameters
        ----------
        key : str
            The key to get
        prototype : BufferPrototype
            The BufferPrototype to use.
        byte_range : ByteRequest, optional
            An optional byte range.

        Returns
        -------
        buffer : Buffer or None
        """
        delay = self.get_latency
        await asyncio.sleep(delay)
        result = await self._store.get(key, prototype=prototype, byte_range=byte_range)
        return result