网站如何做京东联盟,网站制作方案书,手机建站程序,常用的网站有多种类型目录
6.Reactor模型
6.1 单线程Reactor
6.2 主从多线程Reactor (主---Boss | 从---Worker | 一主多从机制)
7.扩展与补充
8.Reactor模型的实现
8.1 多线程Reactor模型的实现#xff08;一个Boss线程#xff0c;一个Worker线程#xff09;
8.2 多线程Reactor模…目录
6.Reactor模型
6.1 单线程Reactor
6.2 主从多线程Reactor (主---Boss | 从---Worker | 一主多从机制)
7.扩展与补充
8.Reactor模型的实现
8.1 多线程Reactor模型的实现一个Boss线程一个Worker线程
8.2 多线程Reactor模型的实现一个Boss线程多个Worker线程
9.零拷贝
10.硬件驱动程序与软件驱动程序数据库驱动程序的关系
11.linux内核与centosredhat等这些产品有啥区别和关系 6.Reactor模型
6.1 单线程Reactor 6.2 主从多线程Reactor (主---Boss | 从---Worker | 一主多从机制)
但是单线程版的Reactor一定存在性能瓶颈的你想想一个线程做了所有的连接读写的操作肯定存在极大的性能瓶颈。
所以后续引入了主从多线程Reactor模式
1.Boss主线程负责监听并且处理客户端的连接事件。它具有一个独立的Selector监控器所有的客户端都会注册存储到该Selector上这样才能完成监听连接事件的操作。
2.当Boss主线程监听并处理完客户端的连接事件后它会把这个客户端交给Worker从线程进行处理从线程肯定会先把该客户端像主线程一样先注册存储到当前从线程所对应的Selector上。从线程会进行监听与处理该客户端后续可能会进行的读或写的事件。从线程可能会有多个每一个从线程都具有一个独立的Selector。
补充
1.Boss主线程与Worker从线程们之间一定是建立了联系的这样主线程才能把客户端交给从线程。每一个线程都具有一个独立的Selector
2.分配多少个线程给Boss或Worker分配一个线程给Boss剩余(Cpu核数-1)个线程分配给Worker从线程。
先说为什么一共分配CPU核数的线程为了避免频繁的线程上下文切换为了实现真正的并行。后续肯定还可以引入线程池的优化
为什么Worker线程分配的多因为客户端连接就一次后续Boss把客户端甩手给Worker后客户端的读写事件就不再让Boss去处理了读写事件可是远比连接事件多的
如图所示 7.扩展与补充
扩展
主从架构与主备架构的区别
主从架构
主节点也干活从节点也干活。只是二者干的活不一样主节点一般做写操作(主要的工作)从节点一般做次要的工作 如 读操作。缺点主节点一旦宕机就废掉了。
主备架构
主节点会同步数据到另外一共备用主节点当主节点宕机时会切换到另外一台时刻同步数据的备用主节点。解决了主从架构的缺点。
主备架构MySQL的双主架构Redis的哨兵集群
如何实现主备架构中主节点同步数据到另外一台备用主节点呢
使用zookeeper集群这一CP原理架构遵循CP的是强一致性的架构即当zookeeper集群节点出现宕机时需要重新选举节点作为主节点时拒绝对外提供服务此时就保证了强一致性。但是降低了性能这就是CAP原则不可以同时满足C A P。
当然zookeeper可以替换为consuletcd 反向代理与正向代理
反向代理时针对于服务器端
1.做负载均衡
2.URL重写 面对写入的URL重写
3.动静分离 新的问题当客户端连接这个服务器集群时正在连接的服务器宕机了你怎么做到客户端无感的客户端socket连接服务器时需要通过服务器的ip连接如果当前这台服务器节点宕机你不是要更新此时连接的ip这怎么可能做到无感啊但通过以下技术就是可以做到无感。。。
方法1VIP虚拟IP
其实都不需要更新ip在过去传统的开发中使用的是Virtual IP 虚拟IP。客户端访问的是虚拟IP对应的服务虚拟IP这个服务再找到具体的节点服务器资源虚拟IP服务器肯定会涉及到分发保活注册服务器节点等但是这一切对于客户端来说是无感知的它是不知道服务器节点的切换的。
这些都可以通过早期的一些技术完成如lvskeepavlid。
方法2IP飘逸
当发现节点服务器坏了那么偷偷的把这台服务器节点ip给复制给另外一台服务器节点这就是ip飘逸这样也做到了客户端无感知
解决技术HAproxy
方法3云原生OPS自动化运维
这些技术实际上都是运维的技术现如今微服务为什么说是云原生的一个分支因为云原生主张OPS自动化运维就是要取代运维
抛开上述三种运维方式仅仅是站在java层面去开发一个程序去实现这一客户端无感
其实思路很简单就是做一个redis缓存缓存的是ip-主机节点的ip映射集合。
然后在client与server端分别开一个agentagent之间做网络通信然后agent再与client或server端内部做交互传递数据。 8.Reactor模型的实现
8.1 多线程Reactor模型的实现一个Boss线程一个Worker线程
Reactor模型多个客户端连接同一服务端。在服务端做了Reactor模型的编程逻辑。
通过一个Boss线程(可以有多个看具体业务)处理多个NIO的接收连接的操作也就是ServerSocketChannel的ACCEPT事件
通过一个worker线程可以有多个看具体业务处理读写的操作也就是SocketChannel的READ或WRITE事件。
重点你处理的read或write事件一定是前面对应这次连接的boss线程的连接得来的你不能前后没联系。是一种传递的关系。但是每一个线程无论是boss还是worker都具有独立的Selector监管器
第一版本代码
package com.messi.netty_basic_01.Reactor_prac;import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;public class ReactorBossServer2 {public static void main(String[] args) throws Exception{ServerSocketChannel serverSocketChannel ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress(8000));Selector selector Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);while (true) {selector.select();IteratorSelectionKey iterator selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key iterator.next();iterator.remove();if (key.isAcceptable()) {ServerSocketChannel channel (ServerSocketChannel) key.channel();SocketChannel socketChannel channel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector,SelectionKey.OP_READ);//TODO 调用Worker线程去处理读写事件~~~~}}}}
}对于TODO位置我们要进行调用worker线程去进行处理读写事件。但是具体要开启多少个worker线程呢是过来一个客户端连接就需要开启一个worker线程去处理该连接对应的读写事件吗
当然不是在之前我们使用一个线程处理所有注册在复用器上的channel对象对应的所有事件这里我们使用Boss线程处理连接事件。worker线程处理读写事件但是也没必要一个线程对应一个客户端连接的后续读写事件啊如果线程过大内存占用过大线程之间切换也会更加频繁性能很低的。
下面暂且把worker线程设为开启一个那么我们需要把创建Worker的代码设为公共的而不能每过来一个客户端连接后就进行创建一份新的Worker。具体见下面代码
第二版代码
package com.messi.netty_basic_01.Reactor_prac;import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;public class ReactorBossServer2 {public static void main(String[] args) throws Exception{ServerSocketChannel serverSocketChannel ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress(8000));Selector selector Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//放在外面做一个共享变量Worker2 worker2 new Worker2();while (true) {selector.select();IteratorSelectionKey iterator selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key iterator.next();iterator.remove();if (key.isAcceptable()) {ServerSocketChannel channel (ServerSocketChannel) key.channel();SocketChannel socketChannel channel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector,SelectionKey.OP_READ);//TODO 调用Worker线程去处理读写事件~~~~worker2.register(socketChannel);}}}}
}第三版代码
package com.messi.netty_basic_01.Reactor_prac;import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;public class ReactorBossServer2 {public static void main(String[] args) throws Exception{ServerSocketChannel serverSocketChannel ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress(8000));Selector selector Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//放在外面做一个共享变量Worker2 worker2 new Worker2(worker-01);while (true) {selector.select();IteratorSelectionKey iterator selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key iterator.next();iterator.remove();if (key.isAcceptable()) {ServerSocketChannel channel (ServerSocketChannel) key.channel();SocketChannel socketChannel channel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector,SelectionKey.OP_READ);//TODO 调用Worker线程去处理读写事件~~~~worker2.register(socketChannel);}}}}
}
package com.messi.netty_basic_01.Reactor_prac;import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;public class Worker2 implements Runnable{private String name;private Thread thread ;private Selector selector;private volatile boolean isCreated;public Worker2(String name) {this.name name;}public void register(SocketChannel socketChannel) {if (!isCreated) {this.thread new Thread(this,name);thread.start();selector Selector.open();isCreated true;}}Overridepublic void run() {}
}
为什么Worker2线程类构造方法就一个参数name
对于其他属性threadselector的创建完全是可以在Worker2内部完成的。woker2线程本身就是Boss线程调用的Boss线程只初始化worker线程的name这样解耦合降低代码的侵入性。你想想并发场景下在Boss线程创建多个Thread或Selector这样是不是Boss线程端代码太难看造成代码的极大污染。
为什么把threadselector的初始化创建放在register方法中
其实也是可以放在构造方法中的。
为什么要使用isCreated标记位
保证thread线程创建和启动只进行一次使用标志位isCreated的方式在register中进行初始化。使用标记位isCreated一定要记住isCreated在完成一次初始化后要置为true如果不使用isCreatedboss线程在触发连接事件后调用worker的register方法。但此时还是main线程在执行register方法。那么boss线程触发一次客户端连接事件就会对应创建一个新线程来处理该客户端连接对应后续的读写事件。
补充
在多线程并发共享isCreated这个变量时要加上volatile关键字volatile可以保证当其中一个线程修改isCreated的值的时候会及时通知告知其他线程isCreated的最新值为xxx。保证了isCreated的可见性因为标记位通常做if判断对可见性要求高。
但此时boss为单线程并且由于是迭代器一个个遍历触发的事件的所以同一时刻只有一个boss线程调用register方法所以isCreated并不存在多线程共享的并发安全问题。养成良好习惯说不准后续boss变为多线程了。。。 worker2.register(socketChannel)接着调用 为什么出现空指针异常 第四版代码
package com.messi.netty_basic_01.Reactor_prac;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;public class ReactorBossServer2 {private static final Logger log LoggerFactory.getLogger(ReactorBossServer2.class);public static void main(String[] args) throws Exception{log.debug(boss线程开启 );ServerSocketChannel serverSocketChannel ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress(8000));Selector selector Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//放在外面做一个共享变量Worker2 worker2 new Worker2(worker-01);while (true) {selector.select();IteratorSelectionKey iterator selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key iterator.next();iterator.remove();if (key.isAcceptable()) {ServerSocketChannel channel (ServerSocketChannel) key.channel();SocketChannel socketChannel channel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector,SelectionKey.OP_READ);//TODO 调用Worker线程去处理读写事件~~~~log.debug(boss线程即将要调用Worker2类的register方法 );worker2.register(socketChannel);log.debug(boss线程完成调用Worker2类的register方法 );}}}}
}package com.messi.netty_basic_01.Reactor_prac;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;public class Worker2 implements Runnable{private static final Logger log LoggerFactory.getLogger(Worker2.class);private String name;private Thread thread ;private Selector selector;private volatile boolean isCreated;public Worker2(String name) {this.name name;}public void register(SocketChannel socketChannel) throws Exception{if (!isCreated) {selector Selector.open();this.thread new Thread(this,name);thread.start();isCreated true;}log.debug(boss线程将要执行socketChannel#register方法 );socketChannel.register(selector, SelectionKey.OP_READ);}Overridepublic void run() {log.debug(开启一个新线程worker-01 );while (true) {try {selector.select();IteratorSelectionKey iterator selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key iterator.next();iterator.remove();if (key.isReadable()) {SocketChannel channel (SocketChannel) key.channel();ByteBuffer buffer ByteBuffer.allocate(30);int read channel.read(buffer);if (read -1) {log.debug(客户端断开 );key.cancel();} else {buffer.flip();CharBuffer decode Charset.forName(UTF-8).decode(buffer);log.info(worker-01线程读取的结果为{},decode.toString());}}}} catch (IOException e) {e.printStackTrace();}}}
}package com.messi.netty_basic_01.Reactor_prac;import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;public class MyClient2 {public static void main(String[] args) throws Exception{SocketChannel socketChannel SocketChannel.open();socketChannel.connect(new InetSocketAddress(8000));socketChannel.write(Charset.forName(UTF-8).encode(梅西));System.out.println(MyClient2.main);}}测试 但是代码在多线程场景下存在bug因为多线程环境下一切皆有可能如果新开启的线程执行的run方法(selector.select())在register方法之前执行。由于selector阻塞等待导致register注册不上。最终就一直阻塞等待着就死掉了。。。。。下面详细分析一下
补充一点
为什么创建一个线程后thread.start()不会立即执行
因为真正创建出一个线程并且切换都是需要一定时间的最重要的是当前main线程的时间片可能还没使用完所以CPU是不会轻易切换线程的但是在多线程环境下一切是皆有可能可能start后就切换到该新线程了。但是当main线程时间片耗尽后就一定会切换到新创建的线程的即调用run方法中的业务逻辑。通常main线程时间片耗尽的原因是因为执行的业务线复杂导致时间片耗尽这里我们可以模拟一下Thread.sleep(2000)休眠2s此时main线程时间片耗尽然后就完成了线程切换线程切换到新创建的线程并且执行run方法
但就是因为先执行了run方法后执行了register方法代码就出现了bug
出现bug的原因症结就是因为run方法(selector.select())在register方法之前执行。由于selector阻塞等待导致register注册不上。但是在多线程环境下一切皆有可能所以你无法保证run方法(selector.select())一定在register方法之后执行说白了就是因为register方法和run方法(select()方法调用)二者不是在同一线程中你无法百分百做到控制方法的执行顺序。
演示一下这个bug
睡眠模拟bug》》》》 断点设为Thread级别当前线程挂起时只挂自己。如果设为ALL那么当前线程挂起时会挂起所有的线程 debug测试 切换到main线程往下执行发现也阻塞了阻塞在register这句代码 分析如下 解决方法1
搞一个同步锁把selector.wakeup()和register方法的调用搞一块 为什么要搞一个同步锁为何多此一举呢
搞同步锁为的就是保证wakeup唤醒和register注册之间没有其他情况的发生不然不可能保证有意外的发生如下 但是同步锁依旧存在很大问题其实问题就是在于性能低下。所以后续我们模仿Netty做一个队列把要执行的代码传递给新开启的线程中让selector.select()和register方法在同一个线程中执行调用并且加一个wakeup做兜底操作就完美的解决了这个问题。其实就是为了保证在selector.select阻塞selector之前完成register注册READ事件的操作如果selector因为多线程环境下的因素导致在register方法之前阻塞(一切皆有可能)所以使用wakeup方法做兜底wakeup像做了一个标记无论在wakeup前还是后做阻塞了都可以唤醒一次
解决方法2
所以最终解决方案模仿Netty底层做一个并发队列把调用register的代码当作一个任务存储到该并发队列中然后在run方法对应的线程中再把该队列中存储的register调用代码取出来这样就保证了register方法和selector.select()在同一个线程中调用。那么你就可以控制先后顺序了。最后再补充一个selector.wakeup()的唤醒作为兜底操作。当selector.select()再次阻塞时直接唤醒然后就可以接着在同一线程中执行register方法啦。
具体如下
ConcurrentLinkedQueue队列可以在两个线程之间进行传递一些代码功能知道这些就够了。 测试 多线程环境下一切皆有可能因为你也不知道走到哪一句代码CPU就调度执行其他线程了所以要考虑更多的情况然后保证并发安全。
第五版代码最终代码
package com.messi.netty_basic_01.Reactor_prac;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;public class ReactorBossServer2 {private static final Logger log LoggerFactory.getLogger(ReactorBossServer2.class);public static void main(String[] args) throws Exception{log.debug(boss线程开启 );ServerSocketChannel serverSocketChannel ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress(8000));Selector selector Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//放在外面做一个共享变量Worker2 worker2 new Worker2(worker-01);while (true) {selector.select();IteratorSelectionKey iterator selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key iterator.next();iterator.remove();if (key.isAcceptable()) {ServerSocketChannel channel (ServerSocketChannel) key.channel();SocketChannel socketChannel channel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector,SelectionKey.OP_READ);//TODO 调用Worker线程去处理读写事件~~~~log.debug(boss线程即将要调用Worker2类的register方法 );worker2.register(socketChannel);log.debug(boss线程完成调用Worker2类的register方法 );}}}}
}
package com.messi.netty_basic_01.Reactor_prac;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;public class Worker2 implements Runnable{private static final Logger log LoggerFactory.getLogger(Worker2.class);private String name;private Thread thread ;private Selector selector;private volatile boolean isCreated;public Worker2(String name) {this.name name;}private ConcurrentLinkedQueueRunnable queue new ConcurrentLinkedQueue();public void register(SocketChannel socketChannel) throws Exception{if (!isCreated) {selector Selector.open();this.thread new Thread(this,name);thread.start();isCreated true;}log.debug(boss线程将要执行socketChannel#register方法 );//使用睡眠的方式模拟出在register方法执行前执行了繁重的业务代码逻辑Thread.sleep(2000);log.debug(main线程睡眠 );
// synchronized (this) {
// selector.wakeup();
// socketChannel.register(selector, SelectionKey.OP_READ);
// }queue.add(()-{try {socketChannel.register(selector,SelectionKey.OP_READ);} catch (ClosedChannelException e) {throw new RuntimeException(e);}});selector.wakeup();}Overridepublic void run() {while (true) {log.debug(开启一个新线程worker-01 );try {selector.select();Runnable poll queue.poll();if (poll ! null) {poll.run();}IteratorSelectionKey iterator selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey key iterator.next();iterator.remove();if (key.isReadable()) {SocketChannel channel (SocketChannel) key.channel();ByteBuffer buffer ByteBuffer.allocate(30);int read channel.read(buffer);if (read -1) {log.debug(客户端断开 );key.cancel();} else {buffer.flip();CharBuffer decode Charset.forName(UTF-8).decode(buffer);log.info(worker-01线程读取的结果为{},decode.toString());}}}} catch (IOException e) {e.printStackTrace();}}}
}
package com.messi.netty_basic_01.Reactor_prac;import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;public class MyClient2 {public static void main(String[] args) throws Exception{SocketChannel socketChannel SocketChannel.open();socketChannel.connect(new InetSocketAddress(8000));socketChannel.write(Charset.forName(UTF-8).encode(梅西));System.out.println(MyClient2.main);}}
8.2 多线程Reactor模型的实现一个Boss线程多个Worker线程
相比之前的代码我们要做的就是增加Worker线程的数量 然后随机选举出一个Worker线程进行register 代码
package com.messi.netty_basic_01.reactor;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;public class ReactorBossServer {private static final Logger log LoggerFactory.getLogger(ReactorBossServer.class);public static void main(String[] args) throws Exception{log.debug(boss线程开启 );//建立ServerSocketChannelServerSocketChannel ssc ServerSocketChannel.open();//设置为非阻塞ssc.configureBlocking(false);//监听端口8000ssc.bind(new InetSocketAddress(8000));//创建Selector监管者Selector selector Selector.open();//把ServerSocketChannel注册到Selector监管者上的keys这一HashSet中对应注册事件为ACCEPTssc.register(selector, SelectionKey.OP_ACCEPT);// Worker worker new Worker(worker-01);//模拟多线程的情况一个Boss线程多个Worker线程。在实际开发中要使用线程池Worker[] workers new Worker[2];//模拟出多个Worker线程for (int i 0; i workers.length; i) {workers[i] new Worker(worker - i);}//原子操作的变量AtomicInteger index new AtomicInteger();while (true) {//轮询selector.select();//得到Selector上注册的Channel对应的所有事件 ---》Boss(main)线程对应注册的只有ACCEPT事件Worker线程处理READWRITE事件IteratorSelectionKey iterator selector.selectedKeys().iterator();//遍历每个事件while (iterator.hasNext()) {SelectionKey key iterator.next();iterator.remove();if (key.isAcceptable()) {//拿到的肯定是ACCEPT事件对应的ServerSocketChannelServerSocketChannel channel (ServerSocketChannel) key.channel();//accept方法接收得到SocketChannel 这个是建立连接后服务端与客户端之间进行READ或WRITE的通道SocketChannel socketChannel channel.accept();//同样设为非阻塞socketChannel.configureBlocking(false);log.debug(boss线程准备调用worker线程的register方法 );//在Boss单线程的情况下由于是while迭代器逐个遍历所以这句代码同一时刻只有一个线程访问不存在并发安全问题
// worker.register(socketChannel);//index.getAndIncrement(): 获取到index的值并且1线程安全的//index.getAndIncrement() % workers.length 得到的结果为[0,workers.length-1]//注意哈一次拿到一个事件去处理而不是一次拿一堆由于Boss是单线程, 所以同一时刻register方法只有一个线程去访问无并发安全问题。workers[index.getAndIncrement() % workers.length].register(socketChannel) ;log.debug(boss线程调用结束worker线程的register方法 );}}}}}
package com.messi.netty_basic_01.reactor;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;public class Worker implements Runnable{private static final Logger log LoggerFactory.getLogger(Worker.class);//worker的名称private String name;//worker是一个子线程的任务所以应该有一个线程其实就是实际干活的private Thread thread;//每一个worker子线程都会有一个Selector轮询器。一定注意每一个线程都具有独立的Selector复用器和主线程不是同一个Selectorprivate Selector selector;//加上一个volatile来解决多线程环境下isCreated值的可见性问题//但是在我们这个程序中并不存在多线程安全问题因为Boss线程为单线程并且迭代器遍历客户端事件时是逐个遍历请求register//所以不存在多线程并发安全问题。只不过这里养成良好习惯给标记位加上volatile。为了保证后续多个Boss线程同时访问的环境下的可用性private volatile boolean isCreated ; //标记位//并发队列负责传递[代码 或 功能]到另外一个线程private ConcurrentLinkedQueueRunnable queue new ConcurrentLinkedQueue() ;public Worker(String name) {this.name name;}public void register(SocketChannel socketChannel) throws Exception{log.debug(worker#register invoke);//使用标记位方式保证下列初始化只执行一次// 分析一下为什么只让执行一次// Boss(main)线程调用register方法中的下列这些代码。其实就是Boss线程在与处理完客户端建立连接的事件后的调用// 如果每一次处理完一个客户端连接就开启一个新Thread去处理该客户端后续的READ或Write事件那么是不是倒退了// 所以要控制每一个Worker对象才会异步开启一个线程去执行处理很多客户端的读写事件后续压力大了可以搞一个Worker线程池多个Worker线程去处理所有客户端的读写事件// 绝对不是一个Worker线程处理一个客户端的读写事件那样性能耗费太大了而且没必要// 但是还有一个细节需要思考// 对于同一Worker对象而言异步线程Thread只创建启动一个。但是对于不同的Worker对象异步线程Thread会重新创建启动一个// 为什么会这样呢因为isCreated标记位对于一个新Worker对象isCreated初始值为false, 那么自然会执行下列这段代码// 下列这个if包围的代码还可以放在哪// 放在Worker的构造方法中只在对象创建初始化执行一次if (!isCreated) {//创建线程this.thread new Thread(this,name);//先对Selector进行初始化避免后续NUll Exceptionselector Selector.open();//异步起一个线程this.thread.start();//标志位设为true 保证初始化代码只执行一次isCreated true;System.out.println(我是梅西);}log.debug(socketChannel#register );//使用睡眠模拟繁杂的业务逻辑Thread.sleep(2000);
// synchronized (this) {
// //其实就是打了一个标记。即调用一次wakeup后无论此时selector.select()阻塞还是之后才阻塞都会减少阻塞一次
// selector.wakeup();
// socketChannel.register(selector, SelectionKey.OP_READ);
// }//把SocketChannel注册到Selector并且设置READ事件的代码放到并发队列中 传递给后续新开启的线程queue.add(()-{try {socketChannel.register(selector, SelectionKey.OP_READ);} catch (ClosedChannelException e) {e.printStackTrace();}});//兜底操作唤醒一次Selector。无论Selector在wakeup方法执行前阻塞还是在wakeup执行后阻塞wakeup都可以唤醒一次Selector阻塞像是一个标记计数selector.wakeup();System.out.println(Worker.register);}Overridepublic void run() {while (true) {log.debug(worker线程的run方法开始执行 );try {//应用层等待。内核层轮询监听事件的发生selector.select();//从并发队列中获取任务Runnable poll queue.poll();//任务不为空则执行该任务if (poll ! null) {poll.run();}//一旦Selector监听到keys这一HashSet中注册的Channel对应事件的发生就会把这些触发的事件对应的Channel的引用复制copy到SelectionKeys这一HashSet中//这一copy迁移Channel引用的工作是selector.select()做的IteratorSelectionKey iterator selector.selectedKeys().iterator();//遍历Selector监听到的事件集合迭代器while (iterator.hasNext()) {//一个一个的来SelectionKey key iterator.next();//使用完后就删除掉 避免空指针异常iterator.remove();//Worker线程监听Read事件if (key.isReadable()) {//获取到SocketChannel 该SocketChannel为客户端与服务端之间进行读写操作的管道SocketChannel socketChannel (SocketChannel) key.channel();//创建Buffer缓冲区 默认为写模式ByteBuffer buffer ByteBuffer.allocate(30);//把SocketChannel中的数据读取到buffer缓冲区socketChannel.read(buffer);//切换到读模式buffer.flip();//解码CharBuffer result Charset.forName(UTF-8).decode(buffer);//输出读取到的结果数据log.info(worker线程读取的结果result为 : {},result);}}} catch (IOException e) {e.printStackTrace();}}}
}
package com.messi.netty_basic_01.reactor;import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;public class MyClient {public static void main(String[] args) throws Exception{//客户端创建SocketChannelSocketChannel socketChannel SocketChannel.open();//客户端与服务端之间通过SocketChannel进行连接socketChannel.connect(new InetSocketAddress(8000));//客户端写数据给服务端socketChannel.write(Charset.forName(UTF-8).encode(leomessi)) ;//.....System.out.println(MyClient.main);}}
测试
debug的方式进行开启多个客户端与服务端建立通信 Server端每次都会选一个Worker线程执行register方法 讨论交流
worker线程搞多少个比较好
CPU核数减1。现代计算机一般都是单CPU多个核也就是俗称的多核CPU。假设目前为八核那么开启8-17个worker线程为什么开7个worker线程一个线程给boss线程一共8个线程更好的利用这八个核实现真正意义上的并行。截止到目前有好几个问题1为什么worker线程数要多于boss线程2为什么正好开启核数个线程效率最佳即核数boss线程数worker线程数3能不能开启线程个数超过最大核数最大开启线程数为多少
依次回答1对于一个客户端而言连接只进行一次连接之后的读写可能回触发多次而我们又知道boss线程是处理连接的worker线程处理连接后的读写事件的答案显而易见了。2为了实现真正意义上的并行因为当线程数超过CPU核数时CPU需要不断的频繁调度切换。并且线程创建是占用内存的线程越多内存消耗越大。3当然可以最大开启线程数取决于具体硬件配置你要看硬件的并发能力你要CPU切换能力强内存大线程开多点其实也没毛病的。
9.零拷贝
一定要看这篇文章
一文彻底弄懂零拷贝原理 - 掘金
基本知识 无论是磁盘IO还是网络IO实际上最终都需要操作具体的硬件磁盘IO---磁盘文件网络IO---网络网卡
但是呢我们编写的JVM应用程序是无法直接操控硬件的中间需要通过操作系统做中间者具体原因这里简单说一下因为对于磁盘网络资源我们不能让用户程序直接操控一方面是硬件版本问题一方面是安全问题怎么可能让用户直接操控内存呢这多不安全呀
具体的调用过程为JVM用户程序---操作系统---硬件驱动程序---具体的硬件
总结一下
1.JVM中的java应用程序不能直接操作硬件需要操作系统的介入
2.内存分为两种也可以理解成内存空间分为两部分
一部分为用户地址空间这部分的内存空间通常给应用程序如JVM进行使用应用程序通常是进程级别我们常说的线程线程实际上是共享进程的内存资源的一种抽象细化线程是利用进程空间的。应用程序其实就是我们平常编写的各种代码动态运行起来的一种概念统称。
另外一部分为内核地址空间这部分的内存空间通常给操作系统操作系统也是占用内存的一种特殊的程序操作系统通常会提供一系列公共的功能API。啥是公共的功能API比如应用程序需要申请内存空间应用程序就需要先调用操作系统提供的公共功能API。
3.为什么socket网络连接io流资源线程资源都是珍贵的资源使用完毕后要及时关闭
对于这些资源是对安全性要求极高的资源你仅仅通过用户程序是无法完成这些资源的操控的必须陷入操作系统内核让OS去真正调用申请。所以这些资源是珍贵的, 是位于OS内存的公共资源。
并且申请的这些公共资源都是占用着操作系统所对应的内核地址空间内存我们应用程序是陷入内核使用这部分内核内存空间的。
假如说当前用户程序使用完毕后不断开对这部分操作系统内存的引用(即不断开socket网络连接IO流)会影响这台计算机其他应用程序对操作系统这部分公共资源内存的调用和使用的因为不可能一台计算机只有一个进程应用程序甚至导致其他应用程序无法申请到这些珍贵的资源。所以使用完后应用程序要及时断开对珍贵资源的连接。
并且要减少应用程序与这部分资源的交互因为需要陷入内核切换的时候性能耗费一定是很大的
传统方式进行拷贝不进行零拷贝优化
下面分析涉及到的代码程序 总结
参看文章一文彻底弄懂零拷贝原理 - 掘金
分析 注释创建直接内存的API就是ByteBuffer buffer ByteBuffer.allocateDirect(20);
零拷贝优化1mmapwrite
涉及到的代码 通过NIO的API创建直接内存空间所谓直接内存就是应用程序不再开辟用户地址内存空间(堆内存)了而是直接使用操作系统的内存。用户程序可以编写代码直接操控这一块直接内存可以读写清除等操作。
这也叫做内存映射好像是把操作系统的内存映射到用户地址内存空间上减少了数据的拷贝。
但应用程序一味的使用直接内存也存在很大弊端因为OS不具备JVM的GC机制所以如果应用程序不及时释放操作系统的直接内存那么可能会存在很大隐患。
eg调用NIO-APIByteBuffer.allocateDirect(10)这句代码创建的就是10字节大小的操作系统内存高速页缓存这种。然后应用程序获取到这块直接内存后在应用程序中就可以使用java代码直接操作这块直接内存了。
总结
见文章一文彻底弄懂零拷贝原理 - 掘金
补充
但是注意内存映射/开辟OS直接内存是针对于文件的读写操作。对于网络相关的数据读写操作我们不可以使用直接内存映射为什么因为你无法把这一台JVM的相应内容数据给另外一台JVM。
零拷贝优化2linux内核2.1版本的零拷贝 - sendfile方式
涉及到的代码 所有的零拷贝都是文件相关的socket不涉及零拷贝。零拷贝是指不进行JVM虚拟机层面的拷贝只进行系统级别的拷贝.
总结
见文章一文彻底弄懂零拷贝原理 - 掘金
零拷贝优化3linux内核2.4版本的零拷贝- 带有 scatter/gather 的 sendfile方式 对Linux2.4内核进行sendFile方法的优化的一些理解这次优化直接让保存到高速页缓存的文件读取数据直接发送到网卡对应的驱动程序再给到网卡。而无需拷贝到socket缓存。
为什么可以这样因为无论是文件对应的高速页缓存还是网卡对应的socket缓存其实内部结构都是一样的只不过可能标记位不同既然差不多一样把页缓存的内存地址偏移量记录传输到socket缓存然后直接把高速页缓存的数据给到网卡驱动程序然后再给网卡这有问题吗
总结
见文章一文彻底弄懂零拷贝原理 - 掘金
零拷贝优化4 - splice 方式 splice 调用和sendfile 非常相似用户应用程序必须拥有两个已经打开的文件描述符一个表示输入设备一个表示输出设备。与sendfile不同的是splice允许任意两个文件互相连接而并不只是文件与socket进行数据传输。对于从一个文件描述符发送数据到socket这种特例来说一直都是使用sendfile系统调用而splice一直以来就只是一种机制它并不仅限于sendfile的功能。也就是说 sendfile 是 splice 的一个子集。
在 Linux 2.6.17 版本引入了 splice而在 Linux 2.6.23 版本中 sendfile 机制的实现已经没有了但是其 API 及相应的功能还在只不过 API 及相应的功能是利用了 splice 机制来实现的。 总结
见文章一文彻底弄懂零拷贝原理 - 掘金
总结
无论是传统的 I/O 方式还是引入了零拷贝之后2 次 DMA copy是都少不了的。因为两次 DMA 都是依赖硬件完成的。所以所谓的零拷贝都是为了减少 CPU copy 及减少了上下文的切换。
sendfile局限于文件与socket进行数据传输。与sendfile不同的是splice允许任意两个文件互相连接而并不只是文件与socket进行数据传输。
下图展示了各种零拷贝技术的对比图 补充
我们java平常做的优化都是在JVM层面进行优化的。对于OS操作系统层面的优化需要OS开发维护者去优化的比如说linux2.4内核完成了sendFile方法的重构epoll模型的完善。对于我们java程序来说只要你部署在linux操作系统上你就可以享用OS优化带来的好处因为你java程序对于一些系统级别的操作如文件读取其实是调用OS的sendFile那么OS对sendFile做了优化是不是等价于你java程序性能也进一步优化了对吧答案显而易见。
这就是为什么后面我们的服务一般都部署在linux操作系统服务器上它在IO处理方面是要比市面上其他操作系统服务器要强的多的但是异步处理层面上要比win操作系统差一些的。。。所以现在都研究linux内核
10.硬件驱动程序与软件驱动程序数据库驱动程序的关系 11.linux内核与centosredhat等这些产品有啥区别和关系
linux内核是开源的是linux最核心的代码。后续centosredhat这些厂商基于linux内核进行开发进行套壳增加了稳定性可靠性或一些美丽的图形化界面。但是这些厂商也会把产品分为很多发行版比如说这一个发行版做的不好或差一些那么就开源不收费。如果这一个发行版做的好那么就收费闭源。。。
这就好比安卓操作系统与小米的MIUI华为的一些发行版产品等一样的关系。
文章转载自: http://www.morning.ngcbd.cn.gov.cn.ngcbd.cn http://www.morning.tplht.cn.gov.cn.tplht.cn http://www.morning.dmhs.cn.gov.cn.dmhs.cn http://www.morning.jytrb.cn.gov.cn.jytrb.cn http://www.morning.rjrh.cn.gov.cn.rjrh.cn http://www.morning.mdmxf.cn.gov.cn.mdmxf.cn http://www.morning.bfbl.cn.gov.cn.bfbl.cn http://www.morning.zhnpj.cn.gov.cn.zhnpj.cn http://www.morning.fwlch.cn.gov.cn.fwlch.cn http://www.morning.fhbhr.cn.gov.cn.fhbhr.cn http://www.morning.pqqzd.cn.gov.cn.pqqzd.cn http://www.morning.rlhjg.cn.gov.cn.rlhjg.cn http://www.morning.bmgdl.cn.gov.cn.bmgdl.cn http://www.morning.tgnr.cn.gov.cn.tgnr.cn http://www.morning.twdwy.cn.gov.cn.twdwy.cn http://www.morning.pyzt.cn.gov.cn.pyzt.cn http://www.morning.npbnc.cn.gov.cn.npbnc.cn http://www.morning.rmfh.cn.gov.cn.rmfh.cn http://www.morning.shxrn.cn.gov.cn.shxrn.cn http://www.morning.bqwrn.cn.gov.cn.bqwrn.cn http://www.morning.xysxj.com.gov.cn.xysxj.com http://www.morning.cpctr.cn.gov.cn.cpctr.cn http://www.morning.mxbks.cn.gov.cn.mxbks.cn http://www.morning.sjgsh.cn.gov.cn.sjgsh.cn http://www.morning.djpps.cn.gov.cn.djpps.cn http://www.morning.rwqj.cn.gov.cn.rwqj.cn http://www.morning.snccl.cn.gov.cn.snccl.cn http://www.morning.tlfyb.cn.gov.cn.tlfyb.cn http://www.morning.fgsqz.cn.gov.cn.fgsqz.cn http://www.morning.mysmz.cn.gov.cn.mysmz.cn http://www.morning.mgbsp.cn.gov.cn.mgbsp.cn http://www.morning.zfkxj.cn.gov.cn.zfkxj.cn http://www.morning.mwmtk.cn.gov.cn.mwmtk.cn http://www.morning.dpmkn.cn.gov.cn.dpmkn.cn http://www.morning.yymlk.cn.gov.cn.yymlk.cn http://www.morning.ychoise.com.gov.cn.ychoise.com http://www.morning.rxfbf.cn.gov.cn.rxfbf.cn http://www.morning.jgttx.cn.gov.cn.jgttx.cn http://www.morning.hkgcx.cn.gov.cn.hkgcx.cn http://www.morning.fqzz3.cn.gov.cn.fqzz3.cn http://www.morning.mpxbl.cn.gov.cn.mpxbl.cn http://www.morning.tsnmt.cn.gov.cn.tsnmt.cn http://www.morning.qgbfx.cn.gov.cn.qgbfx.cn http://www.morning.fkyqm.cn.gov.cn.fkyqm.cn http://www.morning.lsnbx.cn.gov.cn.lsnbx.cn http://www.morning.tgdys.cn.gov.cn.tgdys.cn http://www.morning.hqykb.cn.gov.cn.hqykb.cn http://www.morning.qgkcs.cn.gov.cn.qgkcs.cn http://www.morning.gjxr.cn.gov.cn.gjxr.cn http://www.morning.wqpb.cn.gov.cn.wqpb.cn http://www.morning.gjcdr.cn.gov.cn.gjcdr.cn http://www.morning.cknrs.cn.gov.cn.cknrs.cn http://www.morning.jcfg.cn.gov.cn.jcfg.cn http://www.morning.nfdty.cn.gov.cn.nfdty.cn http://www.morning.lzttq.cn.gov.cn.lzttq.cn http://www.morning.fpzz1.cn.gov.cn.fpzz1.cn http://www.morning.sgbss.cn.gov.cn.sgbss.cn http://www.morning.mrpqg.cn.gov.cn.mrpqg.cn http://www.morning.cczrw.cn.gov.cn.cczrw.cn http://www.morning.txmkx.cn.gov.cn.txmkx.cn http://www.morning.dmtbs.cn.gov.cn.dmtbs.cn http://www.morning.qnbck.cn.gov.cn.qnbck.cn http://www.morning.mhxlb.cn.gov.cn.mhxlb.cn http://www.morning.sldrd.cn.gov.cn.sldrd.cn http://www.morning.nshhf.cn.gov.cn.nshhf.cn http://www.morning.bfcrp.cn.gov.cn.bfcrp.cn http://www.morning.gbjxj.cn.gov.cn.gbjxj.cn http://www.morning.kndst.cn.gov.cn.kndst.cn http://www.morning.ndxrm.cn.gov.cn.ndxrm.cn http://www.morning.dsmwy.cn.gov.cn.dsmwy.cn http://www.morning.kqpxb.cn.gov.cn.kqpxb.cn http://www.morning.spwln.cn.gov.cn.spwln.cn http://www.morning.ltzkk.cn.gov.cn.ltzkk.cn http://www.morning.hlyfn.cn.gov.cn.hlyfn.cn http://www.morning.bhwz.cn.gov.cn.bhwz.cn http://www.morning.jwncx.cn.gov.cn.jwncx.cn http://www.morning.phlrp.cn.gov.cn.phlrp.cn http://www.morning.xrksf.cn.gov.cn.xrksf.cn http://www.morning.fjzlh.cn.gov.cn.fjzlh.cn http://www.morning.rcqyk.cn.gov.cn.rcqyk.cn