ArrayBlockingQueue 1.8 源码浅析
[TOC]
ArrayBlockingQueue 1.8 源码浅析一,简介二,类UML图三,基本成员ArrayBlockingQueue 是一个用数组实现的有界队列;此队列按照先进先出(FIFO)的规则对元素进行排序;默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序的访问队列,即先阻塞的线程先访问队列;非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问;为了保证公平性,通常会降低吞吐量。
/** The queued items */ // 记录数据的数组 final Object[] items; /** items index for next take, poll, peek or remove */ // 索引用于 take,poll,peek,remove 等方法 int takeIndex; /** items index for next put, offer, or add */ // 索引用于 put,offer,or add 等方法 int putIndex; /** Number of elements in the queue */ // 总数 int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ // 队列的锁 final ReentrantLock lock; /** Condition for waiting takes */ // 用于让线程等待,消费时队列为空 private final Condition notEmpty; /** Condition for waiting puts */ // 用于让线程等待,生产时队列满 private final Condition notFull;
四,常用方法构造方法
我们看下两个构造,其实也就是一个,注意没有无参构造,初始化时必须要给出容量。
public ArrayBlockingQueue(int capacity) { this(capacity, false); } // 初始化一个ArrayBlockingQueue public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); // 初始化一个数组 this.items = new Object[capacity]; // 初始化一个锁 lock = new ReentrantLock(fair); // 用来存放消费者的阻塞线程 notEmpty = lock.newCondition(); // 用来存放生产者的线程 notFull = lock.newCondition(); }
add 方法
可以看出add调用的是offer方法,详情请看offer方法。
public boolean add(E e) { // 调用父类的方法 return super.add(e); } // 父类 AbstractQueue 的add方法 public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
注意:add 插入失败会抛异常。
offer 方法 // offer加入元素 public boolean offer(E e) { // 不能为null checkNotNull(e); // 获取锁 final ReentrantLock lock = this.lock; lock.lock(); try { // 如果数组满了,返回false if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } } // enqueue private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; // 获取数组 final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; // 唤醒消费阻塞的队列 notEmpty.signal(); }
注意:offer还有一个重载方法,带有超时时间的插入,支持中断offer(E e, long timeout, TimeUnit unit)。
put 方法public void put(E e) throws InterruptedException { // 不能为null checkNotNull(e); // 获取锁 final ReentrantLock lock = this.lock; // 支持中断 lock.lockInterruptibly(); try { // 等于数组的容量 while (count == items.length) // 等待 notFull.await(); enqueue(e); } finally { lock.unlock(); } }
注意:put和前面的offer要区别,offer方法队列满是返回false,put方法是让线程等待,根据自己的场景用合适的方法。
poll 方法 public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
注意:poll也有一个重载方法,带有超时和中断poll(long timeout, TimeUnit unit)。
take 方法 // 消费 public E take() throws InterruptedException { // 获取锁 final ReentrantLock lock = this.lock; // 支持中断 lock.lockInterruptibly(); try { // 队列为空 while (count == 0) // 阻塞 notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
注意:take和poll也是一对方法,poll队列为空返回null,take是让线程等待,直到唤醒。
peek 方法// 获取队尾的元素 不删除 public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); // null when queue is empty } finally { lock.unlock(); } }
size 方法
// 统计个数 size是准确值 public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return count; } finally { lock.unlock(); } }
五,总结
ArrayBlockingQueue 是有界的,所以我们在初始化是容量要设计好,因为它是不可以扩容的,还有我觉得这个队列适合一些稳定并发量的系统,如果并发量突然变大,导致队列满,会造成大量的线程等待,影响系统的响应;我们通过阅读源码也发现队列的源码是很轻量的,使用起来也很简单,让人很好理解;使用这个队列一定要注意put,offer,take,poll这两组方法,根据自己的业务场景选择是直接返回(响应速度快)还是阻塞线程。
参考:《Java 并发编程的艺术》
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。