网站开发相关外文书籍,网站下载软件入口,做服装网站需要什么条件,app store软件下载Executors快速创建线程池的方法 Java通过Executors 工厂提供了5种创建线程池的方法#xff0c;具体方法如下 方法名描述newSingleThreadExecutor()创建一个单线程的线程池#xff0c;该线程池中只有一个工作线程。所有任务按照提交的顺序依次执行#xff0c;保证任务的顺序性…Executors快速创建线程池的方法 Java通过Executors 工厂提供了5种创建线程池的方法具体方法如下 方法名描述newSingleThreadExecutor()创建一个单线程的线程池该线程池中只有一个工作线程。所有任务按照提交的顺序依次执行保证任务的顺序性。当工作线程意外终止时会创建一个新的线程来替代它。适用于需要顺序执行任务且保证任务安全性的场景。newFixedThreadPool(int nThreads)创建一个固定大小的线程池该线程池中的线程数量固定为指定的数量。当有新任务提交时如果线程池中有空闲线程则立即使用空闲线程执行任务如果没有空闲线程则任务将被放入任务队列等待执行。newCachedThreadPool()创建一个缓存线程池该线程池中的线程数量不固定可以根据任务的需求动态调整线程数量。空闲线程会被保留一段时间如果在保留时间内没有任务执行则这些线程将被终止并从线程池中删除。适用于执行大量短期任务的场景newScheduledThreadPool(int corePoolSize)创建一个可调度的线程池该线程池能够按照一定的调度策略执行任务。除了执行任务外还可以按照指定的延迟时间或周期性地执行任务。适用于需要按照计划执行任务、定时任务或周期性任务的场景。newWorkStealingPool(int parallelism) newWorkStealingPool(int parallelism)方法用于创建一个工作窃取线程池。工作窃取线程池是一种特殊的线程池它根据一定的调度策略执行任务。除了执行任务外工作窃取线程池还可以按照指定的延迟时间或周期性地执行任务。
ThreadFactory 在学习Executor创建线程池之前我们先来学习一下 ThreadFactory是一个接口用于创建线程对象的工厂。它定义了一个方法newThread用于创建新的线程。
在Java中线程的创建通常通过Thread类的构造函数进行但是使用ThreadFactory可以将线程的创建过程与线程的执行逻辑分离开来。通过自定义的ThreadFactory我们可以对线程进行更加灵活的配置和管理例如指定线程名称、设置线程优先级、设置线程是否为守护线程等。
ThreadFactory接口只有一个方法
Thread newThread(Runnable runnable);该方法接受一个Runnable对象作为参数并返回一个新的Thread对象。
一般情况下我们可以通过实现ThreadFactory接口来自定义线程的创建。以下是一个示例的自定义ThreadFactory实现
public class MyThreadFactory implements ThreadFactory {// 自定义线程的名称private final String namePrefix test-async-thread;private final AtomicInteger threadNumber new AtomicInteger(1);public MyThreadFactory(String namePrefix) {this.namePrefix namePrefix;}Overridepublic Thread newThread(Runnable runnable) {Thread thread new Thread(runnable);thread.setName(namePrefix - threadNumber.getAndIncrement());thread.setPriority(Thread.NORM_PRIORITY);thread.setDaemon(false);return thread;}
}在上面的示例中MyThreadFactory实现了ThreadFactory接口并通过构造函数传入一个namePrefix参数用于指定线程的名称前缀。
在newThread方法中首先创建一个新的Thread对象并设置线程的名称为namePrefix加上一个递增的数字。然后可以根据需要设置线程的优先级、是否为守护线程等属性。
newSingleThreadExecutor() 创建单线程化线程池
该方法用于创建一个单线程化的线程池也就是只有一个线程的线程池。该线程池中只有一个工作线程它负责按照任务的提交顺序依次执行任务。当有任务提交时会创建一个新的线程来执行任务。如果工作线程意外终止线程池会创建一个新的线程来替代它确保线程池中始终有一个可用的线程。
newSingleThreadExecutor()方法返回的线程池实例实现了ExecutorService接口因此可以使用submit()方法提交任务并获取Future对象或使用execute()方法提交任务。
该线程池适用于需要顺序执行任务且保证任务之间不会发生并发冲突的场景。由于只有一个工作线程所以不存在线程间的竞争和并发问题可以确保任务的安全性。
此外newSingleThreadExecutor()方法创建的线程池还可以用于任务的异常处理。当任务抛出异常时线程池会捕获异常并记录或处理异常避免异常导致整个应用程序崩溃。
需要注意的是由于该线程池只有一个线程如果任务执行时间过长或任务量过大可能会导致任务队列堆积造成应用程序的性能问题。所以在使用该线程池时需要根据任务的特性和需求进行适当的评估和调优。
下面我们使用Executors中newSingleThreadExecutor()方法创建一个单线程线程池 /*** 这里创建的线程是 是Executors.newSingleThreadExecutor() 一样 保证只有一个线程来进行执行 并且按照提交的顺序进行执行* pre* {code* public static ExecutorService newSingleThreadExecutor() {* return new Executors.FinalizableDelegatedExecutorService (* new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueueRunnable()));* }* }* /pre*/Testpublic void test10() {// 如果有多个任务提交到线程池中那么这个线程池中的线程会依次执行任务 和ExecutorService executorService Executors.newSingleThreadExecutor();// 创建线程 1Thread t1 new Thread(() - {logger.error(t1 ---- 开始执行了~);}, t1);// 创建线程 2Thread t2 new Thread(() - {logger.error(t2 ---- 开始执行了~);}, t2);// 创建线程 3Thread t3 new Thread(() - {logger.error(t3 ---- 开始执行了~);}, t3);// Executor 这个接口定义的功能很有限同时也只支持 Runnale 形式的异步任务// 向线程池提交任务executorService.submit(t1);executorService.submit(t2);executorService.submit(t3);// 关闭线程池executorService.shutdown();}首先通过Executors.newSingleThreadExecutor();创建了一个单线程化的线程池。
然后创建了三个线程t1、t2和t3分别用于执行不同的任务。
接着通过executorService.submit(t1)将t1线程提交到线程池中进行执行。同样地也将t2和t3线程提交到线程池中。
最后通过executorService.shutdown()关闭线程池。
执行时可以观察到日志输出的顺序。由于线程池中只有一个线程所以任务会依次按照提交的顺序进行执行。
需要注意的是通过线程池执行任务后线程的名称不再是我们自定义的线程名称而是线程池的名称如pool-2-thread-1。这是因为具体的执行任务是交给线程池来管理和执行的。
从输出中我们可以看出该线程池有以下特点 单线程化的线程池中的任务都是按照提交的顺序来进行执行的。 该线程池中的唯一线程存活时间是无限的 当线程池中唯一的线程正在繁忙时新提交的任务会进入到其内部的阻塞队列中而且阻塞队列的容量是无限的 // 这是 newSingleThreadExecutor 一个无参的构造方法
public static ExecutorService newSingleThreadExecutor() {// 创建一个FinalizableDelegatedExecutorService实例该实例是ExecutorService接口的一个包装类// 将上面创建的ThreadPoolExecutor实例作为参数传入// 这样就得到了一个单线程化的线程池return new FinalizableDelegatedExecutorService( // 创建一个ThreadPoolExecutor实例指定参数如下// corePoolSize: 1线程池中核心线程的数量为1// maximumPoolSize: 1线程池中最大线程的数量为1// keepAliveTime: 0L空闲线程的存活时间为0毫秒即空闲线程立即被回收// unit: TimeUnit.MILLISECONDS存活时间的时间单位是毫秒// workQueue: new LinkedBlockingQueueRunnable()使用无界阻塞队列作为任务队列new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueueRunnable()));}// 其中 阻塞队列大小如果不传容量默认是整形的最大值
public LinkedBlockingQueue() {this(Integer.MAX_VALUE);
}总体来说ewSingleThreadExecutor();适用于按照任务提交次序一个接一个的执行场景 newFixedThreadPool 创建固定数量的线程池 该方法用于创建一个固定数量的线程池其中唯一的参数是用于设置线程池中线程的数量 newFixedThreadPool是Executors类提供的一个静态方法用于创建一个固定大小的线程池。 方法具体如下 public static ExecutorService newFixedThreadPool(int nThreads)参数nThreads表示线程池中的线程数量即固定的线程数量。线程池中的线程数不会根据任务的多少进行动态调整即使有空闲线程也不会销毁除非调用了线程池的shutdown方法。 newFixedThreadPool方法返回一个ExecutorService对象它是Executor接口的子接口提供了更加丰富的任务提交和管理方法。 通过创建固定大小的线程池可以在任务并发量较高且预期的任务数量固定的情况下提供一定程度的线程复用和线程调度控制。线程池会根据固定的线程数量来创建对应数量的线程并将任务分配给这些线程进行执行。 线程池的工作原理如下 当有任务提交到线程池时线程池中的某个线程会被唤醒来执行任务。如果所有线程都在执行任务新的任务会被放入一个任务队列中等待执行。当任务队列已满时线程池会根据配置的拒绝策略来处理无法执行的任务。 需要注意的是由于线程池的大小是固定的如果任务数量超过线程池的容量任务会在任务队列中等待执行。这可能会导致任务等待时间增加或任务堆积进而影响系统的响应性能。因此在选择线程池大小时需要根据系统的负载情况和任务特点进行合理的配置。 下面我们通过代码来了解一下 newFixedThreadPool
Testpublic void test18() {ThreadPoolExecutor executor (ThreadPoolExecutor) Executors.newFixedThreadPool(2);executor.submit(new Runnable() {Overridepublic void run() {// 随机睡觉timeSleep();printPoolInfo(executor, *[耗时线程-1]);}});executor.submit(new Runnable() {Overridepublic void run() {// 随机睡觉timeSleep();printPoolInfo(executor, *[耗时线程-2]);}});executor.submit(new Runnable() {Overridepublic void run() {timeSleep();printPoolInfo(executor, *[耗时线程-3]);}});for (int i 0; i 5; i) {int finalI i;executor.submit(new Runnable() {Overridepublic void run() {printPoolInfo(executor, 普通线程);}});}// 优雅关闭线程池executor.shutdown();waitPoolExecutedEnd(executor);}// 这个方法用于等待线程全部执行结束public void waitPoolExecutedEnd(ThreadPoolExecutor executor) {// 确保主线程完全等待子线程执行完毕try {// 等待线程池中的任务执行完毕最多等待1天if (!executor.awaitTermination(5, TimeUnit.MINUTES)) {logger.error(线程池中的任务还未全部执行完毕~);}} catch (InterruptedException e) {logger.error(等待线程池中的任务被中断~);}}这段代码创建了一个固定大小为2的线程池 executor并向其中提交了多个任务。其中
通过 Executors.newFixedThreadPool(2) 创建了一个固定大小为2的线程池。向线程池中提交了多个任务包括两个耗时任务和五个普通任务。每个耗时任务都会执行 timeSleep() 方法进行一段随机时间的睡眠然后执行 printPoolInfo() 方法输出当前线程池的信息。普通任务直接执行 printPoolInfo() 方法输出当前线程池的信息。在所有任务提交完毕后调用 executor.shutdown() 方法优雅地关闭线程池并调用 waitPoolExecutedEnd(executor) 方法等待线程池中的任务执行完毕。 根据输出的结果 我们可以观察到
当第一个耗时任务执行时线程池中有2个核心线程在工作。当第三个任务执行时线程池中的任务数量已经达到5个但是活跃线程数量仍然为2说明任务正在等待空闲线程来执行。所有任务执行完成后线程池中的线程数量仍然保持为2这是因为线程池是一个固定大小的线程池。在任务执行的过程中线程池的核心线程数量一直为2线程数量也一直保持在2个因为线程池的大小是固定的。
固定大小线程池适用于以下场景
资源受限场景当系统资源受限无法创建过多线程时固定大小线程池能够限制线程数量防止系统资源被耗尽。稳定的任务处理适用于稳定的任务处理场景例如批量数据处理、定时任务等因为固定大小线程池能够保持一定数量的线程长时间运行避免线程的频繁创建和销毁。控制并发数量在需要限制并发数量的场景下固定大小线程池能够控制同时执行的任务数量防止系统过载。避免资源竞争固定大小线程池可以避免多个任务争夺系统资源而导致的竞争和性能下降。稳定的性能预测在需要稳定的性能预测下固定大小线程池能够提供一致的性能表现因为线程数量是固定的可以更好地进行性能测试和预测。
newCachedThreadPool 创建可以缓存的线程池 newCachedThreadPool 用于创建一个可以缓存的线程池如果线程内的某些线程无事可干那么就会成为空线程可缓存线程池可以灵活回收这些空闲线程 newCachedThreadPool是Executors类提供的一个静态方法用于创建一个缓存型线程池。 方法定义如下 public static ExecutorService newCachedThreadPool()newCachedThreadPool方法返回一个ExecutorService对象它是Executor接口的子接口提供了更加丰富的任务提交和管理方法。 缓存型线程池会根据需要自动创建和回收线程线程池的大小可以根据任务的数量自动调整。如果当前没有可用的空闲线程会创建新的线程来执行任务如果有空闲线程并且它们在指定的时间内没有执行任务那么这些空闲线程将会被回收。 使用缓存型线程池的优点是可以根据任务的数量动态调整线程池的大小以适应不同的负载情况。当任务数量较少时线程池会减少线程的数量以节省资源当任务数量增加时线程池会增加线程的数量以提高并发性。 需要注意的是由于缓存型线程池的大小是不限制的它可能会创建大量的线程如果任务的提交速度超过了线程执行任务的速度可能会导致系统资源消耗过多甚至造成系统崩溃。因此在使用缓存型线程池时需要根据任务特点和系统资源情况进行合理的配置。 下面我们通过一个案例来了解一下newCachedThreadPool但是了解newCachedThreadPool之前我们先来熟悉一个阻塞队列这个会在后面的阻塞队列专题中详细介绍这里只是作为了解
SynchronousQueue
无内部存储容量与其他阻塞队列不同SynchronousQueue 不存储元素其容量为零。换句话说它是一个零容量的队列用于在线程之间同步传输数据。阻塞队列作为 BlockingQueue 接口的一个实现SynchronousQueue 提供了阻塞操作允许线程在队列的插入和移除操作上进行阻塞等待。匹配插入和移除操作在 SynchronousQueue 中每个插入操作必须等待另一个线程的移除操作反之亦然。换句话说发送线程必须等待接收线程而接收线程也必须等待发送线程才能够完成操作。这样的特性保证了数据的可靠传输只有在有线程与之匹配时才会进行数据传输。不支持 peek 操作由于 SynchronousQueue 内部没有存储元素因此不能调用 peek 操作。只有在移除元素时才会有元素可供操作。支持公平和非公平模式SynchronousQueue 可以在构造时指定为公平或非公平模式。在公平模式下队列会按照线程的到达顺序进行操作而在非公平模式下则不保证操作的顺序。
SynchronousQueue 在多线程并发编程中常用于一些特定场景例如生产者-消费者模式中用于传输数据的场景以及一些任务执行器中用于任务的传递等。其特殊的同步机制保证了线程之间数据的可靠传输和同步操作。
基于SynchronousQueue 一个小案例 /*** 理解SynchronousQueue* SynchronousQueue实际上它不是一个真正的队列因为SynchronousQueue没有容量。与其他BlockingQueue阻塞队列不同,* SynchronousQueue是一个不存储元素的BlockingQueue。只是它维护一组线程这些线程在等待着把元素加入或移出队列。* 我们简单分为以下几种特点* 内部没有存储容量为0* 阻塞队列也是blockingqueue的一个实现* 发送或者消费线程会阻塞只有有一对消费和发送线程匹配上才同时退出。* (其中每个插入操作必须等待另一个线程的移除操作同样任何一个移除操作都等待另一个线程的插入操作。因此此队列内部其 实没有任何一个元素因此不能调用peek操作因为只有移除元素时才有元素。)* 配对有公平模式和非公平模式(默认)*/Testpublic void test19() throws InterruptedException {SynchronousQueueString queue new SynchronousQueue();// 我们通过线程内 入队 和 出队 了解下 SynchronousQueue的特性new Thread(new Runnable() {Overridepublic void run() {try {// 入队logger.error(---- 喜羊羊进锅沐浴~ ----);queue.put(喜羊羊);logger.error(---- 懒羊羊进锅沐浴~ ----);queue.put(懒羊羊);logger.error(---- 美羊羊进锅沐浴~ ----);queue.put(美羊羊);} catch (InterruptedException e) {throw new RuntimeException(e);}}}, t1).start();new Thread(new Runnable() {Overridepublic void run() {try {// 入队Thread.sleep(5000);String node1 queue.take();logger.error(---- {} 出锅~ ----, node1);Thread.sleep(10000);String node2 queue.take();logger.error(---- {} 出锅~ ----, node2);Thread.sleep(5000);String node3 queue.take();logger.error(---- {} 出锅~ ----, node3);} catch (InterruptedException e) {throw new RuntimeException(e);}}}, t2).start();Thread.sleep(Integer.MAX_VALUE);}// 根据 结果可以发现 三只羊同时进锅但是一个锅只能容纳一只羊所以只有一只羊能进锅其他的羊只能等待直到锅里的羊出锅才能进锅// 也就是说SynchronousQueue是一个不存储元素的BlockingQueue。只是它维护一组线程这些线程在等待着把元素加入或移出队列。// 喜羊羊进锅然后等待 5s后 喜羊羊出锅此时美羊羊开始进锅了解这个阻塞队列后我们再来了解一下newCachedThreadPool这个线程池还是通过一个案例来进行了解一下具体用法 /*** newCachedThreadPool 创建可以缓存的线程池*/Testpublic void test17() {// 创建可以缓存的线程池// 创建一个可缓存线程池如果线程池长度超过处理需要可灵活回收空闲线程若无可回收则新建线程。// keepAliveTime为60S意味着线程空闲时间超过60S就会被杀死60LThreadPoolExecutor threadPoolExecutor (ThreadPoolExecutor) Executors.newCachedThreadPool();// 这里使用两个线程异步执行// 此处 羊羊线程是不休眠的直接放入线程池new Thread(() - {for (int i 0; i 2; i) {int finalI i;threadPoolExecutor.submit(() - {printPoolInfo(threadPoolExecutor, 羊羊线程 finalI -正在执行);});}}, 任务线程-1).start();// 狼狼线程 此时第一个线程也是不会进行阻塞此时应该是 三个线程 两个羊羊一个狼狼 同时进线程池new Thread(() - {for (int i 0; i 5; i) {if (i 1 || i 2) {try {Thread.sleep(TimeUnit.SECONDS.toMillis(5));} catch (InterruptedException e) {throw new RuntimeException(e);}}int finalI i;threadPoolExecutor.submit(() - {printPoolInfo(threadPoolExecutor, 狼狼线程 finalI -正在执行);});}}, 任务线程-2).start();// 等待全部线程都执行完毕waitPoolExecutedEnd(threadPoolExecutor);threadPoolExecutor.shutdown();}
这段代码的主要逻辑如下
使用 Executors.newCachedThreadPool() 创建了一个可缓存的线程池 threadPoolExecutor。这种线程池的特点是如果线程池长度超过当前任务需求它会灵活地回收空闲线程若没有可回收的线程则会新建线程来处理任务。线程的空闲时间超过60秒后就会被回收。创建了两个新的线程分别用于提交任务到线程池中执行。其中一个线程负责提交羊羊线程另一个线程负责提交狼狼线程。羊羊线程的任务不会进行阻塞直接提交到线程池中执行。狼狼线程的任务中当 i 等于1或2时会进行5秒的睡眠模拟任务的耗时操作然后再提交到线程池中执行。使用 waitPoolExecutedEnd(threadPoolExecutor) 方法等待线程池中的任务执行完毕。在所有任务执行完毕后调用 threadPoolExecutor.shutdown() 方法关闭线程池。 通过结果可以看到每个任务的执行都是间隔5秒执行一次。线程池信息中的线程数量始终为3这是因为 Executors.newCachedThreadPool() 创建的是一个可缓存的线程池其最大线程数量为 Integer.MAX_VALUE因此当任务提交到线程池时如果没有空闲线程可用则会新建线程来处理任务线程数量会一直增加直到达到设定的最大值。在任务提交时活跃线程数量始终为3这是因为每次提交任务时都有空闲线程可用所以不需要新建线程而是直接使用已存在的线程执行任务。狼狼线程的任务会进行5秒的睡眠操作模拟耗时操作因此在执行任务期间线程池中的活跃线程数量会减少直到任务执行完毕后线程池会继续维持3个活跃线程数量。在执行完所有任务后线程池并不会立即关闭因为线程池是可缓存的会等待一段时间后空闲线程自动被回收。
应用场景
短期任务处理适用于处理大量短期任务的场景因为它能够根据需要动态地创建线程处理任务处理完毕后又自动回收线程避免了线程过多占用资源的问题。任务处理时间不确定适用于任务处理时间不确定的场景因为它能够根据实际情况动态调整线程数量保证任务能够及时得到处理提高系统的响应速度。需要快速响应的任务适用于需要快速响应的任务因为它能够快速地创建线程来处理任务缩短任务等待的时间提高任务的处理效率。任务负载波动大适用于任务负载波动大的场景因为它能够根据负载情况动态调整线程数量使系统能够更好地适应负载的变化。
缺点
线程数量不受限制由于 newCachedThreadPool 的最大线程数量为 Integer.MAX_VALUE因此在大量任务提交的情况下可能会导致线程数量过多占用大量系统资源导致系统负载过高甚至引发系统崩溃。不适用于长时间任务由于它的线程数量不受限制适用于处理短期任务但不适用于长时间任务因为长时间任务可能会导致线程数量过多占用大量系统资源。可能导致频繁创建和销毁线程由于 newCachedThreadPool 是一个动态的线程池类型可能会频繁地创建和销毁线程这种线程的创建和销毁操作会带来一定的性能开销。
综上所述newCachedThreadPool 适用于任务处理时间不确定、负载波动大、需要快速响应的场景但在大量长时间任务的情况下需要慎重选择以避免占用过多系统资源。
newScheduledThreadPool 创建可调度的线程池 项目中经常会遇到一些非分布式的调度任务需要在未来的某个时刻周期性执行。实现这样的功能我们有多种方式可以选择 Timer类 jdk1.3引入不推荐 它所有任务都是串行执行的同一时间只能有一个任务在执行而且前一个任务的延迟或异常都将会影响到之后的任务。 Spring的Scheduled注解不是很推荐 这种方式底层虽然是用线程池实现但是有个最大的问题所有的任务都使用的同一个线程池可能会导致长周期的任务运行影响短周期任务运行造成线程池饥饿更加推荐的做法是同种类型的任务使用同一个线程池。 自定义ScheduledThreadPoolExecutor实现调度任务 这也是下面重点讲解的方式通过自定义ScheduledThreadPoolExecutor调度线程池提交调度任务才是最优解。 newScheduledThreadPool 用于创建一个可调度的线程newScheduledThreadPool是Executors类提供的一个静态方法 方法如下 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)newScheduledThreadPool方法返回一个ScheduledExecutorService对象它是ExecutorService接口的子接口提供了任务调度的能力。 可调度的线程池可以用于延时执行任务和周期性执行任务。它可以根据需要自动创建和回收线程并且可以在指定的延时时间后执行任务或在指定的时间间隔内重复执行任务。 使用newScheduledThreadPool创建的可调度线程池有以下特点 corePoolSize参数指定了线程池的核心线程数即线程池中同时执行任务的最大线程数。当任务的延时时间到达时线程池会创建新的线程来执行任务。如果线程池中的线程数量超过核心线程数空闲的线程会在指定的时间内被回收。可以使用schedule方法来延时执行任务也可以使用scheduleAtFixedRate方法或scheduleWithFixedDelay方法来周期性执行任务。 使用示例 ScheduledExecutorService executor Executors.newScheduledThreadPool(3);
executor.schedule(() - {// 延时执行的任务逻辑
}, 5, TimeUnit.SECONDS);executor.scheduleAtFixedRate(() - {// 周期性执行的任务逻辑
}, 1, 3, TimeUnit.SECONDS);executor.scheduleWithFixedDelay(() - {// 周期性执行的任务逻辑
}, 2, 4, TimeUnit.SECONDS);在示例中schedule方法用于延时执行任务它接受一个任务和延时时间表示在指定的延时时间后执行任务。 scheduleAtFixedRate方法用于周期性执行任务它接受一个任务、初始延时时间和周期时间表示在初始延时时间后开始执行任务并以指定的周期时间重复执行任务。 scheduleWithFixedDelay方法也用于周期性执行任务它接受一个任务、初始延时时间和周期时间表示在初始延时时间后开始执行任务并在任务执行完成后等待指定的周期时间然后再执行下一个任务。 总之newScheduledThreadPool方法用于创建一个可调度的线程池可以用于延时执行任务和周期性执行任务。通过合理配置延时时间和周期时间可以满足不同场景下的任务调度需求。 下面我们通过一个案例来了解如何创建延时线程 和 定时线程
Testpublic void test20() {// 使用Executors.newScheduledThreadPool 创建线程池ScheduledThreadPoolExecutor scheduledThreadPool (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(5, new ThreadFactory() {Overridepublic Thread newThread(Runnable r) {// 对线程名称进行自定义return new Thread(r, my-scheduled-job- r.hashCode());}});// 准备工作DelayedThread delayedThread new DelayedThread();LoopThread loopThread new LoopThread();// 执行延时线程 (延时 10s开始执行 )logger.error(延时线程工作准备结束);scheduledThreadPool.schedule(delayedThread, 10, TimeUnit.SECONDS);// 执行循环线程// scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)// command: 执行的任务// initialDelay: 初始延迟的时间// delay: 上次执行结束延迟多久执行// unit单位logger.error(循环线程工作准备结束);scheduledThreadPool.scheduleAtFixedRate(loopThread, 10, 5, TimeUnit.SECONDS);waitPoolExecutedEnd(scheduledThreadPool);scheduledThreadPool.shutdown();}static class DelayedThread implements Runnable {Overridepublic void run() {logger.error(延时线程正在开始执行~~);}}static class LoopThread implements Runnable {Overridepublic void run() {logger.error(循环线程开始执行~~);}}首先通过Executors.newScheduledThreadPool方法创建了一个可调度的线程池ScheduledThreadPoolExecutor并指定了线程池的核心线程数为5。同时通过自定义的ThreadFactory来创建线程并给线程指定了自定义的名称。
接下来定义了两个任务类DelayedThread和LoopThread分别实现Runnable接口。
在测试方法中首先创建了DelayedThread和LoopThread的实例。
然后在测试方法中首先创建了DelayedThread和LoopThread的实例。
然后通过调用scheduledThreadPool.schedule方法将DelayedThread任务提交给线程池并指定延时时间为10秒。这意味着DelayedThread任务将在10秒后执行。
接着通过调用scheduledThreadPool.scheduleAtFixedRate方法将LoopThread任务提交给线程池并指定初始延时时间为10秒、周期时间为5秒。这意味着LoopThread任务将在初始延时时间后开始执行并且每隔5秒重复执行一次。
最后调用waitPoolExecutedEnd方法等待线程池中的任务执行完毕并调用线程池的shutdown方法关闭线程池。
总结起来这段代码演示了使用Executors.newScheduledThreadPool创建可调度的线程池并展示了延时执行和周期性执行的例子。通过合理配置延时时间和周期时间可以实现在指定的时间点或时间间隔内执行任务。
newWorkStealingPool 创建一个可窃取任务的线程池 newWorkStealingPool(int parallelism)方法用于创建一个工作窃取线程池。工作窃取线程池是一种特殊的线程池它根据一定的调度策略执行任务。除了执行任务外工作窃取线程池还可以按照指定的延迟时间或周期性地执行任务。 工作窃取线程池最早由美国计算机科学家 Charles E. Leiserson 和 John C. Bains 发明他们在 1994 年的论文《Scheduling Multithreaded Computations by Work Stealing》中首次提出了这一概念。 工作窃取线程池的设计初衷是为了解决并行计算中独立任务的负载均衡问题。在并行计算中通常存在大量的独立任务需要并行执行而这些独立任务的执行时间往往不一致。如果简单地将任务平均分配给每个线程那些执行时间较短的任务将会导致线程空闲而执行时间较长的任务则可能导致线程被阻塞从而降低整体的执行效率。 为了解决这个问题工作窃取线程池引入了工作窃取算法。该算法允许空闲线程从其他线程的任务队列末尾窃取任务来执行以实现负载均衡。每个线程都维护一个自己的任务队列当线程自己的任务执行完毕后它会尝试从其他线程的任务队列末尾窃取任务执行。这样任务的分配和执行可以更加均衡避免线程之间出现明显的负载不均衡。 我们了解 newWorkStealingPool 先来了解一下Fork/Join
Fork/Join框架是Java提供的一种并行执行任务的框架它基于工作窃取算法实现任务的自动调度和负载均衡。在Fork/Join框架中工作窃取线程池是其中的核心组件。
Fork/Join框架的工作原理如下
每个任务被划分为更小的子任务这个过程通常被称为fork。当一个线程执行fork操作时它会将子任务放入自己的工作队列中。当一个线程完成自己的任务后它会从其他线程的工作队列中steal窃取任务来执行。窃取的任务通常是其他线程工作队列的末尾的任务这样可以减少线程之间的竞争。
工作窃取线程池在Fork/Join框架中的应用主要体现在以下几个方面
任务分割 在Fork/Join框架中任务被递归地分割成更小的子任务直到达到某个终止条件。工作窃取线程池中的线程负责执行这些任务。每个线程都有自己的任务队列当一个线程执行完自己的任务后会从自己的队列中获取新的任务来执行。负载均衡 工作窃取线程池通过工作窃取算法实现负载均衡。当一个线程的任务队列为空时它会从其他线程的任务队列中窃取任务来执行以保持各个线程的工作量相对均衡。这种负载均衡策略可以避免线程之间出现明显的负载不均衡提高整体的执行效率。递归任务执行 Fork/Join框架中的任务通常是递归执行的。当一个任务被分割成多个子任务时每个子任务会被提交到工作窃取线程池中执行。如果子任务还可以进一步分割线程会继续执行这个过程直到任务不能再分割为止。这种递归的任务执行方式能够充分利用线程池中的线程资源提高并行任务的执行效率。Join操作 在Fork/Join框架中一个任务可以等待其子任务执行完成后再继续执行这个操作被称为join。工作窃取线程池在执行任务时会自动进行join操作确保任务的执行顺序满足依赖关系。这样可以避免线程之间的竞争和冲突保证任务的正确性。
总的来说工作窃取线程池在Fork/Join框架中扮演着重要的角色。它通过工作窃取算法和负载均衡策略实现了并行任务的自动调度和执行。通过递归任务执行和join操作工作窃取线程池能够高效地处理大量的并行任务并充分利用系统的并行计算能力。
newWorkStealingPool 是一个创建工作窃取线程池的方法它使用了ForkJoinPool并根据CPU核心数动态调整线程数量。这种线程池适用于CPU密集型的任务。
与其他四种线程池不同newWorkStealingPool 使用了ForkJoinPool。它的优势在于将一个任务拆分成多个小任务并将这些小任务分发给多个线程并行执行。当所有小任务都执行完成后再将它们的结果合并。
相较于之前的线程池newWorkStealingPool 中的每个线程都拥有自己的任务队列而不是多个线程共享一个阻塞队列。
当一个线程发现自己的任务队列为空时它会去其他线程的队列中窃取任务来执行。可以将这个过程简单理解为窃取。为了降低冲突一般情况下自己的本地队列采用后进先出LIFO的顺序而窃取时则采用先进先出FIFO的顺序。由于窃取的动作非常快速这种冲突会大大降低从而提高了性能。这也是一种优化方式。
下面我们通过一个案例 来了解一下 newWorkStealingPool Testpublic void test21() {// 返回可用的计算资源int core Runtime.getRuntime().availableProcessors();logger.error(cpu 可以计算机资源 {}, core);// 无参数的话会根据cpu当前核心数据 动态分配// ExecutorService executorService Executors.newWorkStealingPool();// 当传入参数就可以指定cpu的一个并行数量ForkJoinPool forkJoinPool (ForkJoinPool) Executors.newWorkStealingPool(8);for (int i 1; i core * 2; i) {WorkStealThread workStealThread new WorkStealThread(i, forkJoinPool);forkJoinPool.submit(workStealThread);}// 优雅关闭forkJoinPool.shutdown();// 为了防止 主线程 没有完全执行结束try {Thread.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {throw new RuntimeException(e);}}static class WorkStealThread implements Runnable {private int i;private ForkJoinPool forkJoinPool;WorkStealThread() {}WorkStealThread(int i, ForkJoinPool forkJoinPool) {this.i i;this.forkJoinPool forkJoinPool;}Overridepublic void run() {try {// 随机休眠Thread.sleep(TimeUnit.SECONDS.toMillis(new Random().nextInt(10)));} catch (InterruptedException e) {throw new RuntimeException(e);}logger.error(工作窃取线程-{},线程池的大小[{}],活动线程数[{}],总任务窃取次数[{}],i, forkJoinPool.getPoolSize(), forkJoinPool.getActiveThreadCount(), forkJoinPool.getStealCount());}}
当代码运行时首先通过Runtime.getRuntime().availableProcessors()获取可用的计算资源即CPU核心数并将其记录在日志中。
接下来使用Executors.newWorkStealingPool(8)创建一个工作窃取线程池其中参数8表示线程池的并行度即同时执行的线程数。这个线程池是基于ForkJoinPool实现的具有任务窃取的特性。
然后通过一个循环将一些工作窃取线程提交到线程池中进行并行执行。每个工作窃取线程都有一个编号从1到核心数的两倍。这些线程使用WorkStealThread类实现了Runnable接口。
WorkStealThread类中的run方法定义了线程的执行逻辑。首先线程会随机休眠一段时间模拟执行一些耗时的任务。然后它会输出一些关于线程池状态的信息包括线程池的大小、活动线程数和总任务窃取次数。这些信息会记录在日志中。
最后线程池会被优雅地关闭确保所有任务都能执行完毕。为了防止主线程提前结束使用Thread.sleep(Integer.MAX_VALUE)使主线程休眠直到被中断或抛出异常。
通过以上代码我们可以了解到如何使用newWorkStealingPool方法创建工作窃取线程池并通过工作窃取线程实现并行执行。同时通过获取线程池的状态信息我们可以了解线程池的工作情况包括活动线程数和任务窃取次数。
通过观察结果我们可以发现当前线程执行完毕如果空闲的话会去执行别的线程任务