Skip to content

Federated MCP Implementation: Technical Guide

Working Code
Complete technical documentation with code examples and configurations

📋 System Architecture

Components

mermaid
graph TB
    A[Claude Desktop] --> B[Research Library MCP Server]
    B --> C[Federated MCP Client]
    C --> D[HTTP JSON-RPC 2.0]
    D --> E[Mnemoverse Docs MCP Server]
    E --> F[Documentation Files]
    
    B --> G[Local Knowledge Base]
    B --> H[Database Session]
    
    E --> I[VitePress Search Index]
    E --> J[File System]

Protocols and Ports

| Service | Protocol | Port | Endpoint |
| --- | --- | --- | --- |
| Research Library | MCP stdio | - | Claude Desktop |
| Mnemoverse Docs | HTTP + MCP | 3003 | /mcp, /api/search |
| VitePress Docs | HTTP | 3000 | (development) |

🚀 Mnemoverse Docs MCP Server

Main File: api-server.js

javascript
/**
 * Dual HTTP + MCP server for mnemoverse-docs
 * Supports both REST API and MCP JSON-RPC 2.0
 */
const express = require('express');
const cors = require('cors');
const fs = require('fs');
const path = require('path');

// Express application instance and the fixed port it is served on.
const app = express();
const PORT = 3003;

// Middleware: JSON body parsing first, then CORS for the local origins on
// ports 3000-3002, allowing and exposing the `mcp-session-id` header.
app.use(express.json());
app.use(
  cors({
    origin: [3000, 3001, 3002].map((port) => `http://localhost:${port}`),
    allowedHeaders: ['Content-Type', 'mcp-session-id'],
    exposedHeaders: ['mcp-session-id'],
  }),
);

// === SEARCH FUNCTIONS ===

/**
 * Search every Markdown file below `<__dirname>/docs` for a query string.
 *
 * Matching is a case-insensitive substring test against the file content and
 * its title. All matches are collected first and ranked afterwards, so the
 * returned list holds the best-scoring documents rather than whichever ones
 * the directory walk happened to reach first.
 *
 * @param {string} query - Text to look for.
 * @param {number} [maxResults=10] - Maximum number of results to return.
 * @returns {Array<Object>} Matches sorted by descending score; each entry has
 *   `title`, `path`, `section`, `excerpt`, `score`, `url` and `lastModified`.
 * @throws {Error} If a directory cannot be listed (e.g. `docs` is missing).
 */
function searchDocuments(query, maxResults = 10) {
  const MARKDOWN_EXT = '.md';
  const results = [];
  const docsPath = path.join(__dirname, 'docs');
  const lowerQuery = query.toLowerCase();

  // Remove only the trailing extension. String.replace('.md', '') would strip
  // the first ".md" found anywhere in the name (e.g. "a.md.notes.md").
  const stripMarkdownExt = (fileName) => fileName.slice(0, -MARKDOWN_EXT.length);

  function searchInDirectory(dirPath, section = '') {
    for (const item of fs.readdirSync(dirPath)) {
      const itemPath = path.join(dirPath, item);

      let stat;
      try {
        stat = fs.statSync(itemPath);
      } catch (error) {
        // e.g. a dangling symlink: skip this entry instead of failing the search
        console.error(`Error reading ${itemPath}:`, error.message);
        continue;
      }

      if (stat.isDirectory()) {
        // Hidden directories (".git", ".vitepress", ...) are not searched
        if (!item.startsWith('.')) {
          searchInDirectory(itemPath, section ? `${section}/${item}` : item);
        }
        continue;
      }

      if (!item.endsWith(MARKDOWN_EXT)) {
        continue;
      }

      try {
        const content = fs.readFileSync(itemPath, 'utf-8');
        const baseName = stripMarkdownExt(item);
        const title = extractTitle(content) || baseName;

        const isMatch =
          content.toLowerCase().includes(lowerQuery) ||
          title.toLowerCase().includes(lowerQuery);
        if (!isMatch) {
          continue;
        }

        results.push({
          title,
          path: section ? `${section}/${item}` : item,
          section: section.split('/')[0] || 'root',
          excerpt: generateExcerpt(content, query, 200),
          score: calculateScore(title, content, query),
          url: `/docs/${section ? section + '/' : ''}${baseName}`,
          lastModified: stat.mtime.toISOString()
        });
      } catch (error) {
        console.error(`Error reading ${itemPath}:`, error.message);
      }
    }
  }

  searchInDirectory(docsPath);

  // Rank first, truncate second: cutting the walk short at maxResults would
  // return the first matches found instead of the highest-scoring ones.
  return results.sort((a, b) => b.score - a.score).slice(0, maxResults);
}

/**
 * Return the text of the first level-one Markdown heading in `content`.
 *
 * A heading is any line that begins with "# "; the marker is removed and the
 * remainder is trimmed.
 *
 * @param {string} content - Markdown source text.
 * @returns {string|null} The heading text, or null when no line starts with "# ".
 */
function extractTitle(content) {
  const headingLine = content
    .split('\n')
    .find((candidate) => candidate.startsWith('# '));

  return headingLine === undefined ? null : headingLine.slice(2).trim();
}

/**
 * Build a short excerpt of `content` for display next to a search hit.
 *
 * When the query occurs in the content (case-insensitive), the excerpt is the
 * match plus up to 50 characters of context on each side. Otherwise it is the
 * first `maxLength` characters. An ellipsis is added on a side only when text
 * was actually cut off there.
 *
 * @param {string} content - Full document text.
 * @param {string} query - Search query to centre the excerpt on.
 * @param {number} maxLength - Length of the fallback excerpt used when the
 *   query does not occur in the content.
 * @returns {string} The excerpt.
 */
function generateExcerpt(content, query, maxLength) {
  const CONTEXT_CHARS = 50;
  const index = content.toLowerCase().indexOf(query.toLowerCase());

  if (index === -1) {
    // Only mark truncation when something was really dropped; short documents
    // are returned unchanged.
    return content.length > maxLength
      ? `${content.substring(0, maxLength)}...`
      : content;
  }

  const start = Math.max(0, index - CONTEXT_CHARS);
  const end = Math.min(content.length, index + query.length + CONTEXT_CHARS);
  const prefix = start > 0 ? '...' : '';
  const suffix = end < content.length ? '...' : '';

  return `${prefix}${content.substring(start, end)}${suffix}`;
}

/**
 * Compute a relevance score for a document.
 *
 * The score is 10 when the title contains the query, plus one point for each
 * case-insensitive occurrence of the query in the content.
 *
 * @param {string} title - Document title.
 * @param {string} content - Full document text.
 * @param {string} query - Search query, treated as literal text.
 * @returns {number} The relevance score (0 when nothing matches).
 */
function calculateScore(title, content, query) {
  const lowerQuery = query.toLowerCase();

  // The query is user input: escape regex metacharacters so that "c++" or
  // "foo(" cannot throw a SyntaxError and "." cannot match arbitrary text.
  const literalQuery = lowerQuery.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
  const occurrences = content.toLowerCase().match(new RegExp(literalQuery, 'g'));

  // Title match gets a high fixed bonus
  const titleBonus = title.toLowerCase().includes(lowerQuery) ? 10 : 0;

  return titleBonus + (occurrences ? occurrences.length : 0);
}

/**
 * Load a Markdown document from `<__dirname>/docs`.
 *
 * `docPath` is interpreted relative to the docs directory; ".md" is appended
 * when it is missing. Paths that resolve outside the docs directory are
 * rejected.
 *
 * @param {string} docPath - Document path, e.g. "guides/intro" or "guides/intro.md".
 * @returns {{title: string, content: string, lastModified: string, path: string}|null}
 *   The document, or null when it does not exist, lies outside the docs
 *   directory, or cannot be read (the reason is logged).
 */
function getDocumentByPath(docPath) {
  try {
    const docsPath = path.join(__dirname, 'docs');
    const fullPath = path.join(docsPath, docPath);
    const mdPath = fullPath.endsWith('.md') ? fullPath : fullPath + '.md';

    // Security check on the path that is actually opened. A plain
    // `startsWith(docsPath)` test also accepts sibling directories such as
    // "<root>/docs-private", and appending ".md" can turn "<root>/docs" into
    // "<root>/docs.md", so containment is decided from the relative path.
    const relativePath = path.relative(docsPath, mdPath);
    const isOutsideDocs =
      relativePath === '..' ||
      relativePath.startsWith(`..${path.sep}`) ||
      path.isAbsolute(relativePath);
    if (isOutsideDocs) {
      throw new Error('Invalid path - outside docs directory');
    }

    if (!fs.existsSync(mdPath)) {
      return null;
    }

    const content = fs.readFileSync(mdPath, 'utf-8');
    const title = extractTitle(content) || path.basename(docPath, '.md');
    const stat = fs.statSync(mdPath);

    return {
      title,
      content,
      lastModified: stat.mtime.toISOString(),
      path: docPath
    };
  } catch (error) {
    // Covers rejected paths as well as read failures; callers only see null.
    console.error(`Error reading document ${docPath}:`, error.message);
    return null;
  }
}

// === HTTP REST API ENDPOINTS ===

// Liveness endpoint: reports a static service descriptor plus the current time.
app.get('/health', (_req, res) => {
  const healthReport = {
    status: 'healthy',
    service: 'mnemoverse-docs-api',
    timestamp: new Date().toISOString(),
    version: '1.0.0',
  };

  res.json(healthReport);
});

// REST search endpoint. Expects a JSON body `{ query, maxResults? }` and
// answers with the matches found by searchDocuments; a missing query yields
// HTTP 400 and any thrown error yields HTTP 500.
app.post('/api/search', (req, res) => {
  try {
    const query = req.body.query;
    const maxResults = req.body.maxResults === undefined ? 10 : req.body.maxResults;

    if (!query) {
      res.status(400).json({ error: 'Query parameter is required' });
      return;
    }

    const matches = searchDocuments(query, maxResults);
    const payload = {
      query,
      results: matches,
      total: matches.length,
      searchEngine: 'simple-text-search',
    };

    res.json(payload);
  } catch (err) {
    console.error('Search error:', err);
    res.status(500).json({ error: 'Internal server error', message: err.message });
  }
});

// === MCP PROTOCOL ENDPOINTS ===

// MCP Discovery
// Discovery document: static description of this server's protocol version,
// capabilities, identity and endpoint URLs (built from PORT).
app.get('/.well-known/mcp', (_req, res) => {
  const baseUrl = `http://localhost:${PORT}`;

  const capabilities = {
    tools: true,
    resources: false,
    prompts: false,
    sampling: false,
  };

  const info = {
    name: 'mnemoverse-docs-mcp',
    version: '1.0.0',
    description: 'MCP Server for Mnemoverse Documentation Search',
  };

  res.json({
    mcpVersion: '2024-11-05',
    protocols: ['sse', 'stdio'],
    capabilities,
    info,
    endpoints: {
      sse: `${baseUrl}/mcp/sse`,
      http: `${baseUrl}/mcp`,
    },
  });
});

// MCP JSON-RPC Endpoint
// MCP JSON-RPC 2.0 endpoint.
//
// Accepts a single request object `{ jsonrpc, method, params, id }` and
// dispatches on `method`:
//   - "tools/list": returns the static tool catalogue
//   - "tools/call": delegates to handleMcpToolCall
// Anything else is answered with -32601 (method not found). A body that is
// missing or not a JSON-RPC 2.0 request is answered with -32600.
app.post('/mcp', async (req, res) => {
  // The body is read defensively and outside the try block: if it is missing,
  // the error handler below must still be able to build a response without
  // dereferencing it again.
  const body = req.body && typeof req.body === 'object' ? req.body : {};
  const { jsonrpc, method, params, id } = body;

  // JSON-RPC 2.0 requires `id: null` in an error response when the request id
  // could not be determined; `undefined` would drop the member entirely.
  const errorId = id === undefined ? null : id;

  try {
    if (jsonrpc !== '2.0') {
      return res.status(400).json({
        jsonrpc: '2.0',
        error: { code: -32600, message: 'Invalid Request' },
        id: errorId
      });
    }

    switch (method) {
      case 'tools/list':
        res.json({
          jsonrpc: '2.0',
          id,
          result: {
            tools: [
              {
                name: 'search_docs',
                description: 'Search through Mnemoverse documentation',
                inputSchema: {
                  type: 'object',
                  properties: {
                    query: { type: 'string', description: 'Search query' },
                    maxResults: { type: 'number', description: 'Maximum results', default: 10 }
                  },
                  required: ['query']
                }
              },
              {
                name: 'get_document',
                description: 'Get full content of a specific document by path',
                inputSchema: {
                  type: 'object',
                  properties: {
                    path: { type: 'string', description: 'Document path' }
                  },
                  required: ['path']
                }
              }
            ]
          }
        });
        break;

      case 'tools/call':
        await handleMcpToolCall(req, res, params, id);
        break;

      default:
        res.status(400).json({
          jsonrpc: '2.0',
          error: { code: -32601, message: `Method not found: ${method}` },
          id: errorId
        });
    }
  } catch (error) {
    console.error('MCP error:', error);
    res.status(500).json({
      jsonrpc: '2.0',
      error: { code: -32603, message: 'Internal error' },
      id: errorId
    });
  }
});

/**
 * Execute an MCP `tools/call` request and write the JSON-RPC response.
 *
 * Supported tools:
 *   - `search_docs`  ({ query, maxResults = 10 }) -> search results
 *   - `get_document` ({ path })                   -> document content
 * Results are returned as a single `text` content item holding pretty-printed
 * JSON.
 *
 * Error responses:
 *   - HTTP 400 / -32602 when `params` or a required argument is missing
 *   - HTTP 400 / -32601 for an unknown tool name
 *   - HTTP 404 / -32603 when the requested document does not exist
 *   - HTTP 500 / -32603 when the tool throws
 *
 * @param {Object} req - Express request (unused; kept for the caller's signature).
 * @param {Object} res - Express response used to send the JSON-RPC reply.
 * @param {Object} params - JSON-RPC params: `{ name, arguments }`.
 * @param {string|number|null} id - JSON-RPC request id echoed in the reply.
 * @returns {Promise<*>} Resolves once a response has been written.
 */
async function handleMcpToolCall(req, res, params, id) {
  const sendInvalidParams = (message) =>
    res.status(400).json({
      jsonrpc: '2.0',
      error: { code: -32602, message },
      id
    });

  // A request without params used to surface as a TypeError (HTTP 500);
  // it is a client error and is reported as such.
  if (!params || typeof params !== 'object') {
    return sendInvalidParams('Tool call params are required');
  }

  const { name } = params;
  // Missing `arguments` is treated as "no arguments" so that each tool's own
  // required-argument check produces the error message.
  const args =
    params.arguments && typeof params.arguments === 'object' ? params.arguments : {};

  try {
    // Each case has its own block scope so its `const` declarations cannot
    // leak into (or collide with) the other cases.
    switch (name) {
      case 'search_docs': {
        const { query, maxResults = 10 } = args;

        if (!query) {
          return sendInvalidParams('Query parameter is required');
        }

        const searchResults = searchDocuments(query, maxResults);

        res.json({
          jsonrpc: '2.0',
          id,
          result: {
            content: [
              {
                type: 'text',
                text: JSON.stringify({
                  query,
                  results: searchResults,
                  total: searchResults.length,
                  searchEngine: 'mnemoverse-docs-mcp'
                }, null, 2)
              }
            ]
          }
        });
        break;
      }

      case 'get_document': {
        // Renamed locally so the Node `path` module is not shadowed
        const { path: docPath } = args;

        if (!docPath) {
          return sendInvalidParams('Path parameter is required');
        }

        const documentContent = getDocumentByPath(docPath);

        if (!documentContent) {
          return res.status(404).json({
            jsonrpc: '2.0',
            error: { code: -32603, message: `Document not found: ${docPath}` },
            id
          });
        }

        res.json({
          jsonrpc: '2.0',
          id,
          result: {
            content: [
              {
                type: 'text',
                text: JSON.stringify({
                  path: docPath,
                  title: documentContent.title,
                  content: documentContent.content,
                  lastModified: documentContent.lastModified
                }, null, 2)
              }
            ]
          }
        });
        break;
      }

      default:
        res.status(400).json({
          jsonrpc: '2.0',
          error: { code: -32601, message: `Unknown tool: ${name}` },
          id
        });
    }
  } catch (error) {
    console.error('Tool call error:', error);
    res.status(500).json({
      jsonrpc: '2.0',
      error: { code: -32603, message: `Tool execution failed: ${error.message}` },
      id
    });
  }
}

// Start server
// Start server.
//
// Bind address: loopback by default, so a development instance is not exposed
// on the network. When NODE_ENV is "production" the server listens on all
// interfaces, because a loopback-only listener cannot be reached through a
// published container port or from another container/pod. HOST overrides both.
const HOST =
  process.env.HOST ||
  (process.env.NODE_ENV === 'production' ? '0.0.0.0' : 'localhost');

app.listen(PORT, HOST, () => {
  console.log(`✅ Mnemoverse Docs API Server running on http://${HOST}:${PORT}`);
  // Only POST is registered for /api/search
  console.log(`🔍 Search endpoint: POST /api/search`);
  console.log(`🤖 MCP endpoint: POST /mcp`);
  console.log(`🔌 MCP discovery: GET /.well-known/mcp`);
  console.log(`❤️  Health check: GET /health`);
  console.log(`📚 Ready for federated MCP calls!`);
});

📱 Research Library Federated MCP Client

Main File: federated_mcp_client.py

python
"""
Federated MCP functions for Research Library
Client for MCP calls to mnemoverse-docs server
"""

import asyncio
import json
import logging
import os
from typing import Optional

try:
    import aiohttp
except ImportError:
    aiohttp = None
    logging.warning("aiohttp not installed, docs API search will be disabled")

logger = logging.getLogger(__name__)


def _resolve_base_url(base_url: Optional[str] = None) -> str:
    """
    Pick the docs server base URL.

    Precedence: explicit argument, then the DOCS_MCP_URL environment variable,
    then the local development default.
    """
    return base_url or os.environ.get("DOCS_MCP_URL") or "http://localhost:3003"


def _parse_tool_result(result: dict) -> dict:
    """
    Decode the JSON payload carried in the first content item of a tool result.

    Returns the decoded dict, or an {"error": ...} dict when the result has no
    content or the text is not valid JSON.
    """
    if "content" in result and len(result["content"]) > 0:
        text_content = result["content"][0].get("text", "{}")
        try:
            return json.loads(text_content)
        except json.JSONDecodeError:
            return {"error": "Invalid JSON response", "raw": text_content}

    return {"error": "No content in response", "result": result}


async def call_docs_mcp_server(method: str, params: dict, base_url: Optional[str] = None) -> dict:
    """
    Low-level call to mnemoverse-docs MCP server via HTTP JSON-RPC

    Args:
        method: JSON-RPC method name (e.g. "tools/list", "tools/call")
        params: JSON-RPC params object
        base_url: Server base URL; defaults to DOCS_MCP_URL or
            http://localhost:3003

    Returns:
        dict: The "result" member of the JSON-RPC response ({} if absent)

    Raises:
        Exception: If aiohttp is missing, the HTTP status is not 200, or the
            response carries a JSON-RPC error. Failures are logged and re-raised.
    """
    if aiohttp is None:
        raise Exception("aiohttp not available for MCP calls")

    mcp_url = f"{_resolve_base_url(base_url)}/mcp"

    try:
        async with aiohttp.ClientSession() as session:
            payload = {
                "jsonrpc": "2.0",
                "method": method,
                "params": params,
                # Inside a coroutine the running loop is the right clock source;
                # get_event_loop() is the legacy accessor.
                "id": f"federated_{method}_{asyncio.get_running_loop().time()}"
            }

            async with session.post(mcp_url, json=payload) as response:
                if response.status != 200:
                    response_text = await response.text()
                    raise Exception(f"HTTP Error: {response.status} - {response_text}")

                data = await response.json()

                if "error" in data:
                    raise Exception(f"MCP Error: {data['error']}")

                return data.get("result", {})

    except Exception as e:
        logger.error(f"MCP call failed: {e}")
        raise


async def search_vitepress_docs_mcp(
    query: str, max_results: int = 5, base_url: Optional[str] = None
) -> dict:
    """
    Federated MCP call for documentation search

    Args:
        query: Search query
        max_results: Maximum number of results
        base_url: Server base URL; defaults to DOCS_MCP_URL or
            http://localhost:3003

    Returns:
        dict: Search results in format {
            "query": str,
            "results": List[dict],
            "total": int,
            "searchEngine": str
        }
        On failure the dict holds an "error" key instead; this function does
        not raise.
    """
    try:
        result = await call_docs_mcp_server(
            "tools/call",
            {
                "name": "search_docs",
                "arguments": {
                    "query": query,
                    "maxResults": max_results
                }
            },
            base_url=base_url,
        )
        return _parse_tool_result(result)

    except Exception as e:
        logger.error(f"MCP federated search failed: {e}")
        return {"error": str(e)}


async def get_vitepress_document_mcp(path: str, base_url: Optional[str] = None) -> dict:
    """
    Federated MCP call for document retrieval

    Args:
        path: Document path (e.g., "research/mcp-x-protocol-spec.md")
        base_url: Server base URL; defaults to DOCS_MCP_URL or
            http://localhost:3003

    Returns:
        dict: Document content in format {
            "path": str,
            "title": str,
            "content": str,
            "lastModified": str
        }
        On failure the dict holds an "error" key instead; this function does
        not raise.
    """
    try:
        result = await call_docs_mcp_server(
            "tools/call",
            {
                "name": "get_document",
                "arguments": {
                    "path": path
                }
            },
            base_url=base_url,
        )
        return _parse_tool_result(result)

    except Exception as e:
        logger.error(f"MCP federated document get failed: {e}")
        return {"error": str(e)}


class FederatedMCPClient:
    """
    High-level client for federated MCP calls
    Provides simple interface for working with mnemoverse-docs

    Every method talks to the server at `base_url`. When no URL is given, the
    DOCS_MCP_URL environment variable is used, falling back to
    http://localhost:3003.
    """

    def __init__(self, base_url: Optional[str] = None):
        self.base_url = _resolve_base_url(base_url)
        self.mcp_endpoint = f"{self.base_url}/mcp"

    async def search_docs(self, query: str, max_results: int = 5) -> dict:
        """
        Search documentation

        Usage:
            client = FederatedMCPClient()
            results = await client.search_docs("MCP protocol")

            for doc in results.get('results', []):
                print(f"- {doc['title']} (score: {doc['score']})")
        """
        return await search_vitepress_docs_mcp(query, max_results, base_url=self.base_url)

    async def get_document(self, path: str) -> dict:
        """
        Get full document content

        Usage:
            client = FederatedMCPClient()
            doc = await client.get_document("about.md")

            print(f"Title: {doc['title']}")
            print(f"Content length: {len(doc['content'])}")
        """
        return await get_vitepress_document_mcp(path, base_url=self.base_url)

    async def health_check(self) -> bool:
        """
        Check MCP server availability

        Returns True only when GET {base_url}/health answers with HTTP 200;
        any error (including aiohttp being unavailable) yields False.
        """
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(f"{self.base_url}/health") as response:
                    return response.status == 200
        except Exception:
            return False

    async def list_tools(self) -> dict:
        """
        Get list of available MCP tools

        Returns the "tools/list" result, or {"error": str} on failure.
        """
        try:
            return await call_docs_mcp_server("tools/list", {}, base_url=self.base_url)
        except Exception as e:
            logger.error(f"Failed to list tools: {e}")
            return {"error": str(e)}

    async def discover_server(self) -> dict:
        """
        MCP server discovery via .well-known endpoint

        Returns the discovery document, or {"error": str} on failure.
        """
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(f"{self.base_url}/.well-known/mcp") as response:
                    if response.status == 200:
                        return await response.json()
                    else:
                        return {"error": f"HTTP {response.status}"}
        except Exception as e:
            logger.error(f"Failed to discover server: {e}")
            return {"error": str(e)}


# Convenience functions for direct use
async def quick_search(query: str, max_results: int = 3) -> list:
    """
    One-call documentation search that returns only the result list.

    A fresh FederatedMCPClient is created for the call. If the response has no
    'results' key (for example because it is an error dict), an empty list is
    returned.

    Example:
        for hit in await quick_search("MCP protocol"):
            print(f"📄 {hit['title']}")
    """
    response = await FederatedMCPClient().search_docs(query, max_results)
    hits = response.get('results', [])
    return hits


async def quick_get_doc(path: str) -> str:
    """
    One-call document fetch that returns only the document text.

    A fresh FederatedMCPClient is created for the call. If the response has no
    'content' key (for example because it is an error dict), an empty string
    is returned.

    Example:
        text = await quick_get_doc("about.md")
        print(text[:200] + "...")
    """
    response = await FederatedMCPClient().get_document(path)
    body = response.get('content', '')
    return body


# Compatibility functions for existing code
async def _search_vitepress_docs_mcp(query: str, max_results: int = 5) -> dict:
    """Underscore-prefixed alias of search_vitepress_docs_mcp, kept for research_library_server.py."""
    search_payload = await search_vitepress_docs_mcp(query, max_results)
    return search_payload


async def _get_vitepress_document(path: str) -> dict:
    """Underscore-prefixed alias of get_vitepress_document_mcp, kept for research_library_server.py."""
    document_payload = await get_vitepress_document_mcp(path)
    return document_payload

🧪 Comprehensive Tests

File: test_comprehensive_federated_mcp.py

python
#!/usr/bin/env python3
"""
Comprehensive test of federated MCP calls
Tests all aspects of research-library ↔ mnemoverse-docs integration
"""

import asyncio
import json
import sys
import time

sys.path.insert(0, 'src')

from mcp_servers.federated_mcp_client import FederatedMCPClient


async def test_mcp_discovery():
    """Exercise the health, discovery and tool-listing calls and print each outcome."""
    print("\n🔍 TEST: MCP Server Discovery")
    print("-" * 40)

    client = FederatedMCPClient()

    # 1. Health check
    verdict = 'PASSED' if await client.health_check() else 'FAILED'
    print(f"✅ Health check: {verdict}")

    # 2. Server discovery
    discovery = await client.discover_server()
    if "error" in discovery:
        print(f"❌ Discovery failed: {discovery['error']}")
    else:
        info = discovery['info']
        print(f"✅ MCP Discovery: {info['name']} v{info['version']}")
        print(f"   Capabilities: {list(discovery['capabilities'].keys())}")

    # 3. Tools list
    tools = await client.list_tools()
    if "error" in tools:
        print(f"❌ Tools list failed: {tools['error']}")
    else:
        tool_names = [entry['name'] for entry in tools.get('tools', [])]
        print(f"✅ Available tools: {tool_names}")


async def test_search_functionality():
    """Run a fixed set of search queries and print hit counts, timings and the top two hits."""
    print("\n📝 TEST: Search Functionality")
    print("-" * 40)

    client = FederatedMCPClient()

    # (query, human-readable expectation shown next to it)
    cases = (
        ("MCP", "should find MCP-related documents"),
        ("API", "should find API documentation"),
        ("architecture", "should find architectural documents"),
        ("nonexistent_term_12345", "should not find anything"),
    )

    for query, expectation in cases:
        print(f"\n🔍 Search: '{query}' ({expectation})")

        started = time.time()
        response = await client.search_docs(query, max_results=3)
        elapsed = time.time() - started

        if "error" not in response:
            hits = response.get('results', [])
            print(f"✅ Found {len(hits)} results in {elapsed:.3f}s")

            for position, hit in enumerate(hits[:2], 1):
                print(f"  {position}. {hit.get('title', 'No title')} (score: {hit.get('score', 0)})")
        else:
            print(f"❌ Error: {response['error']}")


async def test_document_retrieval():
    """Fetch a fixed list of document paths and print title, size and timestamp for each."""
    print("\n📄 TEST: Document Retrieval")
    print("-" * 40)

    client = FederatedMCPClient()

    # The last path is intentionally one that should not exist.
    paths = (
        "about.md",
        "guides/getting-started.md",
        "research/mcp-x-protocol-spec.md",
        "nonexistent.md",
    )

    for doc_path in paths:
        print(f"\n📄 Retrieving: '{doc_path}'")

        started = time.time()
        document = await client.get_document(doc_path)
        elapsed = time.time() - started

        if "error" not in document:
            print(f"✅ Document retrieved in {elapsed:.3f}s")
            print(f"   Title: {document.get('title', 'No title')}")
            print(f"   Size: {len(document.get('content', ''))} characters")
            print(f"   Modified: {document.get('lastModified', 'Unknown')}")
        else:
            print(f"❌ Error: {document['error']}")


async def test_performance():
    """
    Compare sequential and concurrent search latency.

    The same four queries are issued one after another and then concurrently
    via asyncio.gather; both wall times and the resulting speedup are printed.
    """
    print("\n⚡ TEST: Performance")
    print("-" * 40)

    client = FederatedMCPClient()

    # Sequential requests
    queries = ["MCP", "API", "design", "implementation"]

    print(f"🏃‍♂️ Sequential requests ({len(queries)} items)")
    # perf_counter is monotonic and high-resolution; time.time() can jump with
    # clock adjustments and may be too coarse for sub-millisecond intervals.
    start_time = time.perf_counter()

    for query in queries:
        await client.search_docs(query, max_results=2)

    sequential_time = time.perf_counter() - start_time
    print(f"⏱️  Time: {sequential_time:.3f}s")

    # Parallel requests
    print(f"🚀 Parallel requests ({len(queries)} items)")
    start_time = time.perf_counter()

    tasks = [client.search_docs(query, max_results=2) for query in queries]
    await asyncio.gather(*tasks)

    parallel_time = time.perf_counter() - start_time
    print(f"⏱️  Time: {parallel_time:.3f}s")

    # Guard the ratio: a zero-length measurement must not abort the test run.
    if parallel_time > 0:
        print(f"📈 Speedup: {sequential_time / parallel_time:.1f}x")
    else:
        print("📈 Speedup: n/a (parallel run too fast to measure)")


async def test_error_handling():
    """Check that an empty query and a missing document both come back as error dicts."""
    print("\n⚠️  TEST: Error Handling")
    print("-" * 40)

    client = FederatedMCPClient()

    # Case 1: empty query
    print("🔍 Empty query")
    empty_query_response = await client.search_docs("", max_results=1)
    print(
        "✅ Correctly handled empty query error"
        if "error" in empty_query_response
        else "❌ Should have been error for empty query"
    )

    # Case 2: document path that does not exist
    print("\n📄 Non-existent document")
    missing_doc_response = await client.get_document("totally/fake/path.md")
    print(
        "✅ Correctly handled non-existent document error"
        if "error" in missing_doc_response
        else "❌ Should have been error for non-existent document"
    )


async def main():
    """
    Run every test coroutine in order, then print the summary banner.

    NOTE(review): the test coroutines in this script report failures by
    printing and do not raise, so the summary below is printed whenever no
    exception escapes - it does not reflect individual check results.
    """
    banner = "=" * 60

    print("🚀 COMPREHENSIVE FEDERATED MCP CALLS TEST")
    print(banner)
    print("🔗 Testing research-library ↔ mnemoverse-docs integration")

    stages = (
        test_mcp_discovery,
        test_search_functionality,
        test_document_retrieval,
        test_performance,
        test_error_handling,
    )
    summary_lines = (
        "✅ MCP Discovery and autodetection working",
        "✅ Documentation search functioning correctly",
        "✅ Document retrieval working stably",
        "✅ Performance meeting requirements",
        "✅ Error handling implemented correctly",
    )

    try:
        for stage in stages:
            await stage()

        print("\n" + banner)
        print("🎯 TEST RESULTS")
        print(banner)

        for line in summary_lines:
            print(line)

        print("\n🏗️ Federated architecture ready for production!")
        print("🔗 Can integrate into research_library_server.py")

    except Exception as e:
        # Report the failure, then let it propagate so the process exits non-zero
        print(f"\n❌ CRITICAL TESTING ERROR: {e}")
        raise


# Script entry point
if __name__ == "__main__":
    asyncio.run(main())

🔧 Integration into research_library_server.py

Update Search with Federated Calls

python
# In research_library_server.py, function _handle_tool_call

elif name == "search":
    # 🆕 Federated search with fallback
    try:
        # First try federated MCP search
        docs_results_raw = await _search_vitepress_docs_mcp(arguments["query"], limit=5)
        
        # Convert MCP results to required format
        docs_results = []
        if "results" in docs_results_raw:
            for result in docs_results_raw["results"]:
                docs_results.append({
                    "title": result["title"],
                    "content": result["excerpt"],
                    "source": "docs-mcp",
                    "relevance": min(result["score"] / 10.0, 1.0),
                    "url": result["url"],
                    "section": result["section"],
                    "path": result["path"]
                })
        
        logger.info(f"Found {len(docs_results)} results via federated MCP")
        
    except Exception as mcp_error:
        logger.warning(f"Federated MCP search failed: {mcp_error}, falling back to old method")
        # Fallback to old method
        docs_results = await _search_vitepress_docs(arguments["query"], limit=5)
    
    # Search in local database (as before)
    async for db_session in get_async_session():
        query_service = AgentQueryService(db_session)
        db_results = await query_service.semantic_search(
            query=arguments["query"],
            limit=5
        )
        
        # Combine results
        all_results = docs_results + [
            {
                "title": r["title"],
                "content": r.get("content", ""),
                "source": "database",
                "relevance": r.get("relevance_score", 0.5),
                "url": f"/source/{r['source_id']}"
            } for r in db_results
        ]
        
        return {
            "query": arguments["query"],
            "results": all_results[:10],
            "sources": {
                "docs": len(docs_results),
                "database": len(db_results),
                "method": "federated_mcp" if docs_results else "fallback"
            },
            "type": "search"
        }

Add New get_document Tool

python
# Add to list_tools()
# Tool declaration for "get_document": takes one required string argument,
# `path`, identifying the document to fetch.
types.Tool(
    name="get_document",
    description="Get full document text by path",
    # JSON Schema describing the tool's arguments
    inputSchema={
        "type": "object",
        "properties": {
            "path": {
                "type": "string",
                "description": "Document path (e.g., 'research/mcp-x-protocol-spec.md')"
            }
        },
        "required": ["path"]
    }
)

# Add to _handle_tool_call
elif name == "get_document":
    # Fetch one document through the federated docs helper and normalise the
    # outcome into either a "document" or a "document_error" payload.
    try:
        doc_result = await _get_vitepress_document(arguments["path"])
        
        # The helper signals failure with an "error" key; pass it through
        if "error" in doc_result:
            return {
                "error": doc_result["error"],
                "path": arguments["path"],
                "type": "document_error"
            }
        
        return {
            "path": arguments["path"],
            "title": doc_result.get("title", "Unknown"),
            "content": doc_result.get("content", ""),
            "lastModified": doc_result.get("lastModified", ""),
            # Length of the content string in characters
            "size": len(doc_result.get("content", "")),
            "type": "document"
        }
        
    except Exception as e:
        # Any unexpected exception is logged and reported in the same error shape
        logger.error(f"Failed to get document {arguments['path']}: {e}")
        return {
            "error": f"Failed to retrieve document: {str(e)}",
            "path": arguments["path"],
            "type": "document_error"
        }

📊 Monitoring and Logging

Structured Logging for MCP Calls

python
# In federated_mcp_client.py
import structlog

logger = structlog.get_logger(__name__)

async def call_docs_mcp_server(method: str, params: dict) -> dict:
    """
    Instrumented variant of the low-level MCP call.

    Emits "mcp_request_start" before the call and either "mcp_request_success"
    or "mcp_request_failed" afterwards, each tagged with a request id, the
    method and (for the latter two) the elapsed time in seconds. Exceptions
    are logged and re-raised unchanged.
    """
    # Local import: `time` is not among the client module's file-level imports.
    import time

    # Millisecond wall-clock timestamp, used only to correlate log lines
    request_id = f"mcp_{int(time.time() * 1000)}"

    logger.info(
        "mcp_request_start",
        request_id=request_id,
        method=method,
        params=params
    )

    # Monotonic clock for durations, unaffected by system clock adjustments
    start_time = time.perf_counter()

    try:
        # ... existing code ...
        # (the existing request logic goes here and must assign `result`)

        duration = time.perf_counter() - start_time
        logger.info(
            "mcp_request_success",
            request_id=request_id,
            method=method,
            duration=duration,
            response_size=len(str(result))
        )

        return result

    except Exception as e:
        duration = time.perf_counter() - start_time
        logger.error(
            "mcp_request_failed",
            request_id=request_id,
            method=method,
            duration=duration,
            error=str(e)
        )
        raise

Performance Metrics

python
# metrics.py
from prometheus_client import Counter, Histogram, Gauge

# MCP metrics
# Request counter, partitioned by MCP method and outcome status
mcp_requests_total = Counter(
    'mcp_requests_total',
    'Total MCP requests',
    labelnames=['method', 'status'],
)
# Request latency distribution in seconds (unlabelled)
mcp_request_duration = Histogram(
    'mcp_request_duration_seconds',
    'MCP request duration',
)
# Gauge for the number of currently active MCP connections
mcp_active_connections = Gauge(
    'mcp_active_connections',
    'Active MCP connections',
)

def record_mcp_request(method: str, status: str, duration: float):
    """Count one MCP request under its method/status labels and record its duration."""
    request_counter = mcp_requests_total.labels(method=method, status=status)
    request_counter.inc()
    mcp_request_duration.observe(duration)

Health Checks for Federated Connections

python
async def check_federated_health():
    """
    Check health of all federated connections

    Returns:
        dict: {
            "timestamp": ISO-8601 UTC timestamp (timezone-aware),
            "services": {"mnemoverse_docs": {...}},
            "overall": "healthy" if every service is healthy, else "degraded"
        }
        A service entry has status "healthy"/"unhealthy" plus the individual
        HTTP and MCP check results, or status "error" with the error text when
        the check itself failed.
    """
    # Local import: no datetime import is visible alongside this function.
    from datetime import datetime, timezone

    health_status = {
        # datetime.utcnow() is deprecated and returns a naive value; use an
        # aware UTC timestamp so the offset is explicit in the output.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "services": {}
    }

    # Check mnemoverse-docs
    try:
        client = FederatedMCPClient()
        is_healthy = await client.health_check()

        if is_healthy:
            # Additional MCP functionality check
            tools = await client.list_tools()
            mcp_healthy = "error" not in tools
        else:
            # Skip the MCP probe when the HTTP health endpoint already failed
            mcp_healthy = False

        health_status["services"]["mnemoverse_docs"] = {
            "status": "healthy" if is_healthy and mcp_healthy else "unhealthy",
            "http_health": is_healthy,
            "mcp_health": mcp_healthy,
            "url": client.base_url
        }

    except Exception as e:
        health_status["services"]["mnemoverse_docs"] = {
            "status": "error",
            "error": str(e)
        }

    # Overall status
    all_healthy = all(
        service["status"] == "healthy"
        for service in health_status["services"].values()
    )

    health_status["overall"] = "healthy" if all_healthy else "degraded"

    return health_status

🚢 Deployment Configurations

Docker Compose for Complete System

yaml
# docker-compose.yml
# Full local stack: VitePress site, docs MCP server, Research Library and Postgres.
version: '3.8'

services:
  # VitePress documentation (development)
  vitepress-docs:
    build:
      context: ./mnemoverse-docs
      dockerfile: Dockerfile.vitepress
    ports:
      - "3000:3000"
    volumes:
      # Read-write mount of the docs sources
      - ./mnemoverse-docs/docs:/app/docs
    environment:
      - NODE_ENV=development

  # MCP server for documentation
  docs-mcp-server:
    build:
      context: ./mnemoverse-docs
      dockerfile: Dockerfile.mcp
    ports:
      - "3003:3003"
    volumes:
      # Same docs sources, mounted read-only
      - ./mnemoverse-docs/docs:/app/docs:ro
    environment:
      - NODE_ENV=production
      - MCP_DOCS_PATH=/app/docs
    depends_on:
      - vitepress-docs
    healthcheck:
      # NOTE(review): requires curl inside the image - confirm Dockerfile.mcp installs it
      test: ["CMD", "curl", "-f", "http://localhost:3003/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  # Research Library MCP server
  research-library:
    build:
      context: ./mnemoverse-research-library
      dockerfile: Dockerfile
    environment:
      # Address of the docs MCP server on the compose network
      - DOCS_MCP_URL=http://docs-mcp-server:3003
      - DATABASE_URL=postgresql://user:pass@postgres:5432/research_db
    depends_on:
      - docs-mcp-server
      - postgres
    healthcheck:
      # The exit code must carry the result: merely printing the status would
      # exit 0 even when the federated check reports "degraded".
      test: ["CMD", "python", "-c", "import asyncio, sys; from src.health import check_federated_health; status = asyncio.run(check_federated_health()); print(status); sys.exit(0 if status['overall'] == 'healthy' else 1)"]
      interval: 60s
      timeout: 30s
      retries: 3

  # Database
  postgres:
    image: postgres:15
    environment:
      # Development credentials; they match DATABASE_URL above
      POSTGRES_DB: research_db
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

volumes:
  postgres_data:

networks:
  default:
    name: mnemoverse_network

Kubernetes Manifests

yaml
# k8s-docs-mcp-server.yml
# Deployment (2 replicas) and ClusterIP Service for the docs MCP server.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: docs-mcp-server
  labels:
    app: docs-mcp-server
spec:
  replicas: 2
  selector:
    matchLabels:
      app: docs-mcp-server
  template:
    metadata:
      labels:
        app: docs-mcp-server
    spec:
      containers:
      - name: docs-mcp-server
        image: mnemoverse/docs-mcp-server:1.0.0
        ports:
        - containerPort: 3003
        env:
        - name: NODE_ENV
          value: "production"
        - name: MCP_DOCS_PATH
          value: "/app/docs"
        volumeMounts:
        # Documentation files, mounted read-only at /app/docs
        - name: docs-volume
          mountPath: /app/docs
          readOnly: true
        # NOTE(review): probes reach the container through the pod IP, so the
        # server must listen on a non-loopback interface - confirm the bind address.
        # Liveness: plain HTTP health endpoint
        livenessProbe:
          httpGet:
            path: /health
            port: 3003
          initialDelaySeconds: 30
          periodSeconds: 10
        # Readiness: the MCP discovery document must be served
        readinessProbe:
          httpGet:
            path: /.well-known/mcp
            port: 3003
          initialDelaySeconds: 5
          periodSeconds: 5
        resources:
          requests:
            memory: "128Mi"
            cpu: "100m"
          limits:
            memory: "256Mi"
            cpu: "500m"
      volumes:
      # NOTE(review): a ConfigMap is size-limited and mounts its keys as flat
      # files - confirm the docs tree (including subdirectories) fits this model.
      - name: docs-volume
        configMap:
          name: docs-content

---
# Cluster-internal Service exposing the Deployment's pods on port 3003
apiVersion: v1
kind: Service
metadata:
  name: docs-mcp-service
spec:
  selector:
    app: docs-mcp-server
  ports:
  - port: 3003
    targetPort: 3003
  type: ClusterIP

🔗 Next step: Production Integration