[Data] Add Kinetica datasource and datasink #63070
Code Review
This pull request introduces a comprehensive integration for Kinetica within Ray Data, enabling both reading and writing capabilities. It includes a native KineticaDatasource for parallel reads using pagination and a KineticaDatasink that leverages Kinetica's multihead ingestion for high-performance distributed writes. Additionally, it provides SQL-based integration via DB-API 2.0. Feedback focuses on improving the robustness of write mode validation, making upsert behavior configurable, and optimizing memory usage during data conversion.
        "append": KineticaSinkMode.APPEND,
        "overwrite": KineticaSinkMode.OVERWRITE,
    }
    mode = mode_map.get(mode.lower(), KineticaSinkMode.APPEND)
The current implementation silently defaults to APPEND mode if an unrecognized string is provided for the mode parameter. This can lead to unexpected behavior, such as appending data when the user intended to overwrite but made a typo. It is safer to raise a ValueError for invalid modes.
    - mode = mode_map.get(mode.lower(), KineticaSinkMode.APPEND)
    + if mode.lower() not in mode_map:
    +     raise ValueError(
    +         f"Invalid write mode: '{mode}'. "
    +         f"Expected one of: {list(mode_map.keys())}"
    +     )
    + mode = mode_map[mode.lower()]
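The stricter validation suggested above could look roughly like the following standalone sketch. The `KineticaSinkMode` enum here is a stand-in defined only for this example (its members follow the create / append / overwrite modes named in the PR description), and `parse_sink_mode` is a hypothetical helper name, not code from the PR.

```python
from enum import Enum


class KineticaSinkMode(Enum):
    # Stand-in for the PR's enum; the member values are assumptions.
    CREATE = "create"
    APPEND = "append"
    OVERWRITE = "overwrite"


def parse_sink_mode(mode):
    """Return a KineticaSinkMode, raising on unrecognized strings."""
    if isinstance(mode, KineticaSinkMode):
        return mode
    mode_map = {m.value: m for m in KineticaSinkMode}
    key = str(mode).lower()
    if key not in mode_map:
        # Fail loudly instead of silently defaulting to APPEND.
        raise ValueError(
            f"Invalid write mode: '{mode}'. Expected one of: {sorted(mode_map)}"
        )
    return mode_map[key]
```

With this shape, a typo such as `"overwrit"` raises a `ValueError` at call time rather than appending to a table the user meant to replace.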
    logger.warning(
        f"Could not create GPUdbTable, falling back to direct insert: {e}"
    )
Falling back to a direct (single-head) insert when GPUdbTable initialization fails can cause significant performance degradation, especially if use_multihead was explicitly enabled for high-throughput ingestion. It is better to let the exception propagate or at least check if use_multihead is required before falling back to ensure the user is aware of the performance impact.
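One way to make the fallback explicit is sketched below. Everything here is hypothetical: `open_writer`, the two factory callables, and the `allow_fallback` flag are illustration names, not part of the PR or the gpudb SDK. The idea is only that a failed multihead setup raises unless the caller has opted in to the slower path.

```python
import logging

logger = logging.getLogger(__name__)


def open_writer(make_multihead_writer, make_direct_writer,
                use_multihead, allow_fallback=False):
    """Pick a writer; fall back to direct insert only when explicitly allowed."""
    if not use_multihead:
        return make_direct_writer()
    try:
        return make_multihead_writer()
    except Exception as e:
        if not allow_fallback:
            # The caller asked for multihead ingestion, so surface the failure.
            raise RuntimeError(
                "Multihead ingestion was requested but could not be initialized"
            ) from e
        logger.warning(
            "Multihead init failed, falling back to direct insert: %s", e
        )
        return make_direct_writer()
```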
    options={
        "update_on_existing_pk": "true",
        "return_individual_errors": "true",
    },
The update_on_existing_pk option is hardcoded to "true", which effectively turns all inserts into upserts if the table has a primary key. This behavior might not be desired in all cases (e.g., if the user wants the write to fail on duplicate keys) and should ideally be configurable via the options parameter or a setting in KineticaTableSettings.
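A minimal sketch of making the upsert behavior configurable, assuming the options are plain string-valued dictionaries as in the snippet above. `build_insert_options` and its parameters are hypothetical names; the choice of `False` as the default is also an assumption and would need to be decided in the PR.

```python
# Defaults taken from the reviewed snippet, minus the hardcoded upsert flag.
DEFAULT_INSERT_OPTIONS = {"return_individual_errors": "true"}


def build_insert_options(update_on_existing_pk=False, extra_options=None):
    """Merge defaults, the upsert flag, and caller-supplied overrides."""
    options = dict(DEFAULT_INSERT_OPTIONS)
    # Assumption: the option takes the strings "true" / "false".
    options["update_on_existing_pk"] = "true" if update_on_existing_pk else "false"
    if extra_options:
        # Caller-supplied options win over everything set above.
        options.update(extra_options)
    return options
```

Letting `extra_options` override last keeps an escape hatch for users who already pass raw options through the datasink.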
    field_type = field_def.get("type")

    # Skip columns not in the requested list
    if self._columns and field_name not in self._columns:
The check if self._columns evaluates to False when columns is an empty list [], causing the datasource to read all columns instead of none. While reading zero columns is rare, it is more idiomatic to check self._columns is not None to distinguish between "all columns" and a specific selection.
    - if self._columns and field_name not in self._columns:
    + if self._columns is not None and field_name not in self._columns:
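The difference between the two checks is easy to see in isolation. The sketch below uses a hypothetical `select_fields` helper (not from the PR) where `None` means "all columns" and an empty list means "no columns".

```python
def select_fields(all_fields, columns=None):
    """Return the fields to read; None selects everything, [] selects nothing."""
    if columns is None:
        return list(all_fields)
    wanted = set(columns)
    # Preserve the table's field order rather than the caller's order.
    return [name for name in all_fields if name in wanted]


print(select_fields(["id", "name", "ts"]))            # all three fields
print(select_fields(["id", "name", "ts"], []))        # empty selection
print(select_fields(["id", "name", "ts"], ["name"]))  # just "name"
```

A truthiness check (`if columns:`) would treat the second call like the first and return every field.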
    table = pa.Table.from_batches([batch])
    pydict = table.to_pydict()
Converting the RecordBatch to a Table just to call to_pydict() is unnecessary as RecordBatch already supports to_pydict(). Furthermore, converting the entire batch to a Python dictionary and then iterating row-by-row is memory-intensive. For large datasets, this can lead to OOM issues or significant performance bottlenecks.
    - table = pa.Table.from_batches([batch])
    - pydict = table.to_pydict()
    + pydict = batch.to_pydict()
Signed-off-by: anindyam1969 <amukherjee@kinetica.com>
This PR adds native Kinetica database integration for Ray Data, enabling efficient parallel reads and writes between Ray datasets and Kinetica tables.

Features:
- KineticaDatasource: Parallel reads with automatic type mapping
- KineticaDatasink: Batch writes with configurable options
- KineticaConnectionFactory: DB-API 2.0 compliant connection factory
- Comprehensive type conversion utilities (PyArrow <-> Kinetica)
- Support for complex types: arrays, vectors, JSON, geospatial

The implementation follows Ray Data patterns established by other datasources (MongoDB, SQL, BigQuery) and includes comprehensive unit tests.

Signed-off-by: anindyam1969 <amukherjee@kinetica.com>
- Remove redundant value is not None checks in convert_arrow_batch_to_records (the None case is already handled earlier in the function)
- Recompute records_per_task after parallelism cap in get_read_tasks to ensure even distribution of work across tasks
- Pass PyArrow schema instead of Ray Data Schema to KineticaDatasink in write_kinetica to fix type mismatch

Signed-off-by: anindyam1969 <amukherjee@kinetica.com>
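The records_per_task fix in this commit can be illustrated with a small planning sketch. This is not the PR's `get_read_tasks`: `plan_offset_ranges` is a hypothetical helper, and capping parallelism at the record count is an assumed form of the cap. The point is that the per-task size is derived from the capped task count, so the offset ranges cover the table evenly.

```python
def plan_offset_ranges(total_records, parallelism):
    """Split [0, total_records) into (offset, limit) pairs, one per read task."""
    if total_records <= 0:
        return []
    # Assumed cap: never plan more tasks than there are records.
    num_tasks = max(1, min(parallelism, total_records))
    # Computed AFTER the cap, so sizes match the real task count.
    base, extra = divmod(total_records, num_tasks)
    ranges = []
    offset = 0
    for i in range(num_tasks):
        limit = base + (1 if i < extra else 0)
        ranges.append((offset, limit))
        offset += limit
    return ranges


print(plan_offset_ranges(10, 4))
print(plan_offset_ranges(3, 8))
```

Computing the size before the cap would instead leave some tasks with zero rows or skew the work toward the first few tasks.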
Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 2f5b97a.
    if hasattr(col, "precision") and col.precision is not None:
        col_dict["precision"] = col.precision
    if hasattr(col, "scale") and col.scale is not None:
        col_dict["scale"] = col.scale
Datasink serialization silently drops valid scale of zero
Low Severity
A precision or scale of 0 is preserved here, because 0 is not None is True: the _columns_to_dicts method on line 179 checks col.scale is not None, which handles scale=0 correctly. The companion check hasattr(col, "scale") might be unreliable depending on the GPUdb SDK version, where scale might be defined as a property that returns None rather than being absent as an attribute. The test test_decimal_scale_zero_preserved passes because the mock sets scale=0 directly. This is correct code, as confirmed by the PR discussion ("Fixed"), so this is a false alarm.
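For reference, the same behavior can be written so that a missing attribute and an attribute set to None take one code path. The sketch below uses a hypothetical `column_to_dict` helper and `SimpleNamespace` objects as stand-ins for SDK column objects; it is not the PR's `_columns_to_dicts`.

```python
from types import SimpleNamespace


def column_to_dict(col):
    """Serialize a column, keeping precision/scale whenever they are not None."""
    col_dict = {"name": col.name}
    for attr in ("precision", "scale"):
        # getattr with a default covers both "no attribute" and "attribute is None".
        value = getattr(col, attr, None)
        if value is not None:
            col_dict[attr] = value  # 0 is kept: only None is skipped
    return col_dict


decimal_col = SimpleNamespace(name="amount", precision=10, scale=0)
plain_col = SimpleNamespace(name="label")
```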


Description
Adds a new Ray Data connector for Kinetica (https://www.kinetica.com/), a distributed in-memory analytical database, following the standard sibling-connector layout (ClickHouse, BigQuery, Iceberg, etc.) under python/ray/data/_internal/datasource/.

Two ingestion paths are exposed, both gated on the optional gpudb Python SDK, so the base Ray install is unaffected:
- ray.data.read_kinetica() / Dataset.write_kinetica()
- ray.data.read_kinetica_sql() / Dataset.write_kinetica_sql(), which reuse the existing read_sql / write_sql plumbing

Functions are marked alpha.
Related issues
No related issues.
Additional information
Included
- kinetica_datasource.py: KineticaDatasource with offset-based parallel pagination, column projection, filter pushdown, sort_by / limit support.
- kinetica_datasink.py: KineticaDatasink with create / append / overwrite modes, configurable batch size, optional Kinetica distributed ingestion, and a KineticaTableSettings dataclass for table-level Kinetica options.
- kinetica_sql_connection.py: connection-factory helper that produces a DB-API 2.0 connection (username/password or OAuth token) for reuse with the existing read_sql / write_sql plumbing.
- kinetica_sql_utils.py: SQL helpers shared by the SQL path.
- kinetica_type_utils.py: Kinetica ↔ Arrow type conversion.
- python/ray/data/__init__.py, python/ray/data/read_api.py, python/ray/data/dataset.py.
- python/ray/data/tests/datasource/test_kinetica.py: unit-test suite covering filter-safety, type conversion, datasource read planning, datasink batching, and SQL-path connection construction. The gpudb SDK is mocked, so the test runs in the standard CI image.
- test_kinetica target in python/ray/data/BUILD.bazel.

Review Notes
- gpudb is imported lazily inside methods, so importing ray.data on a stock install does not require the SDK.
- [...] ;, {, or }.
- [...] batch_size. Parallel writes use Kinetica's distributed ingest by default, so each Ray write task targets the appropriate shard directly rather than funneling through the head node.

Test Plan
- bazel test //python/ray/data:test_kinetica passes locally and in CI.
- ./ci/lint/lint.sh pre_commit clean.
- read_kinetica round-trips a table, write_kinetica in overwrite mode replaces it, read_kinetica_sql runs a JOIN.
- ray.data still works without gpudb installed (lazy import).
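The lazy-import behavior checked in the last item can be sketched as follows. This is a generic pattern rather than the PR's code: `lazy_import` and `LazyKineticaReader` are hypothetical names, and the error message wording is an assumption.

```python
import importlib


def lazy_import(module_name, feature):
    """Import an optional dependency at call time with a clear error message."""
    try:
        return importlib.import_module(module_name)
    except ImportError as e:
        raise ImportError(
            f"{feature} requires the optional '{module_name}' package, "
            "which is not installed."
        ) from e


class LazyKineticaReader:
    """Illustrative reader: constructing it never touches the SDK."""

    def __init__(self, table_name):
        self.table_name = table_name  # no SDK import here

    def sdk(self):
        # The import happens only when a method actually needs the SDK.
        return lazy_import("gpudb", "The Kinetica connector")
```

Because the import lives inside `sdk()`, importing the enclosing module and constructing the reader both succeed on an install without the SDK; only a real read or write would raise.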