谈谈ConcurrentHashMap1.7和1.8的不同实现

ConcurrentHashMap

在多线程环境下,使用HashMap进行put操作时存在丢失数据的情况,为了避免这种bug的隐患,强烈建议使用ConcurrentHashMap代替HashMap,为了对ConcurrentHashMap有更深入的了解,本文将对ConcurrentHashMap1.7和1.8的不同实现进行分析。  

1.7实现

数据结构

jdk1.7中采用Segment + HashEntry的方式进行实现,结构如下:
  1. if (c * ssize < initialCapacity)
  2.     ++c;
  3. int cap = MIN_SEGMENT_TABLE_CAPACITY;
  4. while (cap < c)
  5.     cap <<= 1;
其中Segment在实现上继承了ReentrantLock,这样就自带了锁的功能。  

put实现

当执行put方法插入数据时,根据key的hash值,在Segment数组中找到相应的位置,如果相应位置的Segment还未初始化,则通过CAS进行赋值,接着执行Segment对象的put方法通过加锁机制插入数据,实现如下: 场景:线程A和线程B同时执行相同Segment对象的put方法 1、线程A执行tryLock()方法成功获取锁,则把HashEntry对象插入到相应的位置; 2、线程B获取锁失败,则执行scanAndLockForPut()方法,在scanAndLockForPut方法中,会通过重复执行tryLock()方法尝试获取锁,在多处理器环境下,重复次数为64,单处理器重复次数为1,当执行tryLock()方法的次数超过上限时,则执行lock()方法挂起线程B; 3、当线程A执行完插入操作时,会通过unlock()方法释放锁,接着唤醒线程B继续执行;  

size实现

因为ConcurrentHashMap是可以并发插入数据的,所以在准确计算元素时存在一定的难度,一般的思路是统计每个Segment对象中的元素个数,然后进行累加,但是这种方式计算出来的结果并不一样的准确的,因为在计算后面几个Segment的元素个数时,已经计算过的Segment同时可能有数据的插入或则删除, 在1.7的实现中,采用了如下方式:
  1. try {
  2.     for (;;) {
  3.         if (retries++ == RETRIES_BEFORE_LOCK) {
  4.             for (int j = 0; j < segments.length; ++j)
  5.                 ensureSegment(j).lock(); // force creation
  6.         }
  7.         sum = 0L;
  8.         size = 0;
  9.         overflow = false;
  10.         for (int j = 0; j < segments.length; ++j) {
  11.             Segment<K,V> seg = segmentAt(segments, j);
  12.             if (seg != null) {
  13.                 sum += seg.modCount;
  14.                 int c = seg.count;
  15.                 if (c < 0 || (size += c) < 0)
  16.                     overflow = true;
  17.             }
  18.         }
  19.         if (sum == last)
  20.             break;
  21.         last = sum;
  22.     }
  23. finally {
  24.     if (retries > RETRIES_BEFORE_LOCK) {
  25.         for (int j = 0; j < segments.length; ++j)
  26.             segmentAt(segments, j).unlock();
  27.     }
  28. }
先采用不加锁的方式,连续计算元素的个数,最多计算3次: 1、如果前后两次计算结果相同,则说明计算出来的元素个数是准确的; 2、如果前后两次计算结果都不同,则给每个Segment进行加锁,再计算一次元素的个数;  

1.8实现

数据结构

1.8中放弃了Segment臃肿的设计,取而代之的是采用Node + CAS + Synchronized来保证并发安全进行实现,结构如下: 只有在执行第一次put方法时才会调用initTable()初始化Node数组,实现如下:
  1. private final Node<K,V>[] initTable() {
  2.     Node<K,V>[] tab; int sc;
  3.     while ((tab = table) == null || tab.length == 0) {
  4.         if ((sc = sizeCtl) < 0)
  5.             Thread.yield(); // lost initialization race; just spin
  6.         else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
  7.             try {
  8.                 if ((tab = table) == null || tab.length == 0) {
  9.                     int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
  10.                     @SuppressWarnings("unchecked")
  11.                     Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
  12.                     table = tab = nt;
  13.                     sc = n - (n >>> 2);
  14.                 }
  15.             } finally {
  16.                 sizeCtl = sc;
  17.             }
  18.             break;
  19.         }
  20.     }
  21.     return tab;
  22. }
 

put实现

当执行put方法插入数据时,根据key的hash值,在Node数组中找到相应的位置,实现如下: 1、如果相应位置的Node还未初始化,则通过CAS插入相应的数据:
  1. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  2.     if (casTabAt(tab, i, nullnew Node<K,V>(hash, key, value, null)))
  3.         break;                   // no lock when adding to empty bin
  4. }
  2、如果相应位置的Node不为空,且当前该节点不处于移动状态,则对该节点加synchronized锁,如果该节点的hash不小于0,则遍历链表更新节点或插入新节点:
  1. if (fh >= 0) {
  2.     binCount = 1;
  3.     for (Node<K,V> e = f;; ++binCount) {
  4.         K ek;
  5.         if (e.hash == hash &&
  6.             ((ek = e.key) == key ||
  7.              (ek != null && key.equals(ek)))) {
  8.             oldVal = e.val;
  9.             if (!onlyIfAbsent)
  10.                 e.val = value;
  11.             break;
  12.         }
  13.         Node<K,V> pred = e;
  14.         if ((e = e.next) == null) {
  15.             pred.next = new Node<K,V>(hash, key, value, null);
  16.             break;
  17.         }
  18.     }
  3、如果该节点是TreeBin类型的节点,说明是红黑树结构,则通过putTreeVal方法往红黑树中插入节点:
  1. else if (f instanceof TreeBin) {
  2.     Node<K,V> p;
  3.     binCount = 2;
  4.     if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
  5.         oldVal = p.val;
  6.         if (!onlyIfAbsent)
  7.             p.val = value;
  8.     }
  9. }
  4、如果binCount不为0,说明put操作对数据产生了影响,如果当前链表的个数达到8个,则通过treeifyBin方法转化为红黑树,如果oldVal不为空,说明是一次更新操作,没有对元素个数产生影响,则直接返回旧值:
  1. if (binCount != 0) {
  2.     if (binCount >= TREEIFY_THRESHOLD)
  3.         treeifyBin(tab, i);
  4.     if (oldVal != null)
  5.         return oldVal;
  6.     break;
  7. }
  5、如果插入的是一个新节点,则执行addCount()方法尝试更新元素个数baseCount;  

size实现

1.8中使用一个volatile类型的变量baseCount记录元素的个数,当插入新数据或则删除数据时,会通过addCount()方法更新baseCount,实现如下:
  1. if ((as = counterCells) != null ||
  2.     !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
  3.     CounterCell a; long v; int m;
  4.     boolean uncontended = true;
  5.     if (as == null || (m = as.length - 1) < 0 ||
  6.         (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
  7.         !(uncontended =
  8.           U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
  9.         fullAddCount(x, uncontended);
  10.         return;
  11.     }
  12.     if (check <= 1)
  13.         return;
  14.     s = sumCount();
  15. }
  1、初始化时counterCells为空,在并发量很高时,如果存在两个线程同时执行CAS修改baseCount值,则失败的线程会继续执行方法体中的逻辑,使用CounterCell记录元素个数的变化;   2、如果CounterCell数组counterCells为空,调用fullAddCount()方法进行初始化,并插入对应的记录数,通过CAS设置cellsBusy字段,只有设置成功的线程才能初始化CounterCell数组,实现如下:
  1. else if (cellsBusy == 0 && counterCells == as &&
  2.          U.compareAndSwapInt(this, CELLSBUSY, 01)) {
  3.     boolean init = false;
  4.     try {                           // Initialize table
  5.         if (counterCells == as) {
  6.             CounterCell[] rs = new CounterCell[2];
  7.             rs[h & 1] = new CounterCell(x);
  8.             counterCells = rs;
  9.             init = true;
  10.         }
  11.     } finally {
  12.         cellsBusy = 0;
  13.     }
  14.     if (init)
  15.         break;
  16. }
  3、如果通过CAS设置cellsBusy字段失败的话,则继续尝试通过CAS修改baseCount字段,如果修改baseCount字段成功的话,就退出循环,否则继续循环插入CounterCell对象;
  1. else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
  2.     break;
所以在1.8中的size实现比1.7简单多,因为元素个数保存baseCount中,部分元素的变化个数保存在CounterCell数组中,实现如下:
  1. public int size() {
  2.     long n = sumCount();
  3.     return ((n < 0L) ? 0 :
  4.             (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
  5.             (int)n);
  6. }
  7. final long sumCount() {
  8.     CounterCell[] as = counterCells; CounterCell a;
  9.     long sum = baseCount;
  10.     if (as != null) {
  11.         for (int i = 0; i < as.length; ++i) {
  12.             if ((a = as[i]) != null)
  13.                 sum += a.value;
  14.         }
  15.     }
  16.     return sum;
  17. }
通过累加baseCountCounterCell数组中的数量,即可得到元素的总个数;

发表评论

目前评论:5

  • avatar 言曌 作者

    多级评论!

    • avatar 言曌 作者

      @言曌回复!

      • avatar 言曌 作者

        @言曌再回复!

  • avatar 言曌 作者

    评论!!!

    • avatar 言曌 作者

      @言曌回复测试!!