Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/guarded-command-execution.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,28 @@ The scores give an approximate sense of the capability of the model acting as a
actual performance in real-world situations may vary.
Smaller models than those listed above are *not recommended*.

## Session-Based Behavior Tracking

In addition to the per-script gatekeeper check,
linux-mcp-server tracks gatekeeper verdicts across each session
using a `BehaviorRecord`.
This detects patterns of malicious activity
that might not be obvious from any single script.

Each session maintains counters for consecutive and total malicious actions.
A session is permanently flagged, so that **all** subsequent scripts require confirmation, if either threshold is met:

* **3** consecutive scripts flagged as `MALICIOUS` by the gatekeeper.
* **4** total scripts flagged as `MALICIOUS` across the session.

Once a session is permanently flagged, the flag cannot be cleared.

A single `MALICIOUS` verdict triggers a **temporary warning**
that forces subsequent scripts to require human confirmation,
even if they would otherwise run without approval.
The temporary warning stays in effect until the human approves an execution, at which point it is cleared.
A non-malicious verdict resets the consecutive counter but not the total counter.

## Human In The Loop

To provide a better experience for the human approving the call:
Expand Down
4 changes: 4 additions & 0 deletions mcp-app/src/global.css
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@
.execution-state-tag {
@apply border border-app-border-primary rounded-md w-fit px-2 text-sm
}

/* Red alert banner rendered above the script prompt when the tool result
   carries a malicious-activity warning. */
.security-warning {
@apply bg-red-200 text-red-600 rounded-lg p-4 mb-4
}
}

* {
Expand Down
9 changes: 9 additions & 0 deletions mcp-app/src/run-script-app.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,15 @@ function RunScriptAppInner({
<div className="app-container">
<div className="script-main-box">
<div className="mb-4">
{validatedToolResult.maliciousActivityWarning && (
<p className="security-warning">
Suspicious activity detected - your chat client may be under
attack. Please examine previous tool calls in detail, and if you
have any doubts, do not approve this command and terminate this
chat session.
</p>
)}

{/* TODO: we can dynamically inject the platform that users are using here */}
<p>
Goose wants to perform the following action on{" "}
Expand Down
34 changes: 21 additions & 13 deletions mcp-app/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,27 @@ export const ExecuteScriptResultSchema = z.object({

export type ExecuteScriptResult = z.infer<typeof ExecuteScriptResultSchema>;

export const McpAppToolResultSchema = z.object({
status: z.enum([
"OK",
"BAD_DESCRIPTION",
"POLICY",
"MODIFIES_SYSTEM",
"UNCLEAR",
"DANGEROUS",
"MALICIOUS",
]),
detail: z.string(),
id: z.string(),
});
/**
 * Schema for the structured content of a tool result.
 *
 * The incoming payload uses the snake_case key `malicious_activity_warning`;
 * after validation it is exposed to the app as camelCase
 * `maliciousActivityWarning`. All other validated fields pass through as-is.
 */
export const McpAppToolResultSchema = z
  .object({
    status: z.enum([
      "OK",
      "BAD_DESCRIPTION",
      "POLICY",
      "MODIFIES_SYSTEM",
      "UNCLEAR",
      "DANGEROUS",
      "MALICIOUS",
    ]),
    detail: z.string(),
    id: z.string(),
    malicious_activity_warning: z.boolean(),
  })
  // Rename the single snake_case field; `rest` holds status, detail and id.
  .transform(({ malicious_activity_warning, ...rest }) => ({
    ...rest,
    maliciousActivityWarning: malicious_activity_warning,
  }));

export type McpAppToolResult = z.infer<typeof McpAppToolResultSchema>;

Expand Down
120 changes: 114 additions & 6 deletions src/linux_mcp_server/tools/run_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,84 @@ def set_script_state(self, id: str, new_state: ExecutionState):


script_store = ScriptStore()
# Thresholds used by BehaviorRecord to permanently flag a session.
# Reaching either one is enough.
# Number of back-to-back MALICIOUS gatekeeper verdicts that triggers the flag.
MAX_CONSECUTIVE_MALICIOUS_ACTIONS = 3
# Number of MALICIOUS verdicts accumulated over the session that triggers the flag.
MAX_TOTAL_MALICIOUS_ACTIONS = 4


class BehaviorRecord:
    """
    Per-session history of gatekeeper verdicts, used to spot malicious behavior.

    Two kinds of warning are tracked:

    * A *temporary* warning, raised by every MALICIOUS verdict and dropped again
      via remove_temporary_warning() (called once a human approves execution).
    * A *permanent* flag, raised when the session reaches either
      MAX_TOTAL_MALICIOUS_ACTIONS malicious verdicts overall or
      MAX_CONSECUTIVE_MALICIOUS_ACTIONS malicious verdicts in a row. This flag
      is never cleared.

    A non-malicious verdict resets the consecutive streak but leaves the total
    untouched.
    """

    def __init__(self):
        # Counters feeding the two escalation thresholds.
        self._consecutive_malicious_action_counts = 0
        self._total_malicious_action_counts = 0
        # Warning state: temporary (clearable) and permanent (sticky).
        self._has_temporary_warning = False
        self._is_malicious = False

    def add_record(self, status: GatekeeperStatus):
        """
        Fold one gatekeeper verdict into the session's tracking state.

        MALICIOUS bumps both counters and raises the temporary warning; any
        other verdict only resets the consecutive streak. The session becomes
        permanently flagged when either threshold is reached. Does nothing if
        the session is already permanently flagged.
        """
        # A permanently flagged session has nothing left to escalate to.
        if self._is_malicious:
            return

        verdict_is_malicious = status == GatekeeperStatus.MALICIOUS
        if not verdict_is_malicious:
            self._consecutive_malicious_action_counts = 0
        else:
            self._consecutive_malicious_action_counts += 1
            self._total_malicious_action_counts += 1
            self._has_temporary_warning = True

        # Escalate to the permanent flag when either threshold has been hit.
        self._is_malicious = (
            self._total_malicious_action_counts >= MAX_TOTAL_MALICIOUS_ACTIONS
            or self._consecutive_malicious_action_counts >= MAX_CONSECUTIVE_MALICIOUS_ACTIONS
        )

    def remove_temporary_warning(self):
        """Drop the temporary warning; the permanent flag is left untouched."""
        self._has_temporary_warning = False

    @property
    def malicious_activity_warning(self) -> bool:
        """Whether any warning is active: the permanent flag or a pending temporary one."""
        if self._is_malicious:
            return True
        return self._has_temporary_warning


class BehaviorRecordManager:
    """Registry of BehaviorRecords keyed by session ID; records are created lazily."""

    def __init__(self):
        self._records: dict[str, BehaviorRecord] = {}

    def get_record_by_session_id(self, session_id: str) -> BehaviorRecord:
        """Look up the session's BehaviorRecord, creating and storing a fresh one on first use."""
        try:
            return self._records[session_id]
        except KeyError:
            # First time this session is seen: start it with a clean record.
            fresh_record = BehaviorRecord()
            self._records[session_id] = fresh_record
            return fresh_record


# Module-level manager shared by the tool handlers, which look up the
# per-session BehaviorRecord through it using ctx.session_id.
behavior_record_manager = BehaviorRecordManager()

BASH_STRICT_PREAMBLE = "set -euo pipefail; "

SYSTEMD_RUN_ARGS = [
Expand Down Expand Up @@ -209,6 +285,7 @@ class RunScriptInteractiveResult(BaseModel):
id: str
status: GatekeeperStatus
detail: str
malicious_activity_warning: bool


# class UserInfo(BaseModel):
Expand Down Expand Up @@ -241,13 +318,18 @@ class ExecuteScriptResult:
@log_tool_call
@disallow_local_execution_in_containers
async def execute_script(
ctx: Context,
id: t.Annotated[str, Field(description="The associated ID of the script to be executed")],
) -> ToolResult:
script_details = script_store.get_script_details(id)
command = _wrap_script(script_details.script_type, script_details.script)
script_store.set_script_state(id, "executing")
content: list[ContentBlock] = []

# Clear temporary warning flag by human approval
behavior_record = behavior_record_manager.get_record_by_session_id(ctx.session_id)
behavior_record.remove_temporary_warning()

try:
returncode, stdout, stderr = await execute_command(command, host=script_details.host)
except Exception:
Expand Down Expand Up @@ -314,9 +396,12 @@ async def run_script_interactive(
host: Host = None,
) -> ToolResult:
script_details = script_store.get_script_details(token)
behavior_record = behavior_record_manager.get_record_by_session_id(ctx.session_id)

needs_confirmation = script_details.needs_confirmation or behavior_record.malicious_activity_warning

# Verify that this script requires confirmation
if not script_details.needs_confirmation:
if not needs_confirmation:
raise ToolError("This script does not require confirmation. Use run_script instead of run_script_interactive.")

# Check if the passed parameters match the stored script details
Expand All @@ -338,6 +423,8 @@ async def run_script_interactive(
(BASH_STRICT_PREAMBLE + script) if script_type == SCRIPT_TYPE_BASH else script,
readonly=readonly,
)
behavior_record.add_record(gatekeeper_result.status)

if gatekeeper_result.status != GatekeeperStatus.OK:
script_store.set_script_state(token, "rejected-gatekeeper")
raise ToolError(gatekeeper_result.description)
Expand All @@ -352,7 +439,12 @@ async def run_script_interactive(
)
]

structured_content_obj = RunScriptInteractiveResult(id=result_id, status=GatekeeperStatus.OK, detail="")
structured_content_obj = RunScriptInteractiveResult(
id=result_id,
status=GatekeeperStatus.OK,
detail="",
malicious_activity_warning=behavior_record.malicious_activity_warning,
)

return ToolResult(content=content, structured_content=structured_content_obj.model_dump())

Expand Down Expand Up @@ -413,23 +505,28 @@ async def validate_script(
readonly=readonly,
)

behavior_record = behavior_record_manager.get_record_by_session_id(ctx.session_id)
behavior_record.add_record(gatekeeper_result.status)

id = script_store.add_script(description, script, script_type, host, readonly)
script_details = script_store.get_script_details(id)

if gatekeeper_result.status != GatekeeperStatus.OK:
script_store.set_script_state(id, "rejected-gatekeeper")
raise ToolError(gatekeeper_result.description)

needs_confirmation = script_details.needs_confirmation or behavior_record.malicious_activity_warning

result = ToolResult(
content=[
TextContent(
type="text",
text=f"Script passed gatekeeper validation and is stored with ID {id}. Please use {_pick_execution_tool(script_details.needs_confirmation)} to execute the validated script.",
text=f"Script passed gatekeeper validation and is stored with ID {id}. Please use {_pick_execution_tool(needs_confirmation)} to execute the validated script.",
)
],
structured_content={
"token": id,
"needs_confirmation": script_details.needs_confirmation,
"needs_confirmation": needs_confirmation,
},
)
return result
Expand All @@ -448,9 +545,12 @@ async def run_script(
token: t.Annotated[str, Field(description="The token returned by the validate_script tool.")],
) -> str:
script_details = script_store.get_script_details(token)
behavior_record = behavior_record_manager.get_record_by_session_id(ctx.session_id)

needs_confirmation = script_details.needs_confirmation or behavior_record.malicious_activity_warning

# Verify that this script doesn't require confirmation
if script_details.needs_confirmation:
if needs_confirmation:
raise ToolError(f"This script requires confirmation. Use {_pick_execution_tool(True)} instead of run_script.")

script_store.set_script_state(token, "executing")
Expand Down Expand Up @@ -498,13 +598,19 @@ async def run_script_with_confirmation(
host: Host = None,
) -> str:
script_details = script_store.get_script_details(token)
behavior_record = behavior_record_manager.get_record_by_session_id(ctx.session_id)

needs_confirmation = script_details.needs_confirmation or behavior_record.malicious_activity_warning

# Verify that this script requires confirmation
if not script_details.needs_confirmation:
if not needs_confirmation:
raise ToolError(
"This script does not require confirmation. Use run_script instead of run_script_with_confirmation."
)

# Clear temporary warning flag by human approval
behavior_record.remove_temporary_warning()

# Verify the retrieved script details match the incoming parameters
new_details = ScriptDetails(
state="waiting-approval",
Expand All @@ -527,6 +633,8 @@ async def run_script_with_confirmation(
(BASH_STRICT_PREAMBLE + script) if script_type == SCRIPT_TYPE_BASH else script,
readonly=readonly,
)
behavior_record.add_record(gatekeeper_result.status)

if gatekeeper_result.status != GatekeeperStatus.OK:
script_store.set_script_state(token, "rejected-gatekeeper")
raise ToolError(gatekeeper_result.description)
Expand Down