Java程序员都曾被问到的一个问题是:

  • 为什么HashMap是线程不安全的?
  • 为什么ConcurrentHashMap是线程安全的?

为什么HashMap是线程不安全的?

Fail-Fast 机制

  • 如果在使用迭代器的过程中有其他线程修改了map,那么将抛出ConcurrentModificationException,这就是所谓fail-fast策略。
  • 看源码的时候我们会看到,在ArrayList,LinkedList,HashMap的内部,有一个int类型的modCount对象,对上述集合内容的修改都将增加这个值。modCount会在迭代器Iterator中使用,在迭代器的构造函数中,有这么一行代码,expectedModCount = modCount。在nextEntity和remove方法的调用过程中,如果modCount != expectedModCount,抛出ConcurrentModificationException异常。
final Entry<K,V> nextEntry() {
    if (modCount != expectedModCount)
        throw new ConcurrentModificationException();
}
public void remove() {
    if (modCount != expectedModCount)
        throw new ConcurrentModificationException();
}

死循环

  • HashMap中有两个重要的属性,一个是容量,一个是加载因子,容量必须是2的n次方,这是为了保证key hash之后的值可以均匀的分散在数组里。当前结点个数等于容量*加载因子之后,需要进行扩容,扩展为原来的二倍,这个过程称为rehash。
  • 因为hashmap是头插入法,新的结点插入在数组的头结点。当线程1执行rehash到newTable[i] = e时,线程1被挂起,此时线程2开始执行rehash并完成,这样会导致结点的循环引用问题。
void transfer(Entry[] newTable, boolean rehash) {
   int newCapacity = newTable.length;
    for (Entry<K,V> e : table) {// table是老数组
        while(null != e) {
            Entry<K,V> next = e.next;
            if (rehash) {
                e.hash = null == e.key ? 0 : hash(e.key);
            }
            int i = indexFor(e.hash, newCapacity);
            e.next = newTable[i];
            newTable[i] = e;
            e = next;
        }
    }
}

为什么ConcurrentHashMap是线程安全的?

java 7中的结构

在这里插入图片描述

  • ConcurrentHashMap的数据结构是由一个Segment数组和多个HashEntry组成,Segment数组的意义就是将一个大的table分割成多个小的table来进行加锁,而每一个Segment元素存储的是HashEntry数组+链表。
  • 在进行数据的定位时,会首先找到 segment, 然后在 segment 中定位 bucket。
  • 如果多线程操作同一个 segment, 就会触发 segment 的锁 ReentrantLock, 这就是分段锁的基本实现原理。
  • Segment 继承 ReentrantLock 锁,用于存放数组 HashEntry[]。segment在初始化后不可以扩容,但是HashEntry可以扩容。
static final class Segment<K,V> extends ReentrantLock implements Serializable {
}
  • 在插入数据的时候,首先需要找到segment,调用该segment的put方法,将新节点插入segment中。在segment中获取锁,如果获取失败,则走scanAndLockForPut方法,如果获取成功,返回,如果失败,则在循环中一直获取,直到成功。
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    // 根据key的hash再次进行hash运算
    int hash = hash(key.hashCode());
    // 基于hash定位segment数组的索引。
    // hash值是int值,32bits。segmentShift=28,无符号右移28位,剩下高4位,其余补0。
    // segmentMask=15,二进制低4位全部是1,所以j相当于hash右移后的低4位。
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
    // 找到对应segment
        s = ensureSegment(j);
    // 调用该segment的put方法,将新节点插入segment中
    return s.put(key, hash, value, false);
}
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 是否获取锁,失败自旋获取锁(直到成功)
    // 拿到结点之后,对结点进行插入操作。
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value); 
}

java 8中的结构

在这里插入图片描述

  • Java8中不再使用segment,不再使用分段锁,而是使用大的数组,为了解决hash碰撞,在链表长度超过一定值(默认8)的情况下,将链表转为红黑树实现,寻址的复杂度从O(n)转换为Olog(n)。
  • 并发控制使用Synchronized和CAS来操作。
  • ConcurrentHashMap的构造函数是一个空函数,初始化是在put中实现的。
  • Node中value和next都用volatile修饰,保证并发的可见性。value和next都用volatile修饰,保证并发的可见性。

悲观锁:可以完全保证数据的独占性和正确性,因为每次请求都会先对数据进行加锁, 然后进行数据操作,最后再解锁,而加锁释放锁的过程会造成消耗,所以性能不高;
乐观锁:假设没有冲突发生,如果检测到冲突,就失败重试,直到成功。数据库的乐观锁是通过版本控制来实现的。

  • 在put的过程中,不断的尝试,因为在table的初始化和casTabAt用到了compareAndSwapInt、compareAndSwapObject,这是一种乐观锁的实现方式。
  • 对要加入的特定数组的节点tab[i]加锁,遍历链表或红黑树,对数据插入
// 不参与序列化
transient volatile Node<K,V>[] table; // volatile保证线程间可见
 // 记录容器的容量大小,通过CAS更新
private transient volatile long baseCount;

/**
 * 初始化和扩容控制参数。为负数时表示table正在被初始化或resize:-1(初始化),-(1+扩容线程数)

 * sizeCtl默认值为0,大于0是扩容的阀值
 */
private transient volatile int sizeCtl;

final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
     // while(true)循环,不断的尝试,因为在table的初始化和casTabAt用到了compareAndSwapInt、compareAndSwapObject
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
       // 如果数组(HashMap)还未初始化,进行初始化操作(CAS操作)
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
       // 计算新插入数据在数组(HashMap)中的位置,如果该位置上的节点为null,直接放入数据(CAS操作)
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
       // 如果该节点的hash值为MOVED,说明正在进行扩容操作或者已经扩容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
       // 
            else {
                V oldVal = null;
                synchronized (f) {  // 对特定数组节点tab[i]加锁
                    if (tabAt(tab, i) == f) { // 判断tab[i]是否有变化
                        if (fh >= 0) { // 插入操作
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) { // 遍历链表
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) { // 如果新插入值和tab[i]处的hash值和key值一样,进行替换
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) { // 如果此节点为尾部节点,把此节点的next引用指向新数据节点
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) { // 如果是一颗红黑树
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD) //如果数组节点的链表长度超过限定长度,转换为一颗红黑树
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount); 
        return null;
    }

原文链接:https://blog.csdn.net/u010659877/article/details/108790315

最后修改日期:2020年9月27日