From cac8aead58d9ffdab928f60fe565594a30a66fde Mon Sep 17 00:00:00 2001 From: Steve-Mcl Date: Thu, 25 Jun 2026 08:11:19 +0100 Subject: [PATCH 1/5] Add MCP client implementation with endpoint handling and SDK resolution --- lib/mcp.js | 466 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 466 insertions(+) create mode 100644 lib/mcp.js diff --git a/lib/mcp.js b/lib/mcp.js new file mode 100644 index 00000000..eb923ea1 --- /dev/null +++ b/lib/mcp.js @@ -0,0 +1,466 @@ +const path = require('path') +const { warn, debug } = require('./logging/log') + +const CLIENT_NAME = 'flowfuse-launcher' +const CLIENT_VERSION = '1.0.0' + +const MCP_SDK_PACKAGE = '@modelcontextprotocol/sdk' +const MCP_HOST_PACKAGE = '@flowfuse-nodes/nr-mcp-server-nodes' + +/** Scope value the platform issues for tokens authorised to talk to MCP servers. */ +const MCP_TOKEN_SCOPE = 'ff-expert:mcp' + +/** + * @typedef {object} accessToken + * @property {string} [scheme] - auth scheme, e.g. 'Bearer'. Basic auth is currently rejected. + * @property {string} [token] - the token value + * @property {string|string[]} [scope] - token scope(s); only tokens including `ff-expert:mcp` are honoured + * + * @typedef {object} McpEndpointSpec - Specification for an MCP endpoint. Additional properties are allowed and will be passed back in the result (for correlation purposes). + * @property {string} endpoint - The path of the MCP server e.g. '/mcp'. Should not contain the host/port; those are determined to agent/launcher + * @property {object} [headers] - extra request headers to send with every MCP HTTP request + * @property {accessToken} [accessToken] - access token; merged into `Authorization` when scoped for MCP + * + * @typedef {object} McpEndpoint {{ key: string, url: string, headers: object, skip: boolean, ... }} (additional properties allowed) + * @property {string} key - unique key for the endpoint; used for result mapping and logging (often the same as `url`) + * @property {string} url - MCP endpoint URL or path on the local Node-RED instance + * @property {object} [headers] - extra request headers to send with every MCP HTTP request + * @property {boolean} skip - when true, the endpoint should be skipped (e.g. due to unsupported auth scheme)} + * @property {Error} [error] - if present, indicates an error that caused this endpoint to be skipped + * + * @typedef {import('@modelcontextprotocol/sdk/client').Client} McpClient + * @typedef {import('@modelcontextprotocol/sdk/client/streamableHttp.js').StreamableHTTPClientTransport} StreamableHTTPClientTransport + */ + +/** + * Error thrown when the MCP SDK cannot be located/loaded from the Node-RED + * instance. Carries a deliberately host-path-free message so it is safe to + * surface back to the platform/UI; the underlying detail (attempted specifiers, + * resolver error) is logged at debug level only. The original error, when any, + * is attached as `cause`. + */ +class McpSdkUnavailableError extends Error { + constructor (message, { cause } = {}) { + super(message || 'MCP SDK is not available in this Node-RED instance') + this.name = 'McpSdkUnavailableError' + this.code = 'MCP_SDK_UNAVAILABLE' + if (cause) { + this.cause = cause + } + } +} + +/** + * Resolve an MCP SDK sub-module to an absolute, symlink-resolved file path, + * scoped to the Node-RED project's node_modules. The SDK is shipped by + * @flowfuse-nodes/nr-mcp-server-nodes (or any other Node-RED package that depends + * on it), so it lives in the running Node-RED instance rather than the + * device-agent's own dependencies. + * + * We use `require.resolve(..., { paths })` rather than hand-building a path into + * `dist/cjs`: this honours the SDK's `exports` map and Node's full resolution + * algorithm, so it keeps working across build-layout changes and non-flat + * installs (npm-hoisted, host-nested, pnpm, yarn). `paths` keeps resolution + * scoped to the project so an unrelated copy elsewhere can't be picked up. + * + * Resolution is attempted, in order, from: + * 1. The host package's own node_modules: + * /node_modules//node_modules — where the SDK ends up when + * the host package is symlinked to a source checkout (npm cannot hoist deps + * across the symlink boundary). + * 2. The project root node_modules: + * /node_modules — the normal hoisted `npm install` case. + * + * @param {string} subPath - sub-path within the SDK, e.g. 'client/index.js' + * @param {string} projectDir - the Node-RED project directory (must contain node_modules) + * @returns {string} absolute, symlink-resolved path to the SDK module file + */ +function resolveFromProject (subPath, projectDir) { + if (!projectDir) { + throw new Error('Node-RED project directory is not available') + } + const specifier = `${MCP_SDK_PACKAGE}/${subPath}` + const paths = [ + path.join(projectDir, 'node_modules', MCP_HOST_PACKAGE, 'node_modules'), + path.join(projectDir, 'node_modules') + ] + return require.resolve(specifier, { paths }) +} + +/** + * Resolve an endpoint string to a URL. + * - If it already parses as an absolute URL, it is used as-is. + * - Otherwise it is treated as a path on the local Node-RED instance and joined to + * `http://127.0.0.1:`. 127.0.0.1 is preferred over `localhost` to avoid + * IPv4/IPv6 resolution differences and hosts-file surprises. + * @param {string} endpoint + * @param {string} host + * @param {number} port + * @returns {URL} + */ +function endpointToUrl (endpoint, host, port) { + try { + const u = new URL(endpoint) + if (u.protocol === 'http:' || u.protocol === 'https:') { + return u + } + } catch (_) { /* fall through to local-path handling */ } + const pathPart = endpoint.startsWith('/') ? endpoint : `/${endpoint}` + return new URL(`http://${host || '127.0.0.1'}:${port}${pathPart}`) +} + +/** + * Normalise an endpoint into a uniform shape the rest of the class can work with. + * Accepts either a bare string (URL/path) or an object spec carrying auth/headers. + * + * @param {McpEndpointSpec|string} spec + * @param {string} host + * @param {number} port + * @returns {McpEndpoint} + * `key` is what the result map is keyed by; `skip` is set when the endpoint + * should not be contacted (e.g. unsupported auth scheme). + */ +function normalizeEndpoint (spec, host, port) { + if (typeof spec === 'string') { + return { + key: spec, + url: endpointToUrl(spec, host, port).toString(), + headers: { 'Content-Type': 'application/json' }, + skip: false + } + } + if (!spec || typeof spec !== 'object' || typeof spec.mcpEndpoint !== 'string') { + return { + key: '', + skip: true, + error: 'Invalid endpoint spec: must be a string or object with an "mcpEndpoint" property' + } + } + const headers = { ...(spec.headers || {}) } + if (!headers['Content-Type'] && !headers['content-type']) { + headers['Content-Type'] = 'application/json' + } + let skip = false + const accessToken = spec.accessToken || null + if (accessToken) { + const { scheme, token, scope } = accessToken + const expertScopedToken = scope === MCP_TOKEN_SCOPE || (Array.isArray(scope) && scope.includes(MCP_TOKEN_SCOPE)) + if (expertScopedToken) { + if (scheme === 'Basic' || scheme === 'basic') { + // Basic auth is currently unsupported; mark for the caller to skip. + skip = true + } else if (token) { + headers.Authorization = scheme ? `${scheme} ${token}` : token + } + } + } + return { + ...spec, // preserve any additional properties for correlation/logging purposes + key: spec.mcpEndpoint, + url: endpointToUrl(spec.mcpEndpoint, host, port).toString(), + headers, + skip + } +} + +/** + * MCP client wrapper around the upstream `@modelcontextprotocol/sdk` Client. + * + * Configuration (port / project directory) is captured at construction time so that + * call-site code only has to supply the per-request payload. + */ +class MCP { + /** + * @param {object} options + * @param {string} options.nodeRedUserDir - Node-RED project directory containing node_modules; the MCP SDK is loaded from here + * @param {string} [options.host='127.0.0.1'] - host to use when connecting to local endpoints; typically '127.0.0.1' + * @param {number} options.port - local Node-RED HTTP port; used when an endpoint is a bare path rather than a full URL + */ + constructor ({ nodeRedUserDir, host = '127.0.0.1', port } = {}) { + this.nodeRedUserDir = nodeRedUserDir + this.host = host + this.port = port + /** Cached SDK classes — populated on first successful load. */ + this._sdk = null + /** Absolute paths the cached SDK was loaded from; used to purge the require cache on reset. */ + this._sdkPaths = null + } + + // #region internal helpers + + /** + * Lazy load and cache the MCP SDK classes used by this module. + * Throws {@link McpSdkUnavailableError} if the SDK can't be located/loaded + * from the Node-RED project's node_modules. + * @returns {{ Client: McpClient, StreamableHTTPClientTransport: StreamableHTTPClientTransport }} + * @throws {McpSdkUnavailableError} + */ + _loadSdk () { + if (this._sdk) { + return this._sdk // already loaded and cached + } + let clientPath, transportPath + try { + clientPath = resolveFromProject('client/index.js', this.nodeRedUserDir) + transportPath = resolveFromProject('client/streamableHttp.js', this.nodeRedUserDir) + } catch (err) { + // Keep host filesystem paths out of the surfaced error; log detail at debug level only. + debug(`MCP SDK resolution failed: ${err.message}`) + throw new McpSdkUnavailableError(undefined, { cause: err }) + } + debug(`Loading MCP SDK from ${clientPath}`) + const { Client } = require(clientPath) + const { StreamableHTTPClientTransport } = require(transportPath) + if (!Client || !StreamableHTTPClientTransport) { + throw new McpSdkUnavailableError('MCP SDK loaded but expected exports are missing') + } + this._sdkPaths = [clientPath, transportPath] + this._sdk = { Client, StreamableHTTPClientTransport } + return this._sdk + } + + /** + * Drop the cached SDK reference and purge its entries from Node's require + * cache, so the next {@link _loadSdk} re-resolves and reloads from disk. + * + * The launcher calls this whenever Node-RED is (re)started or stopped: the + * project's node_modules may have just been reinstalled, relinked (Docker + * module_cache), or removed, so any previously-loaded SDK could be stale or + * gone. Purging the require cache matters for the in-place `npm install` + * case, where the SDK is overwritten at the same path the process already + * cached. + */ + resetSdk () { + this._sdk = null + if (this._sdkPaths) { + for (const p of this._sdkPaths) { + this._purgeFromRequireCache(p) + } + this._sdkPaths = null + } + } + + /** + * Delete the given module file and every other cached module under the same + * SDK package root from `require.cache`, so a subsequent require reloads the + * package fresh rather than stitching new entry points onto stale submodules. + * The SDK is only ever required by this wrapper (Node-RED runs in a separate + * child process), so purging is safe. + * @param {string} resolvedPath - an absolute path inside the SDK package + */ + _purgeFromRequireCache (resolvedPath) { + const marker = `${path.sep}${MCP_SDK_PACKAGE.split('/').join(path.sep)}${path.sep}` + const idx = resolvedPath.indexOf(marker) + const pkgRoot = idx >= 0 ? resolvedPath.slice(0, idx + marker.length) : resolvedPath + const matches = Object.keys(require.cache).filter(k => k === resolvedPath || k.startsWith(pkgRoot)) + for (const key of matches) { + delete require.cache[key] + } + } + + /** + * Connect a fresh MCP client to the given (already-normalised) endpoint spec. + * Caller is responsible for `client.close()` when finished. + * @param {McpEndpoint} endpoint + * @returns {Promise} a connected MCP Client instance + */ + async _connect (endpoint) { + const { Client, StreamableHTTPClientTransport } = this._loadSdk() + const transport = new StreamableHTTPClientTransport(endpoint.url, { + requestInit: { headers: endpoint.headers } + }) + const client = new Client({ name: CLIENT_NAME, version: CLIENT_VERSION }) + await client.connect(transport) + return client + } + + async _getEndpointFeatures (/** @type {McpEndpoint} */ endpoint) { + const features = { + tools: [], + resources: [], + resourceTemplates: [], + prompts: [], + capabilities: {} + } + /** @type {McpClient} */ + let client + try { + client = await this._connect(endpoint) + const capabilities = client.getServerCapabilities() || {} + features.capabilities = capabilities + + if (capabilities.tools) { + try { + const { tools } = await client.listTools() + features.tools = tools || [] + } catch (err) { + debug(`MCP listTools failed for ${endpoint.key}: ${err.message}`) + } + } + if (capabilities.resources) { + try { + const { resources } = await client.listResources() + features.resources = resources || [] + } catch (err) { + debug(`MCP listResources failed for ${endpoint.key}: ${err.message}`) + } + try { + const { resourceTemplates } = await client.listResourceTemplates() + features.resourceTemplates = resourceTemplates || [] + } catch (err) { + debug(`MCP listResourceTemplates failed for ${endpoint.key}: ${err.message}`) + } + } + if (capabilities.prompts) { + try { + const { prompts } = await client.listPrompts() + features.prompts = prompts || [] + } catch (err) { + debug(`MCP listPrompts failed for ${endpoint.key}: ${err.message}`) + } + } + } catch (err) { + warn(`Failed to query MCP endpoint '${endpoint.key}': ${err.message}`) + features.error = err.message + } finally { + if (client) { + try { + await client.close() + } catch (_) { /* ignore */ } + } + } + return features + } + + // #endregion internal helpers + + // #region Public API + + /** + * Get a list of features from MCP server(s) running in the Node-RED instance. + * @param {Array} endpoints - list of MCP endpoints to query. + * Each entry may be a bare URL/path string, or an object `{ url, headers?, accessToken? }` + * where `accessToken` is `{ scheme, token, scope }`. + * @returns {Promise>} a map of endpoint key/url to features or error; the key is typically the URL or `spec.url` value + */ + async getFeatures (endpoints) { + const result = [] + if (!Array.isArray(endpoints) || endpoints.length === 0) { + return result + } + const normalisedEndpoints = endpoints.map(e => normalizeEndpoint(e, this.host, this.port)) + + // Fail fast if the SDK isn't installed/available. + try { + this._loadSdk() + } catch (err) { + warn(`MCP SDK not available: ${err.message}`) + for (const endpoint of normalisedEndpoints) { + result.push({ + spec: endpoint, + error: `MCP SDK not available: ${err.message}` + }) + } + return result + } + + await Promise.all(normalisedEndpoints.map(async (spec) => { + /** @type {McpEndpoint} */ + if (spec.error) { + result.push({ + spec, + error: spec.error + }) + return + } + if (spec.skip) { + result.push({ + spec, + error: 'Endpoint skipped: unsupported auth scheme' + }) + return + } + const features = await this._getEndpointFeatures(spec) + const reply = { + spec, + features, + error: features.error || null + } + result.push(reply) + })) + return result + } + + /** + * Call a tool on an MCP server running in the Node-RED instance. + * Throws on any failure (caller is expected to convert to an error response). + * @param {string|McpEndpointSpec} endpoint - MCP endpoint URL/path, or `{ url, headers?, accessToken? }` + * @param {string} name - name of the tool to invoke + * @param {object} [input] - arguments to pass to the tool + * @returns {Promise} the tool call result as returned by the MCP server + */ + async callTool (endpoint, name, input) { + if (!endpoint) { + throw new Error('endpoint is required') + } + if (!name) { + throw new Error('Tool name is required') + } + const spec = normalizeEndpoint(endpoint, this.host, this.port) + if (spec.skip) { + throw new Error('Endpoint skipped: unsupported auth scheme') + } + let client + try { + client = await this._connect(spec) + const response = await client.callTool({ name, arguments: input || {} }) + return response + } finally { + if (client) { + try { + await client.close() + } catch (_) { /* ignore */ } + } + } + } + + /** + * Read a resource from an MCP server running in the Node-RED instance. + * Throws on any failure (caller is expected to convert to an error response). + * @param {string|McpEndpointSpec} endpoint - MCP endpoint URL/path, or `{ url, headers?, accessToken? }` + * @param {string} uri - URI of the resource + * @returns {Promise} the resource read result + */ + async readResource (endpoint, uri) { + if (!endpoint) { + throw new Error('endpoint is required') + } + if (!uri) { + throw new Error('Resource URI is required') + } + const spec = normalizeEndpoint(endpoint, this.host, this.port) + if (spec.skip) { + throw new Error('Endpoint skipped: unsupported auth scheme') + } + + let client + try { + client = await this._connect(spec) + const response = await client.readResource({ uri }) + return response + } finally { + if (client) { + try { + await client.close() + } catch (_) { /* ignore */ } + } + } + } + + // #endregion public API +} + +module.exports = { + MCP, + McpSdkUnavailableError +} From 309c606c239edcc3a505a3d4857c51d5a13d5a2d Mon Sep 17 00:00:00 2001 From: Steve-Mcl Date: Thu, 25 Jun 2026 08:31:35 +0100 Subject: [PATCH 2/5] Add MCP integration and command handling to MQTTClient --- lib/mqtt.js | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/lib/mqtt.js b/lib/mqtt.js index d8a9d87d..71eda072 100644 --- a/lib/mqtt.js +++ b/lib/mqtt.js @@ -1,7 +1,9 @@ +const path = require('path') const mqtt = require('mqtt') const { info, warn, debug, setMQTT, getBufferedMessages } = require('./logging/log') const { IntervalJitter } = require('./IntervalJitter') const EditorTunnel = require('./editor/tunnel') +const { MCP } = require('./mcp') const { getWSProxyAgent } = require('./utils') const { randomInt } = require('crypto') @@ -47,6 +49,14 @@ class MQTTClient { reconnectPeriod: randomInt(13000, 25000), queueQoSZero: false } + + /** @type {MCP} */ + this.mcp = new MCP({ + nodeRedUserDir: path.join(config.dir, 'project'), + port: config.port, + host: config.host + }) + setMQTT(this) } @@ -144,6 +154,51 @@ class MQTTClient { await this.handleActionRequest(msg) } else if (msg.command === 'reportPackages') { this.reportPackages = true + } else if (msg.command === 'get-liveState') { + debug('Getting instance live state') + try { + const response = await this.agent.getState() || {} + response.mcp = { + supportedCommands: ['mcp:get-features', 'mcp:call-tool', 'mcp:read-resource'] + } + this.sendCommandResponse(msg, response) + } catch (error) { + warn(`Error getting instance live state: ${error}`) + this.sendCommandResponse(msg, { error: error.toString() }) + } + } else if (msg.command === 'mcp:get-features') { + // MCP ROUTE: step 3 (remote) + // Called By: the Forge platform (via MQTT command message) + // Calls To : MCP helper class + debug('Getting MCP features') + try { + const { mcpEndPoints } = msg.payload || {} + const response = await this.mcp.getFeatures(mcpEndPoints) + this.sendCommandResponse(msg, response) + } catch (error) { + warn(`Error getting MCP features: ${error}`) + this.sendCommandResponse(msg, { error: error.toString() }) + } + } else if (msg.command === 'mcp:call-tool') { + debug('Calling MCP tool', msg.payload) + try { + const { endpoint, name, input } = msg.payload || {} + const response = await this.mcp.callTool(endpoint, name, input) + this.sendCommandResponse(msg, response) + } catch (error) { + warn(`Error calling MCP tool: ${error}`) + this.sendCommandResponse(msg, { error: error.toString() }) + } + } else if (msg.command === 'mcp:read-resource') { + debug('Reading MCP resource', msg.payload) + try { + const { endpoint, uri } = msg.payload || {} + const response = await this.mcp.readResource(endpoint, uri) + this.sendCommandResponse(msg, response) + } catch (error) { + warn(`Error reading MCP resource: ${error}`) + this.sendCommandResponse(msg, { error: error.toString() }) + } } else { warn(`Unknown command type received from platform: ${msg.command}`) } From f9a2bb14e0b93f131a6d18265e88fd6eafc79556 Mon Sep 17 00:00:00 2001 From: Steve-Mcl Date: Thu, 25 Jun 2026 08:32:24 +0100 Subject: [PATCH 3/5] Maintain userProperties through command response --- lib/mqtt.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/mqtt.js b/lib/mqtt.js index 71eda072..4a1ee18a 100644 --- a/lib/mqtt.js +++ b/lib/mqtt.js @@ -385,6 +385,7 @@ class MQTTClient { sendCommandResponse (request, response) { const correlationData = request?.correlationData const responseTopic = request?.responseTopic || this.responseTopic + const userProperties = request?.userProperties || {} const command = request?.command if (!correlationData || !responseTopic || !command) { @@ -396,6 +397,7 @@ class MQTTClient { deviceId: this.deviceId, // for message routing and verification command, // for command response verification correlationData, // for correlating response with request + userProperties, // for any additional metadata payload: response // the actual response payload } const messageJSON = JSON.stringify(message) From 1d77439fc5257a5fb63e6fe117d6a7b03280c867 Mon Sep 17 00:00:00 2001 From: Steve-Mcl Date: Thu, 25 Jun 2026 08:35:38 +0100 Subject: [PATCH 4/5] Reset cached MCP SDK on start / stop to ensure fresh resolution --- lib/launcher.js | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/lib/launcher.js b/lib/launcher.js index 5003cf57..eee54d0a 100644 --- a/lib/launcher.js +++ b/lib/launcher.js @@ -549,6 +549,12 @@ class Launcher { } this.state = States.STARTING + + // The node-red node_modules may have been (re)installed or relinked, + // so any previously-resolved MCP SDK could now be stale. Drop the cached + // reference so it is re-resolved against the current install on next use. + this.agent?.mqttClient?.mcp?.resetSdk?.() + if (!existsSync(this.projectDir) || !existsSync(this.files.flows) || !existsSync(this.files.credentials) || @@ -739,6 +745,11 @@ class Launcher { let finalState = States.STOPPED this.stopReason = reason || 'shutdown' info('Stopping Node-RED. Reason: ' + this.stopReason) + + // Drop any cached MCP SDK reference: the SDK files may be removed (clean + // stop) or replaced before the next start, so they must not be reused. + this.agent?.mqttClient?.mcp?.resetSdk?.() + if (this.stopReason === States.SUSPENDED) { finalState = States.SUSPENDED } From c740e3cdb419bcbdba753b8e9120fa384997c7d0 Mon Sep 17 00:00:00 2001 From: Stephen McLaughlin <44235289+Steve-Mcl@users.noreply.github.com> Date: Thu, 25 Jun 2026 17:43:18 +0100 Subject: [PATCH 5/5] default port to 1880 --- lib/mcp.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/mcp.js b/lib/mcp.js index eb923ea1..8219b64c 100644 --- a/lib/mcp.js +++ b/lib/mcp.js @@ -106,7 +106,7 @@ function endpointToUrl (endpoint, host, port) { } } catch (_) { /* fall through to local-path handling */ } const pathPart = endpoint.startsWith('/') ? endpoint : `/${endpoint}` - return new URL(`http://${host || '127.0.0.1'}:${port}${pathPart}`) + return new URL(`http://${host || '127.0.0.1'}:${port || 1880}${pathPart}`) } /**