一、简介

前面的一篇文章我们介绍了ConcurrentHashMap1.7版本版本的源码介绍,我们知道1.7版本的ConcurrentHashMap采用的是分段锁的思想,提高了锁的数量,提高了并发的特性,但是也有其局限性,例如就是并发的数量也就是锁的数量是不可改变的等;我们今天要介绍的1.8版本的ConcurrentHashMap其实也是采用了多锁的思想,不过在1.8中没有了segments这些东西了,每次锁住的数组中的一个元素或者桶(其实也就是数组或者树的头结点),然后锁也和1.7发生变了,使用的是Synchronized锁,1.8中的锁是随着数组的长度发生变化的,提升了并发的数量的灵活性,还有就是1.8的数据结构也发生了一些变化,采用的是数组+链表+红黑树(链表到达阈值会树化),结构如下图所示:

二、基本成员
先介绍一些基本成员,只有了解了这些成员的概念后,才能去更好的去理解方法。
ConcurrentHashMap的一些成员变量

/** node数组的最大容量 2^30 */ private static final int MAXIMUM_CAPACITY = 1 << 30; /** 默认初始化值16,必须是2的冥 */ private static final int DEFAULT_CAPACITY = 16; /** 虚拟机限制的最大数组长度,在ArrayList中有说过,jdk1.8新引入的,需要与toArrar()相关方法关联 */ static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; /** 并发数量,1.7遗留,兼容以前版本 */ private static final int DEFAULT_CONCURRENCY_LEVEL = 16; /** 负载因子,兼容以前版本,构造方法中指定的参数是不会被用作loadFactor的,为了计算方便,统一使用 n - (n >> 2) 代替浮点乘法 *0.75 */ private static final float LOAD_FACTOR = 0.75f; /** 链表转红黑树,阈值>=8 */ static final int TREEIFY_THRESHOLD = 8; /** 树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量, * <=UNTREEIFY_THRESHOLD 则untreeify(lo)) */ static final int UNTREEIFY_THRESHOLD = 6; /** 链表转红黑树的阈值,64(map容量小于64时,链表转红黑树时先进行扩容) */ static final int MIN_TREEIFY_CAPACITY = 64;/** 下面这三个和多线程协助扩容有关 */ /** // 扩容操作中,transfer这个步骤是允许多线程的,这个常量表示一个线程执行transfer时,最少要对连续的16个hash桶进行transfer // (不足16就按16算,多控制下正负号就行) private static final int MIN_TRANSFER_STRIDE = 16; /** 生成sizeCtl所使用的bit位数 */ private static int RESIZE_STAMP_BITS = 16; /** 参与扩容的最大线程数 */ private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; /** 移位量,把生成戳移位后保存在sizeCtl中当做扩容线程计数的基数,相反方向移位后能够反解出生成 戳 */ private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; /* * Encodings for Node hash fields. See above for explanation. */ static final int MOVED = -1; // 表示正在转移 static final int TREEBIN = -2; // 表示已经转换为树 static final int RESERVED = -3; // hash for transient reservations static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash /** 可用处理器数量 */ static final int NCPU = Runtime.getRuntime().availableProcessors(); /** 用于存放node数组 */ transient volatile Node<K,V>[] table; /** * baseCount为并发低时,直接使用cas设置成功的值 * 并发高,cas竞争失败,把值放在counterCells数组里面的counterCell里面 * 所以map.size = baseCount + (每个counterCell里面的值累加) */ private transient volatile long baseCount; /** * 控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义 * 当为负数时:-1代表正在初始化,-N就代表在扩容,-N-RS-2就代表有多少个线程在协助扩容 * 当为0时:代表当时的table还没有被初始化 * 当为正数时:表示初始化或者下一次进行扩容的大小 */ private transient volatile int sizeCtl; /** * 通过cas实现的锁,0 无锁,1 有锁 */ private transient volatile int cellsBusy; /** * counterCells数组,具体的值在每个counterCell里面 */ private transient volatile CounterCell[] counterCells;

Node,内部类,主要用于存储键值,有ForwardingNode、ReservationNode、TreeNode和TreeBin四个子类,具体在后面代码用到的时候讲。

static class Node<K,V> implements Entry<K,V> { final int hash; final K key; // val和next 在扩容时可能发生变化,加上volatile关键字,提供可见性与重排序 volatile V val; volatile Node<K,V> next; // 不允许修改val public final V setValue(V value) { throw new UnsupportedOperationException(); }

三、主要方法
上面介绍了一些基本成员,主要介绍一些常用方法,只有红黑树相关方法占时不讲(还没有搞明白红黑树,嘿嘿)。
①、构造方法

** * 无参构造 */ public ConcurrentHashMap() { } /** * 指定初始化大小的构造,不能小于0 * @param initialCapacity */ public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); // cap必须是2的n次方, int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; }

②、初始化方法(采用延迟初始化,在put方法里面)

private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { // 空的table 才能初始化 if ((sc = sizeCtl) < 0) // 表示其它线程正在初始化或者扩容 // 当前线程把执行权交给其它线程(拥有相同优先级的线程),然后变成可运行状态 Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 原子操作表示把SIZECTL设置为-1,正在初始化 try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 初始化大小 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // 初始化 table = tab = nt; sc = n - (n >>> 2); // 下一次扩容阈值 n*0,75 } } finally { sizeCtl = sc; } break; } } return tab; }

③、put方法(重要,其中的扩容方法比较难理解)
分析:
1)、我们可以通过源码判断key和value不允许为null。
2)、需要判断table有没有初始化,没有调用initTable初始化,然后接着循环。
3)、判断key的hash(调用spread方法)的位置有没有值,证明是第一个,使用cas设置,为什么cas,可能不止一个线程。
4)、判断当前线程的hash是不是MOVED,其实就是节点是不是ForwardingNode节点,ForwardingNode代表正在扩容,至于为什么会是ForwardingNode,这个在扩容的方法里面再讲,如果是ForwardingNode节点就协助扩容,也就是当前也去扩容,然后扩容完毕,在执行循环,协助扩容执行helpTransfer方法。
5)、如果不是扩容、table也初始化了和hash位置也有值了,那证明当前hash的位置是链表或者树,接下来锁住这个节点,进行链表或者树的节点的追加,如果存在相同的key,就替换,最后释放锁。
6)、判断链表的节点数,有没有大于等于8,满足就树化,调用treeifyBin方法,这个方法会在树化前判断大于等于64吗,没有就扩容,调用tryPresize方法,有就树化。
7)、修改节点的数量,调用addCount方法。

public V put(K key, V value) { return putVal(key, value, false); } /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { // 不允许key或者value 为null if (key == null || value == null) throw new NullPointerException(); // 获取hash int hash = spread(key.hashCode()); int binCount = 0; // 遍历table,死循环,直到插入成功 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) // table 还没有初始化 tab = initTable(); // 初始化 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 当前位置为空 ,直接插入 if (casTabAt(tab, i, null, // 使用cas来进行设置 new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) // 如果在进行扩容,则先进行协助扩容 tab = helpTransfer(tab, f); else { V oldVal = null; // 没有在扩容,头结点也不是空, // 锁住链表或者树的头节点 synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { // 普通Node的hash值为key的hash值大于零,而ForwardingNode的是-1,TreeBin是-2 binCount = 1; // 遍历链表 for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // 找到了相同的key oldVal = e.val; if (!onlyIfAbsent) e.val = value; // 替换value 结束循环 break; } Node<K,V> pred = e; if ((e = e.next) == null) { // 找到最后一个节点 pred.next = new Node<K,V>(hash, key, value, null); // 把当前节点设置为最后一个节点的next break; } } } else if (f instanceof TreeBin) { // 如果是树结构 Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { // 树节点插入,存在就替换 oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) // 如果链表大于等于8,树化 treeifyBin(tab, i); // 树化 if (oldVal != null) // 证明存在相同的key,是替换return旧值 return oldVal; break; } } } addCount(1L, binCount); //数量加1 return null; }

注:上面的put方法用到了initTable、helpTransfer、treeifyBin、tryPresize和addCount方法,接下来我们按照程序的流程讲解下这个几个方法。
initTable方法,这个在前文讲过了。
helpTransfer方法帮助扩容
分析:
1)、这里得讲一下resizeStamp方法Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)),其实这个方法就是获取table的length的二进制的最高位的前面0的个数,然后|上2^15,举个例子吧,假如现在的length为16,那么二进制是多少了16是2^4,所以二进制10000,所以其实就是27 | 2 ^ 15,这里还得顺便讲一下MAXIMUM_CAPACITY = 1 << 30(为什么是2^30了),因为这是正数的最大值,那么再给这个数加上一些值会发生什么了(其实这可能就是为什么要去二进制最高位前面0的个数的原因)。
2)、怎么判断的扩容已经开始了,我们知道sizectl为-1是代表正在初始化,大于0表示已经初始化,如下方法也判断了size小于0,那么什么时候sizectl还会为负数了,其实开始扩容的时候(参考addCount和tryPresize方法,方方法里面都有(rs << RESIZE_STAMP_SHIFT) + 2,这里rs就是上面的resizeStamp的返回值,其实就是左移16位,int的正数的最大值是2^30,再给他加值会变成负数,对rs右移在累加显然已经大于了2^30,所以他是负数),由此判断出扩容已经开始。
3)、什么时候协助扩容了,当前是扩容开始了,但是还没结束,所以下面的满足sc小于的里面的第一if就是判断扩容有没有完成,第二if就是使用cas加入协助扩容的过程。
4)、transfer方法,我们在后面详解。

/** * 帮助扩容 */ final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; // 原table不等于空,当前节点必须是fwd节点,nextTab已经初始化 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { // 其实就是去tab.length二进制最高位前面有多少个0,然后 | 1 << 15 int rs = resizeStamp(tab.length); // nextTab和成员变量一样,table也一样,sizeCtl<0,表示在扩容 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { // sc >>> RESIZE_STAMP_SHIFT) != rs 说明扩容完毕或者有其它协助扩容者 // sc == rs + 1 表示只剩下最后一个扩容线程了,其它都扩容完毕了 // transferIndex <= 0 扩容结束了 // sc == rs + MAX_RESIZERS 到达最大值 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; // 当前线程参加扩容,sc+1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }

treeifyBin方法,在table的length小于64时会调用tryPresize 先进行扩容,调用tryPresize方法,在下文会进行解释。

private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { if ((n = tab.length) < MIN_TREEIFY_CAPACITY) // 当前table的length小于64,就扩容 tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }

tryPresize 方法,这里的逻辑和helpTransfer都差不多,至于transfer方法主菜我们在后面上。

private final void tryPresize(int size) { // 计算扩容的size int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; // 证明table已经初始了或者还没有初始化 while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; if (tab == null || (n = tab.length) == 0) {// 证明还没有初始化,需要初始化 n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } } } // 大于最大容量返回 else if (c <= sc || n >= MAXIMUM_CAPACITY) break; // 已经初始化,并且没有大于最大容量 else if (tab == table) { int rs = resizeStamp(n); // 判断是否需要协助扩容 if (sc < 0) { Node<K,V>[] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } // 开始扩容 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); } } }

transfer扩容方法,比较难理解的一个方法
分析:
1)、看stride这个参数其实就是算每个线程处理的数量,和CPU有关,最小是16.
2)、初始化一个原来二倍的新table就是 nextTable,然后这个过程可能会出错,n<<1可能为负数,设置nextTable和transferIndex,其中transferIndex就是原table的长度。
3)、初始化一个ForwardingNode节点在后面会用到。
4)、死循环for,这个循环就是为每个线程分配任务,然后每个线程处理各自的任务,倒叙分配,举个例子,加入table.length=32,现在的stride为16,第一个线程其实就是32到16(不包含32,因为是索引),第二个线程就是0-15,参考这一段代码((U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0)))),然后遍历每个段,处理节点,知道处理完成,具体逻辑参考代码注释。

/** * 扩容方法 */ private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; // n >>> 3(也就是除以8) / cpu个数,每个cpu的每个线程负责的迁移的数量 // 这样的目的是为了每个cpu处理的桶一样多,避免出现任务转移不均匀的现象,如果桶少的话,默认一个cpu(一个线程)处理16个桶 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range // 扩容table 没有初始化 if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // 初始化原来的length两倍的table nextTab = nt; } catch (Throwable ex) { // try to cope with OOME // 初始化失败,使用integer的最大值 sizeCtl = Integer.MAX_VALUE; return; // 结束 } // 更新成员变量 nextTable = nextTab; // 更新转移下标,就是运来的table的length transferIndex = n; } // 新table的length int nextn = nextTab.length; // 创建一个fwd节点,用于占位.当别的节点发现这个槽位中有fwd节点时,则跳过这个节点 // 它的hash为MOVED ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 首次推进为true,如果为true说明需要再次推进一个目标(i--),反之如果是false,那么就不能推进下标,需要将当前的下标处理完毕 boolean advance = true; // 完成状态,如果为true,就结束方法 boolean finishing = false; // to ensure sweep before committing nextTab // 死循环,因为是倒着遍历,所以i是点前线程的最大位置(i---),bound是边界,也就是区间里面的最小值 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; // 如果当前线程可以向后推进,这个循环就是控制i递减.同时每个线程都会进入这里取得自己需要转移的桶的下标区间 // 1. true while (advance) { int nextIndex, nextBound; // 1. -1 >= 0,false if (--i >= bound || finishing) advance = false; //transferIndex <= 0 说明已经没有需要迁移的桶了 // 1.nextIndex = 16 <= 0 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } //更新 transferIndex //为当前线程分配任务,处理的桶结点区间为(nextBound,nextIndex) // 1.16 > 16 ? 16 -16 : 0 区间 16 到0 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; // 0 i = nextIndex - 1;// 15 advance = false; } } // i = 15 nextn = 32 // i < 0 ,表示数据迁移已经完成 // i >= n 和 i + n >= nextn 表示最后一个线程也执行完成了,扩容完成了 // 第二个if里面的i=n if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { // 完成扩容 nextTable = null; // 删除成员变量 table = nextTab; // 更新table sizeCtl = (n << 1) - (n >>> 1); // 更新阈值 return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 表示一个线程退出扩容 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) // 说明还有其他线程正在扩容 return; // 当前线程结束 // 当前线程为最后一个线程,负责在检查一个整个队列 finishing = advance = true; // i = n; // recheck before commit } } // 待迁移桶为null,用cas把当前节点设置为ForwardingNode节点,表示已经处理 else if ((f = tabAt(tab, i)) == null) // 第一个线程 获取i处的数据为null, advance = casTabAt(tab, i, null, fwd);// 设置当前节点为 fwd 节点 else if ((fh = f.hash) == MOVED) // 如果当前节点为 MOVED,说明已经处理过了,直接跳过 advance = true; // already processed else { // 节点不为空,锁住i位置的头结点 synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0) { // 表示是链表 int runBit = fh & n; // fn表示f.hash & n ,表示获取原来table的位置 Node<K,V> lastRun = f; // lastRun = f for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; // 获取节点的位置 if (b != runBit) { // 如果这个节点和上一个节点的位置不一样,记录节点和位置 runBit = b; // 当前节点的位置 lastRun = p; // 当前节点 } } // 不管runBit有没有发生变化,只可能是0或者n, // ln表示的不变化的节点 // hn表示的是变化节点的位置 if (runBit == 0) { // 如果是0,那么ln=lastRun就是位置没有变的这条链 hn=null变化链需要遍历重组 ln = lastRun; hn = null; } else { // 如果当前节点不是0,hn=lastRun这个变化链,ln=null没有变化的链需要遍历重组 hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } // 原来位置 setTabAt(nextTab, i, ln); // 变化位置 setTabAt(nextTab, i + n, hn); // 原来table的位置设置fwd节点,表示扩容 setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }

addCount主要是用于修改map的size,和扩容用的,这里我们只看后半部分扩容部分,修改count部分在map的size方法讲解

private final void addCount(long x, int check) { CounterCell[] as; long b, s; // 尝试使用cas更新baseCount失败 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; // 在counterCells没有初始化,或者尝试cas更新当前线程的CounterCell失败时 // 调用fullAddCount更新 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } // check >= 0,新加入一个值 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; // s代表了 现在map的数据量 // sc= 12 ,证明刚刚初始化,没有进行扩容 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && // 当前容量大于sc,table已经有值,table的cap小于最大cap (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); // 这一步不好理解, // Integer.numberOfLeadingZeros(n) 其实就是最高位前面有多少个0,n代表table的长度 // | (1 << (RESIZE_STAMP_BITS - 1)) 2^15 二进制16位,第16位1,其余15位0 // 其实就是相加 // 表示正在扩容 if (sc < 0) { // sc >>> RESIZE_STAMP_SHIFT // 扩容结束 // 只有最后一个线程在扩容 // sc == rs + MAX_RESIZERS 达到最大数 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; // sc 加1,表示有一个线程,协助参加了扩容 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } // 表示没有正在进行扩容,开始扩容 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }

④、get方法(get方法的逻辑相对就要简单点了,请看代码注释)

// get方法 public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; // 获取hash值 int h = spread(key.hashCode()); // table不为null,table已经初始化,通过hash查找的node不为nul if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { // hash相等 if ((ek = e.key) == key || (ek != null && key.equals(ek))) // 找到了相同的key return e.val; // 返回当前e的value } else if (eh < 0) // hash小于0,说明是特殊节点(TreeBin或ForwardingNode)调用find return (p = e.find(h, key)) != null ? p.val : null; // 不是上面的情况,那就是链表了,遍历链表 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }

⑤、size方法(和1.7的处理方式截然不同)

public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }

CounterCell[]这个数组就是记录的map的count,这里就不得不讲一下addCount方法的前半部分,只有理解了每次添加一个元素,count是怎么处理的,才能明白为什么要有这个数组
CounterCell 内部类

static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } }

addCount方法,只看前半部分,我们可以看出其实修改map的count,先是使用cas修改basecount,然后可能存在多个线程同时修改,所以会失败,失败就用CounterCell[]数组处理,调用fullAddCount方法。

/** * * @param x 1L * @param check 默认值是0,等于0时,代表插入为null * 不等于0时,check等于2代表了树,其它代表了链表 */ private final void addCount(long x, int check) { CounterCell[] as; long b, s; // 尝试使用cas更新baseCount失败 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; // 在counterCells没有初始化,或者尝试cas更新当前线程的CounterCell失败时 // 调用fullAddCount更新 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } } }

fullAddCount,主要用来记录竞争导致的basecount修改失败的这些操作,其实主要就是把这些失败的次数记录在CounterCell[]数组里面,然后在统计size时,就是basecount+CounterCell[]里面的次数。
分析:CounterCell[]可以看做是一个map,因为好多处理方法和map类似,我们先来看下数组的长度,默认是2,其实是可以扩容的,每次扩容2倍(扩容不能超过cpu的数量),然后怎么插入值了,和map类似都需要确定位置,那么怎么确定位置了,map是通过key的hashcode,而这个数组是通过一个并发随机数ThreadLocalRandom来说生成一个随机数,然后通过这随机数&数组的长度减一确定位置,是不是很map一样,还有就是他没有链表这个概念,那冲突了怎么办了,其实就是累加,这个数组还有锁的概念就是cellsBusy,因为这里可能也是多个线程来执行,等于零就表示没有锁,等于一就表示有锁,在插入新值、扩容和创建数组这些操作都需要获取锁,具体的方法的概念就到这里,具体的逻辑参考代码注释。

/** * * @param x 需要更新的值 * @param wasUncontended 是否发生竞争 */ private final void fullAddCount(long x, boolean wasUncontended) { int h; // 初始化一个随机值 // ThreadLocalRandom是JDK 7之后提供并发产生随机数,能够解决多个线程发生的竞争争夺。 if ((h = ThreadLocalRandom.getProbe()) == 0) { // 为当前线程初始化一个随机值 ThreadLocalRandom.localInit(); // force initialization // 获取这个值 h = ThreadLocalRandom.getProbe(); // 由于重新生成了probe,未冲突标志位设置为true wasUncontended = true; } // 冲突标志位,决定了扩容还是不扩容 boolean collide = false; // True if last slot nonempty for (;;) { CounterCell[] as; CounterCell a; int n; long v; // counterCells数组已经被初始化了 if ((as = counterCells) != null && (n = as.length) > 0) { // 求在counterCells中的位置,与hash一样求%,因为counterCells数组长度是2的冥 if ((a = as[(n - 1) & h]) == null) { // 当前位置没有CounterCell if (cellsBusy == 0) {// Try to attach new Cell // 创建新的CounterCell CounterCell r = new CounterCell(x); // Optimistic create if (cellsBusy == 0 && // cellsBusy=0还没有加锁,使用cas进行加锁,cellsBusy设置为1 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean created = false; try { // Recheck under lock CounterCell[] rs; int m, j; if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; // 放进数组 created = true; } } finally { cellsBusy = 0; } // 操作成功,退出死循环 if (created) break; continue; // Slot is now non-empty } } collide = false; } // 在调用fullAddCount之前就发生了竞争 // 然后wasUncontended=true,未发生竞争,然后重新循环更新 else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash // 当前位置的CounterCell不为空,进行累加 else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break; // 数组被扩容了 // 数组大于了cpu数量 // 设置冲突标志, collide = false,防止扩容 else if (counterCells != as || n >= NCPU) collide = false; // 设置冲突标志,重新执行循环 // 如果下次循环执行到该分支,并且冲突标志仍然为true // 那么会跳过该分支,到下一个分支进行扩容// At max size or stale // 这个位置决定了扩容还是不扩容 false就不扩容,true就扩容 else if (!collide) collide = true; // 扩容,CAS设置cellsBusy值 else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { try { if (counterCells == as) {// Expand table unless stale // 容量扩大一倍 CounterCell[] rs = new CounterCell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; counterCells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } // 为当前线程重新计算probe h = ThreadLocalRandom.advanceProbe(h); } // 证明数组为空 // 获取锁,初始化数组 else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { // Initialize table if (counterCells == as) { // 没有被其它线程初始化 // 初始化,默认长度2 CounterCell[] rs = new CounterCell[2]; // 创建新的CounterCell(x),位置为rs[h&(2-1)] rs[h & 1] = new CounterCell(x); // 赋值给成员变量counterCells counterCells = rs; // 初始化成功 init = true; } } finally { // 释放锁 cellsBusy = 0; } if (init) // 结束循环 break; } // 证明在CounterCell上也存在竞争,那么尝试对baseCount进行更新 else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break; // Fall back on using base } }

四、总结

本文主要基于jdk1.8介绍了ConcurrentHashMap的一部分常用方法,主要讲了get、put和size者三个常用方法,其中比较难理解的是put方法,其中在扩容和协助扩容作者的设计让人眼前一亮,大师就是大师,绝对值得你去一探究竟,还有就是size也采用了分治的思想(不知道这个词合适不,个人理解),就是统计累加count时,没有竞争的单独处理,有竞争的单独处理,而没有采用自旋,极大的提升了效率;1.8和1.7 的区别很大,首先数据结构发生变化,其次锁也发生了变化、扩容侧率和size的统计等;最后附上学习学习Map和ConcurrentHashMap的一点小建议,如果想从1.7和1.8两个版本看,建议从这个方向(jdk版本一样,数据结构基本没有发生变化)HashMap 1.7>>ConcurrentHashMap 1.7>>HashMap 1.8>>[ConcurrentHashMap 1.8]()
参考 《Java 并发编程的艺术》