[Data] Add Kinetica datasource and datasink#63070

Open
kylesutton wants to merge 12 commits into ray-project:master from kineticadb:master

Conversation

@kylesutton

Description

Adds a new Ray Data connector for Kinetica (https://www.kinetica.com/), a distributed in-memory analytical database. The connector follows the standard sibling-connector layout (ClickHouse, BigQuery, Iceberg, etc.) under python/ray/data/_internal/datasource/.

Two ingestion paths are exposed, both gated on the optional gpudb Python SDK, so the base Ray install is unaffected:

  • ray.data.read_kinetica() / Dataset.write_kinetica()

    • Uses native GPUdb API + multihead ingest
    • For whole-table reads, bulk loads, parallel writes to a sharded cluster
  • ray.data.read_kinetica_sql() / Dataset.write_kinetica_sql()

    • Uses Kinetica DB-API 2.0, layered on Ray's read_sql / write_sql
    • For arbitrary SQL with joins, aggregations, parameterized inserts

Functions are marked alpha.

Related issues

No related issues.

Additional information

Included

  • kinetica_datasource.py: KineticaDatasource with offset-based parallel pagination, column projection, filter pushdown, and sort_by / limit support.
  • kinetica_datasink.py: KineticaDatasink with create / append / overwrite modes, configurable batch size, optional Kinetica distributed ingestion, and a KineticaTableSettings dataclass for table-level Kinetica options.
  • kinetica_sql_connection.py: connection-factory helper that produces a DB-API 2.0 connection (username/password or OAuth token) for reuse with the existing read_sql / write_sql plumbing.
  • kinetica_sql_utils.py: SQL helpers shared by the SQL path.
  • kinetica_type_utils.py: Kinetica ↔ Arrow type conversion.
  • Public surface wired up in python/ray/data/__init__.py, python/ray/data/read_api.py, python/ray/data/dataset.py.
  • python/ray/data/tests/datasource/test_kinetica.py: unit-test suite covering filter safety, type conversion, datasource read planning, datasink batching, and SQL-path connection construction. The gpudb SDK is mocked, so the tests run in the standard CI image.
  • New test_kinetica target in python/ray/data/BUILD.bazel.

Review Notes

  • Optional dependency gpudb is imported lazily inside methods, so importing ray.data on a stock install does not require the SDK.
  • A filter-injection guard rejects filter fragments containing ;, {, or }.
  • Parallel reads partition with offset/limit pagination sized by batch_size. Parallel writes use Kinetica's distributed ingest by default, so each Ray write task targets the appropriate shard directly rather than funneling through the head node.
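
The filter guard and the offset/limit pagination described in these notes can be sketched roughly as follows. This is a minimal illustration, not the code in the PR: the names `validate_filter` and `plan_pages` are hypothetical, and only the rejected characters (`;`, `{`, `}`) and the idea of pages sized by `batch_size` come from the description above.

```python
from typing import List, Optional, Tuple

# Characters the PR description says the filter guard rejects.
_FORBIDDEN_FILTER_CHARS = (";", "{", "}")


def validate_filter(expression: Optional[str]) -> Optional[str]:
    """Return the expression unchanged, or raise if it contains a rejected character.

    Hypothetical helper: the real guard in the PR may apply further checks.
    """
    if expression is None:
        return None
    for ch in _FORBIDDEN_FILTER_CHARS:
        if ch in expression:
            raise ValueError(f"Unsafe filter expression: contains {ch!r}")
    return expression


def plan_pages(total_records: int, batch_size: int) -> List[Tuple[int, int]]:
    """Split [0, total_records) into (offset, limit) pages of at most batch_size rows."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return [
        (offset, min(batch_size, total_records - offset))
        for offset in range(0, total_records, batch_size)
    ]
```

Each `(offset, limit)` pair would map to one read task, so a 10-row table with `batch_size=4` yields three pages, the last one holding the 2 leftover rows.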

Test Plan

  • bazel test //python/ray/data:test_kinetica passes locally and in CI.
  • ./ci/lint/lint.sh pre_commit clean.
  • Manual smoke test against a live Kinetica cluster: read_kinetica round-trips a table, write_kinetica in overwrite mode replaces it, read_kinetica_sql runs a JOIN.
  • Confirm import ray.data still works without gpudb installed (lazy import).
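
A lazy optional import of the kind this check exercises might look like the sketch below. The helper `_import_optional` and the class `LazySdkClient` are hypothetical names used only for illustration; the PR states that `gpudb` is imported inside methods but does not show this exact shape.

```python
import importlib
from types import ModuleType


def _import_optional(module_name: str, install_hint: str) -> ModuleType:
    """Import a module on demand and raise a helpful error if it is missing."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"'{module_name}' is required for this feature. Try: {install_hint}"
        ) from exc


class LazySdkClient:
    """Hypothetical client that defers the SDK import until first use."""

    def __init__(self, module_name: str = "gpudb"):
        # Nothing is imported here, so constructing the object (and importing
        # the module that defines it) works without the optional SDK.
        self._module_name = module_name

    def connect(self) -> ModuleType:
        # The import happens only when the SDK is actually needed.
        return _import_optional(self._module_name, f"pip install {self._module_name}")
```

With this layout, a missing SDK surfaces as an `ImportError` at call time rather than at package import time.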

@kylesutton requested a review from a team as a code owner on May 1, 2026 14:24

@gemini-code-assist (bot) left a comment


Code Review

This pull request introduces a comprehensive integration for Kinetica within Ray Data, enabling both reading and writing capabilities. It includes a native KineticaDatasource for parallel reads using pagination and a KineticaDatasink that leverages Kinetica's multihead ingestion for high-performance distributed writes. Additionally, it provides SQL-based integration via DB-API 2.0. Feedback focuses on improving the robustness of write mode validation, making upsert behavior configurable, and optimizing memory usage during data conversion.

    "append": KineticaSinkMode.APPEND,
    "overwrite": KineticaSinkMode.OVERWRITE,
}
mode = mode_map.get(mode.lower(), KineticaSinkMode.APPEND)

medium

The current implementation silently defaults to APPEND mode if an unrecognized string is provided for the mode parameter. This can lead to unexpected behavior, such as appending data when the user intended to overwrite but made a typo. It is safer to raise a ValueError for invalid modes.

Suggested change
- mode = mode_map.get(mode.lower(), KineticaSinkMode.APPEND)
+ if mode.lower() not in mode_map:
+     raise ValueError(
+         f"Invalid write mode: '{mode}'. "
+         f"Expected one of: {list(mode_map.keys())}"
+     )
+ mode = mode_map[mode.lower()]
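
The same strict validation can also be expressed by letting an enum do the lookup. This is a self-contained sketch under assumptions: `SinkMode` and `parse_mode` are hypothetical stand-ins for the PR's `KineticaSinkMode` and its mode handling, and the three values mirror the create / append / overwrite modes listed in the description.

```python
from enum import Enum


class SinkMode(Enum):
    """Hypothetical stand-in for the PR's KineticaSinkMode."""

    CREATE = "create"
    APPEND = "append"
    OVERWRITE = "overwrite"


def parse_mode(mode: str) -> SinkMode:
    """Map a user-supplied string to a SinkMode, rejecting unknown values."""
    try:
        return SinkMode(mode.lower())
    except ValueError:
        valid = [m.value for m in SinkMode]
        raise ValueError(
            f"Invalid write mode: {mode!r}. Expected one of: {valid}"
        ) from None
```

A typo such as `"overwrit"` then fails loudly instead of silently appending.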

Comment on lines +448 to +451
logger.warning(
    f"Could not create GPUdbTable, falling back to direct insert: {e}"
)


medium

Falling back to a direct (single-head) insert when GPUdbTable initialization fails can cause significant performance degradation, especially if use_multihead was explicitly enabled for high-throughput ingestion. It is better to let the exception propagate or at least check if use_multihead is required before falling back to ensure the user is aware of the performance impact.
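
One way to make the fallback explicit is to fail by default when multihead ingest was requested and only degrade when the caller opts in. The sketch below is illustrative only: `build_writer`, `allow_fallback`, and the two factory callables are hypothetical and do not correspond to functions in the PR or the gpudb SDK.

```python
import logging
from typing import Callable, TypeVar

T = TypeVar("T")
logger = logging.getLogger(__name__)


def build_writer(
    make_multihead: Callable[[], T],
    make_direct: Callable[[], T],
    use_multihead: bool,
    allow_fallback: bool = False,
) -> T:
    """Create a writer, falling back to direct insert only when explicitly allowed."""
    if not use_multihead:
        return make_direct()
    try:
        return make_multihead()
    except Exception as exc:
        if not allow_fallback:
            # Surface the failure so the user is not silently moved to the slow path.
            raise RuntimeError(
                "Multihead ingest was requested but could not be initialized"
            ) from exc
        logger.warning(
            "Multihead ingest unavailable, falling back to direct insert: %s", exc
        )
        return make_direct()
```

The default of `allow_fallback=False` is a design choice for this sketch; a connector could equally default to falling back and log at a higher level.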

Comment on lines +513 to +516
options={
    "update_on_existing_pk": "true",
    "return_individual_errors": "true",
},

medium

The update_on_existing_pk option is hardcoded to "true", which effectively turns all inserts into upserts if the table has a primary key. This behavior might not be desired in all cases (e.g., if the user wants the write to fail on duplicate keys) and should ideally be configurable via the options parameter or a setting in KineticaTableSettings.
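
A configurable version could derive the insert options from a settings object and let caller-supplied options override them. This is a sketch under assumptions: `InsertSettings` and `build_insert_options` are hypothetical names, and the default of `update_on_existing_pk=False` is chosen here for illustration, not taken from the PR. Only the two option keys and their string-valued form come from the quoted code.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class InsertSettings:
    """Hypothetical settings object; the PR's KineticaTableSettings may differ."""

    update_on_existing_pk: bool = False
    return_individual_errors: bool = True


def _flag(value: bool) -> str:
    # The quoted code passes option values as the strings "true" / "false".
    return "true" if value else "false"


def build_insert_options(
    settings: InsertSettings, overrides: Optional[Dict[str, str]] = None
) -> Dict[str, str]:
    """Build the insert options dict, letting explicit overrides win."""
    options = {
        "update_on_existing_pk": _flag(settings.update_on_existing_pk),
        "return_individual_errors": _flag(settings.return_individual_errors),
    }
    if overrides:
        options.update(overrides)
    return options
```

Callers who want upsert semantics would then opt in with `InsertSettings(update_on_existing_pk=True)`.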

field_type = field_def.get("type")

# Skip columns not in the requested list
if self._columns and field_name not in self._columns:

medium

Using if self._columns will evaluate to False if columns is an empty list [], causing the datasource to read all columns instead of none. While reading zero columns is rare, it's more idiomatic to check self._columns is not None to distinguish between "all columns" and a specific selection.

Suggested change
- if self._columns and field_name not in self._columns:
+ if self._columns is not None and field_name not in self._columns:
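
The difference between the two checks shows up only for an empty selection. The following standalone sketch uses a hypothetical `select_fields` helper to illustrate it: `None` means "all columns", while `[]` means "no columns".

```python
from typing import List, Optional, Sequence


def select_fields(
    available: Sequence[str], columns: Optional[Sequence[str]]
) -> List[str]:
    """Return the fields to read, preserving the table's column order."""
    if columns is None:
        # No projection requested: read everything.
        return list(available)
    requested = set(columns)
    # An empty selection yields an empty result instead of all columns.
    return [name for name in available if name in requested]
```

With a truthiness check (`if columns:`), the empty-list case would fall through to the "all columns" branch.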

Comment on lines +402 to +403
table = pa.Table.from_batches([batch])
pydict = table.to_pydict()

medium

Converting the RecordBatch to a Table just to call to_pydict() is unnecessary as RecordBatch already supports to_pydict(). Furthermore, converting the entire batch to a Python dictionary and then iterating row-by-row is memory-intensive. For large datasets, this can lead to OOM issues or significant performance bottlenecks.

Suggested change
- table = pa.Table.from_batches([batch])
- pydict = table.to_pydict()
+ pydict = batch.to_pydict()

@ray-gardener (bot) added the data and community-contribution labels on May 1, 2026
am-kinetica added a commit to kineticadb/ray that referenced this pull request May 6, 2026
am-kinetica added a commit to kineticadb/ray that referenced this pull request May 12, 2026
am-kinetica added a commit to kineticadb/ray that referenced this pull request May 14, 2026
Signed-off-by: anindyam1969 <amukherjee@kinetica.com>
This PR adds native Kinetica database integration for Ray Data, enabling
efficient parallel reads and writes between Ray datasets and Kinetica tables.

Features:
- KineticaDatasource: Parallel reads with automatic type mapping
- KineticaDatasink: Batch writes with configurable options
- KineticaConnectionFactory: DB-API 2.0 compliant connection factory
- Comprehensive type conversion utilities (PyArrow <-> Kinetica)
- Support for complex types: arrays, vectors, JSON, geospatial

The implementation follows Ray Data patterns established by other datasources
(MongoDB, SQL, BigQuery) and includes comprehensive unit tests.

Signed-off-by: anindyam1969 <amukherjee@kinetica.com>
am-kinetica and others added 2 commits May 18, 2026 10:40
- Remove redundant value is not None checks in convert_arrow_batch_to_records
  (the None case is already handled earlier in the function)
- Recompute records_per_task after parallelism cap in get_read_tasks to ensure
  even distribution of work across tasks
- Pass PyArrow schema instead of Ray Data Schema to KineticaDatasink in
  write_kinetica to fix type mismatch

Signed-off-by: anindyam1969 <amukherjee@kinetica.com>
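
The second fix in this commit message, recomputing `records_per_task` after the parallelism cap, can be illustrated with the sketch below. It is an assumption-laden illustration rather than the PR's `get_read_tasks`: the function name `plan_read_tasks` and the `max_parallelism` parameter are hypothetical.

```python
import math
from typing import List, Tuple


def plan_read_tasks(
    total_records: int, requested_parallelism: int, max_parallelism: int
) -> List[Tuple[int, int]]:
    """Return (offset, limit) pairs that cover every record exactly once."""
    if total_records <= 0:
        return []
    # Cap first, so the per-task size below reflects the real number of tasks.
    parallelism = max(1, min(requested_parallelism, max_parallelism, total_records))
    # Compute the per-task size only after the cap has been applied.
    records_per_task = math.ceil(total_records / parallelism)
    return [
        (offset, min(records_per_task, total_records - offset))
        for offset in range(0, total_records, records_per_task)
    ]
```

If the per-task size were computed from the uncapped parallelism and the task count capped afterwards, the tasks would be sized for more workers than actually run, which leaves the work unevenly distributed or partly uncovered.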
@cursor (bot) left a comment


Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.

Reviewed by Cursor Bugbot for commit 2f5b97a.

if hasattr(col, "precision") and col.precision is not None:
    col_dict["precision"] = col.precision
if hasattr(col, "scale") and col.scale is not None:
    col_dict["scale"] = col.scale

Datasink serialization silently drops valid scale of zero

Low Severity

While precision/scale with value 0 are correctly preserved (since 0 is not None is True), the _columns_to_dicts method on line 179 checks col.scale is not None which correctly handles scale=0. However, the companion check hasattr(col, "scale") might be unreliable depending on GPUdb SDK versions where scale might be defined as a property that returns None vs not being an attribute at all. The test test_decimal_scale_zero_preserved passes because the mock sets scale=0 directly. More importantly though, this is actually correct code as confirmed by the PR discussion ("Fixed"), so this is a false alarm.
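
The behaviour under discussion, keeping a scale of 0 while skipping absent values, can be checked with a small standalone sketch. `column_to_dict` is a hypothetical helper, not the PR's `_columns_to_dicts`; it uses `getattr` with a default so that a missing attribute and an attribute set to `None` are treated the same way.

```python
from types import SimpleNamespace
from typing import Any, Dict


def column_to_dict(col: Any) -> Dict[str, Any]:
    """Serialize a column object, keeping 0 but dropping missing or None values."""
    col_dict: Dict[str, Any] = {"name": col.name}
    for attr in ("precision", "scale"):
        value = getattr(col, attr, None)
        # `is not None` keeps a legitimate 0; a truthiness test would drop it.
        if value is not None:
            col_dict[attr] = value
    return col_dict


# A decimal column with scale 0 keeps its scale in the serialized form.
decimal_col = SimpleNamespace(name="amount", precision=10, scale=0)
serialized = column_to_dict(decimal_col)
```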


False alarm

Labels

community-contribution (Contributed by the community), data (Ray Data-related issues)


2 participants