+ {validatedToolResult.maliciousActivityWarning && (
+
+ Suspicious activity detected - your chat client may be under
+ attack. Please examine previous tool calls in detail, and if you
+ have any doubts, do not approve this command and terminate this
+ chat session.
+
+ )}
+
{/* TODO: we can dynamically inject the platform that users are using here */}
Goose wants to perform the following action on{" "}
diff --git a/mcp-app/src/types.ts b/mcp-app/src/types.ts
index 5ff4ee1c..9c14fb60 100644
--- a/mcp-app/src/types.ts
+++ b/mcp-app/src/types.ts
@@ -33,19 +33,27 @@ export const ExecuteScriptResultSchema = z.object({
export type ExecuteScriptResult = z.infer;
-export const McpAppToolResultSchema = z.object({
- status: z.enum([
- "OK",
- "BAD_DESCRIPTION",
- "POLICY",
- "MODIFIES_SYSTEM",
- "UNCLEAR",
- "DANGEROUS",
- "MALICIOUS",
- ]),
- detail: z.string(),
- id: z.string(),
-});
+export const McpAppToolResultSchema = z
+ .object({
+ status: z.enum([
+ "OK",
+ "BAD_DESCRIPTION",
+ "POLICY",
+ "MODIFIES_SYSTEM",
+ "UNCLEAR",
+ "DANGEROUS",
+ "MALICIOUS",
+ ]),
+ detail: z.string(),
+ id: z.string(),
+ malicious_activity_warning: z.boolean(),
+ })
+ .transform((data) => ({
+ status: data.status,
+ detail: data.detail,
+ id: data.id,
+ maliciousActivityWarning: data.malicious_activity_warning,
+ }));
export type McpAppToolResult = z.infer;
diff --git a/src/linux_mcp_server/tools/run_script.py b/src/linux_mcp_server/tools/run_script.py
index dc2e660b..c8e9149f 100644
--- a/src/linux_mcp_server/tools/run_script.py
+++ b/src/linux_mcp_server/tools/run_script.py
@@ -154,8 +154,84 @@ def set_script_state(self, id: str, new_state: ExecutionState):
script_store = ScriptStore()
+MAX_CONSECUTIVE_MALICIOUS_ACTIONS = 3
+MAX_TOTAL_MALICIOUS_ACTIONS = 4
+class BehaviorRecord:
+ """
+ Tracks gatekeeper verdicts for a single session to detect malicious behavior.
+
+ Each MALICIOUS verdict sets a temporary warning that forces the next script to
+ require human confirmation. The temporary warning is cleared once a human approves
+ execution. Non-malicious verdicts reset the consecutive counter but not the total.
+
+ A session is permanently flagged if it accumulates MAX_TOTAL_MALICIOUS_ACTIONS
+ total malicious actions or MAX_CONSECUTIVE_MALICIOUS_ACTIONS consecutive malicious
+ actions. Once permanently flagged, all subsequent scripts require confirmation and
+ the flag cannot be cleared.
+ """
+
+ def __init__(self):
+ self._consecutive_malicious_action_counts = 0
+ self._total_malicious_action_counts = 0
+ self._has_temporary_warning = False
+ self._is_malicious = False
+
+ def add_record(self, status: GatekeeperStatus):
+ """
+ Record a gatekeeper verdict and update tracking state.
+
+ A MALICIOUS verdict increments both the consecutive and total counters and
+ sets a temporary warning. Any other verdict resets the consecutive counter.
+ If either threshold is met, the session is permanently flagged. No-ops if
+ the session is already permanently flagged.
+ """
+ # No need to update the record if it's already considered security compromised
+ if self._is_malicious:
+ return
+
+ if status == GatekeeperStatus.MALICIOUS:
+ self._consecutive_malicious_action_counts += 1
+ self._total_malicious_action_counts += 1
+ self._has_temporary_warning = True
+ else:
+ self._consecutive_malicious_action_counts = 0
+
+ # Check if the record matches the conditions of being considered as malicious
+ if self._total_malicious_action_counts >= MAX_TOTAL_MALICIOUS_ACTIONS:
+ self._is_malicious = True
+ return
+
+ if self._consecutive_malicious_action_counts >= MAX_CONSECUTIVE_MALICIOUS_ACTIONS:
+ self._is_malicious = True
+
+ def remove_temporary_warning(self):
+ """Clear the temporary warning flag, typically after a human approves execution."""
+ self._has_temporary_warning = False
+
+ @property
+ def malicious_activity_warning(self) -> bool:
+ """Return True if the session is permanently flagged or has a pending temporary warning."""
+ return self._is_malicious or self._has_temporary_warning
+
+
+class BehaviorRecordManager:
+ """Manages per-session BehaviorRecords, creating them on first access."""
+
+ def __init__(self):
+ self._records: dict[str, BehaviorRecord] = dict()
+
+ def get_record_by_session_id(self, session_id: str) -> BehaviorRecord:
+ """Return the BehaviorRecord for a session, creating one if it doesn't exist."""
+ if session_id not in self._records:
+ self._records[session_id] = BehaviorRecord()
+
+ return self._records[session_id]
+
+
+behavior_record_manager = BehaviorRecordManager()
+
BASH_STRICT_PREAMBLE = "set -euo pipefail; "
SYSTEMD_RUN_ARGS = [
@@ -209,6 +285,7 @@ class RunScriptInteractiveResult(BaseModel):
id: str
status: GatekeeperStatus
detail: str
+ malicious_activity_warning: bool
# class UserInfo(BaseModel):
@@ -241,6 +318,7 @@ class ExecuteScriptResult:
@log_tool_call
@disallow_local_execution_in_containers
async def execute_script(
+ ctx: Context,
id: t.Annotated[str, Field(description="The associated ID of the script to be executed")],
) -> ToolResult:
script_details = script_store.get_script_details(id)
@@ -248,6 +326,10 @@ async def execute_script(
script_store.set_script_state(id, "executing")
content: list[ContentBlock] = []
+ # Clear temporary warning flag by human approval
+ behavior_record = behavior_record_manager.get_record_by_session_id(ctx.session_id)
+ behavior_record.remove_temporary_warning()
+
try:
returncode, stdout, stderr = await execute_command(command, host=script_details.host)
except Exception:
@@ -314,9 +396,12 @@ async def run_script_interactive(
host: Host = None,
) -> ToolResult:
script_details = script_store.get_script_details(token)
+ behavior_record = behavior_record_manager.get_record_by_session_id(ctx.session_id)
+
+ needs_confirmation = script_details.needs_confirmation or behavior_record.malicious_activity_warning
# Verify that this script requires confirmation
- if not script_details.needs_confirmation:
+ if not needs_confirmation:
raise ToolError("This script does not require confirmation. Use run_script instead of run_script_interactive.")
# Check if the passed parameters match the stored script details
@@ -338,6 +423,8 @@ async def run_script_interactive(
(BASH_STRICT_PREAMBLE + script) if script_type == SCRIPT_TYPE_BASH else script,
readonly=readonly,
)
+ behavior_record.add_record(gatekeeper_result.status)
+
if gatekeeper_result.status != GatekeeperStatus.OK:
script_store.set_script_state(token, "rejected-gatekeeper")
raise ToolError(gatekeeper_result.description)
@@ -352,7 +439,12 @@ async def run_script_interactive(
)
]
- structured_content_obj = RunScriptInteractiveResult(id=result_id, status=GatekeeperStatus.OK, detail="")
+ structured_content_obj = RunScriptInteractiveResult(
+ id=result_id,
+ status=GatekeeperStatus.OK,
+ detail="",
+ malicious_activity_warning=behavior_record.malicious_activity_warning,
+ )
return ToolResult(content=content, structured_content=structured_content_obj.model_dump())
@@ -413,6 +505,9 @@ async def validate_script(
readonly=readonly,
)
+ behavior_record = behavior_record_manager.get_record_by_session_id(ctx.session_id)
+ behavior_record.add_record(gatekeeper_result.status)
+
id = script_store.add_script(description, script, script_type, host, readonly)
script_details = script_store.get_script_details(id)
@@ -420,16 +515,18 @@ async def validate_script(
script_store.set_script_state(id, "rejected-gatekeeper")
raise ToolError(gatekeeper_result.description)
+ needs_confirmation = script_details.needs_confirmation or behavior_record.malicious_activity_warning
+
result = ToolResult(
content=[
TextContent(
type="text",
- text=f"Script passed gatekeeper validation and is stored with ID {id}. Please use {_pick_execution_tool(script_details.needs_confirmation)} to execute the validated script.",
+ text=f"Script passed gatekeeper validation and is stored with ID {id}. Please use {_pick_execution_tool(needs_confirmation)} to execute the validated script.",
)
],
structured_content={
"token": id,
- "needs_confirmation": script_details.needs_confirmation,
+ "needs_confirmation": needs_confirmation,
},
)
return result
@@ -448,9 +545,12 @@ async def run_script(
token: t.Annotated[str, Field(description="The token returned by the validate_script tool.")],
) -> str:
script_details = script_store.get_script_details(token)
+ behavior_record = behavior_record_manager.get_record_by_session_id(ctx.session_id)
+
+ needs_confirmation = script_details.needs_confirmation or behavior_record.malicious_activity_warning
# Verify that this script doesn't require confirmation
- if script_details.needs_confirmation:
+ if needs_confirmation:
raise ToolError(f"This script requires confirmation. Use {_pick_execution_tool(True)} instead of run_script.")
script_store.set_script_state(token, "executing")
@@ -498,13 +598,19 @@ async def run_script_with_confirmation(
host: Host = None,
) -> str:
script_details = script_store.get_script_details(token)
+ behavior_record = behavior_record_manager.get_record_by_session_id(ctx.session_id)
+
+ needs_confirmation = script_details.needs_confirmation or behavior_record.malicious_activity_warning
# Verify that this script requires confirmation
- if not script_details.needs_confirmation:
+ if not needs_confirmation:
raise ToolError(
"This script does not require confirmation. Use run_script instead of run_script_with_confirmation."
)
+ # Clear temporary warning flag by human approval
+ behavior_record.remove_temporary_warning()
+
# Verify the retrieved script details match the incoming parameters
new_details = ScriptDetails(
state="waiting-approval",
@@ -527,6 +633,8 @@ async def run_script_with_confirmation(
(BASH_STRICT_PREAMBLE + script) if script_type == SCRIPT_TYPE_BASH else script,
readonly=readonly,
)
+ behavior_record.add_record(gatekeeper_result.status)
+
if gatekeeper_result.status != GatekeeperStatus.OK:
script_store.set_script_state(token, "rejected-gatekeeper")
raise ToolError(gatekeeper_result.description)