-
-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy path_pattern.py
More file actions
112 lines (92 loc) · 3.19 KB
/
_pattern.py
File metadata and controls
112 lines (92 loc) · 3.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from typing import List
from typing import Optional
try:
import regex as re
REGEX_AVAILABLE = True
except ImportError:
import re # type: ignore
REGEX_AVAILABLE = False
try:
from iregexp_check import check
IREGEXP_AVAILABLE = True
except ImportError:
IREGEXP_AVAILABLE = False
from jsonpath.exceptions import JSONPathError
from jsonpath.function_extensions import ExpressionType
from jsonpath.function_extensions import FilterFunction
from jsonpath.lru_cache import LRUCache
from jsonpath.lru_cache import ThreadSafeLRUCache
class AbstractRegexFilterFunction(FilterFunction):
    """Base class for filter functions that accept regular expression arguments.

    Compiled patterns (or `None` for patterns that are rejected) are kept in
    an LRU cache keyed by the pattern string exactly as the caller supplied it.

    Arguments:
        cache_capacity: The size of the regular expression cache.
        debug: When `True`, raise an exception when regex pattern compilation
            fails. The default - as required by RFC 9535 - is `False`, which
            silently ignores bad patterns.
        thread_safe: When `True`, use a `ThreadSafeLRUCache` instead of an
            instance of `LRUCache`.
    """

    arg_types = [ExpressionType.VALUE, ExpressionType.VALUE]
    return_type = ExpressionType.LOGICAL

    def __init__(
        self,
        *,
        cache_capacity: int = 300,
        debug: bool = False,
        thread_safe: bool = False,
    ):
        self.cache: LRUCache[str, Optional[re.Pattern]] = (  # type: ignore
            ThreadSafeLRUCache(capacity=cache_capacity)
            if thread_safe
            else LRUCache(capacity=cache_capacity)
        )
        self.debug = debug

    def check_cache(self, pattern: str) -> Optional[re.Pattern]:  # type: ignore
        """Return a compiled re pattern if `pattern` is valid, or `None` otherwise.

        Arguments:
            pattern: The regular expression source as written in the query.

        Returns:
            The compiled pattern, or `None` if the pattern was rejected by the
            I-Regexp checker (when available) or failed to compile.

        Raises:
            JSONPathError: If `debug` is set and the I-Regexp check fails.
            re.error: If `debug` is set and compilation fails.
        """
        try:
            _pattern = self.cache[pattern]
        except KeyError:
            if IREGEXP_AVAILABLE and not check(pattern):
                if self.debug:
                    raise JSONPathError(
                        "search pattern is not a valid I-Regexp", token=None
                    ) from None
                _pattern = None
            else:
                # Keep the mapped source in its own variable. Rebinding
                # `pattern` here would make the cache store the entry under
                # the mapped string, which a lookup by the original string
                # never finds.
                source = map_re(pattern) if REGEX_AVAILABLE else pattern

                try:
                    _pattern = re.compile(source)
                except re.error:
                    if self.debug:
                        raise
                    _pattern = None

            # Always key on the caller's original pattern so the lookup at the
            # top of this method hits on the next call. Failures are cached
            # as `None` too, so bad patterns are not re-checked every time.
            self.cache[pattern] = _pattern

        return _pattern
def map_re(pattern: str) -> str:
    """Convert an I-Regexp pattern into a Python re pattern.

    Every unescaped `.` that appears outside a character class is replaced
    with an expression that excludes `\\r` and `\\n` and treats a pair of
    surrogate code points as a single match. All other characters, including
    escaped ones and dots inside `[...]`, are copied through unchanged.
    """
    out: List[str] = []
    in_class = False
    after_backslash = False

    for char in pattern:
        if after_backslash:
            # Whatever follows a backslash is literal; it never changes state.
            after_backslash = False
        elif char == "\\":
            after_backslash = True
        elif char == "[":
            in_class = True
        elif char == "]":
            in_class = False
        elif char == "." and not in_class:
            out.append(r"(?:(?![\r\n])\P{Cs}|\p{Cs}\p{Cs})")
            continue
        out.append(char)

    return "".join(out)