Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions plaso/parsers/plist_plugins/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ def _GetKeys(self, top_level, keys, depth=1):

return match

def _RecurseKey(self, plist_item, depth=15, key_path=""):
def _RecurseKey(self, plist_item, depth=15, key_path="", visited_object_ids=None):
"""Flattens nested dictionaries and lists by yielding its values.

The hierarchy of a plist file is a series of nested dictionaries and lists.
Expand All @@ -814,20 +814,44 @@ def _RecurseKey(self, plist_item, depth=15, key_path=""):
depth (Optional[int]): current recursion depth. This value is used to
ensure we stop at the maximum recursion depth.
key_path (Optional[str]): path of the current working key.
visited_object_ids (Optional[set[int]]): identities (id()) of container
objects already visited, used to avoid re-walking shared or cyclic
references. Identity is used rather than equality so that distinct
objects with equal contents are both still visited.

Yields:
tuple[str, str, object]: key path, key name and value.
"""
if visited_object_ids is None:
visited_object_ids = set()

if depth < 1:
logger.debug(f"Maximum recursion depth of 15 reached for key: {key_path:s}")

elif isinstance(plist_item, (list, tuple)):
# Track container identity so a shared or cyclic reference is walked
# once per object rather than once per path. Without this a binary
# plist whose decoded object graph shares sub-objects (a DAG) expands
# to a number of visits that grows exponentially with the nesting
# depth, limited only by the maximum recursion depth.
object_id = id(plist_item)
if object_id in visited_object_ids:
return
visited_object_ids.add(object_id)

for sub_plist_item in plist_item:
yield from self._RecurseKey(
sub_plist_item, depth=depth - 1, key_path=key_path
sub_plist_item,
depth=depth - 1,
key_path=key_path,
visited_object_ids=visited_object_ids,
)

elif hasattr(plist_item, "items"):
object_id = id(plist_item)
if object_id in visited_object_ids:
return
visited_object_ids.add(object_id)

for subkey_name, value in plist_item.items():
yield key_path, subkey_name, value

Expand All @@ -840,7 +864,10 @@ def _RecurseKey(self, plist_item, depth=15, key_path=""):
if isinstance(sub_plist_item, dict):
subkey_path = "/".join([key_path, subkey_name])
yield from self._RecurseKey(
sub_plist_item, depth=depth - 1, key_path=subkey_path
sub_plist_item,
depth=depth - 1,
key_path=subkey_path,
visited_object_ids=visited_object_ids,
)

# pylint: disable=arguments-differ
Expand Down
Binary file added test_data/datetime_multipath.bplist
Binary file not shown.
90 changes: 90 additions & 0 deletions tests/parsers/plist_plugins/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,96 @@ def testRecurseKey(self):
expected = {"DeviceCache", "44-00-00-00-00-04", "44-00-00-00-00-02"}
self.assertTrue(expected == set(my_keys))

def testRecurseKeyShared(self):
    """Tests that _RecurseKey flattens a shared dictionary only once."""
    plugin = MockPlugin()

    # One leaf dictionary is reachable through several keys, so the object
    # graph is a directed acyclic graph and not a tree. The leaf must be
    # flattened a single time, regardless of how many paths lead to it.
    leaf = {"shared_key": "shared_value"}
    middle = {"first": leaf, "second": leaf}
    root = {"left": middle, "right": middle}

    shared_key_names = [
        key_name
        for _, key_name, _ in plugin._RecurseKey(root)
        if key_name == "shared_key"
    ]

    self.assertEqual(len(shared_key_names), 1)

def testRecurseKeyCyclic(self):
    """Tests that _RecurseKey terminates on a self-referencing dictionary."""
    plugin = MockPlugin()

    # The root dictionary contains itself as a value. The walk must visit
    # it once instead of descending into the cycle again and again.
    root = {"name": "root"}
    root["cycle"] = root

    name_key_names = [
        key_name
        for _, key_name, _ in plugin._RecurseKey(root)
        if key_name == "name"
    ]

    self.assertEqual(len(name_key_names), 1)


class TestPlistPluginRecursionLimits(test_lib.PlistPluginTestCase):
    """Tests the plist plugin interface with pathological object graphs."""

    # pylint: disable=protected-access

    def _CreateSharedGraph(self, fanout, depth):
        """Creates a plist object graph in which every level is shared.

        Every level is a single dictionary and all of its keys point at the
        same dictionary of the next level. The graph therefore consists of
        only a few objects, while the innermost dictionary can be reached
        via fanout**depth different paths.

        Args:
          fanout (int): number of keys per level that reference the shared
              next-level dictionary.
          depth (int): number of levels wrapped around the innermost
              dictionary.

        Returns:
          dict[str, object]: top level object of the graph.
        """
        key_names = [f"key{index:d}" for index in range(fanout)]

        graph = {"timestamp_key": "2009-06-15T12:00:00"}
        for _ in range(depth):
            # dict.fromkeys() stores the very same value object under every
            # key, which is what makes the previous level shared.
            graph = dict.fromkeys(key_names, graph)
        return graph

    def testRecurseKeySharedGraph(self):
        """Tests the _RecurseKey function with a deeply shared object graph."""
        plugin = MockPlugin()

        shared_graph = self._CreateSharedGraph(fanout=6, depth=14)

        # The number of yielded values has to stay in the order of the number
        # of distinct objects and must not scale with the number of paths.
        number_of_values = len(list(plugin._RecurseKey(shared_graph)))

        self.assertLess(number_of_values, 1000)

    def testRecurseKeyCyclicGraph(self):
        """Tests the _RecurseKey function with a cyclic object graph."""
        plugin = MockPlugin()

        # The child dictionary points back at its parent, which closes a
        # cycle that spans two objects.
        root = {"name": "root"}
        root["child"] = {"parent": root}

        name_key_names = [
            key_name
            for _, key_name, _ in plugin._RecurseKey(root)
            if key_name == "name"
        ]

        self.assertEqual(len(name_key_names), 1)


# Run the tests in this file when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()