1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `pw.io.postgres.write` now streams each batch into PostgreSQL through the binary `COPY` protocol instead of issuing one `INSERT` per row, giving a large throughput improvement (up to ~100x) on bulk writes. Both output modes use it: stream-of-changes copies straight into the target, while snapshot mode stages each batch in a temporary table and merges it with a single set-based upsert/delete.

### Fixed
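- Fixed an exponential algorithmic-complexity (ReDoS-style) vulnerability in `_globmatch`, triggered when evaluating unauthenticated `filepath_globpattern` filters. Memoizing the recursion reduces the worst-case time complexity from $O(2^k)$ to $O(N \times M)$, where $N$ and $M$ are the numbers of `/`-separated segments in the pattern and in the path.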
- `pw.io.milvus.write` no longer intermittently fails with a "server unavailable" / "connect failed" error when pointed at a local `.db` file. The embedded local Milvus server reports itself as started before it actually accepts connections, so under load the first connection could lose the race against the server coming up; the connector now retries the initial connection until the local server is ready.
- Improved concurrent write handling in pw.io.sqlite.write for SQLite databases. Writes to the same database file now produce deterministic output in multi-worker and multi-table setups.
- `pw.io.elasticsearch.write` no longer fails when a minibatch is big enough that its Elasticsearch `_bulk` request would exceed a server-side limit. The connector reads both the cluster's `http.max_content_length` (the `413 Request Entity Too Large` limit) and `indexing_pressure.memory.limit` (the `429 Too Many Requests` limit, which on a small-heap node trips well below 100 MB) at start-up, and splits the buffered documents across as many bulk requests as needed to stay under whichever is hit first — so large batches are still written in as few requests as possible instead of being rejected. (Both limits fall back to a conservative default if they cannot be read.)
31 changes: 22 additions & 9 deletions python/pathway/stdlib/ml/classifiers/_knn_lsh.py
@@ -30,6 +30,7 @@
from typing import Literal

import jmespath
import jmespath.exceptions
import jmespath.functions
import numpy as np

@@ -98,18 +99,30 @@ def knn_lsh_classifier_train(


 # support for glob metadata search
-def _globmatch_impl(pat_i, pat_n, pattern, p_i, p_n, path):
-    """Match pattern to path, recursively expanding **."""
+def _globmatch_impl(pat_i, pat_n, pattern, p_i, p_n, path, memo):
+    """Match pattern to path, recursively expanding **, using memoization."""
+    state = (pat_i, p_i)
+    if state in memo:
+        return memo[state]
+
     if pat_i == pat_n:
-        return p_i == p_n
+        memo[state] = p_i == p_n
+        return memo[state]
     if p_i == p_n:
-        return False
+        memo[state] = False
+        return memo[state]
     if pattern[pat_i] == "**":
-        return _globmatch_impl(
-            pat_i, pat_n, pattern, p_i + 1, p_n, path
-        ) or _globmatch_impl(pat_i + 1, pat_n, pattern, p_i, p_n, path)
+        res = _globmatch_impl(
+            pat_i, pat_n, pattern, p_i + 1, p_n, path, memo
+        ) or _globmatch_impl(pat_i + 1, pat_n, pattern, p_i, p_n, path, memo)
+        memo[state] = res
+        return res
     if fnmatch.fnmatch(path[p_i], pattern[pat_i]):
-        return _globmatch_impl(pat_i + 1, pat_n, pattern, p_i + 1, p_n, path)
+        res = _globmatch_impl(pat_i + 1, pat_n, pattern, p_i + 1, p_n, path, memo)
+        memo[state] = res
+        return res
+
+    memo[state] = False
     return False


@@ -118,7 +131,7 @@ def _globmatch(pattern, path):
     pattern_parts = pattern.split("/")
     path_parts = path.split("/")
     return _globmatch_impl(
-        0, len(pattern_parts), pattern_parts, 0, len(path_parts), path_parts
+        0, len(pattern_parts), pattern_parts, 0, len(path_parts), path_parts, {}
     )
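
To make the effect of the memo concrete, here is a minimal standalone sketch that reimplements the same recursion outside Pathway and counts how many times the body runs with and without the `(pat_i, p_i)` cache. The helper name `match_and_count`, its `memoize` flag, and the adversarial input are hypothetical and exist only for this illustration; they are not part of the patch.

```python
import fnmatch


def match_and_count(pattern, path, memoize):
    """Return (matched, body_evaluations) for a segment-wise glob match.

    Hypothetical helper mirroring the recursion in the diff above; the
    `memoize` flag switches the (pat_i, p_i) cache on or off.
    """
    pattern_parts = pattern.split("/")
    path_parts = path.split("/")
    pat_n, p_n = len(pattern_parts), len(path_parts)
    memo = {}
    calls = 0

    def rec(pat_i, p_i):
        nonlocal calls
        state = (pat_i, p_i)
        if memoize and state in memo:
            return memo[state]
        calls += 1  # counts only evaluations that were not served from the cache
        if pat_i == pat_n:
            res = p_i == p_n
        elif p_i == p_n:
            res = False
        elif pattern_parts[pat_i] == "**":
            # "**" either consumes one more path segment or is skipped
            res = rec(pat_i, p_i + 1) or rec(pat_i + 1, p_i)
        elif fnmatch.fnmatch(path_parts[p_i], pattern_parts[pat_i]):
            res = rec(pat_i + 1, p_i + 1)
        else:
            res = False
        memo[state] = res
        return res

    return rec(0, 0), calls


if __name__ == "__main__":
    # Many "**" segments followed by a literal that never matches force the
    # unmemoized recursion to explore every way of splitting the path.
    pattern = "/".join(["**"] * 8 + ["x"])
    path = "/".join(["a"] * 12)
    print("naive:", match_and_count(pattern, path, memoize=False))
    print("memo: ", match_and_count(pattern, path, memoize=True))
```

With the cache enabled, the number of evaluated states is bounded by the product of the pattern and path segment counts (plus the end positions), which is the $O(N \times M)$ bound quoted in the changelog entry.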

