WebSocket Support

FastAPI-Easy supports real-time communication over WebSocket. This document describes how to use it.


Enabling WebSocket

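from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi_easy import CRUDRouter

app = FastAPI()

@app.websocket("/ws/items")
async def websocket_endpoint(websocket: WebSocket):
    # Complete the handshake before exchanging messages
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        # The client closed the connection, so there is nothing left to close
        pass
    except Exception:
        # Unexpected failure: close the connection from the server side
        await websocket.close()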

WebSocket Connection

Client Connection

const ws = new WebSocket("ws://localhost:8000/ws/items");

ws.onopen = function(event) {
    console.log("Connected");
    ws.send("Hello");
};

ws.onmessage = function(event) {
    console.log("Message:", event.data);
};

ws.onclose = function(event) {
    console.log("Disconnected");
};

Real-Time Data Push

Server Side

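import asyncio

from fastapi import WebSocket, WebSocketDisconnect

# `app`, `db`, and `Item` are assumed to be defined elsewhere in the application

@app.websocket("/ws/items")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            # Fetch the latest data from the database
            items = db.query(Item).all()
            await websocket.send_json([item.dict() for item in items])
            # Wait one second before pushing the next snapshot
            await asyncio.sleep(1)
    except WebSocketDisconnect:
        # The client went away, so stop pushing
        pass
    except Exception:
        await websocket.close()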
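
The loop above sends the full item list every second, even when nothing has changed. One possible refinement is to send only when the data differs from the last payload. The following is a minimal sketch under the assumption that the payload is JSON-serializable; `push_on_change`, `fetch`, `send_json`, and `max_polls` are hypothetical names introduced for illustration, and `max_polls` exists only so the demonstration terminates.

```python
import asyncio
import json


async def push_on_change(fetch, send_json, interval=1.0, max_polls=None):
    """Poll `fetch` and call `send_json` only when the payload has changed."""
    last = None
    polls = 0
    while max_polls is None or polls < max_polls:
        payload = fetch()
        # Serialize with sorted keys so equal data yields an equal string
        encoded = json.dumps(payload, sort_keys=True)
        if encoded != last:
            await send_json(payload)
            last = encoded
        polls += 1
        await asyncio.sleep(interval)


async def demo():
    # Three successive "database" snapshots; the second repeats the first
    snapshots = iter([[{"id": 1}], [{"id": 1}], [{"id": 1}, {"id": 2}]])
    sent = []

    async def fake_send(payload):
        # Stand-in for websocket.send_json
        sent.append(payload)

    await push_on_change(lambda: next(snapshots), fake_send, interval=0, max_polls=3)
    return sent


sent = asyncio.run(demo())
```

Inside the endpoint, `send_json` could be `websocket.send_json` and `fetch` a function that returns `[item.dict() for item in db.query(Item).all()]`.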

Client Side

ws.onmessage = function(event) {
    const items = JSON.parse(event.data);
    console.log("Items:", items);
};

Best Practices

1. Error Handling

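from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/items")
async def websocket_endpoint(websocket: WebSocket):
    try:
        await websocket.accept()
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        # The client closed the connection, so there is nothing to close
        print("Client disconnected")
    except Exception as e:
        # Unexpected failure: report it and close the connection
        print(f"Error: {e}")
        await websocket.close()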

2. Connection Management

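from fastapi import WebSocket

class ConnectionManager:
    def __init__(self):
        # All connections that have been accepted and not yet removed
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        # Guard against removing a connection that was never registered
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        # Iterate over a copy in case the list changes while sending
        for connection in list(self.active_connections):
            await connection.send_text(message)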
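
If a client drops without `disconnect` being called, a send to that connection can raise and interrupt the broadcast for everyone after it. The following is a minimal sketch of one way to tolerate that, assuming each connection exposes an async `send_text` method. `SafeConnectionManager` and `FakeSocket` are hypothetical names used only for illustration; `FakeSocket` stands in for a real `WebSocket` so the example runs without a server.

```python
import asyncio


class SafeConnectionManager:
    """Hypothetical manager that drops connections which fail on send."""

    def __init__(self):
        self.active_connections = []

    def add(self, connection):
        # In a real endpoint this would follow `await websocket.accept()`
        self.active_connections.append(connection)

    async def broadcast(self, message: str) -> int:
        delivered = 0
        # Iterate over a copy so that removing entries is safe
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
                delivered += 1
            except Exception:
                # Assumption: any send failure means the peer is gone
                self.active_connections.remove(connection)
        return delivered


class FakeSocket:
    """Stand-in for a WebSocket, used only to exercise the manager."""

    def __init__(self, alive: bool = True):
        self.alive = alive
        self.sent = []

    async def send_text(self, message: str) -> None:
        if not self.alive:
            raise RuntimeError("connection closed")
        self.sent.append(message)


async def demo():
    manager = SafeConnectionManager()
    good, dead = FakeSocket(), FakeSocket(alive=False)
    manager.add(dead)
    manager.add(good)
    delivered = await manager.broadcast("hello")
    return delivered, good.sent, len(manager.active_connections)


result = asyncio.run(demo())
```

The dead connection is removed during the broadcast and the healthy one still receives the message. Whether to swallow every exception or only specific ones is a design choice that depends on the application.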

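FAQ

Q: How do I handle a WebSocket disconnection?

A: Wrap the receive loop in try-except and catch the WebSocketDisconnect exception.

Q: How do I broadcast a message to all connections?

A: Use a ConnectionManager to keep track of all connections and send to each of them.

Q: How do I send JSON data over a WebSocket?

A: Use the websocket.send_json() method.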