diff --git a/pyglove/core/__init__.py b/pyglove/core/__init__.py index 8b3d292..a90cc32 100644 --- a/pyglove/core/__init__.py +++ b/pyglove/core/__init__.py @@ -299,6 +299,7 @@ DocStr = utils.DocStr registered_types = utils.registered_types +enable_opaque_pickle = utils.enable_opaque_pickle explicit_method_override = utils.explicit_method_override is_partial = utils.is_partial diff --git a/pyglove/core/utils/__init__.py b/pyglove/core/utils/__init__.py index 91f7887..8667d33 100644 --- a/pyglove/core/utils/__init__.py +++ b/pyglove/core/utils/__init__.py @@ -79,6 +79,7 @@ from pyglove.core.utils.json_conversion import from_json from pyglove.core.utils.json_conversion import to_json from pyglove.core.utils.json_conversion import registered_types +from pyglove.core.utils.json_conversion import enable_opaque_pickle # Handling formatting. from pyglove.core.utils.formatting import Formattable diff --git a/pyglove/core/utils/json_conversion.py b/pyglove/core/utils/json_conversion.py index 5000957..597314c 100644 --- a/pyglove/core/utils/json_conversion.py +++ b/pyglove/core/utils/json_conversion.py @@ -25,7 +25,7 @@ import types import typing from typing import Any, Callable, ContextManager, Dict, Iterable, Iterator, List, Optional, Sequence, Set, Tuple, Type, TypeVar, Union - +import warnings # Nestable[T] is a (maybe) nested structure of T, which could be T, a Dict # a List or a Tuple of Nestable[T]. We use a Union to fool PyType checker to @@ -64,7 +64,7 @@ def __init__(self): def register( self, type_name: str, cls: Type[Any], override_existing: bool = False - ) -> None: + ) -> None: """Register a ``symbolic.Object`` class with a type name. Args: @@ -82,13 +82,12 @@ def register( if type_name in self._type_to_cls_map and not override_existing: raise KeyError( f'Type {type_name!r} has already been registered with class ' - f'{self._type_to_cls_map[type_name].__name__}.') + f'{self._type_to_cls_map[type_name].__name__}.' + ) self._type_to_cls_map[type_name] = cls def add_module_alias( - self, - module: str, - alias: Union[str, Sequence[str]] + self, module: str, alias: Union[str, Sequence[str]] ) -> None: """Maps a module name to another name. Usually due to rename.""" if isinstance(alias, str): @@ -117,8 +116,7 @@ def load_types_for_deserialization( finally: self._ondemand_registry_stack.pop() - def class_from_typename( - self, type_name: str) -> Optional[Type[Any]]: + def class_from_typename(self, type_name: str) -> Optional[Type[Any]]: """Get class from type name.""" if self._ondemand_registry_stack: top_registry = self._ondemand_registry_stack[-1] @@ -205,7 +203,8 @@ def from_json(cls, json_value, **kwargs): # When this field is set by users, the converter will be invoked when a # complex value cannot be serialized by existing methods. TYPE_CONVERTER: Optional[ - Callable[[Type[Any]], Callable[[Any], JSONValueType]]] = None + Callable[[Type[Any]], Callable[[Any], JSONValueType]] + ] = None # Class property that indicates whether to automatically register class # for deserialization. Subclass can override. @@ -225,16 +224,16 @@ def from_json(cls, json_value: JSONValueType, **kwargs) -> 'JSONConvertible': An instance of cls. """ assert isinstance(json_value, dict) - init_args = {k: from_json(v, **kwargs) for k, v in json_value.items() - if k != JSONConvertible.TYPE_NAME_KEY} + init_args = { + k: from_json(v, **kwargs) + for k, v in json_value.items() + if k != JSONConvertible.TYPE_NAME_KEY + } return cls(**init_args) @abc.abstractmethod def to_json( - self, - *, - context: Optional['JSONConversionContext'] = None, - **kwargs + self, *, context: Optional['JSONConversionContext'] = None, **kwargs ) -> JSONValueType: """Returns a plain Python value as a representation for this object. @@ -256,8 +255,8 @@ def register( cls, type_name: str, subclass: Type['JSONConvertible'], - override_existing: bool = False - ) -> None: + override_existing: bool = False, + ) -> None: """Registers a class with a type name. The type name will be used as the key for class lookup during @@ -267,16 +266,14 @@ def register( Args: type_name: A global unique string identifier for subclass. subclass: A subclass of JSONConvertible. - override_existing: If True, override the class if the type name is - already present in the registry. Otherwise an error will be raised. + override_existing: If True, override the class if the type name is already + present in the registry. Otherwise an error will be raised. """ cls._TYPE_REGISTRY.register(type_name, subclass, override_existing) @classmethod def add_module_alias( - cls, - module: str, - alias: Union[str, Sequence[str]] + cls, module: str, alias: Union[str, Sequence[str]] ) -> None: """Adds a module alias so previous serialized objects could be loaded.""" cls._TYPE_REGISTRY.add_module_alias(module, alias) @@ -288,7 +285,8 @@ def is_registered(cls, type_name: str) -> bool: @classmethod def class_from_typename( - cls, type_name: str) -> Optional[Type['JSONConvertible']]: + cls, type_name: str + ) -> Optional[Type['JSONConvertible']]: """Gets the class for a registered type name. Args: @@ -307,9 +305,8 @@ def registered_types(cls) -> Iterable[Tuple[str, Type['JSONConvertible']]]: @classmethod def load_types_for_deserialization( - cls, - *types_to_deserialize: Type[Any] - ) -> ContextManager[Dict[str, Type[Any]]]: + cls, *types_to_deserialize: Type[Any] + ) -> ContextManager[Dict[str, Type[Any]]]: """Context manager for loading unregistered types for deserialization. Example:: @@ -342,7 +339,8 @@ def to_json_dict( *, exclude_default=False, exclude_keys: Optional[Set[str]] = None, - **kwargs) -> Dict[str, JSONValueType]: + **kwargs, + ) -> Dict[str, JSONValueType]: """Helper method to create JSON dict from class and field.""" json_dict = {JSONConvertible.TYPE_NAME_KEY: _serialization_key(cls)} exclude_keys = exclude_keys or set() @@ -351,9 +349,11 @@ def to_json_dict( if k not in exclude_keys and v != default: json_dict[k] = to_json(v, **kwargs) else: - json_dict.update( - {k: to_json(v, **kwargs) for k, v in fields.items() - if k not in exclude_keys}) + json_dict.update({ + k: to_json(v, **kwargs) + for k, v in fields.items() + if k not in exclude_keys + }) return json_dict def __init_subclass__(cls): @@ -364,7 +364,8 @@ def __init_subclass__(cls): def _serialization_key( - type_or_function: Union[Type[Any], types.FunctionType]) -> str: + type_or_function: Union[Type[Any], types.FunctionType], +) -> str: """Returns the ID for a type or function.""" serializaton_key = getattr(type_or_function, '__serialization_key__', None) if serializaton_key is not None: @@ -372,14 +373,35 @@ def _serialization_key( return _type_name(type_or_function) -def _type_name( - type_or_function: Union[Type[Any], types.FunctionType]) -> str: +def _type_name(type_or_function: Union[Type[Any], types.FunctionType]) -> str: return f'{type_or_function.__module__}.{type_or_function.__qualname__}' +_allow_opaque_pickle = False + + +def enable_opaque_pickle(allow: bool = True) -> None: + """Enables or disables opaque pickle deserialization.""" + global _allow_opaque_pickle + if allow: + warnings.warn( + 'Enabling opaque pickle deserialization is insecure and can lead to' + ' arbitrary code execution. Please use with caution and only on trusted' + ' data.', + UserWarning, + stacklevel=2, + ) + JSONConvertible.register( + _serialization_key(_OpaqueObject), _OpaqueObject, override_existing=True + ) + _allow_opaque_pickle = allow + + class _OpaqueObject(JSONConvertible): """An JSON converter for opaque Python objects.""" + auto_register = False + def __init__(self, value: Any, encoded: bool = False): if encoded: value = self.decode(value) @@ -395,19 +417,24 @@ def encode(self, value: Any) -> JSONValueType: return base64.encodebytes(pickle.dumps(value)).decode('utf-8') except Exception as e: raise ValueError( - f'Cannot encode opaque object {value!r} with pickle.') from e + f'Cannot encode opaque object {value!r} with pickle.' + ) from e def decode(self, json_value: JSONValueType) -> Any: assert isinstance(json_value, str), json_value + if not _allow_opaque_pickle: + raise ValueError( + 'Deserializing opaque python objects via pickle is disabled by ' + 'default for security reasons. You can enable it by calling ' + '`pg.enable_opaque_pickle()`.' + ) try: return pickle.loads(base64.decodebytes(json_value.encode('utf-8'))) except Exception as e: raise ValueError('Cannot decode opaque object with pickle.') from e def to_json(self, **kwargs) -> JSONValueType: - return self.to_json_dict({ - 'value': self.encode(self._value) - }, **kwargs) + return self.to_json_dict({'value': self.encode(self._value)}, **kwargs) @classmethod def from_json( @@ -415,7 +442,7 @@ def from_json( json_value: JSONValueType, *args, context: Optional['JSONConversionContext'] = None, - **kwargs + **kwargs, ) -> Any: del args, context, kwargs assert isinstance(json_value, dict) and 'value' in json_value, json_value @@ -450,7 +477,9 @@ class ObjectEntry: ref_index: int ref_count: int - def __init__(self,) -> None: + def __init__( + self, + ) -> None: self._shared_objects: list[JSONConversionContext.ObjectEntry] = [] self._id_to_shared_object = {} @@ -470,7 +499,7 @@ def serialize_maybe_shared( self, value: Any, json_fn: Optional[Callable[..., JSONValueType]] = None, - **kwargs + **kwargs, ) -> JSONValueType: """Track maybe shared objects and returns their JSON representation.""" if json_fn is None: @@ -483,9 +512,11 @@ def serialize_maybe_shared( # It's possible that maybe_shared_json is called recursively on the same # object, thus we need to check for self-references explicitly. - if (isinstance(serialized, dict) + if ( + isinstance(serialized, dict) and JSONConvertible.REF_KEY in serialized - and len(serialized) == 1): + and len(serialized) == 1 + ): return serialized shared_object = self.ObjectEntry( @@ -497,9 +528,7 @@ def serialize_maybe_shared( self._shared_objects.append(shared_object) self._id_to_shared_object[value_id] = shared_object shared_object.ref_count += 1 - return { - JSONConvertible.REF_KEY: shared_object.ref_index - } + return {JSONConvertible.REF_KEY: shared_object.ref_index} def _maybe_deref(self, serialized: Any, ref_index_map: dict[int, int]) -> Any: """In-place dereference ref-1 shared objects in an object tree. @@ -586,22 +615,14 @@ def from_json( def to_json( - value: Any, - *, - context: Optional[JSONConversionContext] = None, - **kwargs + value: Any, *, context: Optional[JSONConversionContext] = None, **kwargs ) -> Any: """Serializes a (maybe) JSONConvertible value into a plain Python object. Args: - value: value to serialize. Applicable value types are: - - * Builtin python types: None, bool, int, float, string; - * JSONConvertible types; - * List types; - * Tuple types; - * Dict types. - + value: value to serialize. Applicable types are: * Builtin python types: + None, bool, int, float, string; * JSONConvertible types; * Listtypes; * + Tuple types; * Dict types. context: JSON conversion context. **kwargs: Keyword arguments to pass to value.to_json if value is JSONConvertible. @@ -621,25 +642,23 @@ def to_json( elif isinstance(value, JSONConvertible): # Non-symbolic objects serialize by references. v = context.serialize_maybe_shared( - value, - json_fn=getattr(value, 'sym_jsonify', value.to_json), - **kwargs + value, json_fn=getattr(value, 'sym_jsonify', value.to_json), **kwargs ) elif isinstance(value, list): # Standard lists serialize by references. v = context.serialize_maybe_shared( value, json_fn=lambda **kwargs: [to_json(x, **kwargs) for x in value], - **kwargs + **kwargs, ) elif isinstance(value, dict): # Standard dicts serialize by references. v = context.serialize_maybe_shared( value, json_fn=lambda **kwargs: { - k: to_json(v, **kwargs) for k, v in value.items() # pytype: disable=attribute-error + k: to_json(v, **kwargs) for k, v in value.items() # pytype: disable=attribute-error }, - **kwargs + **kwargs, ) elif isinstance(value, tuple): # Tuples serialize by values. @@ -663,7 +682,7 @@ def to_json( else: v, converted = None, False if JSONConvertible.TYPE_CONVERTER is not None: - converter = JSONConvertible.TYPE_CONVERTER(type(value)) # pylint: disable=not-callable + converter = JSONConvertible.TYPE_CONVERTER(type(value)) # pylint: disable=not-callable if converter: v = to_json(converter(value), context=context, **kwargs) converted = True @@ -672,7 +691,7 @@ def to_json( v = context.serialize_maybe_shared( value, json_fn=lambda **kwargs: _OpaqueObject(value).to_json(**kwargs), - **kwargs + **kwargs, ) if is_root: @@ -686,7 +705,7 @@ def from_json( context: Optional[JSONConversionContext] = None, auto_import: bool = True, convert_unknown: bool = False, - **kwargs + **kwargs, ) -> Any: """Deserializes a (maybe) JSONConvertible value from JSON value. @@ -694,29 +713,29 @@ def from_json( json_value: Input JSON value. context: Serialization context. auto_import: If True, when a '_type' is not registered, PyGlove will - identify its parent module and automatically import it. For example, - if the type is 'foo.bar.A', PyGlove will try to import 'foo.bar' and - find the class 'A' within the imported module. - convert_unknown: If True, when a '_type' is not registered and cannot - be imported, PyGlove will create objects of: - - `pg.symbolic.UnknownType` for unknown types; - - `pg.symbolic.UnknownTypedObject` for objects of unknown types; - - `pg.symbolic.UnknownFunction` for unknown functions; - - `pg.symbolic.UnknownMethod` for unknown methods. - If False, TypeError will be raised. + identify its parent module and automatically import it. For example, if + the type is 'foo.bar.A', PyGlove will try to import 'foo.bar' and find the + class 'A' within the imported module. + convert_unknown: If True, when a '_type' is not registered and cannot be + imported, PyGlove will create objects of: - `pg.symbolic.UnknownType` for + unknown types; - `pg.symbolic.UnknownTypedObject` for objects of unknown + types; - `pg.symbolic.UnknownFunction` for unknown functions; - + `pg.symbolic.UnknownMethod` for unknown methods. If False, TypeError will + be raised. **kwargs: Keyword arguments that will be passed to JSONConvertible.__init__. Returns: Deserialized value. """ if context is None: - if (isinstance(json_value, dict) - and (context_node := json_value.get(JSONConvertible.CONTEXT_KEY))): + if isinstance(json_value, dict) and ( + context_node := json_value.get(JSONConvertible.CONTEXT_KEY) + ): context = JSONConversionContext.from_json( context_node, auto_import=auto_import, convert_unknown=convert_unknown, - **kwargs + **kwargs, ) json_value = json_value[JSONConvertible.ROOT_VALUE_KEY] else: @@ -736,8 +755,9 @@ def child_from(v): if len(json_value) < 2: raise ValueError( f'Tuple should have at least one element ' - f'besides \'{JSONConvertible.TUPLE_MARKER}\'. ' - f'Encountered: {json_value}.') + f"besides '{JSONConvertible.TUPLE_MARKER}'. " + f'Encountered: {json_value}.' + ) return tuple([child_from(v) for v in json_value[1:]]) return [child_from(v) for v in json_value] elif isinstance(json_value, dict): @@ -788,7 +808,7 @@ def _resolve_typename(v: Dict[str, Any]) -> bool: ) from e elif not convert_unknown: raise TypeError( - f'Type name \'{type_name}\' is not registered ' + f"Type name '{type_name}' is not registered " 'with a `pg.JSONConvertible` subclass.\n' 'Try pass `auto_import=True` to load the type from its module.' ) @@ -803,6 +823,7 @@ def _resolve_typename(v: Dict[str, Any]) -> bool: if factory_fn is None and convert_unknown: type_name = v[JSONConvertible.TYPE_NAME_KEY] + def _factory_fn(json_value: Dict[str, Any], **kwargs): del kwargs # See `pg.symbolic.UnknownObject` for details. @@ -858,15 +879,15 @@ def _type_to_json(t: Type[Any]) -> Dict[str, str]: def _builtin_function_to_json(f: Any) -> Dict[str, str]: return { JSONConvertible.TYPE_NAME_KEY: 'function', - 'name': f'builtins.{f.__name__}' + 'name': f'builtins.{f.__name__}', } def _function_to_json(f: types.FunctionType) -> Dict[str, str]: """Converts a function to a JSON dict.""" - if ('' == f.__name__ # lambda functions. - or (f.__code__.co_flags & inspect.CO_NESTED) # local functions. - ): + if '' == f.__name__ or ( # lambda functions. + f.__code__.co_flags & inspect.CO_NESTED + ): # local functions. return { JSONConvertible.TYPE_NAME_KEY: 'function', 'name': _type_name(f), @@ -874,20 +895,14 @@ def _function_to_json(f: types.FunctionType) -> Dict[str, str]: 'defaults': to_json(f.__defaults__), } - return { - JSONConvertible.TYPE_NAME_KEY: 'function', - 'name': _type_name(f) - } + return {JSONConvertible.TYPE_NAME_KEY: 'function', 'name': _type_name(f)} def _method_to_json(f: types.MethodType) -> Dict[str, str]: """Converts a method to a JSON dict.""" type_name = _type_name(f) if isinstance(f.__self__, type): - return { - JSONConvertible.TYPE_NAME_KEY: 'method', - 'name': type_name - } + return {JSONConvertible.TYPE_NAME_KEY: 'method', 'name': type_name} raise ValueError(f'Cannot convert instance method {type_name!r} to JSON.') @@ -1012,6 +1027,7 @@ def _load_symbol(type_name: str) -> Any: def _type_from_json(convert_unknown: bool) -> Callable[..., Any]: """Loads a type from a JSON dict.""" + def _fn(json_value: Dict[str, str], **kwargs) -> Type[Any]: del kwargs try: @@ -1031,19 +1047,22 @@ def _fn(json_value: Dict[str, str], **kwargs) -> Type[Any]: # See `pg.symbolic.UnknownType` for details. json_value[JSONConvertible.TYPE_NAME_KEY] = 'unknown_type' return from_json(json_value) + return _fn def _function_from_json( - convert_unknown: bool + convert_unknown: bool, ) -> Callable[..., types.FunctionType]: """Loads a function from a JSON dict.""" + def _fn(json_value: Dict[str, str], **kwargs) -> types.FunctionType: del kwargs function_name = json_value['name'] if 'code' in json_value: code = marshal.loads( - base64.decodebytes(json_value['code'].encode('utf-8'))) + base64.decodebytes(json_value['code'].encode('utf-8')) + ) defaults = from_json(json_value['defaults'], _typename_resolved=True) return types.FunctionType( code=code, @@ -1062,13 +1081,13 @@ def _fn(json_value: Dict[str, str], **kwargs) -> types.FunctionType: ) from e json_value[JSONConvertible.TYPE_NAME_KEY] = 'unknown_function' return from_json(json_value) + return _fn -def _method_from_json( - convert_unknown: bool -) -> Callable[..., types.MethodType]: +def _method_from_json(convert_unknown: bool) -> Callable[..., types.MethodType]: """Loads a class method from a JSON dict.""" + def _fn(json_value: Dict[str, str], **kwargs) -> types.MethodType: del kwargs try: @@ -1082,6 +1101,7 @@ def _fn(json_value: Dict[str, str], **kwargs) -> types.MethodType: ) from e json_value[JSONConvertible.TYPE_NAME_KEY] = 'unknown_method' return from_json(json_value) + return _fn diff --git a/pyglove/core/utils/json_conversion_test.py b/pyglove/core/utils/json_conversion_test.py index 6d9b8ec..91c6efe 100644 --- a/pyglove/core/utils/json_conversion_test.py +++ b/pyglove/core/utils/json_conversion_test.py @@ -14,6 +14,7 @@ import abc import typing import unittest +import warnings from pyglove.core.symbolic import unknown_symbols from pyglove.core.typing import inspect as pg_inspect from pyglove.core.utils import json_conversion @@ -50,6 +51,7 @@ def __ne__(self, other): def bar(): pass + T1 = typing.TypeVar('T1') T2 = typing.TypeVar('T2') T3 = typing.TypeVar('T3') @@ -95,9 +97,7 @@ def __init__(self, x): self.x = x def to_json(self): - return self.__class__.to_json_dict({ - 'x': self.x - }) + return self.__class__.to_json_dict({'x': self.x}) def value(self): return self.x @@ -108,10 +108,12 @@ def value(self): self.assertFalse(json_conversion.JSONConvertible.is_registered(typename(A))) self.assertTrue(json_conversion.JSONConvertible.is_registered(typename(B))) self.assertIs( - json_conversion.JSONConvertible.class_from_typename(typename(B)), B) + json_conversion.JSONConvertible.class_from_typename(typename(B)), B + ) self.assertIn( (typename(B), B), - list(json_conversion.JSONConvertible.registered_types())) + list(json_conversion.JSONConvertible.registered_types()), + ) class C(B): auto_register = False @@ -120,14 +122,17 @@ class C(B): self.assertFalse(json_conversion.JSONConvertible.is_registered(typename(C))) with self.assertRaisesRegex( - KeyError, 'Type .* has already been registered with class .*'): + KeyError, 'Type .* has already been registered with class .*' + ): json_conversion.JSONConvertible.register(typename(B), C) json_conversion.JSONConvertible.register( - typename(B), C, override_existing=True) + typename(B), C, override_existing=True + ) self.assertIn( (typename(B), C), - list(json_conversion.JSONConvertible.registered_types())) + list(json_conversion.JSONConvertible.registered_types()), + ) # Test load_types_for_deserialization. class D(C): @@ -138,16 +143,17 @@ class D(C): self.assertEqual( json_conversion.JSONConvertible._TYPE_REGISTRY._ondemand_registry_stack, - [] + [], ) with json_conversion.JSONConvertible.load_types_for_deserialization(A): with json_conversion.JSONConvertible.load_types_for_deserialization( - D) as ondemand_registry: + D + ) as ondemand_registry: self.assertEqual(ondemand_registry, {'A': A, 'D': D}) self.assertIsNotNone(json_conversion.from_json(D(1).to_json())) self.assertEqual( json_conversion.JSONConvertible._TYPE_REGISTRY._ondemand_registry_stack, - [] + [], ) def test_json_conversion(self): @@ -168,27 +174,34 @@ def __ne__(self, other): typename = lambda cls: f'{cls.__module__}.{cls.__qualname__}' json_value = json_conversion.to_json([(T(1), 2), {'y': T(3)}]) - self.assertEqual(json_value, [ - ['__tuple__', {'_type': typename(T), 'x': 1}, 2], - {'y': {'_type': typename(T), 'x': 3}} - ]) - self.assertEqual(json_conversion.from_json(json_value), - [(T(1), 2), {'y': T(3)}]) + self.assertEqual( + json_value, + [ + ['__tuple__', {'_type': typename(T), 'x': 1}, 2], + {'y': {'_type': typename(T), 'x': 3}}, + ], + ) + self.assertEqual( + json_conversion.from_json(json_value), [(T(1), 2), {'y': T(3)}] + ) # Omitting default values. json_value = json_conversion.to_json([(T(), 2), {'y': T(3)}]) - self.assertEqual(json_value, [ - ['__tuple__', {'_type': typename(T)}, 2], - {'y': {'_type': typename(T), 'x': 3}} - ]) - self.assertEqual(json_conversion.from_json(json_value), - [(T(), 2), {'y': T(3)}]) + self.assertEqual( + json_value, + [ + ['__tuple__', {'_type': typename(T)}, 2], + {'y': {'_type': typename(T), 'x': 3}}, + ], + ) + self.assertEqual( + json_conversion.from_json(json_value), [(T(), 2), {'y': T(3)}] + ) # Test module alias. json_conversion.JSONConvertible.add_module_alias(T.__module__, 'mymodule') self.assertEqual( - json_conversion.from_json({'_type': f'mymodule.{T.__qualname__}'}), - T() + json_conversion.from_json({'_type': f'mymodule.{T.__qualname__}'}), T() ) def assert_conversion_is(self, v): @@ -204,9 +217,7 @@ def __init__(self, x=None): self.x = x def to_json(self, **kwargs): - return self.to_json_dict( - dict(x=(self.x, None)), exclude_default=True - ) + return self.to_json_dict(dict(x=(self.x, None)), exclude_default=True) def __eq__(self, other): return isinstance(other, self.__class__) and self.x == other.x @@ -216,13 +227,11 @@ def __ne__(self, other): def test_json_conversion_with_auto_import(self): json_dict = json_conversion.to_json(self.CustomJsonConvertible(1)) - with self.assertRaisesRegex( - TypeError, 'Type name .* is not registered'): + with self.assertRaisesRegex(TypeError, 'Type name .* is not registered'): json_conversion.from_json(json_dict, auto_import=False) self.assertEqual( - json_conversion.from_json(json_dict), - self.CustomJsonConvertible(1) + json_conversion.from_json(json_dict), self.CustomJsonConvertible(1) ) def test_json_conversion_for_types(self): @@ -253,7 +262,8 @@ class B: pass with self.assertRaisesRegex( - ValueError, 'Cannot convert local class .* to JSON.'): + ValueError, 'Cannot convert local class .* to JSON.' + ): json_conversion.to_json(B) # Generic types. @@ -263,7 +273,8 @@ class B: self.assert_conversion_is(G4[int, int, int, int]) with self.assertRaisesRegex( NotImplementedError, - 'Cannot convert generic type with more than 4 type arguments'): + 'Cannot convert generic type with more than 4 type arguments', + ): json_conversion.to_json(G5[int, int, int, int, int]) def test_json_conversion_for_annotations(self): @@ -304,22 +315,22 @@ def test_json_conversion_for_functions(self): # Locally defined function. def baz(x, y=1): return x + y + baz1 = json_conversion.from_json(json_conversion.to_json(baz)) self.assertTrue(pg_inspect.callable_eq(baz1, baz)) self.assertEqual(baz1(1), 2) self.assertEqual(baz1(1, 2), 3) - with self.assertRaisesRegex( - TypeError, 'Cannot load function .*'): + with self.assertRaisesRegex(TypeError, 'Cannot load function .*'): json_conversion.from_json( {'_type': 'function', 'name': 'non_existent_function'} ) self.assertEqual( json_conversion.from_json( {'_type': 'function', 'name': 'non_existent_function'}, - convert_unknown=True + convert_unknown=True, ), - unknown_symbols.UnknownFunction('non_existent_function') + unknown_symbols.UnknownFunction('non_existent_function'), ) def test_json_conversion_for_methods(self): @@ -331,74 +342,101 @@ def test_json_conversion_for_methods(self): self.assert_conversion_is(X.Y.Z.static_method) with self.assertRaisesRegex( - ValueError, 'Cannot convert instance method .* to JSON.'): + ValueError, 'Cannot convert instance method .* to JSON.' + ): json_conversion.to_json(X.Y.Z().instance_method) - with self.assertRaisesRegex( - TypeError, 'Cannot load method .*'): + with self.assertRaisesRegex(TypeError, 'Cannot load method .*'): json_conversion.from_json( {'_type': 'method', 'name': 'non_existent_method'} ) self.assertEqual( json_conversion.from_json( {'_type': 'method', 'name': 'non_existent_method'}, - convert_unknown=True + convert_unknown=True, ), - unknown_symbols.UnknownMethod('non_existent_method') + unknown_symbols.UnknownMethod('non_existent_method'), ) def test_json_conversion_for_opaque_objects(self): - self.assert_conversion_equal(X(1)) + # Test that deserialization of opaque objects is disabled by default. + json_dict = json_conversion.to_json(X(1)) + with self.assertRaisesRegex( + ValueError, 'Deserializing opaque python objects via pickle is disabled' + ): + json_conversion.from_json(json_dict) + + # Test that enabling opaque pickle works and issues a warning. + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter('always') + json_conversion.enable_opaque_pickle(True) + self.assertEqual(len(w), 1) + self.assertIn( + 'Enabling opaque pickle deserialization is insecure', + str(w[0].message), + ) + + try: + self.assert_conversion_equal(X(1)) + finally: + json_conversion.enable_opaque_pickle(False) class LocalX: pass with self.assertRaisesRegex( - ValueError, 'Cannot encode opaque object .* with pickle.'): + ValueError, 'Cannot encode opaque object .* with pickle.' + ): json_conversion.to_json(LocalX()) - json_dict = json_conversion.to_json(X(1)) - json_dict['value'] = 'abc' - with self.assertRaisesRegex( - ValueError, 'Cannot decode opaque object with pickle.'): - json_conversion.from_json(json_dict) + json_conversion.enable_opaque_pickle(True) + try: + json_dict = json_conversion.to_json(X(1)) + json_dict['value'] = 'abc' + with self.assertRaisesRegex( + ValueError, 'Cannot decode opaque object with pickle.' + ): + json_conversion.from_json(json_dict) + finally: + json_conversion.enable_opaque_pickle(False) def test_json_conversion_convert_unknown(self): self.assertEqual( - json_conversion.from_json([ - '__tuple__', - 1, - { - '_type': 'type', - 'name': 'Unknown type', - }, - { - '_type': 'Unknown type', - 'x': [{ + json_conversion.from_json( + [ + '__tuple__', + 1, + { + '_type': 'type', + 'name': 'Unknown type', + }, + { '_type': 'Unknown type', - }, { - '_type': 'function', - 'name': 'builtins.print' - }] - } - ], convert_unknown=True), + 'x': [ + { + '_type': 'Unknown type', + }, + {'_type': 'function', 'name': 'builtins.print'}, + ], + }, + ], + convert_unknown=True, + ), ( 1, unknown_symbols.UnknownType('Unknown type'), unknown_symbols.UnknownTypedObject( type_name='Unknown type', - x=[ - unknown_symbols.UnknownTypedObject('Unknown type'), - print - ] - ) - ) + x=[unknown_symbols.UnknownTypedObject('Unknown type'), print], + ), + ), ) def test_json_conversion_with_bad_types(self): # Bad tuple. with self.assertRaisesRegex( - ValueError, 'Tuple should have at least one element besides .*'): + ValueError, 'Tuple should have at least one element besides .*' + ): json_conversion.from_json(['__tuple__']) # Unregistered type without auto_import. @@ -410,17 +448,16 @@ def test_json_conversion_with_bad_types(self): '_type': 'Unknown type', 'x': [{ '_type': 'Unknown type', - }] - }, auto_import=False + }], + }, + auto_import=False, ) # Type does not exist. - with self.assertRaisesRegex( - TypeError, 'Cannot load class .*'): + with self.assertRaisesRegex(TypeError, 'Cannot load class .*'): json_conversion.from_json({'_type': '__main__.ABC'}) - with self.assertRaisesRegex( - TypeError, 'Cannot load type .*'): + with self.assertRaisesRegex(TypeError, 'Cannot load type .*'): json_conversion.from_json({'_type': 'type', 'name': '__main__.ABC'}) # Type exist but not a JSONConvertible subclass. @@ -429,67 +466,52 @@ class A: json_conversion.JSONConvertible.register('__main__.A', A) with self.assertRaisesRegex( - TypeError, '.* is not a `pg.JSONConvertible` subclass'): + TypeError, '.* is not a `pg.JSONConvertible` subclass' + ): json_conversion.from_json({'_type': '__main__.A'}) def test_json_conversion_with_sharing(self): - - class T(json_conversion.JSONConvertible): - - def __init__(self, x=None): - self.x = x - - def to_json(self, **kwargs): - return T.to_json_dict(dict(x=(self.x, None)), exclude_default=True) - - t = T(1) - x = X(1) - u = {'x': x} - v = [u, t] - y = dict(t=t, x=x, u=u, v=v) - y_json = json_conversion.to_json(y) - x_serialized = json_conversion._OpaqueObject(x).to_json() - self.assertEqual( - y_json, - { - '__context__': { - 'shared_objects': [ - { - '_type': json_conversion._type_name(T), - 'x': 1 - }, - x_serialized, - { - 'x': { - '__ref__': 1 - } - } - ] - }, - '__root__': { - 't': { - '__ref__': 0 - }, - 'x': { - '__ref__': 1 - }, - 'u': { - '__ref__': 2 - }, - 'v': [ - { - '__ref__': 2 - }, - { - '__ref__': 0 - } - ] - } - } - ) - y_prime = json_conversion.from_json(y_json) - self.assertIs(y_prime['t'], y_prime['v'][1]) - self.assertIs(y_prime['u'], y_prime['v'][0]) + json_conversion.enable_opaque_pickle(True) + try: + + class T(json_conversion.JSONConvertible): + + def __init__(self, x=None): + self.x = x + + def to_json(self, **kwargs): + return T.to_json_dict(dict(x=(self.x, None)), exclude_default=True) + + t = T(1) + x = X(1) + u = {'x': x} + v = [u, t] + y = dict(t=t, x=x, u=u, v=v) + y_json = json_conversion.to_json(y) + x_serialized = json_conversion._OpaqueObject(x).to_json() + self.assertEqual( + y_json, + { + '__context__': { + 'shared_objects': [ + {'_type': json_conversion._type_name(T), 'x': 1}, + x_serialized, + {'x': {'__ref__': 1}}, + ] + }, + '__root__': { + 't': {'__ref__': 0}, + 'x': {'__ref__': 1}, + 'u': {'__ref__': 2}, + 'v': [{'__ref__': 2}, {'__ref__': 0}], + }, + }, + ) + y_prime = json_conversion.from_json(y_json) + self.assertIs(y_prime['t'], y_prime['v'][1]) + self.assertIs(y_prime['u'], y_prime['v'][0]) + finally: + json_conversion.enable_opaque_pickle(False) def test_json_conversion_with_sharing_convert_unknown(self): self.assertEqual( @@ -501,31 +523,22 @@ def test_json_conversion_with_sharing_convert_unknown(self): '_type': 'type', 'name': '__main__.ABC', }, - { - '_type': '__main__.ABC', - 'x': 1 - } + {'_type': '__main__.ABC', 'x': 1}, ] }, '__root__': [ - { - '__ref__': 0 - }, - { - '__ref__': 1 - }, - ] + {'__ref__': 0}, + {'__ref__': 1}, + ], }, - convert_unknown=True + convert_unknown=True, ), [ unknown_symbols.UnknownType('__main__.ABC'), - unknown_symbols.UnknownTypedObject( - type_name='__main__.ABC', - x=1 - ) - ] + unknown_symbols.UnknownTypedObject(type_name='__main__.ABC', x=1), + ], ) + if __name__ == '__main__': unittest.main()