juc 中的异步、并发接口和类

2018-05-30

概述

并行与并发,同步与异步

并行(parallelism)与并发(concurrency)是不同的概念,可以只存在并行而没有并发(位并行),也可以只存在并发而没有并行(单 CPU 通过时分完成多任务)。个人认为理解并行与并发最好的一句话是 “Concurrency is about dealing with lots of things once, and Parallelism is about doing lots of things at once

同步(Synchronous)与异步(Asynchronous)是另一组不同的概念。在同步模型中,程序必须执行完当前任务才能继续执行下一个任务;而在异步模型中,程序可以同时启动多个任务,然后等待这些任务的执行完成。异步与并发没有直接联系,可以在单线程中实现异步。

java.util.concurrent

java.util.concurrent 中有许多与异步、并发执行相关的接口,在我看来可以分为三类:

  1. Runnable 和 Callable
  2. Future 相关
  3. Fork/Join 框架

这些接口和类的关系如下: concurrency-and-asynchronous-interfaces-in-java

Runnable 和 Callable 接口

Runnable 和 Callable 接口是 Java 并发编程的最基本接口。Runnable 可以作为参数创建新的线程,只包含一个方法 $\mathtt {run()}$,该方法定义了线程的执行逻辑。$\mathtt {run()}$ 不接受参数,返回结果为 void,也不允许抛出受检异常(checked exceptions),于是有了 Callable 接口。Callable 接口也只包含了一个方法 $\mathtt {call()}$,定义了线程的执行逻辑。$\mathtt {call()}$ 返回值是线程计算结果,如果无法完成计算还有可能抛出异常。

Future

一个 Future 代表了一次异步计算的结果(异步计算不一定需要并发),这样的结果可以使用 $\mathtt {get()}$ 方法得到,$\mathtt {get()}$ 会一直阻塞直到计算完成。除此之外,Future 还提供了 $\mathtt {cancel()}$ 方法可以取消本次计算。如果只想利用 Future 的可取消能力,那么可以声明类型为 Future<?>。

由于异步计算有许多种形式,包括使用多线程执行,定期执行等,于是 Future 接口又派生出其他子接口。

RunnableFuture 接口

RunnableFuture 接口继承了 Runnable 和 Future,表示可以在其他线程单独执行的 Future。当 $\mathtt {run()}$ 方法执行完成,那么 Future 就完成了。

FutureTask 是 RunnableFuture 的基本实现,可以用来包裹(wrap)Callable 或者 Runnable 对象,AbstractExecutorService 中提供了如下方法为 Callable 或者 Runnable 创建 FutureTask:

public abstract class AbstractExecutorService implements ExecutorService {
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable,
                                               T value);

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable);
}

而且 FutureTask 实现了 Runnable 接口,可以提交到 Executor 中执行。

ScheduledFuture

ScheduledFuture 表示一个延迟执行或者定时重复执行的 Future,ScheduledExecutorService 提供了为 Runnable 或者 Callable 创建 ScheduledTask 的接口:

public interface ScheduledExecutorService extends ExecutorService {
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
}

RunnableScheduledFuture

类似于 RunnableFuture,RunnableScheduledFuture 表示可以在其他线程执行的 ScheduledFuture。

CompletionStage 和 CompletableFuture

没有 CompletionStage 和 CompletableFuture 之前,使用 Future 的做法类似如下:

ExecutorService pool = ... // 创建线程池
Future future = pool.submit(...)  // 提交 Runnable 或者 Callable

try{
    ...  // 等待 Future 完成
}catch(InterruptedException e){
    ...
}

创建完成 Future 之后,通常来说我们会选择同步阻塞等待 Future 执行完成。但是一种更好的做法是为 Future 定义 callback,异步处理运行结果。JDK 8 之后加入 CompletionStage 和 CompletableFuture,允许我们使用 reactive 的方式异步编程。

CompletionStage 定义了异步执行的接口,CompletableFuture 则实现了 Future 和 CompletionStage 接口,可以作为普通的 Future 使用,还可以定义异步调用。一个例子如下:

CompletableFuture<String> future = ...  
future.thenAccept(info -> System.out.println(info))

当 future 计算完成后,就会将计算结果作为参数调用 thenAccept 打印出来,整个过程都是异步执行的。

CompletableFuture 提供了创建实例的工厂方法:

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor);

    public static CompletableFuture<Void> runAsync(Runnable runnable);
    public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor);
}

这些方法会创建 CompletableFuture 实例,然后在线程池中异步执行 Runnable 或者 Supplier,默认使用 $\mathtt {ForkJoinPool.commonPol()}$,也可以提供自定义线程池。Supplier 也是 Java 8 新加入的函数接口,类似于 Callable,但是不能抛出受检(checked)异常。

CompletableFuture 包含了两类 API,第一类包含了异步执行结果的使用者的 API,包括了:

  1. 继承 Future 的接口
  2. 继承 Future 扩展的接口
  3. 创建 CompletableFuture 的工厂方法
  4. 继承 CompletionStage 的响应式 API
  5. 静态批量操作($\mathtt {allOf}$ 和 $\mathtt {anyOf}$等)

第二类包含了异步执行结果的提供者的 API,包括了:

  1. 设置/重设 result:$\mathtt {complete()}$,$\mathtt {completeExceptionally()}$ 等
  2. 监控 API:$\mathtt {isDone()}$,$\mathtt {isCompletedExceptionally()}$ 等

有三种方式向 CompletableFuture 添加一个新的动作,分别是:

<U> CompletableFuture<U>    thenApply  (Function<T, U> action)
    CompletableFuture<Void> thenAccept (Consumer<T> action)
    CompletableFuture<Void> thenRun    (Runnable action)

区别在于:

  • thenApply 的 Function<T, U> 接受异步执行结果,然后返回一个新的结果
  • thenAccept 的 Consumer 虽然接受异步执行结果,但是返回 Void
  • thenRun 的 Runnable 既不需要接受参数同时也返回 Void。

thenRun 有三种执行策略,分别定义如下:

CompletableFuture<Void> thenRun      (Runnable action)
CompletableFuture<Void> thenRunAsync (Runnable action)
CompletableFuture<Void> thenRunAsync (Runnable action,
                                      Executor executor)

区别在于当 CompletableFuture 执行完成后

  • thenRun 使用同一条线程执行 Runnable
  • theRunAsync 使用新的线程执行 Runnable,如果提供了线程池,那么使用提供线程池的线程执行

类似地,thenApply 和 thenAccept 等函数也有相应的 async 实现,含义相同。除此之外,我们还可以对不同的 CompletableFuture 进行组合,比如可以组合两个 CompletableFuture,提供一个回调函数,只有当两者都执行完成后才执行:

<U,V> CompletionStage<V> thenCombine (CompletionStage<? extends U> other,
                                      BiFunction<? super T,? super U,? extends V> fn)

另一个可以组合 CompletableFuture 的方法是 $\mathtt {thenComponse}$:

<U> CompletionStage<U> thenCompose (Function<? super T,? extends CompletionStage<U>> fn)

除此之外,还有 $\mathtt {allOf}$ 和 $\mathtt {anyOf}$:

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Void> anyOf(CompletableFuture<?>... cfs)

两者都返回一个 CompletableFuture,但是执行完成条件不同。

如果从函数式编程的角度来理解,CompletableFuture 是一个 Monad,$\mathtt {thenApply}$ 等价于 $\mathtt {map}$ 或者 $\mathtt {fmap}$;$\mathtt {thenCompose}$ 等价于 $\mathtt {flatMap}$ 或者 $\mathtt {bind}$;$\mathtt {theCombine}$ 等价于 $\mathtt {liftM2}$。

CompletableFuture 提供了多种方式获取最终执行结果,最简单的情况是阻塞直到计算完成,比如说使用方法 $\mathtt {get()}$ 和 $\mathtt {join()}$。除此之外,还可以

FutureTask 与 CompletableFuture 的区别

FutureTask 紧紧的耦合了异步计算结果获取和任务控制,提供了:

  • 任务结果查询
  • 任务控制
  • 任务取消
  • 任务完成(即,设置/重新设置结果)

CompletableFuture 则解耦了结果获取和任务控制,还提供了响应式编程。

Fork/Join 框架

Fork/Join 框架基于 “divide-and-conquer”,将耗时较长的任务分成数个较小的任务,并行执行这些任务,然后汇总最终结果。

Fork/Join 框架中最基础的接口是 ForkJoinTask ,表示一个轻量级的 Future,有两个重要的方法: $\mathtt {fork()}$ 和 $\mathtt {join()}$。$\mathtt {fork()}$ 用于异步执行子任务;$\mathtt {join()}$ 可以阻塞当前线程直到任务执行完成。ForkJoinTask 通常要求执行的任务不会阻塞,并且尽可能减少同步。ForkJoinTask 提供了 $\mathtt {adapt()}$ 方法,用于将 Runnable 或者 Callable 转变成 ForkJoinTask。

人们不需要直接继承 ForkJoinTask,而是继承 RecursiveAction、RecursiveTask 或者 CountedCompleter。RecursiveAction 表示一个递归执行,返回值是 Void 的 For/Join 任务;RecursiveTask 则适用于那些有返回值的任务;CountedCompleter 适用于事件驱动的异步任务,一项任务的完成可以触发其他任务。

参考

  1. Parallel computing
  2. java.util.concurrent docs
  3. Concurrency vs Multi-threading vs Asynchronous Programming : Explained
  4. Future Diagram
  5. ForkJoinTask Diagram
  6. Introduction to CompletionStage and CompletableFuture
  7. New Concurrency Utilities in Java 8