|
| 1 | +#!/usr/bin/env python3 |
| 2 | +""" |
| 3 | +LangGraph agent for Echidna MCP integration. |
| 4 | +
|
| 5 | +Connects to a running Echidna campaign via its MCP server and autonomously |
| 6 | +guides the fuzzer: analyzes coverage, injects targeted transactions, and |
| 7 | +resets priorities when coverage stagnates. |
| 8 | +
|
| 9 | +Requirements: |
| 10 | + pip install langchain-anthropic langgraph httpx |
| 11 | +
|
| 12 | +Usage: |
| 13 | + # Start Echidna with MCP enabled: |
| 14 | + echidna MyContract.sol --server 8080 --format text |
| 15 | +
|
| 16 | + # Run the agent: |
| 17 | + ANTHROPIC_API_KEY=... python examples/mcp_agent.py |
| 18 | +""" |
| 19 | + |
| 20 | +import os |
| 21 | +import time |
| 22 | +import httpx |
| 23 | +from typing import TypedDict, List |
| 24 | +from langchain_anthropic import ChatAnthropic |
| 25 | +from langgraph.graph import StateGraph, END |
| 26 | + |
| 27 | + |
| 28 | +# --------------------------------------------------------------------------- |
| 29 | +# MCP client |
| 30 | +# --------------------------------------------------------------------------- |
| 31 | + |
| 32 | +def call_tool(tool: str, args: dict = None) -> str: |
| 33 | + """Call an MCP tool and return the text response.""" |
| 34 | + payload = { |
| 35 | + "jsonrpc": "2.0", "id": 1, |
| 36 | + "method": "tools/call", |
| 37 | + "params": {"name": tool, "arguments": args or {}}, |
| 38 | + } |
| 39 | + resp = httpx.post("http://localhost:8080/mcp", json=payload, timeout=30) |
| 40 | + result = resp.json().get("result", {}) |
| 41 | + return result.get("content", [{}])[0].get("text", "") |
| 42 | + |
| 43 | + |
| 44 | +# --------------------------------------------------------------------------- |
| 45 | +# Agent state |
| 46 | +# --------------------------------------------------------------------------- |
| 47 | + |
| 48 | +class State(TypedDict): |
| 49 | + coverage: int |
| 50 | + iterations: int |
| 51 | + stagnation: int |
| 52 | + |
| 53 | + |
| 54 | +def parse_int(text: str, key: str) -> int: |
| 55 | + for line in text.splitlines(): |
| 56 | + if key in line: |
| 57 | + try: |
| 58 | + return int(line.split(":")[-1].strip().split()[0].replace(",", "")) |
| 59 | + except ValueError: |
| 60 | + pass |
| 61 | + return 0 |
| 62 | + |
| 63 | + |
| 64 | +# --------------------------------------------------------------------------- |
| 65 | +# Graph nodes |
| 66 | +# --------------------------------------------------------------------------- |
| 67 | + |
| 68 | +def observe(state: State) -> State: |
| 69 | + """Read current campaign status.""" |
| 70 | + status = call_tool("status") |
| 71 | + coverage = parse_int(status, "Coverage") |
| 72 | + iterations = parse_int(status, "Iterations") |
| 73 | + stagnation = state["stagnation"] + 1 if coverage == state["coverage"] else 0 |
| 74 | + print(f" coverage={coverage} iterations={iterations} stagnation={stagnation}") |
| 75 | + return {"coverage": coverage, "iterations": iterations, "stagnation": stagnation} |
| 76 | + |
| 77 | + |
| 78 | +def inject(state: State) -> State: |
| 79 | + """Ask the LLM to suggest targeted transactions and inject them.""" |
| 80 | + llm = ChatAnthropic(model="claude-sonnet-4-5", temperature=0.7) |
| 81 | + coverage_report = call_tool("show_coverage") |
| 82 | + |
| 83 | + prompt = ( |
| 84 | + "You are a smart-contract security expert helping an Echidna fuzzer.\n" |
| 85 | + "The coverage report is:\n\n" |
| 86 | + f"{coverage_report[:2000]}\n\n" |
| 87 | + "Suggest 3 Solidity function calls that would increase coverage.\n" |
| 88 | + "Reply with one call per line, e.g.: transfer(0xABCD..., 100)\n" |
| 89 | + ) |
| 90 | + suggestions = llm.invoke(prompt).content.strip().splitlines() |
| 91 | + |
| 92 | + for tx in suggestions[:3]: |
| 93 | + tx = tx.strip() |
| 94 | + if tx and "(" in tx and not tx.startswith("#"): |
| 95 | + print(f" injecting: {tx}") |
| 96 | + call_tool("inject_fuzz_transactions", {"transactions": tx}) |
| 97 | + |
| 98 | + return {**state, "stagnation": 0} |
| 99 | + |
| 100 | + |
| 101 | +def reset(state: State) -> State: |
| 102 | + """Clear function priorities to encourage exploration.""" |
| 103 | + print(" clearing priorities") |
| 104 | + call_tool("clear_fuzz_priorities") |
| 105 | + return state |
| 106 | + |
| 107 | + |
| 108 | +def route(state: State) -> str: |
| 109 | + if state["stagnation"] >= 3: |
| 110 | + return "inject" |
| 111 | + if state["iterations"] > 0 and state["iterations"] % 100_000 == 0: |
| 112 | + return "reset" |
| 113 | + return END |
| 114 | + |
| 115 | + |
| 116 | +# --------------------------------------------------------------------------- |
| 117 | +# Build and run |
| 118 | +# --------------------------------------------------------------------------- |
| 119 | + |
| 120 | +def build_graph() -> StateGraph: |
| 121 | + g = StateGraph(State) |
| 122 | + g.add_node("observe", observe) |
| 123 | + g.add_node("inject", inject) |
| 124 | + g.add_node("reset", reset) |
| 125 | + g.set_entry_point("observe") |
| 126 | + g.add_conditional_edges("observe", route, {"inject": "inject", "reset": "reset", END: END}) |
| 127 | + g.add_edge("inject", END) |
| 128 | + g.add_edge("reset", END) |
| 129 | + return g.compile() |
| 130 | + |
| 131 | + |
| 132 | +def main(): |
| 133 | + if not os.getenv("ANTHROPIC_API_KEY"): |
| 134 | + print("Set ANTHROPIC_API_KEY before running.") |
| 135 | + return |
| 136 | + |
| 137 | + # Verify connectivity |
| 138 | + try: |
| 139 | + status = call_tool("status") |
| 140 | + if not status: |
| 141 | + raise RuntimeError("empty response") |
| 142 | + print("Connected to Echidna MCP server.") |
| 143 | + except Exception as e: |
| 144 | + print(f"Cannot reach MCP server: {e}") |
| 145 | + print("Start Echidna with: echidna MyContract.sol --server 8080 --format text") |
| 146 | + return |
| 147 | + |
| 148 | + graph = build_graph() |
| 149 | + state: State = {"coverage": 0, "iterations": 0, "stagnation": 0} |
| 150 | + |
| 151 | + duration = int(input("Duration in minutes [10]: ") or 10) |
| 152 | + interval = int(input("Check interval in seconds [60]: ") or 60) |
| 153 | + end_time = time.time() + duration * 60 |
| 154 | + |
| 155 | + step = 0 |
| 156 | + while time.time() < end_time: |
| 157 | + step += 1 |
| 158 | + print(f"\n--- step {step} ---") |
| 159 | + state = graph.invoke(state) |
| 160 | + time.sleep(interval) |
| 161 | + |
| 162 | + print(f"\nDone. Final coverage: {state['coverage']} iterations: {state['iterations']}") |
| 163 | + |
| 164 | + |
| 165 | +if __name__ == "__main__": |
| 166 | + main() |
0 commit comments