-
-
Notifications
You must be signed in to change notification settings - Fork 399
Expand file tree
/
Copy path_utils.py
More file actions
180 lines (143 loc) · 5.2 KB
/
_utils.py
File metadata and controls
180 lines (143 loc) · 5.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
from __future__ import annotations
import re
from pathlib import Path
from typing import TYPE_CHECKING, TypeVar
from zarr.abc.store import OffsetByteRequest, RangeByteRequest, SuffixByteRequest
if TYPE_CHECKING:
from collections.abc import Iterable, Mapping
from zarr.abc.store import ByteRequest
from zarr.core.buffer import Buffer
def _normalize_prefix(prefix: str) -> str:
"""Normalize a store prefix to ensure it has a trailing slash.
This ensures that prefix matching uses directory-like semantics,
so that e.g. prefix "a" does not match keys under "a_extra/".
"""
if prefix != "" and not prefix.endswith("/"):
return prefix + "/"
return prefix
def normalize_path(path: str | bytes | Path | None) -> str:
    """
    Convert ``path`` to a normalized "/"-delimited string.

    ``None`` becomes the empty string, ``bytes`` are decoded as ASCII and
    ``pathlib.Path`` objects are converted with ``str``. Backslashes are then
    replaced by "/", leading and trailing "/" are removed, and runs of
    repeated "/" are collapsed into a single "/".

    Parameters
    ----------
    path : str | bytes | Path | None
        The path to normalize.

    Returns
    -------
    str

    Raises
    ------
    TypeError
        If ``path`` is not ``None``, ``bytes``, ``Path`` or ``str``.
    ValueError
        If any segment of the normalized path is exactly '.' or '..'.
    """
    if path is None:
        text = ""
    elif isinstance(path, bytes):
        text = str(path, "ascii")
    elif isinstance(path, Path):
        text = str(path)
    elif isinstance(path, str):
        text = path
    else:
        raise TypeError(f'Object {path} has an invalid type for "path": {type(path).__name__}')

    # Use "/" as the only separator, then drop separators from both ends.
    text = text.replace("\\", "/").strip("/")

    # Squash every run of two or more "/" down to one.
    text = re.sub(r"//+", "/", text)

    # Segments that are exactly '.' or '..' are not allowed.
    for segment in text.split("/"):
        if segment in (".", ".."):
            raise ValueError(
                f"The path {path!r} is invalid because its string representation contains '.' or '..' segments."
            )
    return text
def _normalize_byte_range_index(data: Buffer, byte_range: ByteRequest | None) -> tuple[int, int]:
"""
Convert a ByteRequest into an explicit start and stop
"""
if byte_range is None:
start = 0
stop = len(data) + 1
elif isinstance(byte_range, RangeByteRequest):
start = byte_range.start
stop = byte_range.end
elif isinstance(byte_range, OffsetByteRequest):
start = byte_range.offset
stop = len(data) + 1
elif isinstance(byte_range, SuffixByteRequest):
start = len(data) - byte_range.suffix
stop = len(data) + 1
else:
raise ValueError(f"Unexpected byte_range, got {byte_range}.")
return (start, stop)
def _join_paths(paths: Iterable[str]) -> str:
"""
Filter out instances of '' and join the remaining strings with '/'.
Parameters
----------
paths : Iterable[str]
Returns
-------
str
Examples
--------
```python
from zarr.storage._utils import _join_paths
_join_paths(["", "a", "b"])
# 'a/b'
_join_paths(["a", "b", "c"])
# 'a/b/c'
```
"""
return "/".join(filter(lambda v: v != "", paths))
def _relativize_path(*, path: str, prefix: str) -> str:
"""
Make a "/"-delimited path relative to some prefix. If the prefix is '', then the path is
returned as-is. Otherwise, the prefix is removed from the path as well as the separator
string "/".
If ``prefix`` is not the empty string and ``path`` does not start with ``prefix``
followed by a "/" character, then an error is raised.
This function assumes that the prefix does not end with "/".
Parameters
----------
path : str
The path to make relative to the prefix.
prefix : str
The prefix to make the path relative to.
Returns
-------
str
Examples
--------
```python
from zarr.storage._utils import _relativize_path
_relativize_path(path="a/b", prefix="")
# 'a/b'
_relativize_path(path="a/b/c", prefix="a/b")
# 'c'
```
"""
if prefix == "":
return path
else:
_prefix = prefix + "/"
if not path.startswith(_prefix):
raise ValueError(f"The first component of {path} does not start with {prefix}.")
return path.removeprefix(f"{prefix}/")
def _normalize_paths(paths: Iterable[str]) -> tuple[str, ...]:
    """
    Normalize each input path with ``normalize_path`` and return the results.

    Parameters
    ----------
    paths : Iterable[str]
        The paths to normalize.

    Returns
    -------
    tuple[str, ...]
        The normalized paths, in the order the inputs were given.

    Raises
    ------
    ValueError
        If two inputs normalize to the same value.
    """
    # Maps each normalized path to the input it came from, so that a
    # collision can name both offending inputs.
    originals: dict[str, str] = {}
    for path in paths:
        parsed = normalize_path(path)
        if parsed not in originals:
            originals[parsed] = path
            continue
        earlier = originals[parsed]
        raise ValueError(
            f"After normalization, the value '{path}' collides with '{earlier}'. "
            f"Both '{path}' and '{earlier}' normalize to the same value: '{parsed}'. "
            f"You should use either '{path}' or '{earlier}', but not both."
        )
    return tuple(originals)
T = TypeVar("T")


def _normalize_path_keys(data: Mapping[str, T]) -> dict[str, T]:
    """
    Return a copy of ``data`` whose keys have been normalized as zarr node paths.

    The keys are normalized with ``_normalize_paths``. Each normalized key is
    paired with the value stored under the corresponding original key.

    Parameters
    ----------
    data : Mapping[str, T]
        The mapping whose keys should be normalized.

    Returns
    -------
    dict[str, T]
        A dict whose keys are the normalized forms of the input keys and whose
        values are the input values, in the input's iteration order.

    Raises
    ------
    ValueError
        If any two keys in the input normalize to the same value.
    """
    normalized = _normalize_paths(data.keys())
    # strict=True makes a length mismatch between keys and values an error
    # rather than a silent truncation.
    return {key: value for key, value in zip(normalized, data.values(), strict=True)}