diff --git a/plaso/parsers/plist_plugins/interface.py b/plaso/parsers/plist_plugins/interface.py index 31fc45f547..625c53360d 100644 --- a/plaso/parsers/plist_plugins/interface.py +++ b/plaso/parsers/plist_plugins/interface.py @@ -789,7 +789,7 @@ def _GetKeys(self, top_level, keys, depth=1): return match - def _RecurseKey(self, plist_item, depth=15, key_path=""): + def _RecurseKey(self, plist_item, depth=15, key_path="", visited_object_ids=None): """Flattens nested dictionaries and lists by yielding its values. The hierarchy of a plist file is a series of nested dictionaries and lists. @@ -814,20 +814,44 @@ def _RecurseKey(self, plist_item, depth=15, key_path=""): depth (Optional[int]): current recursion depth. This value is used to ensure we stop at the maximum recursion depth. key_path (Optional[str]): path of the current working key. + visited_object_ids (Optional[set[int]]): identities (id()) of container + objects already visited, used to avoid re-walking shared or cyclic + references. Identity is used rather than equality so that distinct + objects with equal contents are both still visited. Yields: tuple[str, str, object]: key path, key name and value. """ + if visited_object_ids is None: + visited_object_ids = set() + if depth < 1: logger.debug(f"Maximum recursion depth of 15 reached for key: {key_path:s}") elif isinstance(plist_item, (list, tuple)): + # Track container identity so a shared or cyclic reference is walked + # once per object rather than once per path. Without this a binary + # plist whose decoded object graph shares sub-objects (a DAG) expands + # to a number of visits that grows exponentially with nesting depth. 
+ object_id = id(plist_item) + if object_id in visited_object_ids: + return + visited_object_ids.add(object_id) + for sub_plist_item in plist_item: yield from self._RecurseKey( - sub_plist_item, depth=depth - 1, key_path=key_path + sub_plist_item, + depth=depth - 1, + key_path=key_path, + visited_object_ids=visited_object_ids, ) elif hasattr(plist_item, "items"): + object_id = id(plist_item) + if object_id in visited_object_ids: + return + visited_object_ids.add(object_id) + for subkey_name, value in plist_item.items(): yield key_path, subkey_name, value @@ -840,7 +864,10 @@ def _RecurseKey(self, plist_item, depth=15, key_path=""): if isinstance(sub_plist_item, dict): subkey_path = "/".join([key_path, subkey_name]) yield from self._RecurseKey( - sub_plist_item, depth=depth - 1, key_path=subkey_path + sub_plist_item, + depth=depth - 1, + key_path=subkey_path, + visited_object_ids=visited_object_ids, ) # pylint: disable=arguments-differ diff --git a/test_data/datetime_multipath.bplist b/test_data/datetime_multipath.bplist new file mode 100644 index 0000000000..fe45629b4a Binary files /dev/null and b/test_data/datetime_multipath.bplist differ diff --git a/tests/parsers/plist_plugins/interface.py b/tests/parsers/plist_plugins/interface.py index d85530f3f2..78aff5ca7f 100644 --- a/tests/parsers/plist_plugins/interface.py +++ b/tests/parsers/plist_plugins/interface.py @@ -185,6 +185,96 @@ def testRecurseKey(self): expected = {"DeviceCache", "44-00-00-00-00-04", "44-00-00-00-00-02"} self.assertTrue(expected == set(my_keys)) + def testRecurseKeyShared(self): + """Tests the _RecurseKey function with shared object references.""" + plugin = MockPlugin() + + # The same dictionary is referenced by multiple keys, which forms a + # directed acyclic graph rather than a tree. _RecurseKey must flatten a + # shared dictionary once, not once per path that reaches it. 
+ shared_dict = {"shared_key": "shared_value"} + intermediate_dict = {"first": shared_dict, "second": shared_dict} + top_level = {"left": intermediate_dict, "right": intermediate_dict} + + result = list(plugin._RecurseKey(top_level)) + + number_of_shared_keys = 0 + for _, key, _ in result: + if key == "shared_key": + number_of_shared_keys += 1 + self.assertEqual(number_of_shared_keys, 1) + + def testRecurseKeyCyclic(self): + """Tests the _RecurseKey function with a cyclic reference.""" + plugin = MockPlugin() + + # A dictionary that references itself must be flattened only once + # rather than once per recursion level. + top_level = {"name": "root"} + top_level["cycle"] = top_level + + result = list(plugin._RecurseKey(top_level)) + + number_of_name_keys = 0 + for _, key, _ in result: + if key == "name": + number_of_name_keys += 1 + self.assertEqual(number_of_name_keys, 1) + + +class TestPlistPluginRecursionLimits(test_lib.PlistPluginTestCase): + """Tests the plist plugin interface with pathological object graphs.""" + + # pylint: disable=protected-access + + def _CreateSharedGraph(self, fanout, depth): + """Creates a plist object graph that shares sub-objects. + + Each level is a dictionary whose keys all reference the same next-level + dictionary, so a small number of objects is reachable by fanout**depth + distinct paths. plistlib produces this kind of graph when a binary plist + stores multiple references to the same object. + + Args: + fanout (int): number of references to the shared sub-object per level. + depth (int): number of nested levels. + + Returns: + dict[str, object]: top level object of the graph. 
+ """ + node = {"timestamp_key": "2009-06-15T12:00:00"} + for _ in range(depth): + node = {f"key{index:d}": node for index in range(fanout)} + return node + + def testRecurseKeySharedGraph(self): + """Tests the _RecurseKey function with a deeply shared object graph.""" + plugin = MockPlugin() + + top_level = self._CreateSharedGraph(fanout=6, depth=14) + + # Without deduplication this walk would follow roughly 6**14 paths; with it + # each distinct dictionary is flattened once, so few values are yielded. + result = list(plugin._RecurseKey(top_level)) + + self.assertLess(len(result), 1000) + + def testRecurseKeyCyclicGraph(self): + """Tests the _RecurseKey function with a cyclic object graph.""" + plugin = MockPlugin() + + top_level = {"name": "root"} + child = {"parent": top_level} + top_level["child"] = child + + result = list(plugin._RecurseKey(top_level)) + + number_of_name_keys = 0 + for _, key, _ in result: + if key == "name": + number_of_name_keys += 1 + self.assertEqual(number_of_name_keys, 1) + if __name__ == "__main__": unittest.main()