From f8d408d4618a365737cf7516075212933b3202c3 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Thu, 14 May 2026 18:12:52 +0530 Subject: [PATCH 1/9] [Data] Add Kinetica database source and sink for Ray Data This PR adds native Kinetica database integration for Ray Data, enabling efficient parallel reads and writes between Ray datasets and Kinetica tables. Features: - KineticaDatasource: Parallel reads with automatic type mapping - KineticaDatasink: Batch writes with configurable options - KineticaConnectionFactory: DB-API 2.0 compliant connection factory - Comprehensive type conversion utilities (PyArrow <-> Kinetica) - Support for complex types: arrays, vectors, JSON, geospatial The implementation follows Ray Data patterns established by other datasources (MongoDB, SQL, BigQuery) and includes comprehensive unit tests. Signed-off-by: anindyam1969 --- python/ray/data/__init__.py | 4 + .../_internal/datasource/kinetica_datasink.py | 698 ++++++ .../datasource/kinetica_datasource.py | 666 ++++++ .../datasource/kinetica_sql_connection.py | 121 ++ .../datasource/kinetica_type_utils.py | 683 ++++++ python/ray/data/dataset.py | 153 ++ .../data/tests/datasource/test_kinetica.py | 1930 +++++++++++++++++ 7 files changed, 4255 insertions(+) create mode 100644 python/ray/data/_internal/datasource/kinetica_datasink.py create mode 100644 python/ray/data/_internal/datasource/kinetica_datasource.py create mode 100644 python/ray/data/_internal/datasource/kinetica_sql_connection.py create mode 100644 python/ray/data/_internal/datasource/kinetica_type_utils.py create mode 100644 python/ray/data/tests/datasource/test_kinetica.py diff --git a/python/ray/data/__init__.py b/python/ray/data/__init__.py index f58c59873284..de849386e822 100644 --- a/python/ray/data/__init__.py +++ b/python/ray/data/__init__.py @@ -70,6 +70,8 @@ read_iceberg, read_images, read_json, + read_kinetica, + read_kinetica_sql, read_lance, read_mcap, read_mongo, @@ -180,6 +182,8 @@ "read_iceberg", "read_images", "read_json", + "read_kinetica", + "read_kinetica_sql", "read_lance", "read_mcap", "read_numpy", diff --git a/python/ray/data/_internal/datasource/kinetica_datasink.py b/python/ray/data/_internal/datasource/kinetica_datasink.py new file mode 100644 index 000000000000..f83977fe2892 --- /dev/null +++ b/python/ray/data/_internal/datasource/kinetica_datasink.py @@ -0,0 +1,698 @@ +""" +Kinetica Datasink for Ray Data. + +This module provides a Ray Data Datasink implementation for writing data +to Kinetica databases. +""" + +import logging +from dataclasses import dataclass, field +from enum import IntEnum +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Union + +from ray.data._internal.execution.interfaces import TaskContext +from ray.data._internal.util import _check_import +from ray.data.block import Block, BlockAccessor +from ray.data.datasource.datasink import Datasink + +if TYPE_CHECKING: + import pyarrow as pa + + +logger = logging.getLogger(__name__) + + +class KineticaSinkMode(IntEnum): + """Defines how the datasink should handle existing tables.""" + + CREATE = 1 + """Create a new table. Fails if the table already exists.""" + + APPEND = 2 + """Append to an existing table, or create if it doesn't exist.""" + + OVERWRITE = 3 + """Drop and recreate the table if it exists.""" + + +@dataclass +class KineticaTableSettings: + """Settings for Kinetica table creation.""" + + is_replicated: bool = False + """If True, creates a replicated table (data copied to all nodes).""" + + chunk_size: int = 8000000 + """Number of records per chunk for data storage.""" + + ttl: int = -1 + """Time-to-live in minutes. -1 means no expiration.""" + + primary_keys: List[str] = field(default_factory=list) + """List of column names to use as primary key.""" + + shard_keys: List[str] = field(default_factory=list) + """List of column names to use as shard key.""" + + persist: bool = True + """If True, the table will be persisted to disk.""" + + collection_name: Optional[str] = None + """Optional schema/collection name for the table.""" + + update_on_existing_pk: bool = True + """If True, updates existing records with matching primary keys. + If False, insert fails on PK conflict.""" + + +class KineticaDatasink(Datasink): + """ + A Ray Data Datasink for writing to Kinetica databases. + + This datasink supports distributed writes using Kinetica's multihead + ingestion capabilities for optimal performance. + + Example: + >>> import ray # doctest: +SKIP + >>> ds = ray.data.from_items([{"id": 1}]) # doctest: +SKIP + >>> ds.write_datasink( # doctest: +SKIP + ... KineticaDatasink( + ... url="http://localhost:9191", + ... table_name="my_table", + ... username="admin", + ... password="password", + ... ) + ... ) + """ + + def __init__( + self, + url: str, + table_name: str, + username: Optional[str] = None, + password: Optional[str] = None, + mode: Union[KineticaSinkMode, str] = KineticaSinkMode.APPEND, + schema: Optional["pa.Schema"] = None, + table_settings: Optional[KineticaTableSettings] = None, + batch_size: int = 10000, + use_multihead: bool = True, + options: Optional[Dict[str, Any]] = None, + ): + """ + Initialize the Kinetica datasink. + + Args: + url: URL of the Kinetica server (e.g., "http://localhost:9191"). + table_name: Name of the target table. + username: Username for authentication. + password: Password for authentication. + mode: How to handle existing tables ("create", "append", "overwrite"). + schema: Optional PyArrow schema for table creation. + table_settings: Optional table configuration settings. + batch_size: Number of records per batch for ingestion. + use_multihead: Whether to use multihead ingestion for parallelism. + options: Additional GPUdb client options. + """ + super().__init__() + _check_import(self, module="gpudb", package="gpudb") + + self._url = url + self._table_name = table_name + self._username = username + self._password = password + + # Handle string mode values + if isinstance(mode, str): + mode_map = { + "create": KineticaSinkMode.CREATE, + "append": KineticaSinkMode.APPEND, + "overwrite": KineticaSinkMode.OVERWRITE, + } + mode_lower = mode.lower() + if mode_lower not in mode_map: + raise ValueError( + f"Invalid mode '{mode}'. " + "Must be one of: 'create', 'append', 'overwrite'" + ) + mode = mode_map[mode_lower] + + self._mode = mode + self._schema = schema + self._table_settings = table_settings or KineticaTableSettings() + self._batch_size = batch_size + self._use_multihead = use_multihead + self._options = options or {} + + # Serializable column definitions + self._column_defs: Optional[List[Dict[str, Any]]] = None + self._schema_string: Optional[str] = None + self._column_properties: Optional[Dict[str, List[str]]] = None + self._table_ready: bool = False + + def _init_client(self): + """Create and return a GPUdb client instance.""" + from ray.data._internal.datasource.kinetica_type_utils import ( + create_gpudb_client, + ) + + return create_gpudb_client( + url=self._url, + username=self._username, + password=self._password, + options=self._options, + ) + + def _columns_to_dicts(self, columns) -> List[Dict[str, Any]]: + """Convert GPUdbRecordColumn objects to serializable dicts.""" + result = [] + for col in columns: + col_dict = { + "name": col.name, + "column_type": col.column_type, + "column_properties": col.column_properties, + "is_nullable": col.is_nullable, + } + # Preserve precision and scale for decimal columns + if hasattr(col, "precision") and col.precision is not None: + col_dict["precision"] = col.precision + if hasattr(col, "scale") and col.scale is not None: + col_dict["scale"] = col.scale + result.append(col_dict) + return result + + def _dicts_to_columns(self, dicts: List[Dict[str, Any]]): + """Convert serializable dicts back to GPUdbRecordColumn objects.""" + from gpudb import GPUdbRecordColumn + + result = [] + for d in dicts: + col_kwargs = { + "name": d["name"], + "column_type": d["column_type"], + "column_properties": d["column_properties"], + "is_nullable": d["is_nullable"], + } + # Restore precision and scale for decimal columns if present + if "precision" in d: + col_kwargs["precision"] = d["precision"] + if "scale" in d: + col_kwargs["scale"] = d["scale"] + result.append(GPUdbRecordColumn(**col_kwargs)) + return result + + def _table_exists(self, client) -> bool: + """Check if the target table exists.""" + from gpudb import GPUdbException + + try: + response = client.has_table(table_name=self._table_name) + return response.get("table_exists", False) + except GPUdbException: + return False + + def _drop_table(self, client: Any) -> None: + """Drop the target table if it exists. + + Uses the 'no_error_if_not_exists' option to avoid exceptions when the + table doesn't exist, making this operation idempotent. + + Args: + client: GPUdb client instance. + + Raises: + GPUdbException: If the table exists but cannot be dropped + (e.g., due to permissions). + """ + from gpudb import GPUdbException + + try: + client.clear_table( + table_name=self._table_name, + options={"no_error_if_not_exists": "true"}, + ) + logger.info(f"Dropped table '{self._table_name}'") + except GPUdbException as e: + # Real error (e.g., permissions) - propagate it + raise GPUdbException( + f"Failed to drop table '{self._table_name}' in OVERWRITE mode: {e}" + ) from e + + def _create_table(self, client, columns): + """Create the target table with the given column definitions.""" + from gpudb import GPUdbRecordType + + record_type = GPUdbRecordType( + columns=columns, + label=self._table_name, + ) + + options = {} + + if self._table_settings.is_replicated: + options["is_replicated"] = "true" + + if self._table_settings.chunk_size is not None: + options["chunk_size"] = str(self._table_settings.chunk_size) + + if self._table_settings.ttl >= 0: + options["ttl"] = str(self._table_settings.ttl) + + if not self._table_settings.persist: + options["no_persist"] = "true" + + if self._table_settings.collection_name: + options["collection_name"] = self._table_settings.collection_name + + type_id = record_type.create_type(client) + + client.create_table( + table_name=self._table_name, + type_id=type_id, + options=options, + ) + + logger.info(f"Created table '{self._table_name}' with type_id '{type_id}'") + + self._schema_string = record_type.schema_string + self._column_properties = record_type.column_properties + + return record_type + + def _get_existing_record_type(self, client): + """Get the record type for an existing table.""" + from gpudb import GPUdbException, GPUdbRecordType + + response = client.show_table( + table_name=self._table_name, + options={"get_column_info": "true"}, + ) + + type_ids = response.get("type_ids", []) + type_schemas = response.get("type_schemas", []) + properties_list = response.get("properties", [{}]) + + if not type_ids or not type_schemas: + raise GPUdbException( + f"Could not retrieve type information for table '{self._table_name}'" + ) + + type_schema = type_schemas[0] + props_dict = properties_list[0] if properties_list else {} + + self._schema_string = type_schema + self._column_properties = props_dict + + record_type = GPUdbRecordType( + schema_string=type_schema, + column_properties=props_dict, + ) + + return record_type + + def on_write_start(self, schema: Optional["pa.Schema"] = None) -> None: + """ + Called before writing begins. + + Args: + schema: Optional PyArrow schema passed by Ray Data framework. + """ + from gpudb import GPUdbException + + from ray.data._internal.datasource.kinetica_type_utils import ( + arrow_schema_to_kinetica_columns, + ) + + if schema is not None and self._schema is None: + self._schema = schema + + client = self._init_client() + + table_exists = self._table_exists(client) + + if self._mode == KineticaSinkMode.CREATE: + if table_exists: + raise GPUdbException( + f"Table '{self._table_name}' already exists. " + "Use mode='append' or mode='overwrite'." + ) + if self._schema is None: + raise GPUdbException("Schema must be provided when using mode='create'") + columns = arrow_schema_to_kinetica_columns( + self._schema, + primary_keys=self._table_settings.primary_keys, + shard_keys=self._table_settings.shard_keys, + ) + self._create_table(client, columns) + self._column_defs = self._columns_to_dicts(columns) + self._table_ready = True + + elif self._mode == KineticaSinkMode.OVERWRITE: + # Validate schema BEFORE dropping table to avoid irreversible data loss + if self._schema is None: + raise GPUdbException( + "Schema must be provided when using mode='overwrite'" + ) + if table_exists: + self._drop_table(client) + columns = arrow_schema_to_kinetica_columns( + self._schema, + primary_keys=self._table_settings.primary_keys, + shard_keys=self._table_settings.shard_keys, + ) + self._create_table(client, columns) + self._column_defs = self._columns_to_dicts(columns) + self._table_ready = True + + elif self._mode == KineticaSinkMode.APPEND: + if table_exists: + record_type = self._get_existing_record_type(client) + self._column_defs = self._columns_to_dicts(record_type.columns) + self._table_ready = True + elif self._schema is not None: + columns = arrow_schema_to_kinetica_columns( + self._schema, + primary_keys=self._table_settings.primary_keys, + shard_keys=self._table_settings.shard_keys, + ) + self._create_table(client, columns) + self._column_defs = self._columns_to_dicts(columns) + self._table_ready = True + else: + logger.info( + f"Table '{self._table_name}' does not exist and no " + "schema provided. Schema will be inferred from first " + "data block." + ) + + def _get_record_type(self): + """Reconstruct GPUdbRecordType from stored schema info.""" + from gpudb import GPUdbRecordType + + if self._schema_string is not None: + return GPUdbRecordType( + schema_string=self._schema_string, + column_properties=self._column_properties, + ) + elif self._column_defs is not None: + columns = self._dicts_to_columns(self._column_defs) + return GPUdbRecordType(columns=columns, label=self._table_name) + return None + + def _create_gpudb_table(self, client): + """Create a GPUdbTable instance for writing records.""" + from gpudb import GPUdbException, GPUdbTable, GPUdbTableOptions + + record_type = self._get_record_type() + + if record_type is None: + raise GPUdbException( + "Cannot create GPUdbTable: no schema information available" + ) + + table_options = GPUdbTableOptions() + + if self._table_settings.is_replicated: + table_options.is_replicated = True + + if self._table_settings.chunk_size is not None: + table_options.chunk_size = self._table_settings.chunk_size + + if self._table_settings.ttl >= 0: + table_options.ttl = self._table_settings.ttl + + if not self._table_settings.persist: + table_options.no_persist = True + + if self._table_settings.collection_name: + table_options.collection_name = self._table_settings.collection_name + + gpudb_table = GPUdbTable( + _type=record_type, + name=self._table_name, + db=client, + options=table_options, + use_multihead_io=self._use_multihead, + multihead_ingest_batch_size=self._batch_size, + # Buffer records instead of flushing per insertion. + # This allows the error check to prevent committing partial data + # when errors occur. flush_data_to_server() is called explicitly + # after confirming no errors. + flush_multi_head_ingest_per_insertion=False, + ) + + return gpudb_table + + def _try_create_gpudb_table(self, client: Any): + """Try to create GPUdbTable, handling errors based on multihead setting. + + Args: + client: GPUdb client instance. + + Returns: + GPUdbTable instance if successful, None if failed and multihead + is not required. + + Raises: + RuntimeError: If multihead is required but GPUdbTable creation fails. + """ + try: + return self._create_gpudb_table(client) + except Exception as e: + if self._use_multihead: + raise RuntimeError( + "Failed to create GPUdbTable for multihead ingest. " + "Multihead ingestion was explicitly requested but " + "could not be initialized. Set use_multihead=False " + f"to use single-head insert instead. Error: {e}" + ) from e + else: + logger.warning( + "Could not create GPUdbTable, falling back to " + f"direct insert: {e}" + ) + return None + + def write( + self, + blocks: Iterable[Block], + ctx: TaskContext, + ) -> Any: + """ + Write blocks of data to Kinetica. + + Args: + blocks: Iterable of data blocks to write. + ctx: Ray task context. + + Returns: + Write statistics. + """ + from ray.data._internal.datasource.kinetica_type_utils import ( + arrow_schema_to_kinetica_columns, + convert_arrow_batch_to_records, + ) + + client = self._init_client() + total_inserted = 0 + total_updated = 0 + errors = [] + + kinetica_columns = None + if self._column_defs is not None: + kinetica_columns = self._dicts_to_columns(self._column_defs) + + gpudb_table = None + if self._schema_string is not None or self._column_defs is not None: + gpudb_table = self._try_create_gpudb_table(client) + + for block in blocks: + accessor = BlockAccessor.for_block(block) + arrow_table = accessor.to_arrow() + + if kinetica_columns is None: + # Deferred table creation - infer schema from first data block. + # Note: supports_distributed_writes returns False for this path, + # so only a single worker runs - no race condition handling needed. + kinetica_columns = arrow_schema_to_kinetica_columns( + arrow_table.schema, + primary_keys=self._table_settings.primary_keys, + shard_keys=self._table_settings.shard_keys, + ) + + # Create the table with inferred schema. + # _create_table sets _schema_string and _column_properties which + # are used by _get_record_type (called from _create_gpudb_table). + self._create_table(client, kinetica_columns) + + gpudb_table = self._try_create_gpudb_table(client) + + for batch in arrow_table.to_batches(): + records = convert_arrow_batch_to_records( + batch, + kinetica_columns, + ) + + if gpudb_table is not None: + # With buffered mode (flush_multi_head_ingest_per_insertion=False), + # records are buffered and counts are only meaningful after flush. + # We only track errors here; counts are captured after final flush. + batch_errors = self._write_with_gpudb_table(gpudb_table, records) + else: + # Simple path returns actual counts per batch + inserted, updated, batch_errors = self._write_simple( + client, records + ) + total_inserted += inserted + total_updated += updated + + errors.extend(batch_errors) + + # Check for errors collected during insert_records calls. + # Note: With flush_multi_head_ingest_per_insertion=False, most errors + # will only surface during flush_data_to_server(), not here. + if errors: + error_summary = "; ".join(errors[:5]) # Show first 5 errors + if len(errors) > 5: + error_summary += f" ... and {len(errors) - 5} more errors" + raise RuntimeError( + f"Failed to write {len(errors)} record batch(es) to Kinetica " + f"table '{self._table_name}': {error_summary}" + ) + + # Flush buffered records and handle any errors that surface during flush. + # With flush_multi_head_ingest_per_insertion=False, per-record errors + # are only detected here, not during insert_records calls. + if gpudb_table is not None: + from gpudb import GPUdbException + + try: + gpudb_table.flush_data_to_server() + except GPUdbException as e: + raise RuntimeError( + f"Failed to flush records to Kinetica table " + f"'{self._table_name}': {e}" + ) from e + + # Get total counts from GPUdbTable after all records are flushed. + # These represent all records written through this table instance. + total_inserted = gpudb_table.total_inserted + total_updated = gpudb_table.total_updated + + return { + "num_inserted": total_inserted, + "num_updated": total_updated, + } + + def _write_with_gpudb_table( + self, gpudb_table: Any, records: List[Dict[str, Any]] + ) -> List[str]: + """Write records using GPUdbTable. + + With flush_multi_head_ingest_per_insertion=False, records are buffered + and actual insert/update counts are only available after + flush_data_to_server(). This method only returns errors; counts are + retrieved from gpudb_table.total_inserted and gpudb_table.total_updated + after the final flush. + + Args: + gpudb_table: GPUdbTable instance for multihead ingestion. + records: List of record dictionaries to write. + + Returns: + List of error messages (empty if successful). + """ + from gpudb import GPUdbException + + errors = [] + + try: + gpudb_table.insert_records( + records, + options={ + "update_on_existing_pk": "true" + if self._table_settings.update_on_existing_pk + else "false", + "return_individual_errors": "true", + }, + ) + except GPUdbException as e: + errors.append(str(e)) + logger.error(f"Error inserting records via GPUdbTable: {e}") + + return errors + + def _write_simple(self, client, records: List[Dict[str, Any]]) -> tuple: + """Write records using simple insert_records API with JSON encoding.""" + import base64 + import json + + from gpudb import GPUdbException + + def json_serializer(obj): + """Custom JSON serializer for bytes (vector columns).""" + if isinstance(obj, bytes): + return base64.b64encode(obj).decode("ascii") + raise TypeError(f"Object of type {type(obj)} is not JSON serializable") + + inserted = 0 + updated = 0 + errors = [] + + for i in range(0, len(records), self._batch_size): + batch = records[i : i + self._batch_size] + + try: + json_records = [json.dumps(r, default=json_serializer) for r in batch] + + response = client.insert_records( + table_name=self._table_name, + data=json_records, + list_encoding="json", + options={ + "update_on_existing_pk": "true" + if self._table_settings.update_on_existing_pk + else "false", + "return_individual_errors": "true", + }, + ) + + inserted += response.get("count_inserted", 0) + updated += response.get("count_updated", 0) + + response_errors = response.get("info", {}).get("errors") + if response_errors: + # Handle both list and string error formats from the API + if isinstance(response_errors, list): + errors.extend(response_errors) + else: + # Single error string - append as single item + errors.append(str(response_errors)) + + except GPUdbException as e: + errors.append(str(e)) + logger.error(f"Error inserting records via simple insert: {e}") + + return inserted, updated, errors + + @property + def supports_distributed_writes(self) -> bool: + """Whether this datasink supports distributed writes. + + Returns False when table creation is deferred (APPEND mode without + schema and table doesn't exist) to prevent race conditions where + multiple workers try to create the same table simultaneously. + """ + return self._table_ready + + def get_name(self) -> str: + """Return the name of this datasink.""" + return f"Kinetica({self._table_name})" + + @property + def min_rows_per_write(self) -> Optional[int]: + """Target minimum number of rows per write task. + + This overrides the base class property to hint to Ray Data + how many rows should be bundled together for each write call. + """ + return self._batch_size diff --git a/python/ray/data/_internal/datasource/kinetica_datasource.py b/python/ray/data/_internal/datasource/kinetica_datasource.py new file mode 100644 index 000000000000..1275248604c2 --- /dev/null +++ b/python/ray/data/_internal/datasource/kinetica_datasource.py @@ -0,0 +1,666 @@ +""" +Kinetica Datasource for Ray Data. + +This module provides a Ray Data Datasource implementation for reading data +from Kinetica databases using the GPUdbTable class for idiomatic API usage. +""" + +import logging +import re +from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional + +from ray.data._internal.util import _check_import +from ray.data.block import BlockMetadata +from ray.data.datasource.datasource import Datasource, ReadTask + +if TYPE_CHECKING: + import pyarrow as pa + + from ray.data.context import DataContext + + +logger = logging.getLogger(__name__) + + +# Patterns that are potentially dangerous in filter expressions +# Note: This is a defense-in-depth measure. Kinetica's expression parser +# provides the primary protection, but we block obvious injection attempts. +_UNSAFE_FILTER_PATTERNS = [ + re.compile(r"[;{}]"), # Statement terminators and code blocks + re.compile(r"--"), # SQL comment sequences + re.compile(r"/\*"), # Block comment start + re.compile(r"\*/"), # Block comment end + re.compile(r"\bUNION\b", re.IGNORECASE), # UNION-based injection + re.compile(r"\bINSERT\b", re.IGNORECASE), # INSERT statements + re.compile(r"\bUPDATE\b", re.IGNORECASE), # UPDATE statements + re.compile(r"\bDELETE\b", re.IGNORECASE), # DELETE statements + re.compile(r"\bDROP\b", re.IGNORECASE), # DROP statements + re.compile(r"\bCREATE\b", re.IGNORECASE), # CREATE statements + re.compile(r"\bALTER\b", re.IGNORECASE), # ALTER statements + re.compile(r"\bTRUNCATE\b", re.IGNORECASE), # TRUNCATE statements + re.compile(r"\bEXEC\b", re.IGNORECASE), # EXEC/EXECUTE + re.compile(r"\bEXECUTE\b", re.IGNORECASE), + re.compile(r"\bINTO\s+OUTFILE\b", re.IGNORECASE), # File operations + re.compile(r"\bLOAD_FILE\b", re.IGNORECASE), +] + + +def _has_balanced_quotes(expr: str) -> bool: + """ + Check if string literals in the expression are properly balanced. + + This detects unclosed string literals which could be used to bypass + the string stripping logic in filter safety checks. + + Args: + expr: The expression to check. + + Returns: + True if quotes are balanced, False if there are unclosed strings. + """ + in_single = False + in_double = False + i = 0 + while i < len(expr): + char = expr[i] + if not in_double and char == "'": + # Check for escaped quote '' + if i + 1 < len(expr) and expr[i + 1] == "'": + i += 2 # Skip escaped quote + continue + in_single = not in_single + elif not in_single and char == '"': + # Check for escaped quote "" + if i + 1 < len(expr) and expr[i + 1] == '"': + i += 2 # Skip escaped quote + continue + in_double = not in_double + i += 1 + return not in_single and not in_double + + +def _strip_string_literals(expr: str) -> str: + """ + Remove content inside string literals to avoid false positives. + + This handles single-quoted strings (SQL standard) and double-quoted strings. + Escaped quotes within strings are handled ('' for single, "" for double). + + Args: + expr: The expression to process. + + Returns: + The expression with string literal contents replaced by empty strings. + """ + # Replace single-quoted strings (handling escaped quotes '') + expr = re.sub(r"'(?:[^']|'')*'", "''", expr) + # Replace double-quoted strings (handling escaped quotes "") + expr = re.sub(r'"(?:[^"]|"")*"', '""', expr) + return expr + + +def _is_filter_safe(filter_expr: str) -> bool: + """ + Check if a filter expression is safe from SQL injection. + + This function provides defense-in-depth validation against common SQL + injection patterns. It blocks: + - Unbalanced quotes (unclosed string literals) + - Statement terminators (;) and code blocks ({}) + - SQL comments (-- and /* */) + - DDL/DML keywords (UNION, INSERT, UPDATE, DELETE, DROP, etc.) + - File operation functions + + String literals are stripped before checking, so legitimate values like + "city = 'Union City'" won't trigger false positives for the UNION keyword. + + Note: Kinetica's expression parser provides the primary security layer. + This validation is an additional safeguard against obvious attacks. + + Args: + filter_expr: The filter expression to check. + + Returns: + True if the filter appears safe, False otherwise. + """ + if not filter_expr: + return True + + # First check for unbalanced quotes - unclosed strings could bypass stripping + if not _has_balanced_quotes(filter_expr): + return False + + # Strip string literals to avoid false positives on keywords in data values + # e.g., city = 'Union City' should not trigger on UNION + expr_without_strings = _strip_string_literals(filter_expr) + + for pattern in _UNSAFE_FILTER_PATTERNS: + if pattern.search(expr_without_strings): + return False + + return True + + +class KineticaDatasource(Datasource): + """ + A Ray Data Datasource for reading from Kinetica databases. + + This datasource uses GPUdbTable for reading data, providing automatic + type handling, schema management, and optional multihead I/O support. + It supports parallel reads by partitioning data across multiple read + tasks using offset-based pagination. + + Example: + >>> import ray # doctest: +SKIP + >>> ds = ray.data.read_datasource( # doctest: +SKIP + ... KineticaDatasource( + ... url="http://localhost:9191", + ... table_name="my_table", + ... username="admin", + ... password="password", + ... ) + ... ) + """ + + # Default batch size for pagination (10,000 records per request) + DEFAULT_BATCH_SIZE = 10000 + + def __init__( + self, + url: str, + table_name: str, + username: Optional[str] = None, + password: Optional[str] = None, + columns: Optional[List[str]] = None, + filter_expression: Optional[str] = None, + sort_by: Optional[str] = None, + sort_order: str = "ascending", + limit: Optional[int] = None, + batch_size: Optional[int] = None, + use_multihead_io: bool = False, + convert_special_types: bool = True, + options: Optional[Dict[str, Any]] = None, + ): + """ + Initialize the Kinetica datasource. + + Args: + url: URL of the Kinetica server (e.g., "http://localhost:9191"). + table_name: Name of the table to read from. + username: Username for authentication. + password: Password for authentication. + columns: Optional list of column names to read. If None, reads all + columns. + filter_expression: Optional SQL WHERE clause filter (without the + WHERE keyword). + sort_by: Optional column name to sort results by. + sort_order: Sort order, either "ascending" or "descending". + limit: Optional maximum number of records to read. + batch_size: Number of records to fetch per request for pagination. + Defaults to 10,000. + use_multihead_io: If True, enables multihead I/O for parallel reads + from multiple Kinetica nodes. Can improve performance for large + datasets on clustered deployments. + convert_special_types: If True, converts special types (arrays, JSON) + on retrieval. Defaults to True. + options: Additional GPUdb client options. + """ + super().__init__() + _check_import(self, module="gpudb", package="gpudb") + + self._url = url + self._table_name = table_name + self._username = username + self._password = password + self._filter_expression = filter_expression + self._sort_by = sort_by + self._limit = limit + if batch_size is not None: + if batch_size <= 0: + raise ValueError( + f"batch_size must be a positive integer, got {batch_size}" + ) + self._batch_size = batch_size + else: + self._batch_size = self.DEFAULT_BATCH_SIZE + self._use_multihead_io = use_multihead_io + self._convert_special_types = convert_special_types + self._options = options or {} + + # Validate columns - empty list is likely a mistake + # None means "all columns", non-empty list means "specific columns" + if columns is not None and len(columns) == 0: + raise ValueError( + "columns cannot be an empty list. " + "Use None to select all columns, or provide a non-empty list " + "of column names to select specific columns." + ) + self._columns = columns + + # Validate sort_order + valid_sort_orders = ("ascending", "descending") + if sort_order not in valid_sort_orders: + raise ValueError( + f"Invalid sort_order '{sort_order}'. " + f"Must be one of: {', '.join(valid_sort_orders)}" + ) + self._sort_order = sort_order + + # Validate filter expression + if filter_expression and not _is_filter_safe(filter_expression): + raise ValueError( + "Filter expression contains potentially unsafe patterns and was " + "rejected. Please review your filter for SQL injection patterns " + "such as statement terminators, comments, or DDL/DML keywords." + ) + + # Cache for schema and record count + self._arrow_schema: Optional["pa.Schema"] = None + self._total_count: Optional[int] = None + + def _init_client(self): + """Create and return a GPUdb client instance.""" + from ray.data._internal.datasource.kinetica_type_utils import ( + create_gpudb_client, + ) + + return create_gpudb_client( + url=self._url, + username=self._username, + password=self._password, + options=self._options, + ) + + def _create_gpudb_table(self, client: Any): + """ + Create a GPUdbTable instance for reading records. + + Args: + client: GPUdb client instance. + + Returns: + GPUdbTable instance configured for reading. + """ + from gpudb import GPUdbTable + + # Create GPUdbTable for an existing table (pass None for _type) + gpudb_table = GPUdbTable( + _type=None, + name=self._table_name, + db=client, + use_multihead_io=self._use_multihead_io, + convert_special_types_on_retrieval=self._convert_special_types, + ) + + return gpudb_table + + def _get_table_info(self, client: Any) -> tuple: + """ + Get table schema and record count using GPUdbTable. + + Args: + client: GPUdb client instance. + + Returns: + Tuple of (total_record_count, arrow_schema) + """ + from gpudb import GPUdbException + + from ray.data._internal.datasource.kinetica_type_utils import ( + kinetica_type_to_arrow_schema, + ) + + # Create GPUdbTable to get schema information + gpudb_table = self._create_gpudb_table(client) + + # Get record type from GPUdbTable + record_type = gpudb_table.gpudbrecord_type + + if record_type is None: + raise GPUdbException( + f"Could not retrieve schema for table '{self._table_name}'" + ) + + # If columns are specified, filter the schema + if self._columns: + from gpudb import GPUdbRecordType + + # Get actual column names from the table + actual_column_names = {col.name for col in record_type.columns} + requested_columns = set(self._columns) + + # Find columns that don't exist in the table + invalid_columns = requested_columns - actual_column_names + if invalid_columns: + raise ValueError( + f"The following columns were requested but do not exist in " + f"table '{self._table_name}': {sorted(invalid_columns)}. " + f"Available columns are: {sorted(actual_column_names)}" + ) + + # Build column map for lookup + column_map = {col.name: col for col in record_type.columns} + + # Preserve user-specified column order + filtered_columns = [column_map[name] for name in self._columns] + + # Create a filtered record type for Arrow schema conversion + record_type = GPUdbRecordType( + columns=filtered_columns, label=self._table_name + ) + + # Get total count (accounting for filter) + if self._filter_expression: + # Use get_records with limit=0 to get filtered count + count_response = client.get_records( + table_name=self._table_name, + offset=0, + limit=0, + options={"expression": self._filter_expression}, + ) + total_count = count_response.get("total_number_of_records", 0) + else: + # Get count from show_table for unfiltered reads + response = client.show_table( + table_name=self._table_name, + options={"get_sizes": "true"}, + ) + total_count = response.get("total_size", 0) + if isinstance(total_count, list): + total_count = total_count[0] if total_count else 0 + + # Apply limit if specified + if self._limit is not None: + total_count = min(total_count, self._limit) + + # Convert to Arrow schema + arrow_schema = kinetica_type_to_arrow_schema(record_type) + + return total_count, arrow_schema + + def _estimate_row_size(self, client: Any, sample_size: int = 100) -> int: + """ + Estimate the average row size by sampling using GPUdbTable. + + Args: + client: GPUdb client instance. + sample_size: Number of rows to sample. + + Returns: + Estimated average row size in bytes. + """ + try: + gpudb_table = self._create_gpudb_table(client) + + # Build options for the query + options = {} + if self._filter_expression: + options["expression"] = self._filter_expression + if self._sort_by: + options["sort_by"] = self._sort_by + options["sort_order"] = self._sort_order + + # Use get_records_by_column if specific columns requested + if self._columns: + records = gpudb_table.get_records_by_column( + column_names=self._columns, + offset=0, + limit=sample_size, + options=options, + get_column_major=False, # Get row-major for size estimation + force_primitive_return_types=True, + ) + else: + records = gpudb_table.get_records( + offset=0, + limit=sample_size, + options=options, + force_primitive_return_types=True, + ) + + if not records: + return 256 + + # Estimate size based on string representation + total_size = sum(len(str(dict(r))) for r in records) + return total_size // len(records) + + except Exception: + return 256 + + def _create_read_fn( + self, + offset: int, + limit: int, + ) -> Callable[[], Iterable["pa.Table"]]: + """ + Create a read function for a specific data partition using GPUdbTable. + + Args: + offset: Starting offset for this partition. + limit: Number of records to read. + + Returns: + A callable that returns an iterable of Arrow tables. + """ + # Capture instance variables for the closure + url = self._url + username = self._username + password = self._password + table_name = self._table_name + columns = self._columns + filter_expression = self._filter_expression + sort_by = self._sort_by + sort_order = self._sort_order + client_options = self._options + arrow_schema = self._arrow_schema + batch_size = self._batch_size + use_multihead_io = self._use_multihead_io + convert_special_types = self._convert_special_types + + def read_fn() -> Iterable["pa.Table"]: + import pyarrow as pa + from gpudb import GPUdbTable + + from ray.data._internal.datasource.kinetica_type_utils import ( + create_gpudb_client, + ) + + # Create client in the worker + client = create_gpudb_client( + url=url, + username=username, + password=password, + options=client_options, + ) + + # Create GPUdbTable for reading + gpudb_table = GPUdbTable( + _type=None, + name=table_name, + db=client, + use_multihead_io=use_multihead_io, + convert_special_types_on_retrieval=convert_special_types, + ) + + # Build request options + options = {} + if filter_expression: + options["expression"] = filter_expression + if sort_by: + options["sort_by"] = sort_by + options["sort_order"] = sort_order + + # Track how many records we've read and need to read + records_remaining = limit + current_offset = offset + has_yielded = False + + try: + while records_remaining > 0: + fetch_count = min(batch_size, records_remaining) + + # Use appropriate method based on column selection + if columns: + # get_records_by_column for specific columns + records = gpudb_table.get_records_by_column( + column_names=columns, + offset=current_offset, + limit=fetch_count, + options=options, + get_column_major=False, # Row-major format + force_primitive_return_types=True, + ) + else: + # get_records for all columns + records = gpudb_table.get_records( + offset=current_offset, + limit=fetch_count, + options=options, + force_primitive_return_types=True, + ) + + if not records: + break + + # Convert records to Arrow table + # Records are OrderedDict or Record objects + from ray.data._internal.datasource.kinetica_type_utils import ( + convert_records_to_arrow_table, + ) + + record_dicts = [dict(r) for r in records] + table = convert_records_to_arrow_table(record_dicts, arrow_schema) + yield table + has_yielded = True + + # Update counters + fetched_count = len(records) + records_remaining -= fetched_count + current_offset += fetched_count + + # If we got fewer records than requested, we're done + if fetched_count < fetch_count: + break + + # If we never yielded anything, yield an empty table + if not has_yielded: + empty_arrays = [ + pa.array([], type=field.type) for field in arrow_schema + ] + yield pa.Table.from_arrays(empty_arrays, schema=arrow_schema) + + except Exception as e: + logger.error(f"Error reading from Kinetica: {e}") + raise + + return read_fn + + def get_read_tasks( + self, + parallelism: int, + per_task_row_limit: Optional[int] = None, + data_context: Optional["DataContext"] = None, + ) -> List[ReadTask]: + """ + Generate read tasks for parallel data loading. + + Args: + parallelism: Desired level of parallelism. + per_task_row_limit: Optional max rows per task. + data_context: Optional Ray data context. + + Returns: + List of ReadTask objects. + """ + client = self._init_client() + + # Get table info using GPUdbTable + self._total_count, self._arrow_schema = self._get_table_info(client) + + if self._total_count == 0: + return [] + + # Estimate row size for metadata + avg_row_size = self._estimate_row_size(client) + + # Ensure parallelism is at least 1 to handle unresolved or invalid values + effective_parallelism = max(1, parallelism) + if not self._sort_by and effective_parallelism > 1: + # Without a deterministic sort order, offset-based pagination + # with parallelism > 1 will produce incorrect results (duplicates + # or missing rows) because Kinetica does not guarantee a stable + # row order across paginated get_records calls. Force single-task + # execution unless sort_by is specified. + logger.warning( + "Parallel reads without sort_by may produce non-deterministic " + "results (duplicate or missing rows). Reducing parallelism " + f"from {effective_parallelism} to 1. Specify sort_by to enable " + "parallel reads with consistent ordering." + ) + effective_parallelism = 1 + + # Calculate partition sizes + records_per_task = max(1, self._total_count // effective_parallelism) + + # Ensure we don't create too many tiny tasks + min_records_per_task = 1000 + if ( + records_per_task < min_records_per_task + and self._total_count > min_records_per_task + ): + effective_parallelism = max(1, self._total_count // min_records_per_task) + records_per_task = self._total_count // effective_parallelism + + # Cap effective_parallelism to total_count to avoid empty tasks + # This handles the case where parallelism > total_count + effective_parallelism = min(effective_parallelism, self._total_count) + + read_tasks = [] + offset = 0 + + for i in range(effective_parallelism): + if i == effective_parallelism - 1: + partition_size = self._total_count - offset + else: + partition_size = records_per_task + + # Skip if we've already assigned all records + if partition_size <= 0 or offset >= self._total_count: + break + + metadata = BlockMetadata( + num_rows=partition_size, + size_bytes=partition_size * avg_row_size, + input_files=None, + exec_stats=None, + ) + + read_fn = self._create_read_fn(offset, partition_size) + + read_tasks.append( + ReadTask(read_fn, metadata, per_task_row_limit=per_task_row_limit) + ) + offset += partition_size + + return read_tasks + + def estimate_inmemory_data_size(self) -> Optional[int]: + """ + Estimate the in-memory size of the data. + + Returns: + Estimated size in bytes, or None if unknown. + """ + if self._total_count is None: + client = self._init_client() + self._total_count, _ = self._get_table_info(client) + + if self._total_count is None: + return None + + return self._total_count * 256 + + def get_name(self) -> str: + """Return the name of this datasource.""" + return f"Kinetica({self._table_name})" diff --git a/python/ray/data/_internal/datasource/kinetica_sql_connection.py b/python/ray/data/_internal/datasource/kinetica_sql_connection.py new file mode 100644 index 000000000000..b8fe852bf064 --- /dev/null +++ b/python/ray/data/_internal/datasource/kinetica_sql_connection.py @@ -0,0 +1,121 @@ +""" +Connection factory for Kinetica DB-API integration with Ray Data. + +This module provides a connection factory that creates DB-API 2.0 compliant +connections compatible with Ray Data's read_sql and write_sql methods. +""" + +from dataclasses import dataclass +from typing import Any, Callable, Dict, Optional + + +def _check_gpudb(): + """Check that gpudb is installed and return the dbapi module.""" + try: + from gpudb import dbapi + + return dbapi + except ImportError: + raise ImportError( + "gpudb is required to use Kinetica SQL integration. " + "Install it with: pip install gpudb" + ) + + +@dataclass +class KineticaConnectionFactory: + """ + A callable factory that creates Kinetica DB-API connections. + + This class is designed to work with Ray Data's SQL integration, + which expects a callable that returns DB-API 2.0 compliant connections. + + Attributes: + url: URL of the Kinetica server. + username: Username for authentication. + password: Password for authentication. + oauth_token: OAuth token for authentication (alternative to username/password). + default_schema: Default schema to use for queries. + options: Additional GPUdb client options. + + Example: + >>> factory = KineticaConnectionFactory( # doctest: +SKIP + ... url="http://localhost:9191", + ... username="admin", + ... password="password", + ... ) + """ + + url: str + username: Optional[str] = None + password: Optional[str] = None + oauth_token: Optional[str] = None + default_schema: Optional[str] = None + options: Optional[Dict[str, Any]] = None + + def __call__(self): + """ + Create and return a new Kinetica connection. + + Returns: + A DB-API 2.0 compliant KineticaConnection instance. + """ + dbapi = _check_gpudb() + return dbapi.connect( + connection_string="kinetica://", + url=self.url, + username=self.username, + password=self.password, + oauth_token=self.oauth_token, + default_schema=self.default_schema, + options=self.options, + ) + + +def create_kinetica_connection_factory( + url: str, + username: Optional[str] = None, + password: Optional[str] = None, + oauth_token: Optional[str] = None, + default_schema: Optional[str] = None, + options: Optional[Dict[str, Any]] = None, +) -> Callable: + """ + Create a connection factory for use with Ray Data's SQL methods. + + This function returns a callable that creates new Kinetica connections + when invoked. It's designed to work with Ray Data's `read_sql` and + `write_sql` methods, which require a connection factory. + + Args: + url: URL of the Kinetica server (e.g., "http://localhost:9191"). + username: Username for authentication. + password: Password for authentication. + oauth_token: OAuth token for authentication (alternative to username/password). + default_schema: Default schema to use for queries. + options: Additional GPUdb client options + (e.g., {"skip_ssl_cert_verification": True}). + + Returns: + A callable that creates new KineticaConnection instances. + + Example: + >>> import ray # doctest: +SKIP + >>> factory = create_kinetica_connection_factory( # doctest: +SKIP + ... url="http://localhost:9191", + ... username="admin", + ... password="password", + ... ) + >>> ds = ray.data.read_sql( # doctest: +SKIP + ... sql="SELECT * FROM my_table", + ... connection_factory=factory, + ... ) + """ + return KineticaConnectionFactory( + url=url, + username=username, + password=password, + oauth_token=oauth_token, + default_schema=default_schema, + options=options, + ) diff --git a/python/ray/data/_internal/datasource/kinetica_type_utils.py b/python/ray/data/_internal/datasource/kinetica_type_utils.py new file mode 100644 index 000000000000..838212c3128f --- /dev/null +++ b/python/ray/data/_internal/datasource/kinetica_type_utils.py @@ -0,0 +1,683 @@ +""" +Type conversion utilities for mapping between PyArrow and Kinetica types. + +This module provides bidirectional type conversion between PyArrow schemas +and Kinetica column definitions. +""" + +import re +from typing import Any, Dict, List, Optional, Tuple + +try: + import pyarrow as pa +except ImportError: + pa = None + + +def _check_pyarrow(): + """Check if PyArrow is available.""" + if pa is None: + raise ImportError( + "PyArrow is required for Kinetica integration. " + "Install it with: pip install pyarrow" + ) + + +def _check_gpudb(): + """Check if gpudb is available.""" + try: + import gpudb + + return gpudb + except ImportError: + raise ImportError( + "gpudb is required for Kinetica integration. " + "Install it with: pip install gpudb" + ) + + +def create_gpudb_client( + url: str, + username: Optional[str] = None, + password: Optional[str] = None, + options: Optional[Dict[str, Any]] = None, +): + """ + Create and return a GPUdb client instance. + + This is a shared factory function used by both KineticaDatasource and + KineticaDatasink to ensure consistent client construction. + + Args: + url: URL of the Kinetica server (e.g., "http://localhost:9191"). + username: Username for authentication. + password: Password for authentication. + options: Additional GPUdb client options. + + Returns: + A configured GPUdb client instance. + """ + gpudb = _check_gpudb() + GPUdb = gpudb.GPUdb + + gpudb_options = GPUdb.Options() + + # Use explicit None checks to allow empty string credentials + # (consistent with SQL connection path in KineticaConnectionFactory) + if username is not None: + gpudb_options.username = username + if password is not None: + gpudb_options.password = password + + # Apply additional options + if options: + for key, value in options.items(): + if hasattr(gpudb_options, key): + setattr(gpudb_options, key, value) + + return GPUdb(host=url, options=gpudb_options) + + +def _column_type_to_array_inner(column_type: Any) -> str: + """ + Convert a GPUdbRecordColumn type constant to its array inner type string. + + This helper is used when constructing array() property strings for both + variable-length lists and fixed-size lists to avoid code duplication. + + Args: + column_type: A GPUdbRecordColumn._ColumnType constant. + + Returns: + The lowercase string representation for use in array() properties. + + Raises: + TypeError: If the column type is not supported as an array element type. + Kinetica only supports int, long, float, double, and string as + array inner types. + """ + gpudb = _check_gpudb() + GPUdbRecordColumn = gpudb.GPUdbRecordColumn + + type_map = { + GPUdbRecordColumn._ColumnType.INT: "int", + GPUdbRecordColumn._ColumnType.LONG: "long", + GPUdbRecordColumn._ColumnType.FLOAT: "float", + GPUdbRecordColumn._ColumnType.DOUBLE: "double", + GPUdbRecordColumn._ColumnType.STRING: "string", + } + result = type_map.get(column_type) + if result is None: + # Get a readable name for the unsupported type + type_name = getattr(column_type, "name", str(column_type)) + raise TypeError( + f"Unsupported array element type: {type_name}. " + f"Kinetica arrays only support: int, long, float, double, string. " + f"Consider converting binary/bytes data to base64-encoded strings." + ) + return result + + +def arrow_to_kinetica_type( + arrow_type: "pa.DataType", + column_name: str = "", +) -> Tuple[str, List[str]]: + """ + Convert a PyArrow data type to Kinetica column type and properties. + + Args: + arrow_type: The PyArrow data type to convert. + column_name: Optional column name for error messages. + + Returns: + A tuple of (kinetica_base_type, list_of_properties). + + Raises: + TypeError: If the PyArrow type cannot be converted. + """ + _check_pyarrow() + gpudb = _check_gpudb() + GPUdbRecordColumn = gpudb.GPUdbRecordColumn + GPUdbColumnProperty = gpudb.GPUdbColumnProperty + + # Boolean + if pa.types.is_boolean(arrow_type): + return GPUdbRecordColumn._ColumnType.INT, [GPUdbColumnProperty.BOOLEAN] + + # Integer types + if pa.types.is_int8(arrow_type): + return GPUdbRecordColumn._ColumnType.INT, [GPUdbColumnProperty.INT8] + if pa.types.is_int16(arrow_type): + return GPUdbRecordColumn._ColumnType.INT, [GPUdbColumnProperty.INT16] + if pa.types.is_int32(arrow_type): + return GPUdbRecordColumn._ColumnType.INT, [] + if pa.types.is_int64(arrow_type): + return GPUdbRecordColumn._ColumnType.LONG, [] + + # Unsigned integer types + if pa.types.is_uint8(arrow_type): + return GPUdbRecordColumn._ColumnType.INT, [GPUdbColumnProperty.INT16] + if pa.types.is_uint16(arrow_type): + return GPUdbRecordColumn._ColumnType.INT, [] + if pa.types.is_uint32(arrow_type): + return GPUdbRecordColumn._ColumnType.LONG, [] + if pa.types.is_uint64(arrow_type): + return GPUdbRecordColumn._ColumnType.STRING, [GPUdbColumnProperty.ULONG] + + # Float types + if pa.types.is_float16(arrow_type) or pa.types.is_float32(arrow_type): + return GPUdbRecordColumn._ColumnType.FLOAT, [] + if pa.types.is_float64(arrow_type): + return GPUdbRecordColumn._ColumnType.DOUBLE, [] + + # Decimal types + if pa.types.is_decimal(arrow_type): + precision = arrow_type.precision + scale = arrow_type.scale + return GPUdbRecordColumn._ColumnType.STRING, [f"decimal({precision},{scale})"] + + # String types + if pa.types.is_string(arrow_type) or pa.types.is_large_string(arrow_type): + return GPUdbRecordColumn._ColumnType.STRING, [] + if hasattr(pa.types, "is_utf8"): + if pa.types.is_utf8(arrow_type) or pa.types.is_large_utf8(arrow_type): + return GPUdbRecordColumn._ColumnType.STRING, [] + + # UUID (must come before generic fixed-size binary check) + if pa.types.is_fixed_size_binary(arrow_type) and arrow_type.byte_width == 16: + return GPUdbRecordColumn._ColumnType.STRING, [GPUdbColumnProperty.UUID] + + # Binary types + if pa.types.is_binary(arrow_type) or pa.types.is_large_binary(arrow_type): + return GPUdbRecordColumn._ColumnType.BYTES, [] + if pa.types.is_fixed_size_binary(arrow_type): + return GPUdbRecordColumn._ColumnType.BYTES, [] + + # Date types + if pa.types.is_date32(arrow_type) or pa.types.is_date64(arrow_type): + return GPUdbRecordColumn._ColumnType.STRING, [GPUdbColumnProperty.DATE] + + # Time types + if pa.types.is_time32(arrow_type) or pa.types.is_time64(arrow_type): + return GPUdbRecordColumn._ColumnType.STRING, [GPUdbColumnProperty.TIME] + + # Timestamp types + if pa.types.is_timestamp(arrow_type): + return GPUdbRecordColumn._ColumnType.STRING, [GPUdbColumnProperty.DATETIME] + + # Duration + if pa.types.is_duration(arrow_type): + return GPUdbRecordColumn._ColumnType.LONG, [] + + # List/Array types + if pa.types.is_list(arrow_type) or pa.types.is_large_list(arrow_type): + value_type = arrow_type.value_type + inner_type, _ = arrow_to_kinetica_type(value_type) + array_inner = _column_type_to_array_inner(inner_type) + return GPUdbRecordColumn._ColumnType.STRING, [f"array({array_inner})"] + + # Fixed-size list (vector) + if pa.types.is_fixed_size_list(arrow_type): + list_size = arrow_type.list_size + value_type = arrow_type.value_type + if pa.types.is_float32(value_type) or pa.types.is_float64(value_type): + return GPUdbRecordColumn._ColumnType.BYTES, [f"vector({list_size})"] + else: + inner_type, _ = arrow_to_kinetica_type(value_type) + array_inner = _column_type_to_array_inner(inner_type) + return GPUdbRecordColumn._ColumnType.STRING, [ + f"array({array_inner},{list_size})" + ] + + # Struct types -> JSON + if pa.types.is_struct(arrow_type): + return GPUdbRecordColumn._ColumnType.STRING, [GPUdbColumnProperty.JSON] + + # Map types -> JSON + if pa.types.is_map(arrow_type): + return GPUdbRecordColumn._ColumnType.STRING, [GPUdbColumnProperty.JSON] + + # Dictionary encoded + if pa.types.is_dictionary(arrow_type): + return arrow_to_kinetica_type(arrow_type.value_type, column_name) + + # Null type + if pa.types.is_null(arrow_type): + return GPUdbRecordColumn._ColumnType.STRING, [GPUdbColumnProperty.NULLABLE] + + raise TypeError( + f"Cannot convert PyArrow type '{arrow_type}' to Kinetica type" + + (f" for column '{column_name}'" if column_name else "") + ) + + +def kinetica_to_arrow_type(column: Any) -> "pa.DataType": + """ + Convert a Kinetica column definition to a PyArrow data type. + + Args: + column: The GPUdbRecordColumn to convert. + + Returns: + The corresponding PyArrow data type. + """ + _check_pyarrow() + gpudb = _check_gpudb() + GPUdbRecordColumn = gpudb.GPUdbRecordColumn + GPUdbColumnProperty = gpudb.GPUdbColumnProperty + + base_type = column.column_type + properties = column.column_properties + # Lowercase both the properties and the constants for consistent comparison + prop_set = {p.lower() for p in properties} + + # Boolean + if GPUdbColumnProperty.BOOLEAN.lower() in prop_set: + return pa.bool_() + + # Int8 + if GPUdbColumnProperty.INT8.lower() in prop_set: + return pa.int8() + + # Int16 + if GPUdbColumnProperty.INT16.lower() in prop_set: + return pa.int16() + + # UUID + if GPUdbColumnProperty.UUID.lower() in prop_set: + return pa.string() + + # ULong + if GPUdbColumnProperty.ULONG.lower() in prop_set: + return pa.uint64() + + # Date + if GPUdbColumnProperty.DATE.lower() in prop_set: + return pa.date32() + + # Time + if GPUdbColumnProperty.TIME.lower() in prop_set: + return pa.time64("us") + + # Datetime + if GPUdbColumnProperty.DATETIME.lower() in prop_set: + return pa.timestamp("us") + + # Timestamp + if GPUdbColumnProperty.TIMESTAMP.lower() in prop_set: + return pa.timestamp("ms") + + # Decimal (note: is_decimal is a @property, not a method like is_json()) + if column.is_decimal: + # Use explicit None check to preserve valid 0 values for scale + precision = ( + column.precision + if column.precision is not None + else GPUdbRecordColumn.DEFAULT_DECIMAL_PRECISION + ) + scale = ( + column.scale + if column.scale is not None + else GPUdbRecordColumn.DEFAULT_DECIMAL_SCALE + ) + return pa.decimal128(precision, scale) + + # JSON + if column.is_json(): + return pa.string() + + # Array + # Note: column.array_type returns _ColumnType constants for int/long/float/ + # double/string, but GPUdbColumnProperty strings for boolean/ulong. + if column.is_array(): + array_type = column.array_type + if array_type == GPUdbRecordColumn._ColumnType.INT: + return pa.list_(pa.int32()) + elif array_type == GPUdbRecordColumn._ColumnType.LONG: + return pa.list_(pa.int64()) + elif array_type == GPUdbRecordColumn._ColumnType.FLOAT: + return pa.list_(pa.float32()) + elif array_type == GPUdbRecordColumn._ColumnType.DOUBLE: + return pa.list_(pa.float64()) + elif array_type == GPUdbRecordColumn._ColumnType.STRING: + return pa.list_(pa.string()) + elif array_type == GPUdbColumnProperty.BOOLEAN: + return pa.list_(pa.bool_()) + elif array_type == GPUdbColumnProperty.ULONG: + return pa.list_(pa.uint64()) + else: + return pa.list_(pa.string()) + + # Vector + if column.is_vector(): + dim = column.vector_dimension + if dim and dim > 0: + return pa.list_(pa.float32(), dim) + else: + return pa.list_(pa.float32()) + + # IPV4 + if GPUdbColumnProperty.IPV4.lower() in prop_set: + return pa.string() + + # WKT + if GPUdbColumnProperty.WKT.lower() in prop_set: + if base_type == GPUdbRecordColumn._ColumnType.BYTES: + return pa.binary() + return pa.string() + + # CharN types + for prop in properties: + prop_lower = prop.lower() + if prop_lower.startswith("char"): + match = re.match(r"char(\d+)", prop_lower) + if match: + return pa.string() + + # Base types + if base_type == GPUdbRecordColumn._ColumnType.INT: + return pa.int32() + elif base_type == GPUdbRecordColumn._ColumnType.LONG: + return pa.int64() + elif base_type == GPUdbRecordColumn._ColumnType.FLOAT: + return pa.float32() + elif base_type == GPUdbRecordColumn._ColumnType.DOUBLE: + return pa.float64() + elif base_type == GPUdbRecordColumn._ColumnType.STRING: + return pa.string() + elif base_type == GPUdbRecordColumn._ColumnType.BYTES: + return pa.binary() + + raise TypeError( + f"Cannot convert Kinetica type '{base_type}' with properties " + f"{properties} to PyArrow type for column '{column.name}'" + ) + + +def arrow_schema_to_kinetica_columns( + schema: "pa.Schema", + primary_keys: Optional[List[str]] = None, + shard_keys: Optional[List[str]] = None, +) -> List: + """ + Convert a PyArrow schema to a list of Kinetica column definitions. + + Args: + schema: The PyArrow schema to convert. + primary_keys: Optional list of column names to mark as primary keys. + shard_keys: Optional list of column names to mark as shard keys. + + Returns: + A list of GPUdbRecordColumn objects. + """ + _check_pyarrow() + gpudb = _check_gpudb() + GPUdbRecordColumn = gpudb.GPUdbRecordColumn + GPUdbColumnProperty = gpudb.GPUdbColumnProperty + + # Handle Ray Data Schema objects + if hasattr(schema, "base_schema"): + schema = schema.base_schema + + primary_keys = set(primary_keys or []) + shard_keys = set(shard_keys or []) + + # Validate primary/shard keys exist in schema + schema_columns = {field.name for field in schema} + invalid_primary = primary_keys - schema_columns + invalid_shard = shard_keys - schema_columns + if invalid_primary or invalid_shard: + invalid_cols = invalid_primary | invalid_shard + raise ValueError( + f"Primary/shard keys reference non-existent columns: " + f"{sorted(invalid_cols)}. Available columns: {sorted(schema_columns)}" + ) + + columns = [] + + for field in schema: + base_type, properties = arrow_to_kinetica_type(field.type, field.name) + + if field.name in primary_keys: + properties.append(GPUdbColumnProperty.PRIMARY_KEY) + if field.name in shard_keys: + properties.append(GPUdbColumnProperty.SHARD_KEY) + + is_nullable = field.nullable + + column = GPUdbRecordColumn( + name=field.name, + column_type=base_type, + column_properties=properties, + is_nullable=is_nullable, + ) + columns.append(column) + + return columns + + +def kinetica_type_to_arrow_schema(record_type: Any) -> "pa.Schema": + """ + Convert a Kinetica record type to a PyArrow schema. + + Args: + record_type: The GPUdbRecordType to convert. + + Returns: + A PyArrow Schema object. + """ + _check_pyarrow() + + fields = [] + for column in record_type.columns: + arrow_type = kinetica_to_arrow_type(column) + field = pa.field(column.name, arrow_type, nullable=column.is_nullable) + fields.append(field) + + return pa.schema(fields) + + +def _is_date_time_column(col_def: Any) -> Optional[str]: + """Check if a column is a date/time type and return the type name. + + Args: + col_def: The column definition to check. + + Returns: + The date/time type name ('date', 'time', 'datetime', 'timestamp'), + or None if not a date/time column. + """ + gpudb = _check_gpudb() + GPUdbColumnProperty = gpudb.GPUdbColumnProperty + + if col_def is None: + return None + + prop_set = {p.lower() for p in col_def.column_properties} + + if GPUdbColumnProperty.DATE.lower() in prop_set: + return "date" + if GPUdbColumnProperty.TIME.lower() in prop_set: + return "time" + if GPUdbColumnProperty.DATETIME.lower() in prop_set: + return "datetime" + if GPUdbColumnProperty.TIMESTAMP.lower() in prop_set: + return "timestamp" + return None + + +def convert_arrow_batch_to_records( + batch: "pa.RecordBatch", + columns: List, +) -> List[Dict[str, Any]]: + """ + Convert a PyArrow RecordBatch to a list of dictionaries for Kinetica insertion. + + Args: + batch: The PyArrow RecordBatch to convert. + columns: The Kinetica column definitions. + + Returns: + A list of dictionaries, each representing a record. + """ + _check_pyarrow() + import json + import struct + from datetime import date, datetime, time + + # Handle None columns - create empty map if no column definitions provided + column_map = {col.name: col for col in columns} if columns else {} + col_names = batch.schema.names + num_rows = batch.num_rows + + # Pre-process columns: convert to Python lists and identify special columns + col_data = {} + # col_types values: 'json', 'array', 'vector', 'decimal', 'date', 'time', + # 'datetime', 'timestamp', or None for normal columns + col_types = {} + for col_name in col_names: + col_def = column_map.get(col_name) + # Convert column to Python list directly from batch (avoids full + # table conversion) + col_data[col_name] = batch.column(col_name).to_pylist() + + if col_def: + if col_def.is_json(): + col_types[col_name] = "json" + elif col_def.is_array(): + col_types[col_name] = "array" + elif col_def.is_vector(): + col_types[col_name] = "vector" + elif col_def.is_decimal: + col_types[col_name] = "decimal" + else: + # Check for date/time types + dt_type = _is_date_time_column(col_def) + col_types[ + col_name + ] = dt_type # Could be 'date', 'time', 'datetime', 'timestamp', or None + else: + col_types[col_name] = None + + records = [] + for i in range(num_rows): + record = {} + for col_name in col_names: + value = col_data[col_name][i] + col_type = col_types[col_name] + + if value is None: + record[col_name] = None + elif col_type == "json": + if isinstance(value, (dict, list)): + record[col_name] = json.dumps(value) + else: + record[col_name] = value + elif col_type == "array": + if isinstance(value, list): + # Use json.dumps for proper JSON array format + # str() produces single-quoted strings which is invalid JSON + record[col_name] = json.dumps(value) + else: + record[col_name] = value + elif col_type == "vector": + if isinstance(value, list): + try: + record[col_name] = struct.pack(f"{len(value)}f", *value) + except (struct.error, TypeError) as e: + raise ValueError( + f"Failed to pack vector values for column '{col_name}': " + f"expected list of floats, got {type(value).__name__} " + f"with values that cannot be converted. Error: {e}" + ) from e + else: + record[col_name] = value + elif col_type == "decimal": + # Convert decimal to string representation for Kinetica + record[col_name] = str(value) + elif col_type == "date": + # Convert date to ISO format string (YYYY-MM-DD) + # Check datetime first since datetime is a subclass of date + if isinstance(value, datetime): + # Extract just the date part for date-typed columns + record[col_name] = value.date().isoformat() + elif isinstance(value, date): + record[col_name] = value.isoformat() + else: + record[col_name] = str(value) if value is not None else None + elif col_type == "time": + # Convert time to ISO format string (HH:MM:SS.ffffff) + if isinstance(value, time): + record[col_name] = value.isoformat() + else: + record[col_name] = str(value) if value is not None else None + elif col_type in ("datetime", "timestamp"): + # Convert datetime to ISO format string (YYYY-MM-DDTHH:MM:SS.ffffff) + if isinstance(value, datetime): + record[col_name] = value.isoformat() + else: + record[col_name] = str(value) if value is not None else None + else: + # Handle any remaining date/time types that weren't detected + # by column properties + if isinstance(value, datetime): + record[col_name] = value.isoformat() + elif isinstance(value, date): + record[col_name] = value.isoformat() + elif isinstance(value, time): + record[col_name] = value.isoformat() + else: + record[col_name] = value + + records.append(record) + + return records + + +def convert_records_to_arrow_table( + records: List[Dict[str, Any]], + schema: "pa.Schema", +) -> "pa.Table": + """ + Convert a list of record dictionaries to a PyArrow Table. + + Args: + records: List of dictionaries representing records. + schema: The PyArrow schema to use. + + Returns: + A PyArrow Table. + """ + _check_pyarrow() + + if not records: + # Create empty arrays for each field in the schema + empty_arrays = [pa.array([], type=field.type) for field in schema] + return pa.Table.from_arrays(empty_arrays, schema=schema) + + columns = {field.name: [] for field in schema} + + for record in records: + for field in schema: + value = record.get(field.name) + columns[field.name].append(value) + + arrays = [] + for field in schema: + col_data = columns[field.name] + try: + array = pa.array(col_data, type=field.type) + except (pa.ArrowInvalid, pa.ArrowTypeError) as initial_error: + # Try creating array without explicit type, then cast + try: + array = pa.array(col_data) + if array.type != field.type: + array = array.cast(field.type) + except (pa.ArrowInvalid, pa.ArrowTypeError) as cast_error: + raise pa.ArrowTypeError( + f"Failed to convert column '{field.name}' to type {field.type}. " + f"Initial error: {initial_error}. Cast error: {cast_error}" + ) from cast_error + arrays.append(array) + + return pa.Table.from_arrays(arrays, schema=schema) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 3c38557cb381..6c30b70caef4 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -153,6 +153,7 @@ import torch.utils.data from tensorflow_metadata.proto.v0 import schema_pb2 + from ray.data._internal.datasource.kinetica_datasink import KineticaTableSettings from ray.data._internal.execution.interfaces import Executor, NodeIdStr from ray.data._internal.execution.streaming_executor import StreamingExecutor from ray.data._internal.logical.interfaces.logical_operator import LogicalOperator @@ -5212,6 +5213,158 @@ def write_sql( concurrency=concurrency, ) + @ConsumptionAPI + def write_kinetica( + self, + table_name: str, + url: str, + *, + username: Optional[str] = None, + password: Optional[str] = None, + mode: str = "append", + table_settings: Optional["KineticaTableSettings"] = None, + batch_size: int = 10000, + use_multihead: bool = True, + options: Optional[Dict[str, Any]] = None, + ray_remote_args: Optional[Dict[str, Any]] = None, + concurrency: Optional[int] = None, + ) -> None: + """Write this :class:`~ray.data.Dataset` to a Kinetica database table. + + Kinetica is a distributed, in-memory analytical database. This method + uses Kinetica's multihead ingestion for optimal write performance when + writing to a distributed cluster. + + Examples: + + .. testcode:: + :skipif: True + + import ray + + ds = ray.data.from_items([ + {"id": 1, "name": "Alice", "amount": 100.0}, + {"id": 2, "name": "Bob", "amount": 200.0}, + ]) + ds.write_kinetica( + table_name="transactions", + url="http://localhost:9191", + username="admin", + password="password", + mode="append", + ) + + Args: + table_name: Target table name. + url: Kinetica server URL (e.g., "http://localhost:9191"). + username: Authentication username. + password: Authentication password. + mode: Write mode - "create", "append", or "overwrite". + table_settings: Kinetica-specific table options (KineticaTableSettings). + batch_size: Records per ingestion batch. Default is 10,000. + use_multihead: Enable multihead ingestion for parallelism. + options: Additional GPUdb client options. + ray_remote_args: Keyword arguments passed to :func:`ray.remote`. + concurrency: Maximum concurrent write tasks. + """ + from ray.data._internal.datasource.kinetica_datasink import KineticaDatasink + + datasink = KineticaDatasink( + url=url, + table_name=table_name, + username=username, + password=password, + mode=mode, + schema=self.schema(), + table_settings=table_settings, + batch_size=batch_size, + use_multihead=use_multihead, + options=options, + ) + self.write_datasink( + datasink, + ray_remote_args=ray_remote_args, + concurrency=concurrency, + ) + + @ConsumptionAPI + def write_kinetica_sql( + self, + sql: str, + url: str, + *, + username: Optional[str] = None, + password: Optional[str] = None, + oauth_token: Optional[str] = None, + default_schema: Optional[str] = None, + options: Optional[Dict[str, Any]] = None, + ray_remote_args: Optional[Dict[str, Any]] = None, + concurrency: Optional[int] = None, + ) -> None: + """Write this :class:`~ray.data.Dataset` to Kinetica using SQL INSERT. + + This method uses Kinetica's DB-API 2.0 interface with Ray Data's native + ``write_sql`` method. The target table must already exist. + + For automatic table creation and multihead ingestion, use + :meth:`~ray.data.Dataset.write_kinetica` instead. + + Examples: + + .. testcode:: + :skipif: True + + import ray + + ds = ray.data.from_items([ + {"id": 1, "name": "Alice", "value": 100.0}, + {"id": 2, "name": "Bob", "value": 200.0}, + ]) + ds.write_kinetica_sql( + sql="INSERT INTO my_table (id, name, value) VALUES (?, ?, ?)", + url="http://localhost:9191", + username="admin", + password="password", + ) + + Args: + sql: SQL INSERT statement with parameter placeholders. + Use '?' for placeholders (qmark paramstyle). + url: Kinetica server URL (e.g., "http://localhost:9191"). + username: Authentication username. + password: Authentication password. + oauth_token: OAuth token for authentication (alternative to username/password). + default_schema: Default schema for queries. + options: Additional GPUdb client options. + ray_remote_args: Keyword arguments passed to :func:`ray.remote`. + concurrency: Maximum concurrent write tasks. + """ + from ray.data._internal.datasource.kinetica_sql_connection import ( + create_kinetica_connection_factory, + ) + + connection_factory = create_kinetica_connection_factory( + url=url, + username=username, + password=password, + oauth_token=oauth_token, + default_schema=default_schema, + options=options, + ) + + write_kwargs: Dict[str, Any] = { + "sql": sql, + "connection_factory": connection_factory, + } + + if ray_remote_args is not None: + write_kwargs["ray_remote_args"] = ray_remote_args + + if concurrency is not None: + write_kwargs["concurrency"] = concurrency + + self.write_sql(**write_kwargs) + @ConsumptionAPI def write_snowflake( self, diff --git a/python/ray/data/tests/datasource/test_kinetica.py b/python/ray/data/tests/datasource/test_kinetica.py new file mode 100644 index 000000000000..3f760d5c0d31 --- /dev/null +++ b/python/ray/data/tests/datasource/test_kinetica.py @@ -0,0 +1,1930 @@ +""" +Tests for the Kinetica Ray Data integration. + +These tests use mocks to verify the KineticaDatasource and KineticaDatasink +work correctly without requiring a running Kinetica server. +""" + +import json +import os +from unittest.mock import MagicMock, patch + +import pyarrow as pa +import pytest + +from ray.data._internal.datasource.kinetica_datasink import ( + KineticaDatasink, + KineticaSinkMode, + KineticaTableSettings, +) +from ray.data._internal.datasource.kinetica_datasource import ( + KineticaDatasource, + _has_balanced_quotes, + _is_filter_safe, +) +from ray.data._internal.execution.interfaces.task_context import TaskContext + +# ============================================================================ +# Fixtures for Mocking GPUdb Client +# ============================================================================ + + +@pytest.fixture +def mock_gpudb_client(): + """Mock GPUdb client for datasource tests.""" + client = MagicMock() + + # Mock show_table response + client.show_table.return_value = { + "type_schemas": [ + json.dumps( + { + "fields": [ + {"name": "id", "type": "long"}, + {"name": "name", "type": "string"}, + {"name": "value", "type": "double"}, + ] + } + ) + ], + "properties": [{"id": [], "name": [], "value": []}], + "total_size": 100, + } + + # Mock get_records response + client.get_records.return_value = { + "records_json": [ + json.dumps({"id": 1, "name": "Alice", "value": 100.5}), + json.dumps({"id": 2, "name": "Bob", "value": 200.75}), + ], + "total_number_of_records": 100, + } + + return client + + +@pytest.fixture +def mock_gpudb_sink_client(): + """Mock GPUdb client for datasink tests.""" + client = MagicMock() + + # Mock table existence check + client.has_table.return_value = {"table_exists": False} + + # Mock insert_records response + client.insert_records.return_value = { + "count_inserted": 3, + "count_updated": 0, + "info": {}, + } + + # Mock show_table response + client.show_table.return_value = { + "type_ids": ["type_123"], + "type_schemas": [ + json.dumps( + { + "fields": [ + {"name": "id", "type": "long"}, + {"name": "name", "type": "string"}, + ] + } + ) + ], + "properties": [{"id": [], "name": []}], + } + + return client + + +@pytest.fixture(autouse=True) +def patch_gpudb(): + """Automatically patch GPUdb for all tests.""" + with patch("gpudb.GPUdb") as mock_gpudb_class: + mock_instance = MagicMock() + mock_gpudb_class.return_value = mock_instance + yield mock_instance + + +# ============================================================================ +# Filter Safety Tests +# ============================================================================ + + +class TestFilterSafety: + """Tests for filter expression safety validation.""" + + @pytest.mark.parametrize( + "filter_expr, is_safe", + [ + # Safe expressions + ("id > 100", True), + ("name = 'Alice' AND value > 50", True), + ("id > 100 AND name IS NOT NULL", True), + # Keywords inside string literals are allowed (not injection) + ("city = 'Union City'", True), + ("status = 'DROP_PENDING'", True), + ("name = 'Delete Me'", True), + ('comment = "ALTER this later"', True), + # Escaped quotes in strings + ("name = 'O''Brien'", True), + # Unsafe expressions - keywords outside strings + ("id = 1; DROP TABLE test;", False), + ("id > 100; SELECT * FROM users", False), + ("id IN {1, 2, 3}", False), + # Keywords outside of string context + ("id = 1 UNION SELECT * FROM secrets", False), + ("id = 1 -- comment injection", False), + ("id = 1 /* block comment */", False), + # Unclosed string literals (could bypass stripping) + ("x = ''' ; DROP TABLE t", False), + ('x = """ ; DROP TABLE t', False), + ("name = 'value", False), + ('comment = "test', False), + ], + ) + def test_is_filter_safe(self, filter_expr, is_safe): + """Test filter safety validation.""" + assert _is_filter_safe(filter_expr) == is_safe + + def test_balanced_quotes_detection(self): + """Test the _has_balanced_quotes helper function.""" + # Balanced quotes + assert _has_balanced_quotes("name = 'Alice'") is True + assert _has_balanced_quotes('status = "active"') is True + assert _has_balanced_quotes("name = 'O''Brien'") is True + assert _has_balanced_quotes('msg = "He said ""hi"""') is True + assert _has_balanced_quotes("id = 1") is True + + # Unbalanced quotes + assert _has_balanced_quotes("name = 'Alice") is False + assert _has_balanced_quotes('status = "active') is False + assert _has_balanced_quotes("x = '''") is False + + +# ============================================================================ +# KineticaDatasource Tests +# ============================================================================ + + +class TestKineticaDatasource: + """Tests for KineticaDatasource.""" + + @pytest.fixture + def datasource(self): + """Create a KineticaDatasource with test parameters.""" + with patch( + "ray.data._internal.datasource.kinetica_datasource.KineticaDatasource._init_client" + ): + ds = KineticaDatasource( + url="http://localhost:9191", + table_name="test_table", + username="admin", + password="password", + columns=["id", "name"], + filter_expression="id > 100", + batch_size=5000, + ) + return ds + + def test_init(self, datasource): + """Test datasource initialization.""" + assert datasource._url == "http://localhost:9191" + assert datasource._table_name == "test_table" + assert datasource._username == "admin" + assert datasource._columns == ["id", "name"] + assert datasource._filter_expression == "id > 100" + assert datasource._batch_size == 5000 + + def test_default_batch_size(self): + """Test default batch size.""" + with patch( + "ray.data._internal.datasource.kinetica_datasource.KineticaDatasource._init_client" + ): + ds = KineticaDatasource( + url="http://localhost:9191", + table_name="test_table", + ) + assert ds._batch_size == 10000 + + def test_unsafe_filter_rejected(self): + """Test that unsafe filter expressions are rejected without leaking input.""" + with pytest.raises(ValueError, match="unsafe patterns") as exc_info: + KineticaDatasource( + url="http://localhost:9191", + table_name="test_table", + filter_expression="id = 1; DROP TABLE test;", + ) + # Verify the error message does NOT include the raw user input + # (to prevent log injection) + assert "DROP TABLE" not in str(exc_info.value) + + def test_get_name(self, datasource): + """Test datasource name generation.""" + assert datasource.get_name() == "Kinetica(test_table)" + + @patch.object(KineticaDatasource, "_init_client") + def test_get_table_info(self, mock_init_client, mock_gpudb_client): + """Test _get_table_info method.""" + mock_init_client.return_value = mock_gpudb_client + + ds = KineticaDatasource( + url="http://localhost:9191", + table_name="test_table", + ) + + total_count, arrow_schema = ds._get_table_info(mock_gpudb_client) + + assert total_count == 100 + assert arrow_schema is not None + assert len(arrow_schema) == 3 + + @patch.object(KineticaDatasource, "_init_client") + def test_get_table_info_with_filter(self, mock_init_client, mock_gpudb_client): + """Test _get_table_info with filter expression.""" + mock_init_client.return_value = mock_gpudb_client + + ds = KineticaDatasource( + url="http://localhost:9191", + table_name="test_table", + filter_expression="id > 50", + ) + + total_count, arrow_schema = ds._get_table_info(mock_gpudb_client) + + assert total_count >= 0 + assert arrow_schema is not None + + @patch.object(KineticaDatasource, "_init_client") + def test_estimate_row_size(self, mock_init_client, mock_gpudb_client): + """Test _estimate_row_size method.""" + mock_init_client.return_value = mock_gpudb_client + + ds = KineticaDatasource( + url="http://localhost:9191", + table_name="test_table", + columns=["id", "name"], + ) + + row_size = ds._estimate_row_size(mock_gpudb_client, sample_size=100) + + assert row_size > 0 + mock_gpudb_client.get_records.assert_called() + + @patch.object(KineticaDatasource, "_init_client") + @pytest.mark.parametrize("parallelism", [1, 2, 4]) + def test_get_read_tasks(self, mock_init_client, mock_gpudb_client, parallelism): + """Test get_read_tasks with different parallelism levels.""" + mock_init_client.return_value = mock_gpudb_client + + ds = KineticaDatasource( + url="http://localhost:9191", + table_name="test_table", + ) + + read_tasks = ds.get_read_tasks(parallelism) + + assert len(read_tasks) <= parallelism + assert all(task.metadata.num_rows > 0 for task in read_tasks) + + @patch.object(KineticaDatasource, "_init_client") + def test_get_read_tasks_empty_table(self, mock_init_client, mock_gpudb_client): + """Test get_read_tasks with empty table.""" + mock_init_client.return_value = mock_gpudb_client + mock_gpudb_client.show_table.return_value = { + "type_schemas": [json.dumps({"fields": []})], + "properties": [{}], + "total_size": 0, + } + + ds = KineticaDatasource( + url="http://localhost:9191", + table_name="empty_table", + ) + + read_tasks = ds.get_read_tasks(parallelism=2) + + assert len(read_tasks) == 0 + + @patch.object(KineticaDatasource, "_init_client") + def test_estimate_inmemory_data_size(self, mock_init_client, mock_gpudb_client): + """Test estimate_inmemory_data_size method.""" + mock_init_client.return_value = mock_gpudb_client + + ds = KineticaDatasource( + url="http://localhost:9191", + table_name="test_table", + ) + + size = ds.estimate_inmemory_data_size() + + assert size is not None + assert size > 0 + + +# ============================================================================ +# KineticaDatasink Tests +# ============================================================================ + + +class TestKineticaDatasink: + """Tests for KineticaDatasink.""" + + @pytest.fixture + def datasink(self): + """Create a KineticaDatasink with test parameters.""" + with patch( + "ray.data._internal.datasource.kinetica_datasink.KineticaDatasink._init_client" + ): + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + username="admin", + password="password", + mode=KineticaSinkMode.APPEND, + batch_size=5000, + ) + return ds + + def test_init(self, datasink): + """Test datasink initialization.""" + assert datasink._url == "http://localhost:9191" + assert datasink._table_name == "test_table" + assert datasink._username == "admin" + assert datasink._mode == KineticaSinkMode.APPEND + assert datasink._batch_size == 5000 + + def test_string_mode(self): + """Test datasink accepts string mode values.""" + with patch( + "ray.data._internal.datasource.kinetica_datasink.KineticaDatasink._init_client" + ): + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + mode="overwrite", + ) + assert ds._mode == KineticaSinkMode.OVERWRITE + + def test_table_settings(self): + """Test KineticaTableSettings configuration.""" + settings = KineticaTableSettings( + is_replicated=True, + chunk_size=1000000, + ttl=60, + primary_keys=["id"], + shard_keys=["region"], + ) + + with patch( + "ray.data._internal.datasource.kinetica_datasink.KineticaDatasink._init_client" + ): + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + table_settings=settings, + ) + + assert ds._table_settings.is_replicated is True + assert ds._table_settings.chunk_size == 1000000 + assert ds._table_settings.ttl == 60 + assert ds._table_settings.primary_keys == ["id"] + assert ds._table_settings.shard_keys == ["region"] + + def test_get_name(self, datasink): + """Test datasink name generation.""" + assert datasink.get_name() == "Kinetica(test_table)" + + def test_supports_distributed_writes(self, datasink): + """Test datasink reports distributed write support based on table readiness.""" + # Before on_write_start is called, _table_ready is False + # so distributed writes should be disabled to prevent race conditions + assert datasink.supports_distributed_writes is False + + # After table is ready, distributed writes should be enabled + datasink._table_ready = True + assert datasink.supports_distributed_writes is True + + def test_min_rows_per_write(self, datasink): + """Test min_rows_per_write property (used by Ray Data framework).""" + assert datasink.min_rows_per_write == 5000 + + @patch.object(KineticaDatasink, "_init_client") + def test_table_exists(self, mock_init_client, mock_gpudb_sink_client): + """Test _table_exists method.""" + mock_init_client.return_value = mock_gpudb_sink_client + mock_gpudb_sink_client.has_table.return_value = {"table_exists": True} + + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + ) + + exists = ds._table_exists(mock_gpudb_sink_client) + + assert exists is True + mock_gpudb_sink_client.has_table.assert_called_once() + + @patch.object(KineticaDatasink, "_init_client") + def test_drop_table(self, mock_init_client, mock_gpudb_sink_client): + """Test _drop_table method uses no_error_if_not_exists option.""" + mock_init_client.return_value = mock_gpudb_sink_client + + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + ) + + ds._drop_table(mock_gpudb_sink_client) + + mock_gpudb_sink_client.clear_table.assert_called_once_with( + table_name="test_table", + options={"no_error_if_not_exists": "true"}, + ) + + @patch.object(KineticaDatasink, "_init_client") + @patch( + "ray.data._internal.datasource.kinetica_type_utils.arrow_schema_to_kinetica_columns" + ) + def test_create_table( + self, mock_arrow_to_kinetica, mock_init_client, mock_gpudb_sink_client + ): + """Test _create_table method.""" + from gpudb import GPUdbRecordColumn, GPUdbRecordType + + mock_init_client.return_value = mock_gpudb_sink_client + + # Mock columns + mock_columns = [ + GPUdbRecordColumn( + name="id", + column_type=GPUdbRecordColumn._ColumnType.LONG, + column_properties=[], + is_nullable=False, + ), + ] + + # Mock record type + mock_record_type = MagicMock(spec=GPUdbRecordType) + mock_record_type.create_type.return_value = "type_123" + mock_record_type.schema_string = "schema_string" + mock_record_type.column_properties = {} + + with patch("gpudb.GPUdbRecordType", return_value=mock_record_type): + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + ) + + ds._create_table(mock_gpudb_sink_client, mock_columns) + + mock_gpudb_sink_client.create_table.assert_called_once() + + @patch.object(KineticaDatasink, "_init_client") + @pytest.mark.parametrize( + "mode, table_exists, should_create", + [ + (KineticaSinkMode.CREATE, False, True), + (KineticaSinkMode.APPEND, False, True), + (KineticaSinkMode.APPEND, True, False), + (KineticaSinkMode.OVERWRITE, False, True), + (KineticaSinkMode.OVERWRITE, True, True), + ], + ) + def test_on_write_start_modes( + self, + mock_init_client, + mock_gpudb_sink_client, + mode, + table_exists, + should_create, + ): + """Test on_write_start with different modes.""" + mock_init_client.return_value = mock_gpudb_sink_client + mock_gpudb_sink_client.has_table.return_value = {"table_exists": table_exists} + + schema = pa.schema( + [ + pa.field("id", pa.int64()), + pa.field("name", pa.string()), + ] + ) + + with ( + patch.object(KineticaDatasink, "_create_table") as mock_create, + patch.object(KineticaDatasink, "_drop_table") as mock_drop, + patch.object( + KineticaDatasink, "_get_existing_record_type" + ) as mock_get_type, + patch( + "ray.data._internal.datasource.kinetica_type_utils.arrow_schema_to_kinetica_columns" + ) as mock_arrow_to_kinetica, + ): + mock_arrow_to_kinetica.return_value = [] + + # Mock existing record type + mock_record_type = MagicMock() + mock_record_type.columns = [] + mock_get_type.return_value = mock_record_type + + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + mode=mode, + schema=schema, + ) + + ds.on_write_start(schema) + + if mode == KineticaSinkMode.OVERWRITE and table_exists: + mock_drop.assert_called_once() + + if should_create: + if mode == KineticaSinkMode.APPEND and table_exists: + mock_get_type.assert_called_once() + else: + # CREATE or OVERWRITE should create table + if mode != KineticaSinkMode.APPEND or not table_exists: + mock_create.assert_called() + + @patch.object(KineticaDatasink, "_init_client") + def test_on_write_start_create_existing_table_fails( + self, mock_init_client, mock_gpudb_sink_client + ): + """Test that CREATE mode fails if table already exists.""" + from gpudb import GPUdbException + + mock_init_client.return_value = mock_gpudb_sink_client + mock_gpudb_sink_client.has_table.return_value = {"table_exists": True} + + schema = pa.schema([pa.field("id", pa.int64())]) + + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + mode=KineticaSinkMode.CREATE, + schema=schema, + ) + + with pytest.raises(GPUdbException, match="already exists"): + ds.on_write_start(schema) + + @patch.object(KineticaDatasink, "_init_client") + @patch( + "ray.data._internal.datasource.kinetica_type_utils.arrow_schema_to_kinetica_columns" + ) + @patch( + "ray.data._internal.datasource.kinetica_type_utils.convert_arrow_batch_to_records" + ) + def test_write( + self, + mock_convert_batch, + mock_arrow_to_kinetica, + mock_init_client, + mock_gpudb_sink_client, + ): + """Test write method.""" + mock_init_client.return_value = mock_gpudb_sink_client + mock_gpudb_sink_client.has_table.return_value = {"table_exists": False} + + # Mock conversion functions + mock_arrow_to_kinetica.return_value = [] + mock_convert_batch.return_value = [ + {"id": 1, "name": "Alice"}, + {"id": 2, "name": "Bob"}, + ] + + schema = pa.schema( + [ + pa.field("id", pa.int64()), + pa.field("name", pa.string()), + ] + ) + + # Create test data + rb = pa.record_batch( + [pa.array([1, 2]), pa.array(["Alice", "Bob"])], + names=["id", "name"], + ) + block_data = pa.Table.from_batches([rb]) + + with patch.object(KineticaDatasink, "_create_table"): + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + mode=KineticaSinkMode.CREATE, + schema=schema, + ) + ds._column_defs = [] + + ctx = TaskContext(task_idx=0, op_name="test_write") + result = ds.write([block_data], ctx=ctx) + + assert "num_inserted" in result + assert "num_updated" in result + # Note: write raises RuntimeError on errors instead of returning them + + +# ============================================================================ +# Type Utils Tests +# ============================================================================ + + +class TestKineticaTypeUtils: + """Tests for type conversion utilities.""" + + def test_arrow_schema_conversion(self): + """Test converting Arrow schema to Kinetica columns.""" + from ray.data._internal.datasource.kinetica_type_utils import ( + arrow_schema_to_kinetica_columns, + ) + + schema = pa.schema( + [ + pa.field("id", pa.int64()), + pa.field("name", pa.string()), + pa.field("value", pa.float64()), + pa.field("active", pa.bool_()), + ] + ) + + columns = arrow_schema_to_kinetica_columns(schema) + + assert len(columns) == 4 + assert columns[0].name == "id" + assert columns[1].name == "name" + assert columns[2].name == "value" + assert columns[3].name == "active" + + def test_arrow_schema_with_keys(self): + """Test converting Arrow schema with primary/shard keys.""" + from gpudb import GPUdbColumnProperty + + from ray.data._internal.datasource.kinetica_type_utils import ( + arrow_schema_to_kinetica_columns, + ) + + schema = pa.schema( + [ + pa.field("id", pa.int64()), + pa.field("region", pa.string()), + pa.field("value", pa.float64()), + ] + ) + + columns = arrow_schema_to_kinetica_columns( + schema, + primary_keys=["id"], + shard_keys=["region"], + ) + + assert GPUdbColumnProperty.PRIMARY_KEY in columns[0].column_properties + assert GPUdbColumnProperty.SHARD_KEY in columns[1].column_properties + + def test_arrow_schema_with_invalid_keys_rejected(self): + """Test that invalid primary/shard keys raise ValueError.""" + from ray.data._internal.datasource.kinetica_type_utils import ( + arrow_schema_to_kinetica_columns, + ) + + schema = pa.schema( + [ + pa.field("id", pa.int64()), + pa.field("name", pa.string()), + ] + ) + + # Invalid primary key + with pytest.raises(ValueError, match="non-existent columns"): + arrow_schema_to_kinetica_columns( + schema, + primary_keys=["nonexistent_column"], + ) + + # Invalid shard key + with pytest.raises(ValueError, match="non-existent columns"): + arrow_schema_to_kinetica_columns( + schema, + shard_keys=["also_nonexistent"], + ) + + # Both invalid + with pytest.raises(ValueError, match="non-existent columns"): + arrow_schema_to_kinetica_columns( + schema, + primary_keys=["bad_pk"], + shard_keys=["bad_sk"], + ) + + def test_arrow_to_kinetica_integer_types(self): + """Test Arrow integer types convert correctly to Kinetica.""" + from gpudb import GPUdbColumnProperty + + from ray.data._internal.datasource.kinetica_type_utils import ( + arrow_to_kinetica_type, + ) + + # Signed integers + assert arrow_to_kinetica_type(pa.int8()) == ( + "int", + [GPUdbColumnProperty.INT8], + ) + assert arrow_to_kinetica_type(pa.int16()) == ( + "int", + [GPUdbColumnProperty.INT16], + ) + assert arrow_to_kinetica_type(pa.int32()) == ("int", []) + assert arrow_to_kinetica_type(pa.int64()) == ("long", []) + + # Unsigned integers + assert arrow_to_kinetica_type(pa.uint8()) == ( + "int", + [GPUdbColumnProperty.INT16], + ) + assert arrow_to_kinetica_type(pa.uint16()) == ("int", []) + assert arrow_to_kinetica_type(pa.uint32()) == ("long", []) + assert arrow_to_kinetica_type(pa.uint64()) == ( + "string", + [GPUdbColumnProperty.ULONG], + ) + + def test_arrow_to_kinetica_float_types(self): + """Test Arrow float types convert correctly to Kinetica.""" + from ray.data._internal.datasource.kinetica_type_utils import ( + arrow_to_kinetica_type, + ) + + assert arrow_to_kinetica_type(pa.float32()) == ("float", []) + assert arrow_to_kinetica_type(pa.float64()) == ("double", []) + + def test_arrow_to_kinetica_datetime_types(self): + """Test Arrow date/time types convert correctly to Kinetica.""" + from gpudb import GPUdbColumnProperty + + from ray.data._internal.datasource.kinetica_type_utils import ( + arrow_to_kinetica_type, + ) + + assert arrow_to_kinetica_type(pa.date32()) == ( + "string", + [GPUdbColumnProperty.DATE], + ) + assert arrow_to_kinetica_type(pa.date64()) == ( + "string", + [GPUdbColumnProperty.DATE], + ) + assert arrow_to_kinetica_type(pa.time32("ms")) == ( + "string", + [GPUdbColumnProperty.TIME], + ) + assert arrow_to_kinetica_type(pa.time64("us")) == ( + "string", + [GPUdbColumnProperty.TIME], + ) + assert arrow_to_kinetica_type(pa.timestamp("us")) == ( + "string", + [GPUdbColumnProperty.DATETIME], + ) + + def test_arrow_to_kinetica_other_types(self): + """Test Arrow boolean, string, binary types convert correctly.""" + from gpudb import GPUdbColumnProperty + + from ray.data._internal.datasource.kinetica_type_utils import ( + arrow_to_kinetica_type, + ) + + assert arrow_to_kinetica_type(pa.bool_()) == ( + "int", + [GPUdbColumnProperty.BOOLEAN], + ) + assert arrow_to_kinetica_type(pa.string()) == ("string", []) + assert arrow_to_kinetica_type(pa.binary()) == ("bytes", []) + + def test_kinetica_to_arrow_case_insensitive(self): + """Test Kinetica to Arrow conversion is case-insensitive.""" + from gpudb import GPUdbRecordColumn + + from ray.data._internal.datasource.kinetica_type_utils import ( + kinetica_to_arrow_type, + ) + + # Helper to create mock column + def make_col(col_type, props): + col = GPUdbRecordColumn( + name="test", + column_type=col_type, + column_properties=props, + ) + return col + + # Test lowercase properties + assert ( + kinetica_to_arrow_type( + make_col(GPUdbRecordColumn._ColumnType.INT, ["boolean"]) + ) + == pa.bool_() + ) + assert ( + kinetica_to_arrow_type( + make_col(GPUdbRecordColumn._ColumnType.INT, ["int8"]) + ) + == pa.int8() + ) + assert ( + kinetica_to_arrow_type( + make_col(GPUdbRecordColumn._ColumnType.STRING, ["date"]) + ) + == pa.date32() + ) + assert kinetica_to_arrow_type( + make_col(GPUdbRecordColumn._ColumnType.STRING, ["time"]) + ) == pa.time64("us") + assert kinetica_to_arrow_type( + make_col(GPUdbRecordColumn._ColumnType.STRING, ["datetime"]) + ) == pa.timestamp("us") + + # Test uppercase properties + assert ( + kinetica_to_arrow_type( + make_col(GPUdbRecordColumn._ColumnType.INT, ["BOOLEAN"]) + ) + == pa.bool_() + ) + assert ( + kinetica_to_arrow_type( + make_col(GPUdbRecordColumn._ColumnType.INT, ["INT8"]) + ) + == pa.int8() + ) + assert ( + kinetica_to_arrow_type( + make_col(GPUdbRecordColumn._ColumnType.STRING, ["DATE"]) + ) + == pa.date32() + ) + assert kinetica_to_arrow_type( + make_col(GPUdbRecordColumn._ColumnType.STRING, ["DATETIME"]) + ) == pa.timestamp("us") + + # Test mixed case properties + assert ( + kinetica_to_arrow_type( + make_col(GPUdbRecordColumn._ColumnType.INT, ["Boolean"]) + ) + == pa.bool_() + ) + assert kinetica_to_arrow_type( + make_col(GPUdbRecordColumn._ColumnType.STRING, ["DateTime"]) + ) == pa.timestamp("us") + + def test_convert_arrow_batch_datetime_serialization(self): + """Test date/time values serialize to ISO format strings.""" + from datetime import date, datetime, time + + from gpudb import GPUdbColumnProperty, GPUdbRecordColumn + + from ray.data._internal.datasource.kinetica_type_utils import ( + convert_arrow_batch_to_records, + ) + + # Create test data with date/time columns + data = { + "id": [1, 2], + "date_col": [date(2024, 1, 15), date(2024, 12, 31)], + "time_col": [time(10, 30, 45), time(23, 59, 59)], + "datetime_col": [ + datetime(2024, 1, 15, 10, 30, 45), + datetime(2024, 12, 31, 23, 59, 59), + ], + } + batch = pa.RecordBatch.from_pydict(data) + + # Create column definitions + columns = [ + GPUdbRecordColumn( + name="id", + column_type=GPUdbRecordColumn._ColumnType.INT, + column_properties=[], + ), + GPUdbRecordColumn( + name="date_col", + column_type=GPUdbRecordColumn._ColumnType.STRING, + column_properties=[GPUdbColumnProperty.DATE], + ), + GPUdbRecordColumn( + name="time_col", + column_type=GPUdbRecordColumn._ColumnType.STRING, + column_properties=[GPUdbColumnProperty.TIME], + ), + GPUdbRecordColumn( + name="datetime_col", + column_type=GPUdbRecordColumn._ColumnType.STRING, + column_properties=[GPUdbColumnProperty.DATETIME], + ), + ] + + records = convert_arrow_batch_to_records(batch, columns) + + # Verify date serialization + assert records[0]["date_col"] == "2024-01-15" + assert records[1]["date_col"] == "2024-12-31" + + # Verify time serialization + assert records[0]["time_col"] == "10:30:45" + assert records[1]["time_col"] == "23:59:59" + + # Verify datetime serialization (ISO format) + assert "2024-01-15" in records[0]["datetime_col"] + assert "10:30:45" in records[0]["datetime_col"] + + def test_convert_arrow_batch_datetime_json_serializable(self): + """Test that records with date/time are JSON serializable.""" + import json + from datetime import date, datetime, time + + from gpudb import GPUdbColumnProperty, GPUdbRecordColumn + + from ray.data._internal.datasource.kinetica_type_utils import ( + convert_arrow_batch_to_records, + ) + + data = { + "id": [1], + "date_col": [date(2024, 1, 15)], + "time_col": [time(10, 30, 45)], + "datetime_col": [datetime(2024, 1, 15, 10, 30, 45)], + } + batch = pa.RecordBatch.from_pydict(data) + + columns = [ + GPUdbRecordColumn( + name="id", + column_type=GPUdbRecordColumn._ColumnType.INT, + column_properties=[], + ), + GPUdbRecordColumn( + name="date_col", + column_type=GPUdbRecordColumn._ColumnType.STRING, + column_properties=[GPUdbColumnProperty.DATE], + ), + GPUdbRecordColumn( + name="time_col", + column_type=GPUdbRecordColumn._ColumnType.STRING, + column_properties=[GPUdbColumnProperty.TIME], + ), + GPUdbRecordColumn( + name="datetime_col", + column_type=GPUdbRecordColumn._ColumnType.STRING, + column_properties=[GPUdbColumnProperty.DATETIME], + ), + ] + + records = convert_arrow_batch_to_records(batch, columns) + + # This should not raise TypeError + json_str = json.dumps(records[0]) + parsed = json.loads(json_str) + + assert isinstance(parsed["date_col"], str) + assert isinstance(parsed["time_col"], str) + assert isinstance(parsed["datetime_col"], str) + + def test_convert_arrow_batch_null_datetime(self): + """Test that null date/time values are handled correctly.""" + from datetime import date + + from gpudb import GPUdbColumnProperty, GPUdbRecordColumn + + from ray.data._internal.datasource.kinetica_type_utils import ( + convert_arrow_batch_to_records, + ) + + data = { + "id": [1, 2], + "date_col": [date(2024, 1, 15), None], + } + batch = pa.RecordBatch.from_pydict(data) + + columns = [ + GPUdbRecordColumn( + name="id", + column_type=GPUdbRecordColumn._ColumnType.INT, + column_properties=[], + ), + GPUdbRecordColumn( + name="date_col", + column_type=GPUdbRecordColumn._ColumnType.STRING, + column_properties=[GPUdbColumnProperty.DATE], + ), + ] + + records = convert_arrow_batch_to_records(batch, columns) + + assert records[0]["date_col"] == "2024-01-15" + assert records[1]["date_col"] is None + + def test_convert_records_to_arrow_error_handling(self): + """Test that type conversion errors include column name.""" + from ray.data._internal.datasource.kinetica_type_utils import ( + convert_records_to_arrow_table, + ) + + schema = pa.schema([pa.field("int_col", pa.int64())]) + bad_records = [{"int_col": "not_an_integer"}] + + with pytest.raises(pa.ArrowTypeError) as exc_info: + convert_records_to_arrow_table(bad_records, schema) + + error_msg = str(exc_info.value) + assert "int_col" in error_msg + + def test_vector_bytes_json_serialization(self): + """Test that vector (bytes) can be JSON serialized via base64.""" + import base64 + import json + import struct + + # Simulate the custom serializer used in _write_simple + def json_serializer(obj): + if isinstance(obj, bytes): + return base64.b64encode(obj).decode("ascii") + raise TypeError(f"Object of type {type(obj)} is not JSON serializable") + + # Create a 3D vector + vector_bytes = struct.pack("3f", 1.0, 2.0, 3.0) + record = {"id": 1, "embedding": vector_bytes} + + # Should serialize without error + json_str = json.dumps(record, default=json_serializer) + parsed = json.loads(json_str) + + assert isinstance(parsed["embedding"], str) + + # Verify we can decode back to original bytes + decoded = base64.b64decode(parsed["embedding"]) + assert decoded == vector_bytes + + def test_decimal_scale_zero_preserved(self): + """Test that decimal scale=0 is preserved, not treated as falsy.""" + from gpudb import GPUdbRecordColumn + + from ray.data._internal.datasource.kinetica_type_utils import ( + kinetica_to_arrow_type, + ) + + # Create a decimal column with explicit scale=0 + col = GPUdbRecordColumn( + name="amount", + column_type=GPUdbRecordColumn._ColumnType.STRING, + column_properties=["decimal"], + precision=10, + scale=0, # Integer decimal (no decimal places) + ) + + arrow_type = kinetica_to_arrow_type(col) + + # Verify it's a decimal type + assert pa.types.is_decimal(arrow_type), f"Expected decimal, got {arrow_type}" + # Verify scale is 0, not the default (4) + assert arrow_type.scale == 0, ( + f"Expected scale=0, got {arrow_type.scale}. " + "Scale=0 should not be treated as falsy." + ) + assert arrow_type.precision == 10 + + def test_decimal_scale_none_uses_default(self): + """Test that decimal with scale=None uses the default scale.""" + from gpudb import GPUdbRecordColumn + + from ray.data._internal.datasource.kinetica_type_utils import ( + kinetica_to_arrow_type, + ) + + # Create a decimal column without explicit scale + col = GPUdbRecordColumn( + name="amount", + column_type=GPUdbRecordColumn._ColumnType.STRING, + column_properties=["decimal"], + precision=18, + scale=None, # Should use default + ) + + arrow_type = kinetica_to_arrow_type(col) + + assert pa.types.is_decimal(arrow_type) + # Default scale is 4 + assert arrow_type.scale == GPUdbRecordColumn.DEFAULT_DECIMAL_SCALE + + def test_fixed_size_list_float_to_vector(self): + """Test fixed-size list of floats maps to Kinetica vector type.""" + from ray.data._internal.datasource.kinetica_type_utils import ( + arrow_to_kinetica_type, + ) + + # 3D float vector + arrow_type = pa.list_(pa.float32(), 3) + kinetica_type, props = arrow_to_kinetica_type(arrow_type) + + assert kinetica_type == "bytes", f"Expected 'bytes', got {kinetica_type}" + assert "vector(3)" in props, f"Expected 'vector(3)' in props, got {props}" + + def test_fixed_size_list_double_to_vector(self): + """Test fixed-size list of doubles maps to Kinetica vector type.""" + from ray.data._internal.datasource.kinetica_type_utils import ( + arrow_to_kinetica_type, + ) + + # 128D double vector + arrow_type = pa.list_(pa.float64(), 128) + kinetica_type, props = arrow_to_kinetica_type(arrow_type) + + assert kinetica_type == "bytes" + assert "vector(128)" in props + + def test_fixed_size_list_int_to_array(self): + """Test fixed-size list of non-floats maps to Kinetica array type.""" + from ray.data._internal.datasource.kinetica_type_utils import ( + arrow_to_kinetica_type, + ) + + # Fixed-size int array + arrow_type = pa.list_(pa.int32(), 4) + kinetica_type, props = arrow_to_kinetica_type(arrow_type) + + assert kinetica_type == "string", f"Expected 'string', got {kinetica_type}" + assert "array(int,4)" in props, f"Expected 'array(int,4)' in props, got {props}" + + def test_variable_list_to_array(self): + """Test variable-length list maps to Kinetica array type without size.""" + from ray.data._internal.datasource.kinetica_type_utils import ( + arrow_to_kinetica_type, + ) + + # Variable-length list + arrow_type = pa.list_(pa.float64()) + kinetica_type, props = arrow_to_kinetica_type(arrow_type) + + assert kinetica_type == "string" + # Should be array(double) without size + assert ( + "array(double)" in props + ), f"Expected 'array(double)' in props, got {props}" + + def test_convert_arrow_batch_null_columns(self): + """Test convert_arrow_batch_to_records handles None columns gracefully.""" + from ray.data._internal.datasource.kinetica_type_utils import ( + convert_arrow_batch_to_records, + ) + + # Create a simple batch + batch = pa.RecordBatch.from_pydict( + { + "id": [1, 2, 3], + "name": ["Alice", "Bob", "Charlie"], + } + ) + + # Should not raise TypeError when columns is None + records = convert_arrow_batch_to_records(batch, None) + + assert len(records) == 3 + assert records[0]["id"] == 1 + assert records[0]["name"] == "Alice" + + def test_convert_arrow_batch_vector_invalid_values_error(self): + """Test that vector serialization with invalid values raises ValueError.""" + from gpudb import GPUdbRecordColumn + + from ray.data._internal.datasource.kinetica_type_utils import ( + convert_arrow_batch_to_records, + ) + + # Create a batch with a list that should be treated as a vector + # but contains non-float values + batch = pa.RecordBatch.from_pydict( + { + "id": [1], + "embedding": [["not", "floats", "here"]], # Strings instead of floats + } + ) + + # Create column definitions with vector type + columns = [ + GPUdbRecordColumn( + name="id", + column_type=GPUdbRecordColumn._ColumnType.INT, + column_properties=[], + ), + GPUdbRecordColumn( + name="embedding", + column_type=GPUdbRecordColumn._ColumnType.BYTES, + column_properties=["vector(3)"], + ), + ] + + # Should raise ValueError with helpful message including column name + with pytest.raises(ValueError, match="embedding"): + convert_arrow_batch_to_records(batch, columns) + + +# ============================================================================ +# Datasource Validation Tests +# ============================================================================ + + +class TestKineticaDatasourceValidation: + """Tests for input validation in KineticaDatasource.""" + + def test_base_class_attributes_initialized(self): + """Test that base class mixin attributes are properly initialized. + + KineticaDatasource must call super().__init__() to initialize + _predicate_expr from _DatasourcePredicatePushdownMixin. + """ + with patch( + "ray.data._internal.datasource.kinetica_datasource." + "KineticaDatasource._init_client" + ): + ds = KineticaDatasource( + url="http://localhost:9191", + table_name="test_table", + ) + + # These attributes are set by base class __init__ + # If super().__init__() wasn't called, these would raise AttributeError + assert hasattr(ds, "_predicate_expr") + assert ds._predicate_expr is None # Initial value + + def test_invalid_sort_order_rejected(self): + """Test that invalid sort_order values are rejected.""" + with pytest.raises(ValueError, match="Invalid sort_order"): + KineticaDatasource( + url="http://localhost:9191", + table_name="test_table", + sort_order="invalid", + ) + + def test_valid_sort_orders_accepted(self): + """Test that valid sort_order values are accepted.""" + with patch( + "ray.data._internal.datasource.kinetica_datasource." + "KineticaDatasource._init_client" + ): + # ascending should work + ds1 = KineticaDatasource( + url="http://localhost:9191", + table_name="test_table", + sort_order="ascending", + ) + assert ds1._sort_order == "ascending" + + # descending should work + ds2 = KineticaDatasource( + url="http://localhost:9191", + table_name="test_table", + sort_order="descending", + ) + assert ds2._sort_order == "descending" + + +# ============================================================================ +# Datasink Validation Tests +# ============================================================================ + + +class TestKineticaDatasinkValidation: + """Tests for input validation in KineticaDatasink.""" + + def test_base_class_attributes_initialized(self): + """Test that base class is properly initialized. + + KineticaDatasink must call super().__init__() to ensure + the Datasink base class initializes any required state. + """ + with patch( + "ray.data._internal.datasource.kinetica_datasink." + "KineticaDatasink._init_client" + ): + sink = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + ) + + # Verify the datasink is a proper Datasink instance + # and that super().__init__() was called (no AttributeError) + from ray.data.datasource.datasink import Datasink + + assert isinstance(sink, Datasink) + + +# ============================================================================ +# Datasink Serialization Tests +# ============================================================================ + + +class TestKineticaDatasinkSerialization: + """Tests for column serialization in KineticaDatasink.""" + + def test_decimal_columns_preserve_precision_scale(self): + """Test that decimal column precision/scale survives serialization.""" + from gpudb import GPUdbRecordColumn + + with patch( + "ray.data._internal.datasource.kinetica_datasink." + "KineticaDatasink._init_client" + ): + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + ) + + # Create a decimal column with specific precision and scale + decimal_col = GPUdbRecordColumn( + name="amount", + column_type=GPUdbRecordColumn._ColumnType.STRING, + column_properties=["decimal"], + is_nullable=False, + precision=10, + scale=2, + ) + + # Serialize and deserialize + dicts = ds._columns_to_dicts([decimal_col]) + restored = ds._dicts_to_columns(dicts) + + # Verify precision and scale are preserved + assert restored[0].precision == 10 + assert restored[0].scale == 2 + + def test_non_decimal_columns_no_precision_scale(self): + """Test that non-decimal columns don't include precision/scale.""" + from gpudb import GPUdbRecordColumn + + with patch( + "ray.data._internal.datasource.kinetica_datasink." + "KineticaDatasink._init_client" + ): + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + ) + + # Create a regular string column + string_col = GPUdbRecordColumn( + name="name", + column_type=GPUdbRecordColumn._ColumnType.STRING, + column_properties=[], + is_nullable=True, + ) + + # Serialize + dicts = ds._columns_to_dicts([string_col]) + + # Verify no precision/scale in dict (they weren't set) + assert "precision" not in dicts[0] or dicts[0].get("precision") is None + assert "scale" not in dicts[0] or dicts[0].get("scale") is None + + +# ============================================================================ +# GPUdbTable Creation Helper Tests +# ============================================================================ + + +class TestTryCreateGpudbTable: + """Tests for _try_create_gpudb_table helper method.""" + + @patch.object(KineticaDatasink, "_init_client") + @patch.object(KineticaDatasink, "_create_gpudb_table") + def test_success_returns_gpudb_table( + self, mock_create_gpudb_table, mock_init_client + ): + """Test that successful creation returns the GPUdbTable.""" + mock_client = MagicMock() + mock_init_client.return_value = mock_client + + mock_gpudb_table = MagicMock() + mock_create_gpudb_table.return_value = mock_gpudb_table + + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + use_multihead=False, + ) + + result = ds._try_create_gpudb_table(mock_client) + + assert result == mock_gpudb_table + mock_create_gpudb_table.assert_called_once_with(mock_client) + + @patch.object(KineticaDatasink, "_init_client") + @patch.object(KineticaDatasink, "_create_gpudb_table") + def test_failure_with_multihead_raises( + self, mock_create_gpudb_table, mock_init_client + ): + """Test that failure with multihead=True raises RuntimeError.""" + mock_client = MagicMock() + mock_init_client.return_value = mock_client + + mock_create_gpudb_table.side_effect = Exception("Connection failed") + + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + use_multihead=True, + ) + + with pytest.raises(RuntimeError, match="multihead ingest"): + ds._try_create_gpudb_table(mock_client) + + @patch.object(KineticaDatasink, "_init_client") + @patch.object(KineticaDatasink, "_create_gpudb_table") + def test_failure_without_multihead_returns_none( + self, mock_create_gpudb_table, mock_init_client + ): + """Test that failure with multihead=False returns None and logs warning.""" + mock_client = MagicMock() + mock_init_client.return_value = mock_client + + mock_create_gpudb_table.side_effect = Exception("Connection failed") + + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + use_multihead=False, + ) + + result = ds._try_create_gpudb_table(mock_client) + + assert result is None + + +# ============================================================================ +# Deferred Table Creation Tests +# ============================================================================ + + +class TestKineticaDatasinkDeferredCreation: + """Tests for deferred table creation in KineticaDatasink. + + Note: Since supports_distributed_writes returns False for deferred creation, + only a single worker runs - no race condition handling is needed. + """ + + @patch.object(KineticaDatasink, "_init_client") + @patch.object(KineticaDatasink, "_create_table") + @patch.object(KineticaDatasink, "_create_gpudb_table") + @patch( + "ray.data._internal.datasource.kinetica_type_utils." + "arrow_schema_to_kinetica_columns" + ) + @patch( + "ray.data._internal.datasource.kinetica_type_utils." + "convert_arrow_batch_to_records" + ) + def test_deferred_creation_creates_table( + self, + mock_convert, + mock_arrow_to_kinetica, + mock_create_gpudb_table, + mock_create_table, + mock_init_client, + ): + """Test deferred creation creates table. + + Note: _create_table sets _schema_string and _column_properties, + so no separate _get_existing_record_type call is needed. + """ + mock_client = MagicMock() + mock_init_client.return_value = mock_client + + mock_arrow_to_kinetica.return_value = [] + mock_convert.return_value = [{"id": 1}] + mock_create_gpudb_table.return_value = None + + # Create datasink without schema (deferred creation) + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + mode=KineticaSinkMode.APPEND, + use_multihead=False, + ) + ds._column_defs = None # Simulate deferred creation + + # Create test data + rb = pa.record_batch([pa.array([1])], names=["id"]) + block_data = pa.Table.from_batches([rb]) + + ctx = TaskContext(task_idx=0, op_name="test_write") + ds.write([block_data], ctx=ctx) + + # Verify table was created + mock_create_table.assert_called_once() + # _create_table sets _schema_string/_column_properties, + # so _get_existing_record_type is not called (no redundant network call) + + @patch.object(KineticaDatasink, "_init_client") + @patch.object(KineticaDatasink, "_create_table") + @patch( + "ray.data._internal.datasource.kinetica_type_utils." + "arrow_schema_to_kinetica_columns" + ) + def test_deferred_creation_error_propagated( + self, + mock_arrow_to_kinetica, + mock_create_table, + mock_init_client, + ): + """Test that errors during deferred creation are propagated.""" + mock_client = MagicMock() + mock_init_client.return_value = mock_client + + # Simulate a real error + mock_create_table.side_effect = Exception("Connection refused") + + mock_arrow_to_kinetica.return_value = [] + + # Create datasink without schema + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + mode=KineticaSinkMode.APPEND, + use_multihead=False, + ) + ds._column_defs = None + + # Create test data + rb = pa.record_batch([pa.array([1])], names=["id"]) + block_data = pa.Table.from_batches([rb]) + + ctx = TaskContext(task_idx=0, op_name="test_write") + + # Should raise the real error + with pytest.raises(Exception, match="Connection refused"): + ds.write([block_data], ctx=ctx) + + def test_supports_distributed_writes_false_for_deferred(self): + """Test that distributed writes are disabled for deferred creation. + + This ensures only a single worker runs when the table doesn't exist + and schema is unknown, preventing race conditions. + """ + with patch( + "ray.data._internal.datasource.kinetica_datasink." + "KineticaDatasink._init_client" + ): + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + mode=KineticaSinkMode.APPEND, + ) + # _table_ready is False when deferred + assert ds._table_ready is False + assert ds.supports_distributed_writes is False + + +# ============================================================================ +# Module Import Tests +# ============================================================================ + + +class TestModuleImports: + """Tests for module imports and API exposure.""" + + def test_read_kinetica_importable(self): + """Test read_kinetica is importable from ray.data.""" + try: + from ray.data import read_kinetica + + assert callable(read_kinetica) + except ImportError: + pytest.skip("read_kinetica not available in this Ray build") + + def test_read_kinetica_sql_importable(self): + """Test read_kinetica_sql is importable from ray.data.""" + try: + from ray.data import read_kinetica_sql + + assert callable(read_kinetica_sql) + except ImportError: + pytest.skip("read_kinetica_sql not available in this Ray build") + + def test_datasource_importable(self): + """Test KineticaDatasource is importable.""" + from ray.data._internal.datasource.kinetica_datasource import ( + KineticaDatasource, + ) + + assert KineticaDatasource is not None + + def test_datasink_importable(self): + """Test KineticaDatasink is importable.""" + from ray.data._internal.datasource.kinetica_datasink import ( + KineticaDatasink, + KineticaSinkMode, + KineticaTableSettings, + ) + + assert KineticaDatasink is not None + assert KineticaSinkMode is not None + assert KineticaTableSettings is not None + + def test_sql_connection_factory_importable(self): + """Test SQL connection factory is importable.""" + from ray.data._internal.datasource.kinetica_sql_connection import ( + KineticaConnectionFactory, + create_kinetica_connection_factory, + ) + + assert KineticaConnectionFactory is not None + assert callable(create_kinetica_connection_factory) + + def test_create_gpudb_client_importable(self): + """Test create_gpudb_client shared factory is importable.""" + from ray.data._internal.datasource.kinetica_type_utils import ( + create_gpudb_client, + ) + + assert callable(create_gpudb_client) + + +# ============================================================================ +# Client Factory Tests +# ============================================================================ + + +class TestCreateGpudbClient: + """Tests for the shared create_gpudb_client factory function.""" + + def test_create_client_basic(self, patch_gpudb): + """Test basic client creation with minimal parameters.""" + from ray.data._internal.datasource.kinetica_type_utils import ( + create_gpudb_client, + ) + + client = create_gpudb_client(url="http://localhost:9191") + assert client is not None + + def test_create_client_with_auth(self, patch_gpudb): + """Test client creation with authentication.""" + from ray.data._internal.datasource.kinetica_type_utils import ( + create_gpudb_client, + ) + + client = create_gpudb_client( + url="http://localhost:9191", + username="admin", + password="password123", + ) + assert client is not None + + def test_create_client_with_options(self, patch_gpudb): + """Test client creation with additional options.""" + from ray.data._internal.datasource.kinetica_type_utils import ( + create_gpudb_client, + ) + + client = create_gpudb_client( + url="http://localhost:9191", + username="admin", + password="password", + options={"timeout": 30000}, + ) + assert client is not None + + def test_datasource_uses_shared_factory(self): + """Test that KineticaDatasource uses the shared factory.""" + with patch( + "ray.data._internal.datasource.kinetica_type_utils.create_gpudb_client" + ) as mock_factory: + mock_factory.return_value = MagicMock() + + ds = KineticaDatasource( + url="http://localhost:9191", + table_name="test_table", + username="admin", + password="password", + ) + + ds._init_client() + + mock_factory.assert_called_once_with( + url="http://localhost:9191", + username="admin", + password="password", + options={}, + ) + + def test_datasink_uses_shared_factory(self): + """Test that KineticaDatasink uses the shared factory.""" + with patch( + "ray.data._internal.datasource.kinetica_type_utils.create_gpudb_client" + ) as mock_factory: + mock_factory.return_value = MagicMock() + + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + username="admin", + password="password", + ) + + ds._init_client() + + mock_factory.assert_called_once_with( + url="http://localhost:9191", + username="admin", + password="password", + options={}, + ) + + +# ============================================================================ +# Integration Tests (require Kinetica server) +# ============================================================================ + + +@pytest.mark.skipif( + not os.environ.get("KINETICA_URL"), + reason="Integration tests require KINETICA_URL environment variable", +) +class TestKineticaIntegration: + """Integration tests requiring a running Kinetica server.""" + + @pytest.fixture + def connection_params(self): + """Get connection parameters from environment.""" + return { + "url": os.environ.get("KINETICA_URL", "http://localhost:9191"), + "username": os.environ.get("KINETICA_USER", "admin"), + "password": os.environ.get("KINETICA_PASS", ""), + } + + def test_read_simple_query(self, connection_params): + """Test reading data from a Kinetica table.""" + import ray + + if not ray.is_initialized(): + ray.init(ignore_reinit_error=True) + + try: + ds = ray.data.read_kinetica( + table_name="ki_home.ki_catalog_ddl", # System table that should exist + **connection_params, + limit=10, + ) + + count = ds.count() + assert count >= 0 # Table might be empty but query should work + + finally: + pass + + def test_write_and_read_roundtrip(self, connection_params): + """Test writing and reading back data.""" + import ray + + if not ray.is_initialized(): + ray.init(ignore_reinit_error=True) + + table_name = "test_ray_roundtrip" + + try: + # Create test data + data = [ + {"id": 1, "name": "Alice", "value": 100.5}, + {"id": 2, "name": "Bob", "value": 200.75}, + {"id": 3, "name": "Charlie", "value": 300.25}, + ] + ds = ray.data.from_items(data) + + # Write to Kinetica + ds.write_kinetica( + table_name=table_name, + mode="overwrite", + **connection_params, + ) + + # Read back + read_ds = ray.data.read_kinetica( + table_name=table_name, + **connection_params, + ) + + # Verify + assert read_ds.count() == 3 + + finally: + # Cleanup: drop the test table + try: + from gpudb import GPUdb + + client = GPUdb( + host=connection_params["url"], + username=connection_params.get("username"), + password=connection_params.get("password"), + ) + client.clear_table(table_name=table_name) + except Exception: + pass + + def test_read_with_filter(self, connection_params): + """Test reading with a filter expression.""" + import ray + + if not ray.is_initialized(): + ray.init(ignore_reinit_error=True) + + table_name = "test_ray_filter" + + try: + # Create test data + data = [{"id": i, "value": i * 10} for i in range(100)] + ds = ray.data.from_items(data) + + # Write to Kinetica + ds.write_kinetica( + table_name=table_name, + mode="overwrite", + **connection_params, + ) + + # Read with filter + read_ds = ray.data.read_kinetica( + table_name=table_name, + filter_expression="value >= 500", + **connection_params, + ) + + # Verify filter worked + count = read_ds.count() + assert count == 50 # ids 50-99 have values >= 500 + + finally: + # Cleanup + try: + from gpudb import GPUdb + + client = GPUdb( + host=connection_params["url"], + username=connection_params.get("username"), + password=connection_params.get("password"), + ) + client.clear_table(table_name=table_name) + except Exception: + pass + + def test_read_specific_columns(self, connection_params): + """Test reading specific columns.""" + import ray + + if not ray.is_initialized(): + ray.init(ignore_reinit_error=True) + + table_name = "test_ray_columns" + + try: + # Create test data + data = [ + {"id": 1, "name": "Alice", "value": 100.5, "extra": "data"}, + ] + ds = ray.data.from_items(data) + + # Write to Kinetica + ds.write_kinetica( + table_name=table_name, + mode="overwrite", + **connection_params, + ) + + # Read specific columns + read_ds = ray.data.read_kinetica( + table_name=table_name, + columns=["id", "name"], + **connection_params, + ) + + # Verify only requested columns present + row = read_ds.take(1)[0] + assert "id" in row + assert "name" in row + assert "value" not in row + assert "extra" not in row + + finally: + # Cleanup + try: + from gpudb import GPUdb + + client = GPUdb( + host=connection_params["url"], + username=connection_params.get("username"), + password=connection_params.get("password"), + ) + client.clear_table(table_name=table_name) + except Exception: + pass + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) From afe25579e44e35a297d1d12285263bb5a873a597 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Mon, 18 May 2026 09:24:17 +0530 Subject: [PATCH 2/9] Reinstated missing read methods Signed-off-by: anindyam1969 --- python/ray/data/read_api.py | 180 ++++++++++++++++++++++++++++++++++++ 1 file changed, 180 insertions(+) diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 797d9e3abdd8..4fc828a82d87 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -1032,6 +1032,186 @@ def read_mongo( ) +@PublicAPI(stability="alpha") +def read_kinetica( + table_name: str, + url: str, + *, + username: Optional[str] = None, + password: Optional[str] = None, + columns: Optional[List[str]] = None, + filter_expression: Optional[str] = None, + sort_by: Optional[str] = None, + sort_order: str = "ascending", + limit: Optional[int] = None, + batch_size: int = 10000, + use_multihead_io: bool = False, + convert_special_types: bool = True, + options: Optional[Dict[str, Any]] = None, + parallelism: int = -1, + num_cpus: Optional[float] = None, + num_gpus: Optional[float] = None, + memory: Optional[float] = None, + ray_remote_args: Optional[Dict[str, Any]] = None, + concurrency: Optional[int] = None, + override_num_blocks: Optional[int] = None, +) -> Dataset: + """Create a :class:`~ray.data.Dataset` from a Kinetica database table. + + Kinetica is a distributed, in-memory analytical database designed for + real-time analytics on streaming and historical data. This function reads + data using Kinetica's native API with parallel pagination. + + Examples: + >>> import ray # doctest: +SKIP + >>> ds = ray.data.read_kinetica( # doctest: +SKIP + ... table_name="transactions", + ... url="http://localhost:9191", + ... username="admin", + ... password="password", + ... filter_expression="amount > 1000", + ... columns=["id", "customer", "amount"], + ... ) + + Args: + table_name: Name of the Kinetica table to read. + url: URL of the Kinetica server (e.g., "http://localhost:9191"). + username: Authentication username. + password: Authentication password. + columns: Specific columns to read. None reads all columns. + filter_expression: SQL WHERE clause filter (without WHERE keyword). + sort_by: Column to sort by for consistent pagination. + sort_order: "ascending" or "descending". + limit: Maximum rows to read. + batch_size: Records per API request for pagination. Default is 10,000. + use_multihead_io: If True, enables multihead I/O for parallel reads + from multiple Kinetica nodes. Can improve performance for large + datasets on clustered deployments. Default is False. + convert_special_types: If True, converts special types (arrays, JSON) + on retrieval. Default is True. + options: Additional GPUdb client options. + parallelism: This argument is deprecated. Use ``override_num_blocks``. + num_cpus: The number of CPUs to reserve for each parallel read worker. + num_gpus: The number of GPUs to reserve for each parallel read worker. + memory: The heap memory in bytes to reserve for each parallel read worker. + ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks. + concurrency: The maximum number of Ray tasks to run concurrently. + override_num_blocks: Override the number of output blocks from all read tasks. + + Returns: + :class:`~ray.data.Dataset` producing rows from the Kinetica table. + """ + from ray.data._internal.datasource.kinetica_datasource import ( + KineticaDatasource, + ) + + datasource = KineticaDatasource( + url=url, + table_name=table_name, + username=username, + password=password, + columns=columns, + filter_expression=filter_expression, + sort_by=sort_by, + sort_order=sort_order, + limit=limit, + batch_size=batch_size, + use_multihead_io=use_multihead_io, + convert_special_types=convert_special_types, + options=options, + ) + return read_datasource( + datasource, + num_cpus=num_cpus, + num_gpus=num_gpus, + memory=memory, + parallelism=parallelism, + ray_remote_args=ray_remote_args, + concurrency=concurrency, + override_num_blocks=override_num_blocks, + ) + + +@PublicAPI(stability="alpha") +def read_kinetica_sql( + sql: str, + url: str, + *, + username: Optional[str] = None, + password: Optional[str] = None, + oauth_token: Optional[str] = None, + default_schema: Optional[str] = None, + options: Optional[Dict[str, Any]] = None, + ray_remote_args: Optional[Dict[str, Any]] = None, + concurrency: Optional[int] = None, + override_num_blocks: Optional[int] = None, +) -> Dataset: + """Create a :class:`~ray.data.Dataset` from a Kinetica SQL query. + + This function uses Kinetica's DB-API 2.0 compliant interface with Ray Data's + native ``read_sql`` method. It's ideal for complex SQL queries including JOINs, + aggregations, and subqueries. + + For simple table reads with parallel pagination, consider using + :func:`~ray.data.read_kinetica` instead which uses Kinetica's native API. + + Examples: + >>> import ray # doctest: +SKIP + >>> ds = ray.data.read_kinetica_sql( # doctest: +SKIP + ... sql="SELECT t.id, t.customer, SUM(t.amount) as total " + ... "FROM transactions t " + ... "JOIN customers c ON t.customer_id = c.id " + ... "GROUP BY t.id, t.customer", + ... url="http://localhost:9191", + ... username="admin", + ... password="password", + ... ) + + Args: + sql: SQL query to execute. + url: URL of the Kinetica server (e.g., "http://localhost:9191"). + username: Authentication username. + password: Authentication password. + oauth_token: OAuth token for authentication (alternative to username/password). + default_schema: Default schema to use for queries. + options: Additional GPUdb client options. + ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks. + concurrency: The maximum number of Ray tasks to run concurrently. + override_num_blocks: Override the number of output blocks. + + Returns: + :class:`~ray.data.Dataset` producing rows from the query results. + """ + from ray.data._internal.datasource.kinetica_sql_connection import ( + create_kinetica_connection_factory, + ) + + connection_factory = create_kinetica_connection_factory( + url=url, + username=username, + password=password, + oauth_token=oauth_token, + default_schema=default_schema, + options=options, + ) + + read_kwargs: Dict[str, Any] = { + "sql": sql, + "connection_factory": connection_factory, + } + + if ray_remote_args is not None: + read_kwargs["ray_remote_args"] = ray_remote_args + + if concurrency is not None: + read_kwargs["concurrency"] = concurrency + + if override_num_blocks is not None: + read_kwargs["override_num_blocks"] = override_num_blocks + + return read_sql(**read_kwargs) + + @PublicAPI(stability="alpha") def read_bigquery( project_id: str, From 0359b7857c17d374dd178a6daa0d584c5e381f19 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Mon, 18 May 2026 09:43:50 +0530 Subject: [PATCH 3/9] Added the schema parameter to the read function Signed-off-by: anindyam1969 --- .../ray/data/_internal/datasource/kinetica_datasource.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/python/ray/data/_internal/datasource/kinetica_datasource.py b/python/ray/data/_internal/datasource/kinetica_datasource.py index 1275248604c2..401fa06a2b7c 100644 --- a/python/ray/data/_internal/datasource/kinetica_datasource.py +++ b/python/ray/data/_internal/datasource/kinetica_datasource.py @@ -639,7 +639,12 @@ def get_read_tasks( read_fn = self._create_read_fn(offset, partition_size) read_tasks.append( - ReadTask(read_fn, metadata, per_task_row_limit=per_task_row_limit) + ReadTask( + read_fn, + metadata, + schema=self._arrow_schema, + per_task_row_limit=per_task_row_limit, + ) ) offset += partition_size From 1a93eddf9b53ef1b8c3ec1a50f9d428cf5db4bfb Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Mon, 18 May 2026 09:56:11 +0530 Subject: [PATCH 4/9] Fixed Cursor bugbot reported issues Signed-off-by: anindyam1969 --- .../_internal/datasource/kinetica_datasink.py | 57 +++++++++++++------ .../datasource/kinetica_datasource.py | 8 +-- 2 files changed, 43 insertions(+), 22 deletions(-) diff --git a/python/ray/data/_internal/datasource/kinetica_datasink.py b/python/ray/data/_internal/datasource/kinetica_datasink.py index f83977fe2892..4db4959a38da 100644 --- a/python/ray/data/_internal/datasource/kinetica_datasink.py +++ b/python/ray/data/_internal/datasource/kinetica_datasink.py @@ -246,7 +246,11 @@ def _create_table(self, client, columns): label=self._table_name, ) - options = {} + options = { + # Avoid error if table already exists (e.g., race condition + # or retry scenario). The table schema must still be compatible. + "no_error_if_exists": "true", + } if self._table_settings.is_replicated: options["is_replicated"] = "true" @@ -398,8 +402,18 @@ def _get_record_type(self): return GPUdbRecordType(columns=columns, label=self._table_name) return None - def _create_gpudb_table(self, client): - """Create a GPUdbTable instance for writing records.""" + def _create_gpudb_table(self, client, table_exists: bool = False): + """Create a GPUdbTable instance for writing records. + + Args: + client: GPUdb client instance. + table_exists: If True, the table already exists and table creation + options (is_replicated, chunk_size, ttl, etc.) will be ignored + since they only apply when creating a new table. + + Returns: + GPUdbTable instance configured for writing. + """ from gpudb import GPUdbException, GPUdbTable, GPUdbTableOptions record_type = self._get_record_type() @@ -409,22 +423,25 @@ def _create_gpudb_table(self, client): "Cannot create GPUdbTable: no schema information available" ) + # Only set table creation options when the table doesn't exist. + # These options have no effect on existing tables and could cause + # issues with some GPUdb SDK versions. table_options = GPUdbTableOptions() + if not table_exists: + if self._table_settings.is_replicated: + table_options.is_replicated = True - if self._table_settings.is_replicated: - table_options.is_replicated = True - - if self._table_settings.chunk_size is not None: - table_options.chunk_size = self._table_settings.chunk_size + if self._table_settings.chunk_size is not None: + table_options.chunk_size = self._table_settings.chunk_size - if self._table_settings.ttl >= 0: - table_options.ttl = self._table_settings.ttl + if self._table_settings.ttl >= 0: + table_options.ttl = self._table_settings.ttl - if not self._table_settings.persist: - table_options.no_persist = True + if not self._table_settings.persist: + table_options.no_persist = True - if self._table_settings.collection_name: - table_options.collection_name = self._table_settings.collection_name + if self._table_settings.collection_name: + table_options.collection_name = self._table_settings.collection_name gpudb_table = GPUdbTable( _type=record_type, @@ -442,11 +459,13 @@ def _create_gpudb_table(self, client): return gpudb_table - def _try_create_gpudb_table(self, client: Any): + def _try_create_gpudb_table(self, client: Any, table_exists: bool = False): """Try to create GPUdbTable, handling errors based on multihead setting. Args: client: GPUdb client instance. + table_exists: If True, the table already exists and table creation + options will be skipped. Returns: GPUdbTable instance if successful, None if failed and multihead @@ -456,7 +475,7 @@ def _try_create_gpudb_table(self, client: Any): RuntimeError: If multihead is required but GPUdbTable creation fails. """ try: - return self._create_gpudb_table(client) + return self._create_gpudb_table(client, table_exists=table_exists) except Exception as e: if self._use_multihead: raise RuntimeError( @@ -503,7 +522,8 @@ def write( gpudb_table = None if self._schema_string is not None or self._column_defs is not None: - gpudb_table = self._try_create_gpudb_table(client) + # Table already exists at this point (created in on_write_start) + gpudb_table = self._try_create_gpudb_table(client, table_exists=True) for block in blocks: accessor = BlockAccessor.for_block(block) @@ -524,7 +544,8 @@ def write( # are used by _get_record_type (called from _create_gpudb_table). self._create_table(client, kinetica_columns) - gpudb_table = self._try_create_gpudb_table(client) + # Table was just created by _create_table, so it exists now + gpudb_table = self._try_create_gpudb_table(client, table_exists=True) for batch in arrow_table.to_batches(): records = convert_arrow_batch_to_records( diff --git a/python/ray/data/_internal/datasource/kinetica_datasource.py b/python/ray/data/_internal/datasource/kinetica_datasource.py index 401fa06a2b7c..32c6071b8ea2 100644 --- a/python/ray/data/_internal/datasource/kinetica_datasource.py +++ b/python/ray/data/_internal/datasource/kinetica_datasource.py @@ -64,14 +64,14 @@ def _has_balanced_quotes(expr: str) -> bool: while i < len(expr): char = expr[i] if not in_double and char == "'": - # Check for escaped quote '' - if i + 1 < len(expr) and expr[i + 1] == "'": + # Check for escaped quote '' only when inside a single-quoted string + if in_single and i + 1 < len(expr) and expr[i + 1] == "'": i += 2 # Skip escaped quote continue in_single = not in_single elif not in_single and char == '"': - # Check for escaped quote "" - if i + 1 < len(expr) and expr[i + 1] == '"': + # Check for escaped quote "" only when inside a double-quoted string + if in_double and i + 1 < len(expr) and expr[i + 1] == '"': i += 2 # Skip escaped quote continue in_double = not in_double From 46cc092016c1b0460f8a98fbb97575ad2c70692f Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Mon, 18 May 2026 10:05:38 +0530 Subject: [PATCH 5/9] Fixed test assertions Signed-off-by: anindyam1969 --- python/ray/data/tests/datasource/test_kinetica.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/tests/datasource/test_kinetica.py b/python/ray/data/tests/datasource/test_kinetica.py index 3f760d5c0d31..e463c2a06bc3 100644 --- a/python/ray/data/tests/datasource/test_kinetica.py +++ b/python/ray/data/tests/datasource/test_kinetica.py @@ -1415,7 +1415,7 @@ def test_success_returns_gpudb_table( result = ds._try_create_gpudb_table(mock_client) assert result == mock_gpudb_table - mock_create_gpudb_table.assert_called_once_with(mock_client) + mock_create_gpudb_table.assert_called_once_with(mock_client, table_exists=False) @patch.object(KineticaDatasink, "_init_client") @patch.object(KineticaDatasink, "_create_gpudb_table") From 0a5320e9fbbc77f51e9c5bd614f0cac1a0dfae3a Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Mon, 18 May 2026 10:32:21 +0530 Subject: [PATCH 6/9] Fixed datasink type hint Signed-off-by: anindyam1969 --- python/ray/data/_internal/datasource/kinetica_datasink.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/data/_internal/datasource/kinetica_datasink.py b/python/ray/data/_internal/datasource/kinetica_datasink.py index 4db4959a38da..6b899984ea18 100644 --- a/python/ray/data/_internal/datasource/kinetica_datasink.py +++ b/python/ray/data/_internal/datasource/kinetica_datasink.py @@ -17,6 +17,7 @@ if TYPE_CHECKING: import pyarrow as pa + from gpudb import GPUdb logger = logging.getLogger(__name__) @@ -402,7 +403,7 @@ def _get_record_type(self): return GPUdbRecordType(columns=columns, label=self._table_name) return None - def _create_gpudb_table(self, client, table_exists: bool = False): + def _create_gpudb_table(self, client: "GPUdb", table_exists: bool = False): """Create a GPUdbTable instance for writing records. Args: From c1e3afaf84492e7f4becbcf90b104513c6b5b0df Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Mon, 18 May 2026 10:40:30 +0530 Subject: [PATCH 7/9] Updated dataasource code Signed-off-by: anindyam1969 --- .../_internal/datasource/kinetica_datasource.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/python/ray/data/_internal/datasource/kinetica_datasource.py b/python/ray/data/_internal/datasource/kinetica_datasource.py index 32c6071b8ea2..8b349a11beab 100644 --- a/python/ray/data/_internal/datasource/kinetica_datasource.py +++ b/python/ray/data/_internal/datasource/kinetica_datasource.py @@ -584,8 +584,16 @@ def get_read_tasks( # Estimate row size for metadata avg_row_size = self._estimate_row_size(client) - # Ensure parallelism is at least 1 to handle unresolved or invalid values - effective_parallelism = max(1, parallelism) + # Handle parallelism=-1 (auto-detect) by computing based on data size. + # Use min_records_per_task to determine a reasonable parallelism. + min_records_per_task = 1000 + if parallelism == -1: + # Auto-detect: aim for ~min_records_per_task rows per task + effective_parallelism = max(1, self._total_count // min_records_per_task) + else: + # Ensure parallelism is at least 1 to handle invalid values + effective_parallelism = max(1, parallelism) + if not self._sort_by and effective_parallelism > 1: # Without a deterministic sort order, offset-based pagination # with parallelism > 1 will produce incorrect results (duplicates @@ -604,7 +612,6 @@ def get_read_tasks( records_per_task = max(1, self._total_count // effective_parallelism) # Ensure we don't create too many tiny tasks - min_records_per_task = 1000 if ( records_per_task < min_records_per_task and self._total_count > min_records_per_task From d5a4aa56a352601c899b1eef9840d126b2e10b18 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Thu, 28 May 2026 11:08:03 +0530 Subject: [PATCH 8/9] Fix Kinetica datasource/datasink bugs - Remove redundant value is not None checks in convert_arrow_batch_to_records (the None case is already handled earlier in the function) - Recompute records_per_task after parallelism cap in get_read_tasks to ensure even distribution of work across tasks - Pass PyArrow schema instead of Ray Data Schema to KineticaDatasink in write_kinetica to fix type mismatch Signed-off-by: anindyam1969 --- .../ray/data/_internal/datasource/kinetica_datasource.py | 4 ++++ .../ray/data/_internal/datasource/kinetica_type_utils.py | 9 ++++++--- python/ray/data/dataset.py | 9 ++++++++- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/python/ray/data/_internal/datasource/kinetica_datasource.py b/python/ray/data/_internal/datasource/kinetica_datasource.py index 8b349a11beab..1e45e11d2213 100644 --- a/python/ray/data/_internal/datasource/kinetica_datasource.py +++ b/python/ray/data/_internal/datasource/kinetica_datasource.py @@ -623,6 +623,10 @@ def get_read_tasks( # This handles the case where parallelism > total_count effective_parallelism = min(effective_parallelism, self._total_count) + # Recompute records_per_task after capping parallelism to ensure + # even distribution of work across tasks + records_per_task = max(1, self._total_count // effective_parallelism) + read_tasks = [] offset = 0 diff --git a/python/ray/data/_internal/datasource/kinetica_type_utils.py b/python/ray/data/_internal/datasource/kinetica_type_utils.py index 838212c3128f..bb39b8216e9c 100644 --- a/python/ray/data/_internal/datasource/kinetica_type_utils.py +++ b/python/ray/data/_internal/datasource/kinetica_type_utils.py @@ -604,19 +604,22 @@ def convert_arrow_batch_to_records( elif isinstance(value, date): record[col_name] = value.isoformat() else: - record[col_name] = str(value) if value is not None else None + # value is not None here (handled by if block at line 569) + record[col_name] = str(value) elif col_type == "time": # Convert time to ISO format string (HH:MM:SS.ffffff) if isinstance(value, time): record[col_name] = value.isoformat() else: - record[col_name] = str(value) if value is not None else None + # value is not None here (handled by if block at line 569) + record[col_name] = str(value) elif col_type in ("datetime", "timestamp"): # Convert datetime to ISO format string (YYYY-MM-DDTHH:MM:SS.ffffff) if isinstance(value, datetime): record[col_name] = value.isoformat() else: - record[col_name] = str(value) if value is not None else None + # value is not None here (handled by if block at line 569) + record[col_name] = str(value) else: # Handle any remaining date/time types that weren't detected # by column properties diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 5dbdcf497549..4c79c5b2ff8e 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -5273,13 +5273,20 @@ def write_kinetica( """ from ray.data._internal.datasource.kinetica_datasink import KineticaDatasink + # Extract the underlying PyArrow schema from Ray Data Schema. + # KineticaDatasink expects pa.Schema, not ray.data.Schema. + ray_schema = self.schema() + pa_schema = ( + ray_schema.base_schema if hasattr(ray_schema, "base_schema") else ray_schema + ) + datasink = KineticaDatasink( url=url, table_name=table_name, username=username, password=password, mode=mode, - schema=self.schema(), + schema=pa_schema, table_settings=table_settings, batch_size=batch_size, use_multihead=use_multihead, From 2f5b97a54cdf8f4143650c79e5d23509d6ba2bd8 Mon Sep 17 00:00:00 2001 From: anindyam1969 Date: Thu, 28 May 2026 11:27:49 +0530 Subject: [PATCH 9/9] Validate limit parameter in KineticaDatasource to reject negative values Signed-off-by: anindyam1969 --- .../ray/data/_internal/datasource/kinetica_datasource.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/python/ray/data/_internal/datasource/kinetica_datasource.py b/python/ray/data/_internal/datasource/kinetica_datasource.py index 1e45e11d2213..062f021f7a89 100644 --- a/python/ray/data/_internal/datasource/kinetica_datasource.py +++ b/python/ray/data/_internal/datasource/kinetica_datasource.py @@ -214,7 +214,15 @@ def __init__( self._password = password self._filter_expression = filter_expression self._sort_by = sort_by + + # Validate limit - None means no limit, positive integer means specific limit + if limit is not None and limit <= 0: + raise ValueError( + f"limit must be a positive integer or None, got {limit}. " + "Use None for no limit." + ) self._limit = limit + if batch_size is not None: if batch_size <= 0: raise ValueError(