Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/linux_mcp_server/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ class CommandGroup(BaseModel):
"stats": CommandSpec(args=("cat", "/proc/net/dev")),
}
),
"network_routes": CommandGroup(
commands={
"default": CommandSpec(args=("ip", "-json", "route")),
}
),
# === Logs ===
"journal_logs": CommandGroup(
commands={
Expand Down
15 changes: 15 additions & 0 deletions src/linux_mcp_server/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pathlib import Path

from pydantic import BaseModel
from pydantic import ConfigDict
from pydantic import Field
from pydantic import field_serializer
from pydantic import model_validator
Expand Down Expand Up @@ -43,6 +44,20 @@ class ListeningPort(BaseModel):
process: str = ""


class Route(BaseModel):
"""Parsed route entry from ip -json route output."""

model_config = ConfigDict(populate_by_name=True)

destination: str = Field(validation_alias="dst")
gateway: str = ""
device: str = Field(default="", validation_alias="dev")
protocol: str = ""
scope: str = ""
source: str = Field(default="", validation_alias="prefsrc")
metric: int | None = None


class NetworkInterface(BaseModel):
"""Parsed network interface information."""

Expand Down
2 changes: 2 additions & 0 deletions src/linux_mcp_server/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from linux_mcp_server.tools.network import get_listening_ports
from linux_mcp_server.tools.network import get_network_connections
from linux_mcp_server.tools.network import get_network_interfaces
from linux_mcp_server.tools.network import get_network_routes

# processes
from linux_mcp_server.tools.processes import get_process_info
Expand Down Expand Up @@ -49,6 +50,7 @@
"get_memory_information",
"get_network_connections",
"get_network_interfaces",
"get_network_routes",
"get_process_info",
"get_service_logs",
"get_service_status",
Expand Down
29 changes: 29 additions & 0 deletions src/linux_mcp_server/tools/network.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
"""Network diagnostic tools."""

import json

from fastmcp.exceptions import ToolError
from mcp.types import ToolAnnotations

from linux_mcp_server.audit import log_tool_call
from linux_mcp_server.commands import get_command
from linux_mcp_server.formatters import format_listening_ports
from linux_mcp_server.formatters import format_network_connections
from linux_mcp_server.formatters import format_network_interfaces
from linux_mcp_server.models import Route
from linux_mcp_server.parsers import parse_ip_brief
from linux_mcp_server.parsers import parse_proc_net_dev
from linux_mcp_server.parsers import parse_ss_connections
Expand Down Expand Up @@ -103,3 +107,28 @@ async def get_listening_ports(
ports = parse_ss_listening(stdout)
return format_listening_ports(ports)
return f"Error getting listening ports: return code {returncode}, stderr: {stderr}"


@mcp.tool(
title="Get network routes",
description="Get the system routing table showing network routes, gateways, and interfaces.",
tags={"fixed", "connectivity", "network", "routing"},
annotations=ToolAnnotations(readOnlyHint=True),
)
@log_tool_call
@disallow_local_execution_in_containers
async def get_network_routes(
host: Host = None,
) -> list[Route]:
"""Get network routing table.

Retrieves the system routing table including destination networks,
gateways, devices, protocols, scopes, source addresses, and metrics.
"""
cmd = get_command("network_routes")

returncode, stdout, stderr = await cmd.run(host=host)

if is_successful_output(returncode, stdout):
return [Route.model_validate(entry) for entry in json.loads(stdout)]
raise ToolError(f"Error getting network routes: return code {returncode}, stderr: {stderr}")
1 change: 1 addition & 0 deletions tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"get_memory_information",
"get_network_connections",
"get_network_interfaces",
"get_network_routes",
"get_process_info",
"get_service_logs",
"get_service_status",
Expand Down
88 changes: 88 additions & 0 deletions tests/tools/test_network.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Tests for network diagnostic tools."""

import json
import re

import pytest
Expand Down Expand Up @@ -211,3 +212,90 @@ async def test_get_listening_ports_error(self, mcp_client, mock_execute):
match = re.compile(r"error calling tool.*raised intentionally", flags=re.I)
with pytest.raises(ToolError, match=match):
await mcp_client.call_tool("get_listening_ports")


class TestGetNetworkRoutes:
"""Test get_network_routes function."""

@pytest.mark.parametrize(
("host", "mock_output", "expected_content"),
[
pytest.param(
None,
json.dumps(
[
{
"dst": "default",
"gateway": "192.168.1.1",
"dev": "eth0",
"protocol": "dhcp",
"prefsrc": "192.168.1.100",
"metric": 100,
},
{
"dst": "192.168.1.0/24",
"dev": "eth0",
"protocol": "kernel",
"scope": "link",
"prefsrc": "192.168.1.100",
},
{
"dst": "172.17.0.0/16",
"dev": "docker0",
"protocol": "kernel",
"scope": "link",
"prefsrc": "172.17.0.1",
},
]
),
["192.168.1.1", "eth0", "192.168.1.100"],
id="local",
),
pytest.param(
"remote.host",
json.dumps(
[
{"dst": "default", "gateway": "10.0.0.1", "dev": "ens5", "protocol": "dhcp", "metric": 100},
{
"dst": "10.0.0.0/24",
"dev": "ens5",
"protocol": "kernel",
"scope": "link",
"prefsrc": "10.0.0.5",
},
]
),
["10.0.0.1", "ens5"],
id="remote",
),
],
)
async def test_get_network_routes_success(self, mcp_client, mock_execute, host, mock_output, expected_content):
"""Test getting network routes with success."""
mock_execute.return_value = (0, mock_output, "")
result = await mcp_client.call_tool("get_network_routes", arguments={"host": host})
result_text = result.content[0].text.casefold()

assert all(content.casefold() in result_text for content in expected_content), (
"Did not find all expected values"
)

@pytest.mark.parametrize(
("return_value",),
[
pytest.param((1, "", "Command not found"), id="command_fails"),
pytest.param((0, "", ""), id="empty_output"),
],
)
async def test_get_network_routes_failure(self, mcp_client, mock_execute, return_value):
"""Test getting network routes when command fails or returns empty."""
mock_execute.return_value = return_value
with pytest.raises(ToolError, match="Error getting network routes"):
await mcp_client.call_tool("get_network_routes")

async def test_get_network_routes_error(self, mcp_client, mock_execute):
"""Test getting network routes with general error."""
mock_execute.side_effect = ValueError("Raised intentionally")
match = re.compile(r"error calling tool.*raised intentionally", flags=re.I)
with pytest.raises(ToolError, match=match):
await mcp_client.call_tool("get_network_routes")
Loading