Skip to content

Commit f4c5cdb

Browse files
author
pt10597
committed
Add reusable narwhals pipeline abstractions
Introduce ccflow.models.narwhals providing:

- NarwhalsFrameTransform: pure LazyFrame -> LazyFrame transform base class. Framework-agnostic; usable standalone via lf.pipe(transform).
- SequenceTransform: bundles a strict list of transforms; itself a NarwhalsFrameTransform so it nests and JSON-roundtrips.
- NarwhalsPipelineModel: CallableModel that pipes a NarwhalsFrameResult source through a list of transforms. Delegates context_type to the source. Output is always a narwhals.LazyFrame (lazy contract enforced by re-coercing after every stage). Supports loose Callable transforms at runtime (strict NarwhalsFrameTransform required for serialization).
- JoinTransform: joins another CallableModel's frame onto the input. Other source invoked with NullContext.
- JoinBackTransform: runs an inner transform on the input and joins the result back -- for fork/rejoin patterns where window functions do not fit.

All classes are pydantic models, JSON-serializable, and integrate with ccflow's graph evaluator via explicit __deps__.

Includes 28 unit tests covering base contracts, JSON roundtrip, lazy enforcement, dependency injection, multi-source enrichment, and confluence (pipeline as source of another pipeline).

Signed-off-by: Pascal Tomecek <pascal.tomecek@cubistsystematic.com>
1 parent e2ef462 commit f4c5cdb

3 files changed

Lines changed: 535 additions & 0 deletions

File tree

ccflow/models/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1+
from .narwhals import *
12
from .publisher import *

ccflow/models/narwhals.py

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
"""Reusable narwhals pipeline abstractions for ccflow.
2+
3+
This module provides three layers of composition for building data-frame pipelines on top of
4+
`narwhals <https://narwhals-dev.github.io/narwhals/>`_:
5+
6+
1. :class:`NarwhalsFrameTransform` -- a pure ``LazyFrame -> LazyFrame`` step. Framework-agnostic;
7+
usable standalone via ``lf.pipe(transform)`` without any other ccflow machinery.
8+
2. :class:`SequenceTransform` -- a transform that bundles several transforms together. Itself a
9+
:class:`NarwhalsFrameTransform`, so it nests and composes via ``.pipe()``.
10+
3. :class:`NarwhalsPipelineModel` -- a :class:`~ccflow.CallableModel` that wires a
11+
:class:`~ccflow.NarwhalsFrameResult`-returning source to a list of transforms, producing a new
12+
:class:`~ccflow.NarwhalsFrameResult`. Its :attr:`~NarwhalsPipelineModel.context_type` is
13+
delegated to the source, so the pipeline transparently adopts the source's context type.
14+
15+
Two generic transform implementations are also shipped:
16+
17+
- :class:`JoinTransform` -- joins another callable model's frame onto the input frame.
18+
- :class:`JoinBackTransform` -- runs an inner transform on the input, joins the result back.
19+
20+
Together these are sufficient for a wide range of multi-source enrichment pipelines while
21+
keeping the linear ``.pipe()`` contract that makes the rest of the design composable.
22+
"""
23+
24+
from typing import Callable, List, Type, Union
25+
26+
import narwhals.stable.v1 as nw
27+
from pydantic import Field, SerializeAsAny, model_validator
28+
29+
from ..base import BaseModel
30+
from ..callable import CallableModel, Flow, GraphDepList
31+
from ..context import ContextBase, NullContext
32+
from ..result.narwhals import NarwhalsFrameResult
33+
34+
# Public names of this module; these are what ``from .narwhals import *`` in
# ``ccflow/models/__init__.py`` re-exports. Private helpers such as ``_coerce_lazy`` and the
# ``NarwhalsFrameTransformOrCallable`` alias are deliberately not listed.
__all__ = (
    "NarwhalsFrameTransform",
    "SequenceTransform",
    "NarwhalsPipelineModel",
    "JoinTransform",
    "JoinBackTransform",
)
41+
42+
43+
class NarwhalsFrameTransform(BaseModel):
    """Base class for a pure ``narwhals.LazyFrame -> narwhals.LazyFrame`` transform.

    Subclasses configure their behavior via pydantic fields and implement :meth:`__call__`.
    Instances are callable (``transform(lf)``), which makes them directly usable as the argument
    to :py:meth:`narwhals.LazyFrame.pipe`::

        class MultiplyColumn(NarwhalsFrameTransform):
            col: str
            factor: float

            def __call__(self, df):
                return df.with_columns(nw.col(self.col) * self.factor)

        out = lf.pipe(MultiplyColumn(col="x", factor=2.0))

    No ccflow machinery (sources, contexts, evaluators) is required to use a transform on its
    own -- it is just a configurable, JSON-serializable function on lazy frames.
    """

    def __call__(self, df: nw.LazyFrame) -> nw.LazyFrame:
        """Apply the transform to ``df`` and return the transformed frame.

        The base class has no behavior of its own; concrete subclasses must override this.

        Raises
        ------
        NotImplementedError
            Always, when the subclass has not overridden ``__call__``. The message names the
            concrete class so a missing override is easy to locate in a long pipeline.
        """
        raise NotImplementedError(f"{type(self).__name__} must override __call__(df) to be usable as a transform.")
65+
66+
67+
class SequenceTransform(NarwhalsFrameTransform):
    """Bundle several :class:`NarwhalsFrameTransform` steps into one transform.

    Calling the sequence feeds the input frame through each step, in list order, using the
    frame's ``.pipe()`` method. An empty ``transforms`` list returns the input unchanged.

    Since a ``SequenceTransform`` is itself a :class:`NarwhalsFrameTransform`, it may appear
    inside another sequence or be passed straight to ``.pipe()``.

    The ``transforms`` field only admits :class:`NarwhalsFrameTransform` instances (no bare
    callables), which is what keeps a ``SequenceTransform`` JSON-roundtrippable.
    """

    transforms: List[NarwhalsFrameTransform] = Field(
        default_factory=list,
        description="Transforms applied in order via `.pipe()`.",
    )

    def __call__(self, df: nw.LazyFrame) -> nw.LazyFrame:
        """Return ``df`` after every step in ``transforms`` has been applied in order."""
        current = df
        for step in self.transforms:
            # Each step receives the output of the previous one.
            current = current.pipe(step)
        return current
87+
88+
89+
# Loose union used as the element type of NarwhalsPipelineModel.transforms: either a
# NarwhalsFrameTransform model or any plain callable taking and returning a LazyFrame.
# NarwhalsFrameTransform is listed first on purpose: its instances are themselves callable,
# so they would also satisfy the duck-typed Callable branch, and the intent is for pydantic
# to prefer the BaseModel branch for them.
NarwhalsFrameTransformOrCallable = Union[NarwhalsFrameTransform, Callable[[nw.LazyFrame], nw.LazyFrame]]
93+
94+
95+
def _coerce_lazy(df) -> nw.LazyFrame:
    """Return ``df`` as a ``narwhals.LazyFrame``.

    Accepts a narwhals ``LazyFrame`` (returned as-is), a narwhals ``DataFrame`` (made lazy), or
    anything else, which is handed to ``nw.from_native`` first. This is how the pipeline keeps
    its "always lazy" contract when a stage returns an eager or native frame.
    """
    if isinstance(df, nw.LazyFrame):
        # Already lazy: hand back the very same object.
        return df
    # Eager narwhals frames can be made lazy directly; everything else (e.g. a native frame
    # returned by a user-supplied callable) is wrapped by narwhals first.
    wrapped = df if isinstance(df, nw.DataFrame) else nw.from_native(df)
    return wrapped.lazy()
107+
108+
109+
class NarwhalsPipelineModel(CallableModel):
    """Callable model that feeds a source's frame through an ordered list of transforms.

    Calling the pipeline invokes ``source`` with the given context, coerces the ``df`` of its
    result to a ``narwhals.LazyFrame``, pipes that frame through every entry of ``transforms``
    in order (re-coercing to lazy after each stage), and wraps the final frame in a
    :class:`~ccflow.NarwhalsFrameResult`.

    The pipeline's :attr:`context_type` is read from the source, so the pipeline adopts whatever
    context type the source uses without users having to parameterize a generic class. Its
    :attr:`result_type` is always :class:`~ccflow.NarwhalsFrameResult`.

    The output frame is always lazy; callers that need an eager result must collect the frame
    held by the returned result themselves.

    Parameters
    ----------
    source:
        A :class:`~ccflow.CallableModel` whose ``result_type`` is
        :class:`~ccflow.NarwhalsFrameResult` or a subclass; this is checked at construction.
        The field is wrapped in ``SerializeAsAny`` so that JSON roundtrips keep the concrete
        subclass of the source (per the original author's note, ccflow's metaclass does not
        do this automatically for generic-aliased ``BaseModel`` fields).
    transforms:
        :class:`NarwhalsFrameTransform` instances or plain one-argument callables on a
        ``LazyFrame``. Plain callables work at runtime but cannot be JSON serialized; use only
        :class:`NarwhalsFrameTransform` instances when full serialization is required.
    """

    source: SerializeAsAny[CallableModel] = Field(
        ...,
        description="Upstream callable model that produces a NarwhalsFrameResult.",
    )
    transforms: List[NarwhalsFrameTransformOrCallable] = Field(
        default_factory=list,
        description="Transforms (or plain callables) applied in order via `.pipe()`.",
    )

    @model_validator(mode="after")
    def _validate_source_result_type(self) -> "NarwhalsPipelineModel":
        """Reject sources whose declared ``result_type`` is not a ``NarwhalsFrameResult`` class."""
        declared = self.source.result_type
        if isinstance(declared, type) and issubclass(declared, NarwhalsFrameResult):
            return self
        raise ValueError(f"NarwhalsPipelineModel source must return NarwhalsFrameResult (or subclass); got source with result_type={declared!r}.")

    @property
    def context_type(self) -> Type[ContextBase]:
        """Context type of the pipeline, taken directly from the source."""
        return self.source.context_type

    @property
    def result_type(self) -> Type[NarwhalsFrameResult]:
        """Result type of the pipeline; always :class:`~ccflow.NarwhalsFrameResult`."""
        return NarwhalsFrameResult

    @Flow.call
    def __call__(self, context) -> NarwhalsFrameResult:
        """Evaluate the source for ``context`` and run its frame through all transforms."""
        frame = _coerce_lazy(self.source(context).df)
        for stage in self.transforms:
            # Re-coerce after each stage so the next one always sees a LazyFrame, even if this
            # stage returned an eager or native frame.
            frame = _coerce_lazy(frame.pipe(stage))
        return NarwhalsFrameResult(df=frame)

    @Flow.deps
    def __deps__(self, context) -> GraphDepList:
        """Report the source, evaluated with the same ``context``, as the only dependency."""
        # Only the source edge is exposed to the graph evaluator. Transforms that call other
        # CallableModels internally (e.g. JoinTransform) are NOT surfaced here -- multi-source
        # graph awareness is intentionally out of scope for v1.
        upstream = (self.source, [context])
        return [upstream]
171+
172+
173+
class JoinTransform(NarwhalsFrameTransform):
    """Join another callable model's frame onto the input frame.

    The ``other`` model is invoked with :class:`~ccflow.NullContext` -- this transform is for the
    common case where the secondary input is independent of any pipeline-level context. Pipelines
    needing context-dependent secondary sources should compose at the
    :class:`~ccflow.CallableModel` graph layer instead.

    Parameters mirror :py:meth:`narwhals.LazyFrame.join`. The following are enforced at
    construction time:

    - ``other`` must declare a ``result_type`` that is :class:`~ccflow.NarwhalsFrameResult` or a
      subclass, and a ``context_type`` that :class:`~ccflow.NullContext` is a subclass of.
    - ``on`` must be given for every strategy except ``'cross'``, and must be omitted for
      ``'cross'`` (narwhals rejects join keys on a cross join).

    Raises
    ------
    ValueError
        From the validators, when any of the constraints above is violated.
    """

    other: SerializeAsAny[CallableModel] = Field(
        ...,
        description="Callable model producing the right-hand frame to join.",
    )
    # Optional only so that how='cross' is usable; every other strategy still requires it
    # (see _validate_join_keys).
    on: Union[str, List[str], None] = Field(
        None,
        description="Column name(s) used as the join key. Required unless how='cross', in which case it must be omitted.",
    )
    # 'right' is intentionally not listed: narwhals' join does not support it.
    how: str = Field("left", description="Join strategy: 'inner', 'left', 'full', 'cross', 'semi', 'anti'.")
    suffix: str = Field("_right", description="Suffix appended to overlapping column names from the right frame.")

    @model_validator(mode="after")
    def _validate_other_result_type(self) -> "JoinTransform":
        """Check that ``other`` returns a narwhals frame result and can be called with ``NullContext``."""
        rt = self.other.result_type
        if not (isinstance(rt, type) and issubclass(rt, NarwhalsFrameResult)):
            raise ValueError(f"JoinTransform.other must return NarwhalsFrameResult (or subclass); got other with result_type={rt!r}.")
        # NullContext-only invocation is by design for v1; verify the source can accept it.
        ct = self.other.context_type
        if not (isinstance(ct, type) and issubclass(NullContext, ct)):
            raise ValueError(f"JoinTransform.other must accept a NullContext (or supertype); got other with context_type={ct!r}.")
        return self

    @model_validator(mode="after")
    def _validate_join_keys(self) -> "JoinTransform":
        """Fail at construction for ``on``/``how`` combinations that the join would reject."""
        if self.how == "cross":
            if self.on is not None:
                raise ValueError("JoinTransform with how='cross' must not specify `on`; a cross join takes no join keys.")
        elif self.on is None:
            raise ValueError(f"JoinTransform with how={self.how!r} requires `on` (column name(s) used as the join key).")
        return self

    def __call__(self, df: nw.LazyFrame) -> nw.LazyFrame:
        """Evaluate ``other`` with a ``NullContext`` and join its (lazy) frame onto ``df``."""
        right = _coerce_lazy(self.other(NullContext()).df)
        # self.on is None exactly when how == 'cross' (guaranteed by _validate_join_keys).
        return df.join(right, on=self.on, how=self.how, suffix=self.suffix)
207+
208+
209+
class JoinBackTransform(NarwhalsFrameTransform):
    """Apply an inner transform to the input frame, then join that result back onto the input.

    This covers "fork-and-rejoin" patterns: a summary is derived from the input and then
    attached to the original rows. It is intended for cases where window functions
    (``.over()``) are not a fit -- for example when the summary has a different row count than
    the input, or aggregates a derived projection.

    When a plain per-group statistic is all that is needed, ``df.with_columns(expr.over(...))``
    states the intent more directly and should be preferred.
    """

    inner: NarwhalsFrameTransform = Field(
        ...,
        description="Transform applied to the input to produce the right-hand side of the join.",
    )
    on: Union[str, List[str]] = Field(..., description="Column name(s) used as the join key.")
    how: str = Field("left", description="Join strategy.")
    suffix: str = Field("_right", description="Suffix appended to overlapping column names from the right frame.")

    def __call__(self, df: nw.LazyFrame) -> nw.LazyFrame:
        """Return ``df`` joined with the (lazy) output of ``inner`` applied to ``df``."""
        # The inner transform sees the same input frame that forms the left side of the join.
        derived = df.pipe(self.inner)
        right_side = _coerce_lazy(derived)
        return df.join(right_side, on=self.on, how=self.how, suffix=self.suffix)

0 commit comments

Comments
 (0)