From 4f362116e6fcacf7e992a736684574395c6b1ed0 Mon Sep 17 00:00:00 2001 From: fadil4u Date: Sun, 28 Dec 2025 14:36:32 +0530 Subject: [PATCH 1/4] lazy execution for ibis --- datatune/core/ibis/lazy_pipeline.py | 49 +++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 datatune/core/ibis/lazy_pipeline.py diff --git a/datatune/core/ibis/lazy_pipeline.py b/datatune/core/ibis/lazy_pipeline.py new file mode 100644 index 0000000..a33e3b0 --- /dev/null +++ b/datatune/core/ibis/lazy_pipeline.py @@ -0,0 +1,49 @@ +class LazyTable: + + def __init__(self, plan): + self.plan = plan + + def execute(self): + if isinstance(self.plan, PlanNode): + return self.plan.execute() + else: + return self.plan + + def show_plan(self): + return repr(self.plan) + +class PlanNode: + + def execute(self): + raise NotImplementedError("Subclasses must implement execute()") + +class MapNode(PlanNode): + + def __init__(self, map_obj, source): + self.map_obj = map_obj + self.source = source + + def execute(self): + if isinstance(self.source, LazyTable): + table = self.source.execute() + else: + table = self.source + + return self.map_obj(self.map_obj.llm, table) + +class FilterNode(PlanNode): + def __init__(self, filter_obj, source): + self.filter_obj = filter_obj + self.source = source + + def execute(self): + if isinstance(self.source, LazyTable): + table = self.source.execute() + else: + table = self.source + + return self.filter_obj(self.filter_obj.llm, table) + + + + From 9ffe1f67bb557c1df1d6d3aa036cb5cfe8ba7697 Mon Sep 17 00:00:00 2001 From: fadil4u Date: Sun, 28 Dec 2025 14:37:01 +0530 Subject: [PATCH 2/4] refac --- datatune/core/filter.py | 10 +++++++--- datatune/core/map.py | 11 ++++++++--- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/datatune/core/filter.py b/datatune/core/filter.py index 2421fd1..8001638 100644 --- a/datatune/core/filter.py +++ b/datatune/core/filter.py @@ -1,7 +1,8 @@ def _is_ibis_table(obj): try: import ibis - return isinstance(obj, ibis.Table) + from .ibis.lazy_pipeline import LazyTable + return isinstance(obj, (ibis.Table, LazyTable)) except ImportError: return False @@ -23,10 +24,13 @@ def apply(llm, data): )(llm, data) elif _is_ibis_table(data): from .ibis.filter_ibis import _filter_ibis - return _filter_ibis( + from .ibis.lazy_pipeline import LazyTable, FilterNode + filter_obj = _filter_ibis( prompt=prompt, input_fields=input_fields, - )(llm, data) + ) + filter_obj.llm = llm + return LazyTable(FilterNode(filter_obj, data)) raise TypeError(f"Unsupported data type: {type(data)}") return apply diff --git a/datatune/core/map.py b/datatune/core/map.py index a5113ef..a84a5bd 100644 --- a/datatune/core/map.py +++ b/datatune/core/map.py @@ -1,7 +1,9 @@ + def _is_ibis_table(obj): try: import ibis - return isinstance(obj, ibis.Table) + from .ibis.lazy_pipeline import LazyTable + return isinstance(obj, (ibis.Table,LazyTable)) except ImportError: return False @@ -24,11 +26,14 @@ def apply(llm, data): )(llm, data) elif _is_ibis_table(data): from .ibis.map_ibis import _map_ibis - return _map_ibis( + from .ibis.lazy_pipeline import LazyTable, MapNode + map_obj = _map_ibis( prompt=prompt, output_fields=output_fields, input_fields=input_fields, - )(llm, data) + ) + map_obj.llm = llm + return LazyTable(MapNode(map_obj, data)) raise TypeError(f"Unsupported data type: {type(data)}") From df4dc11830f07d2eb1f82f42654dd3e45f55ec03 Mon Sep 17 00:00:00 2001 From: fadil4u Date: Mon, 29 Dec 2025 12:14:17 +0530 Subject: [PATCH 3/4] execute() to pandas --- datatune/core/filter.py | 7 ++++++- datatune/core/ibis/lazy_pipeline.py | 32 ++++++++++++----------------- datatune/core/map.py | 5 +++++ 3 files changed, 24 insertions(+), 20 deletions(-) diff --git a/datatune/core/filter.py b/datatune/core/filter.py index 8001638..9edd816 100644 --- a/datatune/core/filter.py +++ b/datatune/core/filter.py @@ -25,11 +25,16 @@ def apply(llm, data): elif _is_ibis_table(data): from .ibis.filter_ibis import _filter_ibis from .ibis.lazy_pipeline import LazyTable, FilterNode + filter_obj = _filter_ibis( prompt=prompt, - input_fields=input_fields, + input_fields=input_fields ) filter_obj.llm = llm + + if not isinstance(data, LazyTable): + data = LazyTable(data) + return LazyTable(FilterNode(filter_obj, data)) raise TypeError(f"Unsupported data type: {type(data)}") diff --git a/datatune/core/ibis/lazy_pipeline.py b/datatune/core/ibis/lazy_pipeline.py index a33e3b0..aa7c04a 100644 --- a/datatune/core/ibis/lazy_pipeline.py +++ b/datatune/core/ibis/lazy_pipeline.py @@ -2,33 +2,31 @@ class LazyTable: def __init__(self, plan): self.plan = plan + + def to_ibis(self): + if isinstance(self.plan, PlanNode): + return self.plan.to_ibis() + return self.plan def execute(self): - if isinstance(self.plan, PlanNode): - return self.plan.execute() - else: - return self.plan + ibis_table = self.to_ibis() + return ibis_table.execute() def show_plan(self): return repr(self.plan) class PlanNode: - def execute(self): - raise NotImplementedError("Subclasses must implement execute()") + def to_ibis(self): + raise NotImplementedError class MapNode(PlanNode): - def __init__(self, map_obj, source): self.map_obj = map_obj self.source = source - def execute(self): - if isinstance(self.source, LazyTable): - table = self.source.execute() - else: - table = self.source - + def to_ibis(self): + table = self.source.to_ibis() return self.map_obj(self.map_obj.llm, table) class FilterNode(PlanNode): @@ -36,12 +34,8 @@ def __init__(self, filter_obj, source): self.filter_obj = filter_obj self.source = source - def execute(self): - if isinstance(self.source, LazyTable): - table = self.source.execute() - else: - table = self.source - + def to_ibis(self): + table = self.source.to_ibis() return self.filter_obj(self.filter_obj.llm, table) diff --git a/datatune/core/map.py b/datatune/core/map.py index a84a5bd..910909e 100644 --- a/datatune/core/map.py +++ b/datatune/core/map.py @@ -27,12 +27,17 @@ def apply(llm, data): elif _is_ibis_table(data): from .ibis.map_ibis import _map_ibis from .ibis.lazy_pipeline import LazyTable, MapNode + map_obj = _map_ibis( prompt=prompt, output_fields=output_fields, input_fields=input_fields, ) map_obj.llm = llm + + if not isinstance(data, LazyTable): + data = LazyTable(data) + return LazyTable(MapNode(map_obj, data)) raise TypeError(f"Unsupported data type: {type(data)}") From 1731cf61c198c6946f9d867d1a481b6c662f1981 Mon Sep 17 00:00:00 2001 From: fadil4u Date: Mon, 29 Dec 2025 13:28:38 +0530 Subject: [PATCH 4/4] bug fix --- datatune/core/ibis/filter_ibis.py | 14 +++++++------- datatune/core/ibis/map_ibis.py | 14 +++++++------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/datatune/core/ibis/filter_ibis.py b/datatune/core/ibis/filter_ibis.py index da285c3..e55c0a4 100644 --- a/datatune/core/ibis/filter_ibis.py +++ b/datatune/core/ibis/filter_ibis.py @@ -104,14 +104,14 @@ def __call__(self, llm: Callable, table: Table) -> Table: self.llm = llm if self.input_fields: missing = [f for f in self.input_fields if f not in table.columns] - if missing: - error_msg = ( - f"[datatune] Schema mismatch: The following input_fields were not found: {missing}. " - f"Available columns: {list(table.columns)}" - ) - logger.error(error_msg) + if missing: + error_msg = ( + f"[datatune] Schema mismatch: The following input_fields were not found: {missing}. " + f"Available columns: {list(table.columns)}" + ) + logger.error(error_msg) - raise ValueError(error_msg) + raise ValueError(error_msg) table = add_serialized_col( table, diff --git a/datatune/core/ibis/map_ibis.py b/datatune/core/ibis/map_ibis.py index 486a629..a6e6b9c 100644 --- a/datatune/core/ibis/map_ibis.py +++ b/datatune/core/ibis/map_ibis.py @@ -124,14 +124,14 @@ def __call__(self, llm: Callable, table: Table) -> Table: self.llm = llm if self.input_fields: missing = [f for f in self.input_fields if f not in table.columns] - if missing: - error_msg = ( - f"[datatune] Schema mismatch: The following input_fields were not found: {missing}. " - f"Available columns: {list(table.columns)}" - ) - logger.error(error_msg) + if missing: + error_msg = ( + f"[datatune] Schema mismatch: The following input_fields were not found: {missing}. " + f"Available columns: {list(table.columns)}" + ) + logger.error(error_msg) - raise ValueError(error_msg) + raise ValueError(error_msg) table = add_serialized_col(