[TOC]

LinkedTransferQueue 1.8 源码解析一,简介

LinkedTransferQueue 是一个由链表结构组成的wujie阻塞传输队列,它是一个很多队列的结合体(ConcurrentLinkedQueue,LinkedBlockingQueue,SynchronousQueue),在除了有基本阻塞队列的功能(但是这个阻塞队列没有使用锁)之外;队列实现了TransferQueue接口重写了tryTransfer和transfer方法,这组方法和SynchronousQueue公平模式的队列类似,具有匹配的功能。

二,UML图

三,基本成员

// 是否是多核 private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1; // 自旋次数 private static final int FRONT_SPINS = 1 << 7; // 前驱节点正在处理,当前节点需要自旋的次数 private static final int CHAINED_SPINS = FRONT_SPINS >>> 1; // 容忍清除节点失败次数的阈值 static final int SWEEP_THRESHOLD = 32; static final class Node { // 表示存放数据还是获取数据 final boolean isData; // false if this is a request node // 存放数据是item有值 volatile Object item; // initially non-null if isData; CASed to match // next节点 volatile Node next; // 等待线程 volatile Thread waiter; // 构造 Node(Object item, boolean isData) { UNSAFE.putObject(this, itemOffset, item); // relaxed write this.isData = isData; } } // 头结点 transient volatile Node head; // 尾节点 private transient volatile Node tail; // xfer方法的入参, 不同类型的方法内部调用xfer方法时入参不同 private static final int NOW = 0; // for untimed poll, tryTransfer private static final int ASYNC = 1; // for offer, put, add private static final int SYNC = 2; // for transfer, take private static final int TIMED = 3; // for timed poll, tryTransfer

注意:xfer 者几个参数很重要。

NOW: 表示的是立即,不需要等待的意思,用于poll和tryTransfer方法,poll 队列为空返回,tryTransfer队列没有消费者,直接返回,都是不等待的。

ASYNC:异步,offer, put, add等入队方法,由于是×××队列,所以不会阻塞。

SYNC:同步表示会阻塞,take一个元素,没有就会阻塞,transfer传输,必须等待消费者来消费。

TIMED: 带超时时间的now,会等待一定的时间后返回。

四,常用方法构造方法

public LinkedTransferQueue() { }NOW 相关方法poll 方法

// 队尾弹出一个元素,没有就返回null public E poll() { return xfer(null, false, NOW, 0); }tryTransfer 方法

// 立即转交一个元素给消费者,如果此时队列没有消费者,那就false public boolean tryTransfer(E e) { return xfer(e, true, NOW, 0) == null; }ASYNC 相关方法offer方法

public boolean offer(E e) { xfer(e, true, ASYNC, 0); return true; }put 方法

public void put(E e) { xfer(e, true, ASYNC, 0); }add 方法

public boolean add(E e) { xfer(e, true, ASYNC, 0); return true; }SYNC 相关方法transfer 方法

// 转交一个元素给消费者,如果此时队列没有消费者,那就阻塞 public void transfer(E e) throws InterruptedException { if (xfer(e, true, SYNC, 0) != null) { // 清除方法 Thread.interrupted(); // failure possible only due to interrupt throw new InterruptedException(); } }take 方法

public E take() throws InterruptedException { E e = xfer(null, false, SYNC, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }TIMED 相关方法poll(long timeout, TimeUnit unit) 和 tryTransfer(E e, long timeout, TimeUnit unit)方法

public E poll(long timeout, TimeUnit unit) throws InterruptedException { E e = xfer(null, false, TIMED, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) return e; throw new InterruptedException(); } public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null) return true; if (!Thread.interrupted()) return false; throw new InterruptedException(); }

我们可以看见上面所有的方法都是调用的xfer方法,下面我们来详解下这个方法。

核心方法 xfer

private E xfer(E e, boolean haveData, int how, long nanos) { // 插入元素, if (haveData && (e == null)) throw new NullPointerException(); Node s = null; // the node to append, if needed retry: for (;;) { // 死循环 // restart on append race // 从头结点开始匹配 for (Node h = head, p = h; p != null;) { // find & match first node boolean isData = p.isData; // 获取节点的类型 Object item = p.item; // item 的值 // 两种情况 1.put节点 item != null isData 为true 2.take item = null false isData false // 或者节点已经被匹配了 if (item != p && (item != null) == isData) { // unmatched // 节点没有被匹配过 if (isData == haveData) // can't match // 类型一致,只能执行入队操作 break; if (p.casItem(item, e)) { // match 匹配,可能存在多线程竞争匹配 for (Node q = p; q != h;) { // 不是头节点了,头结点发生了改变,被匹配了,自己也匹配了, // 下一个节点 Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { // 自关联 节点不要了 h.forgetNext(); break; } // advance and retry // head 已经被更新过,或者更新head失败,需要重新判断 // h = head == null,队列为空 // (q = h.next) == null 最后一个节点 // 头接单的下一个节点有没有被匹配 // 说明值有头结点匹配了,头结点的next节点也匹配了,才要更新头结点,优化手段 if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } // 匹配成功 LockSupport.unpark(p.waiter); return LinkedTransferQueue.<E>cast(item); } } // 已经匹配就往下走 Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } /* // xfer方法的入参, 不同类型的方法内部调用xfer方法时入参不同 private static final int NOW = 0; // for untimed poll, tryTransfer private static final int ASYNC = 1; // for offer, put, add private static final int SYNC = 2; // for transfer, take private static final int TIMED = 3; // for timed poll, tryTransfer*/ // 模式不同只能入队啦 if (how != NOW) { // No matches available if (s == null) // 创建一个新节点 s = new Node(e, haveData); // tryAppend 给tail追加节点 Node pred = tryAppend(s, haveData); // 不能添加到这个节点 ,重新循环 if (pred == null) continue retry; // lost race vs opposite mode // ASYNC 添加成功返回了 // SYNC TIMED 需要阻塞线程 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); } // now 是立即返回 return e; // not waiting } }

分析:

从头节点开始匹配,判断头节点有没有被匹配,或者头节点的模式和入队节点的模式是否相同。如果模式相同或者已经被匹配了,就去走入队或者出队流程。如果模式不同,就可以匹配了,casItem设置item,完成数据的传递,然后判断q != h,q发生变化说明头结点被别的线程匹配了,这里可能多个线程来匹配,所以头节点是可能发生变化的,我们不是每一次都更新头节点,而是当头节点被匹配,头结点的下一个节点也被匹配才会更新头节点,这是一种优化手段;当我们匹配成功了,唤醒匹配的节点LockSupport.unpark(p.waiter),然后返回。我们再来看模式不同或者队列为空时,我们需要做的就是入队操作,第一步判断how 不是NOW,NOW对应的方法是polltryTransfer ,是不会等待的,也不会入队的,所以直接返回;接下来的几种状态都是要入队的,所以创建一个s = new Node(e, haveData),然后调用tryAppend方法入队追加到队尾,返回前置节点;此时在判断how是ASYNC还是SYNCTIMEDASYNC不要等待所以直接返回,SYNC`TIMED是需要等待的,所以调用awaitMatch方法等待,直到匹配成功或者超时时间到了。tryAppend 方法

入队尾

private Node tryAppend(Node s, boolean haveData) { for (Node t = tail, p = t;;) { // move p to last node and append 遍历 Node n, u; // temps for reads of next & tail if (p == null && (p = head) == null) { // 还没有节点 if (casHead(null, s)) return s; // initialize } // 是否符合入队要求 else if (p.cannotPrecede(haveData)) return null; // lost race vs opposite mode // p.next 不为null,说明p真正的尾节点,p需要向后推进 else if ((n = p.next) != null) // not last; keep traversing p = p != t && t != (u = tail) ? (t = u) : // stale tail (p != n) ? n : null; // restart if off list // p.next = null,说明找到最后一个节点了,可以入队了 // 可能存在竞争,失败,就继续下一个节点 else if (!p.casNext(null, s)) p = p.next; // re-read on CAS failure else { // 入队成功了 if (p != t) { // 说明此时的入队节点的前节点p和尾节点有距离 是否需要更新尾节点 // update if slack now >= 2 while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t); } return p; } } }awaitMatch 方法

private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); int spins = -1; // initialized after first item and cancel checks ThreadLocalRandom randomYields = null; // bound if needed for (;;) { Object item = s.item; // 被匹配过了 if (item != e) { // matched // assert item != s; s.forgetContents(); // avoid garbage return LinkedTransferQueue.<E>cast(item); } // 被中断 超时时间到了 if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) { // cancel unsplice(pred, s);// return e; } // 初始化自旋 if (spins < 0) { // establish spins at/near front //初始化自旋次数,即计算自旋次数 if ((spins = spinsFor(pred, s.isData)) > 0) randomYields = ThreadLocalRandom.current(); } // 自旋递减 else if (spins > 0) { // spin --spins; if (randomYields.nextInt(CHAINED_SPINS) == 0) Thread.yield(); // occasionally yield } // 自旋次数到了 就会阻塞 // 设置阻塞线程 else if (s.waiter == null) { s.waiter = w; // request unpark then recheck } // 超时阻塞 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos > 0L) LockSupport.parkNanos(this, nanos); } // 阻塞 else { LockSupport.park(this); } } }五,总结

LinkedTransferQueue 是很多队列的集合体,虽然方法基本一样,但是实现却是大大的不同,我们以前的阻塞队列几乎都是使用锁来控制入队和出队的,LinkedTransferQueue 没有使用锁,入队和出队都是使用自旋加cas实现的,比锁的消耗更低,使用了很多的优化(控制自旋次数等),性能更高;队列是wujie的,所以使用时一定要注意内存的问题。

参考《Java 并发编程的艺术》