Skip to content

Commit f566ba2

Browse files
mgdemers authored and
skrawcz committed
feat: support optional flag in from_ extractors
* Add optional flag to ExtractorFactory to skip data loader when target was not found rather than raise exceptions. * Add unit tests for optional and non-optional extractors. * Add more detailed comments and documentation note.
1 parent b3a1ec7 commit f566ba2

3 files changed

Lines changed: 49 additions & 0 deletions

File tree

docs/concepts/materialization.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ Passing ``from_`` and ``to`` Apache Hamilton objects to ``Builder().with_materia
9292
2. Observability ✅: Loaders and savers are part of the dataflow. You can view them with ``Driver.display_all_functions()`` and execute nodes by requesting them with ``Driver.execute()``.
9393
3. Flexibility ✅: The loading and saving behavior is decoupled from the dataflow and can be modified easily when creating the ``Driver`` and executing code.
9494

95+
.. note::
96+
97+
``from_`` data loaders can be marked as optional with ``optional=True``. An optional data loader is skipped, instead of raising an exception, when its target node is not present in the dataflow.
9598

9699
Dynamic materializers
97100
~~~~~~~~~~~~~~~~~~~~~

hamilton/io/materialization.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ def __init__(
136136
self,
137137
target: str,
138138
loaders: list[type[DataLoader]],
139+
optional: bool = False,
139140
**data_loader_kwargs: Any | SingleDependency,
140141
):
141142
"""Instantiates an ExtractorFactory. Note this is not a public API -- this is
@@ -144,10 +145,14 @@ def __init__(
144145
145146
:param target: Parameter, into which we're loading the data
146147
:param loaders: A list of data loaders that are viable candidates, given the key after `from_`
148+
:param optional: Whether missing targets should raise exceptions or skip being loaded.
149+
``optional=True`` means that if the target does not exist in the graph, we skip injecting
150+
this into the graph.
147151
:param data_loader_kwargs: Keyword arguments for the data loaders.
148152
"""
149153
self.target = target
150154
self.loaders = loaders
155+
self.optional = optional
151156
self.data_loader_kwargs = process_kwargs(data_loader_kwargs)
152157

153158
def generate_nodes(self, fn_graph: graph.FunctionGraph) -> list[node.Node]:
@@ -163,6 +168,8 @@ def generate_nodes(self, fn_graph: graph.FunctionGraph) -> list[node.Node]:
163168
# TODO -- add some nodes to the graph
164169
node_with_target = fn_graph.nodes.get(self.target)
165170
if node_with_target is None:
171+
if self.optional:
172+
return []
166173
raise ValueError(
167174
f"Could not find node with name: {self.target} in function "
168175
f"graph. Available nodes: {list(fn_graph.nodes.keys()) + [...] if len(fn_graph.nodes) > 10 else []}"

tests/io/test_materialization.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,45 @@ def test(input_data: dict) -> dict:
156156
assert input_data_node.type == dict # From above
157157

158158

159+
def test_extractor_factory_exception_on_missing_targets():
160+
factory = ExtractorFactory(
161+
"input_data",
162+
loaders=[MockDataLoader],
163+
optional=False,
164+
)
165+
166+
def test() -> dict:
167+
return {"loaded_value": {}}
168+
169+
base_node = node.Node.from_fn(test)
170+
nodes_without_dependencies = graph.update_dependencies(
171+
{base_node.name: base_node}, lifecycle_base.LifecycleAdapterSet(base.DefaultAdapter())
172+
)
173+
fn_graph = graph.FunctionGraph(nodes_without_dependencies, {})
174+
with pytest.raises(ValueError):
175+
factory.generate_nodes(fn_graph)
176+
177+
178+
def test_extractor_factory_optional_flag_skips_missing_targets():
179+
factory = ExtractorFactory(
180+
"input_data",
181+
loaders=[MockDataLoader],
182+
optional=True,
183+
)
184+
185+
def test() -> dict:
186+
return {"loaded_value": {}}
187+
188+
base_node = node.Node.from_fn(test)
189+
nodes_without_dependencies = graph.update_dependencies(
190+
{base_node.name: base_node}, lifecycle_base.LifecycleAdapterSet(base.DefaultAdapter())
191+
)
192+
fn_graph = graph.FunctionGraph(nodes_without_dependencies, {})
193+
nodes = factory.generate_nodes(fn_graph)
194+
nodes_by_name = {node_.name: node_ for node_ in nodes}
195+
assert "input_data" not in nodes_by_name
196+
197+
159198
def test_materializer_factory_generates_nodes_with_builder():
160199
factory = MaterializerFactory(
161200
"test_materializer",

0 commit comments

Comments
 (0)