阿里P7教你巧用Java的同步容器和并发容器
在 Java 中,同步容器主要包括 2 类:
Vector、Stack、HashTableCollections 类中提供的静态工厂方法创建的类(由 Collections.synchronizedXxxx 等方法)Vector 实现了 List 接口,Vector 实际上就是一个数组,和 ArrayList 类似,但是 Vector 中的方法都是 synchronized 方法,即进行了同步措施。Stack 也是一个同步容器,它的方法也用 synchronized 进行了同步,它实际上是继承于 Vector 类。HashTable 实现了 Map 接口,它和 HashMap 很相似,但是 HashTable 进行了同步处理,而 HashMap 没有。同步容器的缺陷同步容器的同步原理就是在方法上用 synchronized
修饰。那么,这些方法每次只允许一个线程调用执行。
由于被 synchronized
修饰的方法,每次只允许一个线程执行,其他试图访问这个方法的线程只能等待。显然,这种方式比没有使用 synchronized
的容器性能要差。
同步容器真的一定安全吗?
答案是:未必。同步容器未必真的安全。在做复合操作时,仍然需要加锁来保护。
常见复合操作如下:
迭代:反复访问元素,直到遍历完全部元素;跳转:根据指定顺序寻找当前元素的下一个(下 n 个)元素;条件运算:例如若没有则添加等;不安全的示例<pre Courier New" !important; font-size: 12px !important;">public class Test { static Vector<Integer> vector = new Vector<Integer>();public static void main(String[] args) throws InterruptedException { while(true) { for (int i=0;i<10;i++) vector.add(i); Thread thread1 = new Thread(){ public void run() { for (int i=0;i<vector.size();i++) vector.remove(i); } ; } ; Thread thread2 = new Thread(){ public void run() { for (int i=0;i<vector.size();i++) vector.get(i); } ; } ; thread1.start(); thread2.start(); while(Thread.activeCount()>10) { } }}}</pre>
执行时可能会出现数组越界错误。
Vector 是线程安全的,为什么还会报这个错?很简单,对于 Vector,虽然能保证每一个时刻只能有一个线程访问它,但是不排除这种可能:
当某个线程在某个时刻执行这句时:
<pre Courier New" !important; font-size: 12px !important;">for (int i=0;i<vector.size();i++) vector.get(i);</pre>
假若此时 vector 的 size 方法返回的是 10,i 的值为 9
然后另外一个线程执行了这句:
<pre Courier New" !important; font-size: 12px !important;">for (int i=0;i<vector.size();i++) vector.remove(i);</pre>
将下标为 9 的元素删除了。
那么通过 get 方法访问下标为 9 的元素肯定就会出问题了。
安全示例因此为了保证线程安全,必须在方法调用端做额外的同步措施,如下面所示:
public class Test { static Vector<Integer> vector = new Vector<Integer>(); public static void main(String[] args) throws InterruptedException { while(true) { for (int i=0;i<10;i++) vector.add(i); Thread thread1 = new Thread(){ public void run() { synchronized (Test.class) { //进行额外的同步 for (int i=0;i<vector.size();i++) vector.remove(i); } } ; } ; Thread thread2 = new Thread(){ public void run() { synchronized (Test.class) { for (int i=0;i<vector.size();i++) vector.get(i); } } ; } ; thread1.start(); thread2.start(); while(Thread.activeCount()>10) { } } }}
ConcurrentModificationException 异常
在对 Vector 等容器并发地进行迭代修改时,会报 ConcurrentModificationException 异常,关于这个异常将会在后续文章中讲述。
但是在并发容器中不会出现这个问题。
并发容器JDK 的 java.util.concurrent
包(即 juc)中提供了几个非常有用的并发容器。
ConcurrentHashMap 类在 jdk1.7 中的设计,其基本结构如图所示:
每一个 segment 都是一个 HashEntry<K,V>[] table, table 中的每一个元素本质上都是一个 HashEntry 的单向队列。比如 table[3]为首节点,table[3]->next 为节点 1,之后为节点 2,依次类推。
<pre Courier New" !important; font-size: 12px !important;">public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { // 将整个hashmap分成几个小的map,每个segment都是一个锁;与hashtable相比,这么设计的目的是对于put, remove等操作,可以减少并发冲突,对 // 不属于同一个片段的节点可以并发操作,大大提高了性能final Segment<K,V>[] segments;// 本质上Segment类就是一个小的hashmap,里面table数组存储了各个节点的数据,继承了ReentrantLock, 可以作为互拆锁使用static final class Segment<K,V> extends ReentrantLock implements Serializable { transient volatile HashEntry<K,V>[] table; transient int count;}// 基本节点,存储Key, Value值static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next;}}</pre>
JDK8jdk8 中主要做了 2 方面的改进取消 segments 字段,直接采用 transient volatile HashEntry<K,V>[] table
保存数据,采用 table 数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。将原先 table 数组+单向链表的数据结构,变更为 table 数组+单向链表+红黑树的结构。对于 hash 表来说,最核心的能力在于将 key hash 之后能均匀的分布在数组中。如果 hash 之后散列的很均匀,那么 table 数组中的每个队列长度主要为 0 或者 1。但实际情况并非总是如此理想,虽然 ConcurrentHashMap 类默认的加载因子为 0.75,但是在数据量过大或者运气不佳的情况下,还是会存在一些队列长度过长的情况,如果还是采用单向列表方式,那么查询某个节点的时间复杂度为 O(n);因此,对于个数超过 8(默认值)的列表,jdk1.8 中采用了红黑树的结构,那么查询的时间复杂度可以降低到 O(logN),可以改进性能。
<pre Courier New" !important; font-size: 12px !important;">final V putVal(K key, V value, Boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();int hash = spread(key.hashCode());int binCount = 0;for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 如果table为空,初始化;否则,根据hash值计算得到数组索引i,如果tab[i]为空,直接新建节点Node即可。注:tab[i]实质为链表或者红黑树的首节点。 if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // 如果tab[i]不为空并且hash值为MOVED,说明该链表正在进行transfer操作,返回扩容完成后的table。 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; // 针对首个节点进行加锁操作,而不是segment,进一步减少线程冲突 synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; // 如果在链表中找到值为key的节点e,直接设置e.val = value即可。 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } // 如果没有找到值为key的节点,直接新建Node并加入链表即可。 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 如果首节点为TreeBin类型,说明为红黑树结构,执行putTreeVal操作。 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { // 如果节点数>=8,那么转换链表结构为红黑树结构。 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } }}// 计数增加1,有可能触发transfer操作(扩容)。addCount(1L, binCount);return null;}</pre>
示例
<pre Courier New" !important; font-size: 12px !important;">public class ConcurrentHashMapDemo { public static void main(String[] args) throws InterruptedException { // HashMap 在并发迭代访问时会抛出 ConcurrentModificationException 异常 // Map<Integer, Character> map = new HashMap<>(); Map<Integer, Character> map = new ConcurrentHashMap<>(); Thread wthread = new Thread(() -> { System.out.println("写操作线程开始执行"); for (int i = 0; i < 26; i++) { map.put(i, (char) ('a' + i)); } }); Thread rthread = new Thread(() -> { System.out.println("读操作线程开始执行"); for (Integer key : map.keySet()) { System.out.println(key + " - " + map.get(key)); } }); wthread.start(); rthread.start(); Thread.sleep(1000); }}</pre>
CopyOnWriteArrayList要点作用:CopyOnWrite 字面意思为写入时复制。CopyOnWriteArrayList 是线程安全的 ArrayList。原理:在 CopyOnWriteAarrayList 中,读操作不同步,因为它们在内部数组的快照上工作,所以多个迭代器可以同时遍历而不会相互阻塞(1,2,4)。所有的写操作都是同步的。他们在备份数组(3)的副本上工作。写操作完成后,后备阵列将被替换为复制的阵列,并释放锁定。支持数组变得易变,所以替换数组的调用是原子(5)。写操作后创建的迭代器将能够看到修改的结构(6,7)。写时复制集合返回的迭代器不会抛出 ConcurrentModificationException,因为它们在数组的快照上工作,并且无论后续的修改(2,4)如何,都会像迭代器创建时那样完全返回元素。
array - 对象数组,用于存放元素
<pre Courier New" !important; font-size: 12px !important;"> /** The lock protecting all mutators */final transient ReentrantLock lock = new ReentrantLock();/** The array, accessed only via getArray/setArray. */private transient volatile Object[] array;</pre>
重要方法添加操作添加的逻辑很简单,先将原容器 copy 一份,然后在新副本上执行写操作,之后再切换引用。当然此过程是要加锁的。
<pre Courier New" !important; font-size: 12px !important;">public Boolean add(E e) {//ReentrantLock加锁,保证线程安全final ReentrantLock lock = this.lock;lock.lock();try {Object[] elements = getArray();int len = elements.length;//拷贝原容器,长度为原容器长度加一Object[] newElements = Arrays.copyOf(elements, len + 1);//在新副本上执行添加操作newElements[len] = e;//将原容器引用指向新副本setArray(newElements);return true;}finally {//解锁lock.unlock();}}</pre>
删除操作
删除操作同理,将除要删除元素之外的其他元素拷贝到新副本中,然后切换引用,将原容器引用指向新副本。同属写操作,需要加锁。<pre Courier New" !important; font-size: 12px !important;">public E remove(int index) { //加锁 final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; E oldValue = get(elements, index); int numMoved = len - index - 1; if (numMoved == 0) //如果要删除的是列表末端数据,拷贝前len-1个数据到新副本上,再切换引用 setArray(Arrays.copyOf(elements, len - 1)); else { //否则,将除要删除元素之外的其他元素拷贝到新副本中,并切换引用 Object[] newElements = new Object[len - 1]; System.arraycopy(elements, 0, newElements, 0, index); System.arraycopy(elements, index + 1, newElements, index, numMoved); setArray(newElements); } return oldValue; } finally { //解锁 lock.unlock(); }}</pre>
读操作CopyOnWriteArrayList 的读操作是不用加锁的,性能很高。
<pre Courier New" !important; font-size: 12px !important;">public E get(int index) {return get(getArray(), index);}private E get(Object[] a, int index) {return (E) a[index];}</pre>
示例
<pre Courier New" !important; font-size: 12px !important;">public class CopyOnWriteArrayListDemo { static class ReadTask implements Runnable { List<String> list;ReadTask(List<String> list) { this.list = list;}public void run() { for (String str : list) { System.out.println(str); }}}static class WriteTask implements Runnable {List<String> list;int index;WriteTask(List<String> list, int index) { this.list = list; this.index = index;}public void run() { list.remove(index); list.add(index, "write_" + index);}}public void run() {final int NUM = 10;// ArrayList 在并发迭代访问时会抛出 ConcurrentModificationException 异常 // List<String> list = new ArrayList<>();CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();for (int i = 0; i < NUM; i++) { list.add("main_" + i);}ExecutorService executorService = Executors.newFixedThreadPool(NUM);for (int i = 0; i < NUM; i++) { executorService.execute(new ReadTask(list)); executorService.execute(new WriteTask(list, i));}executorService.shutdown();}public static void main(String[] args) {new CopyOnWriteArrayListDemo().run();}}</pre>
文末彩蛋
针对于上面所涉及到的知识点我总结出了有1到5年开发经验的程序员在面试中涉及到的绝大部分架构面试题及答案做成了文档和架构视频资料免费分享给大家(包括Dubbo、Redis、Netty、zookeeper、Spring cloud、分布式、高并发等架构技术资料),希望能帮助到您面试前的复习且找到一个好的工作,也节省大家在网上搜索资料的时间来学习,也可以关注我一下以后会有更多干货分享。
资料获取方式 QQ群搜索“708-701-457” 即可免费领取声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。