Skip to content

Commit d5a4aa5

Browse files
committed
Fix Kinetica datasource/datasink bugs
- Remove redundant value is not None checks in convert_arrow_batch_to_records (the None case is already handled earlier in the function) - Recompute records_per_task after parallelism cap in get_read_tasks to ensure even distribution of work across tasks - Pass PyArrow schema instead of Ray Data Schema to KineticaDatasink in write_kinetica to fix type mismatch Signed-off-by: anindyam1969 <amukherjee@kinetica.com>
1 parent 1dfe903 commit d5a4aa5

3 files changed

Lines changed: 18 additions & 4 deletions

File tree

python/ray/data/_internal/datasource/kinetica_datasource.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -623,6 +623,10 @@ def get_read_tasks(
623623
# This handles the case where parallelism > total_count
624624
effective_parallelism = min(effective_parallelism, self._total_count)
625625

626+
# Recompute records_per_task after capping parallelism to ensure
627+
# even distribution of work across tasks
628+
records_per_task = max(1, self._total_count // effective_parallelism)
629+
626630
read_tasks = []
627631
offset = 0
628632

python/ray/data/_internal/datasource/kinetica_type_utils.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -604,19 +604,22 @@ def convert_arrow_batch_to_records(
604604
elif isinstance(value, date):
605605
record[col_name] = value.isoformat()
606606
else:
607-
record[col_name] = str(value) if value is not None else None
607+
# value is not None here (handled by if block at line 569)
608+
record[col_name] = str(value)
608609
elif col_type == "time":
609610
# Convert time to ISO format string (HH:MM:SS.ffffff)
610611
if isinstance(value, time):
611612
record[col_name] = value.isoformat()
612613
else:
613-
record[col_name] = str(value) if value is not None else None
614+
# value is not None here (handled by if block at line 569)
615+
record[col_name] = str(value)
614616
elif col_type in ("datetime", "timestamp"):
615617
# Convert datetime to ISO format string (YYYY-MM-DDTHH:MM:SS.ffffff)
616618
if isinstance(value, datetime):
617619
record[col_name] = value.isoformat()
618620
else:
619-
record[col_name] = str(value) if value is not None else None
621+
# value is not None here (handled by if block at line 569)
622+
record[col_name] = str(value)
620623
else:
621624
# Handle any remaining date/time types that weren't detected
622625
# by column properties

python/ray/data/dataset.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5273,13 +5273,20 @@ def write_kinetica(
52735273
"""
52745274
from ray.data._internal.datasource.kinetica_datasink import KineticaDatasink
52755275

5276+
# Extract the underlying PyArrow schema from Ray Data Schema.
5277+
# KineticaDatasink expects pa.Schema, not ray.data.Schema.
5278+
ray_schema = self.schema()
5279+
pa_schema = (
5280+
ray_schema.base_schema if hasattr(ray_schema, "base_schema") else ray_schema
5281+
)
5282+
52765283
datasink = KineticaDatasink(
52775284
url=url,
52785285
table_name=table_name,
52795286
username=username,
52805287
password=password,
52815288
mode=mode,
5282-
schema=self.schema(),
5289+
schema=pa_schema,
52835290
table_settings=table_settings,
52845291
batch_size=batch_size,
52855292
use_multihead=use_multihead,

0 commit comments

Comments
 (0)