当前位置: 首页 > news >正文

安徽省建设委员会网站百度站长平台账号购买

安徽省建设委员会网站,百度站长平台账号购买,进不了建设银行网站,网站建设基础考试源码解析目标 分析最核心组件EventLoop在Netty运行过程中所参与的事情,以及具体实现 源码解析 依然用netty包example下Echo目录下的案例代码,单我们写一个NettyServer时候,第一句话就是 EventLoopGroup bossGroup new NioEventLoopGroup(…

源码解析目标

  • 分析最核心组件EventLoop在Netty运行过程中所参与的事情,以及具体实现

源码解析

  • 依然用netty包example下Echo目录下的案例代码,单我们写一个NettyServer时候,第一句话就是 EventLoopGroup bossGroup = new NioEventLoopGroup(1);,我们先来看看NioEventLoop的UML图

在这里插入图片描述

  • 首先我们看到ScheduledEecutorService接口,这个接口是concurrent包下的一个定时任务接口,EventLoop实现了这个接口,因此可以接受定时任务,所以我们在Debug的时候,能在EventLoop中找到一个scheduledTaskQueue
  • EventLoop接口我们看下源码,如下,从注释中我们了解到,EventLoop中一旦注册了Channel,就会处理该Channel对应的所有I/O操作
/*** Will handle all the I/O operations for a {@link Channel} once registered.** One {@link EventLoop} instance will usually handle more than one {@link Channel} but this may depend on* implementation details and internals.**/
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {@OverrideEventLoopGroup parent();
}
  • SingleThreadEventExecutor 也是一个比较重要的类,看源码注释,说明了SingleThreadEventExecutor 是一个单个线程的线程池

/*** Abstract base class for {@link OrderedEventExecutor}'s that execute all its submitted tasks in a single thread.**/
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {......}
  • 在SingleThreadEventExecutor 类中实现了很多对线程池的操作,例如runAllTask,executer,takeTask,pollTask,看下其中一个构造方法:
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,boolean addTaskWakesUp, int maxPendingTasks,RejectedExecutionHandler rejectedHandler) {super(parent);this.addTaskWakesUp = addTaskWakesUp;this.maxPendingTasks = Math.max(16, maxPendingTasks);this.executor = ObjectUtil.checkNotNull(executor, "executor");taskQueue = newTaskQueue(this.maxPendingTasks);rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");}
//其中taskQueue初始化protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {return new LinkedBlockingQueue<Runnable>(maxPendingTasks);}
  • 如上,SingleThreadEventExecutor 队列中元素是实现了Runnable接口的对象,线程池中最重要的方法当然是executer方法,EventLoop是SingleThreadEventExecutor 的子类,那么EventLoop 类也可以直接调用executer方法来完成对事件的执行,我们来看源码
 @Overridepublic void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();if (inEventLoop) {addTask(task);} else {startThread();addTask(task);if (isShutdown() && removeTask(task)) {reject();}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}}
  • inEventLoop(); 首先判断EventLoop中线程是否是当前线程,如果是,则直接将task添加到线程池队列中
  • 如果不是则尝试启动一个线程(因为是单个线程的线程池,所以只能且只需要启动一次),之后在将任务添加到队列中去
  • isShutdown() && removeTask(task)) 中逻辑 如果线程已经停止并且删除任务失败,则直接拒绝策略
  • 接着看下addTask的实现
 /*** Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown* before.*/protected void addTask(Runnable task) {if (task == null) {throw new NullPointerException("task");}if (!offerTask(task)) {reject(task);}}final boolean offerTask(Runnable task) {if (isShutdown()) {reject();}return taskQueue.offer(task);}
  • 从注释中可以看出,addTask方法会添加一个task任务到队列中,如果当前线程是shutdown的状态,那么直接抛出异常RejectedExecutionException
  • 接着来看executer方法中的startThread(); ,当我们判断当前线程不是EventLoop中的线程的时候会执行这个方法,他是NioEventLoop中的核心方法,如下源码
 private void startThread() {if (state == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {try {doStartThread();} catch (Throwable cause) {STATE_UPDATER.set(this, ST_NOT_STARTED);PlatformDependent.throwException(cause);}}}}private void doStartThread() {assert thread == null;executor.execute(new Runnable() {@Overridepublic void run() {thread = Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success = false;updateLastExecutionTime();try {SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {for (;;) {int oldState = state;if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}// Check if confirmShutdown() was called at the end of the loop.if (success && gracefulShutdownStartTime == 0) {logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +"before run() implementation terminates.");}try {// Run all remaining tasks and shutdown hooks.for (;;) {if (confirmShutdown()) {break;}}} finally {try {cleanup();} finally {STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);threadLock.release();if (!taskQueue.isEmpty()) {logger.warn("An event executor terminated with " +"non-empty task queue (" + taskQueue.size() + ')');}terminationFuture.setSuccess(null);}}}}});}
  • state == ST_NOT_STARTED 首先通过状态判断是否执行过,保证EventLoop只有一个线程

  • 如果没有启动 用cas的方式STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED) 去修改状态为ST_STARTED,直接调用doStartThread方法

  • 如果失败就回滚

  • 接着分析doStartThread方法,首先会调用Executor的execute方法,这个Executor 是我们在创建EventLoopGroup时候创建的是一个ThreadPerTaskExecutor类,如下图是在channel中对应的EventLoop找到的对象信息,该execute方法会将Runable包装成Netty的FastThreadLocalThread

在这里插入图片描述

  • 接着通过Thread.currentThread() 判断线程是否中断
  • updateLastExecutionTime(); 然后设置最后一次执行的事件
  • 核心方法是:SingleThreadEventExecutor.this.run(); 执行单曲NioEventLoop的run方法,等会重点关注
  • 接着完成run方法的事物处理后,在finally中使用cas不断的修改state状态,设置为ST_SHUTTING_DOWN,也就是当loop中run方法结束运行后,关闭线程,最后还会通过不断轮询来二次确认是否关闭,否则不会break跳出
  • 接下来分析EventLoop中的Run方法,我们进入Run方法,就到了我们之前分析过的NioEventLoop中的run方法,此方法做了三件事情,如下源码

@Overrideprotected void run() {for (;;) {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.SELECT:select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}// fall throughdefault:}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {try {processSelectedKeys();} finally {// Ensure we always run tasks.runAllTasks();}} else {final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}}
  • NioEventLoop 中的loop轮询是依靠run方法来执行的,在方法中可以看到是一个for循环其中三件事情,如下图中EventLoop部分

    • case SelectStrategy.SELECT: 当事件类似是SELECT 时候, 通过select(wakenUp.getAndSet(false));方法获取感兴趣的事件
    • processSelectedKeys(); 处理选中的事件
    • runAllTasks 执行队列中的任务。
      在这里插入图片描述
  • 上图不管是bossGroup还是WorkerGroup中的EventLoop都是次run方法执行流程

  • select方法实现

private void select(boolean oldWakenUp) throws IOException {Selector selector = this.selector;try {int selectCnt = 0;long currentTimeNanos = System.nanoTime();long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);for (;;) {long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;if (timeoutMillis <= 0) {if (selectCnt == 0) {selector.selectNow();selectCnt = 1;}break;}if (hasTasks() && wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt = 1;break;}int selectedKeys = selector.select(timeoutMillis);selectCnt ++;if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {break;}if (Thread.interrupted()) {if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely because " +"Thread.currentThread().interrupt() was called. Use " +"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");}selectCnt = 1;break;}long time = System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {// timeoutMillis elapsed without anything selected.selectCnt = 1;} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",selectCnt, selector);rebuildSelector();selector = this.selector;// Select again to populate selectedKeys.selector.selectNow();selectCnt = 1;break;}currentTimeNanos = time;}if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}}} catch (CancelledKeyException e) {if (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",selector, e);}}}
  • 关注点在于 select方法如何体现出非阻塞,如下图中,选择器获取对应事件debug
    在这里插入图片描述

  • 在select中传如参数是1 秒,也就是默认情况下阻塞1秒中,具体的算法: long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;

  • 其中 long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); 表示当前时间 - 定时任务的时间,那么timeoutMillis 意思就是,当有定时任务的时候 delayNanos(currentTimeNanos) 时间就部位空,那么定时任务剩余时间 t +0.5秒阻塞的时间,否则就默认1秒中阻塞时间

  • 接着判断: if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks())

    • 如果1秒(或者t+0.5)后能获取到selectedKeys :selectedKeys != 0
    • 或者select被用户唤醒 :oldWakenUp, wakenUp.get()
    • 或者任务队列中有任务存在 : hasTasks()
    • 或者有定时任务即将被执行 : hasScheduledTasks()
  • 有以上任何情况则跳出循环,否则继续沦陷,直到满足其中一个条件为止

  • 接着processSelectedKeys对获取到的selectKey处理

  • 在接着runAllTasks 执行队列任务

总结
  • 每次执行execute方法就会向队列中添加任务。当第一次添加时候就启动线程,执行run方法,run方法是EventLoop的核心实现,负责轮询获取事件,处理事件,执行队列中任务
  • 其中调用selector的select方法默认阻塞一秒,有定时任务就t+0.5,t是定时任务剩余时间,当执行execute方法时候,也就是添加任务的时候,唤醒selector,防止selector阻塞事件过长
  • 当selector返回的时候,会调用processSelectedKeys对selectKey进行处理
  • 当processSelectedKeys 方法执行结束,按照ioRatio比例执行runAllTasks方法默认是 IO 任务时间和非 IO 任务时间是相同的代码如下
  final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
http://www.tj-hxxt.cn/news/46041.html

相关文章:

  • 公安局备案多久网站北京seo服务行者
  • wordpress category_name企业seo网络推广
  • 做建材一般去什么网站宣传营销策划培训
  • 织梦网站安装视频教程如何优化培训方式
  • 外包公司企业网站长沙网络优化产品
  • seo 网站地图优化系统开发
  • 做网站要好多钱宁波seo优化公司排名
  • 做公司网站的流程公司seo是什么级别
  • 成都市做网站全国疫情最新
  • 什么是分类信息网站营销网站内部链接优化方法
  • php开发网站项目心得优化措施最新回应
  • 本地电脑做网站网络推广工具有哪些
  • 网站内容页怎么设计seo体系百科
  • 吉林网站建设制作搜狗seo刷排名软件
  • 变更网站备案信息企业查询app
  • 做暧小视频免费网站可以访问境外的浏览器
  • 苏州企业网站建设定制岳阳seo公司
  • 建设招标网官方网站想要推广网页正式版
  • wordpress.com 域名搜索引擎优化举例说明
  • 做面膜的网站广告联盟哪个比较好
  • 制作公司网站全网热搜榜
  • 做会员卡的网站在线crm管理系统
  • 合肥商业网站建设费用志鸿优化设计
  • 免费网站制作下载2022年最新十条新闻
  • 美女做爰色视频网站怎样优化网站排名靠前
  • 做云购网站seo培训中心
  • 伪装的福祉 wordpress杭州优化排名哪家好
  • 程序员是不是都是做网站的nba新闻最新消息
  • 名字做头诗的网站网站设计公司网站制作
  • wordpress素锦 下载seo全网优化指南