Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import os
import subprocess
from shutil import rmtree
from tempfile import NamedTemporaryFile, mkdtemp
from time import sleep
Expand All @@ -12,8 +13,41 @@
import requests
import yaml

from event import Event, EventType, Process
from server import FileActivityService


def get_dockerd_process() -> Process | None:
result = subprocess.run(
['pgrep', 'dockerd'],
capture_output=True,
text=True,
)
if result.returncode != 0:
return None
pid = int(result.stdout.strip().split('\n')[0])
proc = Process.from_proc(pid)
Comment on lines +20 to +29

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Harden dockerd discovery failure paths.

Line 21 and Line 29 can raise (pgrep unavailable, PID race on /proc), which aborts tests instead of gracefully returning None like the rest of this helper intends.

Suggested fix
 def get_dockerd_process() -> Process | None:
-    result = subprocess.run(
-        ['pgrep', 'dockerd'],
-        capture_output=True,
-        text=True,
-    )
+    try:
+        result = subprocess.run(
+            ['pgrep', 'dockerd'],
+            capture_output=True,
+            text=True,
+            check=False,
+            timeout=2,
+        )
+    except (OSError, subprocess.TimeoutExpired):
+        return None
     if result.returncode != 0:
         return None
-    pid = int(result.stdout.strip().split('\n')[0])
-    proc = Process.from_proc(pid)
+    lines = [line for line in result.stdout.splitlines() if line]
+    if not lines:
+        return None
+    try:
+        pid = int(lines[0])
+        proc = Process.from_proc(pid)
+    except (ValueError, FileNotFoundError, PermissionError):
+        return None
     return Process(
         pid=None,
         uid=proc.uid,
🧰 Tools
🪛 ast-grep (0.44.0)

[error] 20-24: Command coming from incoming request
Context: subprocess.run(
['pgrep', 'dockerd'],
capture_output=True,
text=True,
)
Note: [CWE-78] Improper Neutralization of Special Elements used in an OS Command ('OS Command Injection').

(subprocess-from-request)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/conftest.py` around lines 20 - 29, The get_dockerd_process() function
can raise exceptions from the subprocess.run call when pgrep is unavailable and
from the Process.from_proc() call when there is a race condition with the
process disappearing from /proc, which causes tests to abort instead of
gracefully returning None. Wrap the subprocess.run() call and the
Process.from_proc() call in try-except blocks to catch any exceptions (such as
FileNotFoundError or exceptions from Process.from_proc when the pid no longer
exists) and return None on any failure, ensuring the function consistently
returns None for all error cases.

# Process.from_proc uses os.path.realpath on /proc/<pid>/exe,
# which may not resolve across mount namespaces (e.g. CoreOS).
# Use the path from pgrep -a instead.
result = subprocess.run(
['pgrep', '-a', 'dockerd'],
capture_output=True,
text=True,
)
exe_path = result.stdout.strip().split('\n')[0].split()[1]
Comment on lines +33 to +38

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟠 Major | ⚡ Quick win

Avoid mixed process identity and brittle parsing in the second pgrep lookup.

Line 33-38 can pair exe_path from a different dockerd process than the pid chosen at Line 28, and split()[1] can raise if output is empty/unexpected. That makes the fixture flaky and can crash test setup.

Suggested fix
-    result = subprocess.run(
+    result = subprocess.run(
         ['pgrep', '-a', 'dockerd'],
         capture_output=True,
         text=True,
     )
-    exe_path = result.stdout.strip().split('\n')[0].split()[1]
+    if result.returncode != 0:
+        return None
+
+    exe_path = None
+    for line in result.stdout.splitlines():
+        parts = line.split(maxsplit=1)
+        if len(parts) != 2:
+            continue
+        if parts[0] != str(pid):
+            continue
+        cmdline = parts[1].strip()
+        if not cmdline:
+            continue
+        exe_path = cmdline.split()[0]
+        break
+    if exe_path is None:
+        return None
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
result = subprocess.run(
['pgrep', '-a', 'dockerd'],
capture_output=True,
text=True,
)
exe_path = result.stdout.strip().split('\n')[0].split()[1]
result = subprocess.run(
['pgrep', '-a', 'dockerd'],
capture_output=True,
text=True,
)
if result.returncode != 0:
return None
exe_path = None
for line in result.stdout.splitlines():
parts = line.split(maxsplit=1)
if len(parts) != 2:
continue
if parts[0] != str(pid):
continue
cmdline = parts[1].strip()
if not cmdline:
continue
exe_path = cmdline.split()[0]
break
if exe_path is None:
return None
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/conftest.py` around lines 33 - 38, The second pgrep lookup on lines
33-38 can return a different dockerd process than the one identified at line 28,
and the brittle string parsing with split()[1] can raise an IndexError if the
output is empty or unexpected. Instead of running a new pgrep command that could
match any dockerd process, use the specific pid that was already determined at
line 28 to get the exe_path from that same process (such as by reading from
/proc/{pid}/exe or by incorporating the pid into the command), and add proper
error handling to gracefully handle unexpected or empty output without crashing
the test setup.

return Process(
pid=None,
uid=proc.uid,
gid=proc.gid,
exe_path=exe_path,
args=proc.args,
name=proc.name,
container_id=proc.container_id,
loginuid=proc.loginuid,
)


# Declare files holding fixtures
pytest_plugins = ['test_editors.commons']

Expand Down Expand Up @@ -185,6 +219,47 @@ def test_container(
container.remove()


@pytest.fixture
def docker_selinux_xattr(
docker_client: docker.DockerClient,
monitored_dir: str,
test_file: str,
) -> list[Event]:
"""
Expected xattr events from Docker SELinux relabeling.

When Docker creates a container with ':z' volume mounts, it
relabels files with security.selinux. This fixture returns the
expected events if Docker has SELinux enabled, or an empty list
otherwise.

Docker relabels both the file and its parent directory.
"""
info = docker_client.info()
selinux = any('selinux' in opt for opt in info.get('SecurityOptions', []))
if not selinux:
return []
dockerd = get_dockerd_process()
if dockerd is None:
return []
return [
Event(
process=dockerd,
event_type=EventType.XATTR_SET,
file='',
host_path=test_file,
xattr_name='security.selinux',
),
Event(
process=dockerd,
event_type=EventType.XATTR_SET,
file='',
host_path=monitored_dir,
xattr_name='security.selinux',
),
]


@pytest.fixture(autouse=True)
def fact(
request: pytest.FixtureRequest,
Expand Down
22 changes: 2 additions & 20 deletions tests/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ def _wait_events(
self,
events: list[Event],
strict: bool,
skip_xattr: bool,
cancel: ThreadingEvent,
):
while self.is_running() and not cancel.is_set():
Expand All @@ -103,12 +102,6 @@ def _wait_events(

print(f'Got event: {msg}')

if skip_xattr and msg.WhichOneof('file') in (
'xattr_set',
'xattr_remove',
):
continue

# Check if msg matches the next expected event
diff = events[0].diff(msg)
if diff is None:
Expand All @@ -118,12 +111,7 @@ def _wait_events(
elif strict:
raise ValueError(json.dumps(diff, indent=4))

def wait_events(
self,
events: list[Event],
strict: bool = True,
skip_xattr: bool = True,
):
def wait_events(self, events: list[Event], strict: bool = True):
"""
Continuously checks the server for incoming events until the
specified events are found.
Expand All @@ -137,13 +125,7 @@ def wait_events(
"""
print('Waiting for events:', *events, sep='\n')
cancel = ThreadingEvent()
fs = self.executor.submit(
self._wait_events,
events,
strict,
skip_xattr,
cancel,
)
fs = self.executor.submit(self._wait_events, events, strict, cancel)
try:
fs.result(timeout=5)
except TimeoutError:
Expand Down
8 changes: 6 additions & 2 deletions tests/test_file_open.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ def test_external_process(monitored_dir: str, server: FileActivityService):
def test_overlay(
test_container: docker.models.containers.Container,
server: FileActivityService,
docker_selinux_xattr: list[Event],
):
assert test_container.id is not None
# File Under Test
Expand All @@ -208,6 +209,7 @@ def test_overlay(
container_id=test_container.id[:12],
)
events = [
*docker_selinux_xattr,
Event(
process=process,
event_type=EventType.CREATION,
Expand All @@ -223,6 +225,7 @@ def test_mounted_dir(
test_container: docker.models.containers.Container,
ignored_dir: str,
server: FileActivityService,
docker_selinux_xattr: list[Event],
):
assert test_container.id is not None
# File Under Test
Expand All @@ -245,13 +248,14 @@ def test_mounted_dir(
host_path='',
)

server.wait_events([event])
server.wait_events([*docker_selinux_xattr, event])


def test_unmonitored_mounted_dir(
test_container: docker.models.containers.Container,
test_file: str,
server: FileActivityService,
docker_selinux_xattr: list[Event],
):
assert test_container.id is not None
# File Under Test
Expand All @@ -273,4 +277,4 @@ def test_unmonitored_mounted_dir(
host_path=test_file,
)

server.wait_events([event])
server.wait_events([*docker_selinux_xattr, event])
3 changes: 2 additions & 1 deletion tests/test_misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def test_d_path_sanitization(
server: FileActivityService,
run_self_deleter: docker.models.containers.Container,
docker_client: docker.DockerClient,
docker_selinux_xattr: list[Event],
):
"""
Ensure the sanitization of paths obtained by calling the bpf_d_path
Expand All @@ -83,4 +84,4 @@ def test_d_path_sanitization(
host_path=host_path,
)

server.wait_events([event])
server.wait_events([*docker_selinux_xattr, event])
7 changes: 6 additions & 1 deletion tests/test_path_chmod.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ def test_external_process(monitored_dir: str, server: FileActivityService):
def test_overlay(
test_container: docker.models.containers.Container,
server: FileActivityService,
docker_selinux_xattr: list[Event],
):
"""
Test permission changes on an overlayfs file (inside a container)
Expand Down Expand Up @@ -224,6 +225,7 @@ def test_overlay(
container_id=test_container.id[:12],
)
events = [
*docker_selinux_xattr,
Event(
process=touch,
event_type=EventType.CREATION,
Expand All @@ -246,6 +248,7 @@ def test_mounted_dir(
test_container: docker.models.containers.Container,
ignored_dir: str,
server: FileActivityService,
docker_selinux_xattr: list[Event],
):
"""
Test permission changes on a file bind mounted into a container
Expand Down Expand Up @@ -279,6 +282,7 @@ def test_mounted_dir(
)
# ignored_dir is not monitored, so host_path should be blank
events = [
*docker_selinux_xattr,
Event(
process=touch,
event_type=EventType.CREATION,
Expand All @@ -301,6 +305,7 @@ def test_unmonitored_mounted_dir(
test_container: docker.models.containers.Container,
test_file: str,
server: FileActivityService,
docker_selinux_xattr: list[Event],
):
"""
Test permission changes on a file bind mounted to a container and
Expand Down Expand Up @@ -335,4 +340,4 @@ def test_unmonitored_mounted_dir(
mode=int(mode, 8),
)

server.wait_events([event])
server.wait_events([*docker_selinux_xattr, event])
9 changes: 8 additions & 1 deletion tests/test_path_chown.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def test_chown(
test_container: docker.models.containers.Container,
server: FileActivityService,
filename: str | bytes,
docker_selinux_xattr: list[Event],
):
"""
Execute a chown operation on a file and verifies the corresponding event is
Expand Down Expand Up @@ -67,6 +68,7 @@ def test_chown(
container_id=test_container.id[:12],
)
events = [
*docker_selinux_xattr,
Event(
process=touch,
event_type=EventType.CREATION,
Expand All @@ -89,6 +91,7 @@ def test_chown(
def test_multiple(
test_container: docker.models.containers.Container,
server: FileActivityService,
docker_selinux_xattr: list[Event],
):
"""
Tests ownership operations on multiple files and verifies the corresponding
Expand All @@ -99,7 +102,7 @@ def test_multiple(
server: The server instance to communicate with.
"""
assert test_container.id is not None
events = []
events: list[Event] = [*docker_selinux_xattr]

# File Under Test
for i in range(3):
Expand Down Expand Up @@ -147,6 +150,7 @@ def test_multiple(
def test_ignored(
test_container: docker.models.containers.Container,
server: FileActivityService,
docker_selinux_xattr: list[Event],
):
"""
Tests that ownership events on ignored files are not captured by the
Expand Down Expand Up @@ -183,6 +187,7 @@ def test_ignored(
container_id=test_container.id[:12],
)
events = [
*docker_selinux_xattr,
Event(
process=reported_touch,
event_type=EventType.CREATION,
Expand All @@ -205,6 +210,7 @@ def test_ignored(
def test_no_change(
test_container: docker.models.containers.Container,
server: FileActivityService,
docker_selinux_xattr: list[Event],
):
"""
Tests that chown to the same UID/GID triggers events for all calls.
Expand Down Expand Up @@ -252,6 +258,7 @@ def test_no_change(

# Expect both chown events (all calls to chown trigger events)
events = [
*docker_selinux_xattr,
Event(
process=touch,
event_type=EventType.CREATION,
Expand Down
9 changes: 8 additions & 1 deletion tests/test_path_rename.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import docker.models.containers
import pytest

from event import Event, EventType, Process
from event import Event, EventType, Process, selinux_xattr
from server import FileActivityService
from utils import join_path_with_filename, path_to_string

Expand Down Expand Up @@ -315,6 +315,7 @@ def test_rename_overwrite(
def test_overlay(
test_container: docker.models.containers.Container,
server: FileActivityService,
docker_selinux_xattr: list[Event],
):
assert test_container.id is not None
# File Under Test
Expand All @@ -338,6 +339,7 @@ def test_overlay(
container_id=test_container.id[:12],
)
events = [
*docker_selinux_xattr,
Event(
process=touch,
event_type=EventType.CREATION,
Expand All @@ -361,6 +363,7 @@ def test_mounted_dir(
test_container: docker.models.containers.Container,
ignored_dir: str,
server: FileActivityService,
docker_selinux_xattr: list[Event],
):
assert test_container.id is not None
# File Under Test
Expand All @@ -385,6 +388,7 @@ def test_mounted_dir(
)
# ignored_dir is not monitored, so host_path should be blank
events = [
*docker_selinux_xattr,
Event(
process=touch,
event_type=EventType.CREATION,
Expand All @@ -408,6 +412,7 @@ def test_cross_mountpoints(
test_container: docker.models.containers.Container,
monitored_dir: str,
server: FileActivityService,
docker_selinux_xattr: list[Event],
):
"""
Attempt to rename files/directories across mountpoints
Expand Down Expand Up @@ -453,6 +458,7 @@ def test_cross_mountpoints(

server.wait_events(
[
*docker_selinux_xattr,
Event(
process=touch,
event_type=EventType.OPEN,
Expand Down Expand Up @@ -500,6 +506,7 @@ def test_cross_mountpoints(
owner_uid=owner_uid,
owner_gid=owner_gid,
),
selinux_xattr(second_rename, host_path),
Event(
process=second_rename,
event_type=EventType.PERMISSION,
Expand Down
Loading
Loading