网站建设技术是什么,宁夏住房和城乡建设部网站,网站内如何做内部链接,简述什么是seo传送门 
分布式定时任务系列1#xff1a;XXL-job安装 
分布式定时任务系列2#xff1a;XXL-job使用 
分布式定时任务系列3#xff1a;任务执行引擎设计 
分布式定时任务系列4#xff1a;任务执行引擎设计续 Java并发编程实战1#xff1a;java中的阻塞队列 
引子 这篇文章的…传送门 
分布式定时任务系列1XXL-job安装 
分布式定时任务系列2XXL-job使用 
分布式定时任务系列3任务执行引擎设计 
分布式定时任务系列4任务执行引擎设计续 Java并发编程实战1java中的阻塞队列 
引子 这篇文章的主要目不是讨论XXL-job的使用而是要通过它的任务线程实现机制来分析java中阻塞队列的应用 
而这一切要从上周某天公司一个普通的下午说起。 
当时一个同事要添加任务就随口问了一句“阻塞处理策略选啥”我心里一惊以前没注意过这个地方每次都是默认的也就是单机串行。 
就是下图这个 这里面有3个选项 众人先是侃侃讨论了半天后面再去翻了源码来查看发现了下面的blockingQueue 阻塞队列 
什么是阻塞队列 
关于java阻塞队列可以看看java中的阻塞队列。里面例子用到的是有界队列ArrayBlockingQueueXXL-job里面用的是无界队列LinkedBlockingQueue。不论它们的相似及区别点最重要的都是拥有阻塞特性。 
要分析XXL-job是怎么使用阻塞队列的就从XXL-job的调度触发机制入手 
什么是XXL-job任务 
前面在分布式定时任务系列2XXL-job使用里面介绍过XXL-job的使用注册一个任务也很简单只要在业务代码方法中打上XxlJob注解即可下面是官方提供的demo 
/*** 1、简单任务示例Bean模式*/XxlJob(demoJobHandler)public void demoJobHandler() throws Exception {XxlJobHelper.log(XXL-JOB, Hello World.);for (int i  0; i  5; i) {XxlJobHelper.log(beat at:  i);TimeUnit.SECONDS.sleep(2);}// default success} 
开发步骤 1、任务开发在Spring Bean实例中开发Job方法 2、注解配置为Job方法添加注解 XxlJob(value自定义jobhandler名称, init  JobHandler初始化方法, destroy  JobHandler销毁方法)注解value值对应的是调度中心新建任务的JobHandler属性的值。 3、执行日志需要通过 XxlJobHelper.log 打印执行日志 4、任务结果默认任务结果为 成功 状态不需要主动设置如有诉求比如设置任务结果为失败可以通过 XxlJobHelper.handleFail/handleSuccess 自主设置任务结果  任务注册 
对于XXL-job来说是怎么注册并管理这些任务的呢 
在程序启动的时候xxl-job-core包中的XxlJobSpringExecutor类由于实现了接口SmartInitializingSingleton在Bean实例化完成时会调用afterSingletonsInstantiated()方法 
Overridepublic void afterSingletonsInstantiated() {// init JobHandler Repository (for method)initJobHandlerMethodRepository(applicationContext);// refresh GlueFactoryGlueFactory.refreshInstance(1);// super starttry {super.start();} catch (Exception e) {throw new RuntimeException(e);}} 
在initJobHandlerMethodRepository(applicationContext)方法中现在看一下这个方法的代码该方法会提取所有SpringBean的实例中方法上添加了XxlJob注解的方法并进行注册 
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {if (applicationContext  null) {return;}// init job handler from methodString[] beanDefinitionNames  applicationContext.getBeanNamesForType(Object.class, false, true);for (String beanDefinitionName : beanDefinitionNames) {Object bean  applicationContext.getBean(beanDefinitionName);MapMethod, XxlJob annotatedMethods  null;   // referred to org.springframework.context.event.EventListenerMethodProcessor.processBeantry {annotatedMethods  MethodIntrospector.selectMethods(bean.getClass(),new MethodIntrospector.MetadataLookupXxlJob() {Overridepublic XxlJob inspect(Method method) {return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);}});} catch (Throwable ex) {logger.error(xxl-job method-jobhandler resolve error for bean[  beanDefinitionName  ]., ex);}if (annotatedMethodsnull || annotatedMethods.isEmpty()) {continue;}for (Map.EntryMethod, XxlJob methodXxlJobEntry : annotatedMethods.entrySet()) {Method executeMethod  methodXxlJobEntry.getKey();XxlJob xxlJob  methodXxlJobEntry.getValue();// registregistJobHandler(xxlJob, bean, executeMethod);}}// ---------------------- job handler repository ----------------------private static ConcurrentMapString, IJobHandler jobHandlerRepository  new ConcurrentHashMapString, IJobHandler();public static IJobHandler loadJobHandler(String name){return jobHandlerRepository.get(name);}public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){logger.info( xxl-job register jobhandler success, name:{}, jobHandler:{}, name, jobHandler);return jobHandlerRepository.put(name, jobHandler);} 
注册的方法就是registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod))会将XxlJob注解标记的方法提取出对应的方法名通过反射得到Method信息并实例为对应的MethodJobHandler最后通过ConcurrentMap来管理其中key是注任务名称value为JobHandler实例。 看到这里不知你有没有一个疑问这里的JobHandler里面并没有阻塞队列BlockingQueue它们是怎么关联起来的呢所以这里就要讨论一下XXL-job的任务执行! 
XXL-job的任务执行 
在后台管理手动触发一下测试任务并把代码打上debug断点 首先会发送后台请求 一路跟踪下去会进入到XxlJobExecutor.runExecutor(TriggerParam triggerParam, String address)方法最终是到ExecutorBizImpl.run(TriggerParam triggerParam)方法 
public ReturnTString run(TriggerParam triggerParam) {// load oldjobHandler  jobThreadJobThread jobThread  XxlJobExecutor.loadJobThread(triggerParam.getJobId());IJobHandler jobHandler  jobThread!null?jobThread.getHandler():null;String removeOldReason  null;// validjobHandler  jobThreadGlueTypeEnum glueTypeEnum  GlueTypeEnum.match(triggerParam.getGlueType());if (GlueTypeEnum.BEAN  glueTypeEnum) {// new jobhandlerIJobHandler newJobHandler  XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());// valid old jobThreadif (jobThread!null  jobHandler ! newJobHandler) {// change handler, need kill old threadremoveOldReason  change jobhandler or glue type, and terminate the old job thread.;jobThread  null;jobHandler  null;}// valid handlerif (jobHandler  null) {jobHandler  newJobHandler;if (jobHandler  null) {return new ReturnTString(ReturnT.FAIL_CODE, job handler [  triggerParam.getExecutorHandler()  ] not found.);}}} else if (GlueTypeEnum.GLUE_GROOVY  glueTypeEnum) {// valid old jobThreadif (jobThread ! null !(jobThread.getHandler() instanceof GlueJobHandler ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()triggerParam.getGlueUpdatetime() )) {// change handler or gluesource updated, need kill old threadremoveOldReason  change job source or glue type, and terminate the old job thread.;jobThread  null;jobHandler  null;}// valid handlerif (jobHandler  null) {try {IJobHandler originJobHandler  GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());jobHandler  new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnTString(ReturnT.FAIL_CODE, e.getMessage());}}} else if (glueTypeEnum!null  glueTypeEnum.isScript()) {// valid old jobThreadif (jobThread ! null !(jobThread.getHandler() instanceof ScriptJobHandler ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()triggerParam.getGlueUpdatetime() )) {// change script or gluesource updated, need kill old threadremoveOldReason  change job source or glue type, and terminate the old job thread.;jobThread  null;jobHandler  null;}// valid handlerif (jobHandler  null) {jobHandler  new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));}} else {return new ReturnTString(ReturnT.FAIL_CODE, glueType[  triggerParam.getGlueType()  ] is not valid.);}// executor block strategyif (jobThread ! null) {ExecutorBlockStrategyEnum blockStrategy  ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);if (ExecutorBlockStrategyEnum.DISCARD_LATER  blockStrategy) {// discard when runningif (jobThread.isRunningOrHasQueue()) {return new ReturnTString(ReturnT.FAIL_CODE, block strategy effectExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());}} else if (ExecutorBlockStrategyEnum.COVER_EARLY  blockStrategy) {// kill running jobThreadif (jobThread.isRunningOrHasQueue()) {removeOldReason  block strategy effect  ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();jobThread  null;}} else {// just queue trigger}}// replace thread (new or exists invalid)if (jobThread  null) {jobThread  XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);}// push data to queueReturnTString pushResult  jobThread.pushTriggerQueue(triggerParam);return pushResult;} 任务执行过程中如果发现XXL-job对应的执行线程不存在会创建一个新new Thread实例并绑定前面的JobHandler实例这样对于每一个任务都有一个单独的线程也就将执行线程JobThread跟任务关联起来了 
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){JobThread newJobThread  new JobThread(jobId, handler);newJobThread.start();logger.info( xxl-job regist JobThread success, jobId:{}, handler:{}, new Object[]{jobId, handler});JobThread oldJobThread  jobThreadRepository.put(jobId, newJobThread);	// putIfAbsent | oh my god, maps put method return the old value!!!if (oldJobThread ! null) {oldJobThread.toStop(removeOldReason);oldJobThread.interrupt();}return newJobThread;}public JobThread(int jobId, IJobHandler handler) {this.jobId  jobId;this.handler  handler;// 初始化阻塞队列this.triggerQueue  new LinkedBlockingQueueTriggerParam();this.triggerLogIdSet  Collections.synchronizedSet(new HashSetLong());// assign job thread namethis.setName(xxl-job, JobThread-jobId-System.currentTimeMillis());} 
而执行任务的最后一步就是将当前任务入队到刚才的执行线程的BlockingQueue中 
// push data to queueReturnTString pushResult  jobThread.pushTriggerQueue(triggerParam); 而这也就跟文章最前面的代码关联起来了 
/*** new trigger to queue** param triggerParam* return*/public ReturnTString pushTriggerQueue(TriggerParam triggerParam) {// avoid repeatif (triggerLogIdSet.contains(triggerParam.getLogId())) {logger.info( repeate trigger job, logId:{}, triggerParam.getLogId());return new ReturnTString(ReturnT.FAIL_CODE, repeate trigger job, logId:  triggerParam.getLogId());}triggerLogIdSet.add(triggerParam.getLogId());triggerQueue.add(triggerParam);return ReturnT.SUCCESS;} 
BlockingQueue的应用 
消息入队 
至此就可以看出任务触发的大致逻辑了用户在admin手工触发任务之后最终会放入到任务对应的队列之中通过add方法将当前执行参数入队FIFO不阻塞队列如果满了就抛出异常这也是一个比较大的隐患如果触了过于频繁可能会导致OOM而这也就是为什么要设计不同阻塞策略的原因。 
消息消费 
而这些触发的任务怎么消费的呢其实也是在上面的提到的执行线程JobThread 
当实例化一个线程的时候com.xxl.job.core.executor.XxlJobExecutor类会立即启动调用线程的start()方法 启动之后会调用线程的run()方法com.xxl.job.core.thread.JobThread类 在执行线程中会写一个while的无限循环来不停的从阻塞队列中取出任务 循环结束的条件就是上面的阻塞队列如果是覆盖之前调度将会结束循环com.xxl.job.core.biz.impl.ExecutorBizImpl类  文章转载自: http://www.morning.rkrcd.cn.gov.cn.rkrcd.cn http://www.morning.tllws.cn.gov.cn.tllws.cn http://www.morning.rqgjr.cn.gov.cn.rqgjr.cn http://www.morning.kntbk.cn.gov.cn.kntbk.cn http://www.morning.fbhmn.cn.gov.cn.fbhmn.cn http://www.morning.lwcgh.cn.gov.cn.lwcgh.cn http://www.morning.tqwcm.cn.gov.cn.tqwcm.cn http://www.morning.snrhg.cn.gov.cn.snrhg.cn http://www.morning.zlxrg.cn.gov.cn.zlxrg.cn http://www.morning.kjxgc.cn.gov.cn.kjxgc.cn http://www.morning.fpzpb.cn.gov.cn.fpzpb.cn http://www.morning.wcgcm.cn.gov.cn.wcgcm.cn http://www.morning.bfhrj.cn.gov.cn.bfhrj.cn http://www.morning.cnfjs.cn.gov.cn.cnfjs.cn http://www.morning.cdrzw.cn.gov.cn.cdrzw.cn http://www.morning.gdpai.com.cn.gov.cn.gdpai.com.cn http://www.morning.snlxb.cn.gov.cn.snlxb.cn http://www.morning.kxqfz.cn.gov.cn.kxqfz.cn http://www.morning.tnthd.cn.gov.cn.tnthd.cn http://www.morning.qzbwmf.cn.gov.cn.qzbwmf.cn http://www.morning.yjprj.cn.gov.cn.yjprj.cn http://www.morning.hcwjls.com.gov.cn.hcwjls.com http://www.morning.fpxsd.cn.gov.cn.fpxsd.cn http://www.morning.bloao.com.gov.cn.bloao.com http://www.morning.pqwhk.cn.gov.cn.pqwhk.cn http://www.morning.zdmlt.cn.gov.cn.zdmlt.cn http://www.morning.wslpk.cn.gov.cn.wslpk.cn http://www.morning.lfttb.cn.gov.cn.lfttb.cn http://www.morning.neletea.com.gov.cn.neletea.com http://www.morning.jjxnp.cn.gov.cn.jjxnp.cn http://www.morning.jcwt.cn.gov.cn.jcwt.cn http://www.morning.qgjp.cn.gov.cn.qgjp.cn http://www.morning.tnqk.cn.gov.cn.tnqk.cn http://www.morning.lhgkr.cn.gov.cn.lhgkr.cn http://www.morning.xtqld.cn.gov.cn.xtqld.cn http://www.morning.svrud.cn.gov.cn.svrud.cn http://www.morning.dgfpp.cn.gov.cn.dgfpp.cn http://www.morning.ymqrc.cn.gov.cn.ymqrc.cn http://www.morning.gnmhy.cn.gov.cn.gnmhy.cn http://www.morning.wnkjb.cn.gov.cn.wnkjb.cn http://www.morning.ypzr.cn.gov.cn.ypzr.cn http://www.morning.kqylg.cn.gov.cn.kqylg.cn http://www.morning.plhyc.cn.gov.cn.plhyc.cn http://www.morning.nqlnd.cn.gov.cn.nqlnd.cn http://www.morning.jwqqd.cn.gov.cn.jwqqd.cn http://www.morning.bhdtx.cn.gov.cn.bhdtx.cn http://www.morning.tmrjb.cn.gov.cn.tmrjb.cn http://www.morning.kphsp.cn.gov.cn.kphsp.cn http://www.morning.bhmnp.cn.gov.cn.bhmnp.cn http://www.morning.lyjwb.cn.gov.cn.lyjwb.cn http://www.morning.gbnsq.cn.gov.cn.gbnsq.cn http://www.morning.fpkpz.cn.gov.cn.fpkpz.cn http://www.morning.fnjrh.cn.gov.cn.fnjrh.cn http://www.morning.kjcfz.cn.gov.cn.kjcfz.cn http://www.morning.kpqjr.cn.gov.cn.kpqjr.cn http://www.morning.kdrly.cn.gov.cn.kdrly.cn http://www.morning.ljxps.cn.gov.cn.ljxps.cn http://www.morning.rmdwp.cn.gov.cn.rmdwp.cn http://www.morning.cnxpm.cn.gov.cn.cnxpm.cn http://www.morning.ysfj.cn.gov.cn.ysfj.cn http://www.morning.fhhry.cn.gov.cn.fhhry.cn http://www.morning.fbmzm.cn.gov.cn.fbmzm.cn http://www.morning.jwgmx.cn.gov.cn.jwgmx.cn http://www.morning.xglgm.cn.gov.cn.xglgm.cn http://www.morning.jrtjc.cn.gov.cn.jrtjc.cn http://www.morning.ffwrq.cn.gov.cn.ffwrq.cn http://www.morning.nkqxb.cn.gov.cn.nkqxb.cn http://www.morning.hotlads.com.gov.cn.hotlads.com http://www.morning.jghty.cn.gov.cn.jghty.cn http://www.morning.cbczs.cn.gov.cn.cbczs.cn http://www.morning.rjmg.cn.gov.cn.rjmg.cn http://www.morning.rywr.cn.gov.cn.rywr.cn http://www.morning.qjlnh.cn.gov.cn.qjlnh.cn http://www.morning.jcrlx.cn.gov.cn.jcrlx.cn http://www.morning.srmpc.cn.gov.cn.srmpc.cn http://www.morning.jcwrb.cn.gov.cn.jcwrb.cn http://www.morning.xkyqq.cn.gov.cn.xkyqq.cn http://www.morning.zcqgf.cn.gov.cn.zcqgf.cn http://www.morning.wtxdp.cn.gov.cn.wtxdp.cn http://www.morning.ntzbr.cn.gov.cn.ntzbr.cn