📖 协议介绍

什么是 WebSocket?

WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。它于 2011 年被 IETF 定为标准 RFC 6455,设计目标是在 Web 浏览器和服务器之间提供实时、双向的数据传输能力。

为什么需要 WebSocket?

传统 HTTP 轮询

  • 客户端定期发送请求
  • 大量无效请求,浪费资源
  • 延迟高,实时性差
  • HTTP 头部开销大

WebSocket 优势

  • 持久连接,一次握手,多次通信
  • 真正的双向实时通信
  • 低延迟,高效率
  • 更小的协议开销

核心特性

  • 🔄 全双工通信:客户端和服务器可以同时发送和接收数据
  • 🌐 基于 TCP:建立在 TCP 协议之上,默认端口 80 (WS) 或 443 (WSS)
  • 🔗 持久连接:连接建立后保持开放状态,避免重复握手
  • 📦 轻量级:协议头部小,数据传输效率高
  • 🎯 跨域支持:天然支持跨域通信
  • 🔐 安全支持:支持 WSS (WebSocket Secure) 加密传输

协议层次

应用层 (WebSocket)
传输层 (TCP)
网络层 (IP)
链路层

🤝 握手过程

WebSocket 握手流程

WebSocket 连接建立通过 HTTP 升级机制完成,客户端发送一个带有特殊头部字段的 HTTP 请求。

sequenceDiagram participant Client as 客户端 participant Server as 服务器 Client->>Server: HTTP GET 请求
Upgrade: websocket
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ== activate Server Server-->>Client: HTTP 101 响应
Upgrade: websocket
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo= deactivate Server Note over Client,Server: WebSocket 连接建立成功
开始全双工通信 Client->>Server: WebSocket 数据帧 Server->>Client: WebSocket 数据帧 Client->>Server: WebSocket 数据帧

客户端请求示例

GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Origin: http://example.com
Sec-WebSocket-Version: 13

服务器响应示例

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

关键头部字段说明

字段名 作用
Upgrade 指示协议升级为 websocket
Connection 指示连接状态需要升级
Sec-WebSocket-Key 客户端发送的随机字符串,用于安全验证
Sec-WebSocket-Accept 服务器对 Key 的确认响应
Sec-WebSocket-Version WebSocket 协议版本,当前为 13

📦 数据帧结构

WebSocket 帧格式

WebSocket 数据传输使用帧(Frame)作为基本单位,每个帧包含头部和负载两部分。

flowchart TD subgraph Frame["WebSocket 帧"] direction TB subgraph Byte0["字节 0"] B0_1["FIN
1 bit"] B0_2["RSV1
1 bit"] B0_3["RSV2
1 bit"] B0_4["RSV3
1 bit"] B0_5["Opcode
4 bits"] end subgraph Byte1["字节 1"] B1_1["MASK
1 bit"] B1_2["Payload Length
7 bits"] end subgraph Extended["字节 2-9 (可选)"] EPL["Extended Payload Length
2 或 8 字节"] end subgraph MaskingKey["字节 10-13 (可选)"] MK["Masking Key
4 字节"] end subgraph Data["Payload Data"] PD["应用数据"] end end style Frame fill:#1e293b,stroke:#3b82f6,color:#fff style Byte0 fill:#3b82f6,stroke:#60a5fa,color:#fff style Byte1 fill:#3b82f6,stroke:#60a5fa,color:#fff style Extended fill:#8b5cf6,stroke:#a78bfa,color:#fff style MaskingKey fill:#10b981,stroke:#34d399,color:#fff style Data fill:#f59e0b,stroke:#fbbf24,color:#fff

字段详解

FIN (1 bit)

标识是否为消息的最后一个帧。1 表示最后一个帧,0 表示还有后续帧。

RSV1, RSV2, RSV3 (各 1 bit)

保留位,通常为 0,用于扩展协议。

Opcode (4 bits)

操作码,定义帧的类型:

  • 0x0: 连续帧
  • 0x1: 文本帧 (UTF-8)
  • 0x2: 二进制帧
  • 0x8: 关闭连接
  • 0x9: Ping
  • 0xA: Pong

MASK (1 bit)

标识负载是否经过掩码处理。客户端发送的帧必须为 1,服务器发送的帧必须为 0。

Payload Length (7 bits / 7+16 bits / 7+64 bits)

负载长度:

  • 0-125: 实际长度
  • 126: 后续 2 字节表示长度
  • 127: 后续 8 字节表示长度

Masking Key (4 bytes)

当 MASK=1 时存在,用于对负载数据进行异或运算。

Payload Data

实际的应用数据,可能经过掩码处理。

消息分片示例

大消息可以被拆分成多个帧传输:

graph LR A[消息开始
FIN=0 Opcode=0x1] --> B[中间帧
FIN=0 Opcode=0x0] B --> C[中间帧
FIN=0 Opcode=0x0] C --> D[消息结束
FIN=1 Opcode=0x0]

控制帧

控制帧用于连接管理,不能分片:

  • Close (0x8): 优雅关闭连接,可包含状态码和原因
  • Ping (0x9): 心跳检测,期望收到 Pong 响应
  • Pong (0xA): 对 Ping 的响应

🎯 常见应用场景

💬

实时聊天

即时通讯应用,支持群聊、私聊、消息推送等功能。WebSocket 提供低延迟的消息传输,确保用户实时接收消息。

📊

实时数据监控

服务器监控、股票行情、IoT 设备状态等场景,需要实时推送数据更新。WebSocket 可以高效地将数据变化推送给所有订阅的客户端。

🎮

在线游戏

多人在线游戏需要低延迟、高频率的状态同步。WebSocket 的全双工特性非常适合游戏场景,支持实时的玩家位置、动作等数据交换。

📝

协作编辑

类似 Google Docs 的实时协作编辑工具,多个用户同时编辑文档时,需要实时同步光标位置和内容变化。

🔔

推送通知

系统通知、提醒、消息推送等场景。服务器可以主动向客户端推送消息,无需客户端轮询。

📹

音视频通话

WebRTC 等音视频通信技术使用 WebSocket 进行信令交换,建立媒体连接。WebSocket 负责传递连接信息和控制指令。

🐍 Python 实现示例

服务器端实现

使用 websockets 库创建简单的 Echo 服务器:

# 安装依赖: pip install websockets
import asyncio
import websockets
import json

async def echo_server(websocket):
    """处理客户端连接"""
    print(f"客户端已连接: {websocket.remote_address}")
    
    try:
        async for message in websocket:
            # 解析消息
            try:
                data = json.loads(message)
                print(f"收到消息: {data}")
                
                # 处理消息
                response = {
                    "type": "echo",
                    "data": data,
                    "timestamp": asyncio.get_event_loop().time()
                }
                
                # 发送响应
                await websocket.send(json.dumps(response))
                
            except json.JSONDecodeError:
                # 非JSON消息,直接回显
                await websocket.send(f"Echo: {message}")
                
    except websockets.exceptions.ConnectionClosed:
        print(f"客户端断开连接: {websocket.remote_address}")

async def main():
    """启动服务器"""
    print("WebSocket 服务器启动在 ws://localhost:8765")
    
    async with websockets.serve(echo_server, "localhost", 8765):
        await asyncio.Future()  # 永久运行

if __name__ == "__main__":
    asyncio.run(main())

客户端实现

import asyncio
import websockets
import json

async def websocket_client():
    """WebSocket 客户端示例"""
    uri = "ws://localhost:8765"
    
    try:
        async with websockets.connect(uri) as websocket:
            print(f"已连接到服务器: {uri}")
            
            # 发送消息
            messages = [
                {"type": "greeting", "message": "Hello, Server!"},
                {"type": "data", "value": 42},
                {"type": "ping", "timestamp": 1234567890}
            ]
            
            for msg in messages:
                await websocket.send(json.dumps(msg))
                print(f"发送: {msg}")
                
                # 接收响应
                response = await websocket.recv()
                print(f"接收: {response}\n")
                
                # 短暂延迟
                await asyncio.sleep(1)
                
    except Exception as e:
        print(f"连接错误: {e}")

if __name__ == "__main__":
    asyncio.run(websocket_client())

广播服务器

实现消息广播功能,向所有连接的客户端发送消息:

import asyncio
import websockets
import json

# 存储所有连接的客户端
connected_clients = set()

async def broadcast(message):
    """向所有客户端广播消息"""
    if connected_clients:
        websockets.broadcast(connected_clients, message)
        print(f"广播消息给 {len(connected_clients)} 个客户端")

async def handle_client(websocket):
    """处理客户端连接"""
    connected_clients.add(websocket)
    print(f"客户端连接: {websocket.remote_address}, 总数: {len(connected_clients)}")
    
    try:
        async for message in websocket:
            data = json.loads(message)
            print(f"收到消息: {data}")
            
            # 广播给所有客户端
            await broadcast(json.dumps({
                "type": "broadcast",
                "from": str(websocket.remote_address),
                "data": data
            }))
            
    except websockets.exceptions.ConnectionClosed:
        print(f"客户端断开: {websocket.remote_address}")
    finally:
        connected_clients.remove(websocket)
        print(f"客户端移除, 剩余: {len(connected_clients)}")

async def main():
    """启动广播服务器"""
    print("广播服务器启动在 ws://localhost:8765")
    
    async with websockets.serve(handle_client, "localhost", 8765):
        await asyncio.Future()

if __name__ == "__main__":
    asyncio.run(main())

🔷 Golang 实现示例

服务器端实现

使用 gorilla/websocket 库创建 WebSocket 服务器:

// 安装依赖: go get github.com/gorilla/websocket
package main

import (
    "encoding/json"
    "log"
    "net/http"
    "time"

    "github.com/gorilla/websocket"
)

// WebSocket 升级器
var upgrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
    CheckOrigin: func(r *http.Request) bool {
        return true // 生产环境需要验证 Origin
    },
}

// 消息结构
type Message struct {
    Type      string      `json:"type"`
    Data      interface{} `json:"data"`
    Timestamp float64     `json:"timestamp"`
}

func handleWebSocket(w http.ResponseWriter, r *http.Request) {
    // 升级 HTTP 连接为 WebSocket
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Printf("升级失败: %v", err)
        return
    }
    defer conn.Close()

    log.Printf("客户端已连接: %s", conn.RemoteAddr())

    for {
        // 读取消息
        messageType, message, err := conn.ReadMessage()
        if err != nil {
            log.Printf("读取错误: %v", err)
            break
        }

        log.Printf("收到消息: %s", string(message))

        // 解析 JSON 消息
        var msg Message
        if err := json.Unmarshal(message, &msg); err == nil {
            // 构造响应
            response := Message{
                Type:      "echo",
                Data:      msg.Data,
                Timestamp: float64(time.Now().UnixNano()) / 1e9,
            }

            responseBytes, _ := json.Marshal(response)
            
            // 发送响应
            if err := conn.WriteMessage(messageType, responseBytes); err != nil {
                log.Printf("发送错误: %v", err)
                break
            }
        } else {
            // 非 JSON 消息,直接回显
            if err := conn.WriteMessage(messageType, message); err != nil {
                log.Printf("发送错误: %v", err)
                break
            }
        }
    }

    log.Printf("客户端断开连接: %s", conn.RemoteAddr())
}

func main() {
    http.HandleFunc("/ws", handleWebSocket)
    
    log.Println("WebSocket 服务器启动在 ws://localhost:8080/ws")
    if err := http.ListenAndServe(":8080", nil); err != nil {
        log.Fatal("服务器启动失败: ", err)
    }
}

客户端实现

package main

import (
    "encoding/json"
    "log"
    "net/url"
    "time"

    "github.com/gorilla/websocket"
)

type Message struct {
    Type      string      `json:"type"`
    Data      interface{} `json:"data"`
    Timestamp float64     `json:"timestamp"`
}

func main() {
    // 建立 WebSocket 连接
    u := url.URL{Scheme: "ws", Host: "localhost:8080", Path: "/ws"}
    log.Printf("连接到服务器: %s", u.String())

    conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
    if err != nil {
        log.Fatal("连接失败: ", err)
    }
    defer conn.Close()

    // 发送消息
    messages := []Message{
        {Type: "greeting", Data: "Hello, Server!"},
        {Type: "data", Data: 42},
        {Type: "ping", Data: time.Now().Unix()},
    }

    for _, msg := range messages {
        // 序列化消息
        messageBytes, err := json.Marshal(msg)
        if err != nil {
            log.Printf("序列化错误: %v", err)
            continue
        }

        // 发送消息
        if err := conn.WriteMessage(websocket.TextMessage, messageBytes); err != nil {
            log.Printf("发送错误: %v", err)
            break
        }

        log.Printf("发送: %+v", msg)

        // 接收响应
        _, response, err := conn.ReadMessage()
        if err != nil {
            log.Printf("接收错误: %v", err)
            break
        }

        log.Printf("接收: %s\n", string(response))
        time.Sleep(1 * time.Second)
    }
}

Hub 模式 - 广播服务器

package main

import (
    "encoding/json"
    "log"
    "net/http"
    "sync"

    "github.com/gorilla/websocket"
)

// Hub 管理所有客户端连接
type Hub struct {
    clients    map[*Client]bool
    broadcast  chan []byte
    register   chan *Client
    unregister chan *Client
    mu         sync.RWMutex
}

// Client 表示一个 WebSocket 客户端
type Client struct {
    hub  *Hub
    conn *websocket.Conn
    send chan []byte
}

var upgrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool {
        return true
    },
}

// NewHub 创建新的 Hub
func NewHub() *Hub {
    return &Hub{
        clients:    make(map[*Client]bool),
        broadcast:  make(chan []byte),
        register:   make(chan *Client),
        unregister: make(chan *Client),
    }
}

// Run 运行 Hub
func (h *Hub) Run() {
    for {
        select {
        case client := <-h.register:
            h.mu.Lock()
            h.clients[client] = true
            h.mu.Unlock()
            log.Printf("客户端连接, 总数: %d", len(h.clients))

        case client := <-h.unregister:
            h.mu.Lock()
            if _, ok := h.clients[client]; ok {
                delete(h.clients, client)
                close(client.send)
            }
            h.mu.Unlock()
            log.Printf("客户端断开, 剩余: %d", len(h.clients))

        case message := <-h.broadcast:
            h.mu.RLock()
            for client := range h.clients {
                select {
                case client.send <- message:
                default:
                    close(client.send)
                    delete(h.clients, client)
                }
            }
            h.mu.RUnlock()
        }
    }
}

// readPump 从客户端读取消息
func (c *Client) readPump() {
    defer func() {
        c.hub.unregister <- c
        c.conn.Close()
    }()

    for {
        _, message, err := c.conn.ReadMessage()
        if err != nil {
            break
        }

        // 广播消息
        var msg map[string]interface{}
        if err := json.Unmarshal(message, &msg); err == nil {
            msg["from"] = c.conn.RemoteAddr().String()
            broadcastMsg, _ := json.Marshal(msg)
            c.hub.broadcast <- broadcastMsg
        }
    }
}

// writePump 向客户端发送消息
func (c *Client) writePump() {
    defer c.conn.Close()

    for message := range c.send {
        err := c.conn.WriteMessage(websocket.TextMessage, message)
        if err != nil {
            break
        }
    }
}

var hub = NewHub()

func handleWebSocket(w http.ResponseWriter, r *http.Request) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Printf("升级失败: %v", err)
        return
    }

    client := &Client{
        hub:  hub,
        conn: conn,
        send: make(chan []byte, 256),
    }

    hub.register <- client

    go client.writePump()
    go client.readPump()
}

func main() {
    go hub.Run()

    http.HandleFunc("/ws", handleWebSocket)
    
    log.Println("广播服务器启动在 ws://localhost:8080/ws")
    if err := http.ListenAndServe(":8080", nil); err != nil {
        log.Fatal("服务器启动失败: ", err)
    }
}

✨ 最佳实践

性能优化

  • 使用连接池管理 WebSocket 连接
  • 实现心跳机制检测连接状态
  • 合理设置消息缓冲区大小
  • 使用二进制帧传输大数据
  • 启用消息压缩(permessage-deflate)

安全考虑

  • 生产环境使用 WSS (WebSocket Secure)
  • 验证 Origin 头部防止 CSRF 攻击
  • 实现连接数限制和速率限制
  • 输入验证,防止注入攻击
  • 定期更新依赖库

错误处理

  • 实现优雅的连接关闭机制
  • 处理网络中断和重连逻辑
  • 记录详细的错误日志
  • 设置合理的超时时间
  • 实现状态码和错误消息规范