diff --git a/forge/ee/routes/mcp/server.js b/forge/ee/routes/mcp/server.js index 989276c372..3994dd465a 100644 --- a/forge/ee/routes/mcp/server.js +++ b/forge/ee/routes/mcp/server.js @@ -1,7 +1,17 @@ +const { McpServer } = require('@modelcontextprotocol/sdk/server/mcp.js') +const { StreamableHTTPServerTransport } = require('@modelcontextprotocol/sdk/server/streamableHttp.js') + +const { loadToolDefinitions, registerTools } = require('./toolLoader') + +// Load tool definitions once at startup +const toolDefinitions = loadToolDefinitions() + /** * MCP Platform Tools Server * * Exposes FlowFuse platform management capabilities as MCP tools. + * Stateless Streamable HTTP: each POST creates a fresh McpServer and transport. + * Auth via Bearer token (PAT), forwarded through app.inject() to existing routes. * * @param {import('../../../forge').ForgeApplication} app */ @@ -18,9 +28,58 @@ module.exports = async function (app) { } }) - // POST handler will be implemented in #7429 + /** + * POST / - MCP protocol endpoint (Streamable HTTP) + * + * Each request creates a fresh McpServer instance with a stateless transport. + * The auth token is forwarded to all internal route calls via app.inject(). + */ app.post('/', async (request, reply) => { - reply.code(501).send({ code: 'not_implemented', error: 'MCP endpoint not yet implemented' }) + const server = new McpServer( + { name: 'FlowFuse Platform', version: '1.0.0' }, + { capabilities: { tools: {} } } + ) + + const transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: undefined // stateless, no server-side sessions + }) + + // Bind inject to this request's auth token + const inject = (opts) => { + return app.inject({ + ...opts, + headers: { + ...opts.headers, + authorization: request.headers.authorization + } + }) + } + + // Stub scope check: will enforce PAT scopes once scoped PATs (#7411) land. + // When implemented, this will check tool.annotations against the PAT's + // readOnly flag and team scope restrictions. + const checkScope = (_tool) => { + return null // no restriction for now + } + + registerTools(server, toolDefinitions, inject, checkScope) + + await server.connect(transport) + + // Hand off response handling to the MCP transport. + // reply.hijack() tells Fastify we're managing the response directly. + reply.hijack() + + // The MCP SDK's transport uses @hono/node-server internally, which sets + // a drain timeout that calls socket.destroySoon(). Fastify's app.inject() + // creates mock sockets that lack this method, so we polyfill it. + const socket = request.raw.socket + if (socket && !socket.destroySoon) { + socket.destroySoon = () => socket.destroy?.() + } + + await transport.handleRequest(request.raw, reply.raw, request.body) + await server.close() }) // GET and DELETE are not supported in stateless mode diff --git a/forge/ee/routes/mcp/toolLoader.js b/forge/ee/routes/mcp/toolLoader.js new file mode 100644 index 0000000000..f8655ed360 --- /dev/null +++ b/forge/ee/routes/mcp/toolLoader.js @@ -0,0 +1,69 @@ +const fs = require('fs') +const path = require('path') + +const toolsDir = path.join(__dirname, 'tools') + +/** + * Loads all tool definition files from the tools/ directory. + * Each file should export an array of tool definitions with: + * { name, description, inputSchema, annotations, handler } + * + * Definitions are loaded once at startup and reused across requests. + */ +function loadToolDefinitions () { + const files = fs.readdirSync(toolsDir).filter(f => f.endsWith('.js')) + const allTools = [] + for (const file of files) { + const tools = require(path.join(toolsDir, file)) + allTools.push(...tools) + } + return allTools +} + +/** + * Registers all tool definitions on a McpServer instance. + * Called once per request since the server is stateless (fresh per request). + * + * @param {import('@modelcontextprotocol/sdk/server/mcp.js').McpServer} server + * @param {Array} toolDefinitions - loaded tool definitions + * @param {Function} inject - app.inject helper bound to the request's auth token + * @param {Function} checkScope - scope check function (stub for now) + */ +function registerTools (server, toolDefinitions, inject, checkScope) { + for (const tool of toolDefinitions) { + const config = { + description: tool.description, + annotations: tool.annotations + } + if (tool.inputSchema && Object.keys(tool.inputSchema).length > 0) { + config.inputSchema = tool.inputSchema + } + + server.registerTool(tool.name, config, async (args) => { + const scopeError = checkScope(tool) + if (scopeError) { + return scopeError + } + const response = await tool.handler(args, { inject }) + return formatResponse(response) + }) + } +} + +/** + * Formats an app.inject() response into an MCP CallToolResult. + */ +function formatResponse (response) { + const body = response.json() + if (response.statusCode >= 400) { + return { + content: [{ type: 'text', text: JSON.stringify(body) }], + isError: true + } + } + return { + content: [{ type: 'text', text: JSON.stringify(body, null, 2) }] + } +} + +module.exports = { loadToolDefinitions, registerTools } diff --git a/forge/ee/routes/mcp/tools/teams.js b/forge/ee/routes/mcp/tools/teams.js new file mode 100644 index 0000000000..1efd178c91 --- /dev/null +++ b/forge/ee/routes/mcp/tools/teams.js @@ -0,0 +1,26 @@ +const { z } = require('zod') + +module.exports = [ + { + name: 'list-teams', + description: 'List all teams the authenticated user belongs to. Returns team names, slugs, IDs, and membership roles.', + annotations: { readOnlyHint: true, destructiveHint: false }, + inputSchema: {}, + handler: async (args, { inject }) => { + const response = await inject({ method: 'GET', url: '/api/v1/user/teams' }) + return response + } + }, + { + name: 'get-team', + description: 'Get details of a specific team by its ID, including team type, member count, and instance counts.', + annotations: { readOnlyHint: true, destructiveHint: false }, + inputSchema: { + teamId: z.string().describe('The ID or hashid of the team') + }, + handler: async (args, { inject }) => { + const response = await inject({ method: 'GET', url: `/api/v1/teams/${args.teamId}` }) + return response + } + } +] diff --git a/test/unit/forge/ee/routes/mcp/server_spec.js b/test/unit/forge/ee/routes/mcp/server_spec.js index cc2282dcb5..5562b42798 100644 --- a/test/unit/forge/ee/routes/mcp/server_spec.js +++ b/test/unit/forge/ee/routes/mcp/server_spec.js @@ -26,109 +26,331 @@ describe('MCP Platform Tools Server', function () { await app.close() }) - it('should register the expertPlatformAutomation feature flag', async function () { - app.config.features.enabled('expertPlatformAutomation').should.equal(true) - }) + /** + * Parses an SSE response from the MCP transport. + * Extracts JSON-RPC messages from `data:` lines. + */ + function parseSSEResponse (response) { + const body = response.body + if (response.headers['content-type']?.includes('application/json')) { + return { statusCode: response.statusCode, result: JSON.parse(body) } + } + const messages = [] + const lines = body.split('\n') + for (const line of lines) { + if (line.startsWith('data: ')) { + try { + messages.push(JSON.parse(line.slice(6))) + } catch (e) { + // skip non-JSON data lines + } + } + } + if (messages.length === 1) { + return { statusCode: response.statusCode, result: messages[0] } + } + return { statusCode: response.statusCode, messages } + } - it('should return 501 for POST /api/v1/mcp with valid PAT (endpoint shell)', async function () { - const response = await app.inject({ - method: 'POST', - url: '/api/v1/mcp', - headers: { - authorization: `Bearer ${TestObjects.alicePAT.token}` - }, - payload: { jsonrpc: '2.0', method: 'initialize', id: 1 } + describe('Feature flag', function () { + it('should register the expertPlatformAutomation feature flag', async function () { + app.config.features.enabled('expertPlatformAutomation').should.equal(true) }) - response.statusCode.should.equal(501) }) - it('should return 401 for POST /api/v1/mcp without auth', async function () { - const response = await app.inject({ - method: 'POST', - url: '/api/v1/mcp', - payload: { jsonrpc: '2.0', method: 'initialize', id: 1 } + describe('Authentication', function () { + it('should return 401 without auth', async function () { + const response = await app.inject({ + method: 'POST', + url: '/api/v1/mcp', + payload: { jsonrpc: '2.0', method: 'initialize', id: 1 } + }) + response.statusCode.should.equal(401) + }) + + it('should return 401 with invalid token', async function () { + const response = await app.inject({ + method: 'POST', + url: '/api/v1/mcp', + headers: { + authorization: 'Bearer invalid-token' + }, + payload: { jsonrpc: '2.0', method: 'initialize', id: 1 } + }) + response.statusCode.should.equal(401) }) - response.statusCode.should.equal(401) }) - it('should return 401 for POST /api/v1/mcp with invalid token', async function () { - const response = await app.inject({ - method: 'POST', - url: '/api/v1/mcp', - headers: { - authorization: 'Bearer invalid-token' - }, - payload: { jsonrpc: '2.0', method: 'initialize', id: 1 } + describe('Transport', function () { + it('should return 405 for GET', async function () { + const response = await app.inject({ + method: 'GET', + url: '/api/v1/mcp', + headers: { + authorization: `Bearer ${TestObjects.alicePAT.token}` + } + }) + response.statusCode.should.equal(405) + }) + + it('should return 405 for DELETE', async function () { + const response = await app.inject({ + method: 'DELETE', + url: '/api/v1/mcp', + headers: { + authorization: `Bearer ${TestObjects.alicePAT.token}` + } + }) + response.statusCode.should.equal(405) }) - response.statusCode.should.equal(401) }) - it('should return 405 for GET /api/v1/mcp', async function () { - const response = await app.inject({ - method: 'GET', - url: '/api/v1/mcp', - headers: { - authorization: `Bearer ${TestObjects.alicePAT.token}` - } + describe('Initialize', function () { + it('should respond with server info and capabilities', async function () { + const response = await app.inject({ + method: 'POST', + url: '/api/v1/mcp', + headers: { + authorization: `Bearer ${TestObjects.alicePAT.token}`, + 'content-type': 'application/json', + accept: 'application/json, text/event-stream' + }, + payload: { + jsonrpc: '2.0', + method: 'initialize', + id: 1, + params: { + protocolVersion: '2025-03-26', + capabilities: {}, + clientInfo: { name: 'test-client', version: '1.0.0' } + } + } + }) + response.statusCode.should.equal(200) + const { result } = parseSSEResponse(response) + result.should.have.property('result') + result.result.should.have.property('serverInfo') + result.result.serverInfo.name.should.equal('FlowFuse Platform') + result.result.serverInfo.version.should.equal('1.0.0') + result.result.should.have.property('capabilities') + result.result.capabilities.should.have.property('tools') }) - response.statusCode.should.equal(405) }) - it('should return 405 for DELETE /api/v1/mcp', async function () { - const response = await app.inject({ - method: 'DELETE', - url: '/api/v1/mcp', - headers: { - authorization: `Bearer ${TestObjects.alicePAT.token}` - } + describe('Tool listing', function () { + it('should list registered tools with annotations', async function () { + const response = await app.inject({ + method: 'POST', + url: '/api/v1/mcp', + headers: { + authorization: `Bearer ${TestObjects.alicePAT.token}`, + 'content-type': 'application/json', + accept: 'application/json, text/event-stream' + }, + payload: [ + { jsonrpc: '2.0', method: 'notifications/initialized' }, + { jsonrpc: '2.0', method: 'tools/list', id: 2 } + ] + }) + response.statusCode.should.equal(200) + const parsed = parseSSEResponse(response) + const messages = parsed.messages || [parsed.result] + const toolsResponse = messages.find(m => m.id === 2) + toolsResponse.should.have.property('result') + toolsResponse.result.should.have.property('tools') + toolsResponse.result.tools.should.be.an.Array() + toolsResponse.result.tools.length.should.be.greaterThan(0) + + const listTeams = toolsResponse.result.tools.find(t => t.name === 'list-teams') + listTeams.should.be.an.Object() + listTeams.should.have.property('description') + listTeams.annotations.readOnlyHint.should.equal(true) + listTeams.annotations.destructiveHint.should.equal(false) + + const getTeam = toolsResponse.result.tools.find(t => t.name === 'get-team') + getTeam.should.be.an.Object() + getTeam.should.have.property('inputSchema') }) - response.statusCode.should.equal(405) }) - it('should not break existing registration routes', async function () { - const { token } = await app.instance.refreshAuthTokens() - const response = await app.inject({ - method: 'POST', - url: `/api/v1/teams/${app.team.hashid}/mcp/instance/${app.instance.id}/test-node`, - headers: { - authorization: `Bearer ${token}`, - 'content-type': 'application/json' - }, - payload: { - name: 'test-server', - protocol: 'http', - endpointRoute: '/mcp', - title: 'Test MCP', - version: '1.0.0', - description: 'test' - } + describe('Tool execution', function () { + it('list-teams should return teams for the authenticated user', async function () { + const response = await app.inject({ + method: 'POST', + url: '/api/v1/mcp', + headers: { + authorization: `Bearer ${TestObjects.alicePAT.token}`, + 'content-type': 'application/json', + accept: 'application/json, text/event-stream' + }, + payload: [ + { jsonrpc: '2.0', method: 'notifications/initialized' }, + { jsonrpc: '2.0', method: 'tools/call', id: 2, params: { name: 'list-teams', arguments: {} } } + ] + }) + const parsed = parseSSEResponse(response) + const messages = parsed.messages || [parsed.result] + const toolResult = messages.find(m => m.id === 2) + toolResult.should.have.property('result') + toolResult.result.should.have.property('content') + toolResult.result.content[0].type.should.equal('text') + + const data = JSON.parse(toolResult.result.content[0].text) + data.should.have.property('teams') + data.teams.should.be.an.Array() + data.teams.length.should.be.greaterThan(0) + data.teams[0].should.have.property('name', 'ATeam') + }) + + it('get-team should return team details by ID', async function () { + const teamId = app.team.hashid + const response = await app.inject({ + method: 'POST', + url: '/api/v1/mcp', + headers: { + authorization: `Bearer ${TestObjects.alicePAT.token}`, + 'content-type': 'application/json', + accept: 'application/json, text/event-stream' + }, + payload: [ + { jsonrpc: '2.0', method: 'notifications/initialized' }, + { jsonrpc: '2.0', method: 'tools/call', id: 2, params: { name: 'get-team', arguments: { teamId } } } + ] + }) + const parsed = parseSSEResponse(response) + const messages = parsed.messages || [parsed.result] + const toolResult = messages.find(m => m.id === 2) + toolResult.should.have.property('result') + + const data = JSON.parse(toolResult.result.content[0].text) + data.should.have.property('name', 'ATeam') }) - response.statusCode.should.equal(200) - - // Verify listing also works - await login(app) - const listResponse = await app.inject({ - method: 'GET', - url: `/api/v1/teams/${app.team.hashid}/mcp`, - cookies: { sid: TestObjects.aliceSid } + + it('get-team should return error for non-existent team', async function () { + const response = await app.inject({ + method: 'POST', + url: '/api/v1/mcp', + headers: { + authorization: `Bearer ${TestObjects.alicePAT.token}`, + 'content-type': 'application/json', + accept: 'application/json, text/event-stream' + }, + payload: [ + { jsonrpc: '2.0', method: 'notifications/initialized' }, + { jsonrpc: '2.0', method: 'tools/call', id: 2, params: { name: 'get-team', arguments: { teamId: 'nonexistent' } } } + ] + }) + const parsed = parseSSEResponse(response) + const messages = parsed.messages || [parsed.result] + const toolResult = messages.find(m => m.id === 2) + toolResult.should.have.property('result') + toolResult.result.isError.should.equal(true) }) - listResponse.statusCode.should.equal(200) - const body = listResponse.json() - body.should.have.property('servers') - body.servers.should.be.an.Array() }) - async function login (app) { - if (TestObjects.aliceSid) { - return - } - const response = await app.inject({ - method: 'POST', - url: '/account/login', - payload: { username: 'alice', password: 'aaPassword', remember: false } + describe('Stateless behavior', function () { + it('should not leak state between sequential requests', async function () { + // First request: initialize + const res1 = await app.inject({ + method: 'POST', + url: '/api/v1/mcp', + headers: { + authorization: `Bearer ${TestObjects.alicePAT.token}`, + 'content-type': 'application/json', + accept: 'application/json, text/event-stream' + }, + payload: { + jsonrpc: '2.0', + method: 'initialize', + id: 1, + params: { + protocolVersion: '2025-03-26', + capabilities: {}, + clientInfo: { name: 'test-client', version: '1.0.0' } + } + } + }) + res1.statusCode.should.equal(200) + + // Second request: independent initialize (no session carry-over) + const res2 = await app.inject({ + method: 'POST', + url: '/api/v1/mcp', + headers: { + authorization: `Bearer ${TestObjects.alicePAT.token}`, + 'content-type': 'application/json', + accept: 'application/json, text/event-stream' + }, + payload: { + jsonrpc: '2.0', + method: 'initialize', + id: 1, + params: { + protocolVersion: '2025-03-26', + capabilities: {}, + clientInfo: { name: 'test-client-2', version: '2.0.0' } + } + } + }) + res2.statusCode.should.equal(200) + + // Both should have succeeded independently + const parsed1 = parseSSEResponse(res1) + const parsed2 = parseSSEResponse(res2) + parsed1.result.result.serverInfo.name.should.equal('FlowFuse Platform') + parsed2.result.result.serverInfo.name.should.equal('FlowFuse Platform') + + // No Mcp-Session-Id header (stateless) + should(res1.headers['mcp-session-id']).be.undefined() + should(res2.headers['mcp-session-id']).be.undefined() }) - TestObjects.aliceSid = response.cookies[0].value - } + }) + + describe('Existing registration routes', function () { + it('should not break existing registration routes', async function () { + const { token } = await app.instance.refreshAuthTokens() + const response = await app.inject({ + method: 'POST', + url: `/api/v1/teams/${app.team.hashid}/mcp/instance/${app.instance.id}/test-node`, + headers: { + authorization: `Bearer ${token}`, + 'content-type': 'application/json' + }, + payload: { + name: 'test-server', + protocol: 'http', + endpointRoute: '/mcp', + title: 'Test MCP', + version: '1.0.0', + description: 'test' + } + }) + response.statusCode.should.equal(200) + + await login(app) + const listResponse = await app.inject({ + method: 'GET', + url: `/api/v1/teams/${app.team.hashid}/mcp`, + cookies: { sid: TestObjects.aliceSid } + }) + listResponse.statusCode.should.equal(200) + const body = listResponse.json() + body.should.have.property('servers') + body.servers.should.be.an.Array() + }) + + async function login (app) { + if (TestObjects.aliceSid) { + return + } + const response = await app.inject({ + method: 'POST', + url: '/account/login', + payload: { username: 'alice', password: 'aaPassword', remember: false } + }) + TestObjects.aliceSid = response.cookies[0].value + } + }) }) describe('Feature flag disabled', function () {