Closed
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -10,8 +10,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `pw.io.postgres.write` now streams each batch into PostgreSQL through the binary `COPY` protocol instead of issuing one `INSERT` per row, giving a large throughput improvement (up to ~100x) on bulk writes. Both output modes use it: stream-of-changes copies straight into the target, while snapshot mode stages each batch in a temporary table and merges it with a single set-based upsert/delete.

### Fixed
- Fixed a critical JMESPath AST injection vulnerability in the Document Store `_get_jmespath_filter` by introducing strict regex validation on the `filepath_globpattern` parameter, preventing unauthorized logic short-circuiting.
- `pw.io.milvus.write` no longer intermittently fails with a "server unavailable" / "connect failed" error when pointed at a local `.db` file. The embedded local Milvus server reports itself as started before it actually accepts connections, so under load the first connection could lose the race against the server coming up; the connector now retries the initial connection until the local server is ready.
- `BedrockChat` now correctly routes `top_k` and other model-specific arguments to the AWS Converse API via `additionalModelRequestFields`.
- Improved concurrent write handling in `pw.io.sqlite.write` for SQLite databases. Writes to the same database file now produce deterministic output in multi-worker and multi-table setups.
- `pw.io.elasticsearch.write` no longer fails when a minibatch is big enough that its Elasticsearch `_bulk` request would exceed a server-side limit. The connector reads both the cluster's `http.max_content_length` (the `413 Request Entity Too Large` limit) and `indexing_pressure.memory.limit` (the `429 Too Many Requests` limit, which on a small-heap node trips well below 100 MB) at start-up, and splits the buffered documents across as many bulk requests as needed to stay under whichever is hit first — so large batches are still written in as few requests as possible instead of being rejected. (Both limits fall back to a conservative default if they cannot be read.)
- `pw.io.elasticsearch.write` now retries transient bulk failures with backoff instead of failing the run on the first hiccup. A whole-request rejection or an individual document failing with `429`/`503` (back-pressure / temporary unavailability) is retried — resending only the documents the server reports as not yet applied, so a retry never duplicates data — while deterministic per-document failures (e.g. a type-mismatched value rejected with `400`) are now logged and skipped rather than silently dropped.
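The request splitting described in the Elasticsearch entry above can be pictured as a greedy packer over serialized documents. The following is a minimal Python sketch under stated assumptions, not the connector's code: `effective_limit` and `split_into_bulk_bodies` are hypothetical names, and the only behaviour taken from the changelog is "stay under whichever limit is hit first, using as few requests as possible".

```python
from typing import Iterable, Iterator, List


def effective_limit(http_max_content_length: int, indexing_pressure_limit: int) -> int:
    """Return the byte budget for one bulk request: the smaller of the two limits."""
    return min(http_max_content_length, indexing_pressure_limit)


def split_into_bulk_bodies(lines: Iterable[bytes], max_bytes: int) -> Iterator[List[bytes]]:
    """Greedily pack serialized documents into groups of at most max_bytes.

    Each document is assumed to be one newline-terminated line, so one extra
    byte is counted per document. A single document larger than max_bytes is
    still emitted on its own; rejecting it is left to the server.
    """
    batch: List[bytes] = []
    size = 0
    for line in lines:
        line_size = len(line) + 1  # +1 for the trailing newline
        if batch and size + line_size > max_bytes:
            # Adding this document would exceed the budget: flush first.
            yield batch
            batch, size = [], 0
        batch.append(line)
        size += line_size
    if batch:
        yield batch


# Three 4-byte documents with a 10-byte budget end up in two requests.
docs = [b"aaaa", b"bbbb", b"cccc"]
groups = list(split_into_bulk_bodies(docs, effective_limit(10, 64)))
```

Greedy packing keeps the request count minimal for a fixed document order, which matches the "as few requests as possible" wording; how the real connector measures request size is not stated in the changelog.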
6 changes: 6 additions & 0 deletions python/pathway/xpacks/llm/document_store.py
@@ -6,6 +6,7 @@
multiple methods for querying.
"""
import json
import re
import warnings
from collections.abc import Callable, Sequence
from enum import Enum
@@ -39,6 +40,11 @@ def _get_jmespath_filter(metadata_filter: str, filepath_globpattern: str) -> str
        )
        ret_parts.append(f"({metadata_filter})")
    if filepath_globpattern:
        if not re.match(r"^[a-zA-Z0-9_\-\*\?\.\/\\ ]+$", filepath_globpattern):
            raise ValueError(
                "Invalid characters detected in filepath glob pattern. "
                "Structural sequence manipulation characters are rejected."
            )
        ret_parts.append(f"globmatch('{filepath_globpattern}', path)")
    if ret_parts:
        return " && ".join(ret_parts)
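To see why the allow-list in this hunk matters, it helps to look at what plain string interpolation does with a pattern that contains a single quote. The sketch below is a standalone illustration, not the library's code: `unsafe_glob_clause` and `build_filter` are hypothetical stand-ins that leave out the Pathway UDF wrapper and any preprocessing of `metadata_filter`. It also uses `re.fullmatch` rather than `re.match` with `^...$`, which is an assumption of this sketch: `$` also matches just before a trailing newline, so `fullmatch` is slightly stricter.

```python
import re
from typing import Optional

# Same character allow-list as the check in the hunk above.
_ALLOWED_GLOB = re.compile(r"[a-zA-Z0-9_\-\*\?\.\/\\ ]+")


def unsafe_glob_clause(pattern: str) -> str:
    """Interpolate the pattern with no validation (the pre-fix behaviour)."""
    return f"globmatch('{pattern}', path)"


def build_filter(metadata_filter: str, filepath_globpattern: str) -> Optional[str]:
    """Hypothetical stand-in for the patched filter builder."""
    parts = []
    if metadata_filter:
        parts.append(f"({metadata_filter})")
    if filepath_globpattern:
        # Reject anything outside the allow-list, including quotes,
        # parentheses, pipes, and newlines.
        if not _ALLOWED_GLOB.fullmatch(filepath_globpattern):
            raise ValueError("Invalid characters detected in filepath glob pattern.")
        parts.append(unsafe_glob_clause(filepath_globpattern))
    return " && ".join(parts) if parts else None


# A quote in the pattern closes the string literal early and appends a
# second, always-matching clause to the expression.
injected = unsafe_glob_clause("x', path) || globmatch('*")
# injected == "globmatch('x', path) || globmatch('*', path)"

safe = build_filter("tenant == 'A'", "*.pdf")
# safe == "(tenant == 'A') && globmatch('*.pdf', path)"
```

Because `&&` binds tighter than `||` in JMESPath, joining the injected clause to a metadata filter with `&&` would let the trailing `|| globmatch('*', path)` branch match regardless of the metadata condition. With the allow-list in place, the same input raises `ValueError` before any expression is built.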
13 changes: 13 additions & 0 deletions python/pathway/xpacks/llm/tests/test_document_store.py
@@ -745,3 +745,16 @@ def add_id(text: str, metadata: dict) -> tuple:
    (query_result,) = val.as_list()  # extract the single match
    assert isinstance(query_result, dict)
    assert query_result["metadata"]["id"] == 1


def test_get_jmespath_filter_structural_integrity():
    from pathway.xpacks.llm.document_store import _get_jmespath_filter

    f = _get_jmespath_filter.__wrapped__

    assert f("tenant == 'A'", "*.pdf") == "(tenant == 'A') && globmatch('*.pdf', path)"

    # Injection containment check
    malicious_input = "x', path) || true"
    with pytest.raises(ValueError, match="Invalid characters detected"):
        f("tenant == 'A'", malicious_input)