成都网站建设企业 排名,网站开发实训报告,wordpress导航兰,WordPress查看文章目录 1 为什么需要协议#xff1f;2 redis 协议举例3 http 协议举例4 自定义协议要素4.1 编解码器4.2 什么时候可以加 Sharable 1 为什么需要协议#xff1f; TCP/IP 中消息传输基于流的方式#xff0c;没有边界。
协议的目的就是划定消息的边界#xff0c;制定通信双方要… 目录 1 为什么需要协议2 redis 协议举例3 http 协议举例4 自定义协议要素4.1 编解码器4.2 什么时候可以加 Sharable 1 为什么需要协议 TCP/IP 中消息传输基于流的方式没有边界。
协议的目的就是划定消息的边界制定通信双方要共同遵守的通信规则
例如在网络上传输
下雨天留客天留我不留是中文一句著名的无标点符号句子在没有标点符号情况下这句话有数种拆解方式而意思却是完全不同所以常被用作讲述标点符号的重要性
一种解读
下雨天留客天留我不留另一种解读
下雨天留客天留我不留如何设计协议呢其实就是给网络传输的信息加上“标点符号”。但通过分隔符来断句不是很好因为分隔符本身如果用于传输那么必须加以区分。因此下面一种协议较为常用
定长字节表示内容长度 实际内容例如假设一个中文字符长度为 3按照上述协议的规则发送信息方式如下就不会被接收方弄错意思了
0f下雨天留客06天留09我不留小故事 很久很久以前一位私塾先生到一家任教。双方签订了一纸协议“无鸡鸭亦可无鱼肉亦可白菜豆腐不可少不得束修金”。此后私塾先生虽然认真教课但主人家则总是给私塾先生以白菜豆腐为菜丝毫未见鸡鸭鱼肉的款待。私塾先生先是很不解可是后来也就想通了主人把鸡鸭鱼肉的钱都会换为束修金的也罢。至此双方相安无事。 年关将至一个学年段亦告结束。私塾先生临行时也不见主人家为他交付束修金遂与主家理论。然主家亦振振有词“有协议为证——无鸡鸭亦可无鱼肉亦可白菜豆腐不可少不得束修金。这白纸黑字明摆着的你有什么要说的呢” 私塾先生据理力争“协议是这样的——无鸡鸭亦可无鱼肉亦可白菜豆腐不可少不得束修金。” 双方唇枪舌战你来我往真个是不亦乐乎 这里的束修金也作“束脩”应当是泛指教师应当得到的报酬 2 redis 协议举例
NioEventLoopGroup worker new NioEventLoopGroup();
byte[] LINE {13, 10};
try {Bootstrap bootstrap new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {// 会在连接 channel 建立成功后会触发 active 事件Overridepublic void channelActive(ChannelHandlerContext ctx) {set(ctx);get(ctx);}private void get(ChannelHandlerContext ctx) {ByteBuf buf ctx.alloc().buffer();buf.writeBytes(*2.getBytes());buf.writeBytes(LINE);buf.writeBytes($3.getBytes());buf.writeBytes(LINE);buf.writeBytes(get.getBytes());buf.writeBytes(LINE);buf.writeBytes($3.getBytes());buf.writeBytes(LINE);buf.writeBytes(aaa.getBytes());buf.writeBytes(LINE);ctx.writeAndFlush(buf);}private void set(ChannelHandlerContext ctx) {ByteBuf buf ctx.alloc().buffer();buf.writeBytes(*3.getBytes());buf.writeBytes(LINE);buf.writeBytes($3.getBytes());buf.writeBytes(LINE);buf.writeBytes(set.getBytes());buf.writeBytes(LINE);buf.writeBytes($3.getBytes());buf.writeBytes(LINE);buf.writeBytes(aaa.getBytes());buf.writeBytes(LINE);buf.writeBytes($3.getBytes());buf.writeBytes(LINE);buf.writeBytes(bbb.getBytes());buf.writeBytes(LINE);ctx.writeAndFlush(buf);}Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf (ByteBuf) msg;System.out.println(buf.toString(Charset.defaultCharset()));}});}});ChannelFuture channelFuture bootstrap.connect(localhost, 6379).sync();channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {log.error(client error, e);
} finally {worker.shutdownGracefully();
}3 http 协议举例
NioEventLoopGroup boss new NioEventLoopGroup();
NioEventLoopGroup worker new NioEventLoopGroup();
try {ServerBootstrap serverBootstrap new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new HttpServerCodec());ch.pipeline().addLast(new SimpleChannelInboundHandlerHttpRequest() {Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {// 获取请求log.debug(msg.uri());// 返回响应DefaultFullHttpResponse response new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);byte[] bytes h1Hello, world!/h1.getBytes();response.headers().setInt(CONTENT_LENGTH, bytes.length);response.content().writeBytes(bytes);// 写回响应ctx.writeAndFlush(response);}});/*ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug({}, msg.getClass());if (msg instanceof HttpRequest) { // 请求行请求头} else if (msg instanceof HttpContent) { //请求体}}});*/}});ChannelFuture channelFuture serverBootstrap.bind(8080).sync();channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {log.error(server error, e);
} finally {boss.shutdownGracefully();worker.shutdownGracefully();
}4 自定义协议要素
魔数用来在第一时间判定是否是无效数据包版本号可以支持协议的升级序列化算法消息正文到底采用哪种序列化反序列化方式可以由此扩展例如json、protobuf、hessian、jdk指令类型是登录、注册、单聊、群聊… 跟业务相关请求序号为了双工通信提供异步能力正文长度消息正文
4.1 编解码器
根据上面的要素设计一个登录请求消息和登录响应消息并使用 Netty 完成收发
Slf4j
public class MessageCodec extends ByteToMessageCodecMessage {Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {// 1. 4 字节的魔数out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1out.writeByte(0);// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节out.writeInt(msg.getSequenceId());// 无意义对齐填充out.writeByte(0xff);// 6. 获取内容的字节数组ByteArrayOutputStream bos new ByteArrayOutputStream();ObjectOutputStream oos new ObjectOutputStream(bos);oos.writeObject(msg);byte[] bytes bos.toByteArray();// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);}Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, ListObject out) throws Exception {int magicNum in.readInt();byte version in.readByte();byte serializerType in.readByte();byte messageType in.readByte();int sequenceId in.readInt();in.readByte();int length in.readInt();byte[] bytes new byte[length];in.readBytes(bytes, 0, length);ObjectInputStream ois new ObjectInputStream(new ByteArrayInputStream(bytes));Message message (Message) ois.readObject();log.debug({}, {}, {}, {}, {}, {}, magicNum, version, serializerType, messageType, sequenceId, length);log.debug({}, message);out.add(message);}
}测试
EmbeddedChannel channel new EmbeddedChannel(new LoggingHandler(),new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),new MessageCodec()
);
// encode
LoginRequestMessage message new LoginRequestMessage(zhangsan, 123, 张三);
// channel.writeOutbound(message);
// decode
ByteBuf buf ByteBufAllocator.DEFAULT.buffer();
new MessageCodec().encode(null, message, buf);ByteBuf s1 buf.slice(0, 100);
ByteBuf s2 buf.slice(100, buf.readableBytes() - 100);
s1.retain(); // 引用计数 2
channel.writeInbound(s1); // release 1
channel.writeInbound(s2);解读 4.2 什么时候可以加 Sharable
当 handler 不保存状态时就可以安全地在多线程下被共享但要注意对于编解码器类不能继承 ByteToMessageCodec 或 CombinedChannelDuplexHandler 父类他们的构造方法对 Sharable 有限制如果能确保编解码器不会保存状态可以继承 MessageToMessageCodec 父类
Slf4j
ChannelHandler.Sharable
/*** 必须和 LengthFieldBasedFrameDecoder 一起使用确保接到的 ByteBuf 消息是完整的*/
public class MessageCodecSharable extends MessageToMessageCodecByteBuf, Message {Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, ListObject outList) throws Exception {ByteBuf out ctx.alloc().buffer();// 1. 4 字节的魔数out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1out.writeByte(0);// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节out.writeInt(msg.getSequenceId());// 无意义对齐填充out.writeByte(0xff);// 6. 获取内容的字节数组ByteArrayOutputStream bos new ByteArrayOutputStream();ObjectOutputStream oos new ObjectOutputStream(bos);oos.writeObject(msg);byte[] bytes bos.toByteArray();// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);outList.add(out);}Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, ListObject out) throws Exception {int magicNum in.readInt();byte version in.readByte();byte serializerType in.readByte();byte messageType in.readByte();int sequenceId in.readInt();in.readByte();int length in.readInt();byte[] bytes new byte[length];in.readBytes(bytes, 0, length);ObjectInputStream ois new ObjectInputStream(new ByteArrayInputStream(bytes));Message message (Message) ois.readObject();log.debug({}, {}, {}, {}, {}, {}, magicNum, version, serializerType, messageType, sequenceId, length);log.debug({}, message);out.add(message);}
}