ConcurrentHashMap,可以理解成是HashMap的线程安全版本,同样的,内部使用也是(数组 + 链表 + 红黑树)的结构来存储元素。同时相比于同样提供线程安全的HashTable,效率等各方面都有所提升

概述

ConcurrentHashMap
ConcurrentHashMap

阅读套路

按照正常的逻辑, 首先基础的数据构造, 然后了解其构造方法, 之后了解常用的API(CRUD)操作即可。

  • [x] 增
  • [x] 删
  • [x] 改
  • [x] 查

类的属性

// 和hashmap类似
transient volatile Node<K,V>[] table;
// 用于并发情况下的扩容控制
private transient volatile Node<K,V>[] nextTable;
// 并发安全控制
private transient volatile long baseCount;

private transient volatile int sizeCtl;

private transient volatile int transferIndex;

private transient volatile int cellsBusy;

private transient volatile CounterCell[] counterCells;
// views
private transient KeySetView<K,V> keySet;
private transient ValuesView<K,V> values;
private transient EntrySetView<K,V> entrySet;

private static final int MAXIMUM_CAPACITY = 1 << 30;
/**
 * The default initial table capacity.  Must be a power of 2
 * (i.e., at least 1) and at most MAXIMUM_CAPACITY.
 */
private static final int DEFAULT_CAPACITY = 16;
/**
 * The largest possible (non-power of two) array size.
 * Needed by toArray and related methods.
 */
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
/**
 * The default concurrency level for this table. Unused but
 * defined for compatibility with previous versions of this class.
 */
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/**
 * The load factor for this table. Overrides of this value in
 * constructors affect only the initial table capacity.  The
 * actual floating point value isn't normally used -- it is
 * simpler to use expressions such as {@code n - (n >>> 2)} for
 * the associated resizing threshold.
 */
private static final float LOAD_FACTOR = 0.75f;
/**
 * The bin count threshold for using a tree rather than list for a
 * bin.  Bins are converted to trees when adding an element to a
 * bin with at least this many nodes. The value must be greater
 * than 2, and should be at least 8 to mesh with assumptions in
 * tree removal about conversion back to plain bins upon
 * shrinkage.
 */
static final int TREEIFY_THRESHOLD = 8;
/**
 * The bin count threshold for untreeifying a (split) bin during a
 * resize operation. Should be less than TREEIFY_THRESHOLD, and at
 * most 6 to mesh with shrinkage detection under removal.
 */
static final int UNTREEIFY_THRESHOLD = 6;
/**
 * The smallest table capacity for which bins may be treeified.
 * (Otherwise the table is resized if too many nodes in a bin.)
 * The value should be at least 4 * TREEIFY_THRESHOLD to avoid
 * conflicts between resizing and treeification thresholds.
 */
static final int MIN_TREEIFY_CAPACITY = 64;
/**
 * Minimum number of rebinnings per transfer step. Ranges are
 * subdivided to allow multiple resizer threads.  This value
 * serves as a lower bound to avoid resizers encountering
 * excessive memory contention.  The value should be at least
 * DEFAULT_CAPACITY.
 */
private static final int MIN_TRANSFER_STRIDE = 16;
/**
 * The number of bits used for generation stamp in sizeCtl.
 * Must be at least 6 for 32bit arrays.
 */
private static int RESIZE_STAMP_BITS = 16;
/**
 * The maximum number of threads that can help resize.
 * Must fit in 32 - RESIZE_STAMP_BITS bits.
 */
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
/**
 * The bit shift for recording size stamp in sizeCtl.
 */
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
/*
 * Encodings for Node hash fields. See above for explanation.
 */
static final int MOVED     = -1; // hash for forwarding nodes
static final int TREEBIN   = -2; // hash for roots of trees
static final int RESERVED  = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
/** Number of CPUS, to place bounds on some sizings */
static final int NCPU = Runtime.getRuntime().availableProcessors();

构造方法

public ConcurrentHashMap() {
}


public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this.sizeCtl = DEFAULT_CAPACITY;
    putAll(m);
}

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}

构造方法与HashMap对比可以发现,没有了HashMap中的threshold和loadFactor,而是改用了sizeCtl来控制,而且只存储了容量在里面,那么它是怎么用的呢?官方给出的解释如下:

Table initialization and resizing control.  When negative, the
table is being initialized or resized: -1 for initialization,
else -(1 + the number of active resizing threads).  Otherwise,
when table is null, holds the initial table size to use upon
creation, or 0 for default. After initialization, holds the
next element count value upon which to resize the table.
  1. -1 有线程正在初始化
  2. -(1 + n) 有n个线程正在一起扩容
  3. 0 默认值, 后续在真正初始化的时候使用默认容量
  4. >0 初始化或扩容完成后下一次的扩容门槛

public V put(K key, V value) {
    return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
      // key和value都不能为null
    if (key == null || value == null) throw new NullPointerException();
    // 计算hash值
      int hash = spread(key.hashCode());
    // 要插入的元素所在桶的元素个数
    int binCount = 0;
    // 死循环,结合CAS使用(如果CAS失败,则会重新取整个桶进行下面的流程)
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            // 如果桶未初始化或者桶个数为0,则初始化桶
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 如果要插入的元素所在的桶还没有元素,则把这个元素插入到这个桶中
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                // 如果使用CAS插入元素时,发现已经有元素了,则进入下一次循环,重新操作
                // 如果使用CAS插入元素成功,则break跳出循环,流程结束
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            // 如果要插入的元素所在的桶的第一个元素的hash是MOVED,则当前线程帮忙一起迁移元素
            tab = helpTransfer(tab, f);
        else {
            // 如果这个桶不为空且不在迁移元素,则锁住这个桶(分段锁)
            // 并查找要插入的元素是否在这个桶中
            // 存在,则替换值(onlyIfAbsent=false)
            // 不存在,则插入到链表结尾或插入树中
            V oldVal = null;
            synchronized (f) {
                // 再次检测第一个元素是否有变化,如果有变化则进入下一次循环,从头来过
                if (tabAt(tab, i) == f) {
                    // 如果第一个元素的hash值大于等于0(说明不是在迁移,也不是树)
                    // 那就是桶中的元素使用的是链表方式存储
                    if (fh >= 0) {
                        // 桶中元素个数赋值为1
                        binCount = 1;
                        // 遍历整个桶,每次结束binCount + 1
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 在链表中找到了改元素,覆盖 并退出
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // 如果没有找到 则 直接在尾部插入改元素
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // 树上找点
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // 如果binCount不为0,说明成功插入了元素或者寻找到了元素
            if (binCount != 0) {
                // 如果链表元素个数达到了8,则尝试树化
                // 因为上面把元素插入到树中时,binCount只赋值了2,并没有计算整个树中元素的个数
                // 所以不会重复树化
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 成功插入元素,元素个数加1(是否要扩容在这个里面)
    addCount(1L, binCount);
    return null;
}

流程和hashmap类似:

  1. 如果hash桶未初始化,则先初始化
  2. 如果当前位置无元素,则通过cas的方式放入元素
  3. 如果当前位置存在元素
    1. 是转移节点(正在扩容),则将当前线程一起加入到扩容中
    2. 不是转移节点,锁这个桶
      1. 如果为链表节点,进行在链表上查找该元素,找到?覆盖:执行插入操作
      2. 如果为树节点,在红黑树中查找该元素,找到?覆盖:执行插入操作
  4. 存在该元素时,返回旧value
  5. 不存在,整个map的元素+ 1,并检查是否需要扩容

因为是线程安全的操作,所以仔细看下在整个put阶段,采用了哪些手段保证线程安全。这个才是我们看CHM源码的关键

数组初始化时的线程安全措施

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // 小于 0 代表有线程正在初始化,释放当前 CPU 的调度权,重新发起锁的竞争
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        // CAS 赋值保证当前只有一个线程在初始化,
        // -1  代表当前只有一个线程能初始化 
        // 保证了数组的初始化的安全性
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {              
                                // 很有可能执行到这里的时候,table 已经不为空了,这里是双重 check
                              // 如果把sizeCtl原子更新为-1成功,则当前线程进入初始化
                // 如果原子更新失败则说明有其它线程先一步进入初始化了,则进入下一次循环           
                // 如果下一次循环时还没初始化完毕,则sizeCtl<0进入上面if的逻辑让出CPU           
                // 如果下一次循环更新完毕了,则table.length!=0,退出循环
                if ((tab = table) == null || tab.length == 0) {
                    // 进行初始化
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

小总结

数组初始化时:

  1. 使用CAS锁控制只有一个线程初始化桶数组并且一定能初始化成功;

  2. 然后通过 CAS 设置 sizeCtl 变量的值,来保证同一时刻只能有一个线程对数组进行初始化,CAS 成功之后,还会再次判断当前数组 是否已经初始化完成,如果已经初始化完成,就不会继续再执行初始化

  3. 扩容门槛写死的是桶数组大小的0.75倍,桶数组大小即map的容量,也就是最多存储多少个元素。

通过自旋 + cas + double check 保障了数组初始化时的线程安全。

Java线程中的一些方法的区别(sleep、yield、wait、join)

只有runnable到running时才会占用cpu时间片,其他都会出让cpu时间片。
线程的资源有不少,整体来看可分为如下两类

  1. CPU资源
  2. 锁资源

分析以上四个方法时,查看这两个共享资源是否释放,区分更加明显

  1. sleep:Thread类的方法,必须带一个时间参数。会让当前线程休眠进入阻塞状态并释放CPU(阿里面试题 Sleep释放CPU,wait 也会释放cpu,因为cpu资源太宝贵了,只有在线程running的时候,才会获取cpu片段),提供其他线程运行的机会且不考虑优先级,但如果有同步锁则sleep不会释放锁即其他线程无法获得同步锁 可通过调用interrupt()方法来唤醒休眠线程。

  2. yield:让出CPU调度,Thread类的方法,类似sleep只是不能由用户指定暂停多长时间 ,并且yield()方法只能让同优先级的线程有执行的机会。 yield()只是使当前线程重新回到可执行状态,所以执行yield()的线程有可能在进入到可执行状态后马上又被执行。调用yield方法只是一个建议,告诉线程调度器我的工作已经做的差不多了,可以让别的相同优先级的线程使用CPU了,没有任何机制保证采纳。

  3. wait:Object类的方法(notify()、notifyAll() 也是Object对象),必须放在循环体和同步代码块中,执行该方法的线程会释放锁,进入线程等待池中等待被再次唤醒(notify随机唤醒,notifyAll全部唤醒,线程结束自动唤醒)即放入锁池中竞争同步锁

  4. join:一种特殊的wait,当前运行线程调用另一个线程的join方法,当前线程进入阻塞状态直到另一个线程运行结束等待该线程终止。 注意该方法也需要捕捉异常。等待调用join方法的线程结束,再继续执行。如:t.join();//主要用于等待t线程运行结束,若无此句,main则会执行完毕,导致结果不可预测。

在桶位置放入元素时的线程安全措施(4个措施)

else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    if (casTabAt(tab, i, null,
                 new Node<K,V>(hash, key, value, null)))
        break;                   // no lock when adding to empty bin
}

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
  1. 通过自旋死循环保证一定可以新增成功 1⃣️

    1. 在新增之前,通过==for (Node[] tab = table;;)==这样的死循环来保证新增一定可以成功,一旦新增成功,就可以退出当前死循环,新增失败的话,会重复新增的步骤,直到新增成功为止。
  2. 当前hash桶是否有元素:

    1. 无元素 cas 新增 2⃣️

      Java 这里的写法非常严谨,没有在判断槽点为空的情况下直接赋值,因为在判断槽点为空和赋 值的瞬间,很有可能槽点已经被其他线程赋值了,所以我们采用 CAS 算法,能够保证槽点为空 的情况下赋值成功,如果恰好槽点已经被其他线程赋值,当前 CAS 操作失败,会再次执行 for 自旋,再走槽点有值的 put 流程,这里就是自旋 + CAS 的结合。

    2. 有元素 锁住当前点 (synchronized)3⃣️

      put 时,如果当前槽点有值,就是 key 的 hash 冲突的情况,此时槽点上可能是链表或红黑树, 我们通过锁住槽点,来保证同一时刻只会有一个线程能对槽点进行修改

  3. 红黑树旋转时, 锁住红黑树的根节点,保证同一时刻,当前红黑树只能被一个线程旋转 4⃣️

扩容时的线程安全措施

ConcurrentHashMap 的扩容时机和 HashMap 相同,都是在 put 方法的最后一步检查是否需要扩 容,如果需要则进行扩容,但两者扩容的过程完全不同,ConcurrentHashMap 扩容的方法叫做 transfer,从 put 方法的 addCount 方法进去,就能找到 transfer 方法

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
      // 这里使用的思想跟LongAdder类是一模一样的(后面会讲)
        // 把数组的大小存储根据不同的线程存储到不同的段上(也是分段锁的思想)
        // 并且有一个baseCount,优先更新baseCount,如果失败了再更新不同线程对应的段
        // 这样可以保证尽量小的减少冲突
        // 先尝试把数量加到baseCount上,如果失败再加到分段的CounterCell上
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

// 真正触发扩容的方式
// 扩容主要分 2 步,
// 1. 第一新建新的空数组,
// 2. 第二移动拷贝每个元素到新数组中去 
// tab:原数组,nextTab:新数组
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    // 老数组的长度
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    // 如果新数组为空,初始化,大小为原数组的两倍,n << 1
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    // 新的table长度
    int nextn = nextTab.length;
    // 代表转移节点,如果原数组上是转移节点,说明该节点正在被扩容 (node的hash值为-1)
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    // 无限自旋,i 的值会从原数组的最大值开始,慢慢递减到 0
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            // 结束循环的标志
            if (--i >= bound || finishing)
                advance = false;
            // 已经拷贝完成
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // 每次减少 i 的值
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // if 任意条件满足说明拷贝结束了
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // 拷贝结束,直接赋值,因为每次拷贝完一个节点,都在原数组上放转移节点,所以拷 贝完成的节点的数据一定不会再发生变化。
            // 原数组发现是转移节点,是不会操作的,会一直等待转移节点消失之后在进行操作。
                      // 也就是说数组节点一旦被标记为转移节点,是不会再发生任何变动的,所以不会有任 何线程安全的问题
                        // 所以此处直接赋值,没有任何问题。
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            // 进行节点的拷贝
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        // 桶内只有的单个数据, 直接拷贝,为链表,循环多次形成链表拷贝
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // 在新数组位置上放置拷贝的值
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        // 在老数组位置上放上 ForwardingNode 节点
                        // put 时,发现是 ForwardingNode 节点,就不会再动这个节点的数
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    // 红黑树的拷贝
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

小结

transfer方法的主要思路:

  1. 首先需要把老数组的值全部拷贝到扩容之后的新数组上,先从数组的队尾开始拷贝
  2. 拷贝数组的槽点时,先把原数组槽点锁住,保证原数组槽点不能操作,成功拷贝到新数组时, 把原数组槽点赋值为转移节点
  3. 这时如果有新数据正好需要 put 到此槽点时,发现槽点为转移节点,就会一直等待,所以在 扩容完成之前,该槽点对应的数据是不会发生变化的
  4. 从数组的尾部拷贝到头部,每拷贝成功一次,就把原数组中的节点设置成转移节点;
  5. 直到所有数组数据都拷贝到新数组时,直接把新数组整个赋值给数组容器,拷贝完成。

通过在原数组上设置转移节点,put 时碰到转移节点时会等待扩容成 功之后才能 put 的策略,来保证了整个扩容过程中肯定是线程安全的,因为数组的槽点一旦被设 置成转移节点,在没有扩容完成之前,是无法进行操作的。

(1)新桶数组大小是旧桶数组的两倍;

(2)迁移元素先从靠后的桶开始;

(3)迁移完成的桶在里面放置一ForwardingNode类型的元素,标记该桶迁移完成;

(4)迁移时根据hash&n是否等于0把桶中元素分化成两个链表或树;

(5)低位链表(树)存储在原来的位置;

(6)高们链表(树)存储在原来的位置加n的位置;

(7)迁移元素时会锁住当前桶,也是分段锁的思想;

总结

(1)ConcurrentHashMap是HashMap的线程安全版本;

(2)ConcurrentHashMap采用(数组 + 链表 + 红黑树)的结构存储元素;

(3)ConcurrentHashMap相比于同样线程安全的HashTable,效率要高很多;

(4)ConcurrentHashMap采用的锁有 synchronized,CAS,自旋锁,分段锁,volatile等;

(5)ConcurrentHashMap中没有threshold和loadFactor这两个字段,而是采用sizeCtl来控制;

(6)sizeCtl = -1,表示正在进行初始化;

(7)sizeCtl = 0,默认值,表示后续在真正初始化的时候使用默认容量;

(8)sizeCtl > 0,在初始化之前存储的是传入的容量,在初始化或扩容后存储的是下一次的扩容门槛;

(9)sizeCtl = (resizeStamp << 16) + (1 + nThreads),表示正在进行扩容,高位存储扩容邮戳,低位存储扩容线程数加1;

(10)更新操作时如果正在进行扩容,当前线程协助扩容;

(11)更新操作会采用synchronized锁住当前桶的第一个元素,这是分段锁的思想;

(12)整个扩容过程都是通过CAS控制sizeCtl这个字段来进行的,这很关键;

(13)迁移完元素的桶会放置一个ForwardingNode节点,以标识该桶迁移完毕;

(14)元素个数的存储也是采用的分段思想,类似于LongAdder的实现;

(15)元素个数的更新会把不同的线程hash到不同的段上,减少资源争用;

(16)元素个数的更新如果还是出现多个线程同时更新一个段,则会扩容段(CounterCell);

(17)获取元素个数是把所有的段(包括baseCount和CounterCell)相加起来得到的;

(18)查询操作是不会加锁的,所以ConcurrentHashMap不是强一致性的;

(19)ConcurrentHashMap中不能存储key或value为null的元素;

一些思想

  1. cas + 自选 乐观锁的思想,减少线程上下文切换的时间
  2. 分段锁思想,减少同一时间 竞争锁带来的低效问题
  3. CounterCell,分段存储元素,减少多线程同时更新一个字段带来的低效
  4. 多线程协同进行扩容