class KineticaSinkMode(IntEnum):
    """Policy the datasink applies when the target table already exists."""

    # Create a new table. Fails if the table already exists.
    CREATE = 1

    # Append to an existing table, or create it if it doesn't exist.
    APPEND = 2

    # Drop and recreate the table if it exists.
    OVERWRITE = 3


@dataclass
class KineticaTableSettings:
    """Table-creation options used when the datasink creates a Kinetica table."""

    # If True, creates a replicated table (data copied to all nodes).
    is_replicated: bool = False

    # Number of records per chunk for data storage.
    chunk_size: int = 8000000

    # Time-to-live in minutes. -1 means no expiration.
    ttl: int = -1

    # Column names to use as primary key.
    primary_keys: List[str] = field(default_factory=list)

    # Column names to use as shard key.
    shard_keys: List[str] = field(default_factory=list)

    # If True, the table will be persisted to disk.
    persist: bool = True

    # Optional schema/collection name for the table.
    collection_name: Optional[str] = None

    # If True, updates existing records with matching primary keys.
    # If False, insert fails on PK conflict.
    update_on_existing_pk: bool = True
) + """ + + def __init__( + self, + url: str, + table_name: str, + username: Optional[str] = None, + password: Optional[str] = None, + mode: Union[KineticaSinkMode, str] = KineticaSinkMode.APPEND, + schema: Optional["pa.Schema"] = None, + table_settings: Optional[KineticaTableSettings] = None, + batch_size: int = 10000, + use_multihead: bool = True, + options: Optional[Dict[str, Any]] = None, + ): + """ + Initialize the Kinetica datasink. + + Args: + url: URL of the Kinetica server (e.g., "http://localhost:9191"). + table_name: Name of the target table. + username: Username for authentication. + password: Password for authentication. + mode: How to handle existing tables ("create", "append", "overwrite"). + schema: Optional PyArrow schema for table creation. + table_settings: Optional table configuration settings. + batch_size: Number of records per batch for ingestion. + use_multihead: Whether to use multihead ingestion for parallelism. + options: Additional GPUdb client options. + """ + super().__init__() + _check_import(self, module="gpudb", package="gpudb") + + self._url = url + self._table_name = table_name + self._username = username + self._password = password + + # Handle string mode values + if isinstance(mode, str): + mode_map = { + "create": KineticaSinkMode.CREATE, + "append": KineticaSinkMode.APPEND, + "overwrite": KineticaSinkMode.OVERWRITE, + } + mode_lower = mode.lower() + if mode_lower not in mode_map: + raise ValueError( + f"Invalid mode '{mode}'. 
" + "Must be one of: 'create', 'append', 'overwrite'" + ) + mode = mode_map[mode_lower] + + self._mode = mode + self._schema = schema + self._table_settings = table_settings or KineticaTableSettings() + self._batch_size = batch_size + self._use_multihead = use_multihead + self._options = options or {} + + # Serializable column definitions + self._column_defs: Optional[List[Dict[str, Any]]] = None + self._schema_string: Optional[str] = None + self._column_properties: Optional[Dict[str, List[str]]] = None + self._table_ready: bool = False + + def _init_client(self): + """Create and return a GPUdb client instance.""" + from ray.data._internal.datasource.kinetica_type_utils import ( + create_gpudb_client, + ) + + return create_gpudb_client( + url=self._url, + username=self._username, + password=self._password, + options=self._options, + ) + + def _columns_to_dicts(self, columns) -> List[Dict[str, Any]]: + """Convert GPUdbRecordColumn objects to serializable dicts.""" + result = [] + for col in columns: + col_dict = { + "name": col.name, + "column_type": col.column_type, + "column_properties": col.column_properties, + "is_nullable": col.is_nullable, + } + # Preserve precision and scale for decimal columns + if hasattr(col, "precision") and col.precision is not None: + col_dict["precision"] = col.precision + if hasattr(col, "scale") and col.scale is not None: + col_dict["scale"] = col.scale + result.append(col_dict) + return result + + def _dicts_to_columns(self, dicts: List[Dict[str, Any]]): + """Convert serializable dicts back to GPUdbRecordColumn objects.""" + from gpudb import GPUdbRecordColumn + + result = [] + for d in dicts: + col_kwargs = { + "name": d["name"], + "column_type": d["column_type"], + "column_properties": d["column_properties"], + "is_nullable": d["is_nullable"], + } + # Restore precision and scale for decimal columns if present + if "precision" in d: + col_kwargs["precision"] = d["precision"] + if "scale" in d: + col_kwargs["scale"] = d["scale"] + 
result.append(GPUdbRecordColumn(**col_kwargs)) + return result + + def _table_exists(self, client) -> bool: + """Check if the target table exists.""" + from gpudb import GPUdbException + + try: + response = client.has_table(table_name=self._table_name) + return response.get("table_exists", False) + except GPUdbException: + return False + + def _drop_table(self, client: Any) -> None: + """Drop the target table if it exists. + + Uses the 'no_error_if_not_exists' option to avoid exceptions when the + table doesn't exist, making this operation idempotent. + + Args: + client: GPUdb client instance. + + Raises: + GPUdbException: If the table exists but cannot be dropped + (e.g., due to permissions). + """ + from gpudb import GPUdbException + + try: + client.clear_table( + table_name=self._table_name, + options={"no_error_if_not_exists": "true"}, + ) + logger.info(f"Dropped table '{self._table_name}'") + except GPUdbException as e: + # Real error (e.g., permissions) - propagate it + raise GPUdbException( + f"Failed to drop table '{self._table_name}' in OVERWRITE mode: {e}" + ) from e + + def _create_table(self, client, columns): + """Create the target table with the given column definitions.""" + from gpudb import GPUdbRecordType + + record_type = GPUdbRecordType( + columns=columns, + label=self._table_name, + ) + + options = { + # Avoid error if table already exists (e.g., race condition + # or retry scenario). The table schema must still be compatible. 
+ "no_error_if_exists": "true", + } + + if self._table_settings.is_replicated: + options["is_replicated"] = "true" + + if self._table_settings.chunk_size is not None: + options["chunk_size"] = str(self._table_settings.chunk_size) + + if self._table_settings.ttl >= 0: + options["ttl"] = str(self._table_settings.ttl) + + if not self._table_settings.persist: + options["no_persist"] = "true" + + if self._table_settings.collection_name: + options["collection_name"] = self._table_settings.collection_name + + type_id = record_type.create_type(client) + + client.create_table( + table_name=self._table_name, + type_id=type_id, + options=options, + ) + + logger.info(f"Created table '{self._table_name}' with type_id '{type_id}'") + + self._schema_string = record_type.schema_string + self._column_properties = record_type.column_properties + + return record_type + + def _get_existing_record_type(self, client): + """Get the record type for an existing table.""" + from gpudb import GPUdbException, GPUdbRecordType + + response = client.show_table( + table_name=self._table_name, + options={"get_column_info": "true"}, + ) + + type_ids = response.get("type_ids", []) + type_schemas = response.get("type_schemas", []) + properties_list = response.get("properties", [{}]) + + if not type_ids or not type_schemas: + raise GPUdbException( + f"Could not retrieve type information for table '{self._table_name}'" + ) + + type_schema = type_schemas[0] + props_dict = properties_list[0] if properties_list else {} + + self._schema_string = type_schema + self._column_properties = props_dict + + record_type = GPUdbRecordType( + schema_string=type_schema, + column_properties=props_dict, + ) + + return record_type + + def on_write_start(self, schema: Optional["pa.Schema"] = None) -> None: + """ + Called before writing begins. + + Args: + schema: Optional PyArrow schema passed by Ray Data framework. 
+ """ + from gpudb import GPUdbException + + from ray.data._internal.datasource.kinetica_type_utils import ( + arrow_schema_to_kinetica_columns, + ) + + if schema is not None and self._schema is None: + self._schema = schema + + client = self._init_client() + + table_exists = self._table_exists(client) + + if self._mode == KineticaSinkMode.CREATE: + if table_exists: + raise GPUdbException( + f"Table '{self._table_name}' already exists. " + "Use mode='append' or mode='overwrite'." + ) + if self._schema is None: + raise GPUdbException("Schema must be provided when using mode='create'") + columns = arrow_schema_to_kinetica_columns( + self._schema, + primary_keys=self._table_settings.primary_keys, + shard_keys=self._table_settings.shard_keys, + ) + self._create_table(client, columns) + self._column_defs = self._columns_to_dicts(columns) + self._table_ready = True + + elif self._mode == KineticaSinkMode.OVERWRITE: + # Validate schema BEFORE dropping table to avoid irreversible data loss + if self._schema is None: + raise GPUdbException( + "Schema must be provided when using mode='overwrite'" + ) + if table_exists: + self._drop_table(client) + columns = arrow_schema_to_kinetica_columns( + self._schema, + primary_keys=self._table_settings.primary_keys, + shard_keys=self._table_settings.shard_keys, + ) + self._create_table(client, columns) + self._column_defs = self._columns_to_dicts(columns) + self._table_ready = True + + elif self._mode == KineticaSinkMode.APPEND: + if table_exists: + record_type = self._get_existing_record_type(client) + self._column_defs = self._columns_to_dicts(record_type.columns) + self._table_ready = True + elif self._schema is not None: + columns = arrow_schema_to_kinetica_columns( + self._schema, + primary_keys=self._table_settings.primary_keys, + shard_keys=self._table_settings.shard_keys, + ) + self._create_table(client, columns) + self._column_defs = self._columns_to_dicts(columns) + self._table_ready = True + else: + logger.info( + f"Table 
'{self._table_name}' does not exist and no " + "schema provided. Schema will be inferred from first " + "data block." + ) + + def _get_record_type(self): + """Reconstruct GPUdbRecordType from stored schema info.""" + from gpudb import GPUdbRecordType + + if self._schema_string is not None: + return GPUdbRecordType( + schema_string=self._schema_string, + column_properties=self._column_properties, + ) + elif self._column_defs is not None: + columns = self._dicts_to_columns(self._column_defs) + return GPUdbRecordType(columns=columns, label=self._table_name) + return None + + def _create_gpudb_table(self, client: "GPUdb", table_exists: bool = False): + """Create a GPUdbTable instance for writing records. + + Args: + client: GPUdb client instance. + table_exists: If True, the table already exists and table creation + options (is_replicated, chunk_size, ttl, etc.) will be ignored + since they only apply when creating a new table. + + Returns: + GPUdbTable instance configured for writing. + """ + from gpudb import GPUdbException, GPUdbTable, GPUdbTableOptions + + record_type = self._get_record_type() + + if record_type is None: + raise GPUdbException( + "Cannot create GPUdbTable: no schema information available" + ) + + # Only set table creation options when the table doesn't exist. + # These options have no effect on existing tables and could cause + # issues with some GPUdb SDK versions. 
+ table_options = GPUdbTableOptions() + if not table_exists: + if self._table_settings.is_replicated: + table_options.is_replicated = True + + if self._table_settings.chunk_size is not None: + table_options.chunk_size = self._table_settings.chunk_size + + if self._table_settings.ttl >= 0: + table_options.ttl = self._table_settings.ttl + + if not self._table_settings.persist: + table_options.no_persist = True + + if self._table_settings.collection_name: + table_options.collection_name = self._table_settings.collection_name + + gpudb_table = GPUdbTable( + _type=record_type, + name=self._table_name, + db=client, + options=table_options, + use_multihead_io=self._use_multihead, + multihead_ingest_batch_size=self._batch_size, + # Buffer records instead of flushing per insertion. + # This allows the error check to prevent committing partial data + # when errors occur. flush_data_to_server() is called explicitly + # after confirming no errors. + flush_multi_head_ingest_per_insertion=False, + ) + + return gpudb_table + + def _try_create_gpudb_table(self, client: Any, table_exists: bool = False): + """Try to create GPUdbTable, handling errors based on multihead setting. + + Args: + client: GPUdb client instance. + table_exists: If True, the table already exists and table creation + options will be skipped. + + Returns: + GPUdbTable instance if successful, None if failed and multihead + is not required. + + Raises: + RuntimeError: If multihead is required but GPUdbTable creation fails. + """ + try: + return self._create_gpudb_table(client, table_exists=table_exists) + except Exception as e: + if self._use_multihead: + raise RuntimeError( + "Failed to create GPUdbTable for multihead ingest. " + "Multihead ingestion was explicitly requested but " + "could not be initialized. Set use_multihead=False " + f"to use single-head insert instead. 
Error: {e}" + ) from e + else: + logger.warning( + "Could not create GPUdbTable, falling back to " + f"direct insert: {e}" + ) + return None + + def write( + self, + blocks: Iterable[Block], + ctx: TaskContext, + ) -> Any: + """ + Write blocks of data to Kinetica. + + Args: + blocks: Iterable of data blocks to write. + ctx: Ray task context. + + Returns: + Write statistics. + """ + from ray.data._internal.datasource.kinetica_type_utils import ( + arrow_schema_to_kinetica_columns, + convert_arrow_batch_to_records, + ) + + client = self._init_client() + total_inserted = 0 + total_updated = 0 + errors = [] + + kinetica_columns = None + if self._column_defs is not None: + kinetica_columns = self._dicts_to_columns(self._column_defs) + + gpudb_table = None + if self._schema_string is not None or self._column_defs is not None: + # Table already exists at this point (created in on_write_start) + gpudb_table = self._try_create_gpudb_table(client, table_exists=True) + + for block in blocks: + accessor = BlockAccessor.for_block(block) + arrow_table = accessor.to_arrow() + + if kinetica_columns is None: + # Deferred table creation - infer schema from first data block. + # Note: supports_distributed_writes returns False for this path, + # so only a single worker runs - no race condition handling needed. + kinetica_columns = arrow_schema_to_kinetica_columns( + arrow_table.schema, + primary_keys=self._table_settings.primary_keys, + shard_keys=self._table_settings.shard_keys, + ) + + # Create the table with inferred schema. + # _create_table sets _schema_string and _column_properties which + # are used by _get_record_type (called from _create_gpudb_table). 
+ self._create_table(client, kinetica_columns) + + # Table was just created by _create_table, so it exists now + gpudb_table = self._try_create_gpudb_table(client, table_exists=True) + + for batch in arrow_table.to_batches(): + records = convert_arrow_batch_to_records( + batch, + kinetica_columns, + ) + + if gpudb_table is not None: + # With buffered mode (flush_multi_head_ingest_per_insertion=False), + # records are buffered and counts are only meaningful after flush. + # We only track errors here; counts are captured after final flush. + batch_errors = self._write_with_gpudb_table(gpudb_table, records) + else: + # Simple path returns actual counts per batch + inserted, updated, batch_errors = self._write_simple( + client, records + ) + total_inserted += inserted + total_updated += updated + + errors.extend(batch_errors) + + # Check for errors collected during insert_records calls. + # Note: With flush_multi_head_ingest_per_insertion=False, most errors + # will only surface during flush_data_to_server(), not here. + if errors: + error_summary = "; ".join(errors[:5]) # Show first 5 errors + if len(errors) > 5: + error_summary += f" ... and {len(errors) - 5} more errors" + raise RuntimeError( + f"Failed to write {len(errors)} record batch(es) to Kinetica " + f"table '{self._table_name}': {error_summary}" + ) + + # Flush buffered records and handle any errors that surface during flush. + # With flush_multi_head_ingest_per_insertion=False, per-record errors + # are only detected here, not during insert_records calls. + if gpudb_table is not None: + from gpudb import GPUdbException + + try: + gpudb_table.flush_data_to_server() + except GPUdbException as e: + raise RuntimeError( + f"Failed to flush records to Kinetica table " + f"'{self._table_name}': {e}" + ) from e + + # Get total counts from GPUdbTable after all records are flushed. + # These represent all records written through this table instance. 
+ total_inserted = gpudb_table.total_inserted + total_updated = gpudb_table.total_updated + + return { + "num_inserted": total_inserted, + "num_updated": total_updated, + } + + def _write_with_gpudb_table( + self, gpudb_table: Any, records: List[Dict[str, Any]] + ) -> List[str]: + """Write records using GPUdbTable. + + With flush_multi_head_ingest_per_insertion=False, records are buffered + and actual insert/update counts are only available after + flush_data_to_server(). This method only returns errors; counts are + retrieved from gpudb_table.total_inserted and gpudb_table.total_updated + after the final flush. + + Args: + gpudb_table: GPUdbTable instance for multihead ingestion. + records: List of record dictionaries to write. + + Returns: + List of error messages (empty if successful). + """ + from gpudb import GPUdbException + + errors = [] + + try: + gpudb_table.insert_records( + records, + options={ + "update_on_existing_pk": "true" + if self._table_settings.update_on_existing_pk + else "false", + "return_individual_errors": "true", + }, + ) + except GPUdbException as e: + errors.append(str(e)) + logger.error(f"Error inserting records via GPUdbTable: {e}") + + return errors + + def _write_simple(self, client, records: List[Dict[str, Any]]) -> tuple: + """Write records using simple insert_records API with JSON encoding.""" + import base64 + import json + + from gpudb import GPUdbException + + def json_serializer(obj): + """Custom JSON serializer for bytes (vector columns).""" + if isinstance(obj, bytes): + return base64.b64encode(obj).decode("ascii") + raise TypeError(f"Object of type {type(obj)} is not JSON serializable") + + inserted = 0 + updated = 0 + errors = [] + + for i in range(0, len(records), self._batch_size): + batch = records[i : i + self._batch_size] + + try: + json_records = [json.dumps(r, default=json_serializer) for r in batch] + + response = client.insert_records( + table_name=self._table_name, + data=json_records, + list_encoding="json", + 
options={ + "update_on_existing_pk": "true" + if self._table_settings.update_on_existing_pk + else "false", + "return_individual_errors": "true", + }, + ) + + inserted += response.get("count_inserted", 0) + updated += response.get("count_updated", 0) + + response_errors = response.get("info", {}).get("errors") + if response_errors: + # Handle both list and string error formats from the API + if isinstance(response_errors, list): + errors.extend(response_errors) + else: + # Single error string - append as single item + errors.append(str(response_errors)) + + except GPUdbException as e: + errors.append(str(e)) + logger.error(f"Error inserting records via simple insert: {e}") + + return inserted, updated, errors + + @property + def supports_distributed_writes(self) -> bool: + """Whether this datasink supports distributed writes. + + Returns False when table creation is deferred (APPEND mode without + schema and table doesn't exist) to prevent race conditions where + multiple workers try to create the same table simultaneously. + """ + return self._table_ready + + def get_name(self) -> str: + """Return the name of this datasink.""" + return f"Kinetica({self._table_name})" + + @property + def min_rows_per_write(self) -> Optional[int]: + """Target minimum number of rows per write task. + + This overrides the base class property to hint to Ray Data + how many rows should be bundled together for each write call. + """ + return self._batch_size diff --git a/python/ray/data/_internal/datasource/kinetica_datasource.py b/python/ray/data/_internal/datasource/kinetica_datasource.py new file mode 100644 index 000000000000..062f021f7a89 --- /dev/null +++ b/python/ray/data/_internal/datasource/kinetica_datasource.py @@ -0,0 +1,690 @@ +""" +Kinetica Datasource for Ray Data. + +This module provides a Ray Data Datasource implementation for reading data +from Kinetica databases using the GPUdbTable class for idiomatic API usage. 
# Patterns that are potentially dangerous in filter expressions.
# Note: This is a defense-in-depth measure. Kinetica's expression parser
# provides the primary protection, but we block obvious injection attempts.
_UNSAFE_FILTER_PATTERNS = [
    re.compile(r"[;{}]"),  # Statement terminators and code blocks
    re.compile(r"--"),  # SQL comment sequences
    re.compile(r"/\*"),  # Block comment start
    re.compile(r"\*/"),  # Block comment end
    re.compile(r"\bUNION\b", re.IGNORECASE),  # UNION-based injection
    re.compile(r"\bINSERT\b", re.IGNORECASE),  # INSERT statements
    re.compile(r"\bUPDATE\b", re.IGNORECASE),  # UPDATE statements
    re.compile(r"\bDELETE\b", re.IGNORECASE),  # DELETE statements
    re.compile(r"\bDROP\b", re.IGNORECASE),  # DROP statements
    re.compile(r"\bCREATE\b", re.IGNORECASE),  # CREATE statements
    re.compile(r"\bALTER\b", re.IGNORECASE),  # ALTER statements
    re.compile(r"\bTRUNCATE\b", re.IGNORECASE),  # TRUNCATE statements
    re.compile(r"\bEXEC\b", re.IGNORECASE),  # EXEC/EXECUTE
    re.compile(r"\bEXECUTE\b", re.IGNORECASE),
    re.compile(r"\bINTO\s+OUTFILE\b", re.IGNORECASE),  # File operations
    re.compile(r"\bLOAD_FILE\b", re.IGNORECASE),
]

# One complete string literal of either quote style, with the doubled-quote
# escape ('' inside '...', "" inside "..."). Both styles live in ONE pattern so
# a single left-to-right scan decides where each literal starts and ends.
_STRING_LITERAL_PATTERN = re.compile(r"'(?:[^']|'')*'" r'|"(?:[^"]|"")*"')


def _has_balanced_quotes(expr: str) -> bool:
    """
    Check if string literals in the expression are properly balanced.

    The expression is scanned left to right. A quote character of the other
    style inside an open literal is ordinary content, and a doubled quote
    inside a literal of the same style is an escaped quote.

    Args:
        expr: The expression to check.

    Returns:
        True if quotes are balanced, False if there are unclosed strings.
    """
    in_single = False
    in_double = False
    i = 0
    while i < len(expr):
        char = expr[i]
        if not in_double and char == "'":
            # Escaped quote '' only counts when inside a single-quoted string
            if in_single and i + 1 < len(expr) and expr[i + 1] == "'":
                i += 2  # Skip escaped quote
                continue
            in_single = not in_single
        elif not in_single and char == '"':
            # Escaped quote "" only counts when inside a double-quoted string
            if in_double and i + 1 < len(expr) and expr[i + 1] == '"':
                i += 2  # Skip escaped quote
                continue
            in_double = not in_double
        i += 1
    return not in_single and not in_double


def _strip_string_literals(expr: str) -> str:
    """
    Remove content inside string literals to avoid false positives.

    Single- and double-quoted literals are handled in one left-to-right pass,
    so a quote of one style that appears inside a literal of the other style
    is treated as content. Stripping the two styles in separate passes would
    let an input such as ``col = "'" UNION ... "'"`` hide the code between the
    two apostrophes from the keyword checks.

    Args:
        expr: The expression to process.

    Returns:
        The expression with every string literal replaced by an empty literal
        of the same quote style (``''`` or ``""``).
    """
    return _STRING_LITERAL_PATTERN.sub(lambda m: m.group(0)[0] * 2, expr)


def _is_filter_safe(filter_expr: str) -> bool:
    """
    Check if a filter expression is safe from SQL injection.

    This function provides defense-in-depth validation against common SQL
    injection patterns. It blocks:
    - Unbalanced quotes (unclosed string literals)
    - Statement terminators (;) and code blocks ({})
    - SQL comments (-- and /* */)
    - DDL/DML keywords (UNION, INSERT, UPDATE, DELETE, DROP, etc.)
    - File operation functions

    String literals are stripped before checking, so legitimate values like
    "city = 'Union City'" won't trigger false positives for the UNION keyword.

    Note: Kinetica's expression parser provides the primary security layer.
    This validation is an additional safeguard against obvious attacks.

    Args:
        filter_expr: The filter expression to check.

    Returns:
        True if the filter appears safe (an empty or None filter is safe),
        False otherwise.
    """
    if not filter_expr:
        return True

    # Unclosed strings would defeat literal stripping, so reject them first.
    if not _has_balanced_quotes(filter_expr):
        return False

    # Strip string literals to avoid false positives on keywords in data values
    # e.g., city = 'Union City' should not trigger on UNION
    expr_without_strings = _strip_string_literals(filter_expr)

    return not any(
        pattern.search(expr_without_strings) for pattern in _UNSAFE_FILTER_PATTERNS
    )
+ + Args: + url: URL of the Kinetica server (e.g., "http://localhost:9191"). + table_name: Name of the table to read from. + username: Username for authentication. + password: Password for authentication. + columns: Optional list of column names to read. If None, reads all + columns. + filter_expression: Optional SQL WHERE clause filter (without the + WHERE keyword). + sort_by: Optional column name to sort results by. + sort_order: Sort order, either "ascending" or "descending". + limit: Optional maximum number of records to read. + batch_size: Number of records to fetch per request for pagination. + Defaults to 10,000. + use_multihead_io: If True, enables multihead I/O for parallel reads + from multiple Kinetica nodes. Can improve performance for large + datasets on clustered deployments. + convert_special_types: If True, converts special types (arrays, JSON) + on retrieval. Defaults to True. + options: Additional GPUdb client options. + """ + super().__init__() + _check_import(self, module="gpudb", package="gpudb") + + self._url = url + self._table_name = table_name + self._username = username + self._password = password + self._filter_expression = filter_expression + self._sort_by = sort_by + + # Validate limit - None means no limit, positive integer means specific limit + if limit is not None and limit <= 0: + raise ValueError( + f"limit must be a positive integer or None, got {limit}. " + "Use None for no limit." 
def _create_gpudb_table(self, client: Any):
    """
    Build a GPUdbTable handle bound to this datasource's table.

    The handle is constructed with ``_type=None`` (no record type is
    supplied by this code) and carries the multihead and special-type
    conversion settings stored on the instance.

    Args:
        client: GPUdb client instance used as the table's ``db``.

    Returns:
        The GPUdbTable instance.
    """
    from gpudb import GPUdbTable

    table_kwargs = {
        "_type": None,
        "name": self._table_name,
        "db": client,
        "use_multihead_io": self._use_multihead_io,
        "convert_special_types_on_retrieval": self._convert_special_types,
    }
    return GPUdbTable(**table_kwargs)
def _estimate_row_size(self, client: Any, sample_size: int = 100) -> int:
    """
    Estimate the average row size in bytes from a small sample of rows.

    Up to ``sample_size`` rows are fetched with the same filter, sort and
    column selection as the real read. Each row's size is approximated by
    the length of its ``str(dict(row))`` representation.

    This is a best-effort estimate: any failure while sampling is logged at
    debug level and the fallback size is returned instead of raising.

    Args:
        client: GPUdb client instance.
        sample_size: Number of rows to sample.

    Returns:
        Estimated average row size in bytes, or 256 when the sample is
        empty or sampling fails.
    """
    import logging

    # Fallback used when no rows could be sampled.
    fallback_row_size = 256

    try:
        gpudb_table = self._create_gpudb_table(client)

        # Mirror the options used by the actual read so the sample is
        # drawn from the same rows.
        options = {}
        if self._filter_expression:
            options["expression"] = self._filter_expression
        if self._sort_by:
            options["sort_by"] = self._sort_by
            options["sort_order"] = self._sort_order

        if self._columns:
            records = gpudb_table.get_records_by_column(
                column_names=self._columns,
                offset=0,
                limit=sample_size,
                options=options,
                get_column_major=False,  # Row-major, one entry per row
                force_primitive_return_types=True,
            )
        else:
            records = gpudb_table.get_records(
                offset=0,
                limit=sample_size,
                options=options,
                force_primitive_return_types=True,
            )

        if not records:
            return fallback_row_size

        # Size is approximated from the string form of each row.
        total_size = sum(len(str(dict(r))) for r in records)
        return total_size // len(records)

    except Exception:
        # Deliberately non-fatal: the estimate only feeds block metadata.
        # Record the cause so a misconfiguration is still diagnosable.
        logging.getLogger(__name__).debug(
            "Could not sample rows from Kinetica table %r to estimate row "
            "size; falling back to %d bytes per row.",
            getattr(self, "_table_name", None),
            fallback_row_size,
            exc_info=True,
        )
        return fallback_row_size
+ """ + # Capture instance variables for the closure + url = self._url + username = self._username + password = self._password + table_name = self._table_name + columns = self._columns + filter_expression = self._filter_expression + sort_by = self._sort_by + sort_order = self._sort_order + client_options = self._options + arrow_schema = self._arrow_schema + batch_size = self._batch_size + use_multihead_io = self._use_multihead_io + convert_special_types = self._convert_special_types + + def read_fn() -> Iterable["pa.Table"]: + import pyarrow as pa + from gpudb import GPUdbTable + + from ray.data._internal.datasource.kinetica_type_utils import ( + create_gpudb_client, + ) + + # Create client in the worker + client = create_gpudb_client( + url=url, + username=username, + password=password, + options=client_options, + ) + + # Create GPUdbTable for reading + gpudb_table = GPUdbTable( + _type=None, + name=table_name, + db=client, + use_multihead_io=use_multihead_io, + convert_special_types_on_retrieval=convert_special_types, + ) + + # Build request options + options = {} + if filter_expression: + options["expression"] = filter_expression + if sort_by: + options["sort_by"] = sort_by + options["sort_order"] = sort_order + + # Track how many records we've read and need to read + records_remaining = limit + current_offset = offset + has_yielded = False + + try: + while records_remaining > 0: + fetch_count = min(batch_size, records_remaining) + + # Use appropriate method based on column selection + if columns: + # get_records_by_column for specific columns + records = gpudb_table.get_records_by_column( + column_names=columns, + offset=current_offset, + limit=fetch_count, + options=options, + get_column_major=False, # Row-major format + force_primitive_return_types=True, + ) + else: + # get_records for all columns + records = gpudb_table.get_records( + offset=current_offset, + limit=fetch_count, + options=options, + force_primitive_return_types=True, + ) + + if not records: + 
def get_read_tasks(
    self,
    parallelism: int,
    per_task_row_limit: Optional[int] = None,
    data_context: Optional["DataContext"] = None,
) -> List[ReadTask]:
    """
    Split the table into contiguous offset ranges, one ReadTask per range.

    The total row count and Arrow schema are fetched and cached on the
    instance first. An empty table produces no tasks.

    Args:
        parallelism: Desired number of tasks; ``-1`` derives it from the
            row count (about 1000 rows per task).
        per_task_row_limit: Optional max rows per task, forwarded to each
            ReadTask.
        data_context: Optional Ray data context (not used by this method).

    Returns:
        List of ReadTask objects covering rows ``[0, total_count)``.
    """
    client = self._init_client()

    self._total_count, self._arrow_schema = self._get_table_info(client)
    total_rows = self._total_count
    if total_rows == 0:
        return []

    bytes_per_row = self._estimate_row_size(client)

    # Target lower bound on rows per task; also drives auto-detection.
    min_records_per_task = 1000
    requested = (
        total_rows // min_records_per_task if parallelism == -1 else parallelism
    )
    # Clamp to at least one task (covers zero/negative requests).
    num_tasks = max(1, requested)

    if num_tasks > 1 and not self._sort_by:
        # Offset pagination is only safe across tasks when the row order
        # is fixed by sort_by; otherwise fall back to a single task.
        logger.warning(
            "Parallel reads without sort_by may produce non-deterministic "
            "results (duplicate or missing rows). Reducing parallelism "
            f"from {num_tasks} to 1. Specify sort_by to enable "
            "parallel reads with consistent ordering."
        )
        num_tasks = 1

    # Avoid many tiny tasks when the table is larger than one minimum-size
    # task but the requested split would go below that minimum.
    too_fine = max(1, total_rows // num_tasks) < min_records_per_task
    if too_fine and total_rows > min_records_per_task:
        num_tasks = max(1, total_rows // min_records_per_task)

    # Never plan more tasks than rows, then size the partitions evenly.
    num_tasks = min(num_tasks, total_rows)
    rows_per_task = max(1, total_rows // num_tasks)

    tasks = []
    start = 0
    final_index = num_tasks - 1

    for index in range(num_tasks):
        # The last task absorbs the remainder of the integer division.
        span = total_rows - start if index == final_index else rows_per_task

        if span <= 0 or start >= total_rows:
            break

        block_metadata = BlockMetadata(
            num_rows=span,
            size_bytes=span * bytes_per_row,
            input_files=None,
            exec_stats=None,
        )
        tasks.append(
            ReadTask(
                self._create_read_fn(start, span),
                block_metadata,
                schema=self._arrow_schema,
                per_task_row_limit=per_task_row_limit,
            )
        )
        start += span

    return tasks
def create_kinetica_connection_factory(
    url: str,
    username: Optional[str] = None,
    password: Optional[str] = None,
    oauth_token: Optional[str] = None,
    default_schema: Optional[str] = None,
    options: Optional[Dict[str, Any]] = None,
) -> Callable:
    """
    Build a connection factory for Ray Data's SQL read/write methods.

    The result is a ``KineticaConnectionFactory`` holding the given
    settings; calling it opens a new Kinetica connection.

    Args:
        url: URL of the Kinetica server (e.g., "http://localhost:9191").
        username: Username for authentication.
        password: Password for authentication.
        oauth_token: OAuth token for authentication (alternative to
            username/password).
        default_schema: Default schema to use for queries.
        options: Additional GPUdb client options
            (e.g., {"skip_ssl_cert_verification": True}).

    Returns:
        A callable that creates new KineticaConnection instances.

    Example:
        >>> import ray  # doctest: +SKIP
        >>> factory = create_kinetica_connection_factory(  # doctest: +SKIP
        ...     url="http://localhost:9191",
        ...     username="admin",
        ...     password="password",
        ... )
        >>> ds = ray.data.read_sql(  # doctest: +SKIP
        ...     sql="SELECT * FROM my_table",
        ...     connection_factory=factory,
        ... )
    """
    settings = {
        "url": url,
        "username": username,
        "password": password,
        "oauth_token": oauth_token,
        "default_schema": default_schema,
        "options": options,
    }
    return KineticaConnectionFactory(**settings)
def create_gpudb_client(
    url: str,
    username: Optional[str] = None,
    password: Optional[str] = None,
    options: Optional[Dict[str, Any]] = None,
):
    """
    Create and return a GPUdb client instance.

    This is a shared factory function used by both KineticaDatasource and
    KineticaDatasink to ensure consistent client construction.

    Args:
        url: URL of the Kinetica server (e.g., "http://localhost:9191").
        username: Username for authentication. ``None`` leaves the option
            unset; an empty string is passed through.
        password: Password for authentication. Same ``None`` handling as
            ``username``.
        options: Additional GPUdb client options, applied by attribute name
            onto ``GPUdb.Options``. Keys that are not attributes of the
            options object are skipped and reported in a warning log.

    Returns:
        A configured GPUdb client instance.
    """
    import logging

    gpudb = _check_gpudb()
    GPUdb = gpudb.GPUdb

    gpudb_options = GPUdb.Options()

    # Use explicit None checks to allow empty string credentials
    # (consistent with SQL connection path in KineticaConnectionFactory)
    if username is not None:
        gpudb_options.username = username
    if password is not None:
        gpudb_options.password = password

    # Apply additional options, remembering any key the options object does
    # not expose so a misspelled option does not pass unnoticed.
    ignored_keys = []
    for key, value in (options or {}).items():
        if hasattr(gpudb_options, key):
            setattr(gpudb_options, key, value)
        else:
            ignored_keys.append(str(key))

    if ignored_keys:
        logging.getLogger(__name__).warning(
            "Ignoring unrecognized GPUdb client option(s): %s",
            ", ".join(sorted(ignored_keys)),
        )

    return GPUdb(host=url, options=gpudb_options)
def arrow_to_kinetica_type(
    arrow_type: "pa.DataType",
    column_name: str = "",
) -> Tuple[str, List[str]]:
    """
    Convert a PyArrow data type to Kinetica column type and properties.

    Nested element types (list, large list, fixed-size list, dictionary
    values) are resolved recursively; ``column_name`` is forwarded on every
    recursive call so a conversion error for an element type still names
    the column.

    Args:
        arrow_type: The PyArrow data type to convert.
        column_name: Optional column name for error messages.

    Returns:
        A tuple of (kinetica_base_type, list_of_properties). The list is
        freshly created on every call.

    Raises:
        TypeError: If the PyArrow type cannot be converted, or a list
            element type is not usable as a Kinetica array element.
    """
    _check_pyarrow()
    gpudb = _check_gpudb()
    GPUdbRecordColumn = gpudb.GPUdbRecordColumn
    GPUdbColumnProperty = gpudb.GPUdbColumnProperty

    # Boolean
    if pa.types.is_boolean(arrow_type):
        return GPUdbRecordColumn._ColumnType.INT, [GPUdbColumnProperty.BOOLEAN]

    # Integer types
    if pa.types.is_int8(arrow_type):
        return GPUdbRecordColumn._ColumnType.INT, [GPUdbColumnProperty.INT8]
    if pa.types.is_int16(arrow_type):
        return GPUdbRecordColumn._ColumnType.INT, [GPUdbColumnProperty.INT16]
    if pa.types.is_int32(arrow_type):
        return GPUdbRecordColumn._ColumnType.INT, []
    if pa.types.is_int64(arrow_type):
        return GPUdbRecordColumn._ColumnType.LONG, []

    # Unsigned integer types: each maps to the next wider signed type so
    # the full unsigned range fits; uint64 is carried as a ULONG string.
    if pa.types.is_uint8(arrow_type):
        return GPUdbRecordColumn._ColumnType.INT, [GPUdbColumnProperty.INT16]
    if pa.types.is_uint16(arrow_type):
        return GPUdbRecordColumn._ColumnType.INT, []
    if pa.types.is_uint32(arrow_type):
        return GPUdbRecordColumn._ColumnType.LONG, []
    if pa.types.is_uint64(arrow_type):
        return GPUdbRecordColumn._ColumnType.STRING, [GPUdbColumnProperty.ULONG]

    # Float types
    if pa.types.is_float16(arrow_type) or pa.types.is_float32(arrow_type):
        return GPUdbRecordColumn._ColumnType.FLOAT, []
    if pa.types.is_float64(arrow_type):
        return GPUdbRecordColumn._ColumnType.DOUBLE, []

    # Decimal types
    if pa.types.is_decimal(arrow_type):
        precision = arrow_type.precision
        scale = arrow_type.scale
        return GPUdbRecordColumn._ColumnType.STRING, [f"decimal({precision},{scale})"]

    # String types
    if pa.types.is_string(arrow_type) or pa.types.is_large_string(arrow_type):
        return GPUdbRecordColumn._ColumnType.STRING, []
    if hasattr(pa.types, "is_utf8"):
        if pa.types.is_utf8(arrow_type) or pa.types.is_large_utf8(arrow_type):
            return GPUdbRecordColumn._ColumnType.STRING, []

    # UUID (must come before generic fixed-size binary check)
    if pa.types.is_fixed_size_binary(arrow_type) and arrow_type.byte_width == 16:
        return GPUdbRecordColumn._ColumnType.STRING, [GPUdbColumnProperty.UUID]

    # Binary types
    if pa.types.is_binary(arrow_type) or pa.types.is_large_binary(arrow_type):
        return GPUdbRecordColumn._ColumnType.BYTES, []
    if pa.types.is_fixed_size_binary(arrow_type):
        return GPUdbRecordColumn._ColumnType.BYTES, []

    # Date types
    if pa.types.is_date32(arrow_type) or pa.types.is_date64(arrow_type):
        return GPUdbRecordColumn._ColumnType.STRING, [GPUdbColumnProperty.DATE]

    # Time types
    if pa.types.is_time32(arrow_type) or pa.types.is_time64(arrow_type):
        return GPUdbRecordColumn._ColumnType.STRING, [GPUdbColumnProperty.TIME]

    # Timestamp types
    if pa.types.is_timestamp(arrow_type):
        return GPUdbRecordColumn._ColumnType.STRING, [GPUdbColumnProperty.DATETIME]

    # Duration
    if pa.types.is_duration(arrow_type):
        return GPUdbRecordColumn._ColumnType.LONG, []

    # List/Array types
    if pa.types.is_list(arrow_type) or pa.types.is_large_list(arrow_type):
        # Forward column_name so an unsupported element type is reported
        # against the right column.
        inner_type, _ = arrow_to_kinetica_type(arrow_type.value_type, column_name)
        array_inner = _column_type_to_array_inner(inner_type)
        return GPUdbRecordColumn._ColumnType.STRING, [f"array({array_inner})"]

    # Fixed-size list (vector)
    if pa.types.is_fixed_size_list(arrow_type):
        list_size = arrow_type.list_size
        value_type = arrow_type.value_type
        if pa.types.is_float32(value_type) or pa.types.is_float64(value_type):
            return GPUdbRecordColumn._ColumnType.BYTES, [f"vector({list_size})"]
        inner_type, _ = arrow_to_kinetica_type(value_type, column_name)
        array_inner = _column_type_to_array_inner(inner_type)
        return GPUdbRecordColumn._ColumnType.STRING, [
            f"array({array_inner},{list_size})"
        ]

    # Struct types -> JSON
    if pa.types.is_struct(arrow_type):
        return GPUdbRecordColumn._ColumnType.STRING, [GPUdbColumnProperty.JSON]

    # Map types -> JSON
    if pa.types.is_map(arrow_type):
        return GPUdbRecordColumn._ColumnType.STRING, [GPUdbColumnProperty.JSON]

    # Dictionary encoded: convert according to the value type
    if pa.types.is_dictionary(arrow_type):
        return arrow_to_kinetica_type(arrow_type.value_type, column_name)

    # Null type
    if pa.types.is_null(arrow_type):
        return GPUdbRecordColumn._ColumnType.STRING, [GPUdbColumnProperty.NULLABLE]

    raise TypeError(
        f"Cannot convert PyArrow type '{arrow_type}' to Kinetica type"
        + (f" for column '{column_name}'" if column_name else "")
    )
GPUdbRecordColumn.DEFAULT_DECIMAL_PRECISION + ) + scale = ( + column.scale + if column.scale is not None + else GPUdbRecordColumn.DEFAULT_DECIMAL_SCALE + ) + return pa.decimal128(precision, scale) + + # JSON + if column.is_json(): + return pa.string() + + # Array + # Note: column.array_type returns _ColumnType constants for int/long/float/ + # double/string, but GPUdbColumnProperty strings for boolean/ulong. + if column.is_array(): + array_type = column.array_type + if array_type == GPUdbRecordColumn._ColumnType.INT: + return pa.list_(pa.int32()) + elif array_type == GPUdbRecordColumn._ColumnType.LONG: + return pa.list_(pa.int64()) + elif array_type == GPUdbRecordColumn._ColumnType.FLOAT: + return pa.list_(pa.float32()) + elif array_type == GPUdbRecordColumn._ColumnType.DOUBLE: + return pa.list_(pa.float64()) + elif array_type == GPUdbRecordColumn._ColumnType.STRING: + return pa.list_(pa.string()) + elif array_type == GPUdbColumnProperty.BOOLEAN: + return pa.list_(pa.bool_()) + elif array_type == GPUdbColumnProperty.ULONG: + return pa.list_(pa.uint64()) + else: + return pa.list_(pa.string()) + + # Vector + if column.is_vector(): + dim = column.vector_dimension + if dim and dim > 0: + return pa.list_(pa.float32(), dim) + else: + return pa.list_(pa.float32()) + + # IPV4 + if GPUdbColumnProperty.IPV4.lower() in prop_set: + return pa.string() + + # WKT + if GPUdbColumnProperty.WKT.lower() in prop_set: + if base_type == GPUdbRecordColumn._ColumnType.BYTES: + return pa.binary() + return pa.string() + + # CharN types + for prop in properties: + prop_lower = prop.lower() + if prop_lower.startswith("char"): + match = re.match(r"char(\d+)", prop_lower) + if match: + return pa.string() + + # Base types + if base_type == GPUdbRecordColumn._ColumnType.INT: + return pa.int32() + elif base_type == GPUdbRecordColumn._ColumnType.LONG: + return pa.int64() + elif base_type == GPUdbRecordColumn._ColumnType.FLOAT: + return pa.float32() + elif base_type == 
def arrow_schema_to_kinetica_columns(
    schema: "pa.Schema",
    primary_keys: Optional[List[str]] = None,
    shard_keys: Optional[List[str]] = None,
) -> List:
    """
    Translate a PyArrow schema into Kinetica column definitions.

    One GPUdbRecordColumn is produced per schema field, in schema order.
    Fields named in ``primary_keys`` / ``shard_keys`` receive the matching
    key property; nullability is taken from the Arrow field.

    Args:
        schema: The PyArrow schema to convert. An object exposing
            ``base_schema`` is unwrapped first.
        primary_keys: Optional list of column names to mark as primary keys.
        shard_keys: Optional list of column names to mark as shard keys.

    Returns:
        A list of GPUdbRecordColumn objects.

    Raises:
        ValueError: If any primary or shard key is not a column of the
            schema.
    """
    _check_pyarrow()
    gpudb = _check_gpudb()
    GPUdbRecordColumn = gpudb.GPUdbRecordColumn
    GPUdbColumnProperty = gpudb.GPUdbColumnProperty

    # Unwrap schema wrappers that carry the Arrow schema as base_schema
    if hasattr(schema, "base_schema"):
        schema = schema.base_schema

    pk_names = set(primary_keys or [])
    sk_names = set(shard_keys or [])

    # Reject key names that are not columns of the schema
    known_names = {f.name for f in schema}
    unknown_names = (pk_names - known_names) | (sk_names - known_names)
    if unknown_names:
        raise ValueError(
            f"Primary/shard keys reference non-existent columns: "
            f"{sorted(unknown_names)}. Available columns: {sorted(known_names)}"
        )

    def _to_column(arrow_field):
        # Build the column definition for a single Arrow field.
        base_type, props = arrow_to_kinetica_type(arrow_field.type, arrow_field.name)
        if arrow_field.name in pk_names:
            props.append(GPUdbColumnProperty.PRIMARY_KEY)
        if arrow_field.name in sk_names:
            props.append(GPUdbColumnProperty.SHARD_KEY)
        return GPUdbRecordColumn(
            name=arrow_field.name,
            column_type=base_type,
            column_properties=props,
            is_nullable=arrow_field.nullable,
        )

    return [_to_column(f) for f in schema]
def convert_arrow_batch_to_records(
    batch: "pa.RecordBatch",
    columns: List,
) -> List[Dict[str, Any]]:
    """
    Convert a PyArrow RecordBatch to a list of dictionaries for Kinetica insertion.

    Each column is classified once from its Kinetica definition (JSON,
    array, vector, decimal, date, time, datetime/timestamp, or untyped) and
    a matching converter is applied to every non-null value of that column.
    ``None`` values are always passed through unchanged.

    Args:
        batch: The PyArrow RecordBatch to convert.
        columns: The Kinetica column definitions. May be empty or None, in
            which case every column is treated as untyped.

    Returns:
        A list of dictionaries, each representing a record, with keys in
        the batch's column order.

    Raises:
        ValueError: If a vector column holds a list that cannot be packed
            as floats.
    """
    _check_pyarrow()
    import json
    import struct
    from datetime import date, datetime, time

    def _classify(col_def: Any) -> Optional[str]:
        # Return the conversion kind for a column definition, or None.
        if not col_def:
            return None
        if col_def.is_json():
            return "json"
        if col_def.is_array():
            return "array"
        if col_def.is_vector():
            return "vector"
        if col_def.is_decimal:  # property, not a method
            return "decimal"
        # 'date', 'time', 'datetime', 'timestamp', or None
        return _is_date_time_column(col_def)

    def _json_value(value: Any) -> Any:
        # Serialize dict/list payloads; leave other values as they are.
        if isinstance(value, (dict, list)):
            return json.dumps(value)
        return value

    def _array_value(value: Any) -> Any:
        # json.dumps yields a JSON array; str() would emit single quotes.
        if isinstance(value, list):
            return json.dumps(value)
        return value

    def _vector_converter(col_name: str):
        # Bind col_name now so the error message names the right column.
        def _vector_value(value: Any) -> Any:
            if not isinstance(value, list):
                return value
            try:
                return struct.pack(f"{len(value)}f", *value)
            except (struct.error, TypeError) as e:
                raise ValueError(
                    f"Failed to pack vector values for column '{col_name}': "
                    f"expected list of floats, got {type(value).__name__} "
                    f"with values that cannot be converted. Error: {e}"
                ) from e

        return _vector_value

    def _date_value(value: Any) -> str:
        # datetime is tested first because it is a subclass of date; only
        # the date part is kept for date-typed columns.
        if isinstance(value, datetime):
            return value.date().isoformat()
        if isinstance(value, date):
            return value.isoformat()
        return str(value)

    def _time_value(value: Any) -> str:
        return value.isoformat() if isinstance(value, time) else str(value)

    def _datetime_value(value: Any) -> str:
        return value.isoformat() if isinstance(value, datetime) else str(value)

    def _untyped_value(value: Any) -> Any:
        # Columns without a recognized kind still get temporal values
        # rendered as ISO strings; everything else passes through.
        if isinstance(value, (datetime, date, time)):
            return value.isoformat()
        return value

    converters = {
        "json": _json_value,
        "array": _array_value,
        "decimal": str,
        "date": _date_value,
        "time": _time_value,
        "datetime": _datetime_value,
        "timestamp": _datetime_value,
    }

    # Handle None columns - create empty map if no column definitions provided
    column_map = {col.name: col for col in columns} if columns else {}
    col_names = batch.schema.names

    # Convert column by column so the converter is selected once per
    # column rather than once per cell.
    converted = {}
    for col_name in col_names:
        kind = _classify(column_map.get(col_name))
        if kind == "vector":
            convert = _vector_converter(col_name)
        else:
            convert = converters.get(kind, _untyped_value)
        converted[col_name] = [
            None if value is None else convert(value)
            for value in batch.column(col_name).to_pylist()
        ]

    # Pivot the converted columns back into one dict per row.
    return [
        {col_name: converted[col_name][row] for col_name in col_names}
        for row in range(batch.num_rows)
    ]
+ """ + _check_pyarrow() + + if not records: + # Create empty arrays for each field in the schema + empty_arrays = [pa.array([], type=field.type) for field in schema] + return pa.Table.from_arrays(empty_arrays, schema=schema) + + columns = {field.name: [] for field in schema} + + for record in records: + for field in schema: + value = record.get(field.name) + columns[field.name].append(value) + + arrays = [] + for field in schema: + col_data = columns[field.name] + try: + array = pa.array(col_data, type=field.type) + except (pa.ArrowInvalid, pa.ArrowTypeError) as initial_error: + # Try creating array without explicit type, then cast + try: + array = pa.array(col_data) + if array.type != field.type: + array = array.cast(field.type) + except (pa.ArrowInvalid, pa.ArrowTypeError) as cast_error: + raise pa.ArrowTypeError( + f"Failed to convert column '{field.name}' to type {field.type}. " + f"Initial error: {initial_error}. Cast error: {cast_error}" + ) from cast_error + arrays.append(array) + + return pa.Table.from_arrays(arrays, schema=schema) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index eb1a0214a8cd..4c79c5b2ff8e 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -153,6 +153,7 @@ import torch.utils.data from tensorflow_metadata.proto.v0 import schema_pb2 + from ray.data._internal.datasource.kinetica_datasink import KineticaTableSettings from ray.data._internal.execution.interfaces import Executor, NodeIdStr from ray.data._internal.execution.streaming_executor import StreamingExecutor from ray.data._internal.logical.interfaces.logical_operator import LogicalOperator @@ -5216,6 +5217,165 @@ def write_sql( concurrency=concurrency, ) + @ConsumptionAPI + def write_kinetica( + self, + table_name: str, + url: str, + *, + username: Optional[str] = None, + password: Optional[str] = None, + mode: str = "append", + table_settings: Optional["KineticaTableSettings"] = None, + batch_size: int = 10000, + use_multihead: bool = 
True, + options: Optional[Dict[str, Any]] = None, + ray_remote_args: Optional[Dict[str, Any]] = None, + concurrency: Optional[int] = None, + ) -> None: + """Write this :class:`~ray.data.Dataset` to a Kinetica database table. + + Kinetica is a distributed, in-memory analytical database. This method + uses Kinetica's multihead ingestion for optimal write performance when + writing to a distributed cluster. + + Examples: + + .. testcode:: + :skipif: True + + import ray + + ds = ray.data.from_items([ + {"id": 1, "name": "Alice", "amount": 100.0}, + {"id": 2, "name": "Bob", "amount": 200.0}, + ]) + ds.write_kinetica( + table_name="transactions", + url="http://localhost:9191", + username="admin", + password="password", + mode="append", + ) + + Args: + table_name: Target table name. + url: Kinetica server URL (e.g., "http://localhost:9191"). + username: Authentication username. + password: Authentication password. + mode: Write mode - "create", "append", or "overwrite". + table_settings: Kinetica-specific table options (KineticaTableSettings). + batch_size: Records per ingestion batch. Default is 10,000. + use_multihead: Enable multihead ingestion for parallelism. + options: Additional GPUdb client options. + ray_remote_args: Keyword arguments passed to :func:`ray.remote`. + concurrency: Maximum concurrent write tasks. + """ + from ray.data._internal.datasource.kinetica_datasink import KineticaDatasink + + # Extract the underlying PyArrow schema from Ray Data Schema. + # KineticaDatasink expects pa.Schema, not ray.data.Schema. 
+ ray_schema = self.schema() + pa_schema = ( + ray_schema.base_schema if hasattr(ray_schema, "base_schema") else ray_schema + ) + + datasink = KineticaDatasink( + url=url, + table_name=table_name, + username=username, + password=password, + mode=mode, + schema=pa_schema, + table_settings=table_settings, + batch_size=batch_size, + use_multihead=use_multihead, + options=options, + ) + self.write_datasink( + datasink, + ray_remote_args=ray_remote_args, + concurrency=concurrency, + ) + + @ConsumptionAPI + def write_kinetica_sql( + self, + sql: str, + url: str, + *, + username: Optional[str] = None, + password: Optional[str] = None, + oauth_token: Optional[str] = None, + default_schema: Optional[str] = None, + options: Optional[Dict[str, Any]] = None, + ray_remote_args: Optional[Dict[str, Any]] = None, + concurrency: Optional[int] = None, + ) -> None: + """Write this :class:`~ray.data.Dataset` to Kinetica using SQL INSERT. + + This method uses Kinetica's DB-API 2.0 interface with Ray Data's native + ``write_sql`` method. The target table must already exist. + + For automatic table creation and multihead ingestion, use + :meth:`~ray.data.Dataset.write_kinetica` instead. + + Examples: + + .. testcode:: + :skipif: True + + import ray + + ds = ray.data.from_items([ + {"id": 1, "name": "Alice", "value": 100.0}, + {"id": 2, "name": "Bob", "value": 200.0}, + ]) + ds.write_kinetica_sql( + sql="INSERT INTO my_table (id, name, value) VALUES (?, ?, ?)", + url="http://localhost:9191", + username="admin", + password="password", + ) + + Args: + sql: SQL INSERT statement with parameter placeholders. + Use '?' for placeholders (qmark paramstyle). + url: Kinetica server URL (e.g., "http://localhost:9191"). + username: Authentication username. + password: Authentication password. + oauth_token: OAuth token for authentication (alternative to username/password). + default_schema: Default schema for queries. + options: Additional GPUdb client options. 
+ ray_remote_args: Keyword arguments passed to :func:`ray.remote`. + concurrency: Maximum concurrent write tasks. + """ + from ray.data._internal.datasource.kinetica_sql_connection import ( + create_kinetica_connection_factory, + ) + + connection_factory = create_kinetica_connection_factory( + url=url, + username=username, + password=password, + oauth_token=oauth_token, + default_schema=default_schema, + options=options, + ) + + write_kwargs: Dict[str, Any] = { + "sql": sql, + "connection_factory": connection_factory, + } + + if ray_remote_args is not None: + write_kwargs["ray_remote_args"] = ray_remote_args + + if concurrency is not None: + write_kwargs["concurrency"] = concurrency + + self.write_sql(**write_kwargs) + @ConsumptionAPI def write_snowflake( self, diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 5dc047e7e7ea..283f5426dfbf 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -1036,6 +1036,186 @@ def read_mongo( ) +@PublicAPI(stability="alpha") +def read_kinetica( + table_name: str, + url: str, + *, + username: Optional[str] = None, + password: Optional[str] = None, + columns: Optional[List[str]] = None, + filter_expression: Optional[str] = None, + sort_by: Optional[str] = None, + sort_order: str = "ascending", + limit: Optional[int] = None, + batch_size: int = 10000, + use_multihead_io: bool = False, + convert_special_types: bool = True, + options: Optional[Dict[str, Any]] = None, + parallelism: int = -1, + num_cpus: Optional[float] = None, + num_gpus: Optional[float] = None, + memory: Optional[float] = None, + ray_remote_args: Optional[Dict[str, Any]] = None, + concurrency: Optional[int] = None, + override_num_blocks: Optional[int] = None, +) -> Dataset: + """Create a :class:`~ray.data.Dataset` from a Kinetica database table. + + Kinetica is a distributed, in-memory analytical database designed for + real-time analytics on streaming and historical data. 
This function reads + data using Kinetica's native API with parallel pagination. + + Examples: + >>> import ray # doctest: +SKIP + >>> ds = ray.data.read_kinetica( # doctest: +SKIP + ... table_name="transactions", + ... url="http://localhost:9191", + ... username="admin", + ... password="password", + ... filter_expression="amount > 1000", + ... columns=["id", "customer", "amount"], + ... ) + + Args: + table_name: Name of the Kinetica table to read. + url: URL of the Kinetica server (e.g., "http://localhost:9191"). + username: Authentication username. + password: Authentication password. + columns: Specific columns to read. None reads all columns. + filter_expression: SQL WHERE clause filter (without WHERE keyword). + sort_by: Column to sort by for consistent pagination. + sort_order: "ascending" or "descending". + limit: Maximum rows to read. + batch_size: Records per API request for pagination. Default is 10,000. + use_multihead_io: If True, enables multihead I/O for parallel reads + from multiple Kinetica nodes. Can improve performance for large + datasets on clustered deployments. Default is False. + convert_special_types: If True, converts special types (arrays, JSON) + on retrieval. Default is True. + options: Additional GPUdb client options. + parallelism: This argument is deprecated. Use ``override_num_blocks``. + num_cpus: The number of CPUs to reserve for each parallel read worker. + num_gpus: The number of GPUs to reserve for each parallel read worker. + memory: The heap memory in bytes to reserve for each parallel read worker. + ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks. + concurrency: The maximum number of Ray tasks to run concurrently. + override_num_blocks: Override the number of output blocks from all read tasks. + + Returns: + :class:`~ray.data.Dataset` producing rows from the Kinetica table. 
+ """ + from ray.data._internal.datasource.kinetica_datasource import ( + KineticaDatasource, + ) + + datasource = KineticaDatasource( + url=url, + table_name=table_name, + username=username, + password=password, + columns=columns, + filter_expression=filter_expression, + sort_by=sort_by, + sort_order=sort_order, + limit=limit, + batch_size=batch_size, + use_multihead_io=use_multihead_io, + convert_special_types=convert_special_types, + options=options, + ) + return read_datasource( + datasource, + num_cpus=num_cpus, + num_gpus=num_gpus, + memory=memory, + parallelism=parallelism, + ray_remote_args=ray_remote_args, + concurrency=concurrency, + override_num_blocks=override_num_blocks, + ) + + +@PublicAPI(stability="alpha") +def read_kinetica_sql( + sql: str, + url: str, + *, + username: Optional[str] = None, + password: Optional[str] = None, + oauth_token: Optional[str] = None, + default_schema: Optional[str] = None, + options: Optional[Dict[str, Any]] = None, + ray_remote_args: Optional[Dict[str, Any]] = None, + concurrency: Optional[int] = None, + override_num_blocks: Optional[int] = None, +) -> Dataset: + """Create a :class:`~ray.data.Dataset` from a Kinetica SQL query. + + This function uses Kinetica's DB-API 2.0 compliant interface with Ray Data's + native ``read_sql`` method. It's ideal for complex SQL queries including JOINs, + aggregations, and subqueries. + + For simple table reads with parallel pagination, consider using + :func:`~ray.data.read_kinetica` instead which uses Kinetica's native API. + + Examples: + >>> import ray # doctest: +SKIP + >>> ds = ray.data.read_kinetica_sql( # doctest: +SKIP + ... sql="SELECT t.id, t.customer, SUM(t.amount) as total " + ... "FROM transactions t " + ... "JOIN customers c ON t.customer_id = c.id " + ... "GROUP BY t.id, t.customer", + ... url="http://localhost:9191", + ... username="admin", + ... password="password", + ... ) + + Args: + sql: SQL query to execute. 
+ url: URL of the Kinetica server (e.g., "http://localhost:9191"). + username: Authentication username. + password: Authentication password. + oauth_token: OAuth token for authentication (alternative to username/password). + default_schema: Default schema to use for queries. + options: Additional GPUdb client options. + ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks. + concurrency: The maximum number of Ray tasks to run concurrently. + override_num_blocks: Override the number of output blocks. + + Returns: + :class:`~ray.data.Dataset` producing rows from the query results. + """ + from ray.data._internal.datasource.kinetica_sql_connection import ( + create_kinetica_connection_factory, + ) + + connection_factory = create_kinetica_connection_factory( + url=url, + username=username, + password=password, + oauth_token=oauth_token, + default_schema=default_schema, + options=options, + ) + + read_kwargs: Dict[str, Any] = { + "sql": sql, + "connection_factory": connection_factory, + } + + if ray_remote_args is not None: + read_kwargs["ray_remote_args"] = ray_remote_args + + if concurrency is not None: + read_kwargs["concurrency"] = concurrency + + if override_num_blocks is not None: + read_kwargs["override_num_blocks"] = override_num_blocks + + return read_sql(**read_kwargs) + + @PublicAPI(stability="alpha") def read_bigquery( project_id: str, diff --git a/python/ray/data/tests/datasource/test_kinetica.py b/python/ray/data/tests/datasource/test_kinetica.py new file mode 100644 index 000000000000..e463c2a06bc3 --- /dev/null +++ b/python/ray/data/tests/datasource/test_kinetica.py @@ -0,0 +1,1930 @@ +""" +Tests for the Kinetica Ray Data integration. + +These tests use mocks to verify the KineticaDatasource and KineticaDatasink +work correctly without requiring a running Kinetica server. 
+""" + +import json +import os +from unittest.mock import MagicMock, patch + +import pyarrow as pa +import pytest + +from ray.data._internal.datasource.kinetica_datasink import ( + KineticaDatasink, + KineticaSinkMode, + KineticaTableSettings, +) +from ray.data._internal.datasource.kinetica_datasource import ( + KineticaDatasource, + _has_balanced_quotes, + _is_filter_safe, +) +from ray.data._internal.execution.interfaces.task_context import TaskContext + +# ============================================================================ +# Fixtures for Mocking GPUdb Client +# ============================================================================ + + +@pytest.fixture +def mock_gpudb_client(): + """Mock GPUdb client for datasource tests.""" + client = MagicMock() + + # Mock show_table response + client.show_table.return_value = { + "type_schemas": [ + json.dumps( + { + "fields": [ + {"name": "id", "type": "long"}, + {"name": "name", "type": "string"}, + {"name": "value", "type": "double"}, + ] + } + ) + ], + "properties": [{"id": [], "name": [], "value": []}], + "total_size": 100, + } + + # Mock get_records response + client.get_records.return_value = { + "records_json": [ + json.dumps({"id": 1, "name": "Alice", "value": 100.5}), + json.dumps({"id": 2, "name": "Bob", "value": 200.75}), + ], + "total_number_of_records": 100, + } + + return client + + +@pytest.fixture +def mock_gpudb_sink_client(): + """Mock GPUdb client for datasink tests.""" + client = MagicMock() + + # Mock table existence check + client.has_table.return_value = {"table_exists": False} + + # Mock insert_records response + client.insert_records.return_value = { + "count_inserted": 3, + "count_updated": 0, + "info": {}, + } + + # Mock show_table response + client.show_table.return_value = { + "type_ids": ["type_123"], + "type_schemas": [ + json.dumps( + { + "fields": [ + {"name": "id", "type": "long"}, + {"name": "name", "type": "string"}, + ] + } + ) + ], + "properties": [{"id": [], "name": []}], 
+ } + + return client + + +@pytest.fixture(autouse=True) +def patch_gpudb(): + """Automatically patch GPUdb for all tests.""" + with patch("gpudb.GPUdb") as mock_gpudb_class: + mock_instance = MagicMock() + mock_gpudb_class.return_value = mock_instance + yield mock_instance + + +# ============================================================================ +# Filter Safety Tests +# ============================================================================ + + +class TestFilterSafety: + """Tests for filter expression safety validation.""" + + @pytest.mark.parametrize( + "filter_expr, is_safe", + [ + # Safe expressions + ("id > 100", True), + ("name = 'Alice' AND value > 50", True), + ("id > 100 AND name IS NOT NULL", True), + # Keywords inside string literals are allowed (not injection) + ("city = 'Union City'", True), + ("status = 'DROP_PENDING'", True), + ("name = 'Delete Me'", True), + ('comment = "ALTER this later"', True), + # Escaped quotes in strings + ("name = 'O''Brien'", True), + # Unsafe expressions - keywords outside strings + ("id = 1; DROP TABLE test;", False), + ("id > 100; SELECT * FROM users", False), + ("id IN {1, 2, 3}", False), + # Keywords outside of string context + ("id = 1 UNION SELECT * FROM secrets", False), + ("id = 1 -- comment injection", False), + ("id = 1 /* block comment */", False), + # Unclosed string literals (could bypass stripping) + ("x = ''' ; DROP TABLE t", False), + ('x = """ ; DROP TABLE t', False), + ("name = 'value", False), + ('comment = "test', False), + ], + ) + def test_is_filter_safe(self, filter_expr, is_safe): + """Test filter safety validation.""" + assert _is_filter_safe(filter_expr) == is_safe + + def test_balanced_quotes_detection(self): + """Test the _has_balanced_quotes helper function.""" + # Balanced quotes + assert _has_balanced_quotes("name = 'Alice'") is True + assert _has_balanced_quotes('status = "active"') is True + assert _has_balanced_quotes("name = 'O''Brien'") is True + assert 
_has_balanced_quotes('msg = "He said ""hi"""') is True + assert _has_balanced_quotes("id = 1") is True + + # Unbalanced quotes + assert _has_balanced_quotes("name = 'Alice") is False + assert _has_balanced_quotes('status = "active') is False + assert _has_balanced_quotes("x = '''") is False + + +# ============================================================================ +# KineticaDatasource Tests +# ============================================================================ + + +class TestKineticaDatasource: + """Tests for KineticaDatasource.""" + + @pytest.fixture + def datasource(self): + """Create a KineticaDatasource with test parameters.""" + with patch( + "ray.data._internal.datasource.kinetica_datasource.KineticaDatasource._init_client" + ): + ds = KineticaDatasource( + url="http://localhost:9191", + table_name="test_table", + username="admin", + password="password", + columns=["id", "name"], + filter_expression="id > 100", + batch_size=5000, + ) + return ds + + def test_init(self, datasource): + """Test datasource initialization.""" + assert datasource._url == "http://localhost:9191" + assert datasource._table_name == "test_table" + assert datasource._username == "admin" + assert datasource._columns == ["id", "name"] + assert datasource._filter_expression == "id > 100" + assert datasource._batch_size == 5000 + + def test_default_batch_size(self): + """Test default batch size.""" + with patch( + "ray.data._internal.datasource.kinetica_datasource.KineticaDatasource._init_client" + ): + ds = KineticaDatasource( + url="http://localhost:9191", + table_name="test_table", + ) + assert ds._batch_size == 10000 + + def test_unsafe_filter_rejected(self): + """Test that unsafe filter expressions are rejected without leaking input.""" + with pytest.raises(ValueError, match="unsafe patterns") as exc_info: + KineticaDatasource( + url="http://localhost:9191", + table_name="test_table", + filter_expression="id = 1; DROP TABLE test;", + ) + # Verify the error message 
does NOT include the raw user input + # (to prevent log injection) + assert "DROP TABLE" not in str(exc_info.value) + + def test_get_name(self, datasource): + """Test datasource name generation.""" + assert datasource.get_name() == "Kinetica(test_table)" + + @patch.object(KineticaDatasource, "_init_client") + def test_get_table_info(self, mock_init_client, mock_gpudb_client): + """Test _get_table_info method.""" + mock_init_client.return_value = mock_gpudb_client + + ds = KineticaDatasource( + url="http://localhost:9191", + table_name="test_table", + ) + + total_count, arrow_schema = ds._get_table_info(mock_gpudb_client) + + assert total_count == 100 + assert arrow_schema is not None + assert len(arrow_schema) == 3 + + @patch.object(KineticaDatasource, "_init_client") + def test_get_table_info_with_filter(self, mock_init_client, mock_gpudb_client): + """Test _get_table_info with filter expression.""" + mock_init_client.return_value = mock_gpudb_client + + ds = KineticaDatasource( + url="http://localhost:9191", + table_name="test_table", + filter_expression="id > 50", + ) + + total_count, arrow_schema = ds._get_table_info(mock_gpudb_client) + + assert total_count >= 0 + assert arrow_schema is not None + + @patch.object(KineticaDatasource, "_init_client") + def test_estimate_row_size(self, mock_init_client, mock_gpudb_client): + """Test _estimate_row_size method.""" + mock_init_client.return_value = mock_gpudb_client + + ds = KineticaDatasource( + url="http://localhost:9191", + table_name="test_table", + columns=["id", "name"], + ) + + row_size = ds._estimate_row_size(mock_gpudb_client, sample_size=100) + + assert row_size > 0 + mock_gpudb_client.get_records.assert_called() + + @patch.object(KineticaDatasource, "_init_client") + @pytest.mark.parametrize("parallelism", [1, 2, 4]) + def test_get_read_tasks(self, mock_init_client, mock_gpudb_client, parallelism): + """Test get_read_tasks with different parallelism levels.""" + mock_init_client.return_value = 
mock_gpudb_client + + ds = KineticaDatasource( + url="http://localhost:9191", + table_name="test_table", + ) + + read_tasks = ds.get_read_tasks(parallelism) + + assert len(read_tasks) <= parallelism + assert all(task.metadata.num_rows > 0 for task in read_tasks) + + @patch.object(KineticaDatasource, "_init_client") + def test_get_read_tasks_empty_table(self, mock_init_client, mock_gpudb_client): + """Test get_read_tasks with empty table.""" + mock_init_client.return_value = mock_gpudb_client + mock_gpudb_client.show_table.return_value = { + "type_schemas": [json.dumps({"fields": []})], + "properties": [{}], + "total_size": 0, + } + + ds = KineticaDatasource( + url="http://localhost:9191", + table_name="empty_table", + ) + + read_tasks = ds.get_read_tasks(parallelism=2) + + assert len(read_tasks) == 0 + + @patch.object(KineticaDatasource, "_init_client") + def test_estimate_inmemory_data_size(self, mock_init_client, mock_gpudb_client): + """Test estimate_inmemory_data_size method.""" + mock_init_client.return_value = mock_gpudb_client + + ds = KineticaDatasource( + url="http://localhost:9191", + table_name="test_table", + ) + + size = ds.estimate_inmemory_data_size() + + assert size is not None + assert size > 0 + + +# ============================================================================ +# KineticaDatasink Tests +# ============================================================================ + + +class TestKineticaDatasink: + """Tests for KineticaDatasink.""" + + @pytest.fixture + def datasink(self): + """Create a KineticaDatasink with test parameters.""" + with patch( + "ray.data._internal.datasource.kinetica_datasink.KineticaDatasink._init_client" + ): + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + username="admin", + password="password", + mode=KineticaSinkMode.APPEND, + batch_size=5000, + ) + return ds + + def test_init(self, datasink): + """Test datasink initialization.""" + assert datasink._url == 
"http://localhost:9191" + assert datasink._table_name == "test_table" + assert datasink._username == "admin" + assert datasink._mode == KineticaSinkMode.APPEND + assert datasink._batch_size == 5000 + + def test_string_mode(self): + """Test datasink accepts string mode values.""" + with patch( + "ray.data._internal.datasource.kinetica_datasink.KineticaDatasink._init_client" + ): + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + mode="overwrite", + ) + assert ds._mode == KineticaSinkMode.OVERWRITE + + def test_table_settings(self): + """Test KineticaTableSettings configuration.""" + settings = KineticaTableSettings( + is_replicated=True, + chunk_size=1000000, + ttl=60, + primary_keys=["id"], + shard_keys=["region"], + ) + + with patch( + "ray.data._internal.datasource.kinetica_datasink.KineticaDatasink._init_client" + ): + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + table_settings=settings, + ) + + assert ds._table_settings.is_replicated is True + assert ds._table_settings.chunk_size == 1000000 + assert ds._table_settings.ttl == 60 + assert ds._table_settings.primary_keys == ["id"] + assert ds._table_settings.shard_keys == ["region"] + + def test_get_name(self, datasink): + """Test datasink name generation.""" + assert datasink.get_name() == "Kinetica(test_table)" + + def test_supports_distributed_writes(self, datasink): + """Test datasink reports distributed write support based on table readiness.""" + # Before on_write_start is called, _table_ready is False + # so distributed writes should be disabled to prevent race conditions + assert datasink.supports_distributed_writes is False + + # After table is ready, distributed writes should be enabled + datasink._table_ready = True + assert datasink.supports_distributed_writes is True + + def test_min_rows_per_write(self, datasink): + """Test min_rows_per_write property (used by Ray Data framework).""" + assert datasink.min_rows_per_write == 5000 + 
+ @patch.object(KineticaDatasink, "_init_client") + def test_table_exists(self, mock_init_client, mock_gpudb_sink_client): + """Test _table_exists method.""" + mock_init_client.return_value = mock_gpudb_sink_client + mock_gpudb_sink_client.has_table.return_value = {"table_exists": True} + + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + ) + + exists = ds._table_exists(mock_gpudb_sink_client) + + assert exists is True + mock_gpudb_sink_client.has_table.assert_called_once() + + @patch.object(KineticaDatasink, "_init_client") + def test_drop_table(self, mock_init_client, mock_gpudb_sink_client): + """Test _drop_table method uses no_error_if_not_exists option.""" + mock_init_client.return_value = mock_gpudb_sink_client + + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + ) + + ds._drop_table(mock_gpudb_sink_client) + + mock_gpudb_sink_client.clear_table.assert_called_once_with( + table_name="test_table", + options={"no_error_if_not_exists": "true"}, + ) + + @patch.object(KineticaDatasink, "_init_client") + @patch( + "ray.data._internal.datasource.kinetica_type_utils.arrow_schema_to_kinetica_columns" + ) + def test_create_table( + self, mock_arrow_to_kinetica, mock_init_client, mock_gpudb_sink_client + ): + """Test _create_table method.""" + from gpudb import GPUdbRecordColumn, GPUdbRecordType + + mock_init_client.return_value = mock_gpudb_sink_client + + # Mock columns + mock_columns = [ + GPUdbRecordColumn( + name="id", + column_type=GPUdbRecordColumn._ColumnType.LONG, + column_properties=[], + is_nullable=False, + ), + ] + + # Mock record type + mock_record_type = MagicMock(spec=GPUdbRecordType) + mock_record_type.create_type.return_value = "type_123" + mock_record_type.schema_string = "schema_string" + mock_record_type.column_properties = {} + + with patch("gpudb.GPUdbRecordType", return_value=mock_record_type): + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + ) 
+ + ds._create_table(mock_gpudb_sink_client, mock_columns) + + mock_gpudb_sink_client.create_table.assert_called_once() + + @patch.object(KineticaDatasink, "_init_client") + @pytest.mark.parametrize( + "mode, table_exists, should_create", + [ + (KineticaSinkMode.CREATE, False, True), + (KineticaSinkMode.APPEND, False, True), + (KineticaSinkMode.APPEND, True, False), + (KineticaSinkMode.OVERWRITE, False, True), + (KineticaSinkMode.OVERWRITE, True, True), + ], + ) + def test_on_write_start_modes( + self, + mock_init_client, + mock_gpudb_sink_client, + mode, + table_exists, + should_create, + ): + """Test on_write_start with different modes.""" + mock_init_client.return_value = mock_gpudb_sink_client + mock_gpudb_sink_client.has_table.return_value = {"table_exists": table_exists} + + schema = pa.schema( + [ + pa.field("id", pa.int64()), + pa.field("name", pa.string()), + ] + ) + + with ( + patch.object(KineticaDatasink, "_create_table") as mock_create, + patch.object(KineticaDatasink, "_drop_table") as mock_drop, + patch.object( + KineticaDatasink, "_get_existing_record_type" + ) as mock_get_type, + patch( + "ray.data._internal.datasource.kinetica_type_utils.arrow_schema_to_kinetica_columns" + ) as mock_arrow_to_kinetica, + ): + mock_arrow_to_kinetica.return_value = [] + + # Mock existing record type + mock_record_type = MagicMock() + mock_record_type.columns = [] + mock_get_type.return_value = mock_record_type + + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + mode=mode, + schema=schema, + ) + + ds.on_write_start(schema) + + if mode == KineticaSinkMode.OVERWRITE and table_exists: + mock_drop.assert_called_once() + + if should_create: + if mode == KineticaSinkMode.APPEND and table_exists: + mock_get_type.assert_called_once() + else: + # CREATE or OVERWRITE should create table + if mode != KineticaSinkMode.APPEND or not table_exists: + mock_create.assert_called() + + @patch.object(KineticaDatasink, "_init_client") + def 
test_on_write_start_create_existing_table_fails( + self, mock_init_client, mock_gpudb_sink_client + ): + """Test that CREATE mode fails if table already exists.""" + from gpudb import GPUdbException + + mock_init_client.return_value = mock_gpudb_sink_client + mock_gpudb_sink_client.has_table.return_value = {"table_exists": True} + + schema = pa.schema([pa.field("id", pa.int64())]) + + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + mode=KineticaSinkMode.CREATE, + schema=schema, + ) + + with pytest.raises(GPUdbException, match="already exists"): + ds.on_write_start(schema) + + @patch.object(KineticaDatasink, "_init_client") + @patch( + "ray.data._internal.datasource.kinetica_type_utils.arrow_schema_to_kinetica_columns" + ) + @patch( + "ray.data._internal.datasource.kinetica_type_utils.convert_arrow_batch_to_records" + ) + def test_write( + self, + mock_convert_batch, + mock_arrow_to_kinetica, + mock_init_client, + mock_gpudb_sink_client, + ): + """Test write method.""" + mock_init_client.return_value = mock_gpudb_sink_client + mock_gpudb_sink_client.has_table.return_value = {"table_exists": False} + + # Mock conversion functions + mock_arrow_to_kinetica.return_value = [] + mock_convert_batch.return_value = [ + {"id": 1, "name": "Alice"}, + {"id": 2, "name": "Bob"}, + ] + + schema = pa.schema( + [ + pa.field("id", pa.int64()), + pa.field("name", pa.string()), + ] + ) + + # Create test data + rb = pa.record_batch( + [pa.array([1, 2]), pa.array(["Alice", "Bob"])], + names=["id", "name"], + ) + block_data = pa.Table.from_batches([rb]) + + with patch.object(KineticaDatasink, "_create_table"): + ds = KineticaDatasink( + url="http://localhost:9191", + table_name="test_table", + mode=KineticaSinkMode.CREATE, + schema=schema, + ) + ds._column_defs = [] + + ctx = TaskContext(task_idx=0, op_name="test_write") + result = ds.write([block_data], ctx=ctx) + + assert "num_inserted" in result + assert "num_updated" in result + # Note: write raises 
# RuntimeError on errors instead of returning them


# ============================================================================
# Type Utils Tests
# ============================================================================


class TestKineticaTypeUtils:
    """Tests for type conversion utilities."""

    def test_arrow_schema_conversion(self):
        """Test converting Arrow schema to Kinetica columns."""
        from ray.data._internal.datasource.kinetica_type_utils import (
            arrow_schema_to_kinetica_columns,
        )

        schema = pa.schema(
            [
                pa.field("id", pa.int64()),
                pa.field("name", pa.string()),
                pa.field("value", pa.float64()),
                pa.field("active", pa.bool_()),
            ]
        )

        columns = arrow_schema_to_kinetica_columns(schema)

        assert len(columns) == 4
        assert columns[0].name == "id"
        assert columns[1].name == "name"
        assert columns[2].name == "value"
        assert columns[3].name == "active"

    def test_arrow_schema_with_keys(self):
        """Test converting Arrow schema with primary/shard keys."""
        from gpudb import GPUdbColumnProperty

        from ray.data._internal.datasource.kinetica_type_utils import (
            arrow_schema_to_kinetica_columns,
        )

        schema = pa.schema(
            [
                pa.field("id", pa.int64()),
                pa.field("region", pa.string()),
                pa.field("value", pa.float64()),
            ]
        )

        columns = arrow_schema_to_kinetica_columns(
            schema,
            primary_keys=["id"],
            shard_keys=["region"],
        )

        assert GPUdbColumnProperty.PRIMARY_KEY in columns[0].column_properties
        assert GPUdbColumnProperty.SHARD_KEY in columns[1].column_properties

    def test_arrow_schema_with_invalid_keys_rejected(self):
        """Test that invalid primary/shard keys raise ValueError."""
        from ray.data._internal.datasource.kinetica_type_utils import (
            arrow_schema_to_kinetica_columns,
        )

        schema = pa.schema(
            [
                pa.field("id", pa.int64()),
                pa.field("name", pa.string()),
            ]
        )

        # Invalid primary key
        with pytest.raises(ValueError, match="non-existent columns"):
            arrow_schema_to_kinetica_columns(
                schema,
                primary_keys=["nonexistent_column"],
            )

        # Invalid shard key
        with pytest.raises(ValueError, match="non-existent columns"):
            arrow_schema_to_kinetica_columns(
                schema,
                shard_keys=["also_nonexistent"],
            )

        # Both invalid
        with pytest.raises(ValueError, match="non-existent columns"):
            arrow_schema_to_kinetica_columns(
                schema,
                primary_keys=["bad_pk"],
                shard_keys=["bad_sk"],
            )

    def test_arrow_to_kinetica_integer_types(self):
        """Test Arrow integer types convert correctly to Kinetica."""
        from gpudb import GPUdbColumnProperty

        from ray.data._internal.datasource.kinetica_type_utils import (
            arrow_to_kinetica_type,
        )

        # Signed integers
        assert arrow_to_kinetica_type(pa.int8()) == (
            "int",
            [GPUdbColumnProperty.INT8],
        )
        assert arrow_to_kinetica_type(pa.int16()) == (
            "int",
            [GPUdbColumnProperty.INT16],
        )
        assert arrow_to_kinetica_type(pa.int32()) == ("int", [])
        assert arrow_to_kinetica_type(pa.int64()) == ("long", [])

        # Unsigned integers
        assert arrow_to_kinetica_type(pa.uint8()) == (
            "int",
            [GPUdbColumnProperty.INT16],
        )
        assert arrow_to_kinetica_type(pa.uint16()) == ("int", [])
        assert arrow_to_kinetica_type(pa.uint32()) == ("long", [])
        assert arrow_to_kinetica_type(pa.uint64()) == (
            "string",
            [GPUdbColumnProperty.ULONG],
        )

    def test_arrow_to_kinetica_float_types(self):
        """Test Arrow float types convert correctly to Kinetica."""
        from ray.data._internal.datasource.kinetica_type_utils import (
            arrow_to_kinetica_type,
        )

        assert arrow_to_kinetica_type(pa.float32()) == ("float", [])
        assert arrow_to_kinetica_type(pa.float64()) == ("double", [])

    def test_arrow_to_kinetica_datetime_types(self):
        """Test Arrow date/time types convert correctly to Kinetica."""
        from gpudb import GPUdbColumnProperty

        from ray.data._internal.datasource.kinetica_type_utils import (
            arrow_to_kinetica_type,
        )

        assert arrow_to_kinetica_type(pa.date32()) == (
            "string",
            [GPUdbColumnProperty.DATE],
        )
        assert arrow_to_kinetica_type(pa.date64()) == (
            "string",
            [GPUdbColumnProperty.DATE],
        )
        assert arrow_to_kinetica_type(pa.time32("ms")) == (
            "string",
            [GPUdbColumnProperty.TIME],
        )
        assert arrow_to_kinetica_type(pa.time64("us")) == (
            "string",
            [GPUdbColumnProperty.TIME],
        )
        assert arrow_to_kinetica_type(pa.timestamp("us")) == (
            "string",
            [GPUdbColumnProperty.DATETIME],
        )

    def test_arrow_to_kinetica_other_types(self):
        """Test Arrow boolean, string, binary types convert correctly."""
        from gpudb import GPUdbColumnProperty

        from ray.data._internal.datasource.kinetica_type_utils import (
            arrow_to_kinetica_type,
        )

        assert arrow_to_kinetica_type(pa.bool_()) == (
            "int",
            [GPUdbColumnProperty.BOOLEAN],
        )
        assert arrow_to_kinetica_type(pa.string()) == ("string", [])
        assert arrow_to_kinetica_type(pa.binary()) == ("bytes", [])

    def test_kinetica_to_arrow_case_insensitive(self):
        """Test Kinetica to Arrow conversion is case-insensitive."""
        from gpudb import GPUdbRecordColumn

        from ray.data._internal.datasource.kinetica_type_utils import (
            kinetica_to_arrow_type,
        )

        # Helper to build a column with the given base type and properties
        def make_col(col_type, props):
            return GPUdbRecordColumn(
                name="test",
                column_type=col_type,
                column_properties=props,
            )

        # Test lowercase properties
        assert (
            kinetica_to_arrow_type(
                make_col(GPUdbRecordColumn._ColumnType.INT, ["boolean"])
            )
            == pa.bool_()
        )
        assert (
            kinetica_to_arrow_type(
                make_col(GPUdbRecordColumn._ColumnType.INT, ["int8"])
            )
            == pa.int8()
        )
        assert (
            kinetica_to_arrow_type(
                make_col(GPUdbRecordColumn._ColumnType.STRING, ["date"])
            )
            == pa.date32()
        )
        assert kinetica_to_arrow_type(
            make_col(GPUdbRecordColumn._ColumnType.STRING, ["time"])
        ) == pa.time64("us")
        assert kinetica_to_arrow_type(
            make_col(GPUdbRecordColumn._ColumnType.STRING, ["datetime"])
        ) == pa.timestamp("us")

        # Test uppercase properties
        assert (
            kinetica_to_arrow_type(
                make_col(GPUdbRecordColumn._ColumnType.INT, ["BOOLEAN"])
            )
            == pa.bool_()
        )
        assert (
            kinetica_to_arrow_type(
                make_col(GPUdbRecordColumn._ColumnType.INT, ["INT8"])
            )
            == pa.int8()
        )
        assert (
            kinetica_to_arrow_type(
                make_col(GPUdbRecordColumn._ColumnType.STRING, ["DATE"])
            )
            == pa.date32()
        )
        assert kinetica_to_arrow_type(
            make_col(GPUdbRecordColumn._ColumnType.STRING, ["DATETIME"])
        ) == pa.timestamp("us")

        # Test mixed case properties
        assert (
            kinetica_to_arrow_type(
                make_col(GPUdbRecordColumn._ColumnType.INT, ["Boolean"])
            )
            == pa.bool_()
        )
        assert kinetica_to_arrow_type(
            make_col(GPUdbRecordColumn._ColumnType.STRING, ["DateTime"])
        ) == pa.timestamp("us")

    def test_convert_arrow_batch_datetime_serialization(self):
        """Test date/time values serialize to ISO format strings."""
        from datetime import date, datetime, time

        from gpudb import GPUdbColumnProperty, GPUdbRecordColumn

        from ray.data._internal.datasource.kinetica_type_utils import (
            convert_arrow_batch_to_records,
        )

        # Create test data with date/time columns
        data = {
            "id": [1, 2],
            "date_col": [date(2024, 1, 15), date(2024, 12, 31)],
            "time_col": [time(10, 30, 45), time(23, 59, 59)],
            "datetime_col": [
                datetime(2024, 1, 15, 10, 30, 45),
                datetime(2024, 12, 31, 23, 59, 59),
            ],
        }
        batch = pa.RecordBatch.from_pydict(data)

        # Create column definitions
        columns = [
            GPUdbRecordColumn(
                name="id",
                column_type=GPUdbRecordColumn._ColumnType.INT,
                column_properties=[],
            ),
            GPUdbRecordColumn(
                name="date_col",
                column_type=GPUdbRecordColumn._ColumnType.STRING,
                column_properties=[GPUdbColumnProperty.DATE],
            ),
            GPUdbRecordColumn(
                name="time_col",
                column_type=GPUdbRecordColumn._ColumnType.STRING,
                column_properties=[GPUdbColumnProperty.TIME],
            ),
            GPUdbRecordColumn(
                name="datetime_col",
                column_type=GPUdbRecordColumn._ColumnType.STRING,
                column_properties=[GPUdbColumnProperty.DATETIME],
            ),
        ]

        records = convert_arrow_batch_to_records(batch, columns)

        # Verify date serialization
        assert records[0]["date_col"] == "2024-01-15"
        assert records[1]["date_col"] == "2024-12-31"

        # Verify time serialization
        assert records[0]["time_col"] == "10:30:45"
        assert records[1]["time_col"] == "23:59:59"

        # Verify datetime serialization (ISO format)
        assert "2024-01-15" in records[0]["datetime_col"]
        assert "10:30:45" in records[0]["datetime_col"]

    def test_convert_arrow_batch_datetime_json_serializable(self):
        """Test that records with date/time are JSON serializable."""
        import json
        from datetime import date, datetime, time

        from gpudb import GPUdbColumnProperty, GPUdbRecordColumn

        from ray.data._internal.datasource.kinetica_type_utils import (
            convert_arrow_batch_to_records,
        )

        data = {
            "id": [1],
            "date_col": [date(2024, 1, 15)],
            "time_col": [time(10, 30, 45)],
            "datetime_col": [datetime(2024, 1, 15, 10, 30, 45)],
        }
        batch = pa.RecordBatch.from_pydict(data)

        columns = [
            GPUdbRecordColumn(
                name="id",
                column_type=GPUdbRecordColumn._ColumnType.INT,
                column_properties=[],
            ),
            GPUdbRecordColumn(
                name="date_col",
                column_type=GPUdbRecordColumn._ColumnType.STRING,
                column_properties=[GPUdbColumnProperty.DATE],
            ),
            GPUdbRecordColumn(
                name="time_col",
                column_type=GPUdbRecordColumn._ColumnType.STRING,
                column_properties=[GPUdbColumnProperty.TIME],
            ),
            GPUdbRecordColumn(
                name="datetime_col",
                column_type=GPUdbRecordColumn._ColumnType.STRING,
                column_properties=[GPUdbColumnProperty.DATETIME],
            ),
        ]

        records = convert_arrow_batch_to_records(batch, columns)

        # This should not raise TypeError
        json_str = json.dumps(records[0])
        parsed = json.loads(json_str)

        assert isinstance(parsed["date_col"], str)
        assert isinstance(parsed["time_col"], str)
        assert isinstance(parsed["datetime_col"], str)

    def test_convert_arrow_batch_null_datetime(self):
        """Test that null date/time values are handled correctly."""
        from datetime import date

        from gpudb import GPUdbColumnProperty, GPUdbRecordColumn

        from ray.data._internal.datasource.kinetica_type_utils import (
            convert_arrow_batch_to_records,
        )

        data = {
            "id": [1, 2],
            "date_col": [date(2024, 1, 15), None],
        }
        batch = pa.RecordBatch.from_pydict(data)

        columns = [
            GPUdbRecordColumn(
                name="id",
                column_type=GPUdbRecordColumn._ColumnType.INT,
                column_properties=[],
            ),
            GPUdbRecordColumn(
                name="date_col",
                column_type=GPUdbRecordColumn._ColumnType.STRING,
                column_properties=[GPUdbColumnProperty.DATE],
            ),
        ]

        records = convert_arrow_batch_to_records(batch, columns)

        assert records[0]["date_col"] == "2024-01-15"
        assert records[1]["date_col"] is None

    def test_convert_records_to_arrow_error_handling(self):
        """Test that type conversion errors include column name."""
        from ray.data._internal.datasource.kinetica_type_utils import (
            convert_records_to_arrow_table,
        )

        schema = pa.schema([pa.field("int_col", pa.int64())])
        bad_records = [{"int_col": "not_an_integer"}]

        with pytest.raises(pa.ArrowTypeError) as exc_info:
            convert_records_to_arrow_table(bad_records, schema)

        error_msg = str(exc_info.value)
        assert "int_col" in error_msg

    def test_vector_bytes_json_serialization(self):
        """Test that vector (bytes) can be JSON serialized via base64."""
        import base64
        import json
        import struct

        # Simulate the custom serializer used in _write_simple
        def json_serializer(obj):
            if isinstance(obj, bytes):
                return base64.b64encode(obj).decode("ascii")
            raise TypeError(f"Object of type {type(obj)} is not JSON serializable")

        # Create a 3D vector
        vector_bytes = struct.pack("3f", 1.0, 2.0, 3.0)
        record = {"id": 1, "embedding": vector_bytes}

        # Should serialize without error
        json_str = json.dumps(record, default=json_serializer)
        parsed = json.loads(json_str)

        assert isinstance(parsed["embedding"], str)

        # Verify we can decode back to original bytes
        decoded = base64.b64decode(parsed["embedding"])
        assert decoded == vector_bytes

    def test_decimal_scale_zero_preserved(self):
        """Test that decimal scale=0 is preserved, not treated as falsy."""
        from gpudb import GPUdbRecordColumn

        from ray.data._internal.datasource.kinetica_type_utils import (
            kinetica_to_arrow_type,
        )

        # Create a decimal column with explicit scale=0
        col = GPUdbRecordColumn(
            name="amount",
            column_type=GPUdbRecordColumn._ColumnType.STRING,
            column_properties=["decimal"],
            precision=10,
            scale=0,  # Integer decimal (no decimal places)
        )

        arrow_type = kinetica_to_arrow_type(col)

        # Verify it's a decimal type
        assert pa.types.is_decimal(arrow_type), f"Expected decimal, got {arrow_type}"
        # Verify scale is 0, not the default (4)
        assert arrow_type.scale == 0, (
            f"Expected scale=0, got {arrow_type.scale}. "
            "Scale=0 should not be treated as falsy."
        )
        assert arrow_type.precision == 10

    def test_decimal_scale_none_uses_default(self):
        """Test that decimal with scale=None uses the default scale."""
        from gpudb import GPUdbRecordColumn

        from ray.data._internal.datasource.kinetica_type_utils import (
            kinetica_to_arrow_type,
        )

        # Create a decimal column without explicit scale
        col = GPUdbRecordColumn(
            name="amount",
            column_type=GPUdbRecordColumn._ColumnType.STRING,
            column_properties=["decimal"],
            precision=18,
            scale=None,  # Should use default
        )

        arrow_type = kinetica_to_arrow_type(col)

        assert pa.types.is_decimal(arrow_type)
        # Default scale is 4
        assert arrow_type.scale == GPUdbRecordColumn.DEFAULT_DECIMAL_SCALE

    def test_fixed_size_list_float_to_vector(self):
        """Test fixed-size list of floats maps to Kinetica vector type."""
        from ray.data._internal.datasource.kinetica_type_utils import (
            arrow_to_kinetica_type,
        )

        # 3D float vector
        arrow_type = pa.list_(pa.float32(), 3)
        kinetica_type, props = arrow_to_kinetica_type(arrow_type)

        assert kinetica_type == "bytes", f"Expected 'bytes', got {kinetica_type}"
        assert "vector(3)" in props, f"Expected 'vector(3)' in props, got {props}"

    def test_fixed_size_list_double_to_vector(self):
        """Test fixed-size list of doubles maps to Kinetica vector type."""
        from ray.data._internal.datasource.kinetica_type_utils import (
            arrow_to_kinetica_type,
        )

        # 128D double vector
        arrow_type = pa.list_(pa.float64(), 128)
        kinetica_type, props = arrow_to_kinetica_type(arrow_type)

        assert kinetica_type == "bytes"
        assert "vector(128)" in props

    def test_fixed_size_list_int_to_array(self):
        """Test fixed-size list of non-floats maps to Kinetica array type."""
        from ray.data._internal.datasource.kinetica_type_utils import (
            arrow_to_kinetica_type,
        )

        # Fixed-size int array
        arrow_type = pa.list_(pa.int32(), 4)
        kinetica_type, props = arrow_to_kinetica_type(arrow_type)

        assert kinetica_type == "string", f"Expected 'string', got {kinetica_type}"
        assert "array(int,4)" in props, f"Expected 'array(int,4)' in props, got {props}"

    def test_variable_list_to_array(self):
        """Test variable-length list maps to Kinetica array type without size."""
        from ray.data._internal.datasource.kinetica_type_utils import (
            arrow_to_kinetica_type,
        )

        # Variable-length list
        arrow_type = pa.list_(pa.float64())
        kinetica_type, props = arrow_to_kinetica_type(arrow_type)

        assert kinetica_type == "string"
        # Should be array(double) without size
        assert (
            "array(double)" in props
        ), f"Expected 'array(double)' in props, got {props}"

    def test_convert_arrow_batch_null_columns(self):
        """Test convert_arrow_batch_to_records handles None columns gracefully."""
        from ray.data._internal.datasource.kinetica_type_utils import (
            convert_arrow_batch_to_records,
        )

        # Create a simple batch
        batch = pa.RecordBatch.from_pydict(
            {
                "id": [1, 2, 3],
                "name": ["Alice", "Bob", "Charlie"],
            }
        )

        # Should not raise TypeError when columns is None
        records = convert_arrow_batch_to_records(batch, None)

        assert len(records) == 3
        assert records[0]["id"] == 1
        assert records[0]["name"] == "Alice"

    def test_convert_arrow_batch_vector_invalid_values_error(self):
        """Test that vector serialization with invalid values raises ValueError."""
        from gpudb import GPUdbRecordColumn

        from ray.data._internal.datasource.kinetica_type_utils import (
            convert_arrow_batch_to_records,
        )

        # Create a batch with a list that should be treated as a vector
        # but contains non-float values
        batch = pa.RecordBatch.from_pydict(
            {
                "id": [1],
                "embedding": [["not", "floats", "here"]],  # Strings instead of floats
            }
        )

        # Create column definitions with vector type
        columns = [
            GPUdbRecordColumn(
                name="id",
                column_type=GPUdbRecordColumn._ColumnType.INT,
                column_properties=[],
            ),
            GPUdbRecordColumn(
                name="embedding",
                column_type=GPUdbRecordColumn._ColumnType.BYTES,
                column_properties=["vector(3)"],
            ),
        ]

        # Should raise ValueError with helpful message including column name
        with pytest.raises(ValueError, match="embedding"):
            convert_arrow_batch_to_records(batch, columns)


# ============================================================================
# Datasource Validation Tests
# ============================================================================


class TestKineticaDatasourceValidation:
    """Tests for input validation in KineticaDatasource."""

    def test_base_class_attributes_initialized(self):
        """Test that base class mixin attributes are properly initialized.

        KineticaDatasource must call super().__init__() to initialize
        _predicate_expr from _DatasourcePredicatePushdownMixin.
        """
        with patch(
            "ray.data._internal.datasource.kinetica_datasource."
            "KineticaDatasource._init_client"
        ):
            ds = KineticaDatasource(
                url="http://localhost:9191",
                table_name="test_table",
            )

            # These attributes are set by base class __init__
            # If super().__init__() wasn't called, these would raise AttributeError
            assert hasattr(ds, "_predicate_expr")
            assert ds._predicate_expr is None  # Initial value

    def test_invalid_sort_order_rejected(self):
        """Test that invalid sort_order values are rejected."""
        with pytest.raises(ValueError, match="Invalid sort_order"):
            KineticaDatasource(
                url="http://localhost:9191",
                table_name="test_table",
                sort_order="invalid",
            )

    def test_valid_sort_orders_accepted(self):
        """Test that valid sort_order values are accepted."""
        with patch(
            "ray.data._internal.datasource.kinetica_datasource."
            "KineticaDatasource._init_client"
        ):
            # ascending should work
            ds1 = KineticaDatasource(
                url="http://localhost:9191",
                table_name="test_table",
                sort_order="ascending",
            )
            assert ds1._sort_order == "ascending"

            # descending should work
            ds2 = KineticaDatasource(
                url="http://localhost:9191",
                table_name="test_table",
                sort_order="descending",
            )
            assert ds2._sort_order == "descending"


# ============================================================================
# Datasink Validation Tests
# ============================================================================


class TestKineticaDatasinkValidation:
    """Tests for input validation in KineticaDatasink."""

    def test_base_class_attributes_initialized(self):
        """Test that base class is properly initialized.

        KineticaDatasink must call super().__init__() to ensure
        the Datasink base class initializes any required state.
        """
        with patch(
            "ray.data._internal.datasource.kinetica_datasink."
            "KineticaDatasink._init_client"
        ):
            sink = KineticaDatasink(
                url="http://localhost:9191",
                table_name="test_table",
            )

            # Verify the datasink is a proper Datasink instance
            # and that super().__init__() was called (no AttributeError)
            from ray.data.datasource.datasink import Datasink

            assert isinstance(sink, Datasink)


# ============================================================================
# Datasink Serialization Tests
# ============================================================================


class TestKineticaDatasinkSerialization:
    """Tests for column serialization in KineticaDatasink."""

    def test_decimal_columns_preserve_precision_scale(self):
        """Test that decimal column precision/scale survives serialization."""
        from gpudb import GPUdbRecordColumn

        with patch(
            "ray.data._internal.datasource.kinetica_datasink."
            "KineticaDatasink._init_client"
        ):
            ds = KineticaDatasink(
                url="http://localhost:9191",
                table_name="test_table",
            )

            # Create a decimal column with specific precision and scale
            decimal_col = GPUdbRecordColumn(
                name="amount",
                column_type=GPUdbRecordColumn._ColumnType.STRING,
                column_properties=["decimal"],
                is_nullable=False,
                precision=10,
                scale=2,
            )

            # Serialize and deserialize
            dicts = ds._columns_to_dicts([decimal_col])
            restored = ds._dicts_to_columns(dicts)

            # Verify precision and scale are preserved
            assert restored[0].precision == 10
            assert restored[0].scale == 2

    def test_non_decimal_columns_no_precision_scale(self):
        """Test that non-decimal columns don't include precision/scale."""
        from gpudb import GPUdbRecordColumn

        with patch(
            "ray.data._internal.datasource.kinetica_datasink."
            "KineticaDatasink._init_client"
        ):
            ds = KineticaDatasink(
                url="http://localhost:9191",
                table_name="test_table",
            )

            # Create a regular string column
            string_col = GPUdbRecordColumn(
                name="name",
                column_type=GPUdbRecordColumn._ColumnType.STRING,
                column_properties=[],
                is_nullable=True,
            )

            # Serialize
            dicts = ds._columns_to_dicts([string_col])

            # ``dict.get`` returns None both when the key is absent and when it
            # is present with a None value, which are the two accepted outcomes.
            assert dicts[0].get("precision") is None
            assert dicts[0].get("scale") is None


# ============================================================================
# GPUdbTable Creation Helper Tests
# ============================================================================


class TestTryCreateGpudbTable:
    """Tests for _try_create_gpudb_table helper method."""

    @patch.object(KineticaDatasink, "_init_client")
    @patch.object(KineticaDatasink, "_create_gpudb_table")
    def test_success_returns_gpudb_table(
        self, mock_create_gpudb_table, mock_init_client
    ):
        """Test that successful creation returns the GPUdbTable."""
        mock_client = MagicMock()
        mock_init_client.return_value = mock_client

        mock_gpudb_table = MagicMock()
        mock_create_gpudb_table.return_value = mock_gpudb_table

        ds = KineticaDatasink(
            url="http://localhost:9191",
            table_name="test_table",
            use_multihead=False,
        )

        result = ds._try_create_gpudb_table(mock_client)

        assert result == mock_gpudb_table
        mock_create_gpudb_table.assert_called_once_with(mock_client, table_exists=False)

    @patch.object(KineticaDatasink, "_init_client")
    @patch.object(KineticaDatasink, "_create_gpudb_table")
    def test_failure_with_multihead_raises(
        self, mock_create_gpudb_table, mock_init_client
    ):
        """Test that failure with multihead=True raises RuntimeError."""
        mock_client = MagicMock()
        mock_init_client.return_value = mock_client

        mock_create_gpudb_table.side_effect = Exception("Connection failed")

        ds = KineticaDatasink(
            url="http://localhost:9191",
            table_name="test_table",
            use_multihead=True,
        )

        with pytest.raises(RuntimeError, match="multihead ingest"):
            ds._try_create_gpudb_table(mock_client)

    @patch.object(KineticaDatasink, "_init_client")
    @patch.object(KineticaDatasink, "_create_gpudb_table")
    def test_failure_without_multihead_returns_none(
        self, mock_create_gpudb_table, mock_init_client
    ):
        """Test that failure with multihead=False returns None and logs warning."""
        mock_client = MagicMock()
        mock_init_client.return_value = mock_client

        mock_create_gpudb_table.side_effect = Exception("Connection failed")

        ds = KineticaDatasink(
            url="http://localhost:9191",
            table_name="test_table",
            use_multihead=False,
        )

        result = ds._try_create_gpudb_table(mock_client)

        assert result is None


# ============================================================================
# Deferred Table Creation Tests
# ============================================================================


class TestKineticaDatasinkDeferredCreation:
    """Tests for deferred table creation in KineticaDatasink.

    Note: Since supports_distributed_writes returns False for deferred creation,
    only a single worker runs - no race condition handling is needed.
    """

    @patch.object(KineticaDatasink, "_init_client")
    @patch.object(KineticaDatasink, "_create_table")
    @patch.object(KineticaDatasink, "_create_gpudb_table")
    @patch(
        "ray.data._internal.datasource.kinetica_type_utils."
        "arrow_schema_to_kinetica_columns"
    )
    @patch(
        "ray.data._internal.datasource.kinetica_type_utils."
        "convert_arrow_batch_to_records"
    )
    def test_deferred_creation_creates_table(
        self,
        mock_convert,
        mock_arrow_to_kinetica,
        mock_create_gpudb_table,
        mock_create_table,
        mock_init_client,
    ):
        """Test deferred creation creates table.

        Note: _create_table sets _schema_string and _column_properties,
        so no separate _get_existing_record_type call is needed.
        """
        mock_client = MagicMock()
        mock_init_client.return_value = mock_client

        mock_arrow_to_kinetica.return_value = []
        mock_convert.return_value = [{"id": 1}]
        mock_create_gpudb_table.return_value = None

        # Create datasink without schema (deferred creation)
        ds = KineticaDatasink(
            url="http://localhost:9191",
            table_name="test_table",
            mode=KineticaSinkMode.APPEND,
            use_multihead=False,
        )
        ds._column_defs = None  # Simulate deferred creation

        # Create test data
        rb = pa.record_batch([pa.array([1])], names=["id"])
        block_data = pa.Table.from_batches([rb])

        ctx = TaskContext(task_idx=0, op_name="test_write")
        ds.write([block_data], ctx=ctx)

        # Verify table was created
        mock_create_table.assert_called_once()
        # _create_table sets _schema_string/_column_properties,
        # so _get_existing_record_type is not called (no redundant network call)

    @patch.object(KineticaDatasink, "_init_client")
    @patch.object(KineticaDatasink, "_create_table")
    @patch(
        "ray.data._internal.datasource.kinetica_type_utils."
        "arrow_schema_to_kinetica_columns"
    )
    def test_deferred_creation_error_propagated(
        self,
        mock_arrow_to_kinetica,
        mock_create_table,
        mock_init_client,
    ):
        """Test that errors during deferred creation are propagated."""
        mock_client = MagicMock()
        mock_init_client.return_value = mock_client

        # Simulate a real error
        mock_create_table.side_effect = Exception("Connection refused")

        mock_arrow_to_kinetica.return_value = []

        # Create datasink without schema
        ds = KineticaDatasink(
            url="http://localhost:9191",
            table_name="test_table",
            mode=KineticaSinkMode.APPEND,
            use_multihead=False,
        )
        ds._column_defs = None

        # Create test data
        rb = pa.record_batch([pa.array([1])], names=["id"])
        block_data = pa.Table.from_batches([rb])

        ctx = TaskContext(task_idx=0, op_name="test_write")

        # Should raise the real error
        with pytest.raises(Exception, match="Connection refused"):
            ds.write([block_data], ctx=ctx)

    def test_supports_distributed_writes_false_for_deferred(self):
        """Test that distributed writes are disabled for deferred creation.

        This ensures only a single worker runs when the table doesn't exist
        and schema is unknown, preventing race conditions.
        """
        with patch(
            "ray.data._internal.datasource.kinetica_datasink."
            "KineticaDatasink._init_client"
        ):
            ds = KineticaDatasink(
                url="http://localhost:9191",
                table_name="test_table",
                mode=KineticaSinkMode.APPEND,
            )
            # _table_ready is False when deferred
            assert ds._table_ready is False
            assert ds.supports_distributed_writes is False


# ============================================================================
# Module Import Tests
# ============================================================================


class TestModuleImports:
    """Tests for module imports and API exposure."""

    def test_read_kinetica_importable(self):
        """Test read_kinetica is importable from ray.data."""
        # Let ImportError fail the test: the name is exported by ``ray.data``
        # itself, so skipping on ImportError would hide a broken export.
        from ray.data import read_kinetica

        assert callable(read_kinetica)

    def test_read_kinetica_sql_importable(self):
        """Test read_kinetica_sql is importable from ray.data."""
        # Let ImportError fail the test: the name is exported by ``ray.data``
        # itself, so skipping on ImportError would hide a broken export.
        from ray.data import read_kinetica_sql

        assert callable(read_kinetica_sql)

    def test_datasource_importable(self):
        """Test KineticaDatasource is importable."""
        from ray.data._internal.datasource.kinetica_datasource import (
            KineticaDatasource,
        )

        assert KineticaDatasource is not None

    def test_datasink_importable(self):
        """Test KineticaDatasink is importable."""
        from ray.data._internal.datasource.kinetica_datasink import (
            KineticaDatasink,
            KineticaSinkMode,
            KineticaTableSettings,
        )

        assert KineticaDatasink is not None
        assert KineticaSinkMode is not None
        assert KineticaTableSettings is not None

    def test_sql_connection_factory_importable(self):
        """Test SQL connection factory is importable."""
        from ray.data._internal.datasource.kinetica_sql_connection import (
            KineticaConnectionFactory,
            create_kinetica_connection_factory,
        )

        assert KineticaConnectionFactory is not None
        assert callable(create_kinetica_connection_factory)

    def test_create_gpudb_client_importable(self):
        """Test create_gpudb_client shared factory is importable."""
        from ray.data._internal.datasource.kinetica_type_utils import (
            create_gpudb_client,
        )

        assert callable(create_gpudb_client)


# ============================================================================
# Client Factory Tests
# ============================================================================


class TestCreateGpudbClient:
    """Tests for the shared create_gpudb_client factory function."""

    def test_create_client_basic(self, patch_gpudb):
        """Test basic client creation with minimal parameters."""
        from ray.data._internal.datasource.kinetica_type_utils import (
            create_gpudb_client,
        )

        client = create_gpudb_client(url="http://localhost:9191")
        assert client is not None

    def test_create_client_with_auth(self, patch_gpudb):
        """Test client creation with authentication."""
        from ray.data._internal.datasource.kinetica_type_utils import (
            create_gpudb_client,
        )

        client = create_gpudb_client(
            url="http://localhost:9191",
            username="admin",
            password="password123",
        )
        assert client is not None

    def test_create_client_with_options(self, patch_gpudb):
        """Test client creation with additional options."""
        from ray.data._internal.datasource.kinetica_type_utils import (
            create_gpudb_client,
        )

        client = create_gpudb_client(
            url="http://localhost:9191",
            username="admin",
            password="password",
            options={"timeout": 30000},
        )
        assert client is not None

    def test_datasource_uses_shared_factory(self):
        """Test that KineticaDatasource uses the shared factory."""
        with patch(
            "ray.data._internal.datasource.kinetica_type_utils.create_gpudb_client"
        ) as mock_factory:
            mock_factory.return_value = MagicMock()

            ds = KineticaDatasource(
                url="http://localhost:9191",
                table_name="test_table",
                username="admin",
                password="password",
            )

            ds._init_client()

            mock_factory.assert_called_once_with(
                url="http://localhost:9191",
                username="admin",
                password="password",
                options={},
            )

    def test_datasink_uses_shared_factory(self):
        """Test that KineticaDatasink uses the shared factory."""
        with patch(
            "ray.data._internal.datasource.kinetica_type_utils.create_gpudb_client"
        ) as mock_factory:
            mock_factory.return_value = MagicMock()

            ds = KineticaDatasink(
                url="http://localhost:9191",
                table_name="test_table",
                username="admin",
                password="password",
            )

            ds._init_client()

            mock_factory.assert_called_once_with(
                url="http://localhost:9191",
                username="admin",
                password="password",
                options={},
            )


# ============================================================================
# Integration Tests (require Kinetica server)
# ============================================================================


@pytest.mark.skipif(
    not os.environ.get("KINETICA_URL"),
    reason="Integration tests require KINETICA_URL environment variable",
)
class TestKineticaIntegration:
    """Integration tests requiring a running Kinetica server."""

    @pytest.fixture
    def connection_params(self):
        """Get connection parameters from environment."""
        return {
            "url": os.environ.get("KINETICA_URL", "http://localhost:9191"),
            "username": os.environ.get("KINETICA_USER", "admin"),
            "password": os.environ.get("KINETICA_PASS", ""),
        }

    @staticmethod
    def _drop_table_best_effort(table_name, connection_params):
        """Drop ``table_name`` after a test without ever failing the test.

        Cleanup is deliberately best-effort: any error is logged with its
        traceback rather than raised, so a cleanup problem cannot mask the
        outcome of the test body.

        Args:
            table_name: Name of the table to clear.
            connection_params: Mapping with ``url`` and optional ``username``
                and ``password`` entries, as built by ``connection_params``.
        """
        import logging

        try:
            from gpudb import GPUdb

            client = GPUdb(
                host=connection_params["url"],
                username=connection_params.get("username"),
                password=connection_params.get("password"),
            )
            client.clear_table(table_name=table_name)
        except Exception:
            logging.getLogger(__name__).warning(
                "Failed to drop test table %s", table_name, exc_info=True
            )

    def test_read_simple_query(self, connection_params):
        """Test reading data from a Kinetica table."""
        import ray

        if not ray.is_initialized():
            ray.init(ignore_reinit_error=True)

        # Reads an existing table, so there is nothing to clean up afterwards.
        ds = ray.data.read_kinetica(
            table_name="ki_home.ki_catalog_ddl",  # System table that should exist
            **connection_params,
            limit=10,
        )

        count = ds.count()
        assert count >= 0  # Table might be empty but query should work

    def test_write_and_read_roundtrip(self, connection_params):
        """Test writing and reading back data."""
        import ray

        if not ray.is_initialized():
            ray.init(ignore_reinit_error=True)

        table_name = "test_ray_roundtrip"

        try:
            # Create test data
            data = [
                {"id": 1, "name": "Alice", "value": 100.5},
                {"id": 2, "name": "Bob", "value": 200.75},
                {"id": 3, "name": "Charlie", "value": 300.25},
            ]
            ds = ray.data.from_items(data)

            # Write to Kinetica
            ds.write_kinetica(
                table_name=table_name,
                mode="overwrite",
                **connection_params,
            )

            # Read back
            read_ds = ray.data.read_kinetica(
                table_name=table_name,
                **connection_params,
            )

            # Verify
            assert read_ds.count() == 3

        finally:
            # Cleanup: drop the test table
            self._drop_table_best_effort(table_name, connection_params)

    def test_read_with_filter(self, connection_params):
        """Test reading with a filter expression."""
        import ray

        if not ray.is_initialized():
            ray.init(ignore_reinit_error=True)

        table_name = "test_ray_filter"

        try:
            # Create test data
            data = [{"id": i, "value": i * 10} for i in range(100)]
            ds = ray.data.from_items(data)

            # Write to Kinetica
            ds.write_kinetica(
                table_name=table_name,
                mode="overwrite",
                **connection_params,
            )

            # Read with filter
            read_ds = ray.data.read_kinetica(
                table_name=table_name,
                filter_expression="value >= 500",
                **connection_params,
            )

            # Verify filter worked
            count = read_ds.count()
            assert count == 50  # ids 50-99 have values >= 500

        finally:
            # Cleanup
            self._drop_table_best_effort(table_name, connection_params)

    def test_read_specific_columns(self, connection_params):
        """Test reading specific columns."""
        import ray

        if not ray.is_initialized():
            ray.init(ignore_reinit_error=True)

        table_name = "test_ray_columns"

        try:
            # Create test data
            data = [
                {"id": 1, "name": "Alice", "value": 100.5, "extra": "data"},
            ]
            ds = ray.data.from_items(data)

            # Write to Kinetica
            ds.write_kinetica(
                table_name=table_name,
                mode="overwrite",
                **connection_params,
            )

            # Read specific columns
            read_ds = ray.data.read_kinetica(
                table_name=table_name,
                columns=["id", "name"],
                **connection_params,
            )

            # Verify only requested columns present
            row = read_ds.take(1)[0]
            assert "id" in row
            assert "name" in row
            assert "value" not in row
            assert "extra" not in row

        finally:
            # Cleanup
            self._drop_table_best_effort(table_name, connection_params)


if __name__ == "__main__":
    import sys

    # Propagate pytest's exit code so that running this file as a script
    # reports test failures through the process exit status.
    sys.exit(pytest.main([__file__, "-v"]))