课 5
2025-07-03 07:00
高级模式与生产部署
高级模式与生产环境部署
随着您的 FastMCP 应用从简单的开发工具发展成为可投入生产的服务,您需要考虑高级模式、可扩展性、安全性以及部署策略。本节将涵盖企业级的 FastMCP 开发和部署。
高级服务器模式
服务的组合与模块化
FastMCP 2.0 支持复杂的服务器组合模式,从而实现模块化、易于维护的应用:
# auth_server.py
from fastmcp import FastMCP
import jwt
import bcrypt
auth_server = FastMCP("Authentication Service")
@auth_server.tool
def authenticate_user(username: str, password: str) -> dict:
"""Authentication Service: 认证用户并返回 JWT 令牌。"""
# 用户认证的实现逻辑
if verify_credentials(username, password):
token = jwt.encode({"user": username}, "secret", algorithm="HS256")
return {"token": token, "user": username, "authenticated": True}
return {"authenticated": False, "error": "Invalid credentials"}
# data_server.py
from fastmcp import FastMCP
import pandas as pd
data_server = FastMCP("Data Processing Service")
@data_server.tool
def process_dataset(data_path: str, operation: str) -> dict:
"""Data Processing Service: 对数据集进行指定操作。"""
df = pd.read_csv(data_path)
operations = {
"summary": lambda df: df.describe().to_dict(),
"count": lambda df: {"rows": len(df), "columns": len(df.columns)},
"columns": lambda df: df.columns.tolist()
}
if operation not in operations:
raise ValueError(f"Unknown operation: {operation}")
return operations[operation](df)
# main_server.py
from fastmcp import FastMCP
from auth_server import auth_server
from data_server import data_server
from datetime import datetime # 导入 datetime 模块
# 创建主服务器并组合子服务器
main_server = FastMCP(
name="Enterprise Application",
instructions="Comprehensive enterprise application with authentication and data processing"
)
# 挂载子服务器并设置前缀
main_server.mount(auth_server, prefix="auth")
main_server.mount(data_server, prefix="data")
# 添加主服务器工具
@main_server.tool
def health_check() -> dict:
"""检查所有服务的健康状态。"""
return {
"status": "healthy",
"services": ["auth", "data", "main"],
"timestamp": datetime.now().isoformat()
}
if __name__ == "__main__":
main_server.run(transport="http", host="0.0.0.0", port=8000)
用于API集成的代理服务器
创建代理服务器,将现有 API 桥接到 MCP 生态系统:
# api_proxy.py
from fastmcp import FastMCP, Client
import asyncio
# 为现有 MCP 服务器创建代理
async def create_api_proxy():
# 连接到远程 MCP 服务器
remote_client = Client("https://api.example.com/mcp/sse")
# 创建代理服务器
proxy_server = FastMCP.as_proxy(
remote_client,
name="API Proxy Server",
instructions="Proxy for remote API services"
)
# 为代理添加自定义中间件或工具
@proxy_server.tool
def local_cache_status() -> dict:
"""获取本地缓存状态。"""
return {"cache_enabled": True, "cache_size": "50MB"}
return proxy_server
# 使用示例
if __name__ == "__main__":
proxy = asyncio.run(create_api_proxy())
proxy.run(transport="http", port=9000)
动态工具生成
从配置或 OpenAPI 规范中动态生成工具:
# dynamic_tools.py
from fastmcp import FastMCP
import yaml
import requests
mcp = FastMCP("Dynamic API Server")
def create_api_tool(endpoint_config):
"""根据API端点配置动态创建工具。"""
def api_tool(**kwargs):
response = requests.request(
method=endpoint_config["method"],
url=endpoint_config["url"],
**kwargs
)
return response.json()
# 设置函数元数据
api_tool.__name__ = endpoint_config["name"]
api_tool.__doc__ = endpoint_config["description"]
return api_tool
# 加载 API 配置
with open("api_config.yaml", "r") as f:
api_config = yaml.safe_load(f)
# 动态生成工具
for endpoint in api_config["endpoints"]:
tool_func = create_api_tool(endpoint)
mcp.tool(tool_func)
# 替代方案:从 OpenAPI 规范生成
def load_from_openapi(spec_url: str):
"""从 OpenAPI 规范加载工具。"""
# FastMCP 内置 OpenAPI 集成
api_server = FastMCP.from_openapi(spec_url)
return api_server
# 使用示例
# openapi_server = load_from_openapi("https://api.example.com/openapi.json")
# mcp.mount(openapi_server, prefix="api")
生产就绪型认证
基于JWT的认证
# secure_server.py
from fastmcp import FastMCP
from fastmcp.auth import JWTAuthProvider
import os
# 配置 JWT 认证
auth_provider = JWTAuthProvider(
secret_key=os.getenv("JWT_SECRET_KEY"),
algorithm="HS256",
token_header="Authorization"
)
# 创建认证服务器
mcp = FastMCP(
"Secure Production Server",
auth_provider=auth_provider
)
@mcp.tool(require_auth=True)
def get_sensitive_data(user_context) -> dict:
"""获取敏感数据,需要认证。"""
return {
"user_id": user_context.user_id,
"data": "sensitive information",
"access_level": user_context.permissions
}
@mcp.tool # 公共工具,无需认证
def get_public_info() -> dict:
"""获取公共信息。"""
return {"status": "public", "version": "1.0.0"}
API 密钥认证
# api_key_server.py
from fastmcp import FastMCP
from fastmcp.auth import APIKeyAuthProvider
# 配置 API 密钥认证
auth_provider = APIKeyAuthProvider(
api_keys={
"key123": {"name": "Client A", "permissions": ["read", "write"]},
"key456": {"name": "Client B", "permissions": ["read"]}
},
header_name="X-API-Key"
)
mcp = FastMCP("API Key Protected Server", auth_provider=auth_provider)
@mcp.tool(require_permissions=["write"])
def write_data(data: dict, auth_context) -> dict:
"""写入数据,需要写入权限。"""
# 实现逻辑
return {"written": True, "client": auth_context.client_name}
部署策略
Docker 部署
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用
COPY . .
# 创建非 root 用户
RUN useradd -m -u 1000 mcpuser && chown -R mcpuser:mcpuser /app
USER mcpuser
# 健康检查
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# 运行服务器
EXPOSE 8000
CMD ["python", "server.py"]
# docker-compose.yml
version: '3.8'
services:
fastmcp-server:
build: .
ports:
- "8000:8000"
environment:
- FASTMCP_LOG_LEVEL=INFO
- JWT_SECRET_KEY=${JWT_SECRET_KEY}
- DATABASE_URL=${DATABASE_URL}
volumes:
- ./logs:/app/logs
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
restart: unless-stopped
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./ssl:/etc/nginx/ssl
depends_on:
- fastmcp-server
restart: unless-stopped
volumes:
redis_data:
Kubernetes 部署
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: fastmcp-server
labels:
app: fastmcp-server
spec:
replicas: 3
selector:
matchLabels:
app: fastmcp-server
template:
metadata:
labels:
app: fastmcp-server
spec:
containers:
- name: fastmcp-server
image: your-registry/fastmcp-server:latest
ports:
- containerPort: 8000
env:
- name: FASTMCP_LOG_LEVEL
value: "INFO"
- name: JWT_SECRET_KEY
valueFrom:
secretKeyRef:
name: fastmcp-secrets
key: jwt-secret
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: fastmcp-service
spec:
selector:
app: fastmcp-server
ports:
- protocol: TCP
port: 80
targetPort: 8000
type: LoadBalancer
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: fastmcp-ingress
annotations:
kubernetes.io/ingress.class: nginx
cert-manager.io/cluster-issuer: letsencrypt-prod
spec:
tls:
- hosts:
- api.yourdomain.com
secretName: fastmcp-tls
rules:
- host: api.yourdomain.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: fastmcp-service
port:
number: 80
监控和可观测性
日志记录和指标
# monitored_server.py
from fastmcp import FastMCP, Context
import logging
import time
from prometheus_client import Counter, Histogram, start_http_server
# 设置指标
TOOL_CALLS = Counter('mcp_tool_calls_total', 'Total tool calls', ['tool_name', 'status'])
TOOL_DURATION = Histogram('mcp_tool_duration_seconds', 'Tool execution time', ['tool_name'])
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('fastmcp.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
mcp = FastMCP("Monitored Server")
def monitor_tool(func):
"""为工具添加监控的装饰器。"""
def wrapper(*args, **kwargs):
start_time = time.time()
tool_name = func.__name__
try:
logger.info(f"Starting tool: {tool_name}")
result = func(*args, **kwargs)
TOOL_CALLS.labels(tool_name=tool_name, status='success').inc()
logger.info(f"Tool {tool_name} completed successfully")
return result
except Exception as e:
TOOL_CALLS.labels(tool_name=tool_name, status='error').inc()
logger.error(f"Tool {tool_name} failed: {str(e)}")
raise
finally:
duration = time.time() - start_time
TOOL_DURATION.labels(tool_name=tool_name).observe(duration)
logger.info(f"Tool {tool_name} took {duration:.2f}s")
return wrapper
@mcp.tool
@monitor_tool
def process_data(data: list) -> dict:
"""处理数据并进行监控。"""
# 处理逻辑
return {"processed": len(data), "status": "success"}
# 启动 Prometheus 指标服务器
start_http_server(9090)
if __name__ == "__main__":
mcp.run(transport="http", port=8000)
健康检查和熔断器
# resilient_server.py
from fastmcp import FastMCP, Context
import asyncio
from circuit_breaker import CircuitBreaker
import aioredis
import aiohttp # 导入 aiohttp 模块
from datetime import datetime # 导入 datetime 模块
mcp = FastMCP("Resilient Server")
# 外部 API 调用的熔断器
api_circuit_breaker = CircuitBreaker(
failure_threshold=5,
recovery_timeout=30,
expected_exception=Exception
)
@mcp.tool
async def call_external_api(endpoint: str, ctx: Context) -> dict:
"""调用受熔断器保护的外部 API。"""
@api_circuit_breaker
async def make_api_call():
# 实际 API 调用实现
async with aiohttp.ClientSession() as session:
async with session.get(endpoint) as response:
return await response.json()
try:
return await make_api_call()
except Exception as e:
await ctx.error(f"API call failed: {str(e)}")
return {"error": "Service temporarily unavailable"}
@mcp.resource("health://status")
async def health_check() -> dict:
"""全面的健康检查。"""
health_status = {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"checks": {}
}
# 检查数据库连接
try:
# 数据库健康检查逻辑(此处省略具体实现)
health_status["checks"]["database"] = "healthy"
except Exception:
health_status["checks"]["database"] = "unhealthy"
health_status["status"] = "unhealthy"
# 检查 Redis 连接
try:
redis = aioredis.from_url("redis://localhost")
await redis.ping()
health_status["checks"]["redis"] = "healthy"
except Exception:
health_status["checks"]["redis"] = "unhealthy"
health_status["status"] = "degraded"
return health_status
安全最佳实践
输入验证和净化
# secure_tools.py
from fastmcp import FastMCP
from pydantic import BaseModel, validator
import re
import html
mcp = FastMCP("Secure Server")
class UserInput(BaseModel):
username: str
email: str
age: int
@validator('username')
def validate_username(cls, v):
if not re.match(r'^[a-zA-Z0-9_]+$', v):
raise ValueError('用户名必须只包含字母、数字和下划线')
if len(v) < 3 or len(v) > 20:
raise ValueError('用户名长度必须在 3 到 20 个字符之间')
return v
@validator('email')
def validate_email(cls, v):
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, v):
raise ValueError('邮箱格式无效')
return v
@validator('age')
def validate_age(cls, v):
if v < 0 or v > 150:
raise ValueError('年龄必须在 0 到 150 之间')
return v
def generate_user_id():
"""生成用户 ID 的占位函数"""
return "user_" + str(hash("some_unique_string"))[1:10] # 示例实现,实际应使用更安全的生成方式
@mcp.tool
def create_user(user_data: UserInput) -> dict:
"""使用已验证的输入创建用户。"""
# 输入由 Pydantic 自动验证
return {
"user_id": generate_user_id(),
"username": user_data.username,
"email": user_data.email,
"status": "created"
}
@mcp.tool
def sanitize_html_content(content: str) -> str:
"""净化 HTML 内容以防止 XSS。"""
# 基础 HTML 净化
sanitized = html.escape(content)
return sanitized
速率限制和流量控制
# rate_limited_server.py
from fastmcp import FastMCP
from fastmcp.middleware import RateLimitMiddleware
import asyncio
import time # 导入 time 模块
# 配置速率限制
rate_limiter = RateLimitMiddleware(
requests_per_minute=60,
burst_size=10,
storage_backend="redis", # 或 "memory"
redis_url="redis://localhost:6379"
)
mcp = FastMCP("Rate Limited Server", middleware=[rate_limiter])
@mcp.tool
def expensive_operation(data: str) -> dict:
"""一个应该被速率限制的耗时操作。"""
# 模拟耗时处理
time.sleep(2)
return {"processed": True, "data_length": len(data)}
性能优化
异步操作和连接池
# optimized_server.py
from fastmcp import FastMCP, Context
import asyncio
import aiohttp
import asyncpg
from contextlib import asynccontextmanager
class DatabasePool:
def __init__(self):
self.pool = None
async def create_pool(self):
self.pool = await asyncpg.create_pool(
"postgresql://user:pass@localhost/db",
min_size=10,
max_size=20
)
async def close_pool(self):
if self.pool:
await self.pool.close()
db_pool = DatabasePool()
@asynccontextmanager
async def lifespan():
# 启动时
await db_pool.create_pool()
yield
# 关闭时
await db_pool.close_pool()
mcp = FastMCP("Optimized Server", lifespan=lifespan)
@mcp.tool
async def query_database(query: str) -> dict:
"""使用连接池执行数据库查询。"""
async with db_pool.pool.acquire() as connection:
result = await connection.fetch(query)
return {"rows": [dict(row) for row in result]}
@mcp.tool
async def batch_http_requests(urls: list[str], ctx: Context) -> list[dict]:
"""同时进行多个 HTTP 请求。"""
await ctx.info(f"正在发起 {len(urls)} 个并发请求...")
async with aiohttp.ClientSession() as session:
tasks = []
for url in urls:
task = session.get(url)
tasks.append(task)
responses = await asyncio.gather(*tasks, return_exceptions=True)
results = []
for i, response in enumerate(responses):
if isinstance(response, Exception):
results.append({"url": urls[i], "error": str(response)})
else:
results.append({
"url": urls[i],
"status": response.status,
"data": await response.text()
})
return results
总结
FastMCP 2.0 提供了一个全面的平台,用于构建可投入生产的 MCP 服务器,使其能够从简单的开发工具扩展到企业级应用。主要要点包括:
开发最佳实践
- 采用模块化的服务器组合以实现可维护的代码
- 实现全面的错误处理和验证
- 从一开始就添加适当的日志记录和监控
- 彻底测试您的工具和资源
生产就绪度
- 实施适当的认证和授权机制
- 使用基于环境的配置
- 添加健康检查和熔断器
- 监控性能和资源使用情况
部署策略
- 将您的应用容器化以确保一致性
- 使用 Kubernetes 等编排平台进行扩展
- 实施适当的 CI/CD 管道
- 规划高可用性和灾难恢复
安全考量
- 验证并净化所有输入
- 实施速率限制和流量控制
- 使用 HTTPS 和正确的认证
- 定期进行安全审计和更新
FastMCP 使构建复杂的 AI 驱动应用成为可能,这些应用能够与现代开发工作流程和生产基础设施无缝集成。无论您是构建开发工具、数据处理服务还是复杂的企业应用,FastMCP 都提供了成功所需的基础。
祝您使用 FastMCP 构建愉快!🚀