Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ These breaking changes apply to Python JSONPath in its default configuration. We

- Added the `startswith(value, prefix)` function extension. `startswith` returns `True` if both arguments are strings and the second argument is a prefix of the first argument. See the [filter functions](https://jg-rp.github.io/python-jsonpath/functions/#startswith) documentation.
- The non-standard `keys()` function extension has been reimplemented. It used to be a simple Python function, `jsonpath.function_extensions.keys`. Now it is a "well-typed" class, `jsonpath.function_extensions.Keys`. See the [filter functions](https://jg-rp.github.io/python-jsonpath/functions/#keys) documentation.
- Added `cache_capacity`, `debug` and `thread_safe` arguments to `jsonpath.function_extensions.Match` and `jsonpath.function_extensions.Search` constructors.

**JSONPath features**

Expand Down
81 changes: 81 additions & 0 deletions jsonpath/function_extensions/_pattern.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,88 @@
from typing import List
from typing import Optional

try:
import regex as re

REGEX_AVAILABLE = True
except ImportError:
import re # type: ignore

REGEX_AVAILABLE = False

try:
from iregexp_check import check

IREGEXP_AVAILABLE = True
except ImportError:
IREGEXP_AVAILABLE = False

from jsonpath.exceptions import JSONPathError
from jsonpath.function_extensions import ExpressionType
from jsonpath.function_extensions import FilterFunction
from jsonpath.lru_cache import LRUCache
from jsonpath.lru_cache import ThreadSafeLRUCache


class AbstractRegexFilterFunction(FilterFunction):
    """Base class for filter functions that accept regular expression arguments.

    Arguments:
        cache_capacity: The size of the regular expression cache.
        debug: When `True`, raise an exception when regex pattern compilation
            fails. The default - as required by RFC 9535 - is `False`, which
            silently ignores bad patterns.
        thread_safe: When `True`, use a `ThreadSafeLRUCache` instead of an
            instance of `LRUCache`.
    """

    arg_types = [ExpressionType.VALUE, ExpressionType.VALUE]
    return_type = ExpressionType.LOGICAL

    def __init__(
        self,
        *,
        cache_capacity: int = 300,
        debug: bool = False,
        thread_safe: bool = False,
    ):
        # Maps the caller-supplied (unmapped) pattern string to a compiled
        # pattern, or `None` for patterns that failed validation/compilation.
        self.cache: LRUCache[str, Optional[re.Pattern]] = (  # type: ignore
            ThreadSafeLRUCache(capacity=cache_capacity)
            if thread_safe
            else LRUCache(capacity=cache_capacity)
        )

        self.debug = debug

    def check_cache(self, pattern: str) -> Optional[re.Pattern]:  # type: ignore
        """Return a compiled re pattern if `pattern` is valid, or `None` otherwise."""
        try:
            return self.cache[pattern]
        except KeyError:
            compiled: Optional[re.Pattern] = None  # type: ignore

            if IREGEXP_AVAILABLE and not check(pattern):
                if self.debug:
                    raise JSONPathError(
                        "search pattern is not a valid I-Regexp", token=None
                    ) from None
                # `compiled` stays `None`; cache the failure below.
            else:
                # When the third-party `regex` package is available, translate
                # the I-Regexp pattern into an equivalent Python pattern.
                # NOTE: bind the translated pattern to a new name so the cache
                # key below remains the caller's original pattern. Reassigning
                # `pattern` here would store entries under a key that future
                # lookups never use, defeating the cache entirely.
                re_pattern = map_re(pattern) if REGEX_AVAILABLE else pattern

                try:
                    compiled = re.compile(re_pattern)
                except re.error:
                    if self.debug:
                        raise
                    compiled = None

            self.cache[pattern] = compiled
            return compiled


def map_re(pattern: str) -> str:
"""Convert an I-Regexp pattern into a Python re pattern."""
escaped = False
char_class = False
parts: List[str] = []
Expand Down
51 changes: 10 additions & 41 deletions jsonpath/function_extensions/match.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,19 @@
"""The standard `match` function extension."""

try:
import regex as re
from ._pattern import AbstractRegexFilterFunction

REGEX_AVAILABLE = True
except ImportError:
import re # type: ignore

REGEX_AVAILABLE = False
class Match(AbstractRegexFilterFunction):
"""The standard `match` function."""

try:
from iregexp_check import check

IREGEXP_AVAILABLE = True
except ImportError:
IREGEXP_AVAILABLE = False

from jsonpath.function_extensions import ExpressionType
from jsonpath.function_extensions import FilterFunction

from ._pattern import map_re


class Match(FilterFunction):
"""A type-aware implementation of the standard `match` function."""

arg_types = [ExpressionType.VALUE, ExpressionType.VALUE]
return_type = ExpressionType.LOGICAL

def __call__(self, string: str, pattern: str) -> bool:
"""Return `True` if _string_ matches _pattern_, or `False` otherwise."""
# TODO: re.match caches compiled patterns internally, but `map_re` and `check`
# are not cached.

# TODO: validate literal patterns ar compile time?

if IREGEXP_AVAILABLE and (not isinstance(pattern, str) or not check(pattern)):
def __call__(self, value: object, pattern: object) -> bool:
"""Return `True` if _value_ matches _pattern_, or `False` otherwise."""
if not isinstance(value, str) or not isinstance(pattern, str):
return False

if REGEX_AVAILABLE:
try:
pattern = map_re(pattern)
except TypeError:
return False
_pattern = self.check_cache(pattern)

try:
return bool(re.fullmatch(pattern, string))
except (TypeError, re.error):
if _pattern is None:
return False

return bool(_pattern.fullmatch(value))
51 changes: 10 additions & 41 deletions jsonpath/function_extensions/search.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,19 @@
"""The standard `search` function extension."""

try:
import regex as re
from ._pattern import AbstractRegexFilterFunction

REGEX_AVAILABLE = True
except ImportError:
import re # type: ignore

REGEX_AVAILABLE = False
class Search(AbstractRegexFilterFunction):
"""The standard `search` function."""

try:
from iregexp_check import check

IREGEXP_AVAILABLE = True
except ImportError:
IREGEXP_AVAILABLE = False

from jsonpath.function_extensions import ExpressionType
from jsonpath.function_extensions import FilterFunction

from ._pattern import map_re


class Search(FilterFunction):
"""A type-aware implementation of the standard `search` function."""

arg_types = [ExpressionType.VALUE, ExpressionType.VALUE]
return_type = ExpressionType.LOGICAL

def __call__(self, string: str, pattern: str) -> bool:
"""Return `True` if _string_ contains _pattern_, or `False` otherwise."""
# TODO: re.search caches compiled patterns internally, but `map_re` and `check`
# are not cached.

# TODO: validate literal patterns ar compile time?

if IREGEXP_AVAILABLE and (not isinstance(pattern, str) or not check(pattern)):
def __call__(self, value: object, pattern: object) -> bool:
        """Return `True` if _value_ contains _pattern_, or `False` otherwise."""
if not isinstance(value, str) or not isinstance(pattern, str):
return False

if REGEX_AVAILABLE:
try:
pattern = map_re(pattern)
except TypeError:
return False
_pattern = self.check_cache(pattern)

try:
return bool(re.search(pattern, string))
except (TypeError, re.error):
if _pattern is None:
return False

return bool(_pattern.search(value))
130 changes: 130 additions & 0 deletions jsonpath/lru_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
"""An LRU cache with a mapping interface implemented using an ordered dict."""

from collections import OrderedDict
from threading import Lock
from typing import Generic
from typing import Iterator
from typing import Optional
from typing import Tuple
from typing import TypeVar
from typing import Union
from typing import overload

_KT = TypeVar("_KT")
_VT = TypeVar("_VT")
_T = TypeVar("_T")


class LRUCache(Generic[_KT, _VT]):
    """An LRU cache with a mapping interface.

    The cache holds at most `capacity` entries. Inserting a new key when the
    cache is full evicts the least recently used entry. A successful lookup
    via `__getitem__` (and therefore `get`) marks the entry as most recently
    used. Iteration order is most recently used first.

    Arguments:
        capacity: The maximum number of entries to retain. Must be >= 1.

    Raises:
        ValueError: If `capacity` is less than one.
    """

    def __init__(self, capacity: int):
        if capacity < 1:
            raise ValueError("cache capacity must be greater than zero")

        self.capacity = capacity
        self._cache: OrderedDict[_KT, _VT] = OrderedDict()

    def __getitem__(self, key: _KT) -> _VT:
        value = self._cache[key]  # This will raise a KeyError if key is not cached
        # A successful read makes this the most recently used entry.
        self._cache.move_to_end(key)
        return value

    def __setitem__(self, key: _KT, value: _VT) -> None:
        try:
            self._cache.move_to_end(key)
        except KeyError:
            # New key. Evict the least recently used entry if we're at capacity.
            if len(self._cache) >= self.capacity:
                self._cache.popitem(last=False)

        self._cache[key] = value

    def __delitem__(self, key: _KT) -> None:
        del self._cache[key]

    def __len__(self) -> int:
        return len(self._cache)

    def __iter__(self) -> Iterator[_KT]:
        # Most recently used first.
        return reversed(self._cache)

    def __contains__(self, key: _KT) -> bool:
        return key in self._cache

    @overload
    def get(self, key: _KT) -> Optional[_VT]: ...
    @overload
    def get(self, key: _KT, default: _VT) -> _VT: ...
    @overload
    def get(self, key: _KT, default: _T) -> Union[_VT, _T]: ...
    def get(self, key: _KT, default: object = None) -> object:
        """Return the cached value for _key_ if _key_ is in the cache, else default."""
        try:
            return self[key]
        except KeyError:
            return default

    def keys(self) -> Iterator[_KT]:
        """Return an iterator over this cache's keys, most recently used first."""
        return reversed(self._cache.keys())

    def values(self) -> Iterator[_VT]:
        """Return an iterator over this cache's values, most recently used first."""
        return reversed(self._cache.values())

    def items(self) -> Iterator[Tuple[_KT, _VT]]:
        """Return an iterator over this cache's key/value pairs, MRU first."""
        return reversed(self._cache.items())


class ThreadSafeLRUCache(LRUCache[_KT, _VT]):
    """A thread safe LRU cache.

    All operations, including iteration, take a snapshot or operate entirely
    while holding an internal lock, so concurrent readers and writers cannot
    corrupt the cache or observe a partially-updated ordering.
    """

    def __init__(self, capacity: int):
        super().__init__(capacity)
        self._lock = Lock()

    def __getitem__(self, key: _KT) -> _VT:
        with self._lock:
            return super().__getitem__(key)

    def __setitem__(self, key: _KT, value: _VT) -> None:
        with self._lock:
            return super().__setitem__(key, value)

    def __delitem__(self, key: _KT) -> None:
        with self._lock:
            return super().__delitem__(key)

    def __contains__(self, key: _KT) -> bool:
        with self._lock:
            return super().__contains__(key)

    def __len__(self) -> int:
        with self._lock:
            return super().__len__()

    def __iter__(self) -> Iterator[_KT]:
        # Materialize a snapshot while holding the lock. Returning the lazy
        # reversed view from the base class would release the lock before
        # iteration starts, letting a concurrent writer mutate the dict
        # mid-iteration and raise RuntimeError.
        with self._lock:
            return iter(list(super().__iter__()))

    @overload
    def get(self, key: _KT) -> Optional[_VT]: ...
    @overload
    def get(self, key: _KT, default: _VT) -> _VT: ...
    @overload
    def get(self, key: _KT, default: _T) -> Union[_VT, _T]: ...
    def get(self, key: _KT, default: object = None) -> object:
        """Return the cached value for _key_ if _key_ is in the cache, else default."""
        # NOTE: self.__getitem__ is already acquiring the lock.
        try:
            return self[key]
        except KeyError:
            return default

    def keys(self) -> Iterator[_KT]:
        """Return an iterator over a snapshot of this cache's keys, MRU first."""
        with self._lock:
            return iter(list(super().keys()))

    def values(self) -> Iterator[_VT]:
        """Return an iterator over a snapshot of this cache's values, MRU first."""
        with self._lock:
            return iter(list(super().values()))

    def items(self) -> Iterator[Tuple[_KT, _VT]]:
        """Return an iterator over a snapshot of this cache's items, MRU first."""
        with self._lock:
            return iter(list(super().items()))
16 changes: 13 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,14 @@ dependencies = [
]

[tool.hatch.envs.default.scripts]
cov = "pytest --cov-report=term-missing --cov-config=pyproject.toml --cov=jsonpath --cov=tests {args}"
cov-html = "pytest --cov-report=html --cov-config=pyproject.toml --cov=jsonpath --cov=tests {args}"
cov = [
"hatch run no-regex:cov",
"pytest --cov-append --cov-report=term-missing --cov-config=pyproject.toml --cov=jsonpath --cov=tests {args}"
]
cov-html = [
"hatch run no-regex:cov",
"pytest --cov-append --cov-report=html --cov-config=pyproject.toml --cov=jsonpath --cov=tests {args}",
]
no-cov = "cov --no-cov {args}"
test = "pytest {args}"
lint = "ruff check ."
Expand All @@ -80,7 +86,11 @@ build = "mkdocs build --clean --strict"
serve = "mkdocs serve --dev-addr localhost:8000"

[tool.hatch.envs.no-regex]
dependencies = ["pytest"]
dependencies = ["pytest", "pytest-cov"]

[tool.hatch.envs.no-regex.scripts]
cov = "pytest --cov-report=term-missing --cov-config=pyproject.toml --cov=jsonpath --cov=tests tests/test_compliance.py {args}"


[tool.coverage.run]
branch = true
Expand Down
Loading
Loading