java

位置:IT落伍者 >> java >> 浏览文章

Java多线程之ConcurrentHashMap深入分析


发布日期:2021年03月16日
 
Java多线程之ConcurrentHashMap深入分析

Map体系

Hashtable是JDK 之前Map唯一线程安全的内置实现(CollectionssynchronizedMap不算)Hashtable继承的是Dictionary(Hashtable是其唯一公开的子类)并不继承AbstractMap或者HashMap尽管Hashtable和HashMap的结构非常类似但是他们之间并没有多大联系

ConcurrentHashMap是HashMap的线程安全版本ConcurrentSkipListMap是TreeMap的线程安全版本

最终可用的线程安全版本Map实现是ConcurrentHashMap/ConcurrentSkipListMap/Hashtable/Properties四个但是Hashtable是过时的类库因此如果可以的应该尽可能的使用ConcurrentHashMap和ConcurrentSkipListMap

concurrentHashMap的结构

我们通过ConcurrentHashMap的类图来分析ConcurrentHashMap的结构

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成Segment是一种可重入锁ReentrantLock在ConcurrentHashMap里扮演锁的角色HashEntry则用于存储键值对数据一个ConcurrentHashMap里包含一个Segment数组Segment的结构和HashMap类似是一种数组和链表结构 一个Segment里包含一个HashEntry数组每个HashEntry是一个链表结构的元素 每个Segment守护者一个HashEntry数组里的元素当对HashEntry数组的数据进行修改时必须首先获得它对应的Segment锁

ConcurrentHashMap 类中包含两个静态内部类 HashEntry 和 SegmentHashEntry 用来封装映射表的键 / 值对Segment 用来充当锁的角色每个 Segment 对象守护整个散列映射表的若干个桶每个桶是由若干个 HashEntry 对象链接起来的链表一个 ConcurrentHashMap 实例中包含由若干个 Segment 对象组成的数组

ConcurrentHashMap实现原理

锁分离 (Lock Stripping)

比如HashTable是一个过时的容器类通过使用synchronized来保证线程安全在线程竞争激烈的情况下HashTable的效率非常低下原因是所有访问HashTable的线程都必须竞争同一把锁那假如容器里有多把锁每一把锁用于锁容器其中一部分数据那么当多线程访问容器里不同数据段的数据时线程间就不会存在锁竞争从而可以有效的提高并发访问效率这就是ConcurrentHashMap所使用的锁分段技术

ConcurrentHashMap内部使用段(Segment)来表示这些不同的部分每个段其实就是一个小的hash table它们有自己的锁只要多个修改操作发生在不同的段上它们就可以并发进行同样当一个线程占用锁访问其中一个段数据的时候其他段的数据也能被其他线程访问

ConcurrentHashMap有些方法需要跨段比如size()和containsValue()它们可能需要锁定整个表而而不仅仅是某个段这需要按顺序锁定所有段操作完毕后又按顺序释放所有段的锁这里按顺序是很重要的否则极有可能出现死锁在ConcurrentHashMap内部段数组是final的并且其成员变量实际上也是final的但是仅仅是将数组声明为final的并不保证数组成员也是final的这需要实现上的保证这可以确保不会出现死锁因为获得锁的顺序是固定的不变性是多线程编程占有很重要的地位下面还要谈到

[java] view plaincopy /**

* The segments each of which is a specialized hash table

*/

final Segment<KV>[] segments;

不变(Immutable)和易变(Volatile)

ConcurrentHashMap完全允许多个读操作并发进行读操作并不需要加锁如果使用传统的技术如HashMap中的实现如果允许可以在hash链的中间添加或删除元素读操作不加锁将得到不一致的数据ConcurrentHashMap实现技术是保证HashEntry几乎是不可变的HashEntry代表每个hash链中的一个节点其结构如下所示

[java] view plaincopy static final class HashEntry<KV> {

final K key;

final int hash;

volatile V value;

final HashEntry<KV> next;

}

可以看到除了value不是final的其它值都是final的这意味着不能从hash链的中间或尾部添加或删除节点因为这需要修改next引用值所有的节点的修改只能从头部开始对于put操作可以一律添加到Hash链的头部但是对于remove操作可能需要从中间删除一个节点这就需要将要删除节点的前面所有节点整个复制一遍最后一个节点指向要删除结点的下一个结点这在讲解删除操作时还会详述为了确保读操作能够看到最新的值将value设置成volatile这避免了加锁

ConcurrentHashMap具体实现

ConcurrentHashMap的初始化

ConcurrentHashMap初始化方法是通过initialCapacityloadFactor concurrencyLevel几个参数来初始化segments数组段偏移量segmentShift段掩码segmentMask和每个segment里的HashEntry数组

初始化segments数组让我们来看一下初始化segmentShiftsegmentMask和segments数组的源代码

[java] view plaincopy if (concurrencyLevel > MAX_SEGMENTS)

concurrencyLevel = MAX_SEGMENTS;

// Find poweroftwo sizes best matching arguments

int sshift = ;

int ssize = ;

while (ssize < concurrencyLevel) {

++sshift;

ssize 《= ;

}

segmentShift = sshift;

segmentMask = ssize ;

thissegments = SegmentnewArray(ssize)

由上面的代码可知segments数组的长度ssize通过concurrencyLevel计算得出为了能通过按位与的哈希算法来定位segments数组的索引必须保证segments数组的长度是的N次方(poweroftwo size)所以必须计算出一个是大于或等于concurrencyLevel的最小的的N次方值来作为segments数组的长度假如concurrencyLevel等于ssize都会等于即容器里锁的个数也是注意concurrencyLevel的最大大小是意味着segments数组的长度最大为对应的二进制是

初始化segmentShift和segmentMask这两个全局变量在定位segment时的哈希算法里需要使用sshift等于ssize从向左移位的次数在默认情况下concurrencyLevel等于需要向左移位移动所以sshift等于segmentShift用于定位参与hash运算的位数segmentShift等于减sshift所以等于这里之所以用是因为ConcurrentHashMap里的hash()方法输出的最大数是位的后面的测试中我们可以看到这点segmentMask是哈希运算的掩码等于ssize减掩码的二进制各个位的值都是因为ssize的最大长度是所以segmentShift最大值是segmentMask最大值是对应的二进制是每个位都是

初始化每个Segment输入参数initialCapacity是ConcurrentHashMap的初始化容量loadfactor是每个segment的负载因子在构造方法里需要通过这两个参数来初始化数组中的每个segment

上面代码中的变量cap就是segment里HashEntry数组的长度它等于initialCapacity除以ssize的倍数c如果c大于就会取大于等于c的的N次方值所以cap不是就是的N次方segment的容量threshold=(int)cap*loadFactor默认情况下initialCapacity等于loadfactor等于通过运算cap等于threshold等于零

[java] view plaincopy if (initialCapacity > MAXIMUM_CAPACITY)

initialCapacity = MAXIMUM_CAPACITY;

int c = initialCapacity / ssize;

if (c * ssize < initialCapacity)

++c;

int cap = ;

while (cap < c)

cap 《= ;

for (int i = ; i < thissegmentslength; ++i)

thissegments[i] = new Segment<KV>(cap loadFactor)

定位Segment

既然ConcurrentHashMap使用分段锁Segment来保护不同段的数据那么在插入和获取元素的时候必须先通过哈希算法定位到Segment可以看到ConcurrentHashMap会首先使用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再哈希

[java] view plaincopy private static int hash(int h) {

h += (h 《 ) ^ xffffcdd;

h ^= (h >>>

h += (h 《

h ^= (h >>>

h += (h 《 ) + (h 《

return h ^ (h >>>

}

之所以进行再哈希其目的是为了减少哈希沖突使元素能够均匀的分布在不同的Segment上从而提高容器的存取效率假如哈希的质量差到极点那么所有的元素都在一个Segment中不仅存取元素缓慢分段锁也会失去意义我做了一个测试不通过再哈希而直接执行哈希计算

[java] view plaincopy Systemoutprintln(IntegerparseInt( ) &

Systemoutprintln(IntegerparseInt( ) &

Systemoutprintln(IntegerparseInt( ) &

Systemoutprintln(IntegerparseInt( ) &

计算后输出的哈希值全是通过这个例子可以发现如果不进行再哈希哈希沖突会非常严重因为只要低位一样无论高位是什么数其哈希值总是一样我们再把上面的二进制数据进行再哈希后结果如下为了方便阅读不足位的高位补了每隔四位用竖线分割下

[java] view plaincopy |||||||

|||||||

|||||||

|||||||

可以发现每一位的数据都散列开了通过这种再哈希能让数字的每一位都能参加到哈希运算当中从而减少哈希沖突ConcurrentHashMap通过以下哈希算法定位segment

[java] view plaincopy final Segment<KV> segmentFor(int hash) {

return segments[(hash >>> segmentShift) & segmentMask];

}

默认情况下segmentShift为segmentMask为再哈希后的数最大是位二进制数据向右无符号移动意思是让高位参与到hash运算中 (hash >>> segmentShift) & segmentMask的运算结果分别是可以看到hash值没有发生沖突

ConcurrentHashMap的get操作

前面提到过ConcurrentHashMap的get操作是不用加锁的我们这里看一下其实现

[java] view plaincopy public V get(Object key) {

int hash = hash(keyhashCode())

return segmentFor(hash)get(key hash)

}

看第三行segmentFor这个函数用于确定操作应该在哪一个segment中进行几乎对ConcurrentHashMap的所有操作都需要用到这个函数我们看下这个函数的实现

[java] view plaincopy final Segment<KV> segmentFor(int hash) {

return segments[(hash >>> segmentShift) & segmentMask];

}

这个函数用了位操作来确定Segment根据传入的hash值向右无符号右移segmentShift位然后和segmentMask进行与操作结合我们之前说的segmentShift和segmentMask的值就可以得出以下结论假设Segment的数量是的n次方根据元素的hash值的高n位就可以确定元素到底在哪一个Segment中

在确定了需要在哪一个segment中进行操作以后接下来的事情就是调用对应的Segment的get方法

[java] view plaincopy V get(Object key int hash) {

if (count != ) { // readvolatile

HashEntry<KV> e = getFirst(hash)

while (e != null) {

if (ehash == hash && keyequals(ekey)) {

V v = evalue;

if (v != null)

return v;

return readValueUnderLock(e) // recheck

}

e = enext;

}

}

return null;

}

先看第二行代码这里对count进行了一次判断其中count表示Segment中元素的数量我们可以来看一下count的定义

[java] view plaincopy transient volatile int count;

可以看到count是volatile的实际上这里里面利用了volatile的语义

对volatile字段的写入操作happensbefore于每一个后续的同一个字段的读操作

因为实际上putremove等操作也会更新count的值所以当竞争发生的时候volatile的语义可以保证写操作在读操作之前也就保证了写操作对后续的读操作都是可见的这样后面get的后续操作就可以拿到完整的元素内容

然后在第三行调用了getFirst()来取得链表的头部

[java] view plaincopy HashEntry<KV> getFirst(int hash) {

HashEntry<KV>[] tab = table;

return tab[hash & (tablength )];

}

同样这里也是用位操作来确定链表的头部hash值和HashTable的长度减一做与操作最后的结果就是hash值的低n位其中n是HashTable的长度以为底的结果

在确定了链表的头部以后就可以对整个链表进行遍历看第取出key对应的value的值如果拿出的value的值是null则可能这个keyvalue对正在put的过程中如果出现这种情况那么就加锁来保证取出的value是完整的如果不是null则直接返回value

ConcurrentHashMap的put操作

看完了get操作再看下put操作put操作的前面也是确定Segment的过程这里不再赘述直接看关键的segment的put方法

[java] view plaincopy V put(K key int hash V value boolean onlyIfAbsent) {

lock()

try {

int c = count;

if (c++ > threshold) // ensure capacity

rehash()

HashEntry<KV>[] tab = table;

int index = hash & (tablength

HashEntry<KV> first = tab[index];

HashEntry<KV> e = first;

while (e != null && (ehash != hash || !keyequals(ekey)))

e = enext;

V oldValue;

if (e != null) {

oldValue = evalue;

if (!onlyIfAbsent)

evalue = value;

}

else {

oldValue = null;

++modCount;

tab[index] = new HashEntry<KV>(key hash first value)

count = c; // writevolatile

}

return oldValue;

} finally {

unlock()

}

}

首先对Segment的put操作是加锁完成的然后在第五行如果Segment中元素的数量超过了阈值(由构造函数中的loadFactor算出)这需要进行对Segment扩容并且要进行rehash关于rehash的过程大家可以自己去了解这里不详细讲了

和第行的操作就是getFirst的过程确定链表头部的位置

行这里的这个while循环是在链表中寻找和要put的元素相同key的元素如果找到就直接更新更新key的value如果没有找到则进入行这里生成一个新的HashEntry并且把它加到整个Segment的头部然后再更新count的值

ConcurrentHashMap的remove操作

Remove操作的前面一部分和前面的get和put操作一样都是定位Segment的过程然后再调用Segment的remove方法

[java] view plaincopy V remove(Object key int hash Object value) {

lock()

try {

int c = count ;

HashEntry<KV>[] tab = table;

int index = hash & (tablength

HashEntry<KV> first = tab[index];

HashEntry<KV> e = first;

while (e != null && (ehash != hash || !keyequals(ekey)))

e = enext;

V oldValue = null;

if (e != null) {

V v = evalue;

if (value == null || valueequals(v)) {

oldValue = v;

// All entries following removed node can stay

// in list but all preceding ones need to be

// cloned

++modCount;

HashEntry<KV> newFirst = enext;

for (HashEntry<KV> p = first; p != e; p = pnext)

newFirst = new HashEntry<KV>(pkey phash

newFirst pvalue)

tab[index] = newFirst;

count = c; // writevolatile

}

}

return oldValue;

} finally {

unlock()

}

}

首先remove操作也是确定需要删除的元素的位置不过这里删除元素的方法不是简单地把待删除元素的前面的一个元素的next指向后面一个就完事了我们之前已经说过HashEntry中的next是final的一经赋值以后就不可修改在定位到待删除元素的位置以后程序就将待删除元素前面的那一些元素全部复制一遍然后再一个一个重新接到链表上去看一下下面这一幅图来了解这个过程

假设链表中原来的元素如上图所示现在要删除元素那么删除元素以后的链表就如下图所示

ConcurrentHashMap的size操作

在前面的章节中我们涉及到的操作都是在单个Segment中进行的但是ConcurrentHashMap有一些操作是在多个Segment中进行比如size操作ConcurrentHashMap的size操作也采用了一种比较巧的方式来尽量避免对所有的Segment都加锁

前面我们提到了一个Segment中的有一个modCount变量代表的是对Segment中元素的数量造成影响的操作的次数这个值只增不减size操作就是遍历了两次Segment每次记录Segment的modCount值然后将两次的modCount进行比较如果相同则表示期间没有发生过写入操作就将原先遍历的结果返回如果不相同则把这个过程再重复做一次如果再不相同则就需要将所有的Segment都锁住然后一个一个遍历了具体的实现大家可以看ConcurrentHashMap的源码这里就不贴了

               

上一篇:Java线程:深入ThreadLocal

下一篇:Java 基本功之中断线程的理解