Skip to content

Commit ca58aaa

Browse files
authored
Bugfix: Always initialize Package.args in Linux Analyzer Package (kevoreilly#2824)
* Bugfix: Always initialize Package.args in Linux Analyzer Package Fixes a bug introduced in d9be043 that prevents Package.args from being set if a string was not passed into the object during initialization. Package.args will now always be initialized with an empty list if no arguments are provided. This behavior aligns with the Windows analyzer. * Adds some basic Linux analyzer unit tests * Use poetry to run Linux analyzer unit tests in CI * Make analyzer.linux.lib.core.packages.Package.__init__._args more concise * Remove placeholder test * Increase pytest verbosity for Linux analyzer tests * Add additional test cases for Linux Analyzer package arguments * Remove unused import in Linux analyzer * Fix Linux analyzer test case * Fix Linux analyzer skip logic * Add exception handling to Process.get_parent_pid in Linux analyzer * Fix test_proc_dead_states test case
1 parent 73cc62a commit ca58aaa

File tree

9 files changed

+276
-20
lines changed

9 files changed

+276
-20
lines changed

.github/workflows/python-package.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ jobs:
4848
- name: Run unit tests
4949
run: poetry run python -m pytest --import-mode=append
5050

51+
- name: Run Linux analyzer unit tests
52+
run: poetry --project . --directory "analyzer/linux" run python -m pytest -v
53+
5154
# see the mypy configuration in pyproject.toml
5255
- name: Run mypy
5356
run: poetry run mypy

analyzer/linux/lib/api/process.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@ def is_alive(self):
3030
return True
3131

3232
def get_parent_pid(self):
33-
return int(self.get_proc_status().get("PPid"))
33+
try:
34+
return int(self.get_proc_status().get("PPid"))
35+
except (TypeError, ValueError):
36+
return None
3437

3538
def get_proc_status(self):
3639
try:

analyzer/linux/lib/core/packages.py

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -56,22 +56,25 @@ def _found_target_class(module, name):
5656

5757

5858
def _guess_package_name(file_type, file_name):
59-
if "Bourne-Again" in file_type or "bash" in file_type:
60-
return "bash"
61-
elif "Zip archive" in file_type:
62-
return "zip"
63-
elif "gzip compressed data" in file_type:
64-
return "zip"
65-
elif "PDF document" in file_type or file_name.endswith(".pdf"):
66-
return "pdf"
67-
elif "Composite Document File V2 Document" in file_type or file_name.endswith(".doc"):
68-
return "doc"
69-
elif "Microsoft Word" in file_type or file_name.endswith(".docx"):
70-
return "doc"
71-
elif "ELF" in file_type:
72-
return "generic"
73-
elif "Unicode text" in file_type or file_name.endswith(".js"):
74-
return "js"
59+
try:
60+
if "Bourne-Again" in file_type or "bash" in file_type:
61+
return "bash"
62+
elif "Zip archive" in file_type:
63+
return "zip"
64+
elif "gzip compressed data" in file_type:
65+
return "zip"
66+
elif "PDF document" in file_type or file_name.endswith(".pdf"):
67+
return "pdf"
68+
elif "Composite Document File V2 Document" in file_type or file_name.endswith(".doc"):
69+
return "doc"
70+
elif "Microsoft Word" in file_type or file_name.endswith(".docx"):
71+
return "doc"
72+
elif "ELF" in file_type:
73+
return "generic"
74+
elif "Unicode text" in file_type or file_name.endswith(".js"):
75+
return "js"
76+
except (TypeError, AttributeError):
77+
pass
7578
return None
7679

7780

@@ -101,9 +104,16 @@ def __init__(self, target, **kwargs):
101104
self.timeout = kwargs.get("timeout")
102105
# Command-line arguments for the target.
103106

104-
_args = self.options.get("arguments", [])
105-
if isinstance(_args, str):
106-
self.args = _args.split()
107+
def _args():
108+
args = self.options.get("arguments")
109+
if isinstance(args, list):
110+
return args
111+
if isinstance(args, str):
112+
return args.split()
113+
return []
114+
115+
self.args = _args()
116+
107117
# Choose an analysis method (or fallback to apicalls)
108118
self.method = self.options.get("method", "apicalls")
109119
# Should our target be launched as root or not

analyzer/linux/pytest.ini

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
[pytest]
2+
pythonpath = .
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
from unittest.mock import Mock, patch, mock_open, PropertyMock
2+
from lib.api.process import Process
3+
import base64
4+
5+
import pytest
6+
7+
proc_status = """Name: test-process
8+
Umask: 0002
9+
State: R (running)
10+
Tgid: 42
11+
Ngid: 0
12+
Pid: 42
13+
PPid: 24"""
14+
15+
@pytest.fixture
16+
def os_path_exists(monkeypatch):
17+
monkeypatch.setattr("os.path.exists", Mock(return_value=True))
18+
yield
19+
20+
@pytest.fixture
21+
def os_path_not_exists(monkeypatch):
22+
monkeypatch.setattr("os.path.exists", Mock(return_value=False))
23+
yield
24+
25+
@pytest.fixture
26+
def fake_proc_status_file(monkeypatch):
27+
monkeypatch.setattr("builtins.open", mock_open(read_data=proc_status))
28+
yield
29+
30+
ARGS = {
31+
"pid": 42
32+
}
33+
34+
def test_init():
35+
"""Initialize Process instances using both args and kwargs"""
36+
kw_args_instance = Process(**ARGS)
37+
assert kw_args_instance.pid == ARGS["pid"]
38+
args_instance = Process(*ARGS.values())
39+
assert args_instance.pid == ARGS["pid"]
40+
41+
@pytest.mark.usefixtures("os_path_exists")
42+
def test_proc_alive_states():
43+
for state in [
44+
"State: R (running)",
45+
"State: S (sleeping)",
46+
"State: D (waiting)",
47+
"State: T (stopped)",
48+
"State: t (trace stopped)",
49+
"State: W (paging)",
50+
"State: W (waking)",
51+
"State: P (parked)",
52+
]:
53+
state_file_content = proc_status.replace("State: R (running)", state)
54+
with patch("builtins.open", mock_open(read_data=state_file_content)):
55+
process = Process(**ARGS)
56+
assert process.is_alive()
57+
58+
@pytest.mark.usefixtures("os_path_exists")
59+
def test_proc_dead_states():
60+
for state in [
61+
"State: Z (zombie)",
62+
]:
63+
state_file_content = proc_status.replace("State: R (running)", state)
64+
with patch("builtins.open", mock_open(read_data=state_file_content)):
65+
process = Process(**ARGS)
66+
alive = process.is_alive()
67+
assert not alive
68+
69+
@pytest.mark.usefixtures("os_path_not_exists")
70+
def test_proc_file_not_exists():
71+
process = Process(**ARGS)
72+
assert not process.is_alive()
73+
74+
@pytest.mark.usefixtures("os_path_exists")
75+
def test_proc_file_corrupt():
76+
corrupt_status = base64.b64encode(proc_status.encode("utf-8")).decode("utf-8")
77+
with patch("builtins.open", mock_open(read_data=corrupt_status)):
78+
process = Process(**ARGS)
79+
assert not process.is_alive()
80+
81+
@pytest.mark.usefixtures("os_path_exists", "fake_proc_status_file")
82+
def test_get_ppid():
83+
process = Process(**ARGS)
84+
assert 24 == process.get_parent_pid()
85+
86+
@patch("builtins.open", side_effect=FileNotFoundError)
87+
def test_get_ppid_file_not_exists(bopen):
88+
process = Process(**ARGS)
89+
assert process.get_parent_pid() is None
90+
91+
@patch("subprocess.Popen")
92+
def test_execute(popen):
93+
process = Process(**ARGS)
94+
type(popen.return_value).pid = PropertyMock(return_value=ARGS["pid"])
95+
assert process.execute(["echo", "this is a test message"])
96+
assert ARGS["pid"] == process.pid
97+
assert popen.called
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from lib.core.packages import _guess_package_name
2+
import pytest
3+
4+
@pytest.mark.parametrize("file_type, file_name, expected_package_name", [
5+
("", "", None),
6+
("Bourne-Again", None, "bash"),
7+
("Zip archive", None, "zip"),
8+
("gzip compressed data", None, "zip"),
9+
("PDF document", "test.pdf", "pdf"),
10+
("Composite Document File V2 Document", "test.docx", "doc"),
11+
("Microsoft Word", "test.docx", "doc"),
12+
("ELF", None, "generic"),
13+
("Unicode text", "malware.js", "js")
14+
])
15+
def test__guess_package_name(file_type, file_name, expected_package_name):
16+
assert _guess_package_name(file_type, "") == expected_package_name, f"Expected {expected_package_name} for {file_type}, {file_name}"
17+
if file_name:
18+
assert _guess_package_name("", file_name) == expected_package_name, f"Expected {expected_package_name} for {file_type}, {file_name}"
19+
assert _guess_package_name(file_type, file_name) == expected_package_name, f"Expected {expected_package_name} for {file_type}, {file_name}"
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
from unittest.mock import patch, Mock
2+
import logging
3+
from logging import StreamHandler
4+
import pytest
5+
6+
from lib.common.results import NetlogHandler
7+
import lib.core.startup
8+
import lib.core.config
9+
10+
import lib
11+
12+
@pytest.fixture
13+
def patch_netloghandler(monkeypatch):
14+
monkeypatch.setattr(NetlogHandler, "__init__", Mock(return_value=None))
15+
monkeypatch.setattr(NetlogHandler, "connect", Mock())
16+
yield
17+
18+
@patch('os.makedirs')
19+
@patch('os.path.exists')
20+
def test_create_folders_path_not_exists(os_path_exists, os_mkdirs):
21+
"""Test initial folder creation with paths that do not exist"""
22+
# Fake path not existing
23+
os_path_exists.return_value = False
24+
lib.core.startup.create_folders()
25+
assert os_path_exists.called
26+
# Ensure there is an attempt to create a folder
27+
assert os_mkdirs.called
28+
29+
@patch('os.makedirs')
30+
@patch('os.path.exists')
31+
def test_create_folders_path_exists(os_path_exists, os_mkdirs):
32+
"""Test initial folder creation with paths that already exist"""
33+
# Fake path not existing
34+
os_path_exists.return_value = True
35+
lib.core.startup.create_folders()
36+
assert os_path_exists.called
37+
# Ensure there are no attempts to create a folder
38+
assert not os_mkdirs.called
39+
40+
@pytest.mark.usefixtures("patch_netloghandler")
41+
@patch("logging.Logger.addHandler")
42+
def test_init_logging(addhandler):
43+
"""Ensure init_logging adds the right log handlers"""
44+
lib.core.startup.init_logging()
45+
handlers = []
46+
# Get a list of all the types of handlers that are being added
47+
for name, args, kwargs in addhandler.mock_calls:
48+
handlers = [*handlers, *[type(arg) for arg in args]]
49+
# Ensure there is a StreamHandler and a NetlogHandler
50+
assert StreamHandler in handlers
51+
assert NetlogHandler in handlers
52+
# Ensure log level is set to DEBUG
53+
assert lib.core.startup.log.level == logging.DEBUG
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import sys
2+
import unittest
3+
import pytest
4+
5+
import lib
6+
7+
from lib.core.packages import Package
8+
9+
@pytest.fixture
10+
def patch_netlogfile(monkeypatch):
11+
class MockNetlogFile:
12+
def init(self, *args):
13+
return
14+
def close(self):
15+
return
16+
monkeypatch.setattr(lib.core.packages, "NetlogFile", MockNetlogFile)
17+
monkeypatch.setattr(lib.core.packages, "append_buffer_to_host", lambda *args: None)
18+
yield
19+
20+
class TestPackage(unittest.TestCase):
21+
22+
@pytest.mark.usefixtures("patch_netlogfile")
23+
def test_package_init_args(self):
24+
pkg = Package(sys.executable, options={})
25+
self.assertEqual(pkg.args, [])
26+
27+
@pytest.mark.usefixtures("patch_netlogfile")
28+
def test_package_init_args_list(self):
29+
pkg = Package(sys.executable, options={"arguments": ["foo", "bar"]})
30+
self.assertEqual(pkg.args, ["foo", "bar"])
31+
32+
@pytest.mark.usefixtures("patch_netlogfile")
33+
def test_package_init_args_str(self):
34+
pkg = Package(sys.executable, options={"arguments": "foo bar"})
35+
self.assertEqual(pkg.args, ["foo", "bar"])
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import sys
2+
import unittest
3+
4+
import pytest
5+
6+
if sys.platform == "linux":
7+
import analyzer
8+
from analyzer import PROCESS_LIST, SEEN_LIST
9+
10+
@pytest.mark.skipif(sys.platform != "linux", reason="Requires Linux")
11+
class TestAnalyzer(unittest.TestCase):
12+
13+
def test_add_pids(self):
14+
"""Test add_pids with a variety of valid types"""
15+
# Check that both sets are empty
16+
self.assertEqual(PROCESS_LIST, set())
17+
self.assertEqual(SEEN_LIST, set())
18+
19+
pids = [123, 456, 789]
20+
# Add a list of PIDs
21+
analyzer.add_pids([str(pids[0]), pids[1]])
22+
# Add a set of PIDs
23+
analyzer.add_pids(set([pids[0], pids[2]]))
24+
# Add a tuple of PIDs
25+
analyzer.add_pids((pids[1], pids[2]))
26+
27+
self.assertEqual(PROCESS_LIST, set(pids))
28+
self.assertEqual(SEEN_LIST, set(pids))
29+
30+
31+
def test_add_pids_invalid_var(self):
32+
"""Test add_pids with an invalid type"""
33+
with self.assertRaises(TypeError):
34+
analyzer.add_pids(analyzer.add_pids)

0 commit comments

Comments
 (0)