审计日志指南¶
概述¶
FastAPI-Easy 提供了完整的审计日志系统,用于:
- 记录安全事件
- 跟踪用户活动
- 合规性审计
- 安全分析
快速开始¶
初始化审计日志¶
from fastapi_easy.security import AuditLogger, AuditEventType
# 创建审计日志记录器
audit_logger = AuditLogger(max_logs=10000)
记录事件¶
# 记录登录成功
audit_logger.log(
event_type=AuditEventType.LOGIN_SUCCESS,
user_id="user123",
username="john_doe",
status="success",
)
# 记录登录失败
audit_logger.log(
event_type=AuditEventType.LOGIN_FAILURE,
username="john_doe",
status="failure",
details={"reason": "invalid_password"},
)
# 记录权限被拒绝
audit_logger.log(
event_type=AuditEventType.PERMISSION_DENIED,
user_id="user123",
resource="admin_panel",
action="access",
status="failure",
)
事件类型¶
认证事件¶
AuditEventType.LOGIN_SUCCESS # 登录成功
AuditEventType.LOGIN_FAILURE # 登录失败
AuditEventType.LOGIN_LOCKED # 账户被锁定
AuditEventType.LOGOUT # 登出
AuditEventType.TOKEN_REFRESH # Token 刷新
AuditEventType.TOKEN_EXPIRED # Token 过期
授权事件¶
AuditEventType.PERMISSION_DENIED # 权限被拒绝
AuditEventType.PERMISSION_GRANTED # 权限被授予
AuditEventType.UNAUTHORIZED # 未授权
AuditEventType.ACCOUNT_LOCKED # 账户被锁定
AuditEventType.ACCOUNT_UNLOCKED # 账户被解锁
用户管理事件¶
AuditEventType.PASSWORD_CHANGED # 密码已更改
AuditEventType.ROLE_CHANGED # 角色已更改
AuditEventType.PERMISSION_CHANGED # 权限已更改
查询审计日志¶
获取所有日志¶
# 获取最近 100 条日志
logs = audit_logger.get_logs(limit=100)
for log in logs:
print(f"{log['timestamp']}: {log['event_type']} - {log['username']}")
按用户过滤¶
# 获取特定用户的活动
user_logs = audit_logger.get_user_activity(
username="john_doe",
limit=50,
)
for log in user_logs:
print(f"{log['timestamp']}: {log['event_type']}")
获取失败的登录¶
# 获取特定用户的失败登录尝试
failed_logins = audit_logger.get_failed_logins(
username="john_doe",
limit=10,
)
for login in failed_logins:
print(f"Failed login: {login['timestamp']}")
按事件类型过滤¶
from fastapi_easy.security import AuditEventType
# 获取所有权限拒绝事件
permission_denied = audit_logger.get_logs(
event_type=AuditEventType.PERMISSION_DENIED,
limit=50,
)
完整的审计日志集成¶
from fastapi import FastAPI, Depends, HTTPException
from fastapi_easy.security import (
PasswordManager,
LoginAttemptTracker,
AuditLogger,
AuditEventType,
get_current_user,
require_role,
)
app = FastAPI()
# 初始化
password_manager = PasswordManager()
tracker = LoginAttemptTracker()
audit_logger = AuditLogger()
@app.post("/auth/login")
async def login(username: str, password: str):
"""登录端点(带审计日志)"""
# 检查锁定
if tracker.is_locked_out(username):
audit_logger.log(
event_type=AuditEventType.LOGIN_LOCKED,
username=username,
status="failure",
)
raise HTTPException(status_code=429, detail="Account locked")
# 验证用户
user = authenticate_user(username, password)
if not user:
tracker.record_attempt(username, success=False)
audit_logger.log(
event_type=AuditEventType.LOGIN_FAILURE,
username=username,
status="failure",
)
raise HTTPException(status_code=401, detail="Invalid credentials")
# 成功登录
tracker.record_attempt(username, success=True)
audit_logger.log(
event_type=AuditEventType.LOGIN_SUCCESS,
user_id=str(user.id),
username=username,
status="success",
)
# 生成 Token
access_token = jwt_auth.create_access_token(subject=str(user.id))
return {"access_token": access_token, "token_type": "bearer"}
@app.post("/logout")
async def logout(current_user: dict = Depends(get_current_user)):
"""登出端点(带审计日志)"""
audit_logger.log(
event_type=AuditEventType.LOGOUT,
user_id=current_user["user_id"],
status="success",
)
return {"message": "Logged out successfully"}
@app.get("/admin-panel")
async def admin_panel(current_user: dict = Depends(require_role("admin"))):
"""管理员面板(带审计日志)"""
audit_logger.log(
event_type=AuditEventType.PERMISSION_GRANTED,
user_id=current_user["user_id"],
resource="admin_panel",
action="access",
status="success",
)
return {"message": "Welcome to admin panel"}
@app.get("/audit-logs")
async def get_audit_logs(
current_user: dict = Depends(require_role("admin")),
):
"""获取审计日志(仅管理员)"""
logs = audit_logger.export_logs()
return {"logs": logs}
审计日志分析¶
检测异常活动¶
def detect_suspicious_activity():
"""检测可疑活动"""
# 检查多次失败的登录
all_logs = audit_logger.export_logs()
failed_logins = {}
for log in all_logs:
if log["event_type"] == "login_failure":
username = log["username"]
failed_logins[username] = failed_logins.get(username, 0) + 1
# 如果某个用户有超过 10 次失败登录,发出警告
for username, count in failed_logins.items():
if count > 10:
print(f"⚠️ Warning: {username} has {count} failed login attempts")
# 定期运行检测
import schedule
schedule.every(1).hour.do(detect_suspicious_activity)
生成审计报告¶
def generate_audit_report(start_date, end_date):
"""生成审计报告"""
logs = audit_logger.export_logs()
# 按事件类型统计
event_counts = {}
for log in logs:
event_type = log["event_type"]
event_counts[event_type] = event_counts.get(event_type, 0) + 1
# 按用户统计
user_activity = {}
for log in logs:
username = log.get("username", "unknown")
user_activity[username] = user_activity.get(username, 0) + 1
return {
"period": f"{start_date} to {end_date}",
"event_counts": event_counts,
"user_activity": user_activity,
"total_events": len(logs),
}
# 生成报告
report = generate_audit_report("2025-01-01", "2025-01-31")
print(report)
最佳实践¶
1. 定期导出和备份日志¶
import json
from datetime import datetime
def backup_audit_logs():
"""备份审计日志"""
logs = audit_logger.export_logs()
filename = f"audit_logs_{datetime.now().isoformat()}.json"
with open(filename, "w") as f:
json.dump(logs, f, indent=2)
print(f"Audit logs backed up to {filename}")
# 每天备份
schedule.every().day.at("23:59").do(backup_audit_logs)
2. 监控关键事件¶
def monitor_critical_events():
"""监控关键事件"""
logs = audit_logger.export_logs()
critical_events = [
AuditEventType.PERMISSION_DENIED,
AuditEventType.LOGIN_LOCKED,
AuditEventType.ACCOUNT_LOCKED,
]
for log in logs:
if log["event_type"] in [e.value for e in critical_events]:
# 发送警告
send_alert(f"Critical event: {log['event_type']}")
3. 保留日志策略¶
def cleanup_old_logs(days=90):
"""清理旧日志"""
from datetime import datetime, timedelta
cutoff_date = datetime.now() - timedelta(days=days)
# 导出日志到存储
logs = audit_logger.export_logs()
# 清理内存中的日志
audit_logger.clear_logs()
# 保存到数据库或文件系统
save_logs_to_storage(logs)
# 每周清理
schedule.every().week.do(cleanup_old_logs)
4. 合规性报告¶
def generate_compliance_report():
"""生成合规性报告"""
logs = audit_logger.export_logs()
# 统计登录事件
login_events = [l for l in logs if "login" in l["event_type"]]
# 统计权限变更
permission_changes = [l for l in logs if "permission" in l["event_type"]]
# 统计失败事件
failed_events = [l for l in logs if l["status"] == "failure"]
return {
"total_logins": len(login_events),
"permission_changes": len(permission_changes),
"failed_events": len(failed_events),
"compliance_status": "compliant" if len(failed_events) < 100 else "review_needed",
}
完整示例¶
参考 examples/06_with_permissions.py 获取完整的工作示例。