安全最佳实践¶
概述¶
本指南提供了使用 FastAPI-Easy 安全模块的最佳实践。
1. 密钥管理¶
✅ 正确做法¶
# 使用环境变量
import os
from fastapi_easy.security import init_jwt_auth
secret_key = os.getenv("JWT_SECRET_KEY")
if not secret_key:
raise ValueError("JWT_SECRET_KEY environment variable is not set")
jwt_auth = init_jwt_auth(secret_key=secret_key)
❌ 错误做法¶
# 不要硬编码密钥
jwt_auth = init_jwt_auth(secret_key="my-secret-key-123")
# 不要在代码中提交密钥
# git add .env # ❌ 错误
密钥轮换¶
# 定期轮换密钥
# 1. 生成新密钥
new_secret_key = secrets.token_urlsafe(32)
# 2. 创建新的 JWT 认证实例
new_jwt_auth = init_jwt_auth(secret_key=new_secret_key)
# 3. 允许旧密钥的 Token 继续工作(过渡期)
# 4. 更新环境变量
# 5. 重启应用
2. 密码安全¶
✅ 正确做法¶
from fastapi_easy.security import PasswordManager
import re
password_manager = PasswordManager(rounds=12)
def validate_password_strength(password: str) -> bool:
"""验证密码强度"""
# 至少 12 个字符
if len(password) < 12:
return False
# 包含大写字母
if not re.search(r'[A-Z]', password):
return False
# 包含小写字母
if not re.search(r'[a-z]', password):
return False
# 包含数字
if not re.search(r'\d', password):
return False
# 包含特殊字符
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
return False
return True
@app.post("/auth/register")
async def register(username: str, password: str):
if not validate_password_strength(password):
raise HTTPException(
status_code=400,
detail="Password is too weak",
)
hashed = password_manager.hash_password(password)
# 保存用户
❌ 错误做法¶
# 不要存储明文密码
user.password = password # ❌ 错误
# 不要使用简单的哈希
import hashlib
user.password = hashlib.md5(password.encode()).hexdigest() # ❌ 错误
# 不要允许弱密码
if len(password) >= 4: # ❌ 太弱
user.password = password_manager.hash_password(password)
3. Token 安全¶
✅ 正确做法¶
# 设置合理的过期时间
jwt_auth = init_jwt_auth(
access_token_expire_minutes=15, # 短期访问令牌
refresh_token_expire_days=7, # 长期刷新令牌
)
# 在客户端使用 HttpOnly Cookie
@app.post("/auth/login")
async def login(username: str, password: str):
access_token = jwt_auth.create_access_token(subject=str(user.id))
refresh_token = jwt_auth.create_refresh_token(subject=str(user.id))
response = JSONResponse(
content={"message": "Login successful"},
status_code=200,
)
# 使用 HttpOnly Cookie 存储 Token
response.set_cookie(
key="access_token",
value=access_token,
httponly=True,
secure=True, # 仅 HTTPS
samesite="strict",
max_age=15 * 60, # 15 分钟
)
response.set_cookie(
key="refresh_token",
value=refresh_token,
httponly=True,
secure=True,
samesite="strict",
max_age=7 * 24 * 60 * 60, # 7 天
)
return response
❌ 错误做法¶
# 不要在 localStorage 中存储 Token
# localStorage.setItem('token', token) # ❌ 易受 XSS 攻击
# 不要设置过长的过期时间
jwt_auth = init_jwt_auth(
access_token_expire_minutes=10080, # ❌ 7 天太长
)
# 不要在 URL 中传输 Token
# GET /api/data?token=xyz # ❌ 错误
4. 登录安全¶
✅ 正确做法¶
from fastapi_easy.security import LoginAttemptTracker
tracker = LoginAttemptTracker(
max_attempts=5,
lockout_duration_minutes=15,
)
@app.post("/auth/login")
async def login(username: str, password: str):
# 检查是否被锁定
if tracker.is_locked_out(username):
remaining = tracker.get_lockout_remaining_seconds(username)
raise HTTPException(
status_code=429,
detail=f"Too many attempts. Try again in {remaining} seconds",
)
# 验证用户
user = authenticate_user(username, password)
if not user:
tracker.record_attempt(username, success=False)
raise HTTPException(status_code=401, detail="Invalid credentials")
# 成功
tracker.record_attempt(username, success=True)
# 生成 Token
access_token = jwt_auth.create_access_token(subject=str(user.id))
return {"access_token": access_token}
❌ 错误做法¶
# 不要透露用户是否存在
if user_exists(username): # ❌ 信息泄露
if not verify_password(password, user.password):
raise HTTPException(detail="Invalid password")
else:
raise HTTPException(detail="User not found")
# 正确做法:
raise HTTPException(detail="Invalid username or password")
# 不要允许无限制的登录尝试
# 没有登录限制 # ❌ 易受暴力破解
5. 权限检查¶
✅ 正确做法¶
from fastapi_easy.security import require_role, require_permission
# 使用装饰器检查权限
@app.delete("/users/{user_id}")
async def delete_user(
user_id: int,
current_user: dict = Depends(require_role("admin")),
):
"""删除用户 - 仅管理员"""
# 权限检查已自动进行
return {"message": "User deleted"}
# 检查资源所有权
@app.put("/items/{item_id}")
async def update_item(
item_id: int,
item: Item,
current_user: dict = Depends(get_current_user),
):
"""更新项目 - 仅所有者"""
existing_item = get_item(item_id)
if existing_item.owner_id != int(current_user["user_id"]):
raise HTTPException(status_code=403, detail="Not authorized")
# 更新项目
return update_item_in_db(item_id, item)
❌ 错误做法¶
# 不要在业务逻辑中检查权限
@app.delete("/users/{user_id}")
async def delete_user(user_id: int, current_user: dict):
if current_user["role"] != "admin": # ❌ 太晚了
raise HTTPException(status_code=403)
# 业务逻辑
# 不要信任客户端发送的权限
@app.post("/items")
async def create_item(item: Item, user_role: str): # ❌ 不安全
if user_role == "admin":
# 给予特殊权限
pass
6. 审计和监控¶
✅ 正确做法¶
from fastapi_easy.security import AuditLogger, AuditEventType
audit_logger = AuditLogger()
@app.post("/auth/login")
async def login(username: str, password: str):
# 记录所有登录尝试
try:
user = authenticate_user(username, password)
audit_logger.log(
event_type=AuditEventType.LOGIN_SUCCESS,
user_id=str(user.id),
username=username,
status="success",
)
# 生成 Token
access_token = jwt_auth.create_access_token(subject=str(user.id))
return {"access_token": access_token}
except Exception as e:
audit_logger.log(
event_type=AuditEventType.LOGIN_FAILURE,
username=username,
status="failure",
details={"error": str(e)},
)
raise
# 定期检查审计日志
def check_suspicious_activity():
"""检查可疑活动"""
failed_logins = audit_logger.get_failed_logins("admin", limit=10)
if len(failed_logins) > 5:
# 发送警告
send_alert("Multiple failed login attempts for admin account")
import schedule
schedule.every(1).hour.do(check_suspicious_activity)
❌ 错误做法¶
# 不要忽略审计日志
@app.post("/auth/login")
async def login(username: str, password: str):
# 没有记录登录尝试 # ❌ 无法审计
user = authenticate_user(username, password)
access_token = jwt_auth.create_access_token(subject=str(user.id))
return {"access_token": access_token}
# 不要只记录成功的事件
# 记录失败的事件同样重要
7. HTTPS 和传输安全¶
✅ 正确做法¶
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
app = FastAPI()
# 配置 CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["https://yourdomain.com"], # 仅允许特定域名
allow_credentials=True,
allow_methods=["GET", "POST"],
allow_headers=["Authorization"],
)
# 配置受信任的主机
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=["yourdomain.com", "www.yourdomain.com"],
)
# 在生产环境中使用 HTTPS
# 使用 SSL 证书(Let's Encrypt)
# 配置 HSTS 头
❌ 错误做法¶
# 不要在开发环境中使用 HTTP
# uvicorn app:app --host 0.0.0.0 # ❌ 不安全
# 不要允许所有 CORS 源
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # ❌ 不安全
)
# 不要禁用 HTTPS
# 在生产环境中必须使用 HTTPS
8. 依赖和更新¶
✅ 正确做法¶
# 定期更新依赖
# pip install --upgrade PyJWT bcrypt
# 使用 requirements.txt 固定版本
# PyJWT==2.8.0
# bcrypt==4.0.0
# 定期检查安全漏洞
# pip install safety
# safety check
❌ 错误做法¶
9. 错误处理¶
✅ 正确做法¶
# 不要透露内部错误信息
@app.get("/data")
async def get_data(current_user: dict = Depends(get_current_user)):
try:
# 业务逻辑
pass
except Exception as e:
# 记录详细错误
logger.error(f"Error: {e}", exc_info=True)
# 返回通用错误信息
raise HTTPException(
status_code=500,
detail="Internal server error",
)
❌ 错误做法¶
# 不要在错误信息中透露敏感信息
raise HTTPException(
status_code=500,
detail=f"Database error: {str(e)}", # ❌ 泄露数据库信息
)
# 不要忽视错误
try:
# 业务逻辑
pass
except:
pass # ❌ 忽视错误
10. 部署检查清单¶
- 使用环境变量配置所有密钥
- 启用 HTTPS
- 配置 CORS
- 设置合理的 Token 过期时间
- 启用登录限制
- 启用审计日志
- 定期备份审计日志
- 监控可疑活动
- 定期更新依赖
- 进行安全审计
- 配置日志和监控
- 设置告警规则