diff --git a/.github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json b/.github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json index 541dc4ea8e87..8ed972c9f579 100644 --- a/.github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "revision": 2 + "revision": 3 } diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index a6b2b5704751..95b3d10d577d 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -16,8 +16,12 @@ # """This module defines the basic MapToFields operation.""" + +import datetime import itertools import re +import threading +import uuid from collections import abc from collections.abc import Callable from collections.abc import Collection @@ -53,13 +57,13 @@ from apache_beam.yaml.yaml_errors import maybe_with_exception_handling_transform_fn from apache_beam.yaml.yaml_provider import dicts_to_rows -# Import js2py package if it exists try: - import js2py - from js2py.base import JsObjectWrapper + from py_mini_racer import MiniRacer except ImportError: - js2py = None - JsObjectWrapper = object + MiniRacer = None + +_JS_DATE_ISO_REGEX = re.compile( + r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$') _str_expression_fields = { 'AssignTimestamps': 'timestamp', @@ -178,20 +182,6 @@ def _check_mapping_arguments( raise ValueError(f'{transform_name} cannot specify "name" without "path"') -# js2py's JsObjectWrapper object has a self-referencing __dict__ property -# that cannot be pickled without implementing the __getstate__ and -# __setstate__ methods. -class _CustomJsObjectWrapper(JsObjectWrapper): - def __init__(self, js_obj): - super().__init__(js_obj.__dict__['_obj']) - - def __getstate__(self): - return self.__dict__.copy() - - def __setstate__(self, state): - self.__dict__.update(state) - - # TODO(yaml) Improve type inferencing for JS UDF's def py_value_to_js_dict(py_value): if ((isinstance(py_value, tuple) and hasattr(py_value, '_asdict')) or @@ -205,85 +195,138 @@ def py_value_to_js_dict(py_value): return py_value -# TODO(yaml) Consider adding optional language version parameter to support -# ECMAScript 5 and 6 -def _expand_javascript_mapping_func( - original_fields, expression=None, callable=None, path=None, name=None): +def js_to_py(obj): + """Converts mini-racer mapped objects to standard Python types. + + This is needed because ctx.eval returns objects that implement Mapping + and Iterable but are not picklable (like JSMappedObjectImpl and JSArrayImpl), + which would fail when Beam tries to serialize rows containing them. + We also preserve datetime objects which are correctly produced by ctx.eval + for JS Date objects. + """ + if isinstance(obj, datetime.datetime): + return obj + elif isinstance(obj, Mapping): + return {k: js_to_py(v) for k, v in obj.items()} + elif not isinstance(obj, (str, bytes)) and isinstance(obj, Iterable): + return [js_to_py(v) for v in obj] + elif isinstance(obj, str): + if _JS_DATE_ISO_REGEX.match(obj): + try: + return datetime.datetime.fromisoformat(obj[:-1] + '+00:00') + except ValueError: + return obj + return obj + else: + return obj - # Check for installed js2py package - if js2py is None: - raise ValueError( - "Javascript mapping functions are not supported on" - " Python 3.12 or later.") - - # import remaining js2py objects - from js2py import base - from js2py.constructors import jsdate - from js2py.internals import simplex - - js_array_type = ( - base.PyJsArray, - base.PyJsArrayBuffer, - base.PyJsInt8Array, - base.PyJsUint8Array, - base.PyJsUint8ClampedArray, - base.PyJsInt16Array, - base.PyJsUint16Array, - base.PyJsInt32Array, - base.PyJsUint32Array, - base.PyJsFloat32Array, - base.PyJsFloat64Array) - - def _js_object_to_py_object(obj): - if isinstance(obj, (base.PyJsNumber, base.PyJsString, base.PyJsBoolean)): - return base.to_python(obj) - elif isinstance(obj, js_array_type): - return [_js_object_to_py_object(value) for value in obj.to_list()] - elif isinstance(obj, jsdate.PyJsDate): - return obj.to_utc_dt() - elif isinstance(obj, (base.PyJsNull, base.PyJsUndefined)): - return None - elif isinstance(obj, base.PyJsError): - raise RuntimeError(obj['message']) - elif isinstance(obj, base.PyJsObject): - return { - key: _js_object_to_py_object(value['value']) - for (key, value) in obj.own.items() - } - elif isinstance(obj, base.JsObjectWrapper): - return _js_object_to_py_object(obj._obj) - return obj +class JsFilterDoFn(beam.DoFn): + def __init__(self, udf_code, function_name): + self.udf_code = udf_code + self.function_name = function_name + self.ctx = None + + def setup(self): + self.ctx = MiniRacer() + self.ctx.eval(self.udf_code) + + def process(self, element): + row_as_dict = py_value_to_js_dict(element) + result = self.ctx.call(self.function_name, row_as_dict) + result = js_to_py(result) + if result: + yield element + + +class JsMapToFieldsDoFn(beam.DoFn): + def __init__(self, fields, original_fields, input_schema): + self.fields = fields + self.original_fields = original_fields + self.input_schema = input_schema + self.ctx = None + self.field_funcs = {} + self.passthrough_fields = [] + + def setup(self): + self.ctx = MiniRacer() + script = [] + for name, expr in self.fields.items(): + if isinstance(expr, str) and expr in self.input_schema: + self.passthrough_fields.append((name, expr)) + continue + + if isinstance(expr, str): + expr = {'expression': expr} + + if 'expression' in expr: + e = expr['expression'] + code = f"var func_{name} = (__row__) => {{ " + " ".join([ + f"const {n} = __row__.{n};" for n in self.original_fields if n in e + ]) + f" return ({e}); }}" + script.append(code) + self.field_funcs[name] = f"func_{name}" + elif 'callable' in expr: + code = f"var func_{name} = {expr['callable']}" + script.append(code) + self.field_funcs[name] = f"func_{name}" + elif 'path' in expr and 'name' in expr: + path = expr['path'] + func_name = expr['name'] + udf_code = FileSystems.open(path).read().decode() + script.append(udf_code) + self.field_funcs[name] = func_name + + if script: + self.ctx.eval("\n".join(script)) + + def process(self, element): + row_as_dict = py_value_to_js_dict(element) + result_dict = {} + + # Handle passthrough fields + for name, src in self.passthrough_fields: + result_dict[name] = row_as_dict.get(src) + + # Handle JS fields + for name, func_name in self.field_funcs.items(): + res = self.ctx.call(func_name, row_as_dict) + result_dict[name] = js_to_py(res) + + yield dicts_to_rows(result_dict) - if expression: - source = '\n'.join(['function(__row__) {'] + [ - f' {name} = __row__.{name}' - for name in original_fields if name in expression - ] + [' return (' + expression + ')'] + ['}']) - js_func = _CustomJsObjectWrapper(js2py.eval_js(source)) - elif callable: - js_func = _CustomJsObjectWrapper(js2py.eval_js(callable)) +# TODO(yaml) Consider adding optional language version parameter to support +# ECMAScript 5 and 6 +def _get_javascript_udf_code( + original_fields, + function_name="func", + expression=None, + callable=None, + path=None, + name=None): + + if MiniRacer is None: + raise ValueError( + "JavaScript mapping functions require the 'mini-racer' package to be installed." + ) - else: + udf_code = None + if path: if not path.endswith('.js'): raise ValueError(f'File "{path}" is not a valid .js file.') udf_code = FileSystems.open(path).read().decode() - js = js2py.EvalJs() - js.eval(udf_code) - js_func = _CustomJsObjectWrapper(getattr(js, name)) - - def js_wrapper(row): - row_as_dict = py_value_to_js_dict(row) - try: - js_result = js_func(row_as_dict) - except simplex.JsException as exn: - raise RuntimeError( - f"Error evaluating javascript expression: " - f"{exn.mes['message']}") from exn - return dicts_to_rows(_js_object_to_py_object(js_result)) - - return js_wrapper + return udf_code, name + elif expression: + udf_code = f"var {function_name} = (__row__) => {{ " + " ".join([ + f"const {n} = __row__.{n};" for n in original_fields if n in expression + ]) + f" return ({expression}); }}" + return udf_code, function_name + elif callable: + udf_code = f"var {function_name} = {callable}" + return udf_code, function_name + else: + raise ValueError("Must specify expression, callable, or path.") def _expand_python_mapping_func( @@ -390,14 +433,10 @@ def _as_callable(original_fields, expr, transform_name, language, input_schema): explicit_type = expr.pop('output_type', None) _check_mapping_arguments(transform_name, **expr) - if language == "javascript": - func = _expand_javascript_mapping_func(original_fields, **expr) - elif language in ("python", "generic", None): + if language in ("python", "generic", None): func = _expand_python_mapping_func(original_fields, **expr) else: - raise ValueError( - f'Unknown language for mapping transform: {language}. ' - 'Supported languages are "javascript" and "python."') + raise ValueError(f'Language {language} not supported in this context.') if explicit_type: if isinstance(explicit_type, str): @@ -636,8 +675,17 @@ def _PyJsFilter( error_handling: Whether and where to output records that throw errors when the above expressions are evaluated. """ # pylint: disable=line-too-long - keep_fn = _as_callable_for_pcoll(pcoll, keep, "keep", language or 'generic') - return pcoll | beam.Filter(keep_fn) + if language == 'javascript': + if isinstance(keep, str): + keep = {'expression': keep} + udf_code, function_name = _get_javascript_udf_code( + [f.name for f in schema_from_element_type(pcoll.element_type).fields], + **keep + ) + return pcoll | beam.ParDo(JsFilterDoFn(udf_code, function_name)) + else: + keep_fn = _as_callable_for_pcoll(pcoll, keep, "keep", language or 'generic') + return pcoll | beam.Filter(keep_fn) def is_expr(v): @@ -709,10 +757,12 @@ def _PyJsMapToFields( """ # pylint: disable=line-too-long input_schema, fields = normalize_fields( pcoll, fields, drop or (), append, language=language or 'generic') + original_fields = list(input_schema.keys()) + if language == 'javascript': options.YamlOptions.check_enabled(pcoll.pipeline, 'javascript') - - original_fields = list(input_schema.keys()) + return pcoll | beam.ParDo( + JsMapToFieldsDoFn(fields, original_fields, input_schema)) return pcoll | beam.Select( **{ diff --git a/sdks/python/apache_beam/yaml/yaml_udf_test.py b/sdks/python/apache_beam/yaml/yaml_udf_test.py index 3d664ab9de41..33d62fc7d80b 100644 --- a/sdks/python/apache_beam/yaml/yaml_udf_test.py +++ b/sdks/python/apache_beam/yaml/yaml_udf_test.py @@ -32,10 +32,10 @@ from apache_beam.yaml.yaml_transform import YamlTransform try: - import js2py + from py_mini_racer import MiniRacer except ImportError: - js2py = None - logging.warning('js2py is not installed; some tests will be skipped.') + MiniRacer = None + logging.warning('py_mini_racer is not installed; some tests will be skipped.') def as_rows(): @@ -63,7 +63,7 @@ def setUp(self): def tearDown(self): shutil.rmtree(self.tmpdir) - @unittest.skipIf(js2py is None, 'js2py not installed.') + @unittest.skipIf(MiniRacer is None, 'py_mini_racer not installed.') def test_map_to_fields_filter_inline_js(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle', yaml_experimental_features=['javascript' @@ -197,7 +197,7 @@ def test_map_to_fields_sql_reserved_keyword_append(): beam.Row(label='389a', timestamp=2, label_copy="389a"), ])) - @unittest.skipIf(js2py is None, 'js2py not installed.') + @unittest.skipIf(MiniRacer is None, 'py_mini_racer not installed.') def test_filter_inline_js(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle', yaml_experimental_features=['javascript' @@ -252,7 +252,7 @@ def test_filter_inline_py(self): row=beam.Row(rank=2, values=[7, 8, 9])), ])) - @unittest.skipIf(js2py is None, 'js2py not installed.') + @unittest.skipIf(MiniRacer is None, 'py_mini_racer not installed.') def test_filter_expression_js(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle', yaml_experimental_features=['javascript' @@ -296,7 +296,7 @@ def test_filter_expression_py(self): row=beam.Row(rank=0, values=[1, 2, 3])), ])) - @unittest.skipIf(js2py is None, 'js2py not installed.') + @unittest.skipIf(MiniRacer is None, 'py_mini_racer not installed.') def test_filter_inline_js_file(self): data = ''' function f(x) { @@ -374,6 +374,31 @@ def g(x): row=beam.Row(rank=2, values=[7, 8, 9])), ])) + @unittest.skipIf(MiniRacer is None, 'py_mini_racer not installed.') + def test_map_to_fields_js_date(self): + import datetime + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle', yaml_experimental_features=['javascript' + ])) as p: + elements = p | beam.Create([beam.Row(label='11a')]) + result = elements | YamlTransform( + ''' + type: MapToFields + config: + language: javascript + fields: + date: + callable: | + function get_date(x) { + return new Date('2026-04-17T18:00:00Z') + } + ''') + + expected_date = datetime.datetime( + 2026, 4, 17, 18, tzinfo=datetime.timezone.utc) + + assert_that(result | as_rows(), equal_to([beam.Row(date=expected_date)])) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index c5cd9145338c..d771d609d9d7 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -621,8 +621,7 @@ def get_portability_package_data(): 'docstring-parser>=0.15,<1.0', 'jinja2>=3.0,<3.2', 'virtualenv-clone>=0.5,<1.0', - # https://github.com/PiotrDabkowski/Js2Py/issues/317 - 'js2py>=0.74,<1; python_version<"3.12"', + 'mini-racer', 'jsonschema>=4.0.0,<5.0.0', ] + dataframe_dependency, # Keep the following dependencies in line with what we test against