山东省聊城建设学校网站,河南5G网站基站建设信息,如何计算网站pv,做电商是什么工作一、AbstractExecutorService简介AbstractExecutorService是一个抽象类#xff0c;实现了ExecutorService接口#xff0c;提供了线程池的基本实现。它是Java Executor框架的核心类#xff0c;提供了线程池的基本操作#xff0c;如提交任务、管理线程池、执行任务等。自定义…一、AbstractExecutorService简介 AbstractExecutorService是一个抽象类实现了ExecutorService接口提供了线程池的基本实现。它是Java Executor框架的核心类提供了线程池的基本操作如提交任务、管理线程池、执行任务等。 自定义执行器服务可用扩展AbreactExecutorService 并覆盖其方法以提供自己的实现。这使开发人员可以创建适合其特定需求的执行器服务。二、AbstractExecutorService如何使用 AbstractExecutorService是一个抽象类不能直接实例化需要通过继承它的子类ThreadPoolExecutor或ScheduledThreadPoolExecutor来使用。使用AbstractExecutorService创建线程池的步骤如下2.1 没有返回值// 1、创建ThreadPoolExecutor 创建一个固定大小的线程池核心线程数为1最大线程数为5任务队列大小为10
ExecutorService executorService new ThreadPoolExecutor(1, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueueRunnable(10));
// 2、提交任务
executorService.submit(new Runnable() {Overridepublic void run() {//执行任务}
});
// 3、关闭线程池
executorService.shutdown();2.2 有返回值// 1、创建ThreadPoolExecutor 创建一个固定大小的线程池核心线程数为1最大线程数为5任务队列大小为10
ExecutorService executorService new ThreadPoolExecutor(1, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueueRunnable(10));
// 2、提交一个Callable任务
FutureString future executorService.submit(new CallableString() {Overridepublic String call() throws Exception {//执行任务返回结果return hello ExecutorService;}
});
// 3、获取值
String result future.get();
// 4、关闭线程池
executorService.shutdown(); 总之使用AbstractExecutorService创建线程池非常简单只需要创建线程池对象、提交任务、关闭线程池即可。同时AbstractExecutorService也提供了一些高级功能如异步任务、定时任务等可以根据具体需求选择使用。三、如何自定义AbstractExecutorService要自定义一个延迟AbstractExecutorService以下步骤继承AbstractExecutorService类实现 submit 方法 覆盖 submit 方法将任务添加到队列中其中在任务执行前应用延迟。实现 execute 方法 覆盖 execute 方法调用 submit 方法并返回 Future 对象等待任务完成。实现shutdown 方法 覆盖 shutdown 方法停止所有任务并等待它们完成。实现 DelayedTask 类 创建一个 DelayedTask 类用于封装任务和延迟时间。实现 RunnableTask 和 CallableTask 类 创建 RunnableTask 和 CallableTask 类用于封装任务。public class CustomDelayedExecutorService extends AbstractExecutorService {private final DelayQueueDelayedTask? queue new DelayQueue();Overridepublic T FutureT submit(CallableT task) {return schedule(new CallableTask(task));}Overridepublic Future? submit(Runnable task) {return schedule(new RunnableTask(task));}private T FutureT schedule(DelayedTaskT task) {queue.offer(task);return task;}Overridepublic void execute(Runnable command) {submit(command);}Overridepublic void shutdown() {for (DelayedTask? task : queue) {task.cancel(false);}}Overridepublic ListRunnable shutdownNow() {ListRunnable tasks new ArrayList();for (DelayedTask? task : queue) {if (task.cancel()) {tasks.add(task.getTask());}}return tasks;}Overridepublic boolean isShutdown() {return false;}Overridepublic boolean isTerminated() {return false;}Overridepublic boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {return false;}
}
class DelayedTaskT implements RunnableFutureT, Delayed {private final long delay;private final long expire;private final RunnableFutureT task;public DelayedTask(RunnableFutureT task, long delay) {this.task task;this.delay delay;this.expire System.nanoTime() TimeUnit.NANOSECONDS.convert(delay, TimeUnit.MILLISECONDS);}Overridepublic long getDelay(TimeUnit unit) {return unit.convert(expire - System.nanoTime(), TimeUnit.NANOSECONDS);}Overridepublic int compareTo(Delayed o) {return Long.compare(expire, ((DelayedTask?) o).expire);}Overridepublic void run() {if (!isCancelled()) {task.run();}}Overridepublic boolean cancel(boolean mayInterruptIfRunning) {return task.cancel(mayInterruptIfRunning);}Overridepublic boolean isCancelled() {return task.isCancelled();}Overridepublic boolean isDone() {return task.isDone();}Overridepublic T get() throws InterruptedException, ExecutionException {return task.get();}Overridepublic T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {return task.get(timeout, unit);}public RunnableFutureT getTask() {return task;}
}
class RunnableTask implements RunnableFutureVoid {private final Runnable task;public RunnableTask(Runnable task) {this.task task;}Overridepublic void run() {task.run();}Overridepublic boolean cancel(boolean mayInterruptIfRunning) {return false;}Overridepublic boolean isCancelled() {return false;}Overridepublic boolean isDone() {return true;}Overridepublic Void get() throws InterruptedException, ExecutionException {return null;}Overridepublic Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {return null;}
}
class CallableTaskT extends FutureTaskT {public CallableTask(CallableT callable) {super(callable);}
}四、AbstractExecutorService源码分析4.1 AbstractExecutorService dragrams4.2 方法分析4.2.1 newTaskFor(Runnable runnable, T value) 方法 AbstractExecutorService中的newTaskFor方法用于创建一个RunnableFuture对象其中RunnableFuture是Future和Runnable接口的结合体可以用来表示异步执行的结果。该方法的参数包括一个Runnable对象和一个泛型T的值。通过传入的Runnable对象和T类型的值创建了一个FutureTask对象并将其返回。 FutureTask是RunnableFuture的一个具体实现它表示一个可取消的异步计算可以通过调用get方法获取计算结果。在创建FutureTask对象时需要传入一个Callable对象或Runnable对象用于执行异步计算。而在这里我们传入了一个Runnable对象和一个T类型的值表示该Runnable对象的执行结果为T类型的值。protected T RunnableFutureT newTaskFor(Runnable runnable, T value) {// 将runnable包装成FutureTask类型return new FutureTaskT(runnable, value);}4.2.2 submit(Runnable task)方法 AbstractExecutorService中的submit方法是ExecutorService接口中的一个方法用于提交一个Runnable任务并返回一个表示该任务异步执行结果的Future对象。如果传入的task为null则抛出NullPointerException异常;调用newTaskFor(Runnable runnable, T value)方法创建一个RunnableFuture对象ftask其中RunnableFuture是Future和Runnable接口的结合体用于表示异步执行的结果。在该方法中我们将泛型T的值设置为null表示该Runnable任务没有返回值;通过调用execute(Runnable command)方法提交任务进行执行该方法是一个抽象方法需要由子类实现。在该方法中我们将创建的ftask对象作为参数传入表示需要执行的任务为ftask对象。最后将ftask对象返回作为Future对象用于获取任务执行结果或取消任务。public Future? submit(Runnable task) {// 如果任务为null,则抛出nullPointerExceptionif (task null) throw new NullPointerException();// 将task包装成RunnableFutureRunnableFutureVoid ftask newTaskFor(task, null);// 通过调用execute(Runnable command)方法提交任务进行执行execute(ftask);// 返回RunnableFuturereturn ftask;}4.2.3 doInvokeAny(Collection? extends CallableT tasks,boolean timed, long nanos)方法 AbstractExecutorService中doInvokeAny方法用于执行一个任务集合中的所有任务并返回第一个成功执行的任务的结果如果所有任务都执行失败则抛出ExecutionException异常。其中方法参数tasks为任务集合timed和nanos用于指定超时时间如果超时则抛出TimeoutException异常。如果任务集合是否为空则抛出NullPointerException异常如果任务集合的大小为0则抛出IllegalArgumentException异常futures用于存储所有任务的Future对象, ecs对象用于异步执行任务集合中的任务。从迭代器中获取一个任务xecutorCompletionService并将该任务的执行结果的Future添加到futures列表中死循环不断从ExecutorCompletionService中获取已完成的任务的结果Future直到有一个任务完成为止具体细流程如下调用ExecutorCompletionService的poll()方法获取一个已完成的任务的结果Future如果poll()返回null表示当前没有已完成的任务。当前没有已完成的任务还有未执行的任务ntasks 0则从任务集合中获取下一个任务并将其提交给ExecutorCompletionService执行。提交成功后活跃线程数active加一没有执行的任务并且活跃线程数为0即所有任务都已执行完毕则退出循环指定超时时间timed为true则调用poll(nanos, TimeUnit.NANOSECONDS)方法获取已完成的任务的结果Future并等待最多nanos纳秒的时间。如果超时则抛出TimeoutException没有指定超时时间则调用take()方法阻塞等待已完成的任务的结果Future。如果获取到了已完成的任务的结果Future则将活跃线程数active减一并尝试获取该任务的执行结果。如果获取结果时出现异常则将异常保存在ExecutionException中继续下一轮循环如果成功获取到了一个任务的执行结果则直接返回该结果。如果所有任务都已经完成但是没有找到任何一个成功的任务则抛出ExecutionException异常。最后取消所有尚未完成的任务以便节省资源并提高效率privateT T doInvokeAny(Collection? extends CallableT tasks,boolean timed, long nanos)throws InterruptedException, ExecutionException, TimeoutException {// 如果任务集合是否为空则抛出NullPointerException异常if (tasks null)throw new NullPointerException();int ntasks tasks.size();// 如果任务集合的大小为0则抛出IllegalArgumentException异常if (ntasks 0)throw new IllegalArgumentException();// futures用于存储所有任务的Future对象 ArrayListFutureT futures new ArrayListFutureT(ntasks);ExecutorCompletionServiceT ecs new ExecutorCompletionServiceT(this);try {ExecutionException ee null;// 计算deadline时间final long deadline timed ? System.nanoTime() nanos : 0L;Iterator? extends CallableT it tasks.iterator();// 从迭代器中获取一个任务xecutorCompletionService并将该任务的执行结果的Future添加到futures列表中。futures.add(ecs.submit(it.next()));// 任务数减1--ntasks;// 退出循环标识也就是调用ecs.sumbit的任务数int active 1;// 自旋、死循环for (;;) {// 获取Future对象FutureT f ecs.poll();// 判断future 任务是否已完成if (f null) {// 任务没有完成继续判断是否还有未执行的任务if (ntasks 0) {// 任务s数减1--ntasks;// 再从tasks中获取一个任务提交到ExecutorCompletionService中进行执行。futures.add(ecs.submit(it.next()));// 提交ecs中加1active;}else if (active 0)break;// 有超时时间限制 else if (timed) {//调用ExecutorCompletionService的poll(long timeout, TimeUnit unit)方法等待指定的超时时间。f ecs.poll(nanos, TimeUnit.NANOSECONDS);// 在等待过程中已经超过了指定的超时时间因此会抛出 TimeoutException 异常if (f null)throw new TimeoutException();// 将剩余的超时时间重新计算并继续执行后续的代码。nanos deadline - System.nanoTime();}// 如果没有超时时间限制则调用ExecutorCompletionService的take()方法一直等待直到有任务执行完成。elsef ecs.take(); }// 任务已完成获取到Future对象if (f ! null) {--active;try {// 尝试获取Future对象的执行结果return f.get();} catch (ExecutionException eex) {// 如果获取失败则将异常保存在ee变量中继续等待下一个任务的执行结果ee eex;} catch (RuntimeException rex) {ee new ExecutionException(rex);}}}// 如果所有任务都已经完成但是没有找到任何一个成功的任务则抛出ExecutionException异常。if (ee null)ee new ExecutionException();throw ee;} finally {// 取消所有尚未完成的任务以便节省资源并提高效率for (int i 0, size futures.size(); i size; i)futures.get(i).cancel(true);}}4.2.4 invokeAll(Collection? extends CallableT tasks)方法 AbstractExecutorService中的invokeAll方法它的作用是在执行给定的任务集合tasks中的所有任务并等待所有任务完成后返回一个包含Future对象的列表Future对象可以用来获取每个任务的执行结果。如果其中某个任务抛出异常则该异常将传播到调用者。判断任务是否为空如果空则抛出NullPointerException异常遍历参数tasks中的每个Callable任务将每个任务转换成一个RunnableFuture对象并把任务添加到futures中然后调用execute方法执行任务。循环遍历futures中的每个Future对象如果某个Future对象的任务还没有完成则调用get()方法等待任务完成如果任务抛出异常则忽略异常。如果所有的任务都成功完成则将done标记为true并返回futures否则取消所有未完成的任务并抛出InterruptedException异常。public T ListFutureT invokeAll(Collection? extends CallableT tasks)throws InterruptedException {// 判断任务是否为空如果空则抛出NullPointerException异常if (tasks null)throw new NullPointerException();ArrayListFutureT futures new ArrayListFutureT(tasks.size());boolean done false;try {// 遍历参数tasks中的每个Callable任务for (CallableT t : tasks) {// 将任务转换成一个RunnableFuture对象RunnableFutureT f newTaskFor(t);futures.add(f);// 调用execute方法执行任务execute(f);}// 循环遍历futures中的每个Future对象for (int i 0, size futures.size(); i size; i) {FutureT f futures.get(i);// 如果某个Future对象的任务还没有完成if (!f.isDone()) {try {// 调用get()方法等待任务完成,如果任务抛出异常则忽略异常。f.get();} catch (CancellationException ignore) {} catch (ExecutionException ignore) {}}}// 如果所有的任务都成功完成则将done标记为true并返回futuresdone true;return futures;} finally {if (!done)// 如果所有的任务没有全部成功完成,取消所有未完成的任务for (int i 0, size futures.size(); i size; i)futures.get(i).cancel(true);}
}