[TOC]

LinkedBlockingQueue 1.8 源码详解一,简介

LinkedBlockingQueue 是一个用链表实现的有界阻塞队列;此队列的默认和最大长度为Integer.MAX_VALUE;此队列按照先进先出的原则对元素就行排序;队列有两个锁,生成和消费各一把锁,都是默认的非公平锁。

二,类UML图

三,基本成员

static class Node<E> { // 我们插入的值 E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ // 下一个node Node<E> next; Node(E x) { item = x; } } /** 队列容量 */ private final int capacity; /** 两个锁,需要使用AtomicInteger保证原子性 */ private final AtomicInteger count = new AtomicInteger(); /** * Head of linked list. * Invariant: head.item == null */ // 头结点 transient Node<E> head; /** * Tail of linked list. * Invariant: last.next == null */ // 尾节点 private transient Node<E> last; /** Lock held by take, poll, etc */ /** take, poll, etc 的锁 */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ /** 等待在队列空 */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ /** put, offer, etc的锁 */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ /** 等待在队列满 */ private final Condition notFull = putLock.newCondition();四,常用方法构造方法

// 无参构造 public LinkedBlockingQueue() { // 默认Integer.MAX_VALUE this(Integer.MAX_VALUE); } // 有参构造 public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; // 创建一个item为null的节点 last = head = new Node<E>(null); }offer 方法

public boolean offer(E e) { // e不能为null if (e == null) throw new NullPointerException(); // 总数 final AtomicInteger count = this.count; // 总数等于了容量 返回false if (count.get() == capacity) return false; int c = -1; // 创建一个node Node<E> node = new Node<E>(e); // 获取锁 final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { // 插入链表 enqueue(node); // 加1返回旧值 c = count.getAndIncrement(); // c是增加之前的值,然后加1,再判断有没有可以存储的容量 if (c + 1 < capacity) // 有唤醒下一个线程 notFull.signal(); } } finally { putLock.unlock(); } // 队列有一个元素了,证明之前队列为空,可能已经有元素来消费了,所以就需要唤醒一个等待消费的线程 if (c == 0) signalNotEmpty(); return c >= 0; } private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; }

注意:offer 还有一个重载方法,支持中断,带有超时时间的限制offer(E e, long timeout, TimeUnit unit)。

put 方法

public void put(E e) throws InterruptedException { // 不可以为null if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; // 构建一个节点 Node<E> node = new Node<E>(e); // 获取put锁 final ReentrantLock putLock = this.putLock; // 获取count final AtomicInteger count = this.count; // 调用获取锁的方法,支持中断 putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ // 等于了队列的容量 while (count.get() == capacity) { // 进入阻塞队列 notFull.await(); } // 入队 enqueue(node); // 返回的是自增前的值 c = count.getAndIncrement(); // 如果这个元素入队以后,还有多于的空间,唤醒等待队列的线程 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } // c==0,证明之前队列是空的,唤醒一个获取线程 if (c == 0) signalNotEmpty(); }poll 方法

这次我们看个带超时时间的poll方法。

// 带超时时间的消费一个元素 public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 支持中断的获取锁 takeLock.lockInterruptibly(); try { while (count.get() == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } x = dequeue(); // count-- 返回旧值 c = count.getAndDecrement(); // 还有元素,唤醒一个等待获取的线程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } // 队列还有一个位置,唤醒一个入队线程 if (c == capacity) signalNotFull(); return x; } private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC // 自引用 head = first; E x = first.item; first.item = null; return x; }take 方法

// 获取元素 public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { // 队列为null 就阻塞 while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } // 队列消费一个元素,可以唤醒一个生产线程了 if (c == capacity) signalNotFull(); return x; }peek 方法

// 获取第一个元素 public E peek() { if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } }size 方法

public int size() { return count.get(); }五,总结

LinkedBlockingQueue 可以看做是一个×××队列,因为最大容量是Integer.MAX_VALUE,这已经很大了,所以使用时一定注意容量问题,避免内存溢出,但是好处就是可以不用我们去初始容量;队列在入队和出队使用了两把锁,提高了并发性,相对于一把锁来说;我们可以发现队列的底层数据结构采用的是链表,对比ArrayBlockingQueue的数组数据结构,在处理数据的同时,节点本身也需要处理垃圾回收,所以相对于数组来的数据来说增加了垃圾回收,可能影响性能;LinkedBlockingQueue 和ArrayBlockingQueue 两个可以对比学习,追求系统稳定性,性能就使用ArrayBlockingQueue ,追求并发性,可能发生大量请求时(系统不是很稳定)要注意内存溢出就使用LinkedBlockingQueue ,使用场景属于个人理解,欢迎指正。

《参考 Java 并发编程的艺术》