1. Socket 配置参数中添加
1.1 读取 Socket 字节时的字节序
1.2 读取数据时,单次读取最大缓存值
1.3 从 Socket 读取数据时,遵从的数据包结构协议
1.4 服务器返回数据的最大值,防止客户端内存溢出
/*** @Description: Socket 配置参数*/public class IOSocketOptions {/*** 是否为调试模式,默认为 true*/private static boolean isDebug = true;/*** Socket 主机地址*/private SocketAddress socketAddress;/*** Socket 备用主机地址*/private SocketAddress backupAddress;/*** 写入 Socket 字节时的字节序*/private ByteOrder writeOrder;/*** 读取 Socket 字节时的字节序*/private ByteOrder readOrder;/*** 写数据时单个数据包的最大值 默认 100*/private int maxWriteBytes;/*** 读取数据时,单次读取最大缓存值,默认 50,数值越大效率越高,但是系统消耗也越大*/private int maxReadBytes;/*** 从 Socket 读取数据时,遵从的数据包结构协议,在业务层进行定义*/private IMessageProtocol messageProtocol;/*** 服务器返回数据的最大值 (单位Mb) ,防止客服端内存溢出*/private int maxResponseDataMb;/*** 连接超时时间(单位毫秒)*/private int connectTimeout;/*** 是否重连 Socket,默认为 true*/private boolean isReConnection;/*** Socket 重连管理器*/private AbsReConnection reConnectionManager;/*** Socket 工厂*/private SocketFactory socketFactory;/*** Socket 安全套接字协议相关配置*/private SocketSSLConfig socketSSLConfig;/*** IO 字节流的编码方式,默认 UTF-8*/private Charset charsetName;/*** 静态内部类*/public static class Builder {IOSocketOptions socketOptions;// 首先获取一个默认的配置public Builder() {this(getDefaultOptions());}public Builder(IOSocketOptions defaultOptions) {socketOptions = defaultOptions;}/*** 设置 Socket 主机地址** @param socketAddress* @return*/public Builder setSocketAddress(SocketAddress socketAddress) {socketOptions.socketAddress = socketAddress;return this;}/*** 设置 Socket 备用主机地址** @param backupAddress* @return*/public Builder setBackupAddress(SocketAddress backupAddress) {socketOptions.backupAddress = backupAddress;return this;}/*** 设置服务器返回数据的允许的最大值,单位兆/Mb** @param maxResponseDataMb* @return*/public Builder setMaxResponseDataMb(int maxResponseDataMb) {socketOptions.maxResponseDataMb = maxResponseDataMb;return this;}/*** 设置连接超时时间(单位毫秒)** @param connectTimeout* @return*/public Builder setConnectTimeout(int connectTimeout) {socketOptions.connectTimeout = connectTimeout;return this;}/*** 是否重连 Socket** @param reConnection* @return*/public Builder setReConnection(boolean reConnection) {socketOptions.isReConnection = reConnection;return this;}/*** 设置 Socket 重连管理器** @param reConnectionManager* @return*/public Builder setReConnectionManager(AbsReConnection reConnectionManager) {socketOptions.reConnectionManager = reConnectionManager;return this;}/*** 自定义创建 socket 工厂** @param socketFactory*/public Builder setSocketFactory(SocketFactory socketFactory) {socketOptions.socketFactory = socketFactory;return this;}/*** 安全套接字协议的配置** @param socketSSLConfig* @return*/public Builder setSocketSSLConfig(SocketSSLConfig socketSSLConfig) {socketOptions.socketSSLConfig = socketSSLConfig;return this;}/*** 设置 Socket 写字节时的字节序** @param writeOrder*/public Builder setWriteOrder(ByteOrder writeOrder) {socketOptions.writeOrder = writeOrder;return this;}/*** 设置 Socket 读取字节时的字节序** @param readOrder* @return*/public Builder setReadOrder(ByteOrder readOrder) {socketOptions.readOrder = readOrder;return this;}/*** 设置写数据时,单个数据包的最大值** @param maxWriteBytes* @return*/public Builder setMaxWriteBytes(int maxWriteBytes) {socketOptions.maxWriteBytes = maxWriteBytes;return this;}/*** 设置读取数据时,单次读取最大缓存值** @param maxReadBytes* @return*/public Builder setMaxReadBytes(int maxReadBytes) {socketOptions.maxReadBytes = maxReadBytes;return this;}/*** 设置读取数据的数据结构协议** @param messageProtocol*/public Builder setMessageProtocol(IMessageProtocol messageProtocol) {socketOptions.messageProtocol = messageProtocol;return this;}/*** 设置 IO 字节流的编码方式,默认 UTF-8** @param charsetName* @return*/public Builder setCharsetName(Charset charsetName) {socketOptions.charsetName = charsetName;return this;}public IOSocketOptions build() {return socketOptions;}}/*** 获取默认的配置*/public static IOSocketOptions getDefaultOptions() {IOSocketOptions options = new IOSocketOptions();options.socketAddress = null;options.backupAddress = null;options.isReConnection = true;// 是否重连主机,默认为 trueoptions.maxWriteBytes = 100;options.maxReadBytes = 50;options.writeOrder = ByteOrder.BIG_ENDIAN;// 大端序options.readOrder = ByteOrder.BIG_ENDIAN;// 大端序options.messageProtocol = null;// 默认数据包结构协议为 nulloptions.maxResponseDataMb = 5;// 默认接收最大值 5MBoptions.reConnectionManager = new DefaultReConnection(); // 默认 Socket 重连器options.connectTimeout = 10 * 1000; // 连接超时默认 5 秒options.socketFactory = null;options.socketSSLConfig = null;options.charsetName = StandardCharsets.UTF_8;return options;}public Charset getCharsetName() {return charsetName;}public boolean isIsDebug() {return isDebug;}public SocketAddress getSocketAddress() {return socketAddress;}public SocketAddress getBackupAddress() {return backupAddress;}public ByteOrder getWriteOrder() {return writeOrder;}public int getMaxWriteBytes() {return maxWriteBytes;}public ByteOrder getReadOrder() {return readOrder;}public int getMaxReadBytes() {return maxReadBytes;}public IMessageProtocol getMessageProtocol() {return messageProtocol;}public int getMaxResponseDataMb() {return maxResponseDataMb;}public int getConnectTimeout() {return connectTimeout;}public boolean isReConnection() {return isReConnection;}public AbsReConnection getReConnectionManager() {return reConnectionManager;}public SocketFactory getSocketFactory() {return socketFactory;}public SocketSSLConfig getSocketSSLConfig() {return socketSSLConfig;}public static void setIsDebug(boolean isDebug) {IOSocketOptions.isDebug = isDebug;}}
2. 定义读取到的数据实体类,OriginReadData.java
/*** @Description: 读取到的数据*/public class OriginReadData implements Serializable {/*** 包头数据*/private byte[] headerData;/*** 包体数据*/private byte[] bodyData;public byte[] getHeaderData() {return headerData;}public void setHeaderData(byte[] headerData) {this.headerData = headerData;}public byte[] getBodyData() {return bodyData;}public void setBodyData(byte[] bodyData) {this.bodyData = bodyData;}/*** 获取数据体 body 的字符串** @return*/public String getBodyString() {//IOSocketOptions.getCharsetNamereturn new String(getBodyData(), StandardCharsets.UTF_8);}/*** 获取完整的数据,包括包头和包体** @return*/public byte[] getOriginDataBytes() {return Util.concatBytes(getHeaderData(), getBodyData());}}
3. 创建接口
3.1 定义消息数据格式,IMessageProtocol.java
/*** @Description: 消息数据格式*/public interface IMessageProtocol {/*** 获取包头的长度** @return*/int getHeaderLength();/*** 获取数据包体的长度,根据协议包体长度应该写在包头中,在读取数据时用到** @param header* @param byteOrder* @return*/int getBodyLength(byte[] header, ByteOrder byteOrder);/*** 根据协议打包消息格式,返回 byte 数组* @param sender* @return*/byte[] pack(byte[] sender);}
3.2 添加 Socket 读取数据行为,IOAction.java
/*** @Description: Socket 读取数据行为*/public interface IOAction {/*** 收到消息的响应*/String ACTION_READ_COMPLETE = "action_read_complete";}
3.3 添加 Socket 行为监听接口,ISocketActionListener.java
/*** @Description: Socket 行为监听接口*/public interface ISocketActionListener {/*** Socket 连接成功** @param socketAddress 主机地址*/void onSocketConnectSuccess(SocketAddress socketAddress);/*** Socket 连接失败** @param socketAddress 主机地址* @param isNeedReconnect 是否需要重新连接*/void onSocketConnectFail(SocketAddress socketAddress, boolean isNeedReconnect);/*** 断开 Socket 连接** @param socketAddress 主机地址* @param isNeedReconnect 是否需要重新连接*/void onSocketDisconnect(SocketAddress socketAddress, boolean isNeedReconnect);/*** Socket 数据响应** @param socketAddress 主机地址* @param originReadData 读取到的数据为类格式*/void onSocketResponse(SocketAddress socketAddress, OriginReadData originReadData);/*** Socket 数据响应** @param socketAddress 主机地址* @param readData 读取到的数据为 String 类型*/void onSocketResponse(SocketAddress socketAddress, String readData);/*** Socket 数据响应** @param socketAddress 主机地址* @param readData 读取到的数据为 byte 数组类型*/void onSocketResponse(SocketAddress socketAddress, byte[] readData);}
4. 添加异常类
4.1 可恢复 Socket 读取数据异常,ReadRecoverableException.java
/*** @Description: 可恢复 Socket 读取数据异常*/public class ReadRecoverableException extends Exception {public ReadRecoverableException(String message) {super(message);}}
4.2 不可修复的读取错误异常,ReadUnrecoverableException.java
/*** @Description: 不可修复的读取错误*/public class ReadUnrecoverableException extends Exception {public ReadUnrecoverableException(String message) {super(message);}}
5. 实现 Socket 读取数据
5.1 定义 Socket 读取数据接口 IReader.java
/*** @Description: Socket 读数据接口*/public interface IReader<T> {/*** 打开数据的读取*/void openReader();/*** 读数据*/void read() throws Exception;/*** 关闭数据的读取*/void closeReader();/*** 设置参数** @param t*/void setOption(T t);}
5.2 实现 Socket 读取数据接口
/*** @Description: Socket 读数据*/public class IOReader implements IReader<IOSocketOptions> {/*** 输入流*/private InputStream inputStream;/*** 读取原始数据的缓存空间*/private ByteBuffer originBuffer;/*** Socket 连接管理器*/private IConnectionManager connectionManager;/*** Socket 行为分发器*/private ISocketActionDispatch actionDispatch;/*** Socket 配置参数*/private IOSocketOptions socketOptions;/*** 读数据时,余留数据的缓存*/private ByteBuffer remainingBuffer;/*** 读数据线程*/private Thread readerThread;/*** 是否停止线程*/private boolean stopThread;public IOReader(IConnectionManager connectionManager, ISocketActionDispatch actionDispatch) {this.connectionManager = connectionManager;this.socketOptions = connectionManager.getOptions();this.actionDispatch = actionDispatch;}@Overridepublic void openReader() {init();if (readerThread == null || !readerThread.isAlive()) {readerThread = new Thread(readerTask, "reader thread");stopThread = false;readerThread.start();}}//初始化private void init() {// 获取输入流inputStream = connectionManager.getInputStream();// 没有定义消息协议if (socketOptions != null && socketOptions.getMessageProtocol() == null) {originBuffer = ByteBuffer.allocate(1024 * 4);}}/*** 读取数据任务*/private Runnable readerTask = new Runnable() {@Overridepublic void run() {while (!stopThread) {try {read();} catch (Exception e) {LogUtil.e("读取数据异常.");e.printStackTrace();// 停止线程stopThread = true;release();}}}};@Overridepublic void read() throws IOException, ReadRecoverableException, ReadUnrecoverableException {OriginReadData originData = new OriginReadData();// 读取数据时,数据包结构协议IMessageProtocol messageProtocol = this.socketOptions.getMessageProtocol();// 消息协议为 null,则直接读取原始消息,不建议这样使用,因为会发生分包,黏包的问题if (messageProtocol == null) {readOriginDataFromStream(originData);return;}// 定义了消息协议体// 包头的长度int headerLength = messageProtocol.getHeaderLength();// 包头数据的 bufferByteBuffer headerBuffer = ByteBuffer.allocate(headerLength);// 设置读取数据时,byte 的字节序headerBuffer.order(socketOptions.getReadOrder());// 1.读取 Headerif (remainingBuffer != null) { // 有余留数据// flip 方法: 一般从 ByteBuffer 读取数据前调用,将 limit 设置为当前 position,将 position 设置为 0,在读取数据时,limit 代表可读取数据的有效长度remainingBuffer.flip();// 读取余留数据的长度int length = Math.min(remainingBuffer.remaining(), headerLength);// 读取余留的数据headerBuffer.put(remainingBuffer.array(), 0, length);if (length < headerLength) { // 余留数据小于一个 Header// 剩余的数据置 nullremainingBuffer = null;// 从 stream 中读取剩余的 header 数据readHeaderFromSteam(headerBuffer, headerLength - length);} else {// 移动开始读取数据的指针remainingBuffer.position(headerLength);}} else {// 无余留数据// 从 Stream 读取一个完整的 header,capacity: 容量readHeaderFromSteam(headerBuffer, headerBuffer.capacity());}// 保存 headeroriginData.setHeaderData(headerBuffer.array());// 2.读取 bodyint bodyLength = messageProtocol.getBodyLength(originData.getHeaderData(), socketOptions.getReadOrder());if (bodyLength > 0) {if (bodyLength > socketOptions.getMaxResponseDataMb() * 1024 * 1024) {throw new ReadUnrecoverableException("服务器返回的单次数据超过了规定的最大值,可能你的 Socket 消息协议不对,一般消息格式为: " + "Header+Body,其中 Header 保存消息长度和类型等,Body 保存消息内容,请规范好你的协议.");}// 分配包体缓存空间ByteBuffer bodyBuffer = ByteBuffer.allocate(bodyLength);bodyBuffer.order(socketOptions.getReadOrder());// 有余留数据if (remainingBuffer != null) {// 余留数据缓存 包体取数据的起始点int bodyStartPosition = remainingBuffer.position();// 获取余留数据长度 与 包体长度 中 取小的值int length = Math.min(remainingBuffer.remaining(), bodyLength);// 读 length 大小的余留数据bodyBuffer.put(remainingBuffer.array(), bodyStartPosition, length);// 余留数据缓存移动 positionremainingBuffer.position(bodyStartPosition + length);//读取的余留数据刚好等于一个 bodyLengthif (length == bodyLength) {if (remainingBuffer.remaining() > 0) {// 创建临时数据缓存ByteBuffer temp = ByteBuffer.allocate(remainingBuffer.remaining());// 设置字节序temp.order(socketOptions.getReadOrder());// 添加余留数据剩下的数据temp.put(remainingBuffer.array(), remainingBuffer.position(), remainingBuffer.remaining());// 重新赋值余留数据缓存remainingBuffer = temp;} else {// 余留数据缓存已取完,则当前余留数据缓存置为 nullremainingBuffer = null;}// 保存 bodyoriginData.setBodyData(bodyBuffer.array());LogUtil.i("Socket 收到数据: " + originData.getBodyString());// 分发数据actionDispatch.dispatchAction(IOAction.ACTION_READ_COMPLETE, originData);// 读取结束return;} else {//余留数据缓存已取完,当前余留数据缓存置为 nullremainingBuffer = null;}}// 无余留或者有部分余留数据,则从 Stream 中读取readBodyFromStream(bodyBuffer);// body 保存到 originData 中originData.setBodyData(bodyBuffer.array());} else if (bodyLength == 0) { // 没有 body 数据originData.setBodyData(new byte[0]);if (remainingBuffer != null) { //有余留数据// 处理 body 包体时,判断是否有余留数据if (remainingBuffer.hasRemaining()) {//有余留数据// 创建临时数据缓存ByteBuffer temp = ByteBuffer.allocate(remainingBuffer.remaining());// 设置字节序temp.order(socketOptions.getReadOrder());// 添加余留数据剩下的数据temp.put(remainingBuffer.array(), remainingBuffer.position(), remainingBuffer.remaining());// 重新赋值余留数据缓存remainingBuffer = temp;} else { //余留数据缓存已取完,则当前余留数据缓存置为 nullremainingBuffer = null;}}} else {//if (bodyLength < 0)throw new ReadUnrecoverableException("数据 body 的长度不能小于0");}LogUtil.i("Socket 收到数据: " + originData.getBodyString());//分发数据actionDispatch.dispatchAction(IOAction.ACTION_READ_COMPLETE, originData);}/*** 读取包头的数据** @param headerBuffer 包头的数据缓存* @param readLength 包头的长度* @throws IOException* @throws ReadRecoverableException*/private void readHeaderFromSteam(ByteBuffer headerBuffer, int readLength) throws IOException, ReadRecoverableException {for (int i = 0; i < readLength; i++) {byte[] bytes = new byte[1];// 从输入流中读取数据,无数据时会阻塞int value = inputStream.read(bytes);// -1: 在读取文件中,表示读到了末尾,在 Socket 中表示一般因为服务器断开了连接if (value <= -1) {throw new ReadRecoverableException("读取数据失败,可能是因为 Socket 跟服务器断开了连接");}//存放包头数据headerBuffer.put(bytes);}}/*** 从 Stream 中读取数据,填满 byteBuffer,如果读取有多,则缓存在 remainingBuffer 中** @param byteBuffer 包体的数据缓存* @throws IOException* @throws ReadRecoverableException*/private void readBodyFromStream(ByteBuffer byteBuffer) throws IOException, ReadRecoverableException {// while 循环直到 byteBuffer 装满数据while (byteBuffer.hasRemaining()) {//从服务器单次读取的最大值byte[] bufferArray = new byte[socketOptions.getMaxReadBytes()];int len = inputStream.read(bufferArray);if (len <= -1) {throw new ReadRecoverableException("读取数据失败,可能是因为 Socket 跟服务器断开了连接");}//缓存中剩余多少数据int remaining = byteBuffer.remaining();// 从 Stream 读到的数据长度 大于 byteBuffer 的剩余空间时if (len > remaining) {byteBuffer.put(bufferArray, 0, remaining);// 将多余的数据保存到 remainingBuffer 中缓存,等下一次再读取remainingBuffer = ByteBuffer.allocate(len - remaining);remainingBuffer.order(socketOptions.getReadOrder());remainingBuffer.put(bufferArray, remaining, len - remaining);} else {// 从 stream 读取的数据小于或者等于 byteBuffer 的剩余空间时byteBuffer.put(bufferArray, 0, len);}}}/*** 当消息协议为 null 时,直接返回原始数据** @param readData* @throws IOException* @throws ReadRecoverableException*/private void readOriginDataFromStream(OriginReadData readData) throws IOException, ReadRecoverableException {//用全局 originBuffer 避免重复创建字节数组int len = inputStream.read(originBuffer.array());// no more dataif (len <= -1) {throw new ReadRecoverableException("读取数据失败,可能 Socket 跟服务器断开了连接");}//bytes 复制byte[] data = new byte[len];originBuffer.get(data, 0, len);readData.setBodyData(data);// Charset.forName("GBK")LogUtil.i("Socket 收到数据: " + readData.getBodyString());// 分发数据actionDispatch.dispatchAction(IOAction.ACTION_READ_COMPLETE, readData);// 相当于把指针重新指向 position = 0originBuffer.clear();}@Overridepublic void setOption(IOSocketOptions socketOptions) {this.socketOptions = socketOptions;}@Overridepublic void closeReader() {try {// 关闭线程,释放资源shutDownThread();release();} catch (InterruptedException e) {e.printStackTrace();}}/*** 关闭读取数据线程*/private void shutDownThread() throws InterruptedException {if (readerThread != null && readerThread.isAlive() && !readerThread.isInterrupted()) {stopThread = true;// 中断线程readerThread.interrupt();readerThread.join();LogUtil.i("shutDownThread");}}/*** 释放资源*/private void release() {if (originBuffer != null) {originBuffer = null;}if (remainingBuffer != null) {remainingBuffer = null;}if (readerThread != null && !readerThread.isAlive()) {readerThread = null;}try {if (inputStream != null) {inputStream.close();}} catch (IOException e) {e.printStackTrace();} finally {inputStream = null;}LogUtil.i("release");}}
6. 测试方法
//测试按钮private void initView(View view) {SocketAddress socketAddress = new SocketAddress("192.168.1.52", 6688);TcpConnection tcpConnection = new TcpConnection(socketAddress);IOSocketOptions socketOptions = new IOSocketOptions.Builder().setSocketAddress(socketAddress).build();tcpConnection.setOptions(socketOptions);IOReader reader = new IOReader(tcpConnection, null);view.findViewById(R.id.but_connect).setOnClickListener(v -> tcpConnection.connect());view.findViewById(R.id.but_start).setOnClickListener(v -> reader.openReader());view.findViewById(R.id.but_stop).setOnClickListener(v -> reader.closeReader());}