密码和登录限制指南¶
概述¶
FastAPI-Easy 提供了:
- 安全的密码哈希 (bcrypt)
- 登录尝试限制
- 账户锁定机制
- 暴力破解防护
密码管理¶
初始化密码管理器¶
from fastapi_easy.security import PasswordManager
# Create the password manager; rounds=12 is the bcrypt rounds setting
password_manager = PasswordManager(rounds=12)
哈希密码¶
# Hash the password when the user registers
password = "user_password_123"
hashed_password = password_manager.hash_password(password)
# Save to the database: only the hash is assigned to the user record
user.password_hash = hashed_password
db.add(user)
db.commit()
验证密码¶
# Verify the password when the user logs in
@app.post("/auth/login")
async def login(username: str, password: str):
    """Authenticate a user and return a bearer access token.

    Args:
        username: Login name looked up in the ``User`` table.
        password: Plaintext password submitted by the client.

    Returns:
        A dict with ``access_token`` and ``token_type``.

    Raises:
        HTTPException: 401 with the same "Invalid credentials" detail for
            an unknown username and for a wrong password, so the response
            body does not say which of the two failed.
    """
    user = db.query(User).filter(User.username == username).first()
    if not user:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # Check the submitted password against the stored hash
    if not password_manager.verify_password(password, user.password_hash):
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # Issue the token
    access_token = jwt_auth.create_access_token(subject=str(user.id))
    return {"access_token": access_token, "token_type": "bearer"}
检查是否需要重新哈希¶
# Periodically check whether a stored hash needs to be re-hashed,
# for example after the bcrypt rounds setting has been raised
new_password_manager = PasswordManager(rounds=14)
if new_password_manager.needs_rehash(user.password_hash):
    # Re-hash at the user's next login: hashing needs the plaintext
    # password, which is only in hand while the user is logging in
    new_hash = new_password_manager.hash_password(original_password)
    user.password_hash = new_hash
    db.commit()
登录限制¶
初始化登录尝试跟踪器¶
from fastapi_easy.security import LoginAttemptTracker
# Create the tracker:
# at most 5 attempts, a 15-minute lockout, reset after 60 minutes
tracker = LoginAttemptTracker(
    max_attempts=5,
    lockout_duration_minutes=15,
    reset_duration_minutes=60,
)
记录登录尝试¶
@app.post("/auth/login")
async def login(username: str, password: str):
    """Authenticate a user while tracking failed attempts per username.

    Args:
        username: Login name; also the key used with ``tracker``.
        password: Plaintext password submitted by the client.

    Returns:
        A dict with ``access_token`` and ``token_type``.

    Raises:
        HTTPException: 429 while the username is locked out, 401 when the
            user is unknown or the password does not verify.
    """
    # Refuse up front while the account is locked
    if tracker.is_locked_out(username):
        remaining = tracker.get_lockout_remaining_seconds(username)
        raise HTTPException(
            status_code=429,
            detail=f"Account locked. Try again in {remaining} seconds",
        )

    user = db.query(User).filter(User.username == username).first()
    if not user or not password_manager.verify_password(password, user.password_hash):
        # Record the failed attempt; unknown usernames are counted too
        tracker.record_attempt(username, success=False)
        attempt_count = tracker.get_attempt_count(username)
        raise HTTPException(
            status_code=401,
            detail=f"Invalid credentials ({attempt_count} attempts)",
        )

    # Record the successful attempt (clears the count)
    tracker.record_attempt(username, success=True)

    # Issue the token
    access_token = jwt_auth.create_access_token(subject=str(user.id))
    return {"access_token": access_token, "token_type": "bearer"}
获取锁定信息¶
@app.get("/auth/lockout-status/{username}")
async def get_lockout_status(username: str):
    """Return the lockout status of an account.

    Args:
        username: Account name to look up in ``tracker``.

    Returns:
        ``{"locked": True, "remaining_seconds": ...}`` while locked out,
        otherwise ``{"locked": False, "attempts": ..., "max_attempts": 5}``.
    """
    if tracker.is_locked_out(username):
        remaining = tracker.get_lockout_remaining_seconds(username)
        return {
            "locked": True,
            "remaining_seconds": remaining,
        }

    attempt_count = tracker.get_attempt_count(username)
    return {
        "locked": False,
        "attempts": attempt_count,
        # NOTE(review): hard-coded copy of the max_attempts value given to
        # LoginAttemptTracker; keep the two in sync.
        "max_attempts": 5,
    }
重置用户¶
@app.post("/admin/reset-lockout/{username}")
async def reset_lockout(
    username: str,
    current_user: dict = Depends(require_role("admin")),
):
    """Reset a user's lockout state on behalf of an administrator.

    Args:
        username: Account whose tracker state is reset.
        current_user: Resolved by the ``require_role("admin")`` dependency;
            not read in the body, it only gates access to the route.

    Returns:
        A dict with a confirmation ``message``.
    """
    tracker.reset_user(username)
    return {"message": f"Lockout reset for {username}"}
完整的登录流程¶
from fastapi_easy.security import (
PasswordManager,
LoginAttemptTracker,
AuditLogger,
AuditEventType,
)
# Initialization: create each helper with its default settings
password_manager = PasswordManager()
tracker = LoginAttemptTracker()
audit_logger = AuditLogger()
@app.post("/auth/login")
async def login(username: str, password: str):
    """Run the complete login flow: lockout check, auth, tokens, audit log.

    Args:
        username: Login name; also the key used with ``tracker``.
        password: Plaintext password submitted by the client.

    Returns:
        A dict with ``access_token``, ``refresh_token`` and ``token_type``.

    Raises:
        HTTPException: 429 while the username is locked out, 401 when the
            user is unknown or the password does not verify.
    """
    # 1. Check whether the account is locked
    if tracker.is_locked_out(username):
        audit_logger.log(
            event_type=AuditEventType.LOGIN_LOCKED,
            username=username,
            status="failure",
        )
        raise HTTPException(status_code=429, detail="Account locked")

    # 2. Look up the user
    user = db.query(User).filter(User.username == username).first()
    if not user:
        tracker.record_attempt(username, success=False)
        audit_logger.log(
            event_type=AuditEventType.LOGIN_FAILURE,
            username=username,
            status="failure",
            details={"reason": "user_not_found"},
        )
        # The precise reason goes to the audit log only; the client gets
        # the same generic detail as for a wrong password.
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # 3. Verify the password
    if not password_manager.verify_password(password, user.password_hash):
        tracker.record_attempt(username, success=False)
        audit_logger.log(
            event_type=AuditEventType.LOGIN_FAILURE,
            username=username,
            user_id=str(user.id),
            status="failure",
            details={"reason": "invalid_password"},
        )
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # 4. Record the successful attempt
    tracker.record_attempt(username, success=True)

    # 5. Issue the tokens
    access_token = jwt_auth.create_access_token(subject=str(user.id))
    refresh_token = jwt_auth.create_refresh_token(subject=str(user.id))

    # 6. Write the audit log entry
    audit_logger.log(
        event_type=AuditEventType.LOGIN_SUCCESS,
        username=username,
        user_id=str(user.id),
        status="success",
    )

    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer",
    }
最佳实践¶
1. 使用强密码要求¶
import re
def validate_password(password: str, min_length: int = 8) -> bool:
    """Check a password against the strength policy.

    A password passes when it is at least ``min_length`` characters long
    and contains an uppercase letter, a lowercase letter, a digit, and one
    of the special characters ``!@#$%^&*(),.?":{}|<>``.

    Args:
        password: Candidate password to check.
        min_length: Minimum accepted length; defaults to 8.

    Returns:
        True if every requirement is met, otherwise False.
    """
    if len(password) < min_length:
        return False

    required_patterns = (
        r'[A-Z]',  # uppercase letter
        r'[a-z]',  # lowercase letter
        r'\d',  # digit
        r'[!@#$%^&*(),.?":{}|<>]',  # special character
    )
    return all(re.search(pattern, password) for pattern in required_patterns)
@app.post("/auth/register")
async def register(username: str, password: str):
    """Register a user, rejecting passwords that fail the strength policy.

    Raises:
        HTTPException: 400 when ``validate_password`` rejects the password.
    """
    if not validate_password(password):
        raise HTTPException(
            status_code=400,
            detail="Password must be at least 8 characters and contain uppercase, lowercase, digit, and special character",
        )
    # Continue with the registration flow
2. 定期更新 bcrypt rounds¶
# Update rounds yearly or whenever security guidance changes.
# As computing power grows, raise rounds to maintain security.
password_manager = PasswordManager(rounds=14) # raised from 12 to 14
3. 监控登录失败¶
# Review failed logins on a regular basis
failed_logins = audit_logger.get_failed_logins(username="admin", limit=10)
# The loop variable is not called `login`, so it cannot rebind the
# module-level `login` route handler.
for failed_login in failed_logins:
    print(f"Failed login attempt: {failed_login['timestamp']}")
4. 实现密码重置¶
@app.post("/auth/forgot-password")
async def forgot_password(email: str):
    """Handle a password-reset request.

    Args:
        email: Address the reset link should be sent to.

    Returns:
        The same ``message`` dict whether or not an account matches
        ``email``.
    """
    # One response for both outcomes: returning different messages would
    # let a caller find out which e-mail addresses have accounts.
    response = {"message": "If user exists, reset link will be sent"}

    user = db.query(User).filter(User.email == email).first()
    if not user:
        # Do not reveal whether the user exists
        return response

    # Generate the reset token, valid for one hour
    reset_token = secrets.token_urlsafe(32)
    user.reset_token = reset_token
    user.reset_token_expires = datetime.utcnow() + timedelta(hours=1)
    db.commit()

    # Send the reset link
    send_reset_email(user.email, reset_token)
    return response
@app.post("/auth/reset-password")
async def reset_password(token: str, new_password: str):
    """Set a new password for the user who holds a valid reset token.

    Args:
        token: Reset token previously stored on the user record.
        new_password: Plaintext replacement password.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 400 when the token is unknown or expired, or when
            the new password fails ``validate_password``.
    """
    user = db.query(User).filter(User.reset_token == token).first()
    # A missing expiry is treated as expired rather than compared with a datetime
    expires_at = user.reset_token_expires if user else None
    if expires_at is None or expires_at < datetime.utcnow():
        raise HTTPException(status_code=400, detail="Invalid or expired token")

    # Apply the same strength policy as registration. The token is left in
    # place on failure so the user can retry with a stronger password.
    if not validate_password(new_password):
        raise HTTPException(
            status_code=400,
            detail="Password must be at least 8 characters and contain uppercase, lowercase, digit, and special character",
        )

    # Hash the new password and invalidate the token so it cannot be reused
    user.password_hash = password_manager.hash_password(new_password)
    user.reset_token = None
    user.reset_token_expires = None
    db.commit()
    return {"message": "Password reset successfully"}
完整示例¶
参考 examples/06_with_permissions.py 获取完整的工作示例。