FastMCP: MCPサーバーとクライアント構築の完全ガイド

FastMCP: MCPサーバーとクライアント構築の完全ガイド

FastMCP 2.0の全容を解説するチュートリアルです。このライブラリを使えば、Model Context Protocol (MCP) のサーバーとクライアントをPythonらしく、かつ迅速に構築できます。LLMアプリケーション向けのツール、リソース、プロンプトの作成方法を習得しましょう。

レッスン 5 2025-07-03 07:00

高度なパターンと本番環境へのデプロイ

高度なパターンと本番環境へのデプロイ

FastMCPアプリケーションがシンプルな開発ツールから本番環境に対応したサービスへと成長するにつれて、高度なパターン、スケーラビリティ、セキュリティ、およびデプロイ戦略を考慮する必要があります。このセクションでは、エンタープライズグレードのFastMCP開発とデプロイについて説明します。

高度なサーバーパターン

サーバー構成とモジュラリティ

FastMCP 2.0は、モジュール型で保守しやすいアプリケーションを可能にする洗練されたサーバー構成パターンをサポートしています。

# auth_server.py
from fastmcp import FastMCP
import jwt
import bcrypt

auth_server = FastMCP("Authentication Service")

@auth_server.tool
def authenticate_user(username: str, password: str) -> dict:
    """ユーザーを認証し、JWTトークンを返します。"""
    # ユーザー認証の実装
    if verify_credentials(username, password):
        token = jwt.encode({"user": username}, "secret", algorithm="HS256")
        return {"token": token, "user": username, "authenticated": True}
    return {"authenticated": False, "error": "Invalid credentials"}

# data_server.py
from fastmcp import FastMCP
import pandas as pd

data_server = FastMCP("Data Processing Service")

@data_server.tool
def process_dataset(data_path: str, operation: str) -> dict:
    """指定された操作でデータセットを処理します。"""
    df = pd.read_csv(data_path)

    operations = {
        "summary": lambda df: df.describe().to_dict(),
        "count": lambda df: {"rows": len(df), "columns": len(df.columns)},
        "columns": lambda df: df.columns.tolist()
    }

    if operation not in operations:
        raise ValueError(f"Unknown operation: {operation}")

    return operations[operation](df)

# main_server.py
from fastmcp import FastMCP
from auth_server import auth_server
from data_server import data_server

# メインサーバーを作成し、サブサーバーを構成
main_server = FastMCP(
    name="Enterprise Application",
    instructions="認証とデータ処理を備えた包括的なエンタープライズアプリケーション"
)

# サブサーバーをプレフィックス付きでマウント
main_server.mount(auth_server, prefix="auth")
main_server.mount(data_server, prefix="data")

# メインサーバーのツールを追加
@main_server.tool
def health_check() -> dict:
    """すべてのサービスのヘルスチェックを実行します。"""
    return {
        "status": "healthy",
        "services": ["auth", "data", "main"],
        "timestamp": datetime.now().isoformat()
    }

if __name__ == "__main__":
    main_server.run(transport="http", host="0.0.0.0", port=8000)

API統合のためのプロキシサーバー

既存のAPIをMCPエコシステムに接続するためのプロキシサーバーを作成します。

# api_proxy.py
from fastmcp import FastMCP, Client
import asyncio

# 既存のMCPサーバーのプロキシを作成
async def create_api_proxy():
    # リモートMCPサーバーに接続
    remote_client = Client("https://api.example.com/mcp/sse")

    # プロキシサーバーを作成
    proxy_server = FastMCP.as_proxy(
        remote_client,
        name="API Proxy Server",
        instructions="リモートAPIサービス用のプロキシ"
    )

    # プロキシにカスタムミドルウェアまたはツールを追加
    @proxy_server.tool
    def local_cache_status() -> dict:
        """ローカルキャッシュのステータスを取得します。"""
        return {"cache_enabled": True, "cache_size": "50MB"}

    return proxy_server

# 使用例
if __name__ == "__main__":
    proxy = asyncio.run(create_api_proxy())
    proxy.run(transport="http", port=9000)

動的なツール生成

設定やOpenAPI仕様からツールを動的に生成します。

# dynamic_tools.py
from fastmcp import FastMCP
import yaml
import requests

mcp = FastMCP("Dynamic API Server")

def create_api_tool(endpoint_config):
    """APIエンドポイント設定からツールを動的に作成します。"""
    def api_tool(**kwargs):
        response = requests.request(
            method=endpoint_config["method"],
            url=endpoint_config["url"],
            **kwargs
        )
        return response.json()

    # 関数メタデータを設定
    api_tool.__name__ = endpoint_config["name"]
    api_tool.__doc__ = endpoint_config["description"]

    return api_tool

# API設定をロード
with open("api_config.yaml", "r") as f:
    api_config = yaml.safe_load(f)

# ツールを動的に生成
for endpoint in api_config["endpoints"]:
    tool_func = create_api_tool(endpoint)
    mcp.tool(tool_func)

# 代替案:OpenAPI仕様から生成
def load_from_openapi(spec_url: str):
    """OpenAPI仕様からツールをロードします。"""
    # FastMCPにはOpenAPI統合が組み込まれています
    api_server = FastMCP.from_openapi(spec_url)
    return api_server

# 使用例
# openapi_server = load_from_openapi("https://api.example.com/openapi.json")
# mcp.mount(openapi_server, prefix="api")

本番環境に対応した認証

JWTベースの認証

# secure_server.py
from fastmcp import FastMCP
from fastmcp.auth import JWTAuthProvider
import os

# JWT認証を設定
auth_provider = JWTAuthProvider(
    secret_key=os.getenv("JWT_SECRET_KEY"),
    algorithm="HS256",
    token_header="Authorization"
)

# 認証済みサーバーを作成
mcp = FastMCP(
    "Secure Production Server",
    auth_provider=auth_provider
)

@mcp.tool(require_auth=True)
def get_sensitive_data(user_context) -> dict:
    """機密データを取得します - 認証が必要です。"""
    return {
        "user_id": user_context.user_id,
        "data": "sensitive information",
        "access_level": user_context.permissions
    }

@mcp.tool  # 公開ツール、認証不要
def get_public_info() -> dict:
    """公開情報を取得します。"""
    return {"status": "public", "version": "1.0.0"}

APIキー認証

# api_key_server.py
from fastmcp import FastMCP
from fastmcp.auth import APIKeyAuthProvider

# APIキー認証を設定
auth_provider = APIKeyAuthProvider(
    api_keys={
        "key123": {"name": "Client A", "permissions": ["read", "write"]},
        "key456": {"name": "Client B", "permissions": ["read"]}
    },
    header_name="X-API-Key"
)

mcp = FastMCP("API Key Protected Server", auth_provider=auth_provider)

@mcp.tool(require_permissions=["write"])
def write_data(data: dict, auth_context) -> dict:
    """データを書き込みます - 書き込み権限が必要です。"""
    # 実装
    return {"written": True, "client": auth_context.client_name}

デプロイ戦略

Dockerデプロイ

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# 依存関係をインストール
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# アプリケーションをコピー
COPY . .

# 非rootユーザーを作成
RUN useradd -m -u 1000 mcpuser && chown -R mcpuser:mcpuser /app
USER mcpuser

# ヘルスチェック
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# サーバーを実行
EXPOSE 8000
CMD ["python", "server.py"]
# docker-compose.yml
version: '3.8'

services:
  fastmcp-server:
    build: .
    ports:
      - "8000:8000"
    environment:
      - FASTMCP_LOG_LEVEL=INFO
      - JWT_SECRET_KEY=${JWT_SECRET_KEY}
      - DATABASE_URL=${DATABASE_URL}
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - fastmcp-server
    restart: unless-stopped

volumes:
  redis_data:

Kubernetesデプロイ

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fastmcp-server
  labels:
    app: fastmcp-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: fastmcp-server
  template:
    metadata:
      labels:
        app: fastmcp-server
    spec:
      containers:
      - name: fastmcp-server
        image: your-registry/fastmcp-server:latest
        ports:
        - containerPort: 8000
        env:
        - name: FASTMCP_LOG_LEVEL
          value: "INFO"
        - name: JWT_SECRET_KEY
          valueFrom:
            secretKeyRef:
              name: fastmcp-secrets
              key: jwt-secret
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5

---
apiVersion: v1
kind: Service
metadata:
  name: fastmcp-service
spec:
  selector:
    app: fastmcp-server
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: LoadBalancer

---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: fastmcp-ingress
  annotations:
    kubernetes.io/ingress.class: nginx
    cert-manager.io/cluster-issuer: letsencrypt-prod
spec:
  tls:
  - hosts:
    - api.yourdomain.com
    secretName: fastmcp-tls
  rules:
  - host: api.yourdomain.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: fastmcp-service
            port:
              number: 80

監視と可観測性

ログとメトリクス

# monitored_server.py
from fastmcp import FastMCP, Context
import logging
import time
from prometheus_client import Counter, Histogram, start_http_server

# メトリクスを設定
TOOL_CALLS = Counter('mcp_tool_calls_total', 'Total tool calls', ['tool_name', 'status'])
TOOL_DURATION = Histogram('mcp_tool_duration_seconds', 'Tool execution time', ['tool_name'])

# ロギングを設定
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('fastmcp.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

mcp = FastMCP("Monitored Server")

def monitor_tool(func):
    """ツールに監視を追加するデコレータ。"""
    def wrapper(*args, **kwargs):
        start_time = time.time()
        tool_name = func.__name__

        try:
            logger.info(f"Starting tool: {tool_name}")
            result = func(*args, **kwargs)
            TOOL_CALLS.labels(tool_name=tool_name, status='success').inc()
            logger.info(f"Tool {tool_name} completed successfully")
            return result
        except Exception as e:
            TOOL_CALLS.labels(tool_name=tool_name, status='error').inc()
            logger.error(f"Tool {tool_name} failed: {str(e)}")
            raise
        finally:
            duration = time.time() - start_time
            TOOL_DURATION.labels(tool_name=tool_name).observe(duration)
            logger.info(f"Tool {tool_name} took {duration:.2f}s")

    return wrapper

@mcp.tool
@monitor_tool
def process_data(data: list) -> dict:
    """データを監視つきで処理します。"""
    # 処理ロジック
    return {"processed": len(data), "status": "success"}

# Prometheusメトリクスサーバーを起動
start_http_server(9090)

if __name__ == "__main__":
    mcp.run(transport="http", port=8000)

ヘルスチェックとサーキットブレーカー

# resilient_server.py
from fastmcp import FastMCP, Context
import asyncio
from circuit_breaker import CircuitBreaker
import aioredis

mcp = FastMCP("Resilient Server")

# 外部API呼び出し用のサーキットブレーカー
api_circuit_breaker = CircuitBreaker(
    failure_threshold=5,
    recovery_timeout=30,
    expected_exception=Exception
)

@mcp.tool
async def call_external_api(endpoint: str, ctx: Context) -> dict:
    """サーキットブレーカー保護付きで外部APIを呼び出します。"""
    @api_circuit_breaker
    async def make_api_call():
        # 実際のAPI呼び出しの実装
        async with aiohttp.ClientSession() as session:
            async with session.get(endpoint) as response:
                return await response.json()

    try:
        return await make_api_call()
    except Exception as e:
        await ctx.error(f"API call failed: {str(e)}")
        return {"error": "Service temporarily unavailable"}

@mcp.resource("health://status")
async def health_check() -> dict:
    """包括的なヘルスチェック。"""
    health_status = {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "checks": {}
    }

    # データベース接続をチェック
    try:
        # データベースヘルスチェックロジック
        health_status["checks"]["database"] = "healthy"
    except Exception:
        health_status["checks"]["database"] = "unhealthy"
        health_status["status"] = "unhealthy"

    # Redis接続をチェック
    try:
        redis = aioredis.from_url("redis://localhost")
        await redis.ping()
        health_status["checks"]["redis"] = "healthy"
    except Exception:
        health_status["checks"]["redis"] = "unhealthy"
        health_status["status"] = "degraded"

    return health_status

セキュリティのベストプラクティス

入力検証とサニタイズ

# secure_tools.py
from fastmcp import FastMCP
from pydantic import BaseModel, validator
import re
import html

mcp = FastMCP("Secure Server")

class UserInput(BaseModel):
    username: str
    email: str
    age: int

    @validator('username')
    def validate_username(cls, v):
        if not re.match(r'^[a-zA-Z0-9_]+$', v):
            raise ValueError('Username must contain only letters, numbers, and underscores')
        if len(v) < 3 or len(v) > 20:
            raise ValueError('Username must be between 3 and 20 characters')
        return v

    @validator('email')
    def validate_email(cls, v):
        email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        if not re.match(email_pattern, v):
            raise ValueError('Invalid email format')
        return v

    @validator('age')
    def validate_age(cls, v):
        if v < 0 or v > 150:
            raise ValueError('Age must be between 0 and 150')
        return v

@mcp.tool
def create_user(user_data: UserInput) -> dict:
    """検証済み入力でユーザーを作成します。"""
    # 入力はPydanticによって自動的に検証されます
    return {
        "user_id": generate_user_id(),
        "username": user_data.username,
        "email": user_data.email,
        "status": "created"
    }

@mcp.tool
def sanitize_html_content(content: str) -> str:
    """XSSを防ぐためにHTMLコンテンツをサニタイズします。"""
    # 基本的なHTMLサニタイゼーション
    sanitized = html.escape(content)
    return sanitized

レートリミットとスロットリング

# rate_limited_server.py
from fastmcp import FastMCP
from fastmcp.middleware import RateLimitMiddleware
import asyncio

# レート制限を設定
rate_limiter = RateLimitMiddleware(
    requests_per_minute=60,
    burst_size=10,
    storage_backend="redis",  # or "memory"
    redis_url="redis://localhost:6379"
)

mcp = FastMCP("Rate Limited Server", middleware=[rate_limiter])

@mcp.tool
def expensive_operation(data: str) -> dict:
    """レート制限されるべき高コストな操作。"""
    # 高コストな処理をシミュレート
    time.sleep(2)
    return {"processed": True, "data_length": len(data)}

パフォーマンス最適化

非同期操作とコネクションプーリング

# optimized_server.py
from fastmcp import FastMCP, Context
import asyncio
import aiohttp
import asyncpg
from contextlib import asynccontextmanager

class DatabasePool:
    def __init__(self):
        self.pool = None

    async def create_pool(self):
        self.pool = await asyncpg.create_pool(
            "postgresql://user:pass@localhost/db",
            min_size=10,
            max_size=20
        )

    async def close_pool(self):
        if self.pool:
            await self.pool.close()

db_pool = DatabasePool()

@asynccontextmanager
async def lifespan():
    # 起動時
    await db_pool.create_pool()
    yield
    # シャットダウン時
    await db_pool.close_pool()

mcp = FastMCP("Optimized Server", lifespan=lifespan)

@mcp.tool
async def query_database(query: str) -> dict:
    """コネクションプールを使用してデータベースクエリを実行します。"""
    async with db_pool.pool.acquire() as connection:
        result = await connection.fetch(query)
        return {"rows": [dict(row) for row in result]}

@mcp.tool
async def batch_http_requests(urls: list[str], ctx: Context) -> list[dict]:
    """複数のHTTPリクエストを同時に実行します。"""
    await ctx.info(f"{len(urls)}個の同時リクエストを実行中...")

    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            task = session.get(url)
            tasks.append(task)

        responses = await asyncio.gather(*tasks, return_exceptions=True)

        results = []
        for i, response in enumerate(responses):
            if isinstance(response, Exception):
                results.append({"url": urls[i], "error": str(response)})
            else:
                results.append({
                    "url": urls[i],
                    "status": response.status,
                    "data": await response.text()
                })

        return results

結論

FastMCP 2.0は、シンプルな開発ツールからエンタープライズグレードのアプリケーションまでスケーリング可能な、本番環境対応のMCPサーバーを構築するための包括的なプラットフォームを提供します。主なポイントは以下の通りです。

開発のベストプラクティス

  • 保守しやすいコードのためにモジュール型サーバー構成を使用する
  • 包括的なエラーハンドリングと検証を実装する
  • 最初から適切なロギングと監視を追加する
  • ツールとリソースを徹底的にテストする

本番環境対応

  • 適切な認証と認可を実装する
  • 環境ベースの構成を使用する
  • ヘルスチェックとサーキットブレーカーを追加する
  • パフォーマンスとリソース使用量を監視する

デプロイ戦略

  • 一貫性のためにアプリケーションをコンテナ化する
  • スケーリングのためにKubernetesのようなオーケストレーションプラットフォームを使用する
  • 適切なCI/CDパイプラインを実装する
  • 高可用性と災害復旧を計画する

セキュリティの考慮事項

  • すべての入力を検証してサニタイズする
  • レート制限とスロットリングを実装する
  • HTTPSと適切な認証を使用する
  • 定期的なセキュリティ監査と更新

FastMCPは、現代の開発ワークフローと本番インフラストラクチャとシームレスに統合できる洗練されたAI搭載アプリケーションを構築することを可能にします。開発ツール、データ処理サービス、または複雑なエンタープライズアプリケーションを構築しているかどうかにかかわらず、FastMCPは成功に必要な基盤を提供します。

FastMCPで素晴らしいものを構築しましょう!🚀