Commit fae7aae

fix(security): prevent AST injection via filepath_globpattern validation
1 parent af10d4f commit fae7aae

3 files changed

Lines changed: 20 additions & 1 deletion

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
@@ -10,8 +10,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 - `pw.io.postgres.write` now streams each batch into PostgreSQL through the binary `COPY` protocol instead of issuing one `INSERT` per row, giving a large throughput improvement (up to ~100x) on bulk writes. Both output modes use it: stream-of-changes copies straight into the target, while snapshot mode stages each batch in a temporary table and merges it with a single set-based upsert/delete.
 
 ### Fixed
+- Fixed a critical JMESPath AST injection vulnerability in the Document Store `_get_jmespath_filter` by introducing strict regex validation on the `filepath_globpattern` parameter, preventing unauthorized logic short-circuiting.
 - `pw.io.milvus.write` no longer intermittently fails with a "server unavailable" / "connect failed" error when pointed at a local `.db` file. The embedded local Milvus server reports itself as started before it actually accepts connections, so under load the first connection could lose the race against the server coming up; the connector now retries the initial connection until the local server is ready.
-- `BedrockChat` now correctly routes `top_k` and other model-specific arguments to the AWS Converse API via `additionalModelRequestFields`.
 - Improved concurrent write handling in pw.io.sqlite.write for SQLite databases. Writes to the same database file now produce deterministic output in multi-worker and multi-table setups.
 - `pw.io.elasticsearch.write` no longer fails when a minibatch is big enough that its Elasticsearch `_bulk` request would exceed a server-side limit. The connector reads both the cluster's `http.max_content_length` (the `413 Request Entity Too Large` limit) and `indexing_pressure.memory.limit` (the `429 Too Many Requests` limit, which on a small-heap node trips well below 100 MB) at start-up, and splits the buffered documents across as many bulk requests as needed to stay under whichever is hit first — so large batches are still written in as few requests as possible instead of being rejected. (Both limits fall back to a conservative default if they cannot be read.)
 - `pw.io.elasticsearch.write` now retries transient bulk failures with backoff instead of failing the run on the first hiccup. A whole-request rejection or an individual document failing with `429`/`503` (back-pressure / temporary unavailability) is retried — resending only the documents the server reports as not yet applied, so a retry never duplicates data — while deterministic per-document failures (e.g. a type-mismatched value rejected with `400`) are now logged and skipped rather than silently dropped.

python/pathway/xpacks/llm/document_store.py

Lines changed: 6 additions & 0 deletions
@@ -6,6 +6,7 @@
 multiple methods for querying.
 """
 import json
+import re
 import warnings
 from collections.abc import Callable, Sequence
 from enum import Enum
@@ -39,6 +40,11 @@ def _get_jmespath_filter(metadata_filter: str, filepath_globpattern: str) -> str
         )
         ret_parts.append(f"({metadata_filter})")
     if filepath_globpattern:
+        if not re.match(r"^[a-zA-Z0-9_\-\*\?\.\/\\ ]+$", filepath_globpattern):
+            raise ValueError(
+                "Invalid characters detected in filepath glob pattern. "
+                "Structural sequence manipulation characters are rejected."
+            )
         ret_parts.append(f"globmatch('{filepath_globpattern}', path)")
     if ret_parts:
         return " && ".join(ret_parts)
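
Review note: a minimal standalone sketch of why the check matters. `build_filter` is a hypothetical stand-in for `_get_jmespath_filter` that keeps only the string assembly visible in this hunk (no Pathway UDF wrapper, and none of the function body that lies outside the hunk). The `validate` flag is an assumption added only to contrast the behavior before and after this commit; the regex is copied from the diff.

```python
import re
from typing import Optional

# Character allowlist copied verbatim from the diff.
_GLOB_ALLOWED = r"^[a-zA-Z0-9_\-\*\?\.\/\\ ]+$"


def build_filter(
    metadata_filter: str, filepath_globpattern: str, validate: bool = True
) -> Optional[str]:
    """Hypothetical stand-in: only the string assembly shown in the hunk."""
    ret_parts = []
    if metadata_filter:
        ret_parts.append(f"({metadata_filter})")
    if filepath_globpattern:
        if validate and not re.match(_GLOB_ALLOWED, filepath_globpattern):
            raise ValueError("Invalid characters detected in filepath glob pattern.")
        # The pattern is interpolated into a quoted literal, so an
        # unvalidated single quote ends that literal early.
        ret_parts.append(f"globmatch('{filepath_globpattern}', path)")
    return " && ".join(ret_parts) if ret_parts else None


payload = "x', path) || true"  # same payload as the test in this commit

# Pre-fix behavior: the payload escapes the quoted glob argument.
unsafe = build_filter("tenant == 'A'", payload, validate=False)
print(unsafe)  # (tenant == 'A') && globmatch('x', path) || true', path)

# Post-fix behavior: the quote is outside the allowlist, so the call raises.
try:
    build_filter("tenant == 'A'", payload)
except ValueError as exc:
    print("rejected:", exc)
```

The sketch only shows that the expression text changes shape. How a given JMESPath parser evaluates or rejects the resulting string is not covered here.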

python/pathway/xpacks/llm/tests/test_document_store.py

Lines changed: 13 additions & 0 deletions
@@ -745,3 +745,16 @@ def add_id(text: str, metadata: dict) -> tuple:
     (query_result,) = val.as_list() # extract the single match
     assert isinstance(query_result, dict)
     assert query_result["metadata"]["id"] == 1
+
+
+def test_get_jmespath_filter_structural_integrity():
+    from pathway.xpacks.llm.document_store import _get_jmespath_filter
+
+    f = _get_jmespath_filter.__wrapped__
+
+    assert f("tenant == 'A'", "*.pdf") == "(tenant == 'A') && globmatch('*.pdf', path)"
+
+    # Injection containment check
+    malicious_input = "x', path) || true"
+    with pytest.raises(ValueError, match="Invalid characters detected"):
+        f("tenant == 'A'", malicious_input)
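
Review note: the test covers a single payload. The sketch below probes a few more inputs against the same pattern using only the standard `re` module. `accepted` mirrors the `re.match` call from the diff; `accepted_fullmatch` is a hypothetical variant shown for comparison and is not part of this commit. Whether a trailing newline or a trailing backslash is actually exploitable downstream is not established here; the sketch only reports what the allowlist lets through.

```python
import re

PATTERN = r"^[a-zA-Z0-9_\-\*\?\.\/\\ ]+$"  # copied from the diff


def accepted(glob: str) -> bool:
    """Mirrors the check in the commit: re.match with ^...$ anchors."""
    return re.match(PATTERN, glob) is not None


def accepted_fullmatch(glob: str) -> bool:
    """Hypothetical variant: the whole string must be consumed."""
    return re.fullmatch(PATTERN, glob) is not None


cases = [
    "*.pdf",              # plain glob
    "docs/**/*.md",       # nested glob
    "x', path) || true",  # payload from the test: contains a quote
    "*.pdf\n",            # trailing newline: `$` also matches just before it
    "docs\\",             # trailing backslash: backslash is in the allowlist
]

for glob in cases:
    print(repr(glob), accepted(glob), accepted_fullmatch(glob))
```

If either of the last two cases turns out to matter for the JMESPath parser in use, adding it to the test alongside the quote payload would pin the behavior down.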
