[TOC]

SynchronousQueue 1.8 源码解析一,简介

SynchronousQueue 是一个很奇怪的队列,感觉都不能叫队列,因为内部没有数据的存储空间,队列不能peek,因为不存在元素,任何入队的线程都会阻塞,直到有线程来出队,也就是这个队列是一组操作,入队和出队要一起离开,出队也是一样,必须等入队,必须结伴而行;队列支持公平和非公平的模式(指的是队列匹配线程的顺序),公平模式的数据结构是队列(FIFO),非公平模式使用的是栈(LIFO)。

二,UML 图

三,基本成员

abstract static class Transferer<E> { // 出队入队都是这一个方法 abstract E transfer(E e, boolean timed, long nanos);} // npu数 static final int NCPUS = Runtime.getRuntime().availableProcessors(); // 带超时时间的自旋次数 static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32; // 没有超时的自旋次数 static final int maxUntimedSpins = maxTimedSpins * 16;TransferStack 非公平的实现,主要成员

TransferStack 继承 Transferer

注意:这几个状态很重要,因为继承了Transferer,所以出队和入队都是使用的transfer方法,状态是用来区分的,后面方法部分会详解

/** 0表示消费者 */ static final int REQUEST = 0; /** 1表示数据的生产者 */ static final int DATA = 1; /** 2 表示数据正在匹配 */ static final int FULFILLING = 2; static final class SNode { volatile SNode next; // 下一个节点 volatile SNode match; // 匹配的节点 volatile Thread waiter; // 等待的线程 Object item; // 数据 int mode; // 模式 0 , 1 , 2 } /** 头结点 */ volatile SNode head;TransferQueue 公平实现,主要成员

TransferQueue 继承 Transferer

static final class QNode { volatile QNode next; // next 节点 volatile Object item; // 数据项 volatile Thread waiter; // 等待线程 final boolean isData; // 区分生产和消费 } /** 头结点 */ transient volatile QNode head; /** 尾节点 */ transient volatile QNode tail;四,常用方法构造方法

public SynchronousQueue() { this(false); } // 构造方法,fair表示公平或者非公平 public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }TransferStack 非公平常用方法offer 方法

public boolean offer(E e) { // e 不能为null if (e == null) throw new NullPointerException(); return transferer.transfer(e, true, 0) != null; } public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { // e 不能为null if (e == null) throw new NullPointerException(); if (transferer.transfer(e, true, unit.toNanos(timeout)) != null) return true; if (!Thread.interrupted()) return false; throw new InterruptedException(); }put 方法

public void put(E e) throws InterruptedException { // e 不能为null if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) { // 支持中断 Thread.interrupted(); throw new InterruptedException(); }poll 方法

public E poll() { return transferer.transfer(null, true, 0); }take 方法

public E take() throws InterruptedException { E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }transfer 方法

E transfer(E e, boolean timed, long nanos) { SNode s = null; // constructed/reused as needed // 根据所传元素判断为生产or消费 int mode = (e == null) ? REQUEST : DATA; for (;;) { // 无限循环 SNode h = head; // 获取头结点 if (h == null || h.mode == mode) { // 头结点为空或者当前节点状态(0,1,2)和头结点相同 if (timed && nanos <= 0) { // can't wait 设置有时间 // 节点不为null并且为取消状态 if (h != null && h.isCancelled()) // 弹出取消的节点 casHead(h, h.next); // pop cancelled node else // 超时直接返回null return null; // 没有设置超时 } else if (casHead(h, s = snode(s, e, h, mode))) { // 将h设为自己的next节点 // 空旋或者阻塞直到s结点被FulFill操作所匹配 SNode m = awaitFulfill(s, timed, nanos); if (m == s) { // wait was cancelled 节点被取消了 clean(s); return null; } // 找到匹配的线程了 // h == head 可能已经已经被匹配 // h.next 等于s 不同类型 if ((h = head) != null && h.next == s) // 弹出h 和 s casHead(h, s.next); // help s's fulfiller return (E) ((mode == REQUEST) ? m.item : s.item); } // 未匹配 } else if (!isFulfilling(h.mode)) { // try to fulfill // 尝试匹配节点 if (h.isCancelled()) // already cancelled // 节点被取消 casHead(h, h.next); // pop and retry // 修改头结点 else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // loop until matched or waiters disappear SNode m = s.next; // m is s's match // 没有下一个节点了,结束这次循环,走最外层循环重新开始 if (m == null) { // all waiters are gone // m等于null casHead(s, null); // pop fulfill node // cas 设置head s = null; // use new node next time break; // restart main loop // 结束循环 } SNode mn = m.next; if (m.tryMatch(s)) { // 尝试匹配,成功 casHead(s, mn); // pop both s and m return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match // 失败,说明m背的线程匹配了,或者取消了 s.casNext(m, mn); // help unlink // 修改next节点 } } } else { // help a fulfiller 正在匹配 SNode m = h.next; // m is h's match if (m == null) // waiter is gone 匹配完成了 casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } } }awaitFulfill 方法

SNode awaitFulfill(SNode s, boolean timed, long nanos) { // 计算时间 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 获取当前线程 Thread w = Thread.currentThread(); // shouldSpin控制自旋 // shouldSpin 用于检测当前节点是否需要自旋 // 如果栈为空、该节点是首节点或者该节点是匹配节点,则先采用自旋,否则阻塞 int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // 死循环 if (w.isInterrupted()) // 线程被中断 s.tryCancel(); SNode m = s.match; if (m != null) // 存在匹配节点 ,返回 return m; if (timed) { // 存在超时设置 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(); continue; } } // 自旋;每次自旋的时候都需要检查自身是否满足自旋条件,满足就 - 1,否则为0 if (spins > 0) spins = shouldSpin(s) ? (spins-1) : 0; // 设置node的线程 else if (s.waiter == null) s.waiter = w; // establish waiter so can park next iter // 如果不是超时,就阻塞 else if (!timed) LockSupport.park(this); // 设置超时阻塞 else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }clean 方法

void clean(SNode s) { s.item = null; // forget item s.waiter = null; // forget thread // next节点 SNode past = s.next; // next节点也被中断了,直接删除 if (past != null && past.isCancelled()) past = past.next; // Absorb cancelled nodes at head // 从栈顶开始找,清除取消的节点 SNode p; while ((p = head) != null && p != past && p.isCancelled()) casHead(p, p.next); // Unsplice embedded nodes // 从有效的头节点开始p ,到s的后继节点,继续清除 while (p != null && p != past) { SNode n = p.next; if (n != null && n.isCancelled()) p.casNext(n, n.next); else p = n; } }

分析transfer 方法:

我们可以发现transfer 是通过e是空来判断是offer方法还是poll方法的,也就是入队者和出对者的区分。

第一种情况,如果队列为空head,或者队列存在的head节点和自己的模式相同,首先判断有没有超时或者取消,有就执行这些操作,没有就执行入队操作,然后把新加入的节点加入栈顶,然后调用awaitFulfill方法阻塞线程,直到被中断,超时或者匹配成功,为什么要阻塞了因为大家的模式都相同没法匹配,所以只能阻塞线程,直到一个不同模式的线程匹配成功,唤醒自己,这也是awaitFulfill方法的结束流程。

第二种情况,如果入队的模式不同,通过isFulfilling方法判断head节点有没有在匹配,没有就执行匹配流程,

首先判断节点是否被取消了,没有在判断自己是不是唯一的一个节点,如果是循环,重新开始流程,如果不是上面的这些情况,就可以开始匹配节点了,调用tryMatch方法,成功唤醒另一个节点,然后一起出栈,返回结果,失败就向后推进,找下一个节点,这里可能别的线程会竞争的匹配。

第三种情况,入队的模式不同,但是head节点正在匹配,那就帮助它匹配完成,然后重新走整个循环。TransferQueue 公平常用方法

入队和出队的方法是一样的,我们主要看下transfer 方法吧。

transfer 方法

E transfer(E e, boolean timed, long nanos) { QNode s = null; // constructed/reused as needed // 判断是生产者 还是消费者 boolean isData = (e != null); for (;;) { QNode t = tail; QNode h = head; // 没有初始化 if (t == null || h == null) // saw uninitialized value continue; // spin // h==t 刚刚初始化 t.isData == isData,尾节点和当前节点的类型一样 if (h == t || t.isData == isData) { // empty or same-mode QNode tn = t.next; // 获取尾节点 if (t != tail) // 尾节点发生变了变化 // inconsistent read continue; if (tn != null) { // 重设为节点 // lagging tail advanceTail(t, tn); continue; } if (timed && nanos <= 0) // 超时了 // can't wait return null; if (s == null) // 构建新节点 s = new QNode(e, isData); if (!t.casNext(null, s)) // failed to link in 有竞争 continue; // 替换尾节点 advanceTail(t, s); // swing tail and wait // 自旋/阻塞 返回的是中断取消/匹配的节点 Object x = awaitFulfill(s, e, timed, nanos); // 中断 if (x == s) { // wait was cancelled clean(t, s); return null; } // 匹配成功了,需要执行出队操作 if (!s.isOffList()) { // not already unlinked // 修改头结点 advanceHead(t, s); // unlink if head if (x != null) // and forget fields s.item = s; s.waiter = null; } return (x != null) ? (E)x : e; } else { // complementary-mode // 出队从头元素开始 QNode m = h.next; // node to fulfill if (t != tail || m == null || h != head) // 队列发生变化重来 continue; // inconsistent read Object x = m.item; // isData == (x != null) 判断模式是否相同,不相同才能匹配 // x == m 说明已经被中断或者超时了 // m.casItem(x, e) 匹配 if (isData == (x != null) || // m already fulfilled x == m || // m cancelled !m.casItem(x, e)) {// lost CAS advanceHead(h, m); // dequeue and retry continue; } // 匹配成功 // 替换头 advanceHead(h, m); // successfully fulfilled // 唤醒等待线程 LockSupport.unpark(m.waiter); // 唤醒线程 return (x != null) ? (E)x : e; } } }

分析transfer方法:

也是通过e来判断是入队还是出队的,都是调用transfer方法,transfer方法可以看出两部分,入队和匹配。第一部分入队,入队的模式也是相同的,入队是从尾节点开始,获取尾节点,判断尾节点有没有发生变化,可能存在多线程的情况,发生改变就重新遍历,没有就判断尾节点有没有next节点,有就说明别的线程添加了新的节点,需要更新尾节点,然后构造新的节点加入当前尾节点的next节点,更新尾节点,然后调用awaitFulfill阻塞当前节点,直到中断,超时或者匹配,然后清除匹配成功的节点,调用clean方法。第二部分匹配(出队),出队是从头节点开始,然后判断模式是否不同,是否被取消,cas设置item,其实也就是数据的传递,如果匹配成功,唤醒等待在m的线程,这里注意把m设置成了头结点,其实就是把m节点弹出了,因为我们匹配取得头结点的next节点。五,总结

SynchronousQueue 的实现还是很复杂的,我们可以发现虽然是个阻塞队列,可是没有使用锁;这个队列适合传递的场景,队列没有存储元素的队列,出队和入队必须结伴而行。

参考 《Java 并发编程的艺术》