Future简介

如果一个任务需要返回执行结果,一般我们会实现一个Callable任务,并创建一个线程来执行任务。对于执行时间比较长的任务,显然我们同步的等待结果再去执行后续的业务是不现实的,那么,Future模式是怎样解决这个问题的呢?

Future模式,可以让调用方立即返回,然后它自己会在后面慢慢处理,此时调用者拿到的仅仅是一个凭证,调用者可以先去处理其它任务,在真正需要用到调用结果的场合,再使用凭证去获取调用结果。这个凭证就是这里的Future。

Future接口的定义:

public interface Future<V> { // 取消任务 boolean cancel(boolean mayInterruptIfRunning); // 任务是否取消 boolean isCancelled(); // 标记任务是否执行完成 boolean isDone(); // 阻塞获取任务结果 V get() throws InterruptedException, ExecutionException; // 超时获取任务结果 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}FutureTaskFuture模式中,最重要的就是FutureTask类

FutureTask一共给任务定义了7种状态

1、NEW:表示任务的初始化状态;
2、COMPLETING:表示任务已执行完成(正常完成或异常完成),但任务结果或异常原因还未设置完成,属于中间状态;
3、NORMAL:表示任务已经执行完成(正常完成),且任务结果已设置完成,属于最终状态;
4、EXCEPTIONAL:表示任务已经执行完成(异常完成),且任务异常已设置完成,属于最终状态;
5、CANCELLED:表示任务还没开始执行就被取消(非中断方式),属于最终状态;
6、INTERRUPTING:表示任务还没开始执行就被取消(中断方式),正式被中断前的过渡状态,属于中间状态;
7、INTERRUPTED:表示任务还没开始执行就被取消(中断方式),且已被中断,属于最终状态。

各个状态之间的流转:

FutureTask构造

FutureTask在构造时可以接受Runnable或Callable任务,如果是Runnable,则最终包装成Callable:

public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { // 包装Runnable成为Callable this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }FutureTask成员

private volatile int state;//任务状态private static final int NEW = 0;private static final int COMPLETING = 1;private static final int NORMAL = 2;private static final int EXCEPTIONAL = 3;private static final int CANCELLED = 4;private static final int INTERRUPTING = 5;private static final int INTERRUPTED = 6;private Callable<V> callable; // 真正的任务private volatile Thread runner; // 保存正在执行任务的线程/** * 记录结果或异常 */private Object outcome;/** * 无锁栈(Treiber stack) * 保存等待线程 */private volatile WaitNode waiters;

当调用FutureTask的get方法时,如果任务没有完成,则调用线程会被阻塞,其实就是将线程包装成WaitNode结点保存到waiters指向的栈中。

static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }任务执行run

public void run() { // 仅当任务为NEW状态时, 才能执行任务 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { //执行任务 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; //设置异常 setException(ex); } if (ran) //设置任务执行结果outcome set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }}

set方法:

protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v;//存储结果值 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }任务取消

public boolean cancel(boolean mayInterruptIfRunning) { // 仅NEW状态下可以取消任务 if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { if (mayInterruptIfRunning) { // 中断任务 try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { //释放所有在栈上等待的线程 finishCompletion(); } return true;}

任务取消后,最终调用finishCompletion方法,释放所有在栈上等待的线程

private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { //自旋释放所有等待线程 Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t);//唤醒线程 } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }获取结果

FutureTask可以通过get方法获取任务结果,如果需要限时等待,可以调用get(long timeout, TimeUnit unit)

public V get() throws InterruptedException, ExecutionException { int s = state; //当前任务的状态是NEW或COMPLETING,会调用awaitDone阻塞线程 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); // 任务执行结果}/** * 返回执行结果. */private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V) x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable) x);}ScheduledFutureTask

1、ScheduledFutureTask在普通FutureTask的基础上增加了周期执行/延迟执行的功能
2、ScheduledFutureTask是ScheduledThreadPoolExecutor这个线程池的默认调度任务类,通过继承FutureTask和Delayed接口来实现周期/延迟功能的。

ScheduledFutureTask的源码非常简单,基本都是委托FutureTask来实现的任务运行

public void run() { // 是否是周期任务 boolean periodic = isPeriodic(); //// 能否运行任务 if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) // 非周期任务:调用FutureTask的run方法运行 ScheduledFutureTask.super.run(); // 周期任务:调用FutureTask的runAndReset方法运行 else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); }}

FutureTask的runAndReset方法与run方法的区别就是当任务正常执行完成后,不会设置任务的最终状态(即保持NEW状态),以便任务重复执行:

protected boolean runAndReset() { // 仅NEW状态的任务可以执行 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); //不设置执行结果 ran = true; } catch (Throwable ex) { setException(ex); } } } finally { runner = null; s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW;//重新设置任务状态为NEW,继续重复执行}