Skip to content

Commit a89378e

Browse files
committed
update design
1 parent 3484bd1 commit a89378e

2 files changed

Lines changed: 14 additions & 7 deletions

File tree

sdks/python/apache_beam/yaml/yaml_mapping.py

Lines changed: 13 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,6 @@
2020
import datetime
2121
import itertools
2222
import re
23-
from collections import abc
2423
from collections.abc import Callable
2524
from collections.abc import Collection
2625
from collections.abc import Iterable
@@ -187,7 +186,7 @@ def py_value_to_js_dict(py_value):
187186
py_value = py_value._asdict()
188187
if isinstance(py_value, dict):
189188
return {key: py_value_to_js_dict(value) for key, value in py_value.items()}
190-
elif not isinstance(py_value, str) and isinstance(py_value, abc.Iterable):
189+
elif not isinstance(py_value, str) and isinstance(py_value, Iterable):
191190
return [py_value_to_js_dict(value) for value in list(py_value)]
192191
else:
193192
return py_value
@@ -270,6 +269,14 @@ def __init__(self, fields, original_fields, input_schema):
270269
script.append(udf_code)
271270
self.field_funcs[name] = func_name
272271

272+
if self.field_funcs:
273+
aggregator_entries = ", ".join([
274+
f'"{name}": {func_name}(__row__)'
275+
for name, func_name in self.field_funcs.items()
276+
])
277+
script.append(
278+
f"var __aggregate_fn__ = (__row__) => ({{ {aggregator_entries} }});")
279+
273280
self.script = "\n".join(script) if script else None
274281

275282
def setup(self):
@@ -285,10 +292,10 @@ def process(self, element):
285292
for name, src in self.passthrough_fields:
286293
result_dict[name] = row_as_dict.get(src)
287294

288-
# Handle JS fields
289-
for name, func_name in self.field_funcs.items():
290-
res = self.ctx.call(func_name, row_as_dict)
291-
result_dict[name] = js_to_py(res)
295+
# Handle JS fields via single aggregate call
296+
if self.field_funcs:
297+
res = self.ctx.call("__aggregate_fn__", row_as_dict)
298+
result_dict.update(js_to_py(res))
292299

293300
yield dicts_to_rows(result_dict)
294301

sdks/python/setup.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -621,7 +621,7 @@ def get_portability_package_data():
621621
'docstring-parser>=0.15,<1.0',
622622
'jinja2>=3.0,<3.2',
623623
'virtualenv-clone>=0.5,<1.0',
624-
'py-mini-racer',
624+
'mini-racer>=0.14.1',
625625
'jsonschema>=4.0.0,<5.0.0',
626626
] + dataframe_dependency,
627627
# Keep the following dependencies in line with what we test against

0 commit comments

Comments
 (0)