Websocket 教程

引言

在 Web2.0 时代,用户越来越期望 Web 应用的实时交互功能,而传统的 Web 技术实现起来会比较繁琐且浪费资源。因此在 2011 年的时候,IETF 发布了 WebScoket 协议标准,该协议允许客户端和服务端实时交互和双向沟通。

这篇博客将介绍什么是 WebSocket,WebSocket 是如何工作的。

背景

传统的 Web 应用程序依赖于 HTTP 的请求响应模型:客户端发送一次 HTTP 请求,然后等待服务器的响应。这种方式在大多数的场景下都没有问题,但是对于实时性要求比较高的场景,比如实时更新(比赛分数),直播弹幕,在线聊天,请求响应模型只能够实现一个伪实时系统,且非常的浪费资源。为什么这么说呢?

请求响应模型要实时获取服务器中最新的消息,只能够通过长轮询(long polling)的方式来实现,但是长轮询的方式非常的消耗资源,多次 HTTP 请求,只有一次请求是有效的, 其他的请求都是无效的, 这种方式既消耗资源,效率又不高。同时请求响应模型实现起来需要在实时性和浪费资源这两个方面进行取舍。要想实时性高,就得加快轮询的频率,这也就意味着无效请求大量增加;要想节省资源,就得降低轮询频率,这就导致实时性变低。

而 WebSocket 就非常好的解决了请求响应模型的问题。

什么是 WebSocket

WebSocket 是建立在 TCP 长连接上的一个全双工应用层协议:

  • TCP 长连接:意味着免去了请求响应模型中重复打开和关闭 TCP 连接的开销。
  • 全双工:意味着通信的双方可以同时发送和接受信息,互不干扰。

WebSocket 的工作原理

客户端通过 HTTP 向服务器发送将协议切换成 WebSocket 的握手请求。如果服务器支持 WebSocket 协议,就会在同一个 TCP 上将 HTTP 协议切换成 WebSocket 协议。升级的过程如下:

  1. 客户端向服务器发送带有指定请求头的 GET 请求,请求示例:
GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Origin: http://example.com

比较重要的请求头:

  • Upgrade: websocket: 表示客户端想要升级成 WebSocket 协议
  • Connection: Upgrade: 表示该请求是一个想要升级连接的请求
  • Sec-WebSocket-Key: 客户端生成的 base64 编码随机值,用于服务器生成响应和验证握手
  • Sec-WebSocket-Version: 客户端所支持的 WebSocket 协议版本,通常是 13。如果服务端不支持该版本,需要返回一个Sec-WebSocket-Versionheader,用于告知客户端服务端支持的 WeSocket 版本。
  • Origin: 告诉服务器请求来源于哪个网站,非必须字段,基于安全方面的考虑,强烈推荐带上该请求头。服务端可通过白名单机制来决定是否要应答此次握手。
  1. 服务端验证以及接受升级

如果服务器接受升级,服务器需要将响应码设置成 101,并返回指定的一些字段:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

比较重要的字段:

  • 101 Switching Protocols: 表示同意切换协议
  • Upgrade: websocket: 确认切换的协议为:websocket
  • Connection: Upgrade: 确认升级连接
  • Sec-WebSocket-Accept: 该字段的值由服务器基于请求的 Sec-WebSocket-Key 字段生成。服务端将 Sec-WebSocket-Key 字段的值与一个 GUID 进行拼接,并使用 SHA-1 算法进行 hash 计算,最后将 hash 值使用 Base64 算法进行编码
  1. TCP 连接保持打开

WebSocket 的 URL 以 ws://wss:// 开头,其他的都与 http://https:// 类似。

  • ws:// 明文进行传输
  • was:// 基于 TSL/SSL 进行密文传输

WebSocket 使用场景

  • 实时聊天应用
  • 在线游戏
  • 多人协同应用
  • 金融交易平台
  • ……

搭建 WebSocket 服务器

引入依赖

在 pom.xml 文件中新增 websocket 的依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

具体代码

WebSocketConfig.java

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}

WebSocketServerConfig.java

import com.alibaba.nacos.common.utils.CollectionUtils;
import org.springframework.stereotype.Component;

import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;
import java.util.List;
import java.util.Map;

@Component
public class WebSocketServerConfig extends ServerEndpointConfig.Configurator {

    @Override
    public boolean checkOrigin(String originHeaderValue) {
        return true;
    }

    @Override
    public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
        Map<String, List<String>> parameterMap = request.getParameterMap();
        List<String> erpList = parameterMap.get("erp");
        if(!CollectionUtils.isEmpty(erpList)){
            sec.getUserProperties().put("erp", erpList.get(0));
        }
    }

}

ChickenSocket.java

import com.autmaple.circle.server.config.websocket.WebSocketServerConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.EndpointConfig;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@ServerEndpoint(value = "/chicken/socket", configurator = WebSocketServerConfig.class)
@Component
public class ChickenSocket {

    /**
     * 记录当前在线连接数
     */
    private static final AtomicInteger onlineCount = new AtomicInteger(0);

    /**
     * 存放所有在线的客户端
     */
    private static final Map<String, ChickenSocket> clients = new ConcurrentHashMap<>();

    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;

    /**
     * erp唯一标识
     */
    private String erp = "";

    public Session getSession() {
        return session;
    }

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, EndpointConfig conf) throws IOException {
        //获取用户信息
        try {
            Map<String, Object> userProperties = conf.getUserProperties();
            String erp = (String) userProperties.get("erp");
            this.erp = erp;
            this.session = session;
            if (clients.containsKey(this.erp)) {
                clients.get(this.erp).session.close();
                clients.remove(this.erp);
                onlineCount.decrementAndGet();
            }
            clients.put(this.erp, this);
            onlineCount.incrementAndGet();
            log.info("有新连接加入:{},当前在线人数为:{}", erp, onlineCount.get());
            sendMessage("连接成功", this.session);
        } catch (Exception e) {
            log.error("建立链接错误{}", e.getMessage(), e);
        }
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        try {
            if (clients.containsKey(erp)) {
                clients.get(erp).session.close();
                clients.remove(erp);
                onlineCount.decrementAndGet();
            }
            log.info("有一连接关闭:{},当前在线人数为:{}", this.erp, onlineCount.get());
        } catch (Exception e) {
            log.error("连接关闭错误,错误原因{}", e.getMessage(), e);
        }
    }

    /**
     * 收到客户端消息后调用的方法
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("服务端收到客户端[{}]的消息:{}", this.erp, message);
        //心跳机制
        if (message.equals("ping")) {
            this.sendMessage("pong", session);
        }
    }

    @OnError
    public void onError(Session session, Throwable error) {
        log.error("Socket:{},发生错误,错误原因{}", erp, error.getMessage(), error);
        try {
            session.close();
        } catch (Exception e) {
            log.error("onError.Exception{}", e.getMessage(), e);
        }
    }

    /**
     * 指定发送消息
     */
    public void sendMessage(String message, Session session) {
        log.info("服务端给客户端[{}]发送消息{}", this.erp, message);
        try {
            session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            log.error("{}发送消息发生异常,异常原因{}", this.erp, message);
        }
    }

    /**
     * 群发消息
     */
    public void sendMessage(String message) {
        for (Map.Entry<String, ChickenSocket> sessionEntry : clients.entrySet()) {
            String erp = sessionEntry.getKey();
            ChickenSocket socket = sessionEntry.getValue();
            Session session = socket.session;
            log.info("服务端给客户端[{}]发送消息{}", erp, message);
            try {
                session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                log.error("{}发送消息发生异常,异常原因{}", this.erp, message);
            }
        }
    }

    public ChickenSocket getChickenSocket(String userName) {
        return clients.get(userName);
    }
}

客户端的使用

var ws = new WebSocket("ws://docker.local.com/chicken/socket");

ws.onopen = function(event) { 
  console.log("Connection open ..."); 
  ws.send("Hello WebSockets!");
};

ws.onmessage = function(event) {
  console.log( "Received Message: " + event.data);
  ws.close();
};

ws.onclose = function(event) {
  console.log("Connection closed.");
}; 
使用 Hugo 构建
主题 StackJimmy 设计