Status: Implemented

Author: Studio of Two
The current pipes.json node model supports only subprocess-based executables
(binary, shell, script). Every node must be a program that reads stdin
and writes stdout.
This excludes a whole class of context-processing tools that are only reachable via the MCP protocol — web scrapers, GitHub, context-mode, database connectors, proprietary enterprise APIs. Calling these today requires authoring a thin wrapper script per tool, which is exactly the boilerplate CPP was designed to eliminate.
The mcp node type makes MCP tools first-class citizens in a
pipes.json pipeline, with the same timeout guard, trace accounting, and tee
support as any binary node.
```text
[stdin: URL]
     │
firecrawl/scrape      ← MCP node, fetches and cleans live web page
     │
markitdown            ← binary node, converts to structured Markdown
     │
context-mode/index    ← MCP node, indexes content for RAG
     │
semantic-sift-cli     ← binary node, distils indexed context
     │
[stdout: distilled context]
```
Without the mcp node type this requires four external wrapper scripts.
With it, the entire flow is declared in pipes.json and managed by the
orchestrator.
| Field | Type | Required | Description |
|---|---|---|---|
| `type` | string | No | `"mcp"` activates the MCP client path. Omit or set to `"binary"` for the current subprocess path. |
| `server` | string | Yes (if `type=mcp`) | Server registry key; matches a key in the `servers` block of `pipes.json` or `~/.mcp-pipe.json`. |
| `tool` | string | Yes (if `type=mcp`) | Fully-qualified tool name as registered by the MCP server (e.g., `get_file`, `index`). |
| `input_key` | string | No | Argument key used to pass stdin content to the tool. Default: `"content"`. |
| `args` | object | No | Additional static key/value arguments merged with `input_key`. |
All existing fields (tee, help_msg) remain valid on mcp nodes.
```json
{
  "version": "1.0",
  "servers": {
    "firecrawl": {
      "command": ["python", "-m", "firecrawl_mcp.server"],
      "env": { "FIRECRAWL_API_KEY": "${FIRECRAWL_API_KEY}" }
    },
    "context-mode": {
      "command": ["python", "-m", "context_mode.server"]
    }
  },
  "pipes": [ ... ],
  "mappings": [ ... ]
}
```

| Field | Type | Description |
|---|---|---|
| `command` | string[] | argv to spawn the MCP server process (stdio transport). |
| `env` | object | Extra environment variables. Values prefixed with `${VAR}` are resolved from the host environment at runtime. |
The servers block follows the same local-precedence merge as pipes and
mappings: entries in the project pipes.json override ~/.mcp-pipe.json.
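The local-precedence merge described above could be sketched roughly as follows. The helper name `merge_servers` is hypothetical, and the sketch assumes a project entry replaces a global entry wholesale rather than being deep-merged field by field; the source does not say which applies.

```python
def merge_servers(global_cfg: dict, local_cfg: dict) -> dict:
    """Merge two `servers` blocks; entries from the project file win."""
    merged = dict(global_cfg.get("servers", {}))
    # Assumption: a local entry replaces the global one as a whole.
    merged.update(local_cfg.get("servers", {}))
    return merged


# Stand-in for ~/.mcp-pipe.json
global_cfg = {"servers": {
    "firecrawl": {"command": ["python", "-m", "firecrawl_mcp.server"]},
    "context-mode": {"command": ["python", "-m", "context_mode.server"]},
}}
# Stand-in for the project pipes.json
local_cfg = {"servers": {
    "context-mode": {
        "command": ["C:/path/to/venv/Scripts/python.exe", "-m", "context_mode.server"],
    },
}}

servers = merge_servers(global_cfg, local_cfg)
```

Here `servers["context-mode"]` comes from the project file, while `servers["firecrawl"]` is inherited from the global file.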
```json
{
  "version": "1.0",
  "servers": {
    "firecrawl": {
      "command": ["python", "-m", "firecrawl_mcp.server"],
      "env": { "FIRECRAWL_API_KEY": "${FIRECRAWL_API_KEY}" }
    },
    "context-mode": {
      "command": [
        "C:/path/to/venv/Scripts/python.exe",
        "-m", "context_mode.server"
      ]
    }
  },
  "pipes": [
    {
      "name": "web-research-pipe",
      "nodes": [
        {
          "type": "mcp",
          "server": "firecrawl",
          "tool": "scrape",
          "input_key": "url",
          "help_msg": "Firecrawl MCP server not reachable. Check FIRECRAWL_API_KEY and server config."
        },
        {
          "cmd": "markitdown"
        },
        {
          "type": "mcp",
          "server": "context-mode",
          "tool": "index",
          "input_key": "content"
        },
        {
          "cmd": "semantic-sift-cli",
          "args": ["semantic", "--rate", "0.4"]
        }
      ]
    }
  ],
  "mappings": [
    { "trigger": "tool:web_fetch|tool:web_search", "pipe": "web-research-pipe" }
  ]
}
```

run_pipe() gains a single branch at the top of the node loop:
```python
for node in pipe_config.get("nodes", []):
    if node.get("type") == "mcp":
        stdout = await _run_mcp_node(node, current_input, server_registry, process_env)
    else:
        # existing subprocess.Popen path (unchanged)
        ...
```

Because mcp.client.stdio uses anyio internally, _run_mcp_node is an
async function. run_pipe() will be promoted to async and callers updated
accordingly (see §3.4).
```python
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def _run_mcp_node(
    node: dict,
    stdin_data: str,
    server_registry: dict,
    env: dict,
) -> str:
    server_key = node["server"]
    tool_name = node["tool"]
    input_key = node.get("input_key", "content")
    static_args = dict(node.get("args", {}))

    server_cfg = server_registry.get(server_key)
    if not server_cfg:
        raise ValueError(f"MCP server '{server_key}' not found in servers registry.")

    # Resolve ${VAR} env placeholders
    resolved_env = _resolve_env_placeholders(server_cfg.get("env", {}))
    child_env = {**env, **resolved_env}

    cmd = server_cfg["command"]
    server_params = StdioServerParameters(command=cmd[0], args=cmd[1:], env=child_env)

    # One server process per node: spawned here, torn down on exit
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            arguments = {input_key: stdin_data, **static_args}
            result = await session.call_tool(tool_name, arguments)

    # Extract text from the CallToolResult content list
    return _extract_text(result)
```

_extract_text() iterates result.content, concatenates all TextContent
items, and falls back to str(result) if none are found.
config_loader.load_pipes_config() is extended to merge the servers block
from both pipes.json and ~/.mcp-pipe.json (local precedence), and return
it alongside the existing pipes and mappings.
run_pipe() receives the merged server_registry dict as a new parameter
(default {}), keeping the function signature backward-compatible for all
existing callers.
run_pipe() becomes async. Call-site impact:
| Caller | Change |
|---|---|
| `orchestrator.py` `main()` | `asyncio.run(run_pipe(...))` |
| `api.pipe()` | `asyncio.run(run_pipe(...))` |
| `dynamic.run_dynamic_pipe()` | becomes `async` |
| `server.py` MCP tools | already in async context: `await run_pipe(...)` |
| `cli.py` subcommands | `asyncio.run(...)` wrapper |
| All existing tests | wrap calls in `asyncio.run()` or use pytest-anyio |
Pipes with no mcp nodes execute identically to today — the asyncio.run
overhead for a pure-subprocess pipe is negligible (<1ms).
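For synchronous call sites, the wrapping pattern might look like the sketch below. The `run_pipe` shown here is a stand-in that simply echoes its input, its parameter list is assumed, and `run_pipe_sync` is a hypothetical helper name rather than part of the codebase.

```python
import asyncio
from typing import Optional


async def run_pipe(pipe_config: dict, input_data: str,
                   server_registry: Optional[dict] = None) -> str:
    """Stand-in for the real coroutine; it only echoes its input."""
    await asyncio.sleep(0)  # yield once, as a real await point would
    return input_data


def run_pipe_sync(pipe_config: dict, input_data: str,
                  server_registry: Optional[dict] = None) -> str:
    """Drive the coroutine from synchronous code (hypothetical helper)."""
    # asyncio.run() raises RuntimeError when an event loop is already
    # running in this thread, so async callers should await run_pipe().
    return asyncio.run(run_pipe(pipe_config, input_data, server_registry))


output = run_pipe_sync({"name": "demo", "nodes": []}, "hello")
```

Callers that already run inside an event loop cannot use this wrapper and would `await run_pipe(...)` directly.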
The Echo Guard hash is computed on the full pipe input and stored with a 30s TTL. If the same content passes through any pipe twice within 30s, the second pass is suppressed.
A pipe like:
[context-mode/index] → [semantic-sift-cli] → [context-mode/search] → [semantic-sift-cli]
would have its second semantic-sift-cli node suppressed, either because the
hash of its input (the output of context-mode/search) collides with a
previously stored hash, or because the guard is keyed on the original pipe
input rather than on per-node input.
Change the hash key from:

```python
hash_key = sha256(input_data.encode()).hexdigest()
```

to:

```python
hash_key = sha256(f"{pipe_name}:{node_index}:{input_data}".encode()).hexdigest()
```

This scopes each guard entry to a specific (pipe, node_position, content)
tuple, making it impossible for two different nodes in the same pipe to collide,
while still preventing genuine double-sift loops across separate pipe
invocations.
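To make the node-scoped behaviour concrete, here is a small in-memory sketch of a guard using the new key and the 30s TTL. The class name `EchoGuard`, its method names, and the injectable clock are assumptions for illustration; the real guard's storage and API are not described in this document.

```python
import time
from hashlib import sha256


class EchoGuard:
    """Suppress a repeated (pipe, node, content) tuple within a TTL window."""

    def __init__(self, ttl_seconds: float = 30.0, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so the TTL can be tested without sleeping
        self._seen = {}      # hash key -> expiry timestamp

    @staticmethod
    def key(pipe_name: str, node_index: int, input_data: str) -> str:
        return sha256(f"{pipe_name}:{node_index}:{input_data}".encode()).hexdigest()

    def should_suppress(self, pipe_name: str, node_index: int, input_data: str) -> bool:
        now = self._clock()
        # Drop expired entries so the table does not grow without bound.
        self._seen = {k: exp for k, exp in self._seen.items() if exp > now}
        k = self.key(pipe_name, node_index, input_data)
        if k in self._seen:
            return True
        self._seen[k] = now + self._ttl
        return False


now = [0.0]  # fake clock, advanced by hand
guard = EchoGuard(ttl_seconds=30.0, clock=lambda: now[0])
first = guard.should_suppress("research", 1, "same text")       # first sighting
other_node = guard.should_suppress("research", 3, "same text")  # different node index
repeat = guard.should_suppress("research", 1, "same text")      # same tuple, within TTL
now[0] = 31.0
after_ttl = guard.should_suppress("research", 1, "same text")   # entry has expired
```

Only `repeat` is suppressed: the same content at a different node index produces a different key, and the entry for node 1 lapses once the fake clock passes 30 seconds.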
_validate_nodes() is extended with three rules for mcp nodes:

1. `type == "mcp"` nodes must have both `server` and `tool` keys.
2. `type == "mcp"` nodes are exempt from the shell-metacharacter check (they have no `cmd` to validate).
3. `type == "mcp"` nodes do not count as shell utility nodes for the sift-terminal guard; only `binary`/`shell` nodes with allowlisted commands trigger that constraint.
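The rules above might translate into something like the following sketch. It is not the project's `_validate_nodes()`: the function name, the error-list return style, and the metacharacter set are all assumptions, and the sift-terminal guard is left out because its logic is not described here.

```python
import re

# Assumed metacharacter set, for illustration only.
SHELL_METACHARACTERS = re.compile(r"[;&|`$<>]")


def validate_nodes(nodes: list) -> list:
    """Return a list of human-readable validation errors (empty if valid)."""
    errors = []
    for index, node in enumerate(nodes):
        if node.get("type") == "mcp":
            # mcp nodes need both keys to locate the server and the tool.
            missing = [k for k in ("server", "tool") if not node.get(k)]
            if missing:
                errors.append(f"node {index}: mcp node is missing {', '.join(missing)}")
            # No cmd to inspect, so skip the subprocess-only checks below.
            continue
        cmd = node.get("cmd", "")
        if SHELL_METACHARACTERS.search(cmd):
            errors.append(f"node {index}: shell metacharacter in cmd {cmd!r}")
    return errors


nodes = [
    {"type": "mcp", "server": "firecrawl"},                        # missing "tool"
    {"cmd": "markitdown"},                                         # valid binary node
    {"type": "mcp", "server": "context-mode", "tool": "index"},    # valid mcp node
    {"cmd": "cat x | sh"},                                         # rejected by metachar check
]
errors = validate_nodes(nodes)
```

Collecting errors instead of raising on the first one is a design choice of this sketch; it lets a user fix every problem in a `pipes.json` file in one pass.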
The mcp node type was fully implemented in v0.2.0, including:

- Schema & Config: Native `servers` registry in `pipes.json` with `${VAR}` resolution.
- Async Orchestration: Core `run_pipe` spine promoted to `async` for non-blocking MCP stdio transport.
- Node-Scoped Echo Guard: Hashing logic updated to `pipe:node:content` to support multi-sift chains.
- MCP Bridge: The `mcp-pipe tool` CLI subcommand provides a direct shell-to-MCP interface.
- HTTP/SSE transport: only `stdio` MCP servers are supported in this phase. HTTP transport requires a different connection model and session lifecycle. Tracked as a future item.
- Server pooling / keep-alive: each `mcp` node spawns and tears down its own server process. Connection reuse across pipe runs is a Phase 8 concern.
- Auth beyond env vars: OAuth flows, keychain integration, etc. are out of scope. `${VAR}` env placeholder resolution covers the common case.
Building Systems, not Patches — Studio of Two.