📖 协议介绍
什么是 WebSocket?
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。它于 2011 年被 IETF 定为标准 RFC 6455,设计目标是在 Web 浏览器和服务器之间提供实时、双向的数据传输能力。
为什么需要 WebSocket?
传统 HTTP 轮询
- 客户端定期发送请求
- 大量无效请求,浪费资源
- 延迟高,实时性差
- HTTP 头部开销大
WebSocket 优势
- 持久连接,一次握手,多次通信
- 真正的双向实时通信
- 低延迟,高效率
- 更小的协议开销
核心特性
- 🔄 全双工通信:客户端和服务器可以同时发送和接收数据
- 🌐 基于 TCP:建立在 TCP 协议之上,默认端口 80 (WS) 或 443 (WSS)
- 🔗 持久连接:连接建立后保持开放状态,避免重复握手
- 📦 轻量级:协议头部小,数据传输效率高
- 🎯 跨域支持:天然支持跨域通信
- 🔐 安全支持:支持 WSS (WebSocket Secure) 加密传输
协议层次
🤝 握手过程
WebSocket 握手流程
WebSocket 连接建立通过 HTTP 升级机制完成,客户端发送一个带有特殊头部字段的 HTTP 请求。
Upgrade: websocket
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ== activate Server Server-->>Client: HTTP 101 响应
Upgrade: websocket
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo= deactivate Server Note over Client,Server: WebSocket 连接建立成功
开始全双工通信 Client->>Server: WebSocket 数据帧 Server->>Client: WebSocket 数据帧 Client->>Server: WebSocket 数据帧
客户端请求示例
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Origin: http://example.com
Sec-WebSocket-Version: 13
服务器响应示例
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
关键头部字段说明
| 字段名 | 作用 |
|---|---|
Upgrade |
指示协议升级为 websocket |
Connection |
指示连接状态需要升级 |
Sec-WebSocket-Key |
客户端发送的随机字符串,用于安全验证 |
Sec-WebSocket-Accept |
服务器对 Key 的确认响应 |
Sec-WebSocket-Version |
WebSocket 协议版本,当前为 13 |
📦 数据帧结构
WebSocket 帧格式
WebSocket 数据传输使用帧(Frame)作为基本单位,每个帧包含头部和负载两部分。
1 bit"] B0_2["RSV1
1 bit"] B0_3["RSV2
1 bit"] B0_4["RSV3
1 bit"] B0_5["Opcode
4 bits"] end subgraph Byte1["字节 1"] B1_1["MASK
1 bit"] B1_2["Payload Length
7 bits"] end subgraph Extended["字节 2-9 (可选)"] EPL["Extended Payload Length
2 或 8 字节"] end subgraph MaskingKey["字节 10-13 (可选)"] MK["Masking Key
4 字节"] end subgraph Data["Payload Data"] PD["应用数据"] end end style Frame fill:#1e293b,stroke:#3b82f6,color:#fff style Byte0 fill:#3b82f6,stroke:#60a5fa,color:#fff style Byte1 fill:#3b82f6,stroke:#60a5fa,color:#fff style Extended fill:#8b5cf6,stroke:#a78bfa,color:#fff style MaskingKey fill:#10b981,stroke:#34d399,color:#fff style Data fill:#f59e0b,stroke:#fbbf24,color:#fff
字段详解
FIN (1 bit)
标识是否为消息的最后一个帧。1 表示最后一个帧,0 表示还有后续帧。
RSV1, RSV2, RSV3 (各 1 bit)
保留位,通常为 0,用于扩展协议。
Opcode (4 bits)
操作码,定义帧的类型:
0x0: 连续帧0x1: 文本帧 (UTF-8)0x2: 二进制帧0x8: 关闭连接0x9: Ping0xA: Pong
MASK (1 bit)
标识负载是否经过掩码处理。客户端发送的帧必须为 1,服务器发送的帧必须为 0。
Payload Length (7 bits / 7+16 bits / 7+64 bits)
负载长度:
- 0-125: 实际长度
- 126: 后续 2 字节表示长度
- 127: 后续 8 字节表示长度
Masking Key (4 bytes)
当 MASK=1 时存在,用于对负载数据进行异或运算。
Payload Data
实际的应用数据,可能经过掩码处理。
消息分片示例
大消息可以被拆分成多个帧传输:
FIN=0 Opcode=0x1] --> B[中间帧
FIN=0 Opcode=0x0] B --> C[中间帧
FIN=0 Opcode=0x0] C --> D[消息结束
FIN=1 Opcode=0x0]
控制帧
控制帧用于连接管理,不能分片:
- Close (0x8): 优雅关闭连接,可包含状态码和原因
- Ping (0x9): 心跳检测,期望收到 Pong 响应
- Pong (0xA): 对 Ping 的响应
🎯 常见应用场景
实时聊天
即时通讯应用,支持群聊、私聊、消息推送等功能。WebSocket 提供低延迟的消息传输,确保用户实时接收消息。
实时数据监控
服务器监控、股票行情、IoT 设备状态等场景,需要实时推送数据更新。WebSocket 可以高效地将数据变化推送给所有订阅的客户端。
在线游戏
多人在线游戏需要低延迟、高频率的状态同步。WebSocket 的全双工特性非常适合游戏场景,支持实时的玩家位置、动作等数据交换。
协作编辑
类似 Google Docs 的实时协作编辑工具,多个用户同时编辑文档时,需要实时同步光标位置和内容变化。
推送通知
系统通知、提醒、消息推送等场景。服务器可以主动向客户端推送消息,无需客户端轮询。
音视频通话
WebRTC 等音视频通信技术使用 WebSocket 进行信令交换,建立媒体连接。WebSocket 负责传递连接信息和控制指令。
🐍 Python 实现示例
服务器端实现
使用 websockets 库创建简单的 Echo 服务器:
# 安装依赖: pip install websockets
import asyncio
import websockets
import json
async def echo_server(websocket):
"""处理客户端连接"""
print(f"客户端已连接: {websocket.remote_address}")
try:
async for message in websocket:
# 解析消息
try:
data = json.loads(message)
print(f"收到消息: {data}")
# 处理消息
response = {
"type": "echo",
"data": data,
"timestamp": asyncio.get_event_loop().time()
}
# 发送响应
await websocket.send(json.dumps(response))
except json.JSONDecodeError:
# 非JSON消息,直接回显
await websocket.send(f"Echo: {message}")
except websockets.exceptions.ConnectionClosed:
print(f"客户端断开连接: {websocket.remote_address}")
async def main():
"""启动服务器"""
print("WebSocket 服务器启动在 ws://localhost:8765")
async with websockets.serve(echo_server, "localhost", 8765):
await asyncio.Future() # 永久运行
if __name__ == "__main__":
asyncio.run(main())
客户端实现
import asyncio
import websockets
import json
async def websocket_client():
"""WebSocket 客户端示例"""
uri = "ws://localhost:8765"
try:
async with websockets.connect(uri) as websocket:
print(f"已连接到服务器: {uri}")
# 发送消息
messages = [
{"type": "greeting", "message": "Hello, Server!"},
{"type": "data", "value": 42},
{"type": "ping", "timestamp": 1234567890}
]
for msg in messages:
await websocket.send(json.dumps(msg))
print(f"发送: {msg}")
# 接收响应
response = await websocket.recv()
print(f"接收: {response}\n")
# 短暂延迟
await asyncio.sleep(1)
except Exception as e:
print(f"连接错误: {e}")
if __name__ == "__main__":
asyncio.run(websocket_client())
广播服务器
实现消息广播功能,向所有连接的客户端发送消息:
import asyncio
import websockets
import json
# 存储所有连接的客户端
connected_clients = set()
async def broadcast(message):
"""向所有客户端广播消息"""
if connected_clients:
websockets.broadcast(connected_clients, message)
print(f"广播消息给 {len(connected_clients)} 个客户端")
async def handle_client(websocket):
"""处理客户端连接"""
connected_clients.add(websocket)
print(f"客户端连接: {websocket.remote_address}, 总数: {len(connected_clients)}")
try:
async for message in websocket:
data = json.loads(message)
print(f"收到消息: {data}")
# 广播给所有客户端
await broadcast(json.dumps({
"type": "broadcast",
"from": str(websocket.remote_address),
"data": data
}))
except websockets.exceptions.ConnectionClosed:
print(f"客户端断开: {websocket.remote_address}")
finally:
connected_clients.remove(websocket)
print(f"客户端移除, 剩余: {len(connected_clients)}")
async def main():
"""启动广播服务器"""
print("广播服务器启动在 ws://localhost:8765")
async with websockets.serve(handle_client, "localhost", 8765):
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())
🔷 Golang 实现示例
服务器端实现
使用 gorilla/websocket 库创建 WebSocket 服务器:
// 安装依赖: go get github.com/gorilla/websocket
package main
import (
"encoding/json"
"log"
"net/http"
"time"
"github.com/gorilla/websocket"
)
// WebSocket 升级器
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true // 生产环境需要验证 Origin
},
}
// 消息结构
type Message struct {
Type string `json:"type"`
Data interface{} `json:"data"`
Timestamp float64 `json:"timestamp"`
}
func handleWebSocket(w http.ResponseWriter, r *http.Request) {
// 升级 HTTP 连接为 WebSocket
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("升级失败: %v", err)
return
}
defer conn.Close()
log.Printf("客户端已连接: %s", conn.RemoteAddr())
for {
// 读取消息
messageType, message, err := conn.ReadMessage()
if err != nil {
log.Printf("读取错误: %v", err)
break
}
log.Printf("收到消息: %s", string(message))
// 解析 JSON 消息
var msg Message
if err := json.Unmarshal(message, &msg); err == nil {
// 构造响应
response := Message{
Type: "echo",
Data: msg.Data,
Timestamp: float64(time.Now().UnixNano()) / 1e9,
}
responseBytes, _ := json.Marshal(response)
// 发送响应
if err := conn.WriteMessage(messageType, responseBytes); err != nil {
log.Printf("发送错误: %v", err)
break
}
} else {
// 非 JSON 消息,直接回显
if err := conn.WriteMessage(messageType, message); err != nil {
log.Printf("发送错误: %v", err)
break
}
}
}
log.Printf("客户端断开连接: %s", conn.RemoteAddr())
}
func main() {
http.HandleFunc("/ws", handleWebSocket)
log.Println("WebSocket 服务器启动在 ws://localhost:8080/ws")
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatal("服务器启动失败: ", err)
}
}
客户端实现
package main
import (
"encoding/json"
"log"
"net/url"
"time"
"github.com/gorilla/websocket"
)
type Message struct {
Type string `json:"type"`
Data interface{} `json:"data"`
Timestamp float64 `json:"timestamp"`
}
func main() {
// 建立 WebSocket 连接
u := url.URL{Scheme: "ws", Host: "localhost:8080", Path: "/ws"}
log.Printf("连接到服务器: %s", u.String())
conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil {
log.Fatal("连接失败: ", err)
}
defer conn.Close()
// 发送消息
messages := []Message{
{Type: "greeting", Data: "Hello, Server!"},
{Type: "data", Data: 42},
{Type: "ping", Data: time.Now().Unix()},
}
for _, msg := range messages {
// 序列化消息
messageBytes, err := json.Marshal(msg)
if err != nil {
log.Printf("序列化错误: %v", err)
continue
}
// 发送消息
if err := conn.WriteMessage(websocket.TextMessage, messageBytes); err != nil {
log.Printf("发送错误: %v", err)
break
}
log.Printf("发送: %+v", msg)
// 接收响应
_, response, err := conn.ReadMessage()
if err != nil {
log.Printf("接收错误: %v", err)
break
}
log.Printf("接收: %s\n", string(response))
time.Sleep(1 * time.Second)
}
}
Hub 模式 - 广播服务器
package main
import (
"encoding/json"
"log"
"net/http"
"sync"
"github.com/gorilla/websocket"
)
// Hub 管理所有客户端连接
type Hub struct {
clients map[*Client]bool
broadcast chan []byte
register chan *Client
unregister chan *Client
mu sync.RWMutex
}
// Client 表示一个 WebSocket 客户端
type Client struct {
hub *Hub
conn *websocket.Conn
send chan []byte
}
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
// NewHub 创建新的 Hub
func NewHub() *Hub {
return &Hub{
clients: make(map[*Client]bool),
broadcast: make(chan []byte),
register: make(chan *Client),
unregister: make(chan *Client),
}
}
// Run 运行 Hub
func (h *Hub) Run() {
for {
select {
case client := <-h.register:
h.mu.Lock()
h.clients[client] = true
h.mu.Unlock()
log.Printf("客户端连接, 总数: %d", len(h.clients))
case client := <-h.unregister:
h.mu.Lock()
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client.send)
}
h.mu.Unlock()
log.Printf("客户端断开, 剩余: %d", len(h.clients))
case message := <-h.broadcast:
h.mu.RLock()
for client := range h.clients {
select {
case client.send <- message:
default:
close(client.send)
delete(h.clients, client)
}
}
h.mu.RUnlock()
}
}
}
// readPump 从客户端读取消息
func (c *Client) readPump() {
defer func() {
c.hub.unregister <- c
c.conn.Close()
}()
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
break
}
// 广播消息
var msg map[string]interface{}
if err := json.Unmarshal(message, &msg); err == nil {
msg["from"] = c.conn.RemoteAddr().String()
broadcastMsg, _ := json.Marshal(msg)
c.hub.broadcast <- broadcastMsg
}
}
}
// writePump 向客户端发送消息
func (c *Client) writePump() {
defer c.conn.Close()
for message := range c.send {
err := c.conn.WriteMessage(websocket.TextMessage, message)
if err != nil {
break
}
}
}
var hub = NewHub()
func handleWebSocket(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("升级失败: %v", err)
return
}
client := &Client{
hub: hub,
conn: conn,
send: make(chan []byte, 256),
}
hub.register <- client
go client.writePump()
go client.readPump()
}
func main() {
go hub.Run()
http.HandleFunc("/ws", handleWebSocket)
log.Println("广播服务器启动在 ws://localhost:8080/ws")
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatal("服务器启动失败: ", err)
}
}
✨ 最佳实践
性能优化
- 使用连接池管理 WebSocket 连接
- 实现心跳机制检测连接状态
- 合理设置消息缓冲区大小
- 使用二进制帧传输大数据
- 启用消息压缩(permessage-deflate)
安全考虑
- 生产环境使用 WSS (WebSocket Secure)
- 验证 Origin 头部防止 CSRF 攻击
- 实现连接数限制和速率限制
- 输入验证,防止注入攻击
- 定期更新依赖库
错误处理
- 实现优雅的连接关闭机制
- 处理网络中断和重连逻辑
- 记录详细的错误日志
- 设置合理的超时时间
- 实现状态码和错误消息规范