使用 tio-boot 搭建 WebSocket 服务
简介
随着实时网络应用的普及,如即时聊天、在线游戏和实时数据推送等,WebSocket 技术越来越受到开发者的青睐。它允许客户端和服务器之间进行全双工、低延迟的通信,从而实现更加流畅的用户体验。
本文将详细介绍如何使用 tio-boot
框架快速搭建一个功能完善的 WebSocket 服务器。我们将从 WebSocket 的基本原理开始,逐步讲解配置路由、实现消息处理器、处理客户端连接和消息广播等关键步骤,最后通过实际测试验证服务器的功能。
WebSocket 原理概述
什么是 WebSocket?
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,旨在解决传统 HTTP 协议在实时通信场景下的不足。它允许服务器主动向客户端推送数据,客户端也可以随时向服务器发送消息,实现真正的实时双向通信。
WebSocket 的工作流程
建立连接(握手):
- 客户端发送一个带有特殊头部的 HTTP 请求,表示希望升级协议到 WebSocket。
- 服务器接收到请求后,如果支持 WebSocket,则返回一个包含升级协议的响应,双方确认切换到 WebSocket 协议。
数据传输:
- 连接建立后,客户端和服务器可以在不经过额外握手的情况下随时发送数据。
- 数据以帧(frame)的形式传输,支持文本和二进制数据。
关闭连接:
- 任意一方都可以发送关闭帧来终止连接。
- 连接关闭后,双方需重新握手才能建立新的连接。
WebSocket 的优势
- 实时性强:支持服务器主动推送数据,减少了客户端轮询的延迟和资源消耗。
- 低开销:一次握手后保持连接,减少了 HTTP 轮询带来的额外开销。
- 全双工通信:客户端和服务器可以同时发送数据,通信更加灵活高效。
- 适用广泛:适用于聊天应用、实时通知、在线游戏、股票行情等多种场景。
使用 tio-boot 搭建 WebSocket 服务器
tio-boot
内置了 tio-websocket-server
库。下面我们将使用 tio-boot
搭建一个简单的 WebSocket 服务器,实现基本的群聊功能。
功能概述
我们将实现以下功能:
- WebSocket 握手处理:处理客户端的握手请求,完成协议升级,并获取用户信息。
- 用户绑定与分组:在握手成功后,将用户绑定到特定的群组,方便实现消息广播。
- 消息处理与广播:接收并处理客户端发送的文本消息,然后将消息广播给同一群组内的所有用户。
- 连接管理与关闭:管理客户端连接的生命周期,处理连接的建立和关闭,并进行相应的资源清理。
配置 WebSocket 路由
首先,我们需要定义 WebSocket 的路由,将特定的路径映射到对应的消息处理器。
示例代码
import com.litongjava.annotation.AConfiguration;
import com.litongjava.annotation.Initialization;
import com.litongjava.tio.boot.server.TioBootServer;
import com.litongjava.tio.boot.websocket.WebSocketRouter;
@AConfiguration
public class WebSocketConfig {
@Initialization
public void config() {
WebSocketRouter router = TioBootServer.me().getWebSocketRouter();
router.add("/hello", new HelloWebSocketHandler());
}
}
代码解释
@AConfiguration
:标注当前类为配置类。@Initialization
:标注方法在tio-boot
启动时执行。WebSocketRouter
:用于定义 WebSocket 路由,将 URL 路径与对应的处理器进行绑定。TioBootServer.me().getWebSocketRouter().add()
:将配置好的路由注册到TioBootServer
,使其在服务器启动时生效。
单发 WebSocket 示例
下面是一个简单的 WebSocket 处理器示例,实现单一消息的接收和回复功能。
package com.litongjava.websocket.handler;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.http.common.HttpRequest;
import com.litongjava.tio.http.common.HttpResponse;
import com.litongjava.tio.websocket.common.WebSocketRequest;
import com.litongjava.tio.websocket.common.WebSocketResponse;
import com.litongjava.tio.websocket.common.WebSocketSessionContext;
import com.litongjava.tio.websocket.server.handler.IWebSocketHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HelloWebSocketHandler implements IWebSocketHandler {
public static final String CHARSET = "utf-8";
/**
* 握手成功后执行,绑定群组并通知其他用户
*/
@Override
public HttpResponse handshake(HttpRequest httpRequest, HttpResponse response, ChannelContext channelContext) throws Exception {
log.info("请求信息: {}", httpRequest);
return response;
}
/**
* 握手完成后执行
*/
@Override
public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
log.info("握手完成: {}", httpRequest);
}
/**
* 处理连接关闭请求,进行资源清理
*/
@Override
public Object onClose(WebSocketRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
Tio.remove(channelContext, "客户端主动关闭连接");
return null;
}
/**
* 处理二进制消息
*/
@Override
public Object onBytes(WebSocketRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
log.info("收到二进制消息,大小: {} bytes", bytes.length);
return null;
}
/**
* 处理文本消息
*/
@Override
public Object onText(WebSocketRequest wsRequest, String text, ChannelContext channelContext) throws Exception {
WebSocketSessionContext wsSessionContext = (WebSocketSessionContext) channelContext.get();
String path = wsSessionContext.getHandshakeRequest().getRequestLine().path;
log.info("路径:{},收到消息:{}", path, text);
String message = "{user_id:'" + channelContext.userid + "',message:'" + text + "'}";
WebSocketResponse wsResponse = WebSocketResponse.fromText(message, CHARSET);
// 发送消息
Tio.send(channelContext, wsResponse);
return null; // 不需要额外的返回值
}
}
群发 WebSocket 示例
WebSocket 处理器
HelloWebSocketHandler
是核心的消息处理器,负责处理各种 WebSocket 事件,如握手、消息接收、连接关闭等。
示例代码
package com.litongjava.websocket.handler;
import java.util.Objects;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.http.common.HttpRequest;
import com.litongjava.tio.http.common.HttpResponse;
import com.litongjava.tio.websocket.common.WebSocketRequest;
import com.litongjava.tio.websocket.common.WebSocketResponse;
import com.litongjava.tio.websocket.common.WebSocketSessionContext;
import com.litongjava.tio.websocket.server.handler.IWebSocketHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HelloWebSocketHandler implements IWebSocketHandler {
/**
* 群组ID,用于群聊
*/
public static final String GROUP_ID = "group-01";
public static final String CHARSET = "utf-8";
/**
* 握手成功后执行,绑定群组并通知其他用户
*/
@Override
public HttpResponse handshake(HttpRequest request, HttpResponse response, ChannelContext channelContext) throws Exception {
String clientIp = request.getClientIp();
String name = request.getParam("name");
Tio.bindUser(channelContext, name);
log.info("收到来自 {} 的 WebSocket 握手请求:{}", clientIp, request.toString());
return response;
}
/**
* 握手完成后执行
*/
@Override
public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
// 绑定到群组,便于消息广播
Tio.bindGroup(channelContext, GROUP_ID);
// 获取当前在线用户数量
int count = Tio.getAll(channelContext.tioConfig).getObj().size();
String message = "{name:'admin',message:'" + channelContext.userid + " 进入聊天室,当前在线人数:" + count + "'}";
WebSocketResponse wsResponse = WebSocketResponse.fromText(message, CHARSET);
// 广播消息给群组内所有用户
Tio.sendToGroup(channelContext.tioConfig, GROUP_ID, wsResponse);
}
/**
* 处理二进制消息
*/
@Override
public Object onBytes(WebSocketRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
// 当前示例未处理二进制消息,移除连接
Tio.remove(channelContext, "客户端发送二进制消息,连接关闭");
return null;
}
/**
* 处理连接关闭请求,进行资源清理
*/
@Override
public Object onClose(WebSocketRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
// 资源清理由 Tio 自动处理,此处可添加额外逻辑
log.info("连接关闭,用户:{}", channelContext.userid);
return null;
}
/**
* 处理文本消息
*/
@Override
public Object onText(WebSocketRequest wsRequest, String text, ChannelContext channelContext) throws Exception {
WebSocketSessionContext wsSessionContext = (WebSocketSessionContext) channelContext.get();
String path = wsSessionContext.getHandshakeRequest().getRequestLine().path;
log.info("路径:{},收到消息:{}", path, text);
if ("心跳内容".equals(text)) {
return null; // 忽略心跳消息
}
String message = "{name:'" + channelContext.userid + "',message:'" + text + "'}";
WebSocketResponse wsResponse = WebSocketResponse.fromText(message, CHARSET);
// 广播消息给群组内所有用户
Tio.sendToGroup(channelContext.tioConfig, GROUP_ID, wsResponse);
return null; // 不需要额外的返回值
}
}
代码解释
握手处理(handshake
方法)
- 获取客户端信息:通过
request.getClientIp()
获取客户端 IP 地址,通过request.getParam("name")
获取用户名。 - 用户绑定:使用
Tio.bindUser()
方法将用户与当前连接绑定,便于后续的消息推送和管理。 - 日志记录:记录握手请求的相关信息,方便调试和监控。
握手后处理(onAfterHandshaked
方法)
- 群组绑定:使用
Tio.bindGroup()
方法将用户连接绑定到特定的群组,实现消息的分组广播。 - 在线人数统计:通过
Tio.getAll(channelContext.tioConfig).getObj().size()
获取当前在线用户数量。 - 欢迎消息广播:构建欢迎消息,使用
Tio.sendToGroup()
方法将消息广播给群组内所有用户。
文本消息处理(onText
方法)
- 消息接收与日志记录:接收客户端发送的文本消息,并记录日志。
- 心跳消息过滤:对特定的心跳内容进行过滤,不进行处理。
- 消息广播:构建包含发送者姓名和消息内容的消息对象,使用
Tio.sendToGroup()
方法进行广播。
二进制消息处理(onBytes
方法)
- 当前示例未处理二进制消息:方法调用
Tio.remove()
移除连接,并返回null
。可根据实际需求添加处理逻辑。
连接关闭处理(onClose
方法)
- 资源清理:当收到关闭连接的请求时,记录日志。
Tio
框架会自动处理连接的移除和资源释放。
测试 WebSocket 服务器
在完成配置和实现后,我们需要对 WebSocket 服务器进行测试,确保各项功能正常工作。
测试工具
可以使用以下工具进行测试:
- 浏览器控制台:现代浏览器都支持 WebSocket,可以在控制台中使用 JavaScript 创建连接并发送消息。
- WebSocket 客户端工具:如
WebSocket King
、Postman
等。 - 自定义客户端:编写简单的客户端程序,使用 WebSocket 协议与服务器进行通信。
连接测试
客户端连接请求:
ws://localhost:8080/hello?name=Tong%20Li
服务器日志输出:
2024-09-01 01:02:27.619 [tio-group-3] INFO com.litongjava.websocket.handler.HelloWebSocketHandler.handshake - 收到来自 127.0.0.1 的 WebSocket 握手请求:GET /hello?name=Tong%20Li HTTP/1.1
服务器发送的欢迎消息:
{ "name": "admin", "message": "Tong Li 进入聊天室,当前在线人数:1" }
消息发送测试
客户端发送消息:
hi
服务器日志输出:
2024-09-01 01:03:56.925 [tio-group-6] INFO com.litongjava.websocket.handler.HelloWebSocketHandler.onText - 路径:/hello,收到消息:hi
服务器广播的消息:
{ "name": "Tong Li", "message": "hi" }
断开连接测试
客户端关闭连接:
- 客户端主动关闭 WebSocket 连接。
服务器日志输出:
2024-09-01 01:05:12.123 [tio-group-4] INFO com.litongjava.websocket.handler.HelloWebSocketHandler.onClose - 连接关闭,用户:Tong Li
服务器处理:
- 服务器调用
Tio.remove()
方法,清理连接并释放资源。
IWebSocketHandler 接口详解
IWebSocketHandler
接口定义了处理 WebSocket 各种事件的方法,开发者需要实现该接口以处理不同类型的 WebSocket 消息和事件。
接口源码
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.http.common.HttpRequest;
import com.litongjava.tio.http.common.HttpResponse;
import com.litongjava.tio.websocket.common.WebSocketRequest;
public interface IWebSocketHandler {
/**
* 对 httpResponse 参数进行补充并返回,如果返回 null 表示不想与对方建立连接,框架会断开连接。如果返回非 null,框架会将这个对象发送给对方。
* 注意:请不要在这个方法中向对方发送任何消息,因为握手还未完成,发送消息会导致协议交互失败。
* 对于大部分业务,该方法只需要一行代码:return httpResponse;
*
* @param httpRequest 客户端的握手请求
* @param httpResponse 服务器的响应对象
* @param channelContext 当前连接的上下文信息
* @return 修改后的 HttpResponse 对象,或 null 以拒绝握手
* @throws Exception
*/
HttpResponse handshake(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception;
/**
* 握手成功后触发该方法。
*
* @param httpRequest 客户端的握手请求
* @param httpResponse 服务器的响应对象
* @param channelContext 当前连接的上下文信息
* @throws Exception
*/
void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception;
/**
* 当收到 Opcode.BINARY 消息时执行该方法。也就是说,如果你的 WebSocket 是基于 BINARY 传输的,就会调用此方法。
*
* @param wsRequest WebSocket 请求对象
* @param bytes 消息的二进制内容
* @param channelContext 当前连接的上下文信息
* @return 可以是 WebSocketResponse、byte[]、ByteBuffer、String 或 null。如果是 null,框架不会回消息
* @throws Exception
*/
Object onBytes(WebSocketRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception;
/**
* 当收到 Opcode.CLOSE 时执行该方法,业务层在该方法中一般不需要写逻辑,可以为空。
*
* @param wsRequest WebSocket 请求对象
* @param bytes 消息的二进制内容
* @param channelContext 当前连接的上下文信息
* @return 可以是 WebSocketResponse、byte[]、ByteBuffer、String 或 null。如果是 null,框架不会回消息
* @throws Exception
*/
Object onClose(WebSocketRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception;
/**
* 当收到 Opcode.TEXT 消息时执行该方法。也就是说,如果你的 WebSocket 是基于 TEXT 传输的,就会调用此方法。
*
* @param wsRequest WebSocket 请求对象
* @param text 消息的文本内容
* @param channelContext 当前连接的上下文信息
* @return 可以是 WebSocketResponse、byte[]、ByteBuffer、String 或 null。如果是 null,框架不会回消息
* @throws Exception
*/
Object onText(WebSocketRequest wsRequest, String text, ChannelContext channelContext) throws Exception;
}
方法解析
handshake
方法:- 功能:处理客户端发起的 WebSocket 握手请求,决定是否同意升级协议。
- 参数说明:
HttpRequest httpRequest
:客户端的握手请求对象。HttpResponse httpResponse
:服务器的响应对象,可以在此添加必要的响应头信息。ChannelContext channelContext
:当前连接的上下文信息。
- 返回值:返回
HttpResponse
对象。如果返回null
,表示拒绝握手,服务器将断开连接。
onAfterHandshaked
方法:- 功能:在握手成功后执行,可用于初始化连接、绑定用户信息、发送欢迎消息等。
- 参数说明:
- 与
handshake
方法相同。
- 与
onBytes
方法:- 功能:处理客户端发送的二进制消息。
- 参数说明:
WebSocketRequest wsRequest
:WebSocket 请求对象。byte[] bytes
:消息的二进制内容。ChannelContext channelContext
:当前连接的上下文信息。
- 返回值:可以返回需要发送给客户端的响应消息,类型可以是
WebSocketResponse
、byte[]
、ByteBuffer
、String
或null
。返回null
表示不发送任何响应。
onClose
方法:- 功能:处理连接关闭事件,在客户端主动关闭连接或异常断开时触发。
- 参数说明:
- 与
onBytes
方法相同。
- 与
- 返回值:与
onBytes
方法相同。
onText
方法:- 功能:处理客户端发送的文本消息。
- 参数说明:
WebSocketRequest wsRequest
:WebSocket 请求对象。String text
:消息的文本内容。ChannelContext channelContext
:当前连接的上下文信息。
- 返回值:与
onBytes
方法相同。
总结
本文详细介绍了如何使用 tio-boot
框架搭建一个简单且功能完善的 WebSocket 服务器。从 WebSocket 的基本原理入手,逐步讲解了配置路由、实现消息处理器以及处理各种 WebSocket 事件的过程。通过实际的代码示例和详细的解释,展示了在 tio-boot
环境下实现实时通信服务的便捷性和高效性。
在实际应用中,可以根据业务需求扩展和优化当前的实现,如添加身份验证、支持更多类型的消息、处理复杂的业务逻辑等。希望本文能为您在开发实时通信应用时提供有用的指导和参考。