理解 Java Executor 框架

概述

Executor 整体框架如下: Executor-Arch

Executor

Executor 将任务的提交与执行分离,Executor 使用者只需要调用 execute 提交任务,而不用关心任务是如何被执行。execute 是 Executor 接口的唯一方法,定义如下:

void execute(Runnable command)

execute 将执行提交的 Runnable,具体执行细节由接口实现者提供。Executor 并不严格要求异步执行任务。在最简单的情况下,一个 Executor 可以立即在调用者线程执行 Runnable 任务:

class DirectExecutor implements Executor {
    public void execute(Runnable r) {
      r.run();
    }
}

ExecutorService

ExecutorService 接口继承于 Executor,相比于 Executor,ExecutorService 通常会异步执行任务,除了 execute 还提供 submit,invokeAll 和 invokeAny 方法提交任务。

submit 方法接受 Runnable 或者 Callable 任务:

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

两者区别在于:

  • Runnable 不可以有返回值,也不可以抛出 checked 异常
  • Callable 可以有返回值,也可以抛出 checked 异常

submit 将返回一个 Future 对象。一个 Future 对象表示一次异步计算,子类有 FutureTask。可以调用 Future.get 获得异步计算结果,如果计算还未完成,get 将阻塞当前线程。此外,还可以调用 Future.cancel 取消任务的执行。

invokeAll 接受一个任务的集合,并返回一个 Future 列表:

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

注意,只有当所有的任务完成了 invokeAll 才返回,并且一个任务完成可能因为是正常执行,也有可能抛出了异常。

此外,invokeAll 也可以指定超时时间。当发生超时时,会取消还未执行完的任务并返回:

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

invokeAny 类似于 invokeAll,但是如果集合中任意一个任务完成,那么 invokeAny 就会取消剩余未完成的任务并返回:

<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

类似地,invokeAny 也可以指定超时时间。

除此之外,ExecutorService 还提供方法管理和监控状态,一个 ExecutorService 有三个状态:运行、关闭和终止。处于关闭状态的 ExecutorService 将不再接受新任务;处于关闭状态的 ExecutorService 执行完所有任务将进入终止状态。

可以调用 shutdown 关闭 ExecutorService ,方法定义如下:

void shutdown();

shutdown 不影响原先提交的任务,但是会关闭 ExecutorService 使其不再接受新任务。如果向关闭后的 ExecutorService 提交任务(通过 submit,invokeAll 或者 invokeAny),将抛出 RejectedExecutionException 异常。

在 ExecutorService 被关闭后,可以调用 awaitTermination 指定等待任务完成时间:

boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

awaitTermination 会阻塞当前线程(即调用 awaitTermination 方法的线程)直到:

  • 所有任务都完成,返回 true
  • 出现超时,返回 false
  • 当前线程被中断,抛出 InterruptedException 异常 通常,awaitTermination 会与 shutdown 一起配合使用。

shutdownNow 不仅会使得 ExecutorService 不再接受新任务,还会尝试停止当前正在执行的任务,并返回正在等待执行的任务:

void shutdownNow();

shutdownNow 不能保证一定能终止当前所有任务的执行。因为 shutdownNow 通过中断任务执行线程来尝试终止任务,如果任务忽略了 InterruptedException 异常,那么这时就无法终止该任务。如果所有的任务都忽略了 InterruptedException 异常,那么这时 shutdownNow 效果等价于 shutdown。

可以调用 isShutdown 判断 ExecutorService 是否被关闭,如果是,那么方法返回 true:

boolean isShutdown();

在 ExecutorService 被关闭后,如果任务都完成了,那么 isTerminated 将返回 true:

boolean isTerminated();

AbstractExecutorService

AbstractExecutorService 为 ExecutorService 接口提供了默认实现,实现了 submit,invokeAny 和 invokeAll 方法,声明如下:

public abstract class AbstractExecutorService implements ExecutorService

AbstractExecutorService 定义了 newTaskFor 方法,该方法返回一个 FutureTask。FutureTask 是 RunnableFuture 接口的实现,表示一次异步计算。newTaskFor 可以接受 Runnable 和 Callable 任务:

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

submit 将 Runnable 和 Callable 任务封装成 FutureTask,交给 execute 方法执行。execute 由子类实现,可以控制任务执行细节。

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

invokAll 会对集合中的每一个任务创建 FutureTask,并调用 execute 方法执行。直到所有任务都完成才返回:

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            execute(f);
        }
        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                try {
                    f.get();
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}

如果运行过程中抛出 InterruptedException,且任务还没有完成,那么在 finally 语句中取消所有任务。

参考

  1. Executor
  2. ExecutorService
  3. AbstractExecutorService
  4. AbstractExecutorService源码理解

Previous post: 理解 Java Future

Next post: Java 泛型、协变、逆变