侧边栏壁纸
博主头像
MULY博主等级

大直若屈,大巧若拙,大辩若讷

  • 累计撰写 11 篇文章
  • 累计创建 19 个标签
  • 累计收到 1 条评论

SpringBoot整合WebSocket实践

MULY
2022-04-24 / 0 评论 / 0 点赞 / 194 阅读 / 23,328 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-04-24,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

简介

先来看下维基百科WebSocket的简介:

WebSocket是一种与HTTP不同的协议。两者都位于OSI模型应用层,并且都依赖于传输层的TCP协议。 虽然它们不同,但是RFC 6455中规定:it is designed to work over HTTP ports 80 and 443 as well as to support HTTP proxies and intermediaries(WebSocket通过HTTP端口80和443进行工作,并支持HTTP代理和中介),从而使其与HTTP协议兼容。 为了实现兼容性,WebSocket握手使用HTTP Upgrade头[1]从HTTP协议更改为WebSocket协议。

WebSocket协议支持Web浏览器(或其他客户端应用程序)与Web服务器之间的交互,具有较低的开销,便于实现客户端与服务器的实时数据传输。 服务器可以通过标准化的方式来实现,而无需客户端首先请求内容,并允许消息在保持连接打开的同时来回传递。通过这种方式,可以在客户端和服务器之间进行双向持续对话。 通信通过TCP端口80或443完成,这在防火墙阻止非Web网络连接的环境下是有益的。另外,Comet之类的技术以非标准化的方式实现了类似的双向通信。

大多数浏览器都支持该协议,包括Google ChromeFirefoxSafariMicrosoft EdgeInternet ExplorerOpera

WebSocket协议规范将ws(WebSocket)和wss(WebSocket Secure)定义为两个新的统一资源标识符(URI)方案,分别对应明文和加密连接。除了方案名称和片段ID(不支持#)之外,其余的URI组件都被定义为此URI的通用语法。

使用浏览器开发人员工具,开发人员可以检查WebSocket握手以及WebSocket框架。

WebSocket用于前端(Web浏览器)和后端(Web服务器)保持长时间连接及数据实时传输。

客户端首先请求内容,并允许消息在保持连接打开的同时来回传递,通过这种方式,可以在客户端和服务器之间进行双向持续对话。

扩展:维持http长连接的几种方式有以下几种,具体可网上搜索相关实现,这里只介绍WebSocket保持长连接。

  • ajax 轮询
  • long poll 轮询
  • iframe 长连接
  • WebSocket.

以下开始SpringBoot整合WebSocket

引入依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.7.9</version>
    </dependency>

开启WebSocket支持

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * 开启WebSocket支持
 *
 * @author wave-muly
 * @date 2021/6/21 下午5:01
 */
@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

会话操作接口封装

如果需要客户端服务端消息传输,WebSocket提供了javax.websocket.Session类,这里只用到其中几个方法

  • socketChannel.getBasicRemote().sendText(msg);
  • socketChannel.getAsyncRemote().sendText(msg);
  • socketChannel.isOpen();
  • socketChannel.close();

这里定义四个方法,主要是对javax.websocket.Session类操作的封装

以下是该接口的包装实现类

/**
 * socket会话操作接口
 * <p>
 * 该接口面向会话,须基于会话的通道调用。
 * 该接口支持扩展,可参考WebSocket模块中{@link com.muly.wave.socket.websocket.operator.channel}包下的类
 *
 * @author wave-muly
 * @date 2021/6/1 上午11:46
 */
public interface SocketSessionOperatorApi {

    /**
     * 写出数据,经过责任链
     *
     * @author wave-muly
     * @date 2021/6/1 上午11:48
     **/
    void writeAndFlush(Object obj);

    /**
     * 写出数据,不经过责任链
     *
     * @author wave-muly
     * @date 2021/6/1 上午11:48
     **/
    void writeToChannel(Object obj);

    /**
     * 关闭会话
     *
     * @author wave-muly
     * @date 2021/6/1 上午11:48
     **/
    void close();

    /**
     * 是否存活
     *
     * @return {@link boolean}
     * @author wave-muly
     * @date 2021/6/1 上午11:50
     **/
    boolean isInvalid();
}
import com.muly.wave.socket.api.session.SocketSessionOperatorApi;

/**
 * 对Api模块的操作类进行扩展
 * <p>
 * 暂时只写接口,SocketOperatorApi方法不够用时再对此类进行扩展
 *
 * @author wave-muly
 * @date 2021/6/1 下午3:44
 */
public interface SocketChannelExpandInterFace extends SocketSessionOperatorApi {

}
import com.alibaba.fastjson.JSON;

import javax.websocket.Session;
import java.io.IOException;

/**
 * Socket操作类实现
 * <p>
 * 简单封装Spring Boot的默认WebSocket
 *
 * @author wave-muly
 * @date 2021/6/1 下午3:41
 */
public class WaveSocketOperator implements SocketChannelExpandInterFace {

    /**
     * 实际操作的通道
     */
    private Session socketChannel;

    public WaveSocketOperator(Session socketChannel) {
        this.socketChannel = socketChannel;
    }

    @Override
    public void writeAndFlush(Object obj) {
        try {
            if (socketChannel.isOpen()) {
                socketChannel.getBasicRemote().sendText(JSON.toJSONString(obj));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void writeToChannel(Object obj) {
        if (socketChannel.isOpen()) {
            socketChannel.getAsyncRemote().sendText(JSON.toJSONString(obj));
        }
    }

    @Override
    public void close() {
        try {
            if (socketChannel.isOpen()) {
                socketChannel.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public boolean isInvalid() {
        return socketChannel.isOpen();
    }
}

SocketSession

该类持有会话操作接口SocketSessionOperatorApi

import com.muly.wave.socket.api.session.SocketSessionOperatorApi;
import lombok.Data;

/**
 * Socket会话
 *
 * @author wave-muly
 * @date 2021/6/1 上午11:28
 */
@Data
public class SocketSession<T extends SocketSessionOperatorApi> {

    /**
     * 会话ID,每一个新建的会话都有(目前使用通道ID)
     */
    private String sessionId;

    /**
     * 会话唯一标识
     */
    private String userId;

    /**
     * 该会话监听的消息类型
     */
    private String messageType;

    /**
     * token信息
     */
    private String token;

    /**
     * 连接时间
     */
    private Long connectionTime;

    /**
     * 最后活跃时间
     */
    private Long lastActiveTime;

    /**
     * 操作API
     */
    private T socketOperatorApi;

    /**
     * 自定义数据
     */
    private Object data;

}

接下来再定义个存储SocketSession的会话中心类

SessionCenter会话中心

import cn.hutool.core.util.ObjectUtil;
import com.muly.wave.socket.api.session.pojo.SocketSession;
import com.muly.wave.socket.websocket.operator.channel.WaveSocketOperator;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * 会话中心
 * <p>
 * 维护所有的会话
 *
 * @author wave-muly
 * @date 2021/6/1 下午1:43
 */
public class SessionCenter {

    /**
     * 所有用户会话维护
     */
    private static ConcurrentMap<String, List<SocketSession<WaveSocketOperator>>> socketSessionMap = new ConcurrentHashMap<>();

    /**
     * 获取维护的所有会话
     *
     * @return {@link ConcurrentMap< String, SocketSession< WaveSocketOperator >>}
     * @author wave-muly
     * @date 2021/6/1 下午2:13
     **/
    public static ConcurrentMap<String, List<SocketSession<WaveSocketOperator>>> getSocketSessionMap() {
        return socketSessionMap;
    }

    /**
     * 根据用户ID获取会话信息列表
     *
     * @param userId 用户ID
     * @return {@link SocketSession <GunsSocketOperator>}
     * @author wave-muly
     * @date 2021/6/1 下午1:48
     **/
    public static List<SocketSession<WaveSocketOperator>> getSessionByUserId(String userId) {
        return socketSessionMap.get(userId);
    }

    /**
     * 根据用户ID和消息类型获取会话信息列表
     *
     * @param userId 用户ID
     * @return {@link SocketSession <GunsSocketOperator>}
     * @author wave-muly
     * @date 2021/6/1 下午1:48
     **/
    public static List<SocketSession<WaveSocketOperator>> getSessionByUserIdAndMsgType(String userId) {
        return socketSessionMap.get(userId);
    }

    /**
     * 根据会话ID获取会话信息
     *
     * @param sessionId 会话ID
     * @return {@link SocketSession <GunsSocketOperator>}
     * @author wave-muly
     * @date 2021/6/1 下午1:48
     **/
    public static SocketSession<WaveSocketOperator> getSessionBySessionId(String sessionId) {
        for (List<SocketSession<WaveSocketOperator>> values : socketSessionMap.values()) {
            for (SocketSession<WaveSocketOperator> session : values) {
                if (sessionId.equals(session.getSessionId())) {
                    return session;
                }
            }
        }
        return null;
    }

    /**
     * 设置会话
     *
     * @param socketSession 会话详情
     * @author wave-muly
     * @date 2021/6/1 下午1:49
     **/
    public static void addSocketSession(SocketSession<WaveSocketOperator> socketSession) {
        List<SocketSession<WaveSocketOperator>> socketSessions = socketSessionMap.get(socketSession.getUserId());
        if (ObjectUtil.isEmpty(socketSessions)) {
            socketSessions = Collections.synchronizedList(new ArrayList<>());
            socketSessionMap.put(socketSession.getUserId(), socketSessions);
        }
        socketSessions.add(socketSession);
    }

    /**
     * 连接关闭
     *
     * @param sessionId 会话ID
     * @author wave-muly
     * @date 2021/6/1 下午3:25
     **/
    public static void closed(String sessionId) {
        Set<Map.Entry<String, List<SocketSession<WaveSocketOperator>>>> entrySet = socketSessionMap.entrySet();
        Iterator<Map.Entry<String, List<SocketSession<WaveSocketOperator>>>> iterator = entrySet.iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, List<SocketSession<WaveSocketOperator>>> next = iterator.next();
            List<SocketSession<WaveSocketOperator>> value = next.getValue();
            if (ObjectUtil.isNotEmpty(value)) {
                value.removeIf(GunsSocketOperatorSocketSession -> GunsSocketOperatorSocketSession.getSessionId().equals(sessionId));
            }
        }
    }
}

这个类的主要作用是所有用户会话的维护,存储在支持并发访问的ConcurrentHashMap里,定义的方法主要是用来操作map来实现会话的加入、关闭及查询。

Socket通用操作类

有了操作javax.websocket.Session类及会话中心,现在封装一个服务端通过会话中心拿到Session给客户端发送消息的接口.

定义一个SocketOperatorApi接口:

import com.muly.wave.socket.api.exception.SocketException;
import com.muly.wave.socket.api.message.SocketMsgCallbackInterface;

/**
 * Socket通用操作类
 * <p>
 * 可通过该类直接发送消息,每一个Socket实现的子模块必须实现该接口,以提供统一的操作API
 *
 * @author wave-muly
 * @date 2021/6/2 上午9:25
 */
public interface SocketOperatorApi {

    /**
     * 发送消息到指定会话
     *
     * @param msgType   消息类型可参考{@link com.muly.wave.socket.api.enums}枚举类
     * @param sessionId 会话ID
     * @param msg       消息体
     * @author wave-muly
     * @date 2021/6/11 下午2:19
     **/
    void sendMsgOfUserSessionBySessionId(String msgType, String sessionId, Object msg) throws SocketException;

    /**
     * 发送消息到指定用户的所有会话
     * <p>
     * 如果用户同一个消息类型建立了多个会话,则统一全部发送
     *
     * @param msgType 消息类型可参考{@link com.muly.wave.socket.api.enums}枚举类
     * @param userId  用户ID
     * @param msg     消息体
     * @author wave-muly
     * @date 2021/6/2 上午9:35
     **/
    void sendMsgOfUserSession(String msgType, String userId, Object msg) throws SocketException;

    /**
     * 发送消息到所有会话
     *
     * @param msgType 消息类型可参考{@link com.muly.wave.socket.api.enums}枚举类
     * @param msg     消息体
     * @author wave-muly
     * @date 2021/6/2 上午9:35
     **/
    void sendMsgOfAllUserSession(String msgType, Object msg);

    /**
     * 根据会话id关闭会话
     *
     * @param socketId 会话id
     * @author wave-muly
     * @date 2021/8/13 16:00
     **/
    void closeSocketBySocketId(String socketId);

    /**
     * 监听指定类型消息
     * <p>
     * 1.该方法每调用一次即注册一个监听,同一个消息类型多次调用只有最后一次生效
     *
     * @param msgType           消息类型可参考{@link com.muly.wave.socket.api.enums}枚举类
     * @param callbackInterface 消息监听器
     * @author wave-muly
     * @date 2021/6/2 上午9:54
     **/
    void msgTypeCallback(String msgType, SocketMsgCallbackInterface callbackInterface);
}

以下为该接口的实现类,该实现类主要用于服务端向客户端发送消息

import cn.hutool.core.util.ObjectUtil;
import com.muly.wave.socket.api.SocketOperatorApi;
import com.muly.wave.socket.api.exception.SocketException;
import com.muly.wave.socket.api.exception.enums.SocketExceptionEnum;
import com.muly.wave.socket.api.message.SocketMsgCallbackInterface;
import com.muly.wave.socket.api.session.pojo.SocketSession;
import com.muly.wave.socket.websocket.message.SocketMessageCenter;
import com.muly.wave.socket.websocket.operator.channel.WaveSocketOperator;
import com.muly.wave.socket.websocket.pojo.WebSocketMessageDTO;
import com.muly.wave.socket.websocket.session.SessionCenter;
import org.springframework.stereotype.Component;

import java.util.Collection;
import java.util.List;

/**
 * WebSocket操作实现类
 * <p>
 * 如果是Spring boot项目,通过注入SocketOperatorApi接口操作socket,需将本来交给Spring管理
 *
 * @author wave-muly
 * @date 2021/6/2 上午10:41
 */
@Component
public class WebSocketOperator implements SocketOperatorApi {

    @Override
    public void sendMsgOfUserSessionBySessionId(String msgType, String sessionId, Object msg) throws SocketException {
        SocketSession<WaveSocketOperator> session = SessionCenter.getSessionBySessionId(sessionId);
        if (ObjectUtil.isEmpty(session)) {
            throw new SocketException(SocketExceptionEnum.SESSION_NOT_EXIST);
        }
        WebSocketMessageDTO webSocketMessageDTO = new WebSocketMessageDTO();
        webSocketMessageDTO.setData(msg);
        webSocketMessageDTO.setServerMsgType(msgType);
        session.getSocketOperatorApi().writeAndFlush(webSocketMessageDTO);
    }

    @Override
    public void sendMsgOfUserSession(String msgType, String userId, Object msg) throws SocketException {
        // 根据用户ID获取会话
        List<SocketSession<WaveSocketOperator>> socketSessionList = SessionCenter.getSessionByUserIdAndMsgType(userId);
        if (ObjectUtil.isEmpty(socketSessionList)) {
            throw new SocketException(SocketExceptionEnum.SESSION_NOT_EXIST);
        }
        WebSocketMessageDTO webSocketMessageDTO = new WebSocketMessageDTO();
        webSocketMessageDTO.setData(msg);
        webSocketMessageDTO.setServerMsgType(msgType);
        for (SocketSession<WaveSocketOperator> session : socketSessionList) {
            // 发送内容
            session.getSocketOperatorApi().writeAndFlush(webSocketMessageDTO);
        }
    }

    @Override
    public void sendMsgOfAllUserSession(String msgType, Object msg) {
        Collection<List<SocketSession<WaveSocketOperator>>> values = SessionCenter.getSocketSessionMap().values();
        WebSocketMessageDTO webSocketMessageDTO = new WebSocketMessageDTO();
        webSocketMessageDTO.setData(msg);
        webSocketMessageDTO.setServerMsgType(msgType);
        for (List<SocketSession<WaveSocketOperator>> sessions : values) {
            for (SocketSession<WaveSocketOperator> session : sessions) {
                // 找到该类型的通道
                if (session.getMessageType().equals(msgType)) {
                    session.getSocketOperatorApi().writeAndFlush(webSocketMessageDTO);
                }
            }
        }
    }

    @Override
    public void closeSocketBySocketId(String socketId) {
        SessionCenter.closed(socketId);
    }

    @Override
    public void msgTypeCallback(String msgType, SocketMsgCallbackInterface callbackInterface) {
        SocketMessageCenter.setMessageListener(msgType, callbackInterface);
    }
}

该接口涉及到一个WebSocket交互通用对象

import com.muly.wave.socket.api.SocketOperatorApi;
import lombok.Data;

/**
 * WebSocket交互通用对象
 *
 * @author wave-muly
 * @date 2021/6/1 下午2:56
 */
@Data
public class WebSocketMessageDTO {

    /**
     * 服务端发送的消息类型(客户端如果需要监听该消息类型,注册对应的消息处理器即可)
     */
    private String serverMsgType;

    /**
     * 客户端发送的消息类型(服务端需要处理的消息类型)
     */
    private String clientMsgType;

    /**
     * 目标Id
     */
    private String toUserId;

    /**
     * 发送者ID
     */
    private String formUserId;

    /**
     * 具体发送的数据
     */
    private Object data;

}

鉴于以上多次提到了,msgType这个属性,这个类型定义了消息类型,用于区分服务端与客户端之间消息通讯的类型

以下定义了三种类型的枚举方便管理,

  • SystemMessageTypeEnum

    该枚举适用于服务端监听首次连接和断开连接,以S00开头

    import lombok.Getter;
    
    /**
     * 服务端监听器枚举
     * <p>
     * 说明:该枚举适用于服务端监听首次连接和断开连接
     *
     * @author wave-muly
     * @date 2021/6/3 上午9:14
     */
    @Getter
    public enum SystemMessageTypeEnum {
    
        /**
         * 监听首次连接
         */
        SYS_LISTENER_ONOPEN("S00001", "监听首次连接"),
    
        /**
         * 监听断开连接
         */
        SYS_LISTENER_ONCLOSE("S00002", "监听断开连接"),
    
        /**
         * 监听异常信息
         */
        SYS_LISTENER_ONERROR("S00003", "监听异常信息");
    
        private final String code;
    
        private final String name;
    
        SystemMessageTypeEnum(String code, String name) {
            this.code = code;
            this.name = name;
        }
    }
    
  • ServerMessageTypeEnum

    该枚举适用于服务器推送给客户端消息时使用,以100开头

    import lombok.Getter;
    
    /**
     * 服务端消息类型枚举
     * <p>
     * 说明:该枚举适用于服务器推送给客户端消息时使用
     *
     * @author wave-muly
     * @date 2021/6/3 上午9:14
     */
    @Getter
    public enum ServerMessageTypeEnum {
    
        /**
         * 系统通知消息类型
         */
        SYS_NOTICE_MSG_TYPE("100001", "系统通知消息类型"),
    
        /**
         * 连接消息回复
         */
        SYS_REPLY_MSG_TYPE("100002", "连接消息回复");
    
        private final String code;
    
        private final String name;
    
        ServerMessageTypeEnum(String code, String name) {
            this.code = code;
            this.name = name;
        }
    }
    
  • ClientMessageTypeEnum

    该枚举适用于服务器接收到客户端发来的消息,判断消息类型时使用,以200开头

    import lombok.Getter;
    
    /**
     * 客户端消息类型枚举
     * <p>
     * 说明:该枚举适用于服务器接收到客户端发来的消息,判断消息类型时使用
     *
     * @author wave-muly
     * @date 2021/6/3 上午9:14
     */
    @Getter
    public enum ClientMessageTypeEnum {
    
        /**
         * 用户连接鉴权
         */
        USER_CONNECTION_AUTHENTICATION("200000", "用户连接鉴权"),
    
        /**
         * 用户心跳消息类型
         */
        USER_HEART("299999", "用户心跳消息类型");
    
        private final String code;
    
        private final String name;
    
        ClientMessageTypeEnum(String code, String name) {
            this.code = code;
            this.name = name;
        }
    }
    

    特殊说明一下serverMsgTypeclientMsgType的区别
    1.serverMsgType字段是服务端发送给客户端的字段
    2.clientMsgType字段是客户端发送给服务器的字段
    例如:客户端发送给服务器一个心跳消息(type:299999),服务端如果需要处理该消息就注册一个该消息的监听器,那么收到消息服务端会把消息推送给对应的监听器,接口见{@link SocketOperatorApi#msgTypeCallback}

上述四个方法中,还涉及到一个方法void msgTypeCallback(String msgType, SocketMsgCallbackInterface callbackInterface);

其中SocketMsgCallbackInterface为Socket消息接收回调接口,根据枚举类型注入相应的消息处理方法,

import com.muly.wave.socket.api.session.pojo.SocketSession;

/**
 * Socket消息接收回调接口
 *
 * @author wave-muly
 * @date 2021/6/2 上午9:53
 */
@FunctionalInterface
public interface SocketMsgCallbackInterface {

    /**
     * 收到消息的回调
     *
     * @param msgType       消息类型
     * @param msg           消息体
     * @param socketSession 本次通信的会话
     * @author wave-muly
     * @date 2021/6/2 上午9:51
     **/
    void callback(String msgType, Object msg, SocketSession socketSession);
}

该接口是一个函数式接口,可以使用java8的lambda表达式注入回调方法。

再次构造个消息回调方法中心,用于存储各种消息类型的消息回调方法处理。messageListenerMap的key即为msgType

import com.muly.wave.socket.api.message.SocketMsgCallbackInterface;
import java.util.HashMap;
import java.util.Map;

/**
 * 会话消息中心
 * <p>
 * 维护所有消息类型对应的处理器
 *
 * @author wave-muly
 * @date 2021/6/1 下午2:20
 */
public class SocketMessageCenter {

    /**
     * 所有消息监听器维护
     */
    private static Map<String, SocketMsgCallbackInterface> messageListenerMap = new HashMap<>();

    /**
     * 设置消息类型的监听器
     *
     * @param msgType  消息类型
     * @param listener 监听器
     * @author wave-muly
     * @date 2021/6/1 下午2:25
     **/
    public static void setMessageListener(String msgType, SocketMsgCallbackInterface listener) {
        messageListenerMap.put(msgType, listener);
    }

    /**
     * 获取消息监听器
     *
     * @param msgType 消息类型
     * @return {@link SocketMsgCallbackInterface}
     * @author wave-muly
     * @date 2021/6/1 下午2:26
     **/
    public static SocketMsgCallbackInterface getSocketMsgCallbackInterface(String msgType) {
        return messageListenerMap.get(msgType);
    }
}

消息监听处理器WebSocketServer

该类为WebSocket的入口类,用于处理和客户端的连接关闭及接收客户端发送的消息

import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.muly.wave.socket.api.enums.ClientMessageTypeEnum;
import com.muly.wave.socket.api.enums.ServerMessageTypeEnum;
import com.muly.wave.socket.api.enums.SystemMessageTypeEnum;
import com.muly.wave.socket.api.message.SocketMsgCallbackInterface;
import com.muly.wave.socket.api.session.pojo.SocketSession;
import com.muly.wave.socket.websocket.message.SocketMessageCenter;
import com.muly.wave.socket.websocket.operator.channel.WaveSocketOperator;
import com.muly.wave.socket.websocket.pojo.WebSocketMessageDTO;
import com.muly.wave.socket.websocket.session.SessionCenter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

/**
 * 消息监听处理器
 *
 * @author wave-muly
 * @date 2021/6/1 下午2:35
 */
@Slf4j
@ServerEndpoint(value = "/webSocket/{token}")
@Component
public class WebSocketServer {

    /**
     * 连接建立调用的方法
     * <p>
     * 暂时无用,需要在建立连接的时候做一些事情的话可以修改这里
     *
     * @param session 会话信息
     * @author wave-muly
     * @date 2021/6/21 下午5:14
     **/
    @OnOpen
    public void onOpen(Session session, @PathParam("token") String token) {
        String userId = token;
        
        // todo 这里可以加上token校验处理获取用户信息
//        try {
//            // 解析用户信息
//            DefaultJwtPayload defaultPayload = JwtContext.me().getDefaultPayload(token);
//            userId = defaultPayload.getUserId().toString();
//        } catch (io.jsonwebtoken.JwtException e) {
//            try {
//                session.close();
//            } catch (IOException ioException) {
//                ioException.printStackTrace();
//            }
//        }

        // 操作api包装
        WaveSocketOperator gunsSocketOperator = new WaveSocketOperator(session);

        // 回复消息
        WebSocketMessageDTO replyMsg = new WebSocketMessageDTO();
        replyMsg.setServerMsgType(ServerMessageTypeEnum.SYS_REPLY_MSG_TYPE.getCode());
        replyMsg.setToUserId(userId);

        // 创建会话对象
        SocketSession<WaveSocketOperator> socketSession = new SocketSession<>();
        try {
            // 设置回复内容
            replyMsg.setData(session.getId());
            socketSession.setSessionId(session.getId());
            socketSession.setUserId(userId);
            socketSession.setSocketOperatorApi(gunsSocketOperator);
            socketSession.setToken(token);
            socketSession.setConnectionTime(System.currentTimeMillis());

            // 维护会话
            SessionCenter.addSocketSession(socketSession);
        } finally {
            // 回复消息
            gunsSocketOperator.writeAndFlush(replyMsg);

            // 触发首次连接回调
            SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(SystemMessageTypeEnum.SYS_LISTENER_ONOPEN.getCode());
            if (ObjectUtil.isNotEmpty(socketMsgCallbackInterface)) {
                // 触发回调
                socketMsgCallbackInterface.callback(SystemMessageTypeEnum.SYS_LISTENER_ONOPEN.getCode(), null, socketSession);
            }
        }

    }

    /**
     * 连接关闭调用的方法
     *
     * @param session 会话信息
     * @author wave-muly
     * @date 2021/6/21 下午5:14
     **/
    @OnClose
    public void onClose(Session session) {
        try {
            SocketSession<WaveSocketOperator> socketSession = SessionCenter.getSessionBySessionId(session.getId());
            // 触发首次连接回调
            SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(SystemMessageTypeEnum.SYS_LISTENER_ONCLOSE.getCode());
            if (ObjectUtil.isNotEmpty(socketMsgCallbackInterface)) {
                // 触发回调
                socketMsgCallbackInterface.callback(SystemMessageTypeEnum.SYS_LISTENER_ONCLOSE.getCode(), null, socketSession);
            }
        } finally {
            SessionCenter.closed(session.getId());
        }
    }

    /**
     * 收到消息调用的方法
     *
     * @param message        接收到的消息
     * @param socketChannel 会话信息
     * @author wave-muly
     * @date 2021/6/21 下午5:14
     **/
    @OnMessage
    public void onMessage(String message, Session socketChannel) {

        // 转换为Java对象
        WebSocketMessageDTO webSocketMessageDTO = JSON.parseObject(message, WebSocketMessageDTO.class);

        // 维护通道是否已初始化
        SocketSession<WaveSocketOperator> socketSession = SessionCenter.getSessionBySessionId(socketChannel.getId());

        // 心跳包
        if (ObjectUtil.isNotEmpty(socketSession) && ClientMessageTypeEnum.USER_HEART.getCode().equals(webSocketMessageDTO.getClientMsgType())) {
            // 更新会话最后活跃时间
            if (ObjectUtil.isNotEmpty(socketSession)) {
                socketSession.setLastActiveTime(System.currentTimeMillis());
            }
            return;
        }

        // 用户ID为空不处理直接跳过
        if (ObjectUtil.isEmpty(webSocketMessageDTO.getFormUserId())) {
            return;
        }

        // 会话建立成功执行业务逻辑
        if (ObjectUtil.isNotEmpty(socketSession)) {

            // 更新最后会话时间
            socketSession.setLastActiveTime(System.currentTimeMillis());

            // 找到该消息的处理器
            SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(webSocketMessageDTO.getClientMsgType());
            if (ObjectUtil.isNotEmpty(socketMsgCallbackInterface)) {
                // 触发回调
                socketMsgCallbackInterface.callback(webSocketMessageDTO.getClientMsgType(), webSocketMessageDTO, socketSession);
            } else {
                socketChannel.getAsyncRemote().sendText("{\"serverMsgType\":\"404\"}");
            }
        }
    }

    /**
     * 会话发送异常调用的方法
     *
     * @param session 会话信息
     * @param error    错误信息
     * @author wave-muly
     * @date 2021/6/21 下午5:14
     **/
    @OnError
    public void onError(Session session, Throwable error) {
        SocketSession<WaveSocketOperator> socketSession = SessionCenter.getSessionBySessionId(session.getId());
        // 触发首次连接回调
        SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(SystemMessageTypeEnum.SYS_LISTENER_ONERROR.getCode());
        if (ObjectUtil.isNotEmpty(socketMsgCallbackInterface)) {
            // 触发回调
            socketMsgCallbackInterface.callback(SystemMessageTypeEnum.SYS_LISTENER_ONERROR.getCode(), error, socketSession);
        }
        log.error("session 发生错误:" + session.getId());
    }
}

上述类中存在处理逻辑,定义了首次连接回调的方法,这里会去找SocketMessageCenter方法操作中心里的SystemMessageTypeEnum.SYS_LISTENER_ONOPEN方法

			// 触发首次连接回调
            SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(SystemMessageTypeEnum.SYS_LISTENER_ONOPEN.getCode());
            if (ObjectUtil.isNotEmpty(socketMsgCallbackInterface)) {
                // 触发回调
                socketMsgCallbackInterface.callback(SystemMessageTypeEnum.SYS_LISTENER_ONOPEN.getCode(), null, socketSession);
            }

以下为示例,项目启动时注入一个首次连接回调处理方法:在msgType为SystemMessageTypeEnum.SYS_LISTENER_ONOPEN(S00001)时会调用该接口

/**
 * @Author: wave-muly
 * @Date: 2021/10/9 14:09
 */
@Slf4j
@Component
@Order(99)
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class MessageListener implements CommandLineRunner {
    private final SocketOperatorApi socketOperatorApi;
    @Override
    public void run(String... args) throws Exception {
        // 项目启动默认注册了 监听首次连接的监听器 在客户端连接时会调用该监听器
        socketOperatorApi.msgTypeCallback(SystemMessageTypeEnum.SYS_LISTENER_ONOPEN.getCode(), (msgType, msg, socketSession)->{
            log.info("connection success");
            socketSession.getSocketOperatorApi().writeAndFlush("connection success");
        });
    }
}

再定义一个服务端向客户端发送消息的示例,这里定义一个Controller

import com.muly.wave.socket.api.SocketOperatorApi;
import com.muly.wave.socket.api.enums.ServerMessageTypeEnum;
import com.muly.wave.socket.api.exception.SocketException;
import com.muly.wave.socket.websocket.pojo.SysMessage;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;

import java.util.Date;


/**
 * @Author: wave-muly
 * @Date: 2021/10/9 11:28
 */
@RequestMapping("/api/v1/sys")
@RestController
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class SysMessageController {

    private final SocketOperatorApi socketOperatorApi;

    @PostMapping(value = "/notice/{userId}", produces = MediaType.APPLICATION_JSON_VALUE)
    public void notice(@PathVariable(name = "userId") String userId, @RequestBody String messageContent) {
        SysMessage item = new SysMessage();
        item.setReceiveUserId(Long.valueOf(userId));
        item.setMessageContent(messageContent);
        item.setMessageType(ServerMessageTypeEnum.SYS_NOTICE_MSG_TYPE.getCode());
        item.setMessageSendTime(new Date());
        try {
            socketOperatorApi.sendMsgOfUserSession(ServerMessageTypeEnum.SYS_NOTICE_MSG_TYPE.getCode(), item.getReceiveUserId().toString(), item);
        } catch (SocketException socketException) {
            // 该用户不在线
        }
    }

}

以下为浏览器测试 [WebSocket在线测试](WebSocket在线测试_在线模拟websocket请求工具 (jsonin.com))

image-20220424165738022

至此SpringBoot整合WebSocket完成,后续项目集成,只需要定义好msgType消息类型、处理token校验逻辑即可。
代码详见 wave-socket

0

评论区