跳到主要内容

最佳实践指南

本文档提供WaveYo-API插件开发的最佳实践和建议,帮助您编写高质量、可维护的插件代码。

错误处理

异常处理最佳实践

def register(app, **dependencies):
try:
# 获取日志服务
log_service = dependencies.get('log_service')
logger = log_service.get_logger(__name__)

# 插件初始化逻辑
validator = get_env_validator()
env_schema = {
"API_KEY": {
"type": EnvVarType.STRING,
"required": True,
"description": "API密钥"
}
}

env_vars = validator.validate_env_vars("my-plugin", env_schema)
api_key = env_vars["API_KEY"]

# 注册路由和服务
app.include_router(router)

logger.info("插件初始化成功")

except ValueError as e:
# 环境变量验证失败
logger.error(f"环境变量验证失败: {e}")
raise PluginInitializationError(f"环境变量配置错误: {e}")

except DependencyError as e:
# 依赖服务不可用
logger.error(f"依赖服务不可用: {e}")
raise PluginInitializationError(f"依赖服务缺失: {e}")

except Exception as e:
# 其他未知错误
logger.critical(f"插件初始化过程中发生未知错误: {e}")
raise PluginInitializationError(f"初始化失败: {e}")

自定义异常类

class PluginInitializationError(Exception):
"""插件初始化失败异常"""
pass

class DependencyError(Exception):
"""依赖服务错误异常"""
pass

class ConfigurationError(Exception):
"""配置错误异常"""
pass

性能优化

异步编程模式

# 正确:使用异步函数
@router.get("/data")
async def get_data():
# 异步数据库操作
data = await fetch_data_async()
return {"data": data}

# 错误:在异步函数中使用同步阻塞操作
@router.get("/data-slow")
async def get_data_slow():
# 这会阻塞事件循环
data = fetch_data_sync() # 同步操作
return {"data": data}

缓存策略

from functools import lru_cache
import asyncio

# 内存缓存
@lru_cache(maxsize=128)
def get_config_value(key):
# 昂贵的配置读取操作
return read_config(key)

# 异步缓存模式
cache = {}

async def get_cached_data(key, ttl=300):
"""带TTL的异步缓存"""
if key in cache:
cached_data, expiry = cache[key]
if time.time() < expiry:
return cached_data

# 缓存未命中或已过期
data = await fetch_expensive_data(key)
cache[key] = (data, time.time() + ttl)
return data

连接池管理

class DatabaseConnectionPool:
def __init__(self, max_connections=10):
self.semaphore = asyncio.Semaphore(max_connections)
self.connections = []

async def get_connection(self):
async with self.semaphore:
if self.connections:
return self.connections.pop()
else:
return await create_new_connection()

async def release_connection(self, conn):
self.connections.append(conn)

安全性

输入验证

from pydantic import BaseModel, constr, conint
from fastapi import HTTPException

class UserCreateRequest(BaseModel):
username: constr(min_length=3, max_length=20, pattern=r'^[a-zA-Z0-9_]+$')
email: constr(pattern=r'^[^@]+@[^@]+\.[^@]+$')
age: conint(gt=0, lt=150)

@router.post("/users")
async def create_user(user: UserCreateRequest):
# 输入已通过Pydantic验证
# 业务逻辑处理
return {"message": "用户创建成功"}

敏感信息处理

import os
from cryptography.fernet import Fernet

class SecureConfig:
def __init__(self):
# 从环境变量获取加密密钥
key = os.getenv('CONFIG_ENCRYPTION_KEY')
if not key:
raise ConfigurationError("加密密钥未配置")

self.cipher = Fernet(key.encode())

def encrypt_value(self, value):
"""加密敏感数据"""
return self.cipher.encrypt(value.encode()).decode()

def decrypt_value(self, encrypted_value):
"""解密敏感数据"""
return self.cipher.decrypt(encrypted_value.encode()).decode()

权限控制

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

async def get_current_user(token: str = Depends(oauth2_scheme)):
# 验证token并获取用户信息
user = await validate_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的认证令牌",
headers={"WWW-Authenticate": "Bearer"},
)
return user

async def require_admin(user: dict = Depends(get_current_user)):
"""要求管理员权限"""
if user.get('role') != 'admin':
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="需要管理员权限"
)
return user

代码质量

代码组织

# 推荐的文件结构
plugins/yoapi_plugin_my_service/
├── __init__.py # 主入口,注册函数
├── plugin.json # 元数据
├── requirements.txt # 依赖
├── models.py # 数据模型
├── schemas.py # Pydantic模型
├── services.py # 业务服务
├── routers.py # API路由
├── middleware.py # 中间件
├── utils.py # 工具函数
└── exceptions.py # 自定义异常

类型注解

from typing import List, Dict, Optional, AsyncGenerator
from pydantic import BaseModel

class User(BaseModel):
id: int
name: str
email: str

async def get_users(
page: int = 1,
limit: int = 10
) -> Dict[str, List[User]]:
"""获取用户列表,返回类型明确的字典"""
users = await user_service.get_users(page, limit)
return {"users": users, "total": len(users)}

async def stream_data() -> AsyncGenerator[bytes, None]:
"""异步生成器,明确返回类型"""
async for chunk in data_source.stream():
yield chunk

单元测试

import pytest
from fastapi.testclient import TestClient
from unittest.mock import AsyncMock, patch

@pytest.fixture
def test_client():
from main import create_app
app = create_app()
return TestClient(app)

@pytest.mark.asyncio
async def test_plugin_registration():
"""测试插件注册功能"""
mock_logger = AsyncMock()
dependencies = {'log_service': mock_logger}

# 导入并测试register函数
from plugins.yoapi_plugin_my_service import register

app = FastAPI()
register(app, **dependencies)

# 验证日志被调用
mock_logger.get_logger.assert_called_once()
mock_logger.info.assert_called_with("插件已成功注册")

监控和日志

结构化日志

import json
from datetime import datetime

def structured_log(logger, level: str, message: str, **context):
"""生成结构化日志"""
log_data = {
"timestamp": datetime.now().isoformat(),
"level": level.upper(),
"message": message,
"plugin": "my_plugin",
**context
}

if level == "info":
logger.info(json.dumps(log_data))
elif level == "error":
logger.error(json.dumps(log_data))
# 其他级别...

# 使用示例
structured_log(
logger,
"info",
"用户注册成功",
user_id=123,
email="user@example.com",
registration_time="2024-01-15T10:30:00"
)

性能监控

import time
from prometheus_client import Counter, Histogram

# 定义指标
REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint'])
REQUEST_DURATION = Histogram('http_request_duration_seconds', 'HTTP request duration', ['endpoint'])

@router.get("/api/data")
async def get_data():
start_time = time.time()

try:
# 业务逻辑
data = await fetch_data()
REQUEST_COUNT.labels(method='GET', endpoint='/api/data').inc()

return {"data": data}
finally:
duration = time.time() - start_time
REQUEST_DURATION.labels(endpoint='/api/data').observe(duration)

部署考虑

配置管理

class ConfigManager:
def __init__(self):
self.config = {}
self.watchers = []

async def load_config(self):
"""异步加载配置"""
# 从多种源加载配置:环境变量、配置文件、远程配置中心等
self.config.update(await self._load_from_env())
self.config.update(await self._load_from_file())
self.config.update(await self._load_from_remote())

def get(self, key, default=None):
"""安全获取配置值"""
return self.config.get(key, default)

def watch(self, key, callback):
"""监听配置变化"""
self.watchers.append((key, callback))

健康检查

@router.get("/health")
async def health_check():
"""健康检查端点"""
health_status = {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"version": "1.0.0",
"dependencies": await check_dependencies()
}

# 检查关键依赖
if not await check_database():
health_status["status"] = "degraded"
health_status["database"] = "unavailable"

return health_status

async def check_dependencies():
"""检查插件依赖的服务状态"""
dependencies_status = {}

# 检查数据库连接
try:
await database.ping()
dependencies_status["database"] = "healthy"
except Exception as e:
dependencies_status["database"] = f"unhealthy: {e}"

# 检查其他依赖...
return dependencies_status

遵循这些最佳实践将帮助您开发出高质量、可维护且安全的WaveYo-API插件。