Skip to content

Commit 7c19072

Browse files
Remove the custom Workflow transform execution for serving (#354)
* Remove the custom `Workflow` transform execution for serving. This also reworks `match_representation` to handle some cases that start to crop up when switching to the executors from Core re: ragged list features and whether values/offsets are expected or not. * Formatting changes * Add validation for schema and data not agreeing on raggedness * Disable custom C++ implementations of NVT operators (for now) * Update workflow test assertions to match the new `Categorify` encoding --------- Co-authored-by: Oliver Holworthy <oholworthy@nvidia.com>
1 parent f307dc2 commit 7c19072

3 files changed

Lines changed: 64 additions & 105 deletions

File tree

merlin/systems/triton/conversions.py

Lines changed: 39 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@
3030
import numpy as np
3131
import pandas as pd
3232

33+
import merlin.dtypes as md
3334
from merlin.core.compat import cudf
3435
from merlin.core.compat import cupy as cp
3536
from merlin.core.dispatch import build_cudf_list_column, is_list_dtype
@@ -85,10 +86,12 @@ def match_representations(schema: Schema, dict_array: Dict[str, Any]) -> Dict[st
8586
"""
8687
aligned = {}
8788
for col_name, col_schema in schema.column_schemas.items():
88-
if col_schema.is_ragged:
89-
vals_name = f"{col_name}__values"
90-
offs_name = f"{col_name}__offsets"
89+
dtype = col_schema.dtype
90+
91+
vals_name = f"{col_name}__values"
92+
offs_name = f"{col_name}__offsets"
9193

94+
if col_schema.is_ragged:
9295
try:
9396
# Look for values and offsets that already exist
9497
aligned[vals_name] = dict_array[vals_name]
@@ -98,12 +101,40 @@ def match_representations(schema: Schema, dict_array: Dict[str, Any]) -> Dict[st
98101
values, offsets = _to_values_offsets(dict_array[col_name])
99102
aligned[vals_name] = values
100103
aligned[offs_name] = offsets
104+
105+
if dtype != md.unknown:
106+
aligned[vals_name] = aligned[vals_name].astype(dtype.to_numpy)
101107
else:
102-
aligned[col_name] = dict_array[col_name]
108+
try:
109+
# Look for values and offsets that already exist,
110+
# then reshape accordingly
111+
aligned[col_name] = _from_values_offsets(
112+
dict_array[vals_name], dict_array[offs_name], col_schema.shape
113+
)
114+
except KeyError:
115+
# If you don't find them, just use the values
116+
aligned[col_name] = dict_array[col_name]
117+
118+
if dtype != md.unknown:
119+
aligned[col_name] = aligned[col_name].astype(dtype.to_numpy)
103120

104121
return aligned
105122

106123

124+
def _from_values_offsets(values, offsets, shape):
125+
new_shape = [-1]
126+
new_shape.extend(shape.as_tuple[1:])
127+
128+
row_lengths = offsets[1:] - offsets[:-1]
129+
if not all(row_lengths == row_lengths[0]):
130+
raise ValueError(
131+
"Attempted to convert values/offsets representation of list column "
132+
"to values-only representation when row lengths were not equal."
133+
)
134+
135+
return values.reshape(new_shape)
136+
137+
107138
def _to_values_offsets(array):
108139
"""Convert array to values/offsets representation
109140
@@ -306,7 +337,10 @@ def convert_format(tensors, kind, target_kind):
306337
elif kind == Supports.CPU_DICT_ARRAY:
307338
return _array_to_pandas(tensors), Supports.CPU_DATAFRAME
308339
elif kind == Supports.GPU_DICT_ARRAY:
309-
return _array_to_pandas(_convert_array(tensors, cp.asnumpy)), Supports.CPU_DATAFRAME
340+
return (
341+
_array_to_pandas(_convert_array(tensors, cp.asnumpy)),
342+
Supports.CPU_DATAFRAME,
343+
)
310344

311345
raise ValueError("unsupported target for converting tensors", target_kind)
312346

merlin/systems/workflow/base.py

Lines changed: 12 additions & 95 deletions
Original file line number | Diff line number | Diff line change
@@ -28,10 +28,10 @@
2828
import json
2929
import logging
3030

31-
from merlin.core.dispatch import concat_columns
32-
from merlin.dag import ColumnSelector, Supports
31+
from merlin.dag import ColumnSelector, DataFormats, Supports
32+
from merlin.dag.executors import LocalExecutor, _convert_format, _data_format
3333
from merlin.schema import Tags
34-
from merlin.systems.triton.conversions import convert_format, match_representations
34+
from merlin.systems.triton.conversions import match_representations
3535
from merlin.table import TensorTable
3636

3737
LOG = logging.getLogger("merlin-systems")
@@ -65,8 +65,10 @@ def __init__(self, workflow, output_dtypes, model_config, model_device):
6565
f"The following columns were not found in the workflow's output: {missing_cols}"
6666
)
6767

68-
# recurse over all column groups, initializing operators for inference pipeline
69-
self._initialize_ops(self.workflow.output_node)
68+
# recurse over all column groups, initializing operators for inference pipeline.
69+
# (disabled for now while we sort out whether and how we want to use C++ implementations
70+
# of NVTabular operators for performance optimization)
71+
# self._initialize_ops(self.workflow.output_node)
7072

7173
def _initialize_ops(self, workflow_node, visited=None):
7274
if visited is None:
@@ -97,98 +99,13 @@ def _initialize_ops(self, workflow_node, visited=None):
9799
self._initialize_ops(parent, visited)
98100

99101
def run_workflow(self, input_tensors):
100-
# use our NVTabular workflow to transform the dataset
101-
transformed, kind = self._transform_tensors(input_tensors, self.workflow.output_node)
102-
103-
# if we don't have tensors in numpy format, convert back so that the we can return
104-
# to triton
105-
if kind != Supports.CPU_DICT_ARRAY:
106-
transformed, kind = convert_format(transformed, kind, Supports.CPU_DICT_ARRAY)
107-
108-
transformed = TensorTable(transformed).to_dict()
109-
output_dict = match_representations(self.workflow.output_schema, transformed)
110-
111-
for key, value in output_dict.items():
112-
output_dict[key] = value.astype(self.output_dtypes[key])
113-
114-
return output_dict
115-
116-
def _transform_tensors(self, input_tensors, workflow_node):
117-
upstream_inputs = []
118-
119-
# Gather inputs from the parents and dependency nodes
120-
if workflow_node.parents_with_dependencies:
121-
for parent in workflow_node.parents_with_dependencies:
122-
upstream_tensors, upstream_kind = self._transform_tensors(input_tensors, parent)
123-
if upstream_tensors is not None and upstream_kind:
124-
upstream_inputs.append((upstream_tensors, upstream_kind))
125-
126-
# Gather additional input columns from the original input tensors
127-
if workflow_node.selector:
128-
selector_columns = workflow_node.selector.names
129-
to_remove = []
130-
for upstream_tensors, upstream_kind in upstream_inputs:
131-
for col in selector_columns:
132-
if col in upstream_tensors:
133-
to_remove.append(col)
134-
for col in set(to_remove):
135-
selector_columns.remove(col)
136-
137-
if selector_columns:
138-
selected_tensors = {c: input_tensors[c] for c in selector_columns}
139-
selected_kinds = Supports.CPU_DICT_ARRAY
140-
upstream_inputs.append((selected_tensors, selected_kinds))
141-
142-
# Standardize the formats
143-
tensors, kind = None, None
144-
for upstream_tensors, upstream_kind in upstream_inputs:
145-
if tensors is None:
146-
tensors, kind = upstream_tensors, upstream_kind
147-
else:
148-
if kind != upstream_kind:
149-
# we have multiple different kinds of data here (dataframe/array on cpu/gpu)
150-
# we need to convert to a common format here first before concatenating.
151-
op = workflow_node.op
152-
if op and hasattr(op, "inference_supports"):
153-
target_kind = op.inference_supports
154-
else:
155-
target_kind = Supports.CPU_DICT_ARRAY
156-
# note : the 2nd convert_format call needs to be stricter in what the kind is
157-
# (exact match rather than a bitmask of values)
158-
tensors, kind = convert_format(tensors, kind, target_kind)
159-
upstream_tensors, _ = convert_format(upstream_tensors, upstream_kind, kind)
160-
161-
tensors = self.concat_tensors([tensors, upstream_tensors], kind)
162-
163-
# Run the transform
164-
if tensors is not None and kind and workflow_node.op:
165-
try:
166-
# if the op doesn't support the current kind - we need to convert
167-
if (
168-
hasattr(workflow_node, "inference_supports")
169-
and not workflow_node.inference_supports & kind
170-
):
171-
tensors, kind = convert_format(tensors, kind, workflow_node.inference_supports)
172-
173-
tensors = workflow_node.op.transform(
174-
workflow_node.input_columns,
175-
tensors,
176-
)
177-
178-
except Exception:
179-
LOG.exception("Failed to transform operator %s", workflow_node.op)
180-
raise
102+
transformable = TensorTable(input_tensors).to_df()
103+
transformed = LocalExecutor().transform(transformable, self.workflow.graph)
181104

182-
return tensors, kind
105+
if _data_format(transformed) != DataFormats.NUMPY_DICT_ARRAY:
106+
transformed = _convert_format(transformed, DataFormats.NUMPY_DICT_ARRAY)
183107

184-
def concat_tensors(self, tensors, kind):
185-
if kind & (Supports.GPU_DATAFRAME | Supports.CPU_DATAFRAME):
186-
return concat_columns(tensors)
187-
else:
188-
output = tensors[0]
189-
for tensor in tensors[1:]:
190-
output.update(tensor)
191-
return output
108+
return match_representations(self.workflow.output_schema, transformed)
192109

193110
def _get_param(self, config, *args, default=None):
194111
config_element = config["parameters"]

tests/unit/systems/dag/runtimes/triton/ops/workflow/test_ensemble.py

Lines changed: 13 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -269,7 +269,7 @@ def test_workflow_with_ragged_input_and_output(tmpdir):
269269
"x__offsets": np.array([0, 1], dtype="int32"),
270270
},
271271
{
272-
"x__values": np.array([1], dtype="int64"),
272+
"x__values": np.array([3], dtype="int64"),
273273
"x__offsets": np.array([0, 1], dtype="int32"),
274274
},
275275
),
@@ -279,7 +279,7 @@ def test_workflow_with_ragged_input_and_output(tmpdir):
279279
"x__offsets": np.array([0, 1, 2], dtype="int32"),
280280
},
281281
{
282-
"x__values": np.array([1, 2], dtype="int64"),
282+
"x__values": np.array([3, 4], dtype="int64"),
283283
"x__offsets": np.array([0, 1, 2], dtype="int32"),
284284
},
285285
),
@@ -289,7 +289,7 @@ def test_workflow_with_ragged_input_and_output(tmpdir):
289289
"x__offsets": np.array([0, 2, 3], dtype="int32"),
290290
},
291291
{
292-
"x__values": np.array([1, 2, 3], dtype="int64"),
292+
"x__values": np.array([3, 4, 5], dtype="int64"),
293293
"x__offsets": np.array([0, 2, 3], dtype="int32"),
294294
},
295295
),
@@ -298,7 +298,11 @@ def test_workflow_with_ragged_input_and_output(tmpdir):
298298
input_table = TensorTable(request_dict)
299299
output_names = ["x__values", "x__offsets"]
300300
response = send_triton_request(
301-
schema, input_table, output_names, client=client, triton_model=model_name
301+
schema,
302+
input_table,
303+
output_names,
304+
client=client,
305+
triton_model=model_name,
302306
)
303307
for key, value in expected_response.items():
304308
np.testing.assert_array_equal(response[key], value)
@@ -362,7 +366,11 @@ def test_workflow_dtypes(tmpdir):
362366
input_table = TensorTable(request_dict)
363367
output_names = ["a__values", "a__offsets", "b"]
364368
response = send_triton_request(
365-
schema, input_table, output_names, client=client, triton_model=model_name
369+
schema,
370+
input_table,
371+
output_names,
372+
client=client,
373+
triton_model=model_name,
366374
)
367375
for key, value in expected_response.items():
368376
np.testing.assert_array_equal(response[key], value)

0 commit comments

Comments (0)