ConcurrentHashMap 源码阅读


转载

源自 Gentryhuang 大佬,感谢大佬的帮助!!!万分感谢!!!

概述

HashMap 不是线程安全的,在高并发环境下可能会出现系列问题,如链表成环(JDK1.7)、数据覆盖(JDK1.8)。想要避免 HashMap 的线程安全问题有很多办法,比如改用 HashTable 或者使用 Collections.synchronizedMap() 将 HashMap 包装成线程安全的。虽然这两种方法能解决线程安全问题,但是性能成了一个问题,无论读写操作都会给整个集合加锁,导致同一时间的其他操作阻塞。这个时候 ConcurrentHashMap 就应运而生了,它是基于 HashMap 实现的线程安全的集合。

考虑到 ConcurrentHashMap 在 JDK 不同的版本中实现差异较大,我们先对 JDK 1.7 版本进行简单介绍,然后重点介绍 JDK 1.8 版本中的实现。

JDK1.7

JDK 1.7 对 ConcurrentHashMap 的实现关键是:HashMap + Segment。哈希项的结构如下:

// 哈希桶数组
transient volatile Node<K,V>[] table;

static final class HashEntry<K,V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;
}

HashEntry 的成员变量 value 和 next 是被关键字 volatile 修饰的,也就是说所有线程都可以及时检查到其他线程对这两个变量的改变,因此可以在不加锁的情况下读取到这两个引用的最新值。

Segment 通过继承 ReentrantLock 来进行加锁,通过每次锁住一个 Segment 来降低锁的粒度并保证了每个 Segment 内操作的线程安全性,从而实现全局线程安全。Segment 本身就相当于一个 HashMap 对象,每个 Segment 包含一个 HashEntry 数组。整个 ConcurrentHashMap 的结构如下:

可以看到,ConcurrentHashMap 是一个二级哈希表,桶数组是 Segment 数组,数组中的每个元素对应一个 HashMap 。

ConcurrentHashMap 设计成二级哈希表的目的就是采用锁分段技术提高并发度,每个 Segment 各自持有一把锁,Segment 之间互不影响。在保证线程安全的同时降低了锁的粒度,使并发操作效率更高。下面看看 ConcurrentHashMap 并发读写的几种情况:

  • 不同 Segment 的并发写入

    不同 Segment 的写入是可以并发执行的,因为每个 Segment 持有的锁不是同一个。
  • 同一个 Segment 的并发读写

    同一个 Segment 的读写可以并发执行。
  • 同一个 Segment 的并发写入

    Segment 的写入是需要上锁的,因此对同一个 Segment 的并发写入会被阻塞。

了解了 ConcurrentHashMap 并发读写的情况后,下面我们对常见的几个方法的执行流程简单说明。

get 方法

  1. 对输入的 key 做 hash 运算得到 hash 值;
  2. 通过 hash 值对 Segment 数组定位对应的 Segment 对象;
  3. 再次通过 hash 值定位到 Segment 中数组的具体桶;

可以看到,计算 key 的 hash 值后,要进行两次定位,第一次为了定位这个 key 是属于哪个 Segment;第二次为了定位这个 key 属于 Segment 中数组哪个桶;

put 方法

  1. 对输入的 key 做 hash 运算得到 hash 值;
  2. 通过 hash 值对 Segment 数组定位对应的 Segment 对象;
  3. 获取 Segment 对应的可重入锁;
  4. 再次通过 hash 值定位到 Segment 中数组的具体桶;
  5. 插入或覆盖 HashEntry 对象;
  6. 释放可重入锁;

size

size 方法是统计 ConcurrentHashMap 中的元素数量,需要把各个 Segment 内的元素数量汇总起来。

  1. 遍历所有的 Segment;
  2. 把 Segment 的元素数量累加起来;
  3. 把 Segment 的修改次数累加起来;
  4. 判断所有 Segment 的总修改次数是否大于上一次的总修改次数。如果大于,说明统计过程中有修改,重新统计,尝试次数+1;如果不是。说明没有修改,统计结束。
  5. 如果尝试次数超过阈值,则对每一个 Segment 加锁,再重新统计。
  6. 判断所有 Segment 的总修改次数是否大于上一次的总修改次数,由于已经加锁,次数一定和上次相等。
  7. 依次释放 Segment 锁,统计结束;

说明:这里的 size 方法的思想有点类似乐观锁和悲观锁。为了尽量不锁住 Segment ,先乐观地统计元素数量,当尝试一定次数后仍然失败才会加锁统计。

小结

ConcurrentHashMap 的结构图如下:


  • ConcurrentHashMap 是线程安全的,其读取不需要加锁,通过引入 Segment 将写锁粒度缩小,每个 Segment 中的桶数组就相当于 HashTable;
  • 由于引入了 Segment ,在读取和写入的时候需要做两次映射定位,第一次定位 Segment,第二次定位 Segment 中数组的桶;
  • 到了第二次定位桶的过程,就和 HashMap 是一致的了;

JDK1.8

相比 JDK 1.7 使用的 Segment 保证线程安全,JDK 1.8 进行了重写式优化,取消了 Segment 的设计,取而代之的是通过 CAS 操作和 synchronized 关键字来实现线程安全。当然,在存储结构上仍然基于 HashMap 。

ConcurrentHashMap 主要 UML 类图如下:

源码分析

写到这里,脑海里又响起了一阵声音,为什么要研究 ConcurrentHashMap 源码?我始终认为知其然知其所以然,才能算得上真正了解一个事物。在分析源码之前,我们以问题的方式进行驱动。

  • ConcurrentHashMap 是怎么保证线程安全的?如,初始化时怎么保证安全、扩容时怎么保证安全、如何安全地执行读写操作、如何安全获取元素个数。
  • ConcurrentHashMap 并发效率是如何提高的?怎么控制并发的。

属性

    /* ---------------- Constants -------------- */
    /**
     * 最大容量,当两个构造函数中任何一个带参数的函数隐式指定较大的值时使用
     */
    private static final int MAXIMUM_CAPACITY = 1 << 30;

    /**
     * 默认容量大小 16,大小必须是 2^N
     */
    private static final int DEFAULT_CAPACITY = 16;

    /**
     * 桶的树化阈值
     */
    static final int TREEIFY_THRESHOLD = 8;

    /**
     * 桶的链表还原阈值
     */
    static final int UNTREEIFY_THRESHOLD = 6;

    /**
     * 最小树化容量阈值:使用红黑树时最小的表容量。当 HashMap 中的容量 >= 该值时,才允许树形化链表即将链表转成红黑树
     */
    static final int MIN_TREEIFY_CAPACITY = 64;

    /**
     * sizeCtl 中用于生成标记的位数。
     */
    private static int RESIZE_STAMP_BITS = 16;

    /**
     * 可以帮助调整大小的最大线程数。
     */
    private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

    /**
     * 在 sizeCtl 中记录大小标记的位移,为 16
     */
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    /*
     * 节点中的 hash 
     */
    static final int MOVED = -1; //  ForwardingNode 类型节点的 hash 值
    static final int TREEBIN = -2; // 红黑树的根节点的 hash 值
    static final int RESERVED = -3; // hash for transient reservations
    static final int HASH_BITS = 0x7fffffff; // 普通节点哈希的可用位

    /**
     * CPU 的数量,以限制某些大小
     */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /* ---------------- Fields -------------- */

    /**
     * 存储数据的 Node 数组,长度是 2 的幂
     *
     * 使用 volatile 来保证每次获取到的都是最新的值
     */
    transient volatile Node<K, V>[] table;

    /**
     * 下一个要使用的表;仅在调整大小时为非空。
     */
    private transient volatile Node<K, V>[] nextTable;

    /**
     * 基本计数器值,主要在没有争用时使用
     */
    private transient volatile long baseCount;

    /**
     * 多个含义,要重点理解
     */
    private transient volatile int sizeCtl;

    /**
     * 调整大小时要拆分的下一个表索引(加一个)。
     */
    private transient volatile int transferIndex;

    /**
     * 调整大小和/或创建 CounterCell 时使用自旋锁(通过 CAS 锁定)。
     * 0:不 busy  1:busy
     */
    private transient volatile int cellsBusy;

    /**
     * 计数单元桶。当非空时,大小是 2 的幂。
     */
    private transient volatile CounterCell[] counterCells;

ConcurrentHashMap 中的属性比较多,属于 HashMap 中的属性就不再说明,下面对核心属性进行说明。

MOVED: ForwardingNode 类型节点的 hash 值,在数组扩容的过程中如果旧数组的某个桶元素为空或者迁移完毕就会使用 ForwardingNode 对象填充,作为一个标记;
baseCount: 记录元素个数的基本计数器,主要在没有竞争的情况下使用;
counterCells: 元素个数的计数单元桶,在并发环境下将元素个数的记录分摊到不同的计数器上,缓解单个计数器的竞争;
cellsBusy: 调整或创建计数单元 CounterCell 时的标记,表示某个线程竞争成功;

sizeCtl

sizeCtl 这个属性非常重要,对理解整个 ConcurrentHashMap 至关重要。

  1. 正常情况下,sizeCtl 存储的是扩容阈值,固定为数组容量的 0.75 倍,默认值为 0;
  2. 当初始化数组时,sizeCtl 会被设置为 -1,表示某个线程抢到了初始化数组的资格,没有抢到资格的线程自旋以等待数组初始化完成。数组初始化完成后,会把 sizeCtl 设置为扩容的阈值;
  3. 当数组达到扩容条件时,sizeCtl 不再存储扩容阈值,而是用来标识正处于扩容过程。在扩容前,会将 sizeCtl 设置为一个负数,在扩容完成后会重新计算扩容阈值并赋值到 sizeCtl 。

其中,扩容过程中 sizeCtl 为负数的情况不太好理解,下面我们详细说说这种情况下的 sizeCtl 为负数时的组成。假设场景为某时刻数组的容量为 16 ,此时达到了扩容的条件。

第一步:扩容前先标识数组处于扩容的过程,即设置 sizeCtl 的值;

线程通过执行 U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2) 代码片段设置 sizeCtl 的值,因为此时数组容量为 16,针对该容量扩容的标识值 rs=32795(计算出来的),对应的二进制数为:1000 0000 0001 1011 0000 0000 0000 0010 ,具体组成如下图:

第二步:一旦扩容开始了,执行写操作(新增、删除)的线程发现集合处于扩容过程,此时就会加入到迁移元素的流程中,加快扩容进度;其中“发现”的情况如下:

  • 线程在新增元素后,如果判断达到扩容条件,那么会进入到扩容流程,如果此时已经有线程在扩容了,那么当前线程会加入到迁移元素的流程;
  • 线程在定位数组桶时,发现桶中元素类型是 ForwardingNode ,那么会先加入到迁移元素流程,完成迁移后再执行写操作;

构造方法

    /**
     * 使用默认初始表大小 (16) 创建一个新的空映射。
     */
    public ConcurrentHashMap() {
    }

    /**
     * Creates a new, empty map with an initial table size
     * accommodating the specified number of elements without the need
     * to dynamically resize.
     * 创建一个新的空映射,其初始表大小可容纳指定数量的元素,无需动态调整大小。
     *
     * @param initialCapacity The implementation performs internal
     *                        sizing to accommodate this many elements.
     * @throws IllegalArgumentException if the initial capacity of
     *                                  elements is negative
     */
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();

        // 根据传入容量大小计算容量,返回大于 initialCapacity 且最近的2的整数次幂的数
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                MAXIMUM_CAPACITY :
                tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));

        // 将容量大小赋值给 sizeCtl,初始化后 sizeCtl 作为扩容阈值
        this.sizeCtl = cap;
    }

初始化 ConcurrentHashMap 时底层数组是没有进行初始化的,会等到第一次添加元素时才会初始化。此外,相比 HashMap 控制数组容量及填充度分别使用了 thresholdloadFactor,ConcurrentHashMap 使用了 sizeCtl 属性来控制,具体如下:

  • 非扩容的情况下,sizeCtl 表示的语意同 threshold;
  • 在计算扩容阈值时,使用了固定的负载因子 0.75;

这个在后续的源码中会有体现。

put() - 新增元素

    /**
     * key 和 value,都不允许为 null
     */
    public V put(K key, V value) {
        // 默认覆盖式
        return putVal(key, value, false);
    }

可以看到,使用 put 方法默认式覆盖式新增元素。

    /**
     * 整体流程和 HashMap 类似
     *
     * @param key          key
     * @param value        value
     * @param onlyIfAbsent 是否覆盖
     * @return
     */
    final V putVal(K key, V value, boolean onlyIfAbsent) {

        // key 、value 都不允许为空
        if (key == null || value == null) throw new NullPointerException();

        // 得到 hash 值
        int hash = spread(key.hashCode());

        // 用于记录要插入的元素所在桶的元素个数,主要用于判断是否将链表转为红黑树的情况
        int binCount = 0;

        // 自旋,结合 CAS 使用(如果 CAS 失败,则会重新取整个桶进行下面的流程),直到 put 操作完成后退出循环
        for (Node<K, V>[] tab = table; ; ) {
            Node<K, V> f;
            int n, i, fh;

            //  1 如果数组为空,则进行数组初始化
            if (tab == null || (n = tab.length) == 0)
                // 初始化数组
                tab = initTable();

                // 2 定位到 hash 值对应的数组下标,得到第一个节点 f
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {

                // 如果数组该位置没有节点,则使用一次 CAS 操作将这个新值放入其中即可。
                // 如果 CAS 失败,说明是并发操作,进入到下一个循环重试;如果 CAS 成功,则 break ,流程结束
                if (casTabAt(tab, i, null,
                        new Node<K, V>(hash, key, value, null)))
                    // 插入成功,退出循环
                    break;                   // no lock when adding to empty bin
            }

            // 3 如果 hash 值对应的位置有节点,且取的第一个节点的 hash 值为 MOVED ,说明当前数组处于扩容过程,则当前线程帮忙一起迁移元素,
            // 然后再执行插入元素操作
            else if ((fh = f.hash) == MOVED)
                // 辅助数据迁移
                tab = helpTransfer(tab, f);

                // 4 非以上三种情况,也就是当前定位的桶不为空,且不在迁移元素,那么锁住这个桶(以桶的第一个元素作为锁对象-分段锁)
                // 要插入的元素在该桶,则替换值(onlyIfAbsent=false);不在该桶,则插入到链表结尾或插入树中;
            else {

                V oldVal = null;

                // 4.1 获取 synchronized 的锁
                synchronized (f) {
                    // 再次检测第一个元素是否有变化,如果有变化则进入下一次循环,从头来过
                    // Unsafe 类的 volatile 式查看值,保证获取到的值都是最新的
                    if (tabAt(tab, i) == f) {

                        // 如果第一个元素的hash值大于等于0(说明不是在迁移,也不是树),那就是桶中的元素使用的是链表方式存储
                        if (fh >= 0) {
                            // 记录链表的元素个数
                            binCount = 1;

                            // 遍历链表,尾节点插入数据
                            for (Node<K, V> e = f; ; ++binCount) {
                                K ek;
                                // 如果发现了相同的 key ,判断是否要进行值覆盖,然后也就可以 break 了
                                if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                                (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;

                                    // 找到了相同 key 的元素,根据情况进行覆盖并退出循环
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }

                                // 到了链表的末尾还没发现相同 key 的元素,那么就将这个新值放到链表的最后,尾插法
                                Node<K, V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K, V>(hash, key,
                                            value, null);
                                    break;
                                }
                            }
                        }
                        // 树结构
                        else if (f instanceof TreeBin) {
                            Node<K, V> p;
                            // 记录树中元素个数为 2「注意,树的情况没有进行累加」
                            binCount = 2;
                            // 调用红黑树的插入方法插入元素,如果成功则返回 null,否则返回找到的节点
                            if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key,
                                    value)) != null) {
                                oldVal = p.val;

                                // 找到了相同 key 的元素,根据情况进行覆盖并退出循环
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }

                // 如果 binCount不为0,说明成功插入了元素或者寻找到了元素
                if (binCount != 0) {
                    // 如果链表元素个数 >= 8 ,那么尝试进行链表树化「只有数组容量 >= 64 时才会真正进行树化,否则优先扩容」
                    // 因为上面把元素插入到树中时,binCount只赋值了2,并没有计算整个树中元素的个数,所以不会重复树化
                    if (binCount >= TREEIFY_THRESHOLD)
                        // 树化
                        treeifyBin(tab, i);

                    // 如果要插入的元素已经存在,那么把旧值返回
                    if (oldVal != null)
                        return oldVal;

                    // 退出外层大循环,流程结束
                    break;
                }
            }
        }

        // 如果是插入元素的情况,则:累加元素个数 & 判断扩容
        addCount(1L, binCount);

        // 成功插入元素返回 null
        return null;
    }

从上述新增元素的源码中可以看到,如果将并发控制的逻辑剔除,剩下的逻辑和 HashMap 的 put 方法基本类似。主干流程如下:

  1. 不允许 key 或 value 为 null;
  2. 如果桶数组未初始化,执行初始化流程,然后再尝试添加元素;
  3. 如果待插入的元素所在的桶为空,则使用 CAS 尝试将元素放到该桶中;
  4. 如果数组正处于扩容过程,那么当前线程一起加入到扩容的过程中;
  5. 如果待插入的元素所在的桶不为空,且没有迁移元素,那么锁住这个桶,锁粒度就是桶元素对象;
    • 如果当前桶中元素以链表方式存储,则在链表中寻找该元素或者尾插入元素
    • 如果当前桶中元素以红黑树方式存储,则在红黑树中寻找该元素或者插入元素;
  6. 针对链表的情况判断是否需要进行树化;
  7. 如果要插入的元素(元素 key)存在,则返回旧值,此时不需要累加元素个数以及判断容量情况;
  8. 如果要插入的元素(元素 key)不存在,则累加元素个数,并检查是否需要扩容;

了解了添加元素的主干逻辑后,下面我们对上述方法中针对并发控制的关键点进行说明

  • 使用自旋结合CAS的方式完成新增元素操作;
  • 使用Unsafe 类的 volatile 式的获取方法查看元素,保证每次获取到的元素都是最新的。注意,虽然桶数组 table 使用了 volatile 修饰,但也只能保证其引用的可见性,并不能确保其数组中的元素是否是最新的,所以通过 Unsafe 类的 volatile 式获取到最新的元素;
  • 协同扩容流程优先,新增元素流程靠后;
  • 使用 synchronized 实现分段锁,完成新增元素的流程。注意,这里将 key 映射的桶元素作为锁对象,达到缩小锁范围的目的。在 hash 不冲突的情况下,可同时支持 n 个线程同时 put 操作,n 为桶数组的大小。

initTable() - 初始化数组

初始化数组流程是在新增元素时进行的,通过对新增元素的源码分析后我们知道,初始化数组流程是不受锁的保护的,那么该方法是如何处理并发问题的呢?下面我们一起来看看源码是如何实现的。

 private final Node<K, V>[] initTable() {
        Node<K, V>[] tab;
        int sc;

        // 每次循环都获取最新的 Node 数组引用
        // 初始化数组时,sizeCtl 为数组容量大小,默认为 0;如果 sizeClt > 0 ,那使用的是带有初始化容量的构造方法
        while ((tab = table) == null || tab.length == 0) {

            // 如果 sizeCtl < 0 ,说明其它线程正在进行初始化或扩容,那么就让出 CPU,从运行状态回到就绪状态
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin 初始化失败,只是旋转

                // CAS 一下,将 sizeCtl 设置为 -1,代表抢到了初始化数组的资格,当前线程进入初始化,成功后退出循环。
                // 如果原子更新失败则说明有其它线程先一步进入初始化了,则进入下一次循环,如果下一次循环时还没初始化完毕,则sizeCtl<0进入上面if的逻辑让出CPU,如果初始化完毕退出循环;
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {

                    // 再次检查table是否为空
                    if ((tab = table) == null || tab.length == 0) {

                        // 如果 sc 为 0,则使用 DEFAULT_CAPACITY 默认初始容量是 16
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;

                        // 创建数组,长度为 16 或初始化时提供的长度(也是 2^N)
                        @SuppressWarnings("unchecked")
                        Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n];

                        // 将创建的数组赋值给 table
                        table = tab = nt;

                        // 设置 sc 为数组长度的 0.75 倍
                        // n - (n >>> 2) = n - n/4 = 0.75 * n
                        sc = n - (n >>> 2);
                    }

                } finally {
                    // 由于这里只会有一个线程在执行,直接赋值即可,没有线程安全问题。把sc赋值给sizeCtl,这时存储的是扩容阈值
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

初始化数组的主要过程如下:

  1. 使用 CAS 设置 sizeCtl 为 -1 ,控制只有一个线程初始化桶数组,CAS 失败的线程会等待桶数组初始化完毕;
  2. sizeCtl 在初始化后存储的是桶数组的扩容阈值,其中扩容阈值的计算是固定的,大小为桶数组大小的 0.75 倍。这里的负载因子和扩容阈值相当于都是固定了,这也正是没有使用 threshold 和 loadFactor 属性而是使用 sizeCtl 控制数组的原因。

可见,就算有多个线程同时进行 put 操作,在初始化数组时使用了乐观锁 CAS 操作来决定哪个线程有资格进行初始化,其它线程只能等待数组初始化完成。处理并发点如下:

  • 桶数组 table 使用了 volatile 修饰,保证获取到的都是最新的值;
  • 数组控制 sizeCtl 属性使用了 volatile 修饰,保证线程间的可见性;
  • 使用 CAS 操作保证设置 sizeCtl 标记的原子性,只有一个线程能设置成功,没有设置成功的线程,等待数组初始化完成即可;

addCount() - 计数并判断扩容

每次添加元素后(注意,是添加不是覆盖),集合中的元素数需要加 1,然后判断是否达到扩容阈值,达到了需要进行扩容或协助迁移元素。

    /** 
     * @param x     要累加的数量
     * @param check 如果<0,不检查调整大小;如果<= 1,只检查是否无竞争;
     */
    private final void addCount(long x, int check) {

        /*----------------------- 1、累加元素个数  ---------------------------*/

        // 这里使用的思想和 LongAdder 类很类似。
        CounterCell[] as;
        long b, s;

        // 统计元素个数的操作
        if ((as = counterCells) != null ||
                !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {

            /* 执行到这里,说明:
            1.counterCells被初始化完成了,不为null
            2.增加数量到 baseCount 失败了,存在线程竞争,接下来尝试增加到当前线程映射到的桶上
            3.先优先尝试把数量加到 baseCount 上,如果失败再加到对应的 CounterCell 上
            */
            CounterCell a;
            long v;
            int m;

            //标志是否存在竞争
            boolean uncontended = true;

            // 在计数桶数组中随机选一个计数桶,然后使用CAS操作将此计数桶中的value+1
            // 1 先判断计数桶是否还没初始化,也就是 as==nul,进入语句
            // 2 判断计数桶长度是否为空或,若是进入语句块
            // 3 这里做了一个线程变量随机数,与上桶大小-1,若桶的这个位置为空,进入语句块
            // 4 到这里说明桶已经初始化了,且随机的这个位置不为空,尝试CAS操作使桶加1,失败进入语句块
            if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                    !(uncontended =
                            U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {

                // 若CAS操作失败,证明有竞争,进入fullAddCount方法
                // 失败几次,那么就对counterCells进行扩容,以减少多个线程hash到同一个桶的概率
                fullAddCount(x, uncontended);

                // 返回
                return;
            }

            if (check <= 1)
                return;

            // 计算元素个数
            s = sumCount();
        }

        /*------------------ 2、判断是否需要进行扩容 ----------------*/

        if (check >= 0) {
            Node<K, V>[] tab, nt;
            int n, sc;

            // 如果元素个数达到了扩容阈值,则进行扩容。
            while (s >= (long) (sc = sizeCtl) && (tab = table) != null &&
                    (n = tab.length) < MAXIMUM_CAPACITY) {

                // rs 为要扩容容量为 n 的数组的一个标识,如数组容量 n=16,那 rs=32795
                int rs = resizeStamp(n);

                // sc < 0 说明有线程正在迁移
                if (sc < 0) {

                    //  判断扩容是否已经完成了,如果完成则退出循环
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                        break;

                    // 扩容未完成,则当前线程加入迁移元素流程中,并把扩容线程数加 1
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);

                    // 不为负数,则为第一个迁移的线程,此时多了创建新数组的操作
                    // sizeCtl 的高 16 位存储 rs 这个标识
                    // sizeCtl 的低 16 位存储扩容线程数,刚开始为2,表示有一个线程正在迁移,如果为3,代表2个线程正在迁移以此类推…
                } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))

                    // 进入扩容并迁移元素的逻辑,传入 nextTab 为 null 就会触发创建新数组进而扩容
                    transfer(tab, null);

                // 重新计算元素个数
                s = sumCount();
            }
        }
    }

从上面的源码中也可以看到,上述方法包含两部分的逻辑,一个是记录集合中元素个数,另一个是达到扩容阈值执行扩容逻辑。下面我们分别介绍。

记数

在 ConcurrentHashMap 中,快速统计元素个数是非常重要的,不仅使用方会关注集合中元素个数,而且集合内部需要根据元素个数来判断是否需要扩容。使用的思想和 LongAdder 类似,如下图:

    /**
     * 桶是由线程对应的,多个线程可以对应一个桶
     */
    @sun.misc.Contended
    static final class CounterCell {
        /**
         * 当前桶对应的元素数量,可见性保证
         */
        volatile long value;

        CounterCell(long x) {
            value = x;
        }
    }

在设计时,使用了分而治之的思想,除了基础的计数,将每一个计数都分散到各个 CounterCell 对象(桶)中,使竞争最小化。总的来说,ConcurrentHashMap 的大小分为两部分:

  • 基础计数 baseCount: 在记录元素个数时优先 CAS 更新该属性,如果 CAS 失败才会采用分治桶计数的方式。即在线程竞争不大的时候,直接使用CAS操作baseCount值即可;
  • 分治桶计数 CounterCell[]: 若出现了CAS操作失败的情况,则证明此时有线程竞争了,计数方式转变为分而治之的桶计数方式,也就是线程映射到不同的 CounterCell;

了解了 ConcurrentHashMap 计数的思想后,下面看看源码是如何实现的。

    private final void addCount(long x, int check) {

        /*----------------------- 1、累加元素个数  ---------------------------*/
        CounterCell[] as;
        long b, s;

        // 统计元素个数的操作
        if ((as = counterCells) != null ||
                !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {

            /*------------- 执行到这里,说明:
            1.counterCells被初始化完成了,不为null
            2.增加数量到 baseCount 失败了,存在线程竞争,接下来尝试增加到当前线程映射到的桶上
            3.先优先尝试把数量加到 baseCount 上,如果失败再加到对应的 CounterCell 上
            ----------------*/
            CounterCell a;
            long v;
            int m;

            //标志是否存在竞争
            boolean uncontended = true;

            // 在计数桶数组中根据当前线程选一个计数桶,然后使用CAS操作将此计数桶中的value+1
            // 1 先判断计数桶是否还没初始化,也就是 as==nul,进入语句
            // 2 判断计数桶长度是否为空或,若是进入语句块
            // 3 这里做了一个线程变量随机数,与上桶大小-1,若桶的这个位置为空,进入语句块
            // 4 到这里说明桶已经初始化了,且随机的这个位置不为空,尝试CAS操作使桶加1,失败进入语句块
            if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                    !(uncontended =
                            U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {

                //若CAS操作失败,证明有竞争,进入fullAddCount方法
                // 失败几次,那么就对counterCells进行扩容,以减少多个线程hash到同一个桶的概率
                fullAddCount(x, uncontended);

                // 返回
                return;
            }

            if (check <= 1)
                return;

            // 计算元素个数
            s = sumCount();
        }

        //... 省略判断扩容逻辑
  }

从源码中也能看到,在计数时优先使用基础计数,有竞争的情况下会使用分而治之桶计数,对应的逻辑在 fullAddCount 方法中。这里有两种情况,一种是计数桶数组还没有初始化,另外一种是计数数组已经初始化过了,此外,如果计数桶数组存在,优先使用分而治之桶计数。下面我们从源码层面看这两种情况。

初始化计数桶数组
   // See LongAdder version for explanation
    private final void fullAddCount(long x, boolean wasUncontended) {
        // 线程对应的随机值
        int h;
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();      // force initialization
            h = ThreadLocalRandom.getProbe();
            wasUncontended = true;
        }

        boolean collide = false;                // True if last slot nonempty

        // 自旋,更新元素个数
        for (; ; ) {
            CounterCell[] as;
            CounterCell a;
            int n;
            long v;

            // 如果计数单元桶!=null,证明已经初始化过,那么就针对当前线程映射一个计算单元并累加数量
            if ((as = counterCells) != null && (n = as.length) > 0) {
                 //... 略

                // 进入此语句块进行计数桶的初始化
                // CAS设置cellsBusy=1,表示现在计数桶 busy 中,控制并发
            } else if (cellsBusy == 0 && counterCells == as &&
                    U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {

                // 若有线程同时初始化计数桶,由于CAS操作只有一个线程进入这里
                boolean init = false;

                try {

                    //再次确认计数桶为空
                    if (counterCells == as) {
                        //初始化一个长度为2的计数桶
                        CounterCell[] rs = new CounterCell[2];
                        //h为当前线程相关的一个随机数,& 1 代表,在 0、1中随机的一个
                        //也就是在0、1下标中随便选一个计数桶,x=1,放入1的值代表增加1个容量
                        rs[h & 1] = new CounterCell(x);
                        //将初始化好的计数桶赋值给ConcurrentHashMap中的属性
                        counterCells = rs;
                        init = true;
                    }
                } finally {
                    //最后将busy标识设置为0,表示不busy了
                    cellsBusy = 0;
                }

                // 初始化计数桶数组后退出循环
                if (init)
                    break;

                //若有线程同时来初始化计数桶,则没有抢到busy资格的线程就CAS递增基础计数 baseCount
            } else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                break;                          // Fall back on using base
        }
    }

可以看到,当更新基础计数器 baseCount 失败后,获取到初始化资格的线程会初始化一个大小为 2 的计算桶数组。完成了计数桶数组的初始化后,在之后的计数都将会使用计算桶来计数

注意:这里同样使用了 CAS 保证并发安全,如果在初始化时,有并发竞争,那么竞争失败的线程会 CAS 更新基础计数器 baseCount,尽可能提高并发度。

扩容计数桶

从上面初始化计数桶数组中我们知道,计数桶的长度为 2,在并发大的时候竞争依然会很大,因此就需要计数桶数组的扩容,这个过程就对应 fullAddCount 方法的另一个分支。

    private final void fullAddCount(long x, boolean wasUncontended) {
        // 线程对应的随机值,用于对计数桶数组取模定位计算桶
        int h;
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();      // force initialization
            h = ThreadLocalRandom.getProbe();
            wasUncontended = true;
        }

        boolean collide = false;                // True if last slot nonempty

        // 自旋,更新元素个数
        for (; ; ) {
            CounterCell[] as;
            CounterCell a;
            int n;
            long v;

            // 如果计数单元桶!=null,证明已经初始化过,那么就针对当前线程映射一个计算桶并计算元素数
            if ((as = counterCells) != null && (n = as.length) > 0) {

                //从计数桶数组中定位当前线程映射的计数桶,若为null表示该桶为空,需要创建一个
                if ((a = as[(n - 1) & h]) == null) {

                    //判断计数单元 的 busy 状态(用于控制并发),为 0 表示不忙碌,即没有线程竞争
                    if (cellsBusy == 0) {            // Try to attach new Cell

                        // 创建一个计数单元桶,初始值为传入的个数 x
                        CounterCell r = new CounterCell(x); // Optimistic create

                        // CAS 操作 busy 为 1,标记计数桶处理中
                        if (cellsBusy == 0 &&
                                U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                            boolean created = false;
                            try {               // Recheck under lock
                                CounterCell[] rs;
                                int m, j;
                                // 再检查一次计数桶为null
                                if ((rs = counterCells) != null &&
                                        (m = rs.length) > 0 &&
                                        rs[j = (m - 1) & h] == null) {

                                    //将刚刚创建的计数桶赋值给对应位置
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                // 处理完成后,标记计数桶处理完成
                                cellsBusy = 0;
                            }

                            // 如果是创建计数桶的情况,直接退出循环即可
                            if (created)
                                break;

                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;

                /*------------- 走到这里,说明当前线程映射的计数桶不为空 ----------------*/

                } else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash

                    //走到这里代表计数桶不为null,尝试递增计数桶
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                    break;

                else if (counterCells != as || n >= NCPU)
                    collide = false;            // At max size or stale

                    //若CAS操作失败了,到了这里,会先进入一次,然后再走一次刚刚的for循环
                    //若是第二次for循环,collide=true,则不会走进去
                else if (!collide)
                    collide = true;

                    /* 走到这里,说明竞争较大,多个线程都映射到了一个计数桶,导致更新计数失败较多,那么就需要通过扩容计数桶数组来减小竞争 */
                    //一个线程若进行了多次CAS操作计数桶失败,那么就需要进行计数桶扩容逻辑,完成扩容后重新计数
                else if (cellsBusy == 0 &&
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {

                    try {
                        // 确认计数桶还是同一个
                        if (counterCells == as) {// Expand table unless stale

                            //将长度扩大到2倍
                            CounterCell[] rs = new CounterCell[n << 1];

                            // 遍历旧计数桶,直接将旧计数桶拿过来填充
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            counterCells = rs;
                        }

                    } finally {
                        // 完成扩容后,标记计数桶处理完成
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }

                // 重新计算当前线程的随机值,用于定位对应的计数桶
                h = ThreadLocalRandom.advanceProbe(h);

                // 进入此语句块进行计数桶的初始化
                // CAS设置cellsBusy=1,表示现在计数桶 busy 中
            } else if (cellsBusy == 0 && counterCells == as &&
                    U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {

                   //... 省略

                //若有线程同时来初始化计数桶,则没有抢到busy资格的线程就先来CAS递增baseCount
            } else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                break;                          // Fall back on using base
        }
    }

在 CAS 操作计数桶失败多次之后,会先进行扩容计数桶数组流程,扩容完毕后再次尝试 CAS 操作映射的计数桶。一般扩容后再次 CAS 操作大都会成功,除非并发太大了。

统计元素个数
    final long sumCount() {
        // 计算所有计数桶及 baseCount 的数量之和
        CounterCell[] as = counterCells;
        CounterCell a;
        long sum = baseCount;
        if (as != null) {
            // 遍历 CounterCell ,不保证可见性
            for (int i = 0; i < as.length; ++i) {
                // 使用 getObjectVolatile 方法保证可见性
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

由于集合中的元素存储在两个部分中,因此需要将两部分的数量累加起来。

注意:上述统计元素个数的方法是弱一致性的,因为在遍历 CounterCell 数组时,不能保证其内部属性 value 的可见性。

计数小结

计数时优先利用 CAS 操作基础计数器 baseCount,在竞争不大时一般都能成功,此时性能没有啥大的损失;

若存在线程竞争基础计数器 baseCount,则初始化计数桶数组来分摊竞争。在初始化计数桶数组的过程中也存在竞争的话,通过 CAS 保证只有一个线程有资格进行初始化,没有抢到初始化资格的线程直接尝试 CAS 操作基础计数器的方式完成计数,没有完成计数的线程会进行自旋,尽可能最大化并发度

如果已经存在计数桶数组,那么使用计数桶计数,采用分而治之的方式来计数。由于计数桶数组能同时提供计数的桶数量有限,如果还是存在频繁失败的情况,那么就对计数桶数组扩容,并发安全的处理同样使用 CAS 来保证,这样就可以支持同时更多线程并发计数了。其中,在为线程映射计数桶时,利用位运算和随机数以负载均衡的方式将线程计数请求接近均匀的分散到不同的计数桶中

判断扩容

    /** because resizings are lagging additions.
     *
     * @param x     要累加的数量
     * @param check 如果<0,不检查调整大小;如果<= 1,只检查是否无竞争;
     */
    private final void addCount(long x, int check) {

        /*----------------------- 1、累加元素个数  ---------------------------*/
        CounterCell[] as;
        long b, s;

        //... 省略

        // 计算元素个数
        s = sumCount();

        /*------------------ 2、判断是否需要进行扩容 ----------------*/

        if (check >= 0) {
            Node<K, V>[] tab, nt;
            int n, sc;

            // 如果元素个数达到了扩容阈值,则进行扩容。
            while (s >= (long) (sc = sizeCtl) && (tab = table) != null &&
                    (n = tab.length) < MAXIMUM_CAPACITY) {

                // rs 为要扩容容量为 n 的数组的一个标识,如数组容量 n=16,那 rs=32795
                int rs = resizeStamp(n);

                // sc < 0 说明有线程正在迁移
                if (sc < 0) {

                    //  判断扩容是否已经完成了,如果完成则退出循环
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                        break;

                    // 扩容未完成,则当前线程加入迁移数据流程中,并把扩容线程数加 1
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);

                    // 不为负数,则为第一个迁移的线程,此时多了创建新数组的操作
                    // sizeCtl 的高 16 位存储 rs 这个标识
                    // sizeCtl 的低 16 位存储扩容线程数,刚开始为2,表示有一个线程正在迁移,如果为3,代表2个线程正在迁移以此类推…
                } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))

                    // 进入扩容并迁移元素的逻辑,传入 nextTab 为 null 就会触发创建新数组进而扩容
                    transfer(tab, null);

                // 重新计算元素个数
                s = sumCount();
            }
        }
    }

每次添加元素后,集合中的元素数量加 1。紧接着会判断是否达到扩容阈值,达到的话则进行扩容或协助扩容

如果达到扩容条件,那么会有一个线程 CAS 设置扩容标识位 sizeCtl 成功,此时 sizeCtl 的含义前文已经详细介绍。获取到扩容资格的线程会执行 transfer(tab, null) 方法进入扩容和迁移元素流程,启动扩容流程后进入上述方法的线程会感知到 sizeCtl<0 ,此时如果扩容还没有结束,也会加入到迁移元素的流程中,此时需要注意 sizeCtl 值的更新

tryPresize() - 扩容或迁移元素

除了执行 put 方法时检测到达到扩容阈值会扩容,检测链表是否要进行树化的时候也需要判断可否优先扩容。

private final void treeifyBin(Node<K,V>[] tab, int index) {
  Node<K,V> b; int n, sc;

  if (tab != null) {
    // 若数组长度小于64,则优先扩容
    if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
      // 尝试扩容
      tryPresize(n << 1);

    // 转换为红黑树
    else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
      synchronized (b) {
        //...转换为红黑树的操作
      }
    }
  }
}

treeifyBin 方法会将链表转换为红黑树以增加查找效率,但在这之前会检查数组长度,若小于 64 则会优先做扩容操作。其中尝试扩容的方法和前文中的 addCount 方法的判断是否扩容分支逻辑基本类似,就不再重复说明,源代码注释如下:

    /**
     * 扩容
     */
    private final void tryPresize(int size) {
        // 取 2^n 值
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1);
        int sc;

        // 数组没有在初始化或扩容
        while ((sc = sizeCtl) >= 0) {
            Node<K, V>[] tab = table;
            int n;

            // 该分支用于初始化数组,主要用于 putAll 方法添加元素的场景
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n];
                            table = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }

            // 无需进行扩容
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
                // 进行扩容
            else if (tab == table) {
                // 扩容 table 的 epoch
                int rs = resizeStamp(n);

                // 如果处于扩容过程中
                if (sc < 0) {
                    Node<K, V>[] nt;
                    // 扩容完毕了,退出循环
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                        break;

                    //  CAS 将 sizeCtl 加 1 表示当前线程加入到扩容过程,然后执行 transfer 方法
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);

                    // CAS 设置 sizeCtl 成功后执行 transfer 方法进行扩容
                } else if (U.compareAndSwapInt(this, SIZECTL, sc,
                        (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }

transfer() - 扩容或迁移元素

    /**
     * @param tab     旧数组
     * @param nextTab 扩大2倍后的新数组,如果为空说明还没有创建新数组
     */
    private final void transfer(Node<K, V>[] tab, Node<K, V>[] nextTab) {
        // 记录旧数组容量
        int n = tab.length, stride;

        // 根据机器CPU核心数来计算,一条线程负责Node数组中多长的迁移量, stride 就是当前线程分到的迁移量
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range


        /*--------------------------------- 1、扩容 -------------------------------------*/

        // nextTab 为空,说明还没开始扩容,就创建一个新桶数组
        if (nextTab == null) {            // initiating
            try {
                // 新桶数组大小是原桶的 2 倍
                @SuppressWarnings("unchecked")
                Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }

            // 扩容期间的新数组
            nextTable = nextTab;
            // 旧数组容量作为迁移下标
            transferIndex = n;
        }



        /*--------------------------------- 2、迁移元素 ---------------------------------*/

        // 新数组大小
        int nextn = nextTab.length;

        // 新建一个 ForwardingNode 类型的标记节点,其 hash 值为 MOVED,并把新桶数组存储在里面,这个对象作为旧数组某个桶迁移完毕的标志。
        ForwardingNode<K, V> fwd = new ForwardingNode<K, V>(nextTab);

        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab

        // 从后往前,依次对旧数组每个桶进行元素的迁移
        for (int i = 0, bound = 0; ; ) {
            Node<K, V> f;
            int fh;

            // 2.1 确定要迁移的桶的下标
            // i 的值会从 n-1 依次递减,其中 n 是旧桶数组的大小。比如,i 从 15 开始一直减到 1 这样去迁移元素
            while (advance) {
                int nextIndex, nextBound;

                // i为当前正在处理的Node数组下标,每次处理一个Node节点就会自减1
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;

                    // CAS 下标,保证每个线程获取到正确的下标,支持并发迁移元素
                } else if (U.compareAndSwapInt
                        (this, TRANSFERINDEX, nextIndex,
                                nextBound = (nextIndex > stride ?
                                        nextIndex - stride : 0))) {
                    bound = nextBound;

                    // 确定好小标后,就可以退出确定下标的循环了
                    i = nextIndex - 1;
                    advance = false;
                }
            }

            // 2.2 对下标为 i 的桶进行处理

            // 如果旧数组元素迁移完了
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;

                // 如果所有桶中的元素都迁移完成了,则替换旧桶数组,并设置下一次扩容阈值为新桶数组容量的 0.75 倍
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }

                // 当前线程扩容完成,把扩容线程数-1
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {

                    // 扩容完成,条件成立,也就是此时是最后一个线程走到这里,和首次进行扩容的 sizeCtl 一样
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;

                    // 标记完成
                    finishing = advance = true;

                    // i 重新赋值为 n,这样会再重新遍历一次桶数组,看看是不是都迁移完成了
                    i = n; // recheck before commit
                }

                // 如果桶中无数据,直接在旧桶的 i 位置放入 ForwardingNode 表示该桶已迁移完
            } else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);

                // 如果桶中第一个元素的 hash 值为 MOVED ,说明它是 ForwardingNode 节点,表示该桶已迁移完
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed

                // 迁移 i 位置桶中的元素
            else {

                // 锁定该桶,并迁移元素
                synchronized (f) {

                    // 再次判断当前桶第一个元素是否有修改,也就是可能其它线程先一步迁移了元素
                    if (tabAt(tab, i) == f) {
                        Node<K, V> ln, hn;

                        // 桶中第一个元素 fh >= 0 说明桶是链表形式,这里与 HashMap 迁移算法基本一致,仅做了点优化
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K, V> lastRun = f;
                            for (Node<K, V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }

                            // 看看最后这几个元素归属于低位链表还是高位链表
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            } else {
                                hn = lastRun;
                                ln = null;
                            }

                            // 遍历链表,把hash&n为0的放在低位链表中,不为0的放在高位链表中
                            for (Node<K, V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash;
                                K pk = p.key;
                                V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K, V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K, V>(ph, pk, pv, hn);
                            }

                            // 低位链表的位置不变 - CAS操作
                            setTabAt(nextTab, i, ln);
                            // 高位链表的位置是原位置加n - CAS操作
                            setTabAt(nextTab, i + n, hn);

                            // 使用 Unsafe 的 volatile 式标记该桶已迁移完成,即在该桶中放置 ForwardingNode 类型的元素以标记该桶迁移完成
                            setTabAt(tab, i, fwd);

                            // advance为true,返回上面进行 --i 操作
                            advance = true;

                            // 如果桶中第一个元素是树节点,也是一样分化成两颗树,也是根据hash&n为0放在低位树中,不为0放在高位树中, 基本同 HashMap
                        } else if (f instanceof TreeBin) {
                            TreeBin<K, V> t = (TreeBin<K, V>) f;
                            TreeNode<K, V> lo = null, loTail = null;
                            TreeNode<K, V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K, V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K, V> p = new TreeNode<K, V>
                                        (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                } else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }

                            // 如果分化的树中元素个数小于等于6,则退化成链表
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin<K, V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin<K, V>(hi) : t;

                            // 低位树的位置不变 - CAS操作
                            setTabAt(nextTab, i, ln);
                            // 高位树的位置是原位置加n - CAS操作
                            setTabAt(nextTab, i + n, hn);

                            // 使用 Unsafe 的 volatile 式标记该桶已迁移,即在桶中放置 ForwardingNode 类型的元素,标记该桶迁移完成
                            setTabAt(tab, i, fwd);
                            // advance为true,返回上面进行--i操作
                            advance = true;
                        }

                    }
                }
            }
        }
    }

上述方法实现了扩容和迁移元素的逻辑。下面我们先对关键步骤进行说明,然后再对并发处理进行分析。

  1. 如果还没开始扩容,那么就先进行扩容,扩容后的数组大小是旧数组的 2 倍;
  2. 创建完新的数组后,就可以开始将旧数组中的元素迁移到新数组中了,迁移的方式是从后往前逐个桶迁移:
    • 为当前执行的线程分配迁移的桶,该线程会根据具体情况处理该桶;
    • 判断是否已经迁移完毕,迁移完毕就用新数组替换旧数组,并根据新数组的容量以 0.75 计算扩容阈值并赋予 sizeCtl,扩容过程结束;
    • 判断当前桶是否为空,为空就直接 CAS 填充 ForwardingNode 对象,表示该桶迁移完了;
    • 判断当前桶是否为 ForwardingNode 对象(也就是元素的 hash 值为 MOVED) ,如果是则表示该桶已经迁移完了,跳过即可;
    • 以上条件都不成立,那么就锁定该桶,然后将该桶元素迁移到新数组中,迁移方式基本同 HashMap 的迁移方式,主要区别是在迁移完成后,使用 ForwardingNode 对象打标记,表示该桶已经完成了迁移;

在扩容时,ConcurrentHashMap 支持并发扩容,在扩容过程中同时支持查找元素。这种无阻塞算法,将并发度大大的提高了。下面我们就对并发处理进行分析。

  1. 临时的新数组 nextTable 使用 volatile 修饰,保证可见性;
  2. 使用 ForwardingNode 类型对象封装新数组,这个对象非常重要,不仅作为旧数组某个桶迁移完成的标记,还支持了扩容时的元素查找
  3. 使用 CAS 为不同的线程分配待迁移的不同桶,实现并发扩容;
  4. 使用 synchronized 分段锁保证迁移桶元素时的安全;

helpTransfer() - 辅助数据迁移

假设在进行 put 或 remove 操作时正处于迁移的过程,映射到的桶已经被迁移了(Node.hash==MOVED),此时该怎么办呢?ConcurrentHashMap 采用的是辅助扩容的方式,完成扩容后再执行对应的写操作

    /**
     * 
     * 辅助扩容(迁移元素)
     * 
     *  f 所在桶元素迁移完了才去协助迁移其它桶元素
     *
     * @param tab 旧数组
     * @param f   一般为 ForwardingNode 类型
     */
    final Node<K, V>[] helpTransfer(Node<K, V>[] tab, Node<K, V> f) {
        Node<K, V>[] nextTab;
        int sc;

        // 如果桶数组不为空 && 当前桶第一个元素 f 是 ForwardingNode 类型 && 封装的 nextTab(新数组) 不为空,说明当前桶已经迁移完毕了,才去帮忙迁移其它桶的元素。
        if (tab != null && (f instanceof ForwardingNode) &&
                (nextTab = ((ForwardingNode<K, V>) f).nextTable) != null) {

            // 获取扩容旧数组的标记位,类似 epoch 的概念
            int rs = resizeStamp(tab.length);

            // sizeCtl < 0 && nextTable 不为空,说明集合正处于扩容过程
            while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {

                // 协助迁移元素前,再次判断扩容是否结束,如果结束直接返回,无需协助迁移
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;

                // 扩容线程数加 1 ,标示多一个线程参与协助迁移
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    // 当前线程帮忙迁移元素
                    transfer(tab, nextTab);
                    break;
                }
            }

            // 帮忙迁移完元素后,返回新数组
            return nextTab;
        }

        // 非扩容过程,返回旧数组
        return table;
    }

辅助扩容是为了支持并发扩容的,当检测到定位的桶是 MOVED 状态时,那么当前线程会先辅助一起完成扩容,然后再继续执行写操作,这个逻辑一般需要自旋支持,因此我们可以看到前面介绍的 put 方法有一个自旋逻辑,在后面要介绍的 remove 方法也是这么做的。下面我们对辅助扩容方法中关键点进行分析:

  1. 判断是否在进行扩容的条件是 sizeCtl < 0 && nextTable != null,这两个条件缺一不可。sizeCtl < 0 的情况有两种,一种是在初始化数组时,通过 CAS 设置为 -1 才能拿到初始化资格;另一种是在扩容的时候,通过 CAS 设置为一个负数,高16位和低16位分别表示扩容旧数组的 epoch,参与扩容的线程数,这个前文已经详细说明。nextTable 只有在扩容期间才不会为 null,扩容完成后会重置为 null;
  2. 在迁移元素前,需要再次判断扩容是否结束,结束的情况下旧直接返回即可;没有结束的话,就更新 sizeCtl 的值,也就是增加参与扩容线程数;
  3. 如果参与了协助扩容,就返回扩容完成后的新数组;如果没有参与扩容,那么还是返回旧数组;

remove() - 删除元素

删除元素和添加元素逻辑类似,都是先定位对应的桶,如果该桶被标记迁移了,那么就先辅助迁移元素,完成迁移后继续执行写操作;如果桶没有被标志迁移,那么就采用 synchronized 分段锁锁住整个桶,然后执行对应的写操作。

    public V remove(Object key) {
        // 调用替换节点方法
        return replaceNode(key, null, null);
    }

   final V replaceNode(Object key, V value, Object cv) {
        // 计算 hash
        int hash = spread(key.hashCode());

        // 自旋
        for (Node<K, V>[] tab = table; ; ) {
            Node<K, V> f;
            int n, i, fh;

            // 如果数组为空,或目标 key 所在的桶不存在,跳出循环返回 null
            if (tab == null || (n = tab.length) == 0 ||
                    (f = tabAt(tab, i = (n - 1) & hash)) == null)
                break;

                // 如果桶中第一个元素的 hash 值为 MOVED ,说明这个桶已经迁移完了,但是其它的桶不知道,因此当前线程协助迁移,协助扩容完成后再进行删除操作
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);

                // 查找元素并执行删除
            else {
                V oldVal = null;
                // 标记是否处理过
                boolean validated = false;

                // 加锁(锁对象使用桶的第一个元素,达到分段的效果)
                synchronized (f) {
                    // 再次验证当前桶的第一个元素是否被修改
                    if (tabAt(tab, i) == f) {

                        // 桶的第一个元素的 hash >= 0 表示是链表节点
                        if (fh >= 0) {
                            validated = true;

                            // 遍历链表寻找目标节点
                            for (Node<K, V> e = f, pred = null; ; ) {
                                K ek;
                                if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                                (ek != null && key.equals(ek)))) {
                                    // 找到目标节点
                                    V ev = e.val;

                                    if (cv == null || cv == ev ||
                                            (ev != null && cv.equals(ev))) {
                                        oldVal = ev;

                                        // 如果 value 不为空则替换旧值
                                        if (value != null)
                                            e.val = value;
                                        else if (pred != null)
                                            // 如果前置节点不为空,删除当前节点
                                            pred.next = e.next;
                                        else
                                            // 如果前置节点为空,说明是桶中第一个元素,删除即可
                                            setTabAt(tab, i, e.next);
                                    }
                                    break;
                                }
                                pred = e;


                                // 遍历到链表尾部还没找到元素,跳出循环
                                if ((e = e.next) == null)
                                    break;
                            }

                            // 桶的第一个元素类型为树,则遍历树找目标节点
                        } else if (f instanceof TreeBin) {
                            validated = true;
                            TreeBin<K, V> t = (TreeBin<K, V>) f;
                            TreeNode<K, V> r, p;
                            if ((r = t.root) != null &&
                                    (p = r.findTreeNode(hash, key, null)) != null) {

                                // 找到目标节点
                                V pv = p.val;
                                if (cv == null || cv == pv ||
                                        (pv != null && cv.equals(pv))) {
                                    oldVal = pv;
                                    // 如果value不为空则替换旧值
                                    if (value != null)
                                        p.val = value;
                                        // 如果 value 为空,从树中删除
                                    else if (t.removeTreeNode(p))

                                        // 判断是否退化成链表
                                        //  t.removeTreeNode(p)这个方法返回true表示删除节点后树的元素个数较少,此时退化成链表
                                        setTabAt(tab, i, untreeify(t.first));
                                }
                            }
                        }
                    }
                }

                // 如果处理过,就不需要循环了,可以退出了
                if (validated) {
                    // 如果找到了元素,返回其旧值
                    if (oldVal != null) {
                        // 如果要替换的值为空,元素个数减 1,不考虑调整数组大小
                        if (value == null)
                            addCount(-1L, -1);
                        return oldVal;
                    }
                    break;
                }
            }
        }

        // 没找到元素返回空
        return null;
    }

删除方法和新增方法类似,就不再说明。需要注意的一点是,如果定位到的桶是以红黑树存储的,那么删除元素后需要判断是否要退化为链表。

get() - 获取元素

一般来说,对于 get 操作是没有线程安全问题的,只有可见性的问题,只需要确保获取的元素是线程间可见的即可,针对这个问题 ConcurrentHashMap 使用 volatile 关键字修饰节点的值 val 和后继指针来保证可见性。

除了可见性问题,ConcurrentHashMap 还需要解决一个问题,这个问题是由其支持并发扩容带来的。也就是说,在扩容期间访问元素的时候,集合中的元素可能分布在新旧两个数组中,针对这种情况的访问策略是,先访问旧数组,如果旧数组相应位置的桶已经被标记迁移完成了,那么就需要继续访问新数组以查询目标元素,否则只需访问旧数组即可(注意,即使定位到的旧数组桶处于迁移过程也没关系,在完成迁移之前该桶中的元素之间的关联是不变的)。而这个策略实现的核心是 ForwardingNode 类型对象,它作为旧数组某个桶迁移完成的标记的同时,内部封装了新数组,并且重写了查找元素的方法,而这个重写的查找方法就是从新数组中查找元素的。

下面我们对源码进行分析,看看 ConcurrentHashMap 具体是如何实现的。

    public V get(Object key) {
        Node<K, V>[] tab;
        Node<K, V> e, p;
        int n, eh;
        K ek;

        // 计算 hash
        int h = spread(key.hashCode());

        // 如果元素所在的桶存在且里面有元素
        if ((tab = table) != null && (n = tab.length) > 0 &&
                 // 使用 Unsafe 的 volatile 式获取节点,保证最新值
                (e = tabAt(tab, (n - 1) & h)) != null) {

            // 如果第一个元素就是要找的元素,直接返回
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;

                // hash < 0 说明第一个元素是树类型,或者数组处于扩容过程(元素类型是 ForwardingNode)
            } else if (eh < 0)
                // 使用find寻找元素,find的寻找方式依据Node的不同子类有不同的实现方式:
                // 处于扩容过程:java.util.concurrent.ConcurrentHashMap.ForwardingNode.find
                // 非扩容过程,也就是树类型:java.util.concurrent.ConcurrentHashMap.TreeNode.find
                return (p = e.find(h, key)) != null ? p.val : null;

            // 执行到这里,说明元素所在的桶中是链表,遍历链表即可
            while ((e = e.next) != null) {
                if (e.hash == h &&
                        ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }

        // 没有找到
        return null;
    }
  1. 在访问元素时会计算 key 的 hash 值;
  2. 如果定位到的桶元素就是要查找的元素,那么直接返回对应的 value;
  3. 如果定位到的桶元素不是要查找的元素,那么就需要做出判断,这个判断非常重要,对应三种情况:
    • 如果桶元素的 hash < 0 && hash == -1,那么说明桶元素的类型是 ForwardingNode,此时集合处于扩容过程,查找元素调用 ForwardingNode 实现的查找方法;
    • 如果桶元素的 hash < 0 && hash == -2,那么说明桶元素的类型是 TreeNode,是一颗红黑树,查找元素调用 TreeNode 实现的查找方法;
    • 不是前面两种情况,那么桶中就是链表,遍历链表查找即可;
  4. 没有定位到对应的桶,直接返回 null;

关于链表和红黑树查找没有可说的,下面我们对处于扩容状态的查找进行分析。

    /**
     * 扩容期间作为桶迁移完成的标志元素
     */
    static final class ForwardingNode<K, V> extends Node<K, V> {

        /**
         * 扩容期间的新数组。保存新Node数组的引用是为了支持在迁移的过程不阻塞地查找值。
         */
        final Node<K, V>[] nextTable;

        /**
         * 构造方法
         *
         * @param tab 新数组
         */
        ForwardingNode(Node<K, V>[] tab) {
            // hash 固定为 MOVED 
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }

        /**
         * 扩容期间查找元素,那么从扩容期间的新数组查找
         *    
         * @param h
         * @param k
         * @return
         */
        Node<K, V> find(int h, Object k) {
            // loop to avoid arbitrarily deep recursion on forwarding nodes
            outer:
            for (Node<K, V>[] tab = nextTable; ; ) {
                Node<K, V> e;
                int n;

                // 新的数组为空,或者对应的桶为空,直接返回
                if (k == null || tab == null || (n = tab.length) == 0 ||
                        (e = tabAt(tab, (n - 1) & h)) == null)
                    return null;

                // 定位桶查找目标元素
                for (; ; ) {
                    int eh;
                    K ek;

                    // 判断是否是要查找的元素
                    if ((eh = e.hash) == h &&
                            ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;

                    // 非链表的情况
                    if (eh < 0) {
                        // 如果元素是 ForwardingNode ,说明该桶的元素已经迁移到新的数组中,那么需要从新的数组中查找
                        if (e instanceof ForwardingNode) {
                            tab = ((ForwardingNode<K, V>) e).nextTable;
                            continue outer;

                            // 如果 e 元素对应的桶还没有迁移,则根据红黑树查找
                        } else
                            return e.find(h, k);
                    }

                    // 遍历链表
                    if ((e = e.next) == null)
                        return null;
                }
            }
        }
    }

至此,**ForwardingNode** 这个特殊节点的作用已经介绍完毕了,下面再进行总结:

  • 标识旧数组某个桶迁移完成,以支持并发扩容;
  • 封装新数组,以支持在迁移的过程中非阻塞查找值;

浅谈Redis rehash

如果了解 Redis 的扩容机制的话,你会发现两者有异曲同工之处。

  • 扩容操作:Redis 实现的是渐进式 rehash,主线程以一定的步长迁移桶元素;ConcurrentHashMap 实现的是辅助扩容,多个线程可以并发迁移桶元素;
  • 扩容期间查找元素:Redis 在扩容的时候使用到了两个哈希表,会先从 0 号哈希表查找,没有再从 1 号哈希表查找;ConcurrentHashMap 在扩容过程也同时使用了两个桶数组,查找的时候也会先到旧数组中定位桶元素,如果桶元素是迁移标识元素 ForwardingNode,那么会再从新数组中查找;

JDK1.7 与 JDK1.8的区别

JDK 1.7 的核心实现也采用了分段锁的技术,每个段 Segment 中对应一个小 HashMap,Segment 通过继承 ReentrantLock 来保证其内部的 HashMap 安全。

JDK 1.8 废弃了 Segment 分段锁的实现,采用多种 CAS + Node 级别的 synchronized 锁来保证并发安全,整体减小了锁竞争,减小程序同步的部分。

一致性问题

这里我们讨论的一致性问题是针对 put 和 get 方法来说的。

JDK 1.7 的实现中 ConcurrentHashMap 表现为弱一致性,因为写操作只是简单赋值操作,读操作也是简单读操作。具体实现如下:

写操作:

V put(K key, int hash, V value, boolean onlyIfAbsent) {
  lock();
  ...
    tab[index] = new HashEntry<K,V>(...);
  ...
    unlock();
}

读操作:

V get(Object key, int hash) {
    if (count != 0) { // read-volatile
        HashEntry<K,V> e = getFirst(hash);
        while (e != null) {
            if (e.hash == hash && key.equals(e.key)) {
                V v = e.value;
                if (v != null)
                    return v;
                return readValueUnderLock(e); // recheck
            }
            e = e.next;
        }
    }
    return null;
}

在进行写操作时,先加锁,然后直接给数组中的桶赋值,此时虽然数组 tab 使用 vlolatile 修饰保证可见性,但是不保证元素的可见性,因此无法保证能及时读取到写入的元素。由此可以看出,在 JDK 1.8 之前的读写操作是弱一致性的。

要保证读写强一致性,一般可以加锁,或者使用 volatile 修饰变量。如果采用加锁的方式,会导致 ConcurrentHashMap 的性能问题,这也违背了设计的初衷,因此可以考虑使用 volatile 来保证可见性。但是我们需要知道的是,volatile 可以修饰变量却不能修饰数组中的元素,也就是修饰数组只能保证数组的地址可见性。

JDK 1.8 正是从上述点进行突破,使用了 Unsafe 类来实现数组元素的可见性。

写操作:

final V putVal(K key, V value, boolean onlyIfAbsent) {
      ...
    for (Node<K,V>[] tab = table;;) {
      // 使用 Unsafe 类 volatile 式的操作查看值
      if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
        // 如果数组该位置没有节点,则使用一次 CAS 操作将这个新值放入其中即可。  
        if (casTabAt(tab, i, null,
                     new Node<K,V>(hash, key, value, null)))
          break;
      }
    }
      ...
}

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
  return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

读操作:

public V get(Object key) {
   ...
   // 如果元素所在的桶存在且里面有元素 
  if ((tab = table) != null && (n = tab.length) > 0 &&
      // 使用 Unsafe 的 volatile 式获取节点,保证最新值
      (e = tabAt(tab, (n - 1) & h)) != null) {
    ...
  }
  return null;
}

@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
  return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
  • 写操作使用 CAS 更新内存中数组中的元素;
  • 读操作使用 Unsafe 的 volatile 式获取内存中数组中的元素;

由于读写都是直接对内存操作,所以通过上述方式可以保证 put 和 get 的强一致性。ConcurrentHashMap 是通过 compareAndSwapObject 来取代对数组元素直接赋值的操作,通过 getObjectVolatile 来弥补无法对数组元素进行 volatile 读的缺陷。

需要注意,JDK 1.8 下的 ConcurrentHashMap 并非所有操作都是强一致性的,其中计算集合元素个数的 size() 方法就是弱一致性的,具体原因通过前面的源码就能看出来。

小结

ConcurrentHashMap 运用各类 CAS 操作保证并发安全,并在关键逻辑位置使用 synchronized 分段锁以小范围进行锁定保证安全。

在扩容时,ConcurrentHashMap 支持多线程并发扩容,在扩容过程的同时支持 get 获取元素,这种无阻塞算法,大大提高了并发度。其中,整个扩容过程都是通过 CAS 控制 sizeCtl 这个属性来进行的,某个桶完成迁移会使用 ForwardingNode 节点填充以标识该桶迁移完毕。整个迁移过程类似 Redis 中的 rehash 过程。

ConcurrentHashMap 元素个数的存储采用的是分段存储思想,类似 LongAdder 的实现,提高并发度。

思考

ConcurrentHashMap 设计非常精妙,读完源码后收获很大,很多设计让人眼前一亮,也开阔了技术视野。

  • 自旋 + CAS,采用乐观锁的思想,避免线程阻塞,减少线程上下文切换的时间;
  • 分段锁的思想,使用 synchronized 锁住单个桶对象以减小锁范围;
  • 分段存储集合元素个数,减小并发更新一个字段的竞争;
  • 支持并发扩容,尽可能无阻塞实现读写;

参考:

漫画:什么是ConcurrentHashMap


文章作者: Huowy
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Huowy !
评论
  目录