-
Notifications
You must be signed in to change notification settings - Fork 4.5k
[wip] - js2py to mini-racer #38236
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[wip] - js2py to mini-racer #38236
Changes from all commits
2812743
2696662
7aa8200
c37227f
2880125
5a91930
bc804db
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| { | ||
| "comment": "Modify this file in a trivial way to cause this test suite to run", | ||
| "revision": 2 | ||
| "revision": 3 | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -16,8 +16,12 @@ | |||||||||||||
| # | ||||||||||||||
|
|
||||||||||||||
| """This module defines the basic MapToFields operation.""" | ||||||||||||||
|
|
||||||||||||||
| import datetime | ||||||||||||||
| import itertools | ||||||||||||||
| import re | ||||||||||||||
| import threading | ||||||||||||||
| import uuid | ||||||||||||||
| from collections import abc | ||||||||||||||
| from collections.abc import Callable | ||||||||||||||
| from collections.abc import Collection | ||||||||||||||
|
derrickaw marked this conversation as resolved.
derrickaw marked this conversation as resolved.
|
||||||||||||||
|
|
@@ -53,13 +57,13 @@ | |||||||||||||
| from apache_beam.yaml.yaml_errors import maybe_with_exception_handling_transform_fn | ||||||||||||||
| from apache_beam.yaml.yaml_provider import dicts_to_rows | ||||||||||||||
|
|
||||||||||||||
| # Import js2py package if it exists | ||||||||||||||
| try: | ||||||||||||||
| import js2py | ||||||||||||||
| from js2py.base import JsObjectWrapper | ||||||||||||||
| from py_mini_racer import MiniRacer | ||||||||||||||
| except ImportError: | ||||||||||||||
| js2py = None | ||||||||||||||
| JsObjectWrapper = object | ||||||||||||||
| MiniRacer = None | ||||||||||||||
|
|
||||||||||||||
| _JS_DATE_ISO_REGEX = re.compile( | ||||||||||||||
| r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$') | ||||||||||||||
|
|
||||||||||||||
| _str_expression_fields = { | ||||||||||||||
| 'AssignTimestamps': 'timestamp', | ||||||||||||||
|
|
@@ -178,20 +182,6 @@ def _check_mapping_arguments( | |||||||||||||
| raise ValueError(f'{transform_name} cannot specify "name" without "path"') | ||||||||||||||
|
|
||||||||||||||
|
|
||||||||||||||
| # js2py's JsObjectWrapper object has a self-referencing __dict__ property | ||||||||||||||
| # that cannot be pickled without implementing the __getstate__ and | ||||||||||||||
| # __setstate__ methods. | ||||||||||||||
| class _CustomJsObjectWrapper(JsObjectWrapper): | ||||||||||||||
| def __init__(self, js_obj): | ||||||||||||||
| super().__init__(js_obj.__dict__['_obj']) | ||||||||||||||
|
|
||||||||||||||
| def __getstate__(self): | ||||||||||||||
| return self.__dict__.copy() | ||||||||||||||
|
|
||||||||||||||
| def __setstate__(self, state): | ||||||||||||||
| self.__dict__.update(state) | ||||||||||||||
|
|
||||||||||||||
|
|
||||||||||||||
| # TODO(yaml) Improve type inferencing for JS UDF's | ||||||||||||||
| def py_value_to_js_dict(py_value): | ||||||||||||||
| if ((isinstance(py_value, tuple) and hasattr(py_value, '_asdict')) or | ||||||||||||||
|
|
@@ -205,85 +195,138 @@ def py_value_to_js_dict(py_value): | |||||||||||||
| return py_value | ||||||||||||||
|
|
||||||||||||||
|
|
||||||||||||||
| # TODO(yaml) Consider adding optional language version parameter to support | ||||||||||||||
| # ECMAScript 5 and 6 | ||||||||||||||
| def _expand_javascript_mapping_func( | ||||||||||||||
| original_fields, expression=None, callable=None, path=None, name=None): | ||||||||||||||
| def js_to_py(obj): | ||||||||||||||
| """Converts mini-racer mapped objects to standard Python types. | ||||||||||||||
|
|
||||||||||||||
| This is needed because ctx.eval returns objects that implement Mapping | ||||||||||||||
| and Iterable but are not picklable (like JSMappedObjectImpl and JSArrayImpl), | ||||||||||||||
| which would fail when Beam tries to serialize rows containing them. | ||||||||||||||
| We also preserve datetime objects which are correctly produced by ctx.eval | ||||||||||||||
| for JS Date objects. | ||||||||||||||
| """ | ||||||||||||||
| if isinstance(obj, datetime.datetime): | ||||||||||||||
| return obj | ||||||||||||||
| elif isinstance(obj, Mapping): | ||||||||||||||
| return {k: js_to_py(v) for k, v in obj.items()} | ||||||||||||||
| elif not isinstance(obj, (str, bytes)) and isinstance(obj, Iterable): | ||||||||||||||
|
Comment on lines
+209
to
+211
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The names
Suggested change
|
||||||||||||||
| return [js_to_py(v) for v in obj] | ||||||||||||||
| elif isinstance(obj, str): | ||||||||||||||
| if _JS_DATE_ISO_REGEX.match(obj): | ||||||||||||||
| try: | ||||||||||||||
| return datetime.datetime.fromisoformat(obj[:-1] + '+00:00') | ||||||||||||||
| except ValueError: | ||||||||||||||
| return obj | ||||||||||||||
| return obj | ||||||||||||||
| else: | ||||||||||||||
| return obj | ||||||||||||||
|
|
||||||||||||||
| # Check for installed js2py package | ||||||||||||||
| if js2py is None: | ||||||||||||||
| raise ValueError( | ||||||||||||||
| "Javascript mapping functions are not supported on" | ||||||||||||||
| " Python 3.12 or later.") | ||||||||||||||
|
|
||||||||||||||
| # import remaining js2py objects | ||||||||||||||
| from js2py import base | ||||||||||||||
| from js2py.constructors import jsdate | ||||||||||||||
| from js2py.internals import simplex | ||||||||||||||
|
|
||||||||||||||
| js_array_type = ( | ||||||||||||||
| base.PyJsArray, | ||||||||||||||
| base.PyJsArrayBuffer, | ||||||||||||||
| base.PyJsInt8Array, | ||||||||||||||
| base.PyJsUint8Array, | ||||||||||||||
| base.PyJsUint8ClampedArray, | ||||||||||||||
| base.PyJsInt16Array, | ||||||||||||||
| base.PyJsUint16Array, | ||||||||||||||
| base.PyJsInt32Array, | ||||||||||||||
| base.PyJsUint32Array, | ||||||||||||||
| base.PyJsFloat32Array, | ||||||||||||||
| base.PyJsFloat64Array) | ||||||||||||||
|
|
||||||||||||||
| def _js_object_to_py_object(obj): | ||||||||||||||
| if isinstance(obj, (base.PyJsNumber, base.PyJsString, base.PyJsBoolean)): | ||||||||||||||
| return base.to_python(obj) | ||||||||||||||
| elif isinstance(obj, js_array_type): | ||||||||||||||
| return [_js_object_to_py_object(value) for value in obj.to_list()] | ||||||||||||||
| elif isinstance(obj, jsdate.PyJsDate): | ||||||||||||||
| return obj.to_utc_dt() | ||||||||||||||
| elif isinstance(obj, (base.PyJsNull, base.PyJsUndefined)): | ||||||||||||||
| return None | ||||||||||||||
| elif isinstance(obj, base.PyJsError): | ||||||||||||||
| raise RuntimeError(obj['message']) | ||||||||||||||
| elif isinstance(obj, base.PyJsObject): | ||||||||||||||
| return { | ||||||||||||||
| key: _js_object_to_py_object(value['value']) | ||||||||||||||
| for (key, value) in obj.own.items() | ||||||||||||||
| } | ||||||||||||||
| elif isinstance(obj, base.JsObjectWrapper): | ||||||||||||||
| return _js_object_to_py_object(obj._obj) | ||||||||||||||
|
|
||||||||||||||
| return obj | ||||||||||||||
| class JsFilterDoFn(beam.DoFn): | ||||||||||||||
| def __init__(self, udf_code, function_name): | ||||||||||||||
| self.udf_code = udf_code | ||||||||||||||
| self.function_name = function_name | ||||||||||||||
| self.ctx = None | ||||||||||||||
|
|
||||||||||||||
| def setup(self): | ||||||||||||||
| self.ctx = MiniRacer() | ||||||||||||||
| self.ctx.eval(self.udf_code) | ||||||||||||||
|
|
||||||||||||||
| def process(self, element): | ||||||||||||||
| row_as_dict = py_value_to_js_dict(element) | ||||||||||||||
| result = self.ctx.call(self.function_name, row_as_dict) | ||||||||||||||
| result = js_to_py(result) | ||||||||||||||
| if result: | ||||||||||||||
| yield element | ||||||||||||||
|
|
||||||||||||||
|
|
||||||||||||||
| class JsMapToFieldsDoFn(beam.DoFn): | ||||||||||||||
| def __init__(self, fields, original_fields, input_schema): | ||||||||||||||
| self.fields = fields | ||||||||||||||
| self.original_fields = original_fields | ||||||||||||||
| self.input_schema = input_schema | ||||||||||||||
| self.ctx = None | ||||||||||||||
| self.field_funcs = {} | ||||||||||||||
| self.passthrough_fields = [] | ||||||||||||||
|
|
||||||||||||||
| def setup(self): | ||||||||||||||
| self.ctx = MiniRacer() | ||||||||||||||
| script = [] | ||||||||||||||
| for name, expr in self.fields.items(): | ||||||||||||||
| if isinstance(expr, str) and expr in self.input_schema: | ||||||||||||||
| self.passthrough_fields.append((name, expr)) | ||||||||||||||
| continue | ||||||||||||||
|
|
||||||||||||||
| if isinstance(expr, str): | ||||||||||||||
| expr = {'expression': expr} | ||||||||||||||
|
|
||||||||||||||
| if 'expression' in expr: | ||||||||||||||
| e = expr['expression'] | ||||||||||||||
| code = f"var func_{name} = (__row__) => {{ " + " ".join([ | ||||||||||||||
| f"const {n} = __row__.{n};" for n in self.original_fields if n in e | ||||||||||||||
| ]) + f" return ({e}); }}" | ||||||||||||||
| script.append(code) | ||||||||||||||
| self.field_funcs[name] = f"func_{name}" | ||||||||||||||
| elif 'callable' in expr: | ||||||||||||||
| code = f"var func_{name} = {expr['callable']}" | ||||||||||||||
| script.append(code) | ||||||||||||||
| self.field_funcs[name] = f"func_{name}" | ||||||||||||||
| elif 'path' in expr and 'name' in expr: | ||||||||||||||
| path = expr['path'] | ||||||||||||||
| func_name = expr['name'] | ||||||||||||||
| udf_code = FileSystems.open(path).read().decode() | ||||||||||||||
| script.append(udf_code) | ||||||||||||||
| self.field_funcs[name] = func_name | ||||||||||||||
|
Comment on lines
+273
to
+278
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reading the UDF file inside |
||||||||||||||
|
|
||||||||||||||
| if script: | ||||||||||||||
| self.ctx.eval("\n".join(script)) | ||||||||||||||
|
|
||||||||||||||
| def process(self, element): | ||||||||||||||
| row_as_dict = py_value_to_js_dict(element) | ||||||||||||||
| result_dict = {} | ||||||||||||||
|
|
||||||||||||||
| # Handle passthrough fields | ||||||||||||||
| for name, src in self.passthrough_fields: | ||||||||||||||
| result_dict[name] = row_as_dict.get(src) | ||||||||||||||
|
|
||||||||||||||
| # Handle JS fields | ||||||||||||||
| for name, func_name in self.field_funcs.items(): | ||||||||||||||
| res = self.ctx.call(func_name, row_as_dict) | ||||||||||||||
| result_dict[name] = js_to_py(res) | ||||||||||||||
|
|
||||||||||||||
| yield dicts_to_rows(result_dict) | ||||||||||||||
|
|
||||||||||||||
| if expression: | ||||||||||||||
| source = '\n'.join(['function(__row__) {'] + [ | ||||||||||||||
| f' {name} = __row__.{name}' | ||||||||||||||
| for name in original_fields if name in expression | ||||||||||||||
| ] + [' return (' + expression + ')'] + ['}']) | ||||||||||||||
| js_func = _CustomJsObjectWrapper(js2py.eval_js(source)) | ||||||||||||||
|
|
||||||||||||||
| elif callable: | ||||||||||||||
| js_func = _CustomJsObjectWrapper(js2py.eval_js(callable)) | ||||||||||||||
| # TODO(yaml) Consider adding optional language version parameter to support | ||||||||||||||
| # ECMAScript 5 and 6 | ||||||||||||||
| def _get_javascript_udf_code( | ||||||||||||||
| original_fields, | ||||||||||||||
| function_name="func", | ||||||||||||||
| expression=None, | ||||||||||||||
| callable=None, | ||||||||||||||
| path=None, | ||||||||||||||
| name=None): | ||||||||||||||
|
|
||||||||||||||
| if MiniRacer is None: | ||||||||||||||
| raise ValueError( | ||||||||||||||
| "JavaScript mapping functions require the 'mini-racer' package to be installed." | ||||||||||||||
| ) | ||||||||||||||
|
|
||||||||||||||
| else: | ||||||||||||||
| udf_code = None | ||||||||||||||
| if path: | ||||||||||||||
| if not path.endswith('.js'): | ||||||||||||||
| raise ValueError(f'File "{path}" is not a valid .js file.') | ||||||||||||||
| udf_code = FileSystems.open(path).read().decode() | ||||||||||||||
| js = js2py.EvalJs() | ||||||||||||||
| js.eval(udf_code) | ||||||||||||||
| js_func = _CustomJsObjectWrapper(getattr(js, name)) | ||||||||||||||
|
|
||||||||||||||
| def js_wrapper(row): | ||||||||||||||
| row_as_dict = py_value_to_js_dict(row) | ||||||||||||||
| try: | ||||||||||||||
| js_result = js_func(row_as_dict) | ||||||||||||||
| except simplex.JsException as exn: | ||||||||||||||
| raise RuntimeError( | ||||||||||||||
| f"Error evaluating javascript expression: " | ||||||||||||||
| f"{exn.mes['message']}") from exn | ||||||||||||||
| return dicts_to_rows(_js_object_to_py_object(js_result)) | ||||||||||||||
|
|
||||||||||||||
| return js_wrapper | ||||||||||||||
| return udf_code, name | ||||||||||||||
| elif expression: | ||||||||||||||
| udf_code = f"var {function_name} = (__row__) => {{ " + " ".join([ | ||||||||||||||
| f"const {n} = __row__.{n};" for n in original_fields if n in expression | ||||||||||||||
| ]) + f" return ({expression}); }}" | ||||||||||||||
| return udf_code, function_name | ||||||||||||||
| elif callable: | ||||||||||||||
| udf_code = f"var {function_name} = {callable}" | ||||||||||||||
| return udf_code, function_name | ||||||||||||||
| else: | ||||||||||||||
| raise ValueError("Must specify expression, callable, or path.") | ||||||||||||||
|
|
||||||||||||||
|
|
||||||||||||||
| def _expand_python_mapping_func( | ||||||||||||||
|
|
@@ -390,14 +433,10 @@ def _as_callable(original_fields, expr, transform_name, language, input_schema): | |||||||||||||
| explicit_type = expr.pop('output_type', None) | ||||||||||||||
| _check_mapping_arguments(transform_name, **expr) | ||||||||||||||
|
|
||||||||||||||
| if language == "javascript": | ||||||||||||||
| func = _expand_javascript_mapping_func(original_fields, **expr) | ||||||||||||||
| elif language in ("python", "generic", None): | ||||||||||||||
| if language in ("python", "generic", None): | ||||||||||||||
| func = _expand_python_mapping_func(original_fields, **expr) | ||||||||||||||
| else: | ||||||||||||||
| raise ValueError( | ||||||||||||||
| f'Unknown language for mapping transform: {language}. ' | ||||||||||||||
| 'Supported languages are "javascript" and "python."') | ||||||||||||||
| raise ValueError(f'Language {language} not supported in this context.') | ||||||||||||||
|
|
||||||||||||||
| if explicit_type: | ||||||||||||||
| if isinstance(explicit_type, str): | ||||||||||||||
|
|
@@ -636,8 +675,17 @@ def _PyJsFilter( | |||||||||||||
| error_handling: Whether and where to output records that throw errors when | ||||||||||||||
| the above expressions are evaluated. | ||||||||||||||
| """ # pylint: disable=line-too-long | ||||||||||||||
| keep_fn = _as_callable_for_pcoll(pcoll, keep, "keep", language or 'generic') | ||||||||||||||
| return pcoll | beam.Filter(keep_fn) | ||||||||||||||
| if language == 'javascript': | ||||||||||||||
| if isinstance(keep, str): | ||||||||||||||
| keep = {'expression': keep} | ||||||||||||||
| udf_code, function_name = _get_javascript_udf_code( | ||||||||||||||
| [f.name for f in schema_from_element_type(pcoll.element_type).fields], | ||||||||||||||
| **keep | ||||||||||||||
| ) | ||||||||||||||
| return pcoll | beam.ParDo(JsFilterDoFn(udf_code, function_name)) | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The use of |
||||||||||||||
| else: | ||||||||||||||
| keep_fn = _as_callable_for_pcoll(pcoll, keep, "keep", language or 'generic') | ||||||||||||||
| return pcoll | beam.Filter(keep_fn) | ||||||||||||||
|
|
||||||||||||||
|
|
||||||||||||||
| def is_expr(v): | ||||||||||||||
|
|
@@ -709,10 +757,12 @@ def _PyJsMapToFields( | |||||||||||||
| """ # pylint: disable=line-too-long | ||||||||||||||
| input_schema, fields = normalize_fields( | ||||||||||||||
| pcoll, fields, drop or (), append, language=language or 'generic') | ||||||||||||||
| original_fields = list(input_schema.keys()) | ||||||||||||||
|
|
||||||||||||||
| if language == 'javascript': | ||||||||||||||
| options.YamlOptions.check_enabled(pcoll.pipeline, 'javascript') | ||||||||||||||
|
|
||||||||||||||
| original_fields = list(input_schema.keys()) | ||||||||||||||
| return pcoll | beam.ParDo( | ||||||||||||||
| JsMapToFieldsDoFn(fields, original_fields, input_schema)) | ||||||||||||||
|
Comment on lines
+764
to
+765
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||||
|
|
||||||||||||||
| return pcoll | beam.Select( | ||||||||||||||
| **{ | ||||||||||||||
|
|
||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Several required imports are missing for the new functionality:
FileSystems(fromapache_beam.io.filesystems) andschema_from_element_type(fromapache_beam.typehints.schemas). Additionally,threadinganduuidappear to be unused in this file and should be removed to keep the code clean.