FastMCP:构建MCP服务器与客户端的完整指南

FastMCP:构建MCP服务器与客户端的完整指南

FastMCP 2.0 全方位教程:用 Python 的高效方式构建模型上下文协议 (MCP) 服务器与客户端。学习如何为大型语言模型 (LLM) 应用创建工具、资源和提示。

课 5 2025-07-03 07:00

高级模式与生产部署

高级模式与生产环境部署

随着您的 FastMCP 应用从简单的开发工具发展成为可投入生产的服务,您需要考虑高级模式、可扩展性、安全性以及部署策略。本节将涵盖企业级的 FastMCP 开发和部署。

高级服务器模式

服务的组合与模块化

FastMCP 2.0 支持复杂的服务器组合模式,从而实现模块化、易于维护的应用:

# auth_server.py
from fastmcp import FastMCP
import jwt
import bcrypt

auth_server = FastMCP("Authentication Service")

@auth_server.tool
def authenticate_user(username: str, password: str) -> dict:
    """Authentication Service: 认证用户并返回 JWT 令牌。"""
    # 用户认证的实现逻辑
    if verify_credentials(username, password):
        token = jwt.encode({"user": username}, "secret", algorithm="HS256")
        return {"token": token, "user": username, "authenticated": True}
    return {"authenticated": False, "error": "Invalid credentials"}

# data_server.py
from fastmcp import FastMCP
import pandas as pd

data_server = FastMCP("Data Processing Service")

@data_server.tool
def process_dataset(data_path: str, operation: str) -> dict:
    """Data Processing Service: 对数据集进行指定操作。"""
    df = pd.read_csv(data_path)

    operations = {
        "summary": lambda df: df.describe().to_dict(),
        "count": lambda df: {"rows": len(df), "columns": len(df.columns)},
        "columns": lambda df: df.columns.tolist()
    }

    if operation not in operations:
        raise ValueError(f"Unknown operation: {operation}")

    return operations[operation](df)

# main_server.py
from fastmcp import FastMCP
from auth_server import auth_server
from data_server import data_server
from datetime import datetime # 导入 datetime 模块

# 创建主服务器并组合子服务器
main_server = FastMCP(
    name="Enterprise Application",
    instructions="Comprehensive enterprise application with authentication and data processing"
)

# 挂载子服务器并设置前缀
main_server.mount(auth_server, prefix="auth")
main_server.mount(data_server, prefix="data")

# 添加主服务器工具
@main_server.tool
def health_check() -> dict:
    """检查所有服务的健康状态。"""
    return {
        "status": "healthy",
        "services": ["auth", "data", "main"],
        "timestamp": datetime.now().isoformat()
    }

if __name__ == "__main__":
    main_server.run(transport="http", host="0.0.0.0", port=8000)

用于API集成的代理服务器

创建代理服务器,将现有 API 桥接到 MCP 生态系统:

# api_proxy.py
from fastmcp import FastMCP, Client
import asyncio

# 为现有 MCP 服务器创建代理
async def create_api_proxy():
    # 连接到远程 MCP 服务器
    remote_client = Client("https://api.example.com/mcp/sse")

    # 创建代理服务器
    proxy_server = FastMCP.as_proxy(
        remote_client,
        name="API Proxy Server",
        instructions="Proxy for remote API services"
    )

    # 为代理添加自定义中间件或工具
    @proxy_server.tool
    def local_cache_status() -> dict:
        """获取本地缓存状态。"""
        return {"cache_enabled": True, "cache_size": "50MB"}

    return proxy_server

# 使用示例
if __name__ == "__main__":
    proxy = asyncio.run(create_api_proxy())
    proxy.run(transport="http", port=9000)

动态工具生成

从配置或 OpenAPI 规范中动态生成工具:

# dynamic_tools.py
from fastmcp import FastMCP
import yaml
import requests

mcp = FastMCP("Dynamic API Server")

def create_api_tool(endpoint_config):
    """根据API端点配置动态创建工具。"""
    def api_tool(**kwargs):
        response = requests.request(
            method=endpoint_config["method"],
            url=endpoint_config["url"],
            **kwargs
        )
        return response.json()

    # 设置函数元数据
    api_tool.__name__ = endpoint_config["name"]
    api_tool.__doc__ = endpoint_config["description"]

    return api_tool

# 加载 API 配置
with open("api_config.yaml", "r") as f:
    api_config = yaml.safe_load(f)

# 动态生成工具
for endpoint in api_config["endpoints"]:
    tool_func = create_api_tool(endpoint)
    mcp.tool(tool_func)

# 替代方案:从 OpenAPI 规范生成
def load_from_openapi(spec_url: str):
    """从 OpenAPI 规范加载工具。"""
    # FastMCP 内置 OpenAPI 集成
    api_server = FastMCP.from_openapi(spec_url)
    return api_server

# 使用示例
# openapi_server = load_from_openapi("https://api.example.com/openapi.json")
# mcp.mount(openapi_server, prefix="api")

生产就绪型认证

基于JWT的认证

# secure_server.py
from fastmcp import FastMCP
from fastmcp.auth import JWTAuthProvider
import os

# 配置 JWT 认证
auth_provider = JWTAuthProvider(
    secret_key=os.getenv("JWT_SECRET_KEY"),
    algorithm="HS256",
    token_header="Authorization"
)

# 创建认证服务器
mcp = FastMCP(
    "Secure Production Server",
    auth_provider=auth_provider
)

@mcp.tool(require_auth=True)
def get_sensitive_data(user_context) -> dict:
    """获取敏感数据,需要认证。"""
    return {
        "user_id": user_context.user_id,
        "data": "sensitive information",
        "access_level": user_context.permissions
    }

@mcp.tool  # 公共工具,无需认证
def get_public_info() -> dict:
    """获取公共信息。"""
    return {"status": "public", "version": "1.0.0"}

API 密钥认证

# api_key_server.py
from fastmcp import FastMCP
from fastmcp.auth import APIKeyAuthProvider

# 配置 API 密钥认证
auth_provider = APIKeyAuthProvider(
    api_keys={
        "key123": {"name": "Client A", "permissions": ["read", "write"]},
        "key456": {"name": "Client B", "permissions": ["read"]}
    },
    header_name="X-API-Key"
)

mcp = FastMCP("API Key Protected Server", auth_provider=auth_provider)

@mcp.tool(require_permissions=["write"])
def write_data(data: dict, auth_context) -> dict:
    """写入数据,需要写入权限。"""
    # 实现逻辑
    return {"written": True, "client": auth_context.client_name}

部署策略

Docker 部署

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用
COPY . .

# 创建非 root 用户
RUN useradd -m -u 1000 mcpuser && chown -R mcpuser:mcpuser /app
USER mcpuser

# 健康检查
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# 运行服务器
EXPOSE 8000
CMD ["python", "server.py"]
# docker-compose.yml
version: '3.8'

services:
  fastmcp-server:
    build: .
    ports:
      - "8000:8000"
    environment:
      - FASTMCP_LOG_LEVEL=INFO
      - JWT_SECRET_KEY=${JWT_SECRET_KEY}
      - DATABASE_URL=${DATABASE_URL}
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - fastmcp-server
    restart: unless-stopped

volumes:
  redis_data:

Kubernetes 部署

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fastmcp-server
  labels:
    app: fastmcp-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: fastmcp-server
  template:
    metadata:
      labels:
        app: fastmcp-server
    spec:
      containers:
      - name: fastmcp-server
        image: your-registry/fastmcp-server:latest
        ports:
        - containerPort: 8000
        env:
        - name: FASTMCP_LOG_LEVEL
          value: "INFO"
        - name: JWT_SECRET_KEY
          valueFrom:
            secretKeyRef:
              name: fastmcp-secrets
              key: jwt-secret
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5

---
apiVersion: v1
kind: Service
metadata:
  name: fastmcp-service
spec:
  selector:
    app: fastmcp-server
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: LoadBalancer

---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: fastmcp-ingress
  annotations:
    kubernetes.io/ingress.class: nginx
    cert-manager.io/cluster-issuer: letsencrypt-prod
spec:
  tls:
  - hosts:
    - api.yourdomain.com
    secretName: fastmcp-tls
  rules:
  - host: api.yourdomain.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: fastmcp-service
            port:
              number: 80

监控和可观测性

日志记录和指标

# monitored_server.py
from fastmcp import FastMCP, Context
import logging
import time
from prometheus_client import Counter, Histogram, start_http_server

# 设置指标
TOOL_CALLS = Counter('mcp_tool_calls_total', 'Total tool calls', ['tool_name', 'status'])
TOOL_DURATION = Histogram('mcp_tool_duration_seconds', 'Tool execution time', ['tool_name'])

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('fastmcp.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

mcp = FastMCP("Monitored Server")

def monitor_tool(func):
    """为工具添加监控的装饰器。"""
    def wrapper(*args, **kwargs):
        start_time = time.time()
        tool_name = func.__name__

        try:
            logger.info(f"Starting tool: {tool_name}")
            result = func(*args, **kwargs)
            TOOL_CALLS.labels(tool_name=tool_name, status='success').inc()
            logger.info(f"Tool {tool_name} completed successfully")
            return result
        except Exception as e:
            TOOL_CALLS.labels(tool_name=tool_name, status='error').inc()
            logger.error(f"Tool {tool_name} failed: {str(e)}")
            raise
        finally:
            duration = time.time() - start_time
            TOOL_DURATION.labels(tool_name=tool_name).observe(duration)
            logger.info(f"Tool {tool_name} took {duration:.2f}s")

    return wrapper

@mcp.tool
@monitor_tool
def process_data(data: list) -> dict:
    """处理数据并进行监控。"""
    # 处理逻辑
    return {"processed": len(data), "status": "success"}

# 启动 Prometheus 指标服务器
start_http_server(9090)

if __name__ == "__main__":
    mcp.run(transport="http", port=8000)

健康检查和熔断器

# resilient_server.py
from fastmcp import FastMCP, Context
import asyncio
from circuit_breaker import CircuitBreaker
import aioredis
import aiohttp # 导入 aiohttp 模块
from datetime import datetime # 导入 datetime 模块

mcp = FastMCP("Resilient Server")

# 外部 API 调用的熔断器
api_circuit_breaker = CircuitBreaker(
    failure_threshold=5,
    recovery_timeout=30,
    expected_exception=Exception
)

@mcp.tool
async def call_external_api(endpoint: str, ctx: Context) -> dict:
    """调用受熔断器保护的外部 API。"""
    @api_circuit_breaker
    async def make_api_call():
        # 实际 API 调用实现
        async with aiohttp.ClientSession() as session:
            async with session.get(endpoint) as response:
                return await response.json()

    try:
        return await make_api_call()
    except Exception as e:
        await ctx.error(f"API call failed: {str(e)}")
        return {"error": "Service temporarily unavailable"}

@mcp.resource("health://status")
async def health_check() -> dict:
    """全面的健康检查。"""
    health_status = {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "checks": {}
    }

    # 检查数据库连接
    try:
        # 数据库健康检查逻辑(此处省略具体实现)
        health_status["checks"]["database"] = "healthy"
    except Exception:
        health_status["checks"]["database"] = "unhealthy"
        health_status["status"] = "unhealthy"

    # 检查 Redis 连接
    try:
        redis = aioredis.from_url("redis://localhost")
        await redis.ping()
        health_status["checks"]["redis"] = "healthy"
    except Exception:
        health_status["checks"]["redis"] = "unhealthy"
        health_status["status"] = "degraded"

    return health_status

安全最佳实践

输入验证和净化

# secure_tools.py
from fastmcp import FastMCP
from pydantic import BaseModel, validator
import re
import html

mcp = FastMCP("Secure Server")

class UserInput(BaseModel):
    username: str
    email: str
    age: int

    @validator('username')
    def validate_username(cls, v):
        if not re.match(r'^[a-zA-Z0-9_]+$', v):
            raise ValueError('用户名必须只包含字母、数字和下划线')
        if len(v) < 3 or len(v) > 20:
            raise ValueError('用户名长度必须在 3 到 20 个字符之间')
        return v

    @validator('email')
    def validate_email(cls, v):
        email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        if not re.match(email_pattern, v):
            raise ValueError('邮箱格式无效')
        return v

    @validator('age')
    def validate_age(cls, v):
        if v < 0 or v > 150:
            raise ValueError('年龄必须在 0 到 150 之间')
        return v

def generate_user_id():
    """生成用户 ID 的占位函数"""
    return "user_" + str(hash("some_unique_string"))[1:10] # 示例实现,实际应使用更安全的生成方式

@mcp.tool
def create_user(user_data: UserInput) -> dict:
    """使用已验证的输入创建用户。"""
    # 输入由 Pydantic 自动验证
    return {
        "user_id": generate_user_id(),
        "username": user_data.username,
        "email": user_data.email,
        "status": "created"
    }

@mcp.tool
def sanitize_html_content(content: str) -> str:
    """净化 HTML 内容以防止 XSS。"""
    # 基础 HTML 净化
    sanitized = html.escape(content)
    return sanitized

速率限制和流量控制

# rate_limited_server.py
from fastmcp import FastMCP
from fastmcp.middleware import RateLimitMiddleware
import asyncio
import time # 导入 time 模块

# 配置速率限制
rate_limiter = RateLimitMiddleware(
    requests_per_minute=60,
    burst_size=10,
    storage_backend="redis",  # 或 "memory"
    redis_url="redis://localhost:6379"
)

mcp = FastMCP("Rate Limited Server", middleware=[rate_limiter])

@mcp.tool
def expensive_operation(data: str) -> dict:
    """一个应该被速率限制的耗时操作。"""
    # 模拟耗时处理
    time.sleep(2)
    return {"processed": True, "data_length": len(data)}

性能优化

异步操作和连接池

# optimized_server.py
from fastmcp import FastMCP, Context
import asyncio
import aiohttp
import asyncpg
from contextlib import asynccontextmanager

class DatabasePool:
    def __init__(self):
        self.pool = None

    async def create_pool(self):
        self.pool = await asyncpg.create_pool(
            "postgresql://user:pass@localhost/db",
            min_size=10,
            max_size=20
        )

    async def close_pool(self):
        if self.pool:
            await self.pool.close()

db_pool = DatabasePool()

@asynccontextmanager
async def lifespan():
    # 启动时
    await db_pool.create_pool()
    yield
    # 关闭时
    await db_pool.close_pool()

mcp = FastMCP("Optimized Server", lifespan=lifespan)

@mcp.tool
async def query_database(query: str) -> dict:
    """使用连接池执行数据库查询。"""
    async with db_pool.pool.acquire() as connection:
        result = await connection.fetch(query)
        return {"rows": [dict(row) for row in result]}

@mcp.tool
async def batch_http_requests(urls: list[str], ctx: Context) -> list[dict]:
    """同时进行多个 HTTP 请求。"""
    await ctx.info(f"正在发起 {len(urls)} 个并发请求...")

    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            task = session.get(url)
            tasks.append(task)

        responses = await asyncio.gather(*tasks, return_exceptions=True)

        results = []
        for i, response in enumerate(responses):
            if isinstance(response, Exception):
                results.append({"url": urls[i], "error": str(response)})
            else:
                results.append({
                    "url": urls[i],
                    "status": response.status,
                    "data": await response.text()
                })

        return results

总结

FastMCP 2.0 提供了一个全面的平台,用于构建可投入生产的 MCP 服务器,使其能够从简单的开发工具扩展到企业级应用。主要要点包括:

开发最佳实践

  • 采用模块化的服务器组合以实现可维护的代码
  • 实现全面的错误处理和验证
  • 从一开始就添加适当的日志记录和监控
  • 彻底测试您的工具和资源

生产就绪度

  • 实施适当的认证和授权机制
  • 使用基于环境的配置
  • 添加健康检查和熔断器
  • 监控性能和资源使用情况

部署策略

  • 将您的应用容器化以确保一致性
  • 使用 Kubernetes 等编排平台进行扩展
  • 实施适当的 CI/CD 管道
  • 规划高可用性和灾难恢复

安全考量

  • 验证并净化所有输入
  • 实施速率限制和流量控制
  • 使用 HTTPS 和正确的认证
  • 定期进行安全审计和更新

FastMCP 使构建复杂的 AI 驱动应用成为可能,这些应用能够与现代开发工作流程和生产基础设施无缝集成。无论您是构建开发工具、数据处理服务还是复杂的企业应用,FastMCP 都提供了成功所需的基础。

祝您使用 FastMCP 构建愉快!🚀