Skip to content

[python] Support query auth (row filter & column masking) for REST catalog#8136

Open
MgjLLL wants to merge 4 commits into
apache:masterfrom
MgjLLL:python-query-auth
Open

[python] Support query auth (row filter & column masking) for REST catalog#8136
MgjLLL wants to merge 4 commits into
apache:masterfrom
MgjLLL:python-query-auth

Conversation

@MgjLLL

@MgjLLL MgjLLL commented Jun 5, 2026

Copy link
Copy Markdown

Purpose

Adds query-auth support to the Python client so it honors the row-level filter and column masking rules returned by a REST catalog, matching the existing JVM client behavior.

When the new option query-auth.enabled is set to true, before producing a Plan the client calls POST /v1/.../databases/{db}/tables/{tb}/auth with the projected fields, receives { filter, columnMasking }, and applies them on the read path:

  • RESTApi.auth_table_query issues the call (new request/response models AuthTableQueryRequest / AuthTableQueryResponse, new path in ResourcePaths.auth_table).
  • TableQueryAuth / TableQueryAuthResult (catalog/table_query_auth.py) wrap the result and convert each split to a QueryAuthSplit.
  • predicate_json_parser (common/predicate_json_parser.py) parses Paimon predicate JSON into a PyArrow compute filter (EQ/NEQ/LT/LTEQ/GT/GTEQ/IS_NULL/IS_NOT_NULL/IN/NOT_IN/STARTS_WITH/ENDS_WITH/CONTAINS/AND/OR/NOT).
  • AuthFilterReader / AuthMaskingReader / ColumnProjectReader (read/reader/auth_masking_reader.py) implement row filtering, column masking transforms (NULL, FIELD_REF, CAST, UPPER, LOWER, CONCAT, CONCAT_WS) and final projection back to the user's requested columns.
  • read_builder / stream_read_builder / table_read / table_scan / file_store_table / catalog_environment / rest_catalog are wired to invoke the auth call and pull extra fields required only by the auth filter.

Behavior is gated by the new CoreOptions.QUERY_AUTH_ENABLED (query-auth.enabled, default false), so existing users see no change.

Tests

Three new test files (994+ lines, all passing locally under pytest):

  • paimon-python/pypaimon/tests/predicate_json_parser_test.py — covers each predicate kind, nested AND/OR/NOT, type coercion, null handling, and extract_referenced_fields.
  • paimon-python/pypaimon/tests/auth_masking_reader_test.py — covers each masking transform, missing-field validation, and projection back to the user-requested columns.
  • paimon-python/pypaimon/tests/table_query_auth_test.py — end-to-end coverage: REST catalog calls auth_table_query, the result is plumbed into the plan, splits become QueryAuthSplit, and reads return filtered + masked rows.

Local check:

cd paimon-python
python -m pytest pypaimon/tests/predicate_json_parser_test.py \
                  pypaimon/tests/auth_masking_reader_test.py \
                  pypaimon/tests/table_query_auth_test.py -q
flake8 --config dev/cfg.ini pypaimon/  # 已在改动范围内通过

API and Format

  • New catalog option: query-auth.enabled (boolean, default false).
  • New REST endpoint consumed by the client: POST /v1/{prefix}/databases/{db}/tables/{tb}/auth. Request { "select": [...] }, response { "filter": [<predicate-json>...], "columnMasking": { <col>: <transform-json>, ... } }. The contract follows the existing Java client; no server-side change is required for catalogs that already implement query auth.
  • No change to existing user-facing Python APIs. New types (AuthTableQueryRequest, AuthTableQueryResponse, TableQueryAuth, TableQueryAuthResult, QueryAuthSplit, AuthFilterReader, AuthMaskingReader, ColumnProjectReader) are additive and live under existing modules.
  • File format / on-disk layout: unchanged.

Documentation

The new option query-auth.enabled should be reflected in the Python configuration reference. Happy to add the docs entry in this PR or in a follow-up — please advise.

This closes #8135

…talog

Adds query-auth support to the Python client so it honors the row-level
filter and column masking rules returned by a REST catalog, matching the
existing JVM client behavior.

When the new option `query-auth.enabled` is set to true, the client
calls `POST /v1/.../databases/{db}/tables/{tb}/auth` before producing a
plan, receives `{ filter, columnMasking }`, and applies them on the
read path:

  * `predicate_json_parser` parses Paimon predicate JSON into a
    PyArrow compute filter (EQ/NEQ/LT/LTEQ/GT/GTEQ/IS_NULL/IS_NOT_NULL/
    IN/NOT_IN/STARTS_WITH/ENDS_WITH/CONTAINS/AND/OR/NOT).
  * `AuthFilterReader` / `AuthMaskingReader` / `ColumnProjectReader`
    perform row filtering, column masking transforms (NULL, FIELD_REF,
    CAST, UPPER, LOWER, CONCAT, CONCAT_WS) and final projection back to
    the user's requested columns.
  * `TableQueryAuth` / `TableQueryAuthResult` wrap the result and
    convert each split to a `QueryAuthSplit`.

Behavior is gated by `CoreOptions.QUERY_AUTH_ENABLED` (default false),
so existing users see no change.
@JingsongLi

Copy link
Copy Markdown
Contributor

I found a few correctness issues in the query-auth paths introduced here:

  1. paimon-python/pypaimon/read/datasource/split_provider.py:127 constructs ReadBuilder(self._ensure_table()) directly. That bypasses FileStoreTable.new_read_builder(), which is where the REST query auth is injected. As a result, pypaimon.ray.read_paimon(...) can read REST tables without applying server-side row filters or column masking. I think this should either call self._ensure_table().new_read_builder() or explicitly pass the table's query auth into the builder, with a Ray regression test for row filtering/masking.

  2. paimon-python/pypaimon/read/stream_read_builder.py:117 stores _query_auth, but new_streaming_scan() does not pass it into AsyncStreamingTableScan. The plans returned from streaming_table_scan.py:322 and streaming_table_scan.py:386 also do not go through auth_result.convert_plan(). So table.new_stream_read_builder() skips row filters and column masking for both the initial scan and later delta/changelog scans. The streaming scan should preserve and apply query auth before returning each plan.

  3. The auth reader wrappers currently assume the inner reader supports read_arrow_batch() (paimon-python/pypaimon/read/reader/auth_masking_reader.py:38 and :66). For primary-key tables with non-raw-convertible splits, TableRead can create a MergeFileSplitRead, whose create_reader() returns the normal row RecordReader path rather than a RecordBatchReader. Wrapping that in AuthFilterReader/AuthMaskingReader will fail with AttributeError when query auth is enabled. This needs either a row-reader auth path, conversion to a batch-capable reader before wrapping, or routing/rejecting these splits explicitly.

- Ray: use table.new_read_builder() instead of direct ReadBuilder()
- Streaming: pass query_auth to AsyncStreamingTableScan, apply to all plans
- Merge reader: add RecordReaderToBatchAdapter for primary-key tables
- Parallel: use _create_reader_for_split, add raw_convertible proxy
@MgjLLL

MgjLLL commented Jun 8, 2026

Copy link
Copy Markdown
Author

Fixes for issues raised by @JingsongLi, plus one additional issue found during analysis.
8 files changed, +188 -13 lines.

Fix 1: Ray read path bypasses auth (split_provider.py)

_ensure_planned() directly constructed ReadBuilder(table), bypassing the query_auth injection from FileStoreTable.new_read_builder().

Fix: Changed to self._ensure_table().new_read_builder(). Removed the direct ReadBuilder import.

Fix 2: Streaming read path skips auth entirely (streaming_table_scan.py, stream_read_builder.py)

StreamReadBuilder stored _query_auth but never passed it to AsyncStreamingTableScan. All three plan creation methods (initial/follow-up/catch-up) produced plans without auth wrapping.

Fix:

  • Added query_auth parameter to AsyncStreamingTableScan.__init__()
  • Added _apply_auth(plan) method that calls query_auth(select)convert_plan(plan)
  • Applied auth to _create_initial_plan, _create_follow_up_plan, _create_catch_up_plan
  • Extracted _create_initial_plan_raw() to avoid double auth in catch-up path
  • StreamReadBuilder.new_streaming_scan() now passes query_auth

Fix 3: Primary-key table merge reader incompatible with auth wrappers (auth_masking_reader.py, table_read.py)

MergeFileSplitRead.create_reader() returns a RecordReader (row-level), but AuthFilterReader/AuthMaskingReader require RecordBatchReader (batch-level with read_arrow_batch()). Java has a unified RecordReader<InternalRow> so this
mismatch doesn't exist in JVM.

Fix:

  • Added RecordReaderToBatchAdapter(RecordBatchReader) that collects OffsetRow tuples from read_batch()/next() and converts them to pa.RecordBatch
  • _authed_reader() now detects non-RecordBatchReader and wraps with the adapter before applying auth wrappers

Fix 4: Parallel read path bypasses auth + missing raw_convertible proxy (table_read.py, query_auth_split.py)

_read_one_split_to_batches() called _create_split_read(split).create_reader() directly, bypassing QueryAuthSplit detection. Additionally, QueryAuthSplit lacked a raw_convertible property proxy, causing AttributeError when accessed.

Fix:

  • Changed to self._create_reader_for_split(split) which correctly handles QueryAuthSplit
  • Added raw_convertible property proxy to QueryAuthSplit

New Tests (for fixes)

  • TestQueryAuthSplitRawConvertible (2 tests) — verifies raw_convertible proxy for true/false
  • TestRecordReaderToBatchAdapter (5 tests) — basic conversion, multi-batch, empty reader, close delegation, integration with AuthFilterReader

All 90 auth-related tests pass. Full test suite: 318/324 pass (6 pre-existing failures unrelated to this PR).

@MgjLLL

MgjLLL commented Jun 8, 2026

Copy link
Copy Markdown
Author

I found a few correctness issues in the query-auth paths introduced here:

  1. paimon-python/pypaimon/read/datasource/split_provider.py:127 constructs ReadBuilder(self._ensure_table()) directly. That bypasses FileStoreTable.new_read_builder(), which is where the REST query auth is injected. As a result, pypaimon.ray.read_paimon(...) can read REST tables without applying server-side row filters or column masking. I think this should either call self._ensure_table().new_read_builder() or explicitly pass the table's query auth into the builder, with a Ray regression test for row filtering/masking.
  2. paimon-python/pypaimon/read/stream_read_builder.py:117 stores _query_auth, but new_streaming_scan() does not pass it into AsyncStreamingTableScan. The plans returned from streaming_table_scan.py:322 and streaming_table_scan.py:386 also do not go through auth_result.convert_plan(). So table.new_stream_read_builder() skips row filters and column masking for both the initial scan and later delta/changelog scans. The streaming scan should preserve and apply query auth before returning each plan.
  3. The auth reader wrappers currently assume the inner reader supports read_arrow_batch() (paimon-python/pypaimon/read/reader/auth_masking_reader.py:38 and :66). For primary-key tables with non-raw-convertible splits, TableRead can create a MergeFileSplitRead, whose create_reader() returns the normal row RecordReader path rather than a RecordBatchReader. Wrapping that in AuthFilterReader/AuthMaskingReader will fail with AttributeError when query auth is enabled. This needs either a row-reader auth path, conversion to a batch-capable reader before wrapping, or routing/rejecting these splits explicitly.

@JingsongLi All 3 issues fixed (+ 1 additional parallel path bypass found during analysis). See updated PR description. PTAL.

Return None instead of a local lambda from table_query_auth() when
auth is disabled, since pickle cannot serialize local lambdas. This
fixes serializable_test and ray_sink_test failures.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
if not self.filter and not self.column_masking:
return plan
auth_splits = [QueryAuthSplit(split, self) for split in plan.splits()]
return Plan(auth_splits)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java TableScan.Plan does not carry a snapshot id, but Python Plan does and the update / row-id update paths use it as check_from_snapshot. Wrapping the plan here drops plan.snapshot_id, so a query-auth table planned from a non-empty snapshot becomes snapshot_id=None; table_update then emits commit messages with -1, which disables the row-id conflict checks (and related global-index update checks). Please preserve the original plan metadata, e.g. Plan(auth_splits, snapshot_id=plan.snapshot_id).


return reader

def _create_split_read_with_read_type(self, split, read_type):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This auth-specific construction bypasses the normal PK read path above. In _create_split_read, PK tables inject missing sequence.field columns into the inner read type and then project them back out, matching the Java withReadType + outer projection behavior. Here, if query auth is enabled and the user projects id,val from a PK table with sequence.field=ts, MergeFileSplitRead is built without ts; that can either fail with sequence.field ... not found or merge by file sequence instead of the configured user sequence. Please reuse the existing _create_split_read widening/project-back logic for effective_read_type, or factor it so the auth path cannot drift from the normal PK path.

elif function == "LIKE":
raw = literals[0]
escaped = re.escape(raw)
pattern = escaped.replace("%", ".*").replace("_", ".")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not match the JVM LIKE semantics. Java treats backslash as the default escape character before expanding % / _, so a policy predicate like LIKE admin\\_% matches admin_foo and not adminXfoo. Escaping the whole string first and then replacing every % / _ makes escaped wildcards behave as wildcards (or requires a literal backslash), so Python can allow/deny different rows from the Java client for the same auth filter. Please port the Java Like.sqlToRegexLike behavior, including invalid escape handling.

Addresses the three review comments from @JingsongLi on commit 482fdad,
plus related correctness issues found during follow-up self-review.

## Fixes for round-2 review comments

1. Preserve Plan.snapshot_id in TableQueryAuthResult.convert_plan
   Java TableScan.Plan does not carry a snapshot id, but Python Plan
   does, and table_update / table_update_by_row_id rely on it as
   check_from_snapshot. Wrapping was dropping snapshot_id, so
   query-auth tables planned from a non-empty snapshot lost
   row-id conflict / global-index update checks.

2. Reuse the normal PK read path instead of a parallel auth-only one
   Removed _create_split_read_with_read_type and made
   _create_split_read accept an optional read_type. The auth path
   now goes through the same widening/project-back logic that
   injects sequence.field columns for PK MergeFileSplitRead, so
   `id,val` projection on a PK table with sequence.field=ts no
   longer fails or merges by file sequence.

3. Port Java Like.sqlToRegexLike escape semantics
   `LIKE admin\_%` now matches `admin_foo` and not `adminXfoo`,
   matching JVM behavior. Backslash is the default escape character;
   invalid escape sequences raise instead of being silently treated
   as wildcards.

## Related correctness fixes found during self-review

4. Streaming path now applies query auth to every plan
   StreamReadBuilder forwards query_auth and read_type to
   AsyncStreamingTableScan, which calls _apply_auth on initial,
   follow-up, and catch-up plans. Catch-up reuses
   _create_initial_plan_raw to avoid double auth.

5. Iterator / parallel / Ray paths route through
   _create_reader_for_split, so QueryAuthSplit detection is
   centralized. to_iterator gains a RecordBatchReader branch for
   PyTorch / generic iterator consumers of auth-wrapped splits.
   scan_with_stats also applies auth for parity with plan().

6. Row kind preserved through the auth pipeline on PK tables
   RecordReaderToBatchAdapter encodes row_kind into a `_row_kind`
   column when include_row_kind=True; ColumnProjectReader keeps
   `_row_kind` even when the user projection drops it; to_iterator
   restores it via OffsetRow.set_row_kind_byte without leaking
   the column into row_tuple.

7. RecordReaderToBatchAdapter no longer drops rows when an inner
   read_batch yields more than chunk_size rows: extra rows are
   carried over to the next flush.

8. QueryAuthSplit attribute delegation
   __getattr__ forwards file_size, file_paths, data_deletion_files,
   raw_convertible, etc. to the inner split for Ray/explain/Daft
   paths, while guarding _-prefixed names to avoid pickle recursion.

9. Daft explain_scan now reports the same reader_mode as the real
   read path. ExplainSplitInfo carries has_auth, _build_explain_result
   populates it from QueryAuthSplit, and Daft's _split_has_auth
   accepts both QueryAuthSplit and the explain descriptor so
   "query auth active" appears as a fallback reason consistently.

10. table_query_auth returns None (not a local lambda) when auth is
    disabled, so FileStoreTable remains pickle-safe for Ray.

## Tests

- New unit tests cover snapshot_id preservation, row_kind through
  the adapter / project reader, chunk-size carry-over, and
  QueryAuthSplit attribute delegation incl. pickle round-trip.
- All query-auth related tests (45) pass under pytest:
  `pytest pypaimon/tests/ -k "explain or query_auth or table_query_auth"`

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@MgjLLL

MgjLLL commented Jun 11, 2026

Copy link
Copy Markdown
Author

@JingsongLi All 3 issues fixed (+ several related correctness issues found during follow-up self-review). New commit 7d037c6a8 squashes the round-2 fixes into one increment on top of 482fdadd7. PTAL.

Fixes for the three review comments

1. catalog/table_query_auth.py — wrapping drops Plan.snapshot_id

convert_plan now returns Plan(auth_splits, snapshot_id=plan.snapshot_id).

Without this, query-auth tables planned from a non-empty snapshot lost row-id conflict / global-index update checks (check_from_snapshot=-1). New test TestConvertPlanPreservesSnapshotId covers both non-null and None cases.

2. read/table_read.py_create_split_read_with_read_type bypasses PK widening

Removed the parallel method. _create_split_read now accepts an optional read_type, so the auth path goes through the same sequence.field injection + outer projection used by the normal PK read path.

id,val projection over a PK table with sequence.field=ts no longer fails or merges by file sequence.

3. common/predicate_json_parser.pyLIKE escape mismatches JVM

Ported Like.sqlToRegexLike: backslash is the default escape character, \% / \_ are literal, invalid escape sequences raise.

LIKE admin\_% now matches admin_foo (not adminXfoo), so policy predicates evaluate identically to the JVM client.

Related correctness fixes (self-found, all on the same auth surface)

  • Streaming path previously stored _query_auth but never applied it. StreamReadBuilder now forwards query_auth and read_type to AsyncStreamingTableScan, which calls _apply_auth on initial / follow-up / catch-up plans. Catch-up uses _create_initial_plan_raw to avoid double auth.
  • Iterator / parallel / Ray paths consolidated through _create_reader_for_split. to_iterator gains a RecordBatchReader branch for PyTorch / generic iterator consumers. scan_with_stats applies auth for parity with plan().
  • Row kind preserved on PK + auth: RecordReaderToBatchAdapter encodes row_kind into a _row_kind column when include_row_kind=True; ColumnProjectReader keeps _row_kind even when user projection drops it; to_iterator restores it via OffsetRow.set_row_kind_byte without leaking the column into row_tuple.
  • Adapter chunking: when an inner read_batch yields more rows than chunk_size, the surplus rows are carried over to the next flush instead of being dropped.
  • QueryAuthSplit attribute delegation: __getattr__ forwards file_size, file_paths, data_deletion_files, raw_convertible, etc. to the inner split (Ray / explain / Daft paths), while guarding _-prefixed names to avoid pickle recursion.
  • Daft explain_scan consistency: ExplainSplitInfo now carries has_auth, populated from QueryAuthSplit in _build_explain_result. Daft's _split_has_auth accepts both real splits and the explain descriptor, so reported reader_mode and fallback_reason="query auth active" match the actual read path.
  • Pickle safety when auth is disabled: table_query_auth returns None instead of a local lambda, keeping FileStoreTable Ray-pickle-safe.

Tests

  • New tests for snapshot_id preservation, row_kind through adapter / project reader, chunk-size carry-over, and QueryAuthSplit attribute delegation incl. pickle round-trip.
  • 45 query-auth related tests pass:
pytest pypaimon/tests/ -k "explain or query_auth or table_query_auth"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature] [python] Support query auth (row filter & column masking) for REST catalog

2 participants