借鉴ConcurrentHashMap 并发处理技巧

这篇文章只是简单总结下ConcurrentHashMap的精髓,并没有详细分析源码。

网上解析ConcurrentHashMap源码的文章很多,有详细的,也有简单介绍的。但是个人认为如果不是要去排查Bug的话,那也没有比较一行行的斟酌,看源码应该梳理整理结构,理解作者大体思路即可,当然,这仅仅是个人看法。

ConcurrentHashMap作为并发容器,在并发不是特别高的情况下,性能比同步容器能快很多,这最大的功劳得益于CAS的使用。通过ConcurrnetHashMap的源码,可以学习到很多精髓的思想,接下来一一列举:

以下ConcurrentHashMap简称CMP

CAS

如果说可以使用CMP来替换锁的使用,可能我会写出如下代码:

private boolean lock=false;

public void put(){
    while(cas(lock,false,true)){
        //执行操作
    }
}

嗯,使用的方式和synchronized一模一样,但是增加了自旋的功能,同时不会引起上下文切换。

然而CAS能做的不仅仅这些,上面的代码中,CAS所带来的收益仅仅是减少了锁排队的开销,然而在CMP中,CAS的功能性被用到了极致,CMP通过尝试代码执行中所有的可能性,将整个方法通过CAS拆分为多个可能,每个可能仅仅尝试使用一次CAS替换,极大的减少了争用的可能。

talk is cheap,show me the code

CMP#putVal中,如果使用以上代码模板对putVal加锁,那么依然会出现如果1000个线程同时put,那么这1000个线程依然需要排队等待处理。然而在CMP代码中,代码模板如下:

public V putVal(K key,V value){
    //死循环
    while(true){
        //如果没有初始化,则尝试初始化
        if(tab==null){
            cas(tab,null,new table());
        }
        //如果当前数组节点为空,则尝试添加一个新的链表节点
        else if(tableAt(hash)==null){
            if(casTableAt(hash,null,new Node())){
                break;
            }
        }
        //如果当前节点被标记为正在扩容
        else if(f.hash == MOVED){
            //帮助一起扩容
            helpTransfer
        }
        //否则,对node加锁,遍历节点并访问
        else{
            synchronized(f){
                //...
                break;
            }
        }
    }
}

可以看到,在putVal方法中,CMP详细的分析了当添加一个节点可能出现的情况,同时针对所有的情况进行的尝试性的操作,这样也避免了if-then-act的竞态问题。在源码里面,各个情况都被包含在一个整体的while(true)循环中,然后通过CAS处理各种情况,只有当出现了节点真正的被添加的情况,才会退出循环,这里体现的一个并发优化思想便是锁细化,将锁的粒度控制在最小的范围

这样做最大的好处就在于可以有多个线程在同一个方法中同时执行,比如:

  • 第一个线程正好执行第一个if,进行初始化
  • 第二个线程正好执行第二个if,进行添加链表节点
  • 第三个线程正好执行扩容
  • 第四个线程正好执行遍历节点

可以看到,要是符号条件,可能正好有4个线程同时各司其职,穿梭执行,通过这种细化的方式,真正的实现了非阻塞的概念,因为整个方法被包裹在一个while(true)循环中,因此不用担心第一个if执行完后,当前Map被其他线程修改,毕竟每次执行都有CAS进行条件判断,这便是CASsynchronized性能高的另外一个原因。

任务协助

在平时我们使用锁时候,如果一个线程获取了锁,那么其他线程只能等待,使用CAS的线程一般都会自旋轮询,然而对于一个大型的任务来说,自旋可能是比较浪费CPU的行为,而在CMP中,CAS发现某种条件不满足时,并没有一直空转,而是进行了将自己转换为一个消费者,帮助消费其他线程正在执行但是还未完成的任务。

同样在putVal方法中,

//如果当前节点被标记为正在扩容
else if(f.hash == MOVED){
    //帮助一起扩容
    helpTransfer
}

helpTransfertransfer的代码比较复杂和繁琐,因此这里便不贴出来一一解析,这里大体讲解下思想:

Transfer方法中,Node[]数组被当做一个任务池,每个线程每次都可以在Node[]这里划分一部分(16)任务,每个线程都是一个消费者,当其他现在在put元素的时候,发现有当前正在扩容,也会成为一个消费者去领取任务进行扩容。

对于CAS来说,如果无法获取到指定的条件,那么空转也只会浪费CPU,反正都是非阻塞,将自己也加入其他任务的执行,能够更好地利用CPU

这种思想不仅仅在CMP出现,在ConcurrentLinkedList插入新元素,更新尾结点的过程中,如果当前线程发现出现了中间状态,也会协助完成中间状态,而不是等待上一个线程执行完成。

分段加锁

分段的思想在JDK1.7HashMap源码中体现的最为突出,在CMP中也有,对于CAS来说,在争用比较大的时候,由于每次CAS都无法达到条件,可能使得CAS的效率反而降低。对于CMP来说,每次修改操作,都需要修改CMPsize,因此如果并发过大,那么CAS修改size可能会成为性能瓶颈。

CMP中,计算size的方法如下:

     final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

目的就是为了减少CAS更新baseCount的冲突,在更新size时,线程会先尝试更新baseCount,如果失败,则随机选择CounterCell[]数组中一个位置,将其值进行+1操作即可,这样极大的减少了线程的冲突。

避免伪共享

伪共享: 不同CPU同时写一个缓存行中的不同数据,导致两个CPU中的缓存交替失效的问题。

这是一个多线程中老生常谈的问题,在CMP的源码中也有出现:

@sun.misc.Contended 
static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}

可以看到,在更新size()大小的时候,对于CounterCell中的value元素还是用了内存填充来避免伪共享问题,使得每次更新value的时候,不会锁定其他值。

可以看到CMP中为了优化size()操作,做了极大的努力。然而这个size()依然是一个不确定的值,并且目前还没办法进行加锁操作。

总结

Doug Lea真是永远的神,这样的代码即使我能想到,也不敢用在生产环境中,能运用这样的思想,写出如此精妙的代码,需要丰富的知识作为支撑,多学多看,共勉。