制作深圳网站建设,做公司网站排名,android购物商城源码,wordpress网站打开慢 目录：创建数据库表、xml、实体类、启动类、线程类、客户端代码、handle、controller类、缓存tcp链接。 接到一个需求，需要实现把通讯模块的tcp数据转发到其他的服务器，也就是转发tcp数据到多客户端。 任务拆解:
首先需要建立多客户端，每个客户端有一个独立的clientId和… 目录：创建数据库表、xml、实体类、启动类、线程类、客户端代码、handle、controller类、缓存tcp链接。 接到一个需求，需要实现把通讯模块的tcp数据转发到其他的服务器，也就是转发tcp数据到多客户端。 任务拆解:
首先需要建立多客户端，每个客户端有一个独立的clientId和对应的tcp通道；能动态地根据clientId关闭对应的转发任务；停止服务的时候需要断开所有的客户端连接，减少开销；客户端需要实现断线重连（考虑到断网的情况）。
注意：本篇文章只是实现转发操作，不支持被转发的服务器反向控制设备；如需反向控制，需要做特殊处理。如果大家感兴趣，给我留言。
下面我们根据我们头脑风暴的结果来想办法实现上面的过程
创建数据库表
-- One row per forwarding target: the TCP data of a station is relayed to host:port.
CREATE TABLE station_message_transmit (
  id bigint(32) NOT NULL COMMENT '主键',
  station_id int(11) NOT NULL COMMENT '站点id',
  host varchar(50) DEFAULT NULL COMMENT '主机ip',
  port int(11) DEFAULT NULL COMMENT '端口',
  create_by varchar(64) DEFAULT NULL COMMENT '创建人',
  create_time datetime DEFAULT NULL COMMENT '创建时间',
  update_by varchar(64) DEFAULT NULL COMMENT '修改人',
  -- The original comment here repeated '创建时间' (creation time); this column is the modification time.
  update_time datetime DEFAULT NULL COMMENT '修改时间',
  PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
所有的转发数据都是基于单个站点(单个设备)id是唯一的后续会通过该id绑定tcp通道来实现发数据关闭连接等操作
xml
<?xml version="1.0"?>
<project
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
    xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <modelVersion>4.0.0</modelVersion>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.5.RELEASE</version>
    <relativePath />
  </parent>

  <groupId>boot.base.tcp.client</groupId>
  <artifactId>boot-example-base-tcp-client-2.0.5</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>boot-example-base-tcp-client-2.0.5</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
    </dependency>
    <dependency>
      <groupId>io.springfox</groupId>
      <artifactId>springfox-swagger2</artifactId>
      <version>2.9.2</version>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.20</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Repackage the build output into a single executable jar -->
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>repackage</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
所需要的依赖这里只是实现一个简单的demo,来实践一下我的设想是否能实现。
实体类
yml配置文件就不需要配置，一切从简，默认的端口是8080。
package com.test;import lombok.Data;/*** author wu* version 1.0* date 2023/10/18 16:39*/
Data
public class StationMessageTransmit {/** 唯一编号 */private Long id;/** 站点id */private Integer stationId;/** 主机ip */private String host;/** 端口 */private Integer port;
}
启动类
package com.test;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.ContextStartedEvent;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;import java.util.ArrayList;
import java.util.List;
import java.util.Map;/*** wu*/
SpringBootApplication
EnableAsync
EnableScheduling
public class BootNettyClientApplication implements CommandLineRunner, ApplicationListener {public static void main( String[] args ) {SpringApplication app new SpringApplication(BootNettyClientApplication.class);app.run(args);System.out.println( Hello World! );}AsyncOverridepublic void run(String... args) throws Exception {StationMessageTransmit tran new StationMessageTransmit();tran.setId(1L);tran.setHost(192.168.10.128);tran.setPort(5000);tran.setStationId(13);StationMessageTransmit tran1 new StationMessageTransmit();tran1.setId(2L);tran1.setHost(192.168.10.128);tran1.setPort(5001);tran1.setStationId(13);ListStationMessageTransmit traces new ArrayListStationMessageTransmit();traces.add(tran);traces.add(tran1);for (StationMessageTransmit trace : traces) {BootNettyClientThread thread new BootNettyClientThread(trace);thread.start();}}Overridepublic void onApplicationEvent(ApplicationEvent applicationEvent) {if(applicationEvent instanceof ContextClosedEvent){System.out.println(应用关闭事件);for (Map.EntryString, ChannelHandlerContext entry : BootNettyClientGroupCache.groupMapCache.entrySet()) {ChannelHandlerContext channelHandlerContext entry.getValue();if(channelHandlerContext ! null){System.out.println(关闭链接:entry.getKey());channelHandlerContext.close();}}}else if(applicationEvent instanceof ContextRefreshedEvent){System.out.println(应用刷新事件);}else if(applicationEvent instanceof ContextStartedEvent){System.out.println(应用开启事件);}else if(applicationEvent instanceof ContextStoppedEvent){System.out.println(应用停止事件);}}
} run方法里面主要干的活是一个伪代码模拟从数据拿数据再初始化创建多个客户端。 onApplicationEvent方法主要是监控服务停止的事件这是考虑到tcp是长链接跟其他服务器链接是一直没有中断会存在多次重建连接的问题所以需要再关闭事件中关闭所有的tcp客户端连接
线程类
package com.test;/**** netty 客户端* wu*/
public class BootNettyClientThread extends Thread {private StationMessageTransmit trace;public BootNettyClientThread(StationMessageTransmit trace){this.trace trace;}Overridepublic void run() {try {new BootNettyClient().connect(trace);} catch (Exception e) {throw new RuntimeException(e);}}
}
传实体类主要是为了保证clientId和通道一一对应。
客户端代码
package com.test;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;/**** netty 客户端* wu*/
public class BootNettyClient {private EventLoopGroup group;public void connect(StationMessageTransmit trace) throws Exception{/*** 客户端的NIO线程组**/group new NioEventLoopGroup();try {/*** Bootstrap 是一个启动NIO服务的辅助启动类 客户端的*/Bootstrap bootstrap new Bootstrap();bootstrap bootstrap.group(group);bootstrap bootstrap.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true);/*** 设置 I/O处理类,主要用于网络I/O事件记录日志编码、解码消息*/bootstrap bootstrap.handler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024 * 1024));socketChannel.pipeline().addLast(new StringDecoder());socketChannel.pipeline().addLast(new TcpHandler(trace));}});/*** 连接服务端*/ChannelFuture future bootstrap.connect(trace.getHost(), trace.getPort()).sync();if(future.isSuccess()) {System.out.println(netty client start successtrace.toString());/*** 等待连接端口关闭*/future.channel().closeFuture().sync();}} finally {/*** 退出释放资源*/group.shutdownGracefully().sync();}}}
handle
package com.test;import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;/**** I/O数据读写处理类* wu*/
ChannelHandler.Sharable
public class TcpHandler extends ChannelInboundHandlerAdapter{private static ScheduledExecutorService SCHEDULED_EXECUTOR Executors.newScheduledThreadPool(5);private StationMessageTransmit trace;public TcpHandler(StationMessageTransmit trace){this.trace trace;}/*** 从服务端收到新的数据时这个方法会在收到消息时被调用*/Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException {if(msg null){return;}System.out.println(channelRead:read msg:msg.toString());//回应服务端//ctx.write(I got server message thanks server!);}/*** 从服务端收到新的数据、读取完成时调用*/Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws IOException {System.out.println(channelReadComplete);ctx.flush();}/*** 当出现 Throwable 对象才会被调用即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时*/Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws IOException {System.out.println(exceptionCaught);cause.printStackTrace();ctx.close();//抛出异常断开与客户端的连接}/*** 客户端与服务端第一次建立连接时 执行*/Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception, IOException {super.channelActive(ctx);InetSocketAddress inSocket (InetSocketAddress) ctx.channel().remoteAddress();String clientIp inSocket.getAddress().getHostAddress();System.out.println(服务器ip:clientIp,clientId:trace.getId());BootNettyClientGroupCache.save(trace.getId().toString(), ctx);}/*** 客户端与服务端 断连时 执行*/Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {super.channelInactive(ctx);InetSocketAddress inSocket (InetSocketAddress) ctx.channel().remoteAddress();String clientIp inSocket.getAddress().getHostAddress();ctx.close(); //断开连接时必须关闭否则造成资源浪费System.out.println(channelInactive:clientIp);//客户端重连//reset();}/*** 客户端重连*/public void reset(){//增加一个伪代码从服务器查询id对应的转发数据是否存在不存在则不继续运行转发任务SCHEDULED_EXECUTOR.schedule(() - {try {System.err.println(服务端链接不上开始重连操作...);new BootNettyClient().connect(trace);} catch (Exception e) {e.printStackTrace();}}, 3, TimeUnit.SECONDS);}}
reset方法是为了实现客户端重连：延迟3秒后执行一次重连（示例代码中channelInactive里对reset()的调用是注释掉的，需要重连时请取消注释）。channelInactive方法在客户端和服务器断开连接时会触发；channelActive方法在客户端和服务器建立连接时触发，需要在其中实现client和通道的绑定关系，方便后续回写数据。
controller类
package com.test;import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;/*** wu*/
RestController
public class BootNettyClientController {/*** 给所有客户端发送消息* param content* return*/PostMapping(/reportAllClientDataToServer)public String reportAllClientDataToServer(RequestParam(namecontent, required true) String content) {for (Map.EntryString, ChannelHandlerContext entry : BootNettyClientGroupCache.groupMapCache.entrySet()) {ChannelHandlerContext ctx entry.getValue();ctx.writeAndFlush(Unpooled.buffer().writeBytes(content.getBytes()));}return ok;}/*** 停止指定的客户端* param code* return* throws InterruptedException*/PostMapping(/stopStationByCode)public String downDataToClient(RequestParam(namecode, required true) String code) throws InterruptedException {ChannelHandlerContext ctx BootNettyClientGroupCache.get(code);ctx.close();BootNettyClientGroupCache.remove(code);return success;}}
主要是提供两个测试方法，可以通过apifox调试工具进行模拟请求。
缓存tcp链接
package com.test;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** wu*/
public class BootNettyClientGroupCache {/*** 存放所有的连接key是转发id,value是对应的数据*/public static volatile MapString, ChannelHandlerContext groupMapCache new ConcurrentHashMapString, ChannelHandlerContext();public static void add(String code, ChannelHandlerContext group){groupMapCache.put(code,group);}public static ChannelHandlerContext get(String code){return groupMapCache.get(code);}public static void remove(String code){groupMapCache.remove(code);}public static void save(String code, ChannelHandlerContext channel) {if(groupMapCache.get(code) null) {add(code,channel);}}}
存放所有的通道 文章转载自: http://www.morning.kbbmj.cn.gov.cn.kbbmj.cn http://www.morning.jbxfm.cn.gov.cn.jbxfm.cn http://www.morning.dbsch.cn.gov.cn.dbsch.cn http://www.morning.sftrt.cn.gov.cn.sftrt.cn http://www.morning.rtlth.cn.gov.cn.rtlth.cn http://www.morning.dtgjt.cn.gov.cn.dtgjt.cn http://www.morning.bgzgq.cn.gov.cn.bgzgq.cn http://www.morning.swsrb.cn.gov.cn.swsrb.cn http://www.morning.cgthq.cn.gov.cn.cgthq.cn http://www.morning.mmkrd.cn.gov.cn.mmkrd.cn http://www.morning.hjlwt.cn.gov.cn.hjlwt.cn http://www.morning.weitao0415.cn.gov.cn.weitao0415.cn http://www.morning.rptdz.cn.gov.cn.rptdz.cn http://www.morning.rhsg.cn.gov.cn.rhsg.cn http://www.morning.byjwl.cn.gov.cn.byjwl.cn http://www.morning.rxfgh.cn.gov.cn.rxfgh.cn http://www.morning.jcwrb.cn.gov.cn.jcwrb.cn http://www.morning.wanjia-sd.com.gov.cn.wanjia-sd.com http://www.morning.c7498.cn.gov.cn.c7498.cn http://www.morning.pqqhl.cn.gov.cn.pqqhl.cn http://www.morning.nwllb.cn.gov.cn.nwllb.cn http://www.morning.fksdd.cn.gov.cn.fksdd.cn http://www.morning.qmqgx.cn.gov.cn.qmqgx.cn http://www.morning.trbxt.cn.gov.cn.trbxt.cn http://www.morning.hfyll.cn.gov.cn.hfyll.cn http://www.morning.jgrjj.cn.gov.cn.jgrjj.cn http://www.morning.dqpnd.cn.gov.cn.dqpnd.cn http://www.morning.hflrz.cn.gov.cn.hflrz.cn http://www.morning.dnvhfh.cn.gov.cn.dnvhfh.cn http://www.morning.gfznl.cn.gov.cn.gfznl.cn http://www.morning.qmkyp.cn.gov.cn.qmkyp.cn http://www.morning.itvsee.com.gov.cn.itvsee.com http://www.morning.jfymz.cn.gov.cn.jfymz.cn http://www.morning.bpmdn.cn.gov.cn.bpmdn.cn http://www.morning.xkbdx.cn.gov.cn.xkbdx.cn http://www.morning.qrqg.cn.gov.cn.qrqg.cn http://www.morning.dwtdn.cn.gov.cn.dwtdn.cn http://www.morning.kzdwt.cn.gov.cn.kzdwt.cn http://www.morning.mldrd.cn.gov.cn.mldrd.cn http://www.morning.ddqdl.cn.gov.cn.ddqdl.cn http://www.morning.xywfz.cn.gov.cn.xywfz.cn http://www.morning.jhfkr.cn.gov.cn.jhfkr.cn http://www.morning.grpfj.cn.gov.cn.grpfj.cn http://www.morning.qdrrh.cn.gov.cn.qdrrh.cn 
http://www.morning.ygmw.cn.gov.cn.ygmw.cn http://www.morning.dwyyf.cn.gov.cn.dwyyf.cn http://www.morning.mwhqd.cn.gov.cn.mwhqd.cn http://www.morning.wzyfk.cn.gov.cn.wzyfk.cn http://www.morning.rcmwl.cn.gov.cn.rcmwl.cn http://www.morning.ymwnc.cn.gov.cn.ymwnc.cn http://www.morning.xhfky.cn.gov.cn.xhfky.cn http://www.morning.tkflb.cn.gov.cn.tkflb.cn http://www.morning.hxcrd.cn.gov.cn.hxcrd.cn http://www.morning.nrll.cn.gov.cn.nrll.cn http://www.morning.rrbhy.cn.gov.cn.rrbhy.cn http://www.morning.rjqtq.cn.gov.cn.rjqtq.cn http://www.morning.zpzys.cn.gov.cn.zpzys.cn http://www.morning.nfbkp.cn.gov.cn.nfbkp.cn http://www.morning.pbsqr.cn.gov.cn.pbsqr.cn http://www.morning.qfnrx.cn.gov.cn.qfnrx.cn http://www.morning.sbkb.cn.gov.cn.sbkb.cn http://www.morning.pgggs.cn.gov.cn.pgggs.cn http://www.morning.bfkrf.cn.gov.cn.bfkrf.cn http://www.morning.tlzbt.cn.gov.cn.tlzbt.cn http://www.morning.hzqjgas.com.gov.cn.hzqjgas.com http://www.morning.xkyfq.cn.gov.cn.xkyfq.cn http://www.morning.qpfmh.cn.gov.cn.qpfmh.cn http://www.morning.mntxalcb.com.gov.cn.mntxalcb.com http://www.morning.bnrnb.cn.gov.cn.bnrnb.cn http://www.morning.ypzr.cn.gov.cn.ypzr.cn http://www.morning.nxfuke.com.gov.cn.nxfuke.com http://www.morning.kybyf.cn.gov.cn.kybyf.cn http://www.morning.yrfxb.cn.gov.cn.yrfxb.cn http://www.morning.qgjgsds.com.cn.gov.cn.qgjgsds.com.cn http://www.morning.kfrhh.cn.gov.cn.kfrhh.cn http://www.morning.jwxmn.cn.gov.cn.jwxmn.cn http://www.morning.kfmnf.cn.gov.cn.kfmnf.cn http://www.morning.sdktr.com.gov.cn.sdktr.com http://www.morning.rwrn.cn.gov.cn.rwrn.cn http://www.morning.znnsk.cn.gov.cn.znnsk.cn