レッスン 5
2025-07-03 07:00
高度なパターンと本番環境へのデプロイ
高度なパターンと本番環境へのデプロイ
FastMCPアプリケーションがシンプルな開発ツールから本番環境に対応したサービスへと成長するにつれて、高度なパターン、スケーラビリティ、セキュリティ、およびデプロイ戦略を考慮する必要があります。このセクションでは、エンタープライズグレードのFastMCP開発とデプロイについて説明します。
高度なサーバーパターン
サーバー構成とモジュラリティ
FastMCP 2.0は、モジュール型で保守しやすいアプリケーションを可能にする洗練されたサーバー構成パターンをサポートしています。
# auth_server.py
from fastmcp import FastMCP
import jwt
import bcrypt
auth_server = FastMCP("Authentication Service")
@auth_server.tool
def authenticate_user(username: str, password: str) -> dict:
"""ユーザーを認証し、JWTトークンを返します。"""
# ユーザー認証の実装
if verify_credentials(username, password):
token = jwt.encode({"user": username}, "secret", algorithm="HS256")
return {"token": token, "user": username, "authenticated": True}
return {"authenticated": False, "error": "Invalid credentials"}
# data_server.py
from fastmcp import FastMCP
import pandas as pd
data_server = FastMCP("Data Processing Service")
@data_server.tool
def process_dataset(data_path: str, operation: str) -> dict:
"""指定された操作でデータセットを処理します。"""
df = pd.read_csv(data_path)
operations = {
"summary": lambda df: df.describe().to_dict(),
"count": lambda df: {"rows": len(df), "columns": len(df.columns)},
"columns": lambda df: df.columns.tolist()
}
if operation not in operations:
raise ValueError(f"Unknown operation: {operation}")
return operations[operation](df)
# main_server.py
from fastmcp import FastMCP
from auth_server import auth_server
from data_server import data_server
# メインサーバーを作成し、サブサーバーを構成
main_server = FastMCP(
name="Enterprise Application",
instructions="認証とデータ処理を備えた包括的なエンタープライズアプリケーション"
)
# サブサーバーをプレフィックス付きでマウント
main_server.mount(auth_server, prefix="auth")
main_server.mount(data_server, prefix="data")
# メインサーバーのツールを追加
@main_server.tool
def health_check() -> dict:
"""すべてのサービスのヘルスチェックを実行します。"""
return {
"status": "healthy",
"services": ["auth", "data", "main"],
"timestamp": datetime.now().isoformat()
}
if __name__ == "__main__":
main_server.run(transport="http", host="0.0.0.0", port=8000)
API統合のためのプロキシサーバー
既存のAPIをMCPエコシステムに接続するためのプロキシサーバーを作成します。
# api_proxy.py
from fastmcp import FastMCP, Client
import asyncio
# 既存のMCPサーバーのプロキシを作成
async def create_api_proxy():
# リモートMCPサーバーに接続
remote_client = Client("https://api.example.com/mcp/sse")
# プロキシサーバーを作成
proxy_server = FastMCP.as_proxy(
remote_client,
name="API Proxy Server",
instructions="リモートAPIサービス用のプロキシ"
)
# プロキシにカスタムミドルウェアまたはツールを追加
@proxy_server.tool
def local_cache_status() -> dict:
"""ローカルキャッシュのステータスを取得します。"""
return {"cache_enabled": True, "cache_size": "50MB"}
return proxy_server
# 使用例
if __name__ == "__main__":
proxy = asyncio.run(create_api_proxy())
proxy.run(transport="http", port=9000)
動的なツール生成
設定やOpenAPI仕様からツールを動的に生成します。
# dynamic_tools.py
from fastmcp import FastMCP
import yaml
import requests
mcp = FastMCP("Dynamic API Server")
def create_api_tool(endpoint_config):
"""APIエンドポイント設定からツールを動的に作成します。"""
def api_tool(**kwargs):
response = requests.request(
method=endpoint_config["method"],
url=endpoint_config["url"],
**kwargs
)
return response.json()
# 関数メタデータを設定
api_tool.__name__ = endpoint_config["name"]
api_tool.__doc__ = endpoint_config["description"]
return api_tool
# API設定をロード
with open("api_config.yaml", "r") as f:
api_config = yaml.safe_load(f)
# ツールを動的に生成
for endpoint in api_config["endpoints"]:
tool_func = create_api_tool(endpoint)
mcp.tool(tool_func)
# 代替案:OpenAPI仕様から生成
def load_from_openapi(spec_url: str):
"""OpenAPI仕様からツールをロードします。"""
# FastMCPにはOpenAPI統合が組み込まれています
api_server = FastMCP.from_openapi(spec_url)
return api_server
# 使用例
# openapi_server = load_from_openapi("https://api.example.com/openapi.json")
# mcp.mount(openapi_server, prefix="api")
本番環境に対応した認証
JWTベースの認証
# secure_server.py
from fastmcp import FastMCP
from fastmcp.auth import JWTAuthProvider
import os
# JWT認証を設定
auth_provider = JWTAuthProvider(
secret_key=os.getenv("JWT_SECRET_KEY"),
algorithm="HS256",
token_header="Authorization"
)
# 認証済みサーバーを作成
mcp = FastMCP(
"Secure Production Server",
auth_provider=auth_provider
)
@mcp.tool(require_auth=True)
def get_sensitive_data(user_context) -> dict:
"""機密データを取得します - 認証が必要です。"""
return {
"user_id": user_context.user_id,
"data": "sensitive information",
"access_level": user_context.permissions
}
@mcp.tool # 公開ツール、認証不要
def get_public_info() -> dict:
"""公開情報を取得します。"""
return {"status": "public", "version": "1.0.0"}
APIキー認証
# api_key_server.py
from fastmcp import FastMCP
from fastmcp.auth import APIKeyAuthProvider
# APIキー認証を設定
auth_provider = APIKeyAuthProvider(
api_keys={
"key123": {"name": "Client A", "permissions": ["read", "write"]},
"key456": {"name": "Client B", "permissions": ["read"]}
},
header_name="X-API-Key"
)
mcp = FastMCP("API Key Protected Server", auth_provider=auth_provider)
@mcp.tool(require_permissions=["write"])
def write_data(data: dict, auth_context) -> dict:
"""データを書き込みます - 書き込み権限が必要です。"""
# 実装
return {"written": True, "client": auth_context.client_name}
デプロイ戦略
Dockerデプロイ
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# 依存関係をインストール
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# アプリケーションをコピー
COPY . .
# 非rootユーザーを作成
RUN useradd -m -u 1000 mcpuser && chown -R mcpuser:mcpuser /app
USER mcpuser
# ヘルスチェック
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# サーバーを実行
EXPOSE 8000
CMD ["python", "server.py"]
# docker-compose.yml
version: '3.8'
services:
fastmcp-server:
build: .
ports:
- "8000:8000"
environment:
- FASTMCP_LOG_LEVEL=INFO
- JWT_SECRET_KEY=${JWT_SECRET_KEY}
- DATABASE_URL=${DATABASE_URL}
volumes:
- ./logs:/app/logs
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
restart: unless-stopped
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./ssl:/etc/nginx/ssl
depends_on:
- fastmcp-server
restart: unless-stopped
volumes:
redis_data:
Kubernetesデプロイ
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: fastmcp-server
labels:
app: fastmcp-server
spec:
replicas: 3
selector:
matchLabels:
app: fastmcp-server
template:
metadata:
labels:
app: fastmcp-server
spec:
containers:
- name: fastmcp-server
image: your-registry/fastmcp-server:latest
ports:
- containerPort: 8000
env:
- name: FASTMCP_LOG_LEVEL
value: "INFO"
- name: JWT_SECRET_KEY
valueFrom:
secretKeyRef:
name: fastmcp-secrets
key: jwt-secret
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: fastmcp-service
spec:
selector:
app: fastmcp-server
ports:
- protocol: TCP
port: 80
targetPort: 8000
type: LoadBalancer
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: fastmcp-ingress
annotations:
kubernetes.io/ingress.class: nginx
cert-manager.io/cluster-issuer: letsencrypt-prod
spec:
tls:
- hosts:
- api.yourdomain.com
secretName: fastmcp-tls
rules:
- host: api.yourdomain.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: fastmcp-service
port:
number: 80
監視と可観測性
ログとメトリクス
# monitored_server.py
from fastmcp import FastMCP, Context
import logging
import time
from prometheus_client import Counter, Histogram, start_http_server
# メトリクスを設定
TOOL_CALLS = Counter('mcp_tool_calls_total', 'Total tool calls', ['tool_name', 'status'])
TOOL_DURATION = Histogram('mcp_tool_duration_seconds', 'Tool execution time', ['tool_name'])
# ロギングを設定
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('fastmcp.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
mcp = FastMCP("Monitored Server")
def monitor_tool(func):
"""ツールに監視を追加するデコレータ。"""
def wrapper(*args, **kwargs):
start_time = time.time()
tool_name = func.__name__
try:
logger.info(f"Starting tool: {tool_name}")
result = func(*args, **kwargs)
TOOL_CALLS.labels(tool_name=tool_name, status='success').inc()
logger.info(f"Tool {tool_name} completed successfully")
return result
except Exception as e:
TOOL_CALLS.labels(tool_name=tool_name, status='error').inc()
logger.error(f"Tool {tool_name} failed: {str(e)}")
raise
finally:
duration = time.time() - start_time
TOOL_DURATION.labels(tool_name=tool_name).observe(duration)
logger.info(f"Tool {tool_name} took {duration:.2f}s")
return wrapper
@mcp.tool
@monitor_tool
def process_data(data: list) -> dict:
"""データを監視つきで処理します。"""
# 処理ロジック
return {"processed": len(data), "status": "success"}
# Prometheusメトリクスサーバーを起動
start_http_server(9090)
if __name__ == "__main__":
mcp.run(transport="http", port=8000)
ヘルスチェックとサーキットブレーカー
# resilient_server.py
from fastmcp import FastMCP, Context
import asyncio
from circuit_breaker import CircuitBreaker
import aioredis
mcp = FastMCP("Resilient Server")
# 外部API呼び出し用のサーキットブレーカー
api_circuit_breaker = CircuitBreaker(
failure_threshold=5,
recovery_timeout=30,
expected_exception=Exception
)
@mcp.tool
async def call_external_api(endpoint: str, ctx: Context) -> dict:
"""サーキットブレーカー保護付きで外部APIを呼び出します。"""
@api_circuit_breaker
async def make_api_call():
# 実際のAPI呼び出しの実装
async with aiohttp.ClientSession() as session:
async with session.get(endpoint) as response:
return await response.json()
try:
return await make_api_call()
except Exception as e:
await ctx.error(f"API call failed: {str(e)}")
return {"error": "Service temporarily unavailable"}
@mcp.resource("health://status")
async def health_check() -> dict:
"""包括的なヘルスチェック。"""
health_status = {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"checks": {}
}
# データベース接続をチェック
try:
# データベースヘルスチェックロジック
health_status["checks"]["database"] = "healthy"
except Exception:
health_status["checks"]["database"] = "unhealthy"
health_status["status"] = "unhealthy"
# Redis接続をチェック
try:
redis = aioredis.from_url("redis://localhost")
await redis.ping()
health_status["checks"]["redis"] = "healthy"
except Exception:
health_status["checks"]["redis"] = "unhealthy"
health_status["status"] = "degraded"
return health_status
セキュリティのベストプラクティス
入力検証とサニタイズ
# secure_tools.py
from fastmcp import FastMCP
from pydantic import BaseModel, validator
import re
import html
mcp = FastMCP("Secure Server")
class UserInput(BaseModel):
username: str
email: str
age: int
@validator('username')
def validate_username(cls, v):
if not re.match(r'^[a-zA-Z0-9_]+$', v):
raise ValueError('Username must contain only letters, numbers, and underscores')
if len(v) < 3 or len(v) > 20:
raise ValueError('Username must be between 3 and 20 characters')
return v
@validator('email')
def validate_email(cls, v):
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, v):
raise ValueError('Invalid email format')
return v
@validator('age')
def validate_age(cls, v):
if v < 0 or v > 150:
raise ValueError('Age must be between 0 and 150')
return v
@mcp.tool
def create_user(user_data: UserInput) -> dict:
"""検証済み入力でユーザーを作成します。"""
# 入力はPydanticによって自動的に検証されます
return {
"user_id": generate_user_id(),
"username": user_data.username,
"email": user_data.email,
"status": "created"
}
@mcp.tool
def sanitize_html_content(content: str) -> str:
"""XSSを防ぐためにHTMLコンテンツをサニタイズします。"""
# 基本的なHTMLサニタイゼーション
sanitized = html.escape(content)
return sanitized
レートリミットとスロットリング
# rate_limited_server.py
from fastmcp import FastMCP
from fastmcp.middleware import RateLimitMiddleware
import asyncio
# レート制限を設定
rate_limiter = RateLimitMiddleware(
requests_per_minute=60,
burst_size=10,
storage_backend="redis", # or "memory"
redis_url="redis://localhost:6379"
)
mcp = FastMCP("Rate Limited Server", middleware=[rate_limiter])
@mcp.tool
def expensive_operation(data: str) -> dict:
"""レート制限されるべき高コストな操作。"""
# 高コストな処理をシミュレート
time.sleep(2)
return {"processed": True, "data_length": len(data)}
パフォーマンス最適化
非同期操作とコネクションプーリング
# optimized_server.py
from fastmcp import FastMCP, Context
import asyncio
import aiohttp
import asyncpg
from contextlib import asynccontextmanager
class DatabasePool:
def __init__(self):
self.pool = None
async def create_pool(self):
self.pool = await asyncpg.create_pool(
"postgresql://user:pass@localhost/db",
min_size=10,
max_size=20
)
async def close_pool(self):
if self.pool:
await self.pool.close()
db_pool = DatabasePool()
@asynccontextmanager
async def lifespan():
# 起動時
await db_pool.create_pool()
yield
# シャットダウン時
await db_pool.close_pool()
mcp = FastMCP("Optimized Server", lifespan=lifespan)
@mcp.tool
async def query_database(query: str) -> dict:
"""コネクションプールを使用してデータベースクエリを実行します。"""
async with db_pool.pool.acquire() as connection:
result = await connection.fetch(query)
return {"rows": [dict(row) for row in result]}
@mcp.tool
async def batch_http_requests(urls: list[str], ctx: Context) -> list[dict]:
"""複数のHTTPリクエストを同時に実行します。"""
await ctx.info(f"{len(urls)}個の同時リクエストを実行中...")
async with aiohttp.ClientSession() as session:
tasks = []
for url in urls:
task = session.get(url)
tasks.append(task)
responses = await asyncio.gather(*tasks, return_exceptions=True)
results = []
for i, response in enumerate(responses):
if isinstance(response, Exception):
results.append({"url": urls[i], "error": str(response)})
else:
results.append({
"url": urls[i],
"status": response.status,
"data": await response.text()
})
return results
結論
FastMCP 2.0は、シンプルな開発ツールからエンタープライズグレードのアプリケーションまでスケーリング可能な、本番環境対応のMCPサーバーを構築するための包括的なプラットフォームを提供します。主なポイントは以下の通りです。
開発のベストプラクティス
- 保守しやすいコードのためにモジュール型サーバー構成を使用する
- 包括的なエラーハンドリングと検証を実装する
- 最初から適切なロギングと監視を追加する
- ツールとリソースを徹底的にテストする
本番環境対応
- 適切な認証と認可を実装する
- 環境ベースの構成を使用する
- ヘルスチェックとサーキットブレーカーを追加する
- パフォーマンスとリソース使用量を監視する
デプロイ戦略
- 一貫性のためにアプリケーションをコンテナ化する
- スケーリングのためにKubernetesのようなオーケストレーションプラットフォームを使用する
- 適切なCI/CDパイプラインを実装する
- 高可用性と災害復旧を計画する
セキュリティの考慮事項
- すべての入力を検証してサニタイズする
- レート制限とスロットリングを実装する
- HTTPSと適切な認証を使用する
- 定期的なセキュリティ監査と更新
FastMCPは、現代の開発ワークフローと本番インフラストラクチャとシームレスに統合できる洗練されたAI搭載アプリケーションを構築することを可能にします。開発ツール、データ処理サービス、または複雑なエンタープライズアプリケーションを構築しているかどうかにかかわらず、FastMCPは成功に必要な基盤を提供します。
FastMCPで素晴らしいものを構築しましょう!🚀