当前位置: 首页 > news >正文

免费学编程的网站有哪些百度域名收录

免费学编程的网站有哪些,百度域名收录,浙江省电子商务网站建设,大连网站推广目录 四、应用 1、粘包与半包 现象分析 粘包 半包 本质 解决方案 短链接 定长解码器 行解码器 长度字段解码器——LTC 2、协议设计与解析 协议的作用 Redis协议 HTTP协议 自定义协议 组成要素 编码器与解码器 编写测试类 Sharable注解 自定义编解码器能否使用Sharable注解 3、在…

目录

四、应用

1、粘包与半包

现象分析

粘包

半包

本质

解决方案

短链接

定长解码器

行解码器

长度字段解码器——LTC

2、协议设计与解析

协议的作用

Redis协议

HTTP协议

自定义协议

组成要素

编码器与解码器

编写测试类

 @Sharable注解

自定义编解码器能否使用@Sharable注解

3、在线聊天室

聊天室业务

用户登录接口

用户会话接口

群聊会话接口

整体结构

客户端代码结构

服务器代码结构

登录

客户端代码

服务器代码

运行结果

单聊

群聊

创建群聊

群聊聊天

加入群聊

退出

查看群聊成员

退出聊天室

连接假死

解决方法



四、应用

1、粘包与半包

粘包和半包问题是数据传输中比较常见的问题,所谓的粘包问题是指数据在传输时,在一条消息中读取到了另一条消息的部分数据,这种现象就叫做粘包。比如发送了两条消息,分别为“ABC”和“DEF”,那么正常情况下接收端也应该收到两条消息“ABC”和“DEF”,但接收端却收到的是“ABCD”,像这种情况就叫做粘包,半包问题是指接收端只收到了部分数据,而非完整的数据的情况就叫做半包。比如发送了一条消息是“ABC”,而接收端却收到的是“AB”和“C”两条信息,这种情况就叫做半包

只要是TCP协议的网络交互都有粘包和半包问题,因为TCP的传输是基于字节的传输方式,数据是以字节的形式进行传输的,并没有明确的边界。因此,在传输过程中,TCP没有办法直接识别数据包的边界,并且在流量控制下,TCP的字节传输还不稳定,当发送方连续发送多个数据包时,这些数据包可能会在网络传输的过程中合并或拆分,导致粘包和半包问题的出现,而UDP则没有这个问题,因为UDP的传输是基于数据报的

现象分析

粘包

现象

  • 发送 abc def,接收 abcdef

原因

  • 应用层
    • 接收方 ByteBuf 设置太大(Netty 默认 1024)
  • 传输层-网络层
    • 滑动窗口:假设发送方 256 bytes 表示一个完整报文,但由于接收方处理不及时且窗口大小足够大(大于256 bytes),这 256 bytes 字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会粘包
    • Nagle 算法:会造成粘包

Nagle算法的原理如下:当TCP发送方需要发送一个小数据包时,Nagle算法会将这个数据包缓存起来,不立即发送。然后,TCP发送方会继续等待其他数据,直到以下两个条件中的任意一个满足后再发送数据:

  1. 接收到之前发送的数据的确认ACK。
  2. 发送方的发送缓冲区中的数据量达到一定的阈值(一般是MSS,即最大报文长度)。

之所以要缓存起来是因为一个TCP的请求都是要进行数据报头的添加,而IP的报头+TCP的报头 = 40字节,哪怕你只是发送了1个字节的数据,也会被封装为一个41字节的传输内容,那这样子粘包现象就很严重了,为此解决方法就是,缓存多点字节再一起发过来。然而,当发送方连续发送多个小数据包时,这些数据包可能会在网络传输的过程中被合并成一个大数据包,导致粘包问题的出现。这是因为Nagle算法本身不考虑数据包的边界,只是简单地将小数据包缓存起来,直到条件满足后发送。

半包

现象

  • 发送 abcdef,接收 abc def

原因

  • 应用层
    • 接收方 ByteBuf 小于实际发送数据量
  • 传输层-网络层
    • 滑动窗口:假设接收方的窗口只剩了 128 bytes,发送方的报文大小是 256 bytes,这时接收方窗口中无法容纳发送方的全部报文,发送方只能先发送前 128 bytes,等待 ack 后才能发送剩余部分,这就造成了半包
  • 数据链路层
    • MSS 限制:当发送的数据超过 MSS (最大报文长度)限制后,会将数据切分发送,就会造成半包
本质

发生粘包与半包现象的本质是因为 TCP 是流式协议,消息无边界


解决方案

解决方案的思路和我这篇文章的处理方式类似,可以先看一下这个大概思路icon-default.png?t=N7T8https://blog.csdn.net/weixin_73077810/article/details/131843387

短链接长连接是描述客户端与服务器之间TCP连接持续时间的概念。

  • 短链接:短链接通常指的是一次性的临时连接。在短链接中,客户端与服务器建立连接、交换数据后,连接就会关闭。在每次通信之前,需要重新建立连接,进行握手和协商。

短链接的优点是简单、轻量,适用于临时的低频率的通信。但在高并发或频繁通信的场景中,频繁的连接建立和关闭会增加网络开销和延迟。

  • 长连接:长连接指的是客户端与服务器之间持久的TCP连接。在长连接中,连接一经建立,客户端和服务器可以多次长时间地进行双向通信。在连接建立后,数据可以实时、便捷地传输。

长连接的优点是减少连接建立和断开的开销,节省网络资源,减少延迟,提高通信效率。长连接常用于需要实时交互的应用,如即时通信、实时数据传输等。

需要注意的是,长连接可能会带来一些管理上的挑战。服务器需要维护大量的长连接,消耗资源,需要适当管理连接数和超时机制,防止资源浪费和死连接问题。

短链接

        客户端每次向服务器发送数据以后,就与服务器断开连接,此时的消息边界为连接建立到连接断开。这时便无需使用滑动窗口等技术来缓冲数据,则不会发生粘包现象。但如果一次性数据发送过多,接收方无法一次性容纳所有数据,还是会发生半包现象,所以短链接无法解决半包现象

客户端代码改进

修改channelActive方法

public void channelActive(ChannelHandlerContext ctx) throws Exception {log.debug("sending...");ByteBuf buffer = ctx.alloc().buffer(16);buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});ctx.writeAndFlush(buffer);// 使用短链接,每次发送完毕后就断开连接ctx.channel().close();
}

定长解码器

客户端于服务器约定一个最大长度,保证客户端每次发送的数据长度都不会大于该长度。若发送数据长度不足则需要补齐至该长度

服务器接收数据时,将接收到的数据按照约定的最大长度进行拆分,即使发送过程中产生了粘包,也可以通过定长解码器将数据正确地进行拆分。服务端需要用到FixedLengthFrameDecoder对数据进行定长解码,具体使用方法如下

ch.pipeline().addLast(new FixedLengthFrameDecoder(16));

客户端代码

客户端发送数据的代码如下

// 约定最大长度为16
final int maxLength = 16;
// 被发送的数据
char c = 'a';
// 向服务器发送10个报文
for (int i = 0; i < 10; i++) {ByteBuf buffer = ctx.alloc().buffer(maxLength);// 定长byte数组,未使用部分会以0进行填充byte[] bytes = new byte[maxLength];// 生成长度为0~15的数据for (int j = 0; j < (int)(Math.random()*(maxLength-1)); j++) {bytes[j] = (byte) c;}buffer.writeBytes(bytes);c++;// 将数据发送给服务器ctx.writeAndFlush(buffer);
}

服务器代码

使用FixedLengthFrameDecoder对粘包数据进行拆分,该handler需要添加在LoggingHandler之前,保证数据被打印时已被拆分

// 通过定长解码器对粘包数据进行拆分
ch.pipeline().addLast(new FixedLengthFrameDecoder(16));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));

行解码器

行解码器的是通过分隔符对数据进行拆分来解决粘包半包问题的

  • 可以通过LineBasedFrameDecoder(int maxLength)来拆分以换行符(\n  or  \r\n)为分隔符的数据
  • 可以通过DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters)指定通过什么分隔符来拆分数据(可以传入多个分隔符)

两种解码器都需要传入数据的最大长度,若超出最大长度,会抛出TooLongFrameException异常

以换行符 \n 为分隔符

客户端代码

// 约定最大长度为 64
final int maxLength = 64;
// 被发送的数据
char c = 'a';
for (int i = 0; i < 10; i++) {ByteBuf buffer = ctx.alloc().buffer(maxLength);// 生成长度为0~62的数据Random random = new Random();StringBuilder sb = new StringBuilder();for (int j = 0; j < (int)(random.nextInt(maxLength-2)); j++) {sb.append(c);}// 数据以 \n 结尾sb.append("\n");buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));c++;// 将数据发送给服务器ctx.writeAndFlush(buffer);
}

服务器代码

// 通过行解码器对粘包数据进行拆分,以 \n 为分隔符
// 需要指定最大长度
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(64));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));

以自定义分隔符 \c 为分隔符

客户端代码

// 数据以 \c 结尾
sb.append("\\c");
buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));

服务器代码

// 将分隔符放入ByteBuf中
ByteBuf bufSet = ch.alloc().buffer().writeBytes("\\c".getBytes(StandardCharsets.UTF_8));
// 通过行解码器对粘包数据进行拆分,以 \c 为分隔符
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(64, ch.alloc().buffer().writeBytes(bufSet)));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));

长度字段解码器——LTC

在传送数据时可以在数据中添加一个用于表示有用数据长度的字段,在解码时读取出这个用于表明长度的字段,同时读取其他相关参数,即可知道最终需要的数据是什么样子的

LengthFieldBasedFrameDecoder解码器可以提供更为丰富的拆分方法,其构造方法有五个参数

public LengthFieldBasedFrameDecoder(int maxFrameLength,int lengthFieldOffset, int lengthFieldLength,int lengthAdjustment, int initialBytesToStrip)

参数解析

  • maxFrameLength 数据最大长度

    • 表示数据的最大长度(包括附加信息、长度标识等内容)
  • lengthFieldOffset 数据长度标识的起始偏移量

    • 用于指明数据第几个字节开始是用于标识有用字节长度的,因为前面可能还有其他附加信息
  • lengthFieldLength 数据长度标识所占字节数(用于指明有用数据的长度)

    • 数据中用于表示有用数据长度的标识所占的字节数
  • lengthAdjustment 长度表示为有用数据的偏移量

    • 用于指明数据长度标识和有用数据之间的距离,因为两者之间还可能有附加信息
  • initialBytesToStrip 数据读取起点

    • 读取起点,不读取 0 ~ initialBytesToStrip 之间的数据

参数图解

使用

通过 EmbeddedChannel 对 handler 进行测试

public class EncoderStudy {public static void main(String[] args) {// 模拟服务器// 使用EmbeddedChannel测试handlerEmbeddedChannel channel = new EmbeddedChannel(// 数据最大长度为1KB,长度标识前后各有1个字节的附加信息,长度标识长度为4个字节(int)new LengthFieldBasedFrameDecoder(1024, 1, 4, 1, 0),new LoggingHandler(LogLevel.DEBUG));// 模拟客户端,写入数据ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();send(buffer, "Hello");channel.writeInbound(buffer);send(buffer, "World");channel.writeInbound(buffer);}private static void send(ByteBuf buf, String msg) {// 得到数据的长度int length = msg.length();byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);// 将数据信息写入buf// 写入长度标识前的其他信息buf.writeByte(0xCA);// 写入数据长度标识buf.writeInt(length);// 写入长度标识后的其他信息buf.writeByte(0xFE);// 写入具体的数据buf.writeBytes(bytes);}
}

运行结果

146  [main] DEBUG io.netty.handler.logging.LoggingHandler  - [id: 0xembedded, L:embedded - R:embedded] READ: 11B+-------------------------------------------------+|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| ca 00 00 00 05 fe 48 65 6c 6c 6f                |......Hello     |
+--------+-------------------------------------------------+----------------+146  [main] DEBUG io.netty.handler.logging.LoggingHandler  - [id: 0xembedded, L:embedded - R:embedded] READ: 11B+-------------------------------------------------+|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| ca 00 00 00 05 fe 57 6f 72 6c 64                |......World     |
+--------+-------------------------------------------------+----------------+

2、协议设计与解析

协议的作用

TCP/IP 中消息传输基于字节流的方式,没有边界

协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则

Redis协议

如果我们要向Redis服务器发送一条set name Nyima的指令,需要遵守如下协议

// 该指令一共有3部分,每条指令之后都要添加回车与换行符
*3\r\n
// 第一个指令的长度是3
$3\r\n
// 第一个指令是set指令
set\r\n
// 下面的指令以此类推
$4\r\n
name\r\n
$5\r\n
Nyima\r\n

客户端代码如下

public class RedisClient {static final Logger log = LoggerFactory.getLogger(StudyServer.class);public static void main(String[] args) {NioEventLoopGroup group =  new NioEventLoopGroup();try {ChannelFuture channelFuture = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {// 打印日志ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 回车与换行符final byte[] LINE = {'\r','\n'};// 获得ByteBufByteBuf buffer = ctx.alloc().buffer();// 连接建立后,向Redis中发送一条指令,注意添加回车与换行// set name Nyimabuffer.writeBytes("*3".getBytes());buffer.writeBytes(LINE);buffer.writeBytes("$3".getBytes());buffer.writeBytes(LINE);buffer.writeBytes("set".getBytes());buffer.writeBytes(LINE);buffer.writeBytes("$4".getBytes());buffer.writeBytes(LINE);buffer.writeBytes("name".getBytes());buffer.writeBytes(LINE);buffer.writeBytes("$5".getBytes());buffer.writeBytes(LINE);buffer.writeBytes("Nyima".getBytes());buffer.writeBytes(LINE);ctx.writeAndFlush(buffer);}});}}).connect(new InetSocketAddress("localhost", 6379));channelFuture.sync();// 关闭channelchannelFuture.channel().close().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {// 关闭groupgroup.shutdownGracefully();}}
}

Redis中查询执行结果


HTTP协议

HTTP协议在请求行请求头中都有很多的内容,自己实现较为困难,可以使用HttpServerCodec作为服务器端的解码器与编码器,来处理HTTP请求

// HttpServerCodec 中既有请求的解码器 HttpRequestDecoder 又有响应的编码器 HttpResponseEncoder
// Codec(CodeCombine) 一般代表该类既作为 编码器 又作为 解码器
public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>implements HttpServerUpgradeHandler.SourceCodec

 服务器代码

public class HttpServer {static final Logger log = LoggerFactory.getLogger(StudyServer.class);public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();new ServerBootstrap().group(group).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));// 作为服务器,使用 HttpServerCodec 作为编码器与解码器ch.pipeline().addLast(new HttpServerCodec());// 服务器只处理HTTPRequest,具体的限定取决于泛型ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) {// 获得请求urilog.debug(msg.uri());// 获得完整响应,设置版本号与状态码DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);// 设置响应内容byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);// 设置响应体长度,避免浏览器一直接收响应内容response.headers().setInt(CONTENT_LENGTH, bytes.length);// 设置响应体response.content().writeBytes(bytes);// 写回响应ctx.writeAndFlush(response);}});}}).bind(8080);}
}

服务器负责处理请求并响应浏览器。所以只需要处理HTTP请求即可

// 服务器只处理HTTPRequest
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>()

获得请求后,需要返回响应给浏览器。需要创建响应对象DefaultFullHttpResponse,设置HTTP版本号及状态码,为避免浏览器获得响应后,因为获得CONTENT_LENGTH而一直空转,需要添加CONTENT_LENGTH字段,表明响应体中数据的具体长度

// 获得完整响应,设置版本号与状态码
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
// 设置响应内容
byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);
// 设置响应体长度,避免浏览器一直接收响应内容
response.headers().setInt(CONTENT_LENGTH, bytes.length);
// 设置响应体
response.content().writeBytes(bytes);

运行结果

浏览器

自定义协议

组成要素
  • 魔数:用来在第一时间判定接收的数据是否为无效数据包
  • 版本号:可以支持协议的升级
  • 序列化算法:消息正文到底采用哪种序列化反序列化方式
    • 如:json、protobuf、hessian、jdk
  • 指令类型:是登录、注册、单聊、群聊… 跟业务相关
  • 请求序号:为了双工通信,提供异步能力
  • 正文长度
  • 消息正文

编码器与解码器
public class MessageCodec extends ByteToMessageCodec<Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {// 设置魔数 4个字节out.writeBytes(new byte[]{'N','Y','I','M'});// 设置版本号 1个字节out.writeByte(1);// 设置序列化方式 1个字节out.writeByte(1);// 设置指令类型 1个字节out.writeByte(msg.getMessageType());// 设置请求序号 4个字节out.writeInt(msg.getSequenceId());// 为了补齐为2的次幂个字节,填充1个字节的数据,满足为16字节out.writeByte(0xff);// 获得序列化后的msgByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(msg);byte[] bytes = bos.toByteArray();// 获得并设置正文长度 长度用4个字节标识out.writeInt(bytes.length);// 设置消息正文out.writeBytes(bytes);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {// 获取魔数int magic = in.readInt();// 获取版本号byte version = in.readByte();// 获得序列化方式byte seqType = in.readByte();// 获得指令类型byte messageType = in.readByte();// 获得请求序号int sequenceId = in.readInt();// 移除补齐字节in.readByte();// 获得正文长度int length = in.readInt();// 获得正文byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));Message message = (Message) ois.readObject();// 将信息放入List中,传递给下一个handlerout.add(message);// 打印获得的信息正文System.out.println("===========魔数===========");System.out.println(magic);System.out.println("===========版本号===========");System.out.println(version);System.out.println("===========序列化方法===========");System.out.println(seqType);System.out.println("===========指令类型===========");System.out.println(messageType);System.out.println("===========请求序号===========");System.out.println(sequenceId);System.out.println("===========正文长度===========");System.out.println(length);System.out.println("===========正文===========");System.out.println(message);}
}
  • 编码器与解码器方法源于父类ByteToMessageCodec,通过该类可以自定义编码器与解码器,泛型类型为被编码与被解码的类。此处使用了自定义类Message,代表消息

public class MessageCodec extends ByteToMessageCodec<Message>
  • 编码器负责将附加信息与正文信息写入到ByteBuf中,其中附加信息总字节数最好为2n,不足需要补齐。正文内容如果为对象,需要通过序列化将其放入到ByteBuf中
  • 解码器负责将ByteBuf中的信息取出,并放入List中,该List用于将信息传递给下一个handler

编写测试类
public class TestCodec {static final org.slf4j.Logger log = LoggerFactory.getLogger(StudyServer.class);public static void main(String[] args) throws Exception {EmbeddedChannel channel = new EmbeddedChannel();// 添加解码器,避免粘包半包问题channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0));// 开启控制台日志channel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));// 绑定自定义编码器与解码器,其内部重写父类的encode和decode两个handler方法channel.pipeline().addLast(new MessageCodec());// 自定义的封装dto类LoginRequestMessage user = new LoginRequestMessage("Nyima", "123");// 测试编码与解码ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();// 内部将user正文数据存储到byteBuf的正文位置上new MessageCodec().encode(null, user, byteBuf);channel.writeInbound(byteBuf);}
}
  • 测试类中用到了LengthFieldBasedFrameDecoder,避免粘包半包问题
  • 通过MessageCodec的encode方法将附加信息与正文写入到ByteBuf中,通过channel执行入站操作。入站时会调用decode方法进行解码

运行结果


 @Sharable注解

为了提高handler的复用率,可以将handler创建为handler对象,然后在不同的channel中使用该handler对象进行处理操作

LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
// 不同的channel中使用同一个handler对象,提高复用率
channel1.pipeline().addLast(loggingHandler);
channel2.pipeline().addLast(loggingHandler);

但是并不是所有的handler都能通过这种方法来提高复用率的,例如LengthFieldBasedFrameDecoder。如果多个channel中使用同一个LengthFieldBasedFrameDecoder对象,则可能发生如下问题

  • channel1中收到了一个半包,LengthFieldBasedFrameDecoder发现不是一条完整的数据,则没有继续向下传播
  • 此时channel2中也收到了一个半包,因为两个channel使用了同一个LengthFieldBasedFrameDecoder,存入其中的数据刚好拼凑成了一个完整的数据包。LengthFieldBasedFrameDecoder让该数据包继续向下传播,最终引发错误

为了提高handler的复用率,同时又避免出现一些并发问题,Netty中原生的handler中用@Sharable注解来标明,该handler能否在多个channel中共享。

只有带有该注解,才能通过对象的方式被共享,否则无法被共享


自定义编解码器能否使用@Sharable注解

这需要根据自定义的handler的处理逻辑进行分析

我们的MessageCodec本身接收的是LengthFieldBasedFrameDecoder处理之后的数据,那么数据肯定是完整的,按分析来说是可以添加@Sharable注解的

但是实际情况我们并不能添加该注解,会抛出异常信息ChannelHandler cn.nyimac.study.day8.protocol.MessageCodec is not allowed to be shared

  • 因为MessageCodec继承自ByteToMessageCodec,ByteToMessageCodec类的注解如下

  • 这就意味着ByteToMessageCodec不能被多个channel所共享的

    • 原因:因为该类的目标是:将ByteBuf转化为Message,意味着传进该handler的数据还未被处理过。所以传过来的ByteBuf可能并不是完整的数据,如果共享则会出现问题

如果想要共享,需要怎么办呢?

继承MessageToMessageDecoder即可。该类的目标是:将已经被处理的完整数据再次被处理。传过来的Message如果是被处理过的完整数据,那么被共享也就不会出现问题了,也就可以使用@Sharable注解了。实现方式与ByteToMessageCodec类似

@ChannelHandler.Sharable
public class MessageSharableCodec extends MessageToMessageCodec<ByteBuf, Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {...}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {...}
}

3、在线聊天室

聊天室业务

用户登录接口
public interface UserService {/*** 登录* @param username 用户名* @param password 密码* @return 登录成功返回 true, 否则返回 false*/boolean login(String username, String password);
}
用户会话接口
public interface Session {/*** 绑定会话* @param channel 哪个 channel 要绑定会话* @param username 会话绑定用户*/void bind(Channel channel, String username);/*** 解绑会话* @param channel 哪个 channel 要解绑会话*/void unbind(Channel channel);/*** 获取属性* @param channel 哪个 channel* @param name 属性名* @return 属性值*/Object getAttribute(Channel channel, String name);/*** 设置属性* @param channel 哪个 channel* @param name 属性名* @param value 属性值*/void setAttribute(Channel channel, String name, Object value);/*** 根据用户名获取 channel* @param username 用户名* @return channel*/Channel getChannel(String username);
}
群聊会话接口
public interface GroupSession {/*** 创建一个聊天组, 如果不存在才能创建成功, 否则返回 null* @param name 组名* @param members 成员* @return 成功时返回组对象, 失败返回 null*/Group createGroup(String name, Set<String> members);/*** 加入聊天组* @param name 组名* @param member 成员名* @return 如果组不存在返回 null, 否则返回组对象*/Group joinMember(String name, String member);/*** 移除组成员* @param name 组名* @param member 成员名* @return 如果组不存在返回 null, 否则返回组对象*/Group removeMember(String name, String member);/*** 移除聊天组* @param name 组名* @return 如果组不存在返回 null, 否则返回组对象*/Group removeGroup(String name);/*** 获取组成员* @param name 组名* @return 成员集合, 如果群不存在或没有成员会返回 empty set*/Set<String> getMembers(String name);/*** 获取组成员的 channel 集合, 只有在线的 channel 才会返回* @param name 组名* @return 成员 channel 集合*/List<Channel> getMembersChannel(String name);/*** 判断群聊是否一被创建* @param name 群聊名称* @return 是否存在*/boolean isCreated(String name);
}
整体结构

  • client包:存放客户端相关类

  • message包:存放各种类型的消息

  • protocol包:存放自定义协议

  • server包:存放服务器相关类

    • service包:存放用户相关类
    • session包:单聊及群聊相关会话类
客户端代码结构
public class ChatClient {static final Logger log = LoggerFactory.getLogger(ChatClient.class);public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);MessageSharableCodec messageSharableCodec = new MessageSharableCodec();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group);bootstrap.channel(NioSocketChannel.class);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 自定义的协议解码粘半包处理器ch.pipeline().addLast(new ProtocolFrameDecoder());ch.pipeline().addLast(loggingHandler);ch.pipeline().addLast(messageSharableCodec);}});Channel channel = bootstrap.connect().sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {group.shutdownGracefully();}}
}
服务器代码结构
public class ChatServer {static final Logger log = LoggerFactory.getLogger(ChatServer.class);public static void main(String[] args) {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);MessageSharableCodec messageSharableCodec = new MessageSharableCodec();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(boss, worker);bootstrap.channel(NioServerSocketChannel.class);bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProtocolFrameDecoder());ch.pipeline().addLast(loggingHandler);ch.pipeline().addLast(messageSharableCodec);}});Channel channel = bootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}

登录

客户端代码

客户端添加如下handler,分别处理登录、聊天等操作

@Slf4j
public class ChatClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();// 这是一个计数锁,只有当其维护的value减为0的时候才会释放CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);// 原子变量AtomicBoolean LOGIN = new AtomicBoolean(false);try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {/*** 创建连接时执行的处理器,用于执行登陆操作*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws                     Exception {// 开辟额外线程(不要让nio的线程被录入阻塞),用于用户登陆及后续操作new Thread(()->{Scanner scanner = new Scanner(System.in);System.out.println("请输入用户名");String username = scanner.next();System.out.println("请输入密码");String password = scanner.next();// 创建包含登录信息的请求体LoginRequestMessage message = new LoginRequestMessage(username, password);// 发送到channel中,注意这里用ctx写出,因为他要从这里找前面的那些处理器进行加工ctx.writeAndFlush(message);// 校验登录结果,如果能获取到锁就说明登录成功if (!loginStatus.get()) {// 登陆失败,关闭channel并返回ctx.channel().close();return;}// 登录成功后,执行其他操作while (true) {System.out.println("==================================");System.out.println("send [username] [content]");System.out.println("gsend [group name] [content]");System.out.println("gcreate [group name] [m1,m2,m3...]");System.out.println("gmembers [group name]");System.out.println("gjoin [group name]");System.out.println("gquit [group name]");System.out.println("quit");System.out.println("==================================");String command = scanner.nextLine();// 获得指令及其参数,并发送对应类型消息// 注意这里!!!!!你发送的消息类型决定了在服务器端处理的handelerString[] commands = command.split(" ");switch (commands[0]){case "send":ctx.writeAndFlush(new ChatRequestMessage(username, commands[1], commands[2]));break;case "gsend":ctx.writeAndFlush(new GroupChatRequestMessage(username,commands[1], commands[2]));break;case "gcreate":// 分割,获得群员名String[] members = commands[2].split(",");Set<String> set = new HashSet<>(Arrays.asList(members));// 把自己加入到群聊中set.add(username);ctx.writeAndFlush(new GroupCreateRequestMessage(commands[1],set));break;case "gmembers":ctx.writeAndFlush(new GroupMembersRequestMessage(commands[1]));break;case "gjoin":ctx.writeAndFlush(new GroupJoinRequestMessage(username, commands[1]));break;case "gquit":ctx.writeAndFlush(new GroupQuitRequestMessage(username, commands[1]));break;case "quit":ctx.channel().close();return;default:System.out.println("指令有误,请重新输入");continue;}}}, "login channel").start();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 注意噢,这个消息的接收是在这里进行输出控制台log.debug("{}", msg);if (msg instanceof LoginResponseMessage) {// 如果是登录响应信息LoginResponseMessage message = (LoginResponseMessage) msg;boolean isSuccess = message.isSuccess();// 登录成功,设置登陆标记if (isSuccess) {loginStatus.set(true);}// 登陆后,唤醒登陆线程,原始计数为1,减了一个后就变为0,释放锁waitLogin.countDown();}}});
服务器代码
@Slf4j
public class ChatServer {public static void main(String[] args) {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());// 日志ch.pipeline().addLast(LOGGING_HANDLER);// 自定义的协议编解码操作ch.pipeline().addLast(MESSAGE_CODEC);// 只对LoginRequestMessage解码结果进行操作ch.pipeline().addLast(new SimpleChannelInboundHandler<LoginRequestMessage>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {String username = msg.getUsername();String password = msg.getPassword();// 拿着账号密码去后端做校验,校验通过还要把用户名和channel的对应关系也要存储起来,用来实现单聊的时候看对方在不在线boolean login = UserServiceFactory.getUserService().login(username, password);LoginResponseMessage message;if(login) {message = new LoginResponseMessage(true, "登录成功");} else {message = new LoginResponseMessage(false, "用户名或密码不正确");}ctx.writeAndFlush(message);}});}});Channel channel = serverBootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}
// 该handler处理登录请求
LoginRequestMessageHandler loginRequestMessageHandler = new LoginRequestMessageHandler();
ch.pipeline().addLast(new LoginRequestMessageHandler());
运行结果

客户端

5665 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.protocol.MessageSharableCodec  - 1314474317, 1, 1, 1, 0, 279
5667 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.protocol.MessageSharableCodec  - message:AbstractResponseMessage{success=true, reason='登陆成功'}
5667 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient  - AbstractResponseMessage{success=true, reason='登陆成功'}
success

服务器

11919 [nioEventLoopGroup-3-1] DEBUG cn.nyimac.study.day8.protocol.MessageSharableCodec  - 1314474317, 1, 1, 0, 0, 217
11919 [nioEventLoopGroup-3-1] DEBUG cn.nyimac.study.day8.protocol.MessageSharableCodec  - message:LoginRequestMessage{username='Nyima', password='123'}7946 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler  - [id: 0x8e7c07f6, L:/127.0.0.1:8080 - R:/127.0.0.1:60572] WRITE: 295B+-------------------------------------------------+|  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 4e 59 49 4d 01 01 01 00 00 00 00 ff 00 00 01 17 |NYIM............|
|00000010| ac ed 00 05 73 72 00 31 63 6e 2e 6e 79 69 6d 61 |....sr.1cn.nyima|
|00000020| 63 2e 73 74 75 64 79 2e 64 61 79 38 2e 6d 65 73 |c.study.day8.mes|
|00000030| 73 61 67 65 2e 4c 6f 67 69 6e 52 65 73 70 6f 6e |sage.LoginRespon|
|00000040| 73 65 4d 65 73 73 61 67 65 e2 34 49 24 72 52 f3 |seMessage.4I$rR.|
|00000050| 07 02 00 00 78 72 00 34 63 6e 2e 6e 79 69 6d 61 |....xr.4cn.nyima|
|00000060| 63 2e 73 74 75 64 79 2e 64 61 79 38 2e 6d 65 73 |c.study.day8.mes|
|00000070| 73 61 67 65 2e 41 62 73 74 72 61 63 74 52 65 73 |sage.AbstractRes|
|00000080| 70 6f 6e 73 65 4d 65 73 73 61 67 65 b3 7e 19 32 |ponseMessage.~.2|
|00000090| 9b 88 4d 7b 02 00 02 5a 00 07 73 75 63 63 65 73 |..M{...Z..succes|
|000000a0| 73 4c 00 06 72 65 61 73 6f 6e 74 00 12 4c 6a 61 |sL..reasont..Lja|
|000000b0| 76 61 2f 6c 61 6e 67 2f 53 74 72 69 6e 67 3b 78 |va/lang/String;x|
|000000c0| 72 00 24 63 6e 2e 6e 79 69 6d 61 63 2e 73 74 75 |r.$cn.nyimac.stu|
|000000d0| 64 79 2e 64 61 79 38 2e 6d 65 73 73 61 67 65 2e |dy.day8.message.|
|000000e0| 4d 65 73 73 61 67 65 dd e9 84 b7 21 db 18 52 02 |Message....!..R.|
|000000f0| 00 02 49 00 0b 6d 65 73 73 61 67 65 54 79 70 65 |..I..messageType|
|00000100| 49 00 0a 73 65 71 75 65 6e 63 65 49 64 78 70 00 |I..sequenceIdxp.|
|00000110| 00 00 00 00 00 00 00 01 74 00 0c e7 99 bb e9 99 |........t.......|
|00000120| 86 e6 88 90 e5 8a 9f                            |.......         |
+--------+-------------------------------------------------+----------------+

单聊

客户端输入send username content即可发送单聊消息,需要服务器端添加处理ChatRequestMessage的handler

@ChannelHandler.Sharable // 必须添加该注解
// 表明只对ChatRequestMessage的消息进行加工
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {// 获得user所在的channelChannel channel = SessionFactory.getSession().getChannel(msg.getTo());// 如果双方都在线if (channel != null) {// 通过接收方与服务器之间的channel发送信息,注意,这里不是写到byteBuf去channel.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent()));} else {// 通过发送方与服务器之间的channel发送消息ctx.writeAndFlush(new ChatResponseMessage(false, "对方用户不存在或离线,发送失败"));}}
}
// 该handler处理单聊请求
ChatRequestMessageHandler chatRequestMessageHandler = new ChatRequestMessageHandler();
ch.pipeline().addLast(chatRequestMessageHandler);

运行结果

发送方(zhangsan)

send Nyima hello

接收方(Nyima)

// 收到zhangsan发来的消息
20230 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient  - ChatResponseMessage{from='zhangsan', content='hello'}

群聊

创建群聊

添加处理GroupCreateRequestMessage的handler

@ChannelHandler.Sharable
// 表明只对GroupCreateRequestMessage的消息进行加工
public class GroupCreateMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, GroupCreateRequestMessage msg) throws Exception {// 获得要创建的群聊名,ctx对应的是发送这个创建群聊的业务请求的人的这个channelString groupName = msg.getGroupName();// 获得要创建的群聊的成员组(首次拉起形成群聊的那几个人,包含自身才行)Set<String> members = msg.getMembers();// 判断该群聊是否创建过,未创建返回null并创建群聊Group group = GroupSessionFactory.getGroupSession().createGroup(groupName, members);if (group == null) {// 向群的创建者发送创建成功消息GroupCreateResponseMessage groupCreateResponseMessage = new GroupCreateResponseMessage(true, groupName + "创建成功");ctx.writeAndFlush(groupCreateResponseMessage);// 获得在线群员的channel,给群员发送入群聊消息List<Channel> membersChannel = GroupSessionFactory.getGroupSession().getMembersChannel(groupName);groupCreateResponseMessage = new GroupCreateResponseMessage(true, "您已被拉入"+groupName);// 给每个在线群员发送消息for(Channel channel : membersChannel) {channel.writeAndFlush(groupCreateResponseMessage);}} else {// 发送失败消息给创建人GroupCreateResponseMessage groupCreateResponseMessage = new GroupCreateResponseMessage(false, groupName + "已存在");ctx.writeAndFlush(groupCreateResponseMessage);}}
}
// 该handler处理创建群聊请求
GroupCreateMessageHandler groupCreateMessageHandler = new GroupCreateMessageHandler();
ch.pipeline().addLast(groupCreateMessageHandler);

运行结果

创建者客户端

// 首次创建
gcreate Netty学习 zhangsan,lisi31649 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient  - AbstractResponseMessage{success=true, reason='Netty学习创建成功'}
15244 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient  - AbstractResponseMessage{success=true, reason='您已被拉入Netty学习'}// 再次创建
gcreate Netty学习 zhangsan,lisi
40771 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient  - AbstractResponseMessage{success=false, reason='Netty学习已存在'}

群员客户端

28788 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient  - AbstractResponseMessage{success=true, reason='您已被拉入Netty学习'}

群聊聊天
@ChannelHandler.Sharable
// 表明只对GroupChatRequestMessage的消息进行加工
public class GroupChatMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception {String groupName = msg.getGroupName();GroupSession groupSession = GroupSessionFactory.getGroupSession();// 判断群聊是否存在boolean isCreated = groupSession.isCreated(groupName);if (isCreated) {// 给群员发送信息List<Channel> membersChannel = groupSession.getMembersChannel(groupName);for(Channel channel : membersChannel) {channel.writeAndFlush(new GroupChatResponseMessage(msg.getFrom(), msg.getContent()));}} else {ctx.writeAndFlush(new GroupChatResponseMessage(false, "群聊不存在"));}}
}
// 该handler处理群聊聊天
GroupChatMessageHandler groupChatMessageHandler = new GroupChatMessageHandler();
ch.pipeline().addLast(groupChatMessageHandler);

运行结果

发送方(群聊存在)

gsend Netty学习 你们好45408 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient  - GroupChatResponseMessage{from='zhangsan', content='你们好'}

接收方

48082 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient  - GroupChatResponseMessage{from='zhangsan', content='你们好'}

 发送方(群聊不存在)

gsend Spring学习 你们好25140 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient  - AbstractResponseMessage{success=false, reason='群聊不存在'}

加入群聊
@ChannelHandler.Sharable
public class GroupJoinMessageHandler extends SimpleChannelInboundHandler<GroupJoinRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, GroupJoinRequestMessage msg) throws Exception {GroupSession groupSession = GroupSessionFactory.getGroupSession();// 判断该用户是否在群聊中Set<String> members = groupSession.getMembers(msg.getGroupName());boolean joinFlag = false;// 群聊存在且用户未加入,才能加入if (!members.contains(msg.getUsername()) && groupSession.isCreated(msg.getGroupName())) {joinFlag = true;}if (joinFlag) {// 加入群聊groupSession.joinMember(msg.getGroupName(), msg.getUsername());ctx.writeAndFlush(new GroupJoinResponseMessage(true,"加入"+msg.getGroupName()+"成功"));} else {ctx.writeAndFlush(new GroupJoinResponseMessage(false, "加入失败,群聊未存在或您已加入该群聊"));}}
}
// 该handler处理加入群聊
GroupJoinMessageHandler groupJoinMessageHandler = new GroupJoinMessageHandler();
ch.pipeline().addLast(groupJoinMessageHandler);

运行结果

正常加入群聊

94921 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient  - AbstractResponseMessage{success=true, reason='加入Netty学习成功'}

加入不能存在或已加入的群聊

44025 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient  - AbstractResponseMessage{success=false, reason='加入失败,群聊未存在或您已加入该群聊'}

退出
@ChannelHandler.Sharable
public class GroupQuitMessageHandler extends SimpleChannelInboundHandler<GroupQuitRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, GroupQuitRequestMessage msg) throws Exception {GroupSession groupSession = GroupSessionFactory.getGroupSession();String groupName = msg.getGroupName();Set<String> members = groupSession.getMembers(groupName);String username = msg.getUsername();// 判断用户是否在群聊中以及群聊是否存在boolean joinFlag = false;if (groupSession.isCreated(groupName) && members.contains(username)) {// 可以退出joinFlag = true;}if (joinFlag) {// 退出成功groupSession.removeMember(groupName, username);ctx.writeAndFlush(new GroupQuitResponseMessage(true, "退出"+groupName+"成功"));} else {// 退出失败ctx.writeAndFlush(new GroupQuitResponseMessage(false, "群聊不存在或您未加入该群,退出"+groupName+"失败"));}}
}
// 该handler处理退出群聊
GroupQuitMessageHandler groupQuitMessageHandler = new GroupQuitMessageHandler();
ch.pipeline().addLast(groupQuitMessageHandler);

运行结果

正常退出

32282 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient  - AbstractResponseMessage{success=true, reason='退出Netty学习成功'}

退出不存在或未加入的群聊

67404 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient  - AbstractResponseMessage{success=false, reason='群聊不存在或您未加入该群,退出Netty失败'}

查看群聊成员
@ChannelHandler.Sharable
public class GroupMembersMessageHandler extends SimpleChannelInboundHandler<GroupMembersRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, GroupMembersRequestMessage msg) throws Exception {ctx.writeAndFlush(new GroupMembersResponseMessage(GroupSessionFactory.getGroupSession().getMembers(msg.getGroupName())));}
}
// 该handler处理查看成员
GroupMembersMessageHandler groupMembersMessageHandler = new GroupMembersMessageHandler();
ch.pipeline().addLast(groupMembersMessageHandler);

运行结果

46557 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient  - GroupMembersResponseMessage{members=[zhangsan, Nyima]}

退出聊天室

@ChannelHandler.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {/*** 断开连接时触发 Inactive事件*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {// 解绑SessionFactory.getSession().unbind(ctx.channel());}/*** 异常退出,需要解绑*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {// 解绑SessionFactory.getSession().unbind(ctx.channel());}
}
// 该handler处理退出聊天室
ch.pipeline().addLast(quitHandler);
GroupMembersMessageHandler groupMembersMessageHandler = new GroupMembersMessageHandler();

退出时,客户端会关闭channel并返回

case "quit":// 关闭channel并返回ctx.channel().close();return;

连接假死

原因

  • 网络设备出现故障,例如网卡,机房等,底层的 TCP 连接已经断开了,但应用程序没有感知到,仍然占用着资源
  • 公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,会白白地消耗资源
  • 应用程序线程阻塞,无法进行数据读写

问题

  • 假死的连接占用的资源不能自动释放
  • 向假死的连接发送数据,得到的反馈是发送超时
解决方法

可以添加IdleStateHandler对空闲时间进行检测,通过构造函数可以传入三个参数

  • readerIdleTimeSeconds 读空闲经过的秒数
  • writerIdleTimeSeconds 写空闲经过的秒数
  • allIdleTimeSeconds 读和写空闲经过的秒数

当指定时间内未发生读或写事件时,会触发特定事件

  • 读空闲会触发  READER_IDLE
  • 写空闲会触发  WRITE_IDLE
  • 读和写空闲会触发  ALL_IDEL

将定时任务的周期设置为 0,这意味着不会触发该空闲状态事件。

想要处理这些事件,需要自定义事件处理函数

服务器端代码

// 用于空闲连接的检测,5s内未读到数据,会触发READ_IDLE事件
ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
// 添加双向处理器,负责处理READER_IDLE事件
/*ChannelDuplexHandler 是 Netty 框架中的一个特殊类,它是用来处理网络通信中的读写事件的双向处理器。它扩展
了ChannelInboundHandler和ChannelOutboundHandler,同时负责处理从网络中读取到的数据以及将数据写入到网络中。*/
ch.pipeline().addLast(new ChannelDuplexHandler() {@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {// 获得事件IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.READER_IDLE) {// 断开连接ctx.channel().close();}}
});
  • 使用IdleStateHandler进行空闲检测
  • 使用双向处理器ChannelDuplexHandler对入站与出站事件进行处理
    • IdleStateHandler中的事件为特殊事件,需要实现ChannelDuplexHandleruserEventTriggered方法,判断事件类型并自定义处理方式,来对事件进行处理

避免因非网络等原因引发的WRITER_IDLE事件,比如网络情况良好,只是用户本身没有输入数据,这时发生WRITER_IDLE事件,直接让服务器断开连接是不可取的

为避免此类情况,需要在客户端向服务器发送心跳包,发送频率要小于服务器设置的IdleTimeSeconds,一般设置为其值的一半

客户端代码

// 发送心跳包,让服务器知道客户端在线
// 3s未发生WRITER_IDLE,就像服务器发送心跳包
// 该值为服务器端设置的READER_IDLE触发时间的一半左右
ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
ch.pipeline().addLast(new ChannelDuplexHandler() {@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.WRITER_IDLE) {// 发送心跳包ctx.writeAndFlush(new PingMessage());}}
});

http://www.tj-hxxt.cn/news/57330.html

相关文章:

  • wordpress 说说 主题北京seo方法
  • 网站没有被搜索引擎收录推广普通话的内容简短
  • 优秀网站建设网页外包平台
  • 湖南营销型网站建设价格舆情信息范文
  • 黄石企业网站建设整合营销理论
  • 我想投诉做软件的网站营业推广促销
  • 京东网站推广方式附近电脑培训班零基础
  • 双语网站建设重大军事新闻
  • 不锈钢网站建设深圳seo专家
  • 徐州企业做网站免费seo快速排名工具
  • 中文网站模板 免费it培训班大概需要多少钱
  • 安卓和web网站开发专业关键词排名优化软件
  • 上海十大b2c网站建设有没有免费的广告平台
  • 200做网站公司企业网站制作
  • 设计logo网站赚钱上海推广系统
  • 做外贸英文网站seo优化靠谱吗
  • 农机网站模版深圳市企业网站seo营销工具
  • 网站制作湖州今日足球比赛预测推荐分析
  • 接设计网站百度注册入口
  • 企业网站建设指导思想品牌营销咨询公司
  • javaweb做视频网站原理百度竞价推广点击器
  • 网站怎么弄域名网站
  • 雨花台网站建设汕头seo代理商
  • 企业网站的网络营销功能包括google网页版登录入口
  • 宁波十大外贸公司成都网站排名 生客seo
  • 江苏省建设厅副厅长网站衡水网站seo
  • 中小网站公司做的推广怎么样免费网站模板网
  • 网站空间是虚拟机吗网站制作工具有哪些
  • 建筑案例网站网上怎么免费推广
  • wordpress如何播放百度云视频优化大师tv版