[TOC]

Java 并发工具CountDownLatch和CyclicBarrier 原理解析一,简介

CountDownLatch 允许一个或者多个线程等待其他线程完成操作。

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程达到一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

二,代码演示CountDownLatchDemo

public class CountDownLatchDemo { public static final CountDownLatch count = new CountDownLatch(10); private static int j = 0; public static void main(String[] args) throws Exception { for (int i = 0; i < 10; i++) { new Thread( ()-> { System.out.println("我是"+(++j)); count.countDown(); } ).start(); } count.await(); System.out.println("我是总数"+j+"!!!"); }}运行结果:我是1我是2我是3我是4我是5我是6我是7我是8我是9我是10我是总数10!!!CyclicBarrierDemo

public class CyclicBarrierDemo { private static final CyclicBarrier c = new CyclicBarrier(6,new Thread(() -> System.out.println("我是最后一个") )); private static AtomicInteger index = new AtomicInteger(1); public static void main(String[] args) throws Exception, BrokenBarrierException { for (int i = 1; i <= 6; i ++) { new Thread(() -> { try { c.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println("我是:"+(index.getAndIncrement())); }) .start(); } }}运行结果:我是最后一个我是:1我是:2我是:3我是:4我是:5我是:6三,源码解析CountDownLatch 源码

原理:

CountDownLatch 又叫做闭锁,CountDownLatch 的构造函数接受一个int类型的参数作为计数器,如果你想等待n个节点完成,那就传入N;当我们调用CountDownLatch 的countDown方法时,N就会减1,CountDownLatch的await会阻塞当前方法,直到N变成0;由于countDown方法可以用在任何地方,这里说的N个点,可以是N个线程,也可以是1一个线程里面的N个步骤。

源码:

// 构造方法public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }// 内部类 Sync 继承AQSprivate static final class Sync extends AbstractQueuedSynchronizer {}countDown 方法

public void countDown() { // 调用了AQS的releaseShared方法 sync.releaseShared(1); } // 这是Sync的tryReleaseShared // AQS的releaseShared会调用子类的tryReleaseShared 用来控制count // tryReleaseShared 共享式的释放状态 具体参考AQS protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { // 获取的其实就是我们构造函数的count int c = getState(); // count == 0 证明整个记录流程已经完毕了 if (c == 0) return false; // 减1 int nextc = c-1; if (compareAndSetState(c, nextc)) cas 更新 return nextc == 0; // 等于0,返回ture 证明计数结束了,可以去唤醒同步队列的线程了 // 唤醒是AQS的releaseShared方法 // 结合CountDownLatch的await方法理解整这里 } }await 方法

public void await() throws InterruptedException { // 共享式获取同步转态 sync.acquireSharedInterruptibly(1); } // 这是Sync的方法 // await 其实是调用的AQS的acquireSharedInterruptibly 但是aqs会调用子类tryAcquireShared // 我们看到值有state等于0 才会返回true 成功 -1 表示失败 失败就要加入同步队列 // 所以在countDown方法里面等于0 为什么要去唤醒 ,应为这里会进入同步队列 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }

通过源码我们可以发现只有当countDown 这个方法计数递减完毕,别的线程才能执行,因为调用await的线程会进入AQS的同步队列,然后阻塞。

CyclicBarrier 源码

原理:

CyclicBarrier 默认构造方法是CyclicBarrier (int parties),器参数表示屏障拦截的线程数量,每个线程调用await告诉CyclicBarrier 我已经到达屏障了,然后当前线程被阻塞;CyclicBarrier 海提供了一个高级的构造函数,CyclicBarrier (int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction线程,方便处理更复杂的业务逻辑。

源码:

public CyclicBarrier(int parties) { this(parties, null); }public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** The number of parties */ private final int parties; /* The command to run when tripped */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation();await 方法

public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 获取锁 final ReentrantLock lock = this.lock; lock.lock(); try { // 这一代的状态 final Generation g = generation; // 默认为false Barrier被Broken 就会为true if (g.broken) throw new BrokenBarrierException(); // 线程被中断了,标记为breakBarrier,唤醒所有线程 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // 计数器减减 int index = --count; // 到达 trip if (index == 0) { // tripped boolean ranAction = false; try { // 执行构造函数里面的线程 final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; // 唤醒所有等待线程 然后重置 nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out // 一直自旋直到发生:tripped, broken, interrupted, timed out for (;;) { try { // 带时间 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); // 自旋过程中发生中断 } catch (InterruptedException ie) { // 等于说明当前被重点的这个线程没有被broken // 抛异常 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // 不等于说明后来的线程已经broken了 // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. // 中断线程 breakBarrier已经没有意义了 Thread.currentThread().interrupt(); } } if (g.broken) // 屏蔽Broken throw new BrokenBarrierException(); // 别的线程更新了generation 不属于当前代 if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }

我们发现CyclicBarrier是所有线程一起阻塞,直到到达屏障点,然后全部唤醒一起执行。

四,总结

CountDownLatch和CyclicBarrier都可以实现一个线程等待一个或者多个线程到达一个点之后才执行,但是这一个或者多个线程的状态却是不一样的,CountDownLatch是来一个执行一个不会阻塞,直到大家执行完了,在执行调用await方法的线程,CyclicBarrier是来一个阻塞一个,直到大家都阻塞完毕,然后在优先执行构造函数里面的线程,在唤醒所有阻塞的线程;CountDownLatch的计数器只能执行一次,CyclicBarrier可以执行多次,所以CyclicBarrier可以执行复杂的业务场景。

参考 《Java 并发编程的艺术》