Java岗大厂面试百日冲刺【Day12】—— 集合框架2(HashMap)

转载自: Java岗大厂面试百日冲刺【Day12】—— 集合框架2(HashMap)

本文已获得原作者 _陈哈哈 授权并经过重新整理规划后发布。

本栏目Java开发岗高频面试题主要出自以下各技术栈:Java基础知识、集合容器、并发编程、JVM、Spring全家桶、MyBatis等ORMapping框架、MySQL数据库、Redis缓存、RabbitMQ消息队列、Linux操作技巧等。

面试题1:说一下 HashMap 的实现原理?

众所周知,HashMap是一个用于存储Key-Value键值对的集合,每一个键值对也叫做Entry(包括Key-Value),其中Key 和 Value 允许为null。这些个键值对(Entry)分散存储在一个数组当中,这个数组就是HashMap的主干。另外,HashMap数组每一个元素的初始值都是Null。

InterviewForJavaByThreeQuestionsADay12-1.webp

值得注意的是:HashMap不能保证映射的顺序,插入后的数据顺序也不能保证一直不变(如扩容后rehash)。

要说HashMap的原理,首先要先了解他的数据结构

InterviewForJavaByThreeQuestionsADay12-2.webp

如上图为JDK1.8版本的数据结构,其实HashMap在JDK1.7及以前是一个“链表散列”的数据结构,即数组 + 链表的结合体。JDK8优化为:数组+链表+红黑树

我们常把数组中的每一个节点称为一个。当向桶中添加一个键值对时,首先计算键值对中key的hash值(hash(key)),以此确定插入数组中的位置(即哪个桶),但是可能存在同一hash值的元素已经被放在数组同一位置了,这种现象称为碰撞,这时按照尾插法(jdk1.7及以前为头插法)的方式添加key-value到同一hash值的元素的最后面,链表就这样形成了。

当链表长度超过8(TREEIFY_THRESHOLD - 阈值)时,链表就自行转为红黑树。

注意:同一hash值的元素指的是key内容一样么?不是。根据hash算法的计算方式,是将key值转为一个32位的int值(近似取值),key值不同但key值相近的很可能hash值相同,如key=“a”和key=“aa”等。

通过上述回答的内容,我们明显给了面试官往深入问的多个诱饵,根据我们的回答,下一步他多可能会追问这些问题:

  1. 如何实现HashMap的有序?
  2. put方法原理是怎么实现的?
  3. 扩容机制原理 → 初始容量、加载因子 → 扩容后的rehash(元素迁移)
  4. 插入后的数据顺序会变的原因是什么?
  5. HashMap在JDK1.7-JDK1.8都做了哪些优化?
  6. 链表红黑树如何互相转换?阈值多少?
  7. 头插法改成尾插法为了解决什么问题?

而我们,当然是提前准备好如何回答好这些问题!当你的回答超过面试同学的认知范围时,主动权就到我们手里了。

如何实现HashMap的有序?

使用LinkedHashMapTreeMap

LinkedHashMap内部维护了一个单链表,有头尾节点,同时LinkedHashMap节点Entry内部除了继承HashMap的Node属性,还有before 和 after用于标识前置节点和后置节点。可以实现按插入的顺序或访问顺序排序。

/**
 * The head (eldest) of the doubly linked list.
*/
transient LinkedHashMap.Entry<K,V> head;

/**
  * The tail (youngest) of the doubly linked list.
*/
transient LinkedHashMap.Entry<K,V> tail;
//将加入的p节点添加到链表末尾
private void linkNodeLast(LinkedHashMap.Entry<K,V> p) {
  LinkedHashMap.Entry<K,V> last = tail;
  tail = p;
  if (last == null)
    head = p;
  else {
    p.before = last;
    last.after = p;
  }
}
//LinkedHashMap的节点类
static class Entry<K,V> extends HashMap.Node<K,V> {
  Entry<K,V> before, after;
  Entry(int hash, K key, V value, Node<K,V> next) {
    super(hash, key, value, next);
  }
}

示例代码:

public static void main(String[] args) {
  Map<String, String> linkedMap = new LinkedHashMap<String, String>();
  linkedMap.put("1", "占便宜");
  linkedMap.put("2", "没够儿");
  linkedMap.put("3", "吃亏");
  linkedMap.put("4", "难受");

  for(linkedMap.Entry<String,String> item: linkedMap.entrySet()){
    System.out.println(item.getKey() + ":" + item.getValue());
  }
}

输出结果:

1:占便宜
2:没够儿
3:吃亏
4:难受

那TreeMap怎么实现有序的?

TreeMap是按照Key的自然顺序或者Comprator的顺序进行排序,内部是通过红黑树来实现。

  1. TreeMap实现了SortedMap接口,它是一个key有序的Map类。
  2. 要么key所属的类实现Comparable接口,或者自定义一个实现了Comparator接口的比较器,传给TreeMap用于key的比较。
TreeMap<String, String> map = new TreeMap<String, String>(new Comparator<String>() {

    @Override
    public int compare(String o1, String o2) {
            return o2.compareTo(o1);
    }
    
});

put方法原理是怎么实现的?

该条问答摘自 安琪拉的博客

InterviewForJavaByThreeQuestionsADay12-3.webp

  1. 判断数组是否为空,为空进行初始化;
  2. 不为空,计算 k 的 hash 值,通过(n - 1) & hash计算应当存放在数组中的下标 index;
  3. 查看 table[index] 是否存在数据,没有数据就构造一个Node节点存放在 table[index] 中;
  4. 存在数据,说明发生了hash冲突(存在二个节点key的hash值一样), 继续判断key是否相等,相等,用新的value替换原数据(onlyIfAbsent为false)
  5. 如果不相等,判断当前节点类型是不是树型节点,如果是树型节点,创造树型节点插入红黑树中;(如果当前节点是树型节点证明当前已经是红黑树了)
  6. 如果不是树型节点,创建普通Node加入链表中;判断链表长度是否大于 8并且数组长度大于64, 大于的话链表转换为红黑树;
  7. 插入完成之后判断当前节点数是否大于阈值,如果大于开始扩容为原数组的二倍。

下面我们看看源码中的内容:

/**
 * 将指定参数key和指定参数value插入map中,如果key已经存在,那就替换key对应的value
 * 
 * @param key 指定key
 * @param value 指定value
 * @return 如果value被替换,则返回旧的value,否则返回null。当然,可能key对应的value就是null。
 */
public V put(K key, V value) {
    //putVal方法的实现就在下面
    return putVal(hash(key), key, value, false, true);
}

从源码中可以看到,put(K key, V value)可以分为三个步骤:

  1. 通过hash(Object key)方法计算key的哈希值。
  2. 通过putVal(hash(key), key, value, false, true)方法实现功能。
  3. 返回putVal方法返回的结果。

那么看看putVal方法的源码是如何实现的?

/**
 * Map.put和其他相关方法的实现需要的方法
 * 
 * @param hash 指定参数key的哈希值
 * @param key 指定参数key
 * @param value 指定参数value
 * @param onlyIfAbsent 如果为true,即使指定参数key在map中已经存在,也不会替换value
 * @param evict 如果为false,数组table在创建模式中
 * @return 如果value被替换,则返回旧的value,否则返回null。当然,可能key对应的value就是null。
 */
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,boolean evict) {
    Node<K,V>[] tab; Node<K,V> p; int n, i;
    //如果哈希表为空,调用resize()创建一个哈希表,并用变量n记录哈希表长度
    if ((tab = table) == null || (n = tab.length) == 0)
        n = (tab = resize()).length;
    //如果指定参数hash在表中没有对应的桶,即为没有碰撞
    if ((p = tab[i = (n - 1) & hash]) == null)
        //直接将键值对插入到map中即可
        tab[i] = newNode(hash, key, value, null);
    else {
        Node<K,V> e; K k;
        //如果碰撞了,且桶中的第一个节点就匹配了
        if (p.hash == hash &&
            ((k = p.key) == key || (key != null && key.equals(k))))
            //将桶中的第一个节点记录起来
            e = p;
        //如果桶中的第一个节点没有匹配上,且桶内为红黑树结构,则调用红黑树对应的方法插入键值对
        else if (p instanceof TreeNode)
            e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
        //不是红黑树结构,那么就肯定是链式结构
        else {
            //遍历链式结构
            for (int binCount = 0; ; ++binCount) {
                //如果到了链表尾部
                if ((e = p.next) == null) {
                    //在链表尾部插入键值对
                    p.next = newNode(hash, key, value, null);
                    //如果链的长度大于TREEIFY_THRESHOLD这个临界值,则把链变为红黑树
                    if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
                        treeifyBin(tab, hash);
                    //跳出循环
                    break;
                }
                //如果找到了重复的key,判断链表中结点的key值与插入的元素的key值是否相等,如果相等,跳出循环
                if (e.hash == hash &&
                    ((k = e.key) == key || (key != null && key.equals(k))))
                    break;
                //用于遍历桶中的链表,与前面的e = p.next组合,可以遍历链表
                p = e;
            }
        }
        //如果key映射的节点不为null
        if (e != null) { // existing mapping for key
            //记录节点的vlaue
            V oldValue = e.value;
            //如果onlyIfAbsent为false,或者oldValue为null
            if (!onlyIfAbsent || oldValue == null)
                //替换value
                e.value = value;
            //访问后回调
            afterNodeAccess(e);
            //返回节点的旧值
            return oldValue;
        }
    }
    //结构型修改次数+1
    ++modCount;
    //判断是否需要扩容
    if (++size > threshold)
        resize();
    //插入后回调
    afterNodeInsertion(evict);
    return null;
}

HashMap扩容机制原理

capacity 即容量,默认16。
loadFactor 加载因子,默认是0.75
threshold 阈值。阈值=容量*加载因子。默认12。当元素数量超过阈值时便会触发扩容。
  • 一般情况下,当元素数量超过阈值时便会触发扩容(调用resize()方法)。
  • 每次扩容的容量都是之前容量的2倍。
  • 扩展后Node对象的位置要么在原位置,要么移动到原偏移量两倍的位置。

这里我们以JDK1.8的扩容为例:

HashMap的容量变化通常存在以下几种情况:

  1. 空参数的构造函数:实例化的HashMap默认内部数组是null,即没有实例化。第一次调用put方法时,则会开始第一次初始化扩容,长度为16
  2. 有参构造函数:用于指定容量。会根据指定的正整数找到不小于指定容量的2的幂数,将这个数设置赋值给阈值(threshold)。第一次调用put方法时,会将阈值赋值给容量,然后让 阈值 = 容量 x 加载因子 。(因此并不是我们手动指定了容量就一定不会触发扩容,超过阈值后一样会扩容!!)
  3. 如果不是第一次扩容,则容量变为原来的2倍,阈值也变为原来的2倍。(容量和阈值都变为原来的2倍时,加载因子0.75不变)

此外还有几个点需要注意:

  • 首次put时,先会触发扩容(算是初始化),然后存入数据,然后判断是否需要扩容;可见首次扩容可能会调用两次resize()方法。
  • 不是首次put,则不再初始化,直接存入数据,然后判断是否需要扩容;

扩容时,要扩大空间,为了使hash散列均匀分布,原有部分元素的位置会发生移位。

JDK7的元素迁移

JDK7中,HashMap的内部数据保存的都是链表。因此逻辑相对简单:在准备好新的数组后,map会遍历数组的每个“桶”,然后遍历桶中的每个Entity,重新计算其hash值(也有可能不计算),找到新数组中的对应位置,以头插法插入新的链表。

这里有几个注意点:

  • 是否要重新计算hash值的条件这里不深入讨论,读者可自行查阅源码。因为是头插法,因此新旧链表的元素位置会发生转置现象。
  • 元素迁移的过程中在多线程情境下有可能会触发死循环(无限进行链表反转)。

JDK1.8的元素迁移

JDK1.8则因为巧妙的设计,性能有了大大的提升:由于数组的容量是以2的幂次方扩容的,那么一个Entity在扩容时,新的位置要么在原位置,要么在原长度+原位置的位置。原因如下图:

InterviewForJavaByThreeQuestionsADay12-4.webp

数组长度变为原来的2倍,表现在二进制上就是多了一个高位参与数组下标确定。此时,一个元素通过hash转换坐标的方法计算后,恰好出现一个现象:最高位是0则坐标不变,最高位是1则坐标变为“10000+原坐标”,即“原长度+原坐标”。如下图:

InterviewForJavaByThreeQuestionsADay12-5.webp

因此,在扩容时,不需要重新计算元素的hash了,只需要判断最高位是1还是0就好了。

JDK8的HashMap还有以下细节需要注意:

  • JDK8在迁移元素时是正序的,不会出现链表转置的发生。
  • 如果某个桶内的元素超过8个,则会将链表转化成红黑树,加快数据查询效率。

HashMap在JDK1.8都做了哪些优化?

不同点 JDK 1.7 JDK 1.8
存储结构 数组 + 链表 数组 + 链表 + 红黑树
初始化方式 单独函数:inflateTable() 直接集成到了扩容函数resize()中
hash值计算方式 扰动处理 = 9次扰动 = 4次位运算 + 5次异或运算 扰动处理 = 2次扰动 = 1次位运算 + 1次异或运算
存放数据的规则 无冲突时,存放数组;冲突时,存放链表 无冲突时,存放数组;冲突 & 链表长度 < 8:存放单链表;冲突 & 链表长度 > 8:树化并存放红黑树
插入数据方式 头插法(先讲原位置的数据移到后1位,再插入数据到该位置) 尾插法(直接插入到链表尾部/红黑树)
扩容后存储位置的计算方式 全部按照原来方法进行计算(即hashCode ->> 扰动函数 ->> (h&length-1)) 按照扩容后的规律计算(即扩容后的位置=原位置 or 原位置 + 旧容量)
  • 数组+链表改成了数组+链表或红黑树;

防止发生hash冲突,链表长度过长,将时间复杂度由O(n)降为O(logn);

  • 链表的插入方式从头插法改成了尾插法,简单说就是插入时,如果数组位置上已经有元素,1.7将新元素放到数组中,新节点插入到链表头部,原始节点后移;而JDK1.8会遍历链表,将元素放置到链表的最后;

因为1.7头插法扩容时,头插法可能会导致链表发生反转,多线程环境下会产生环(死循环);

InterviewForJavaByThreeQuestionsADay12-6.webp

这个过程为,先将A复制到新的hash表中,然后接着复制B到链头(A的前边:B.next=A),本来B.next=null,到此也就结束了(跟线程二一样的过程),但是,由于线程二扩容的原因,将B.next=A,所以,这里继续复制A,让A.next=B,由此,环形链表出现:B.next=A; A.next=B

使用头插会改变链表的上的顺序,但是如果使用尾插,在扩容时会保持链表元素原本的顺序,就不会出现链表成环的问题了。

就是说原本是A->B,在扩容后那个链表还是A->B。

  • 扩容的时候1.7需要对原数组中的元素进行重新hash定位在新数组的位置,1.8采用更简单的判断逻辑,位置不变或索引+旧容量大小;
  • 在插入时,1.7先判断是否需要扩容,再插入,1.8先进行插入,插入完成再判断是否需要扩容;

链表红黑树如何互相转换?阈值多少?

链表转红黑树的阈值为:8

红黑树转链表的阈值为:6

经过计算,在hash函数设计合理的情况下,发生hash碰撞8次的几率为百万分之6,从概率上讲,阈值为8足够用;至于为什么红黑树转回来链表的条件阈值是6而不是7或9?因为如果hash碰撞次数在8附近徘徊,可能会频繁发生链表和红黑树的互相转化操作,为了预防这种情况的发生。

面试题2:HashMap是线程安全的吗?

不是线程安全的,在多线程环境下,

  • JDK1.7:会产生死循环、数据丢失、数据覆盖的问题;
    J* DK1.8:中会有数据覆盖的问题。

以1.8为例,当A线程判断index位置为空后正好挂起,B线程开始往index位置写入数据时,这时A线程恢复,执行写入操作,这样A或B数据就被覆盖了。

你是如何解决这个线程不安全问题的?

在Java中有HashTableSynchronizedMapConcurrentHashMap这三种是实现线程安全的Map。

HashTable:是直接在操作方法上加synchronized关键字,锁住整个数组,粒度比较大;

SynchronizedMap:是使用Collections集合工具的内部类,通过传入Map封装出一个SynchronizedMap对象,内部定义了一个对象锁,方法内通过对象锁实现;

ConcurrentHashMap:使用分段锁(CAS + synchronized相结合),降低了锁粒度,大大提高并发度。

ConcurrentHashMap 底层具体实现知道吗?和Hashtable有哪些区别?

ConcurrentHashMap 和 Hashtable 的区别主要体现在实现线程安全的方式上不同。

底层数据结构: JDK1.7的 ConcurrentHashMap 底层采用 分段的数组+链表 实现,JDK1.8 采用的数据结构跟HashMap1.8的结构一样,数组+链表/红黑二叉树。Hashtable 和 JDK1.8 之前的 HashMap 的底层数据结构类似都是采用 数组+链表 的形式,数组是 HashMap 的主体,链表则是主要为了解决哈希冲突而存在的;

实现线程安全的方式

  1. 在JDK1.7的时候,ConcurrentHashMap(分段锁) 对整个桶数组进行了分割分段(Segment),每一把锁只锁容器其中一部分数据,多线程访问容器里不同数据段的数据,就不会存在锁竞争,提高并发访问率。(默认分配16个Segment,比Hashtable效率提高16倍。) 到了 JDK1.8 的时候已经摒弃了Segment的概念,而是直接用 Node 数组+链表+红黑树的数据结构来实现,并发控制使用 synchronized 和 CAS 来操作。(JDK1.6以后 对 synchronized锁做了很多优化) 整个看起来就像是优化过且线程安全的 HashMap,虽然在JDK1.8中还能看到 Segment 的数据结构,但是已经简化了属性,只是为了兼容旧版本;
  2. Hashtable(同一把锁) :使用 synchronized 来保证线程安全,效率非常低下。当一个线程访问同步方法时,其他线程也访问同步方法,可能会进入阻塞或轮询状态,如使用 put 添加元素,另一个线程不能使用 put 添加元素,也不能使用 get,竞争会越来越激烈效率越低。

HashTable:

InterviewForJavaByThreeQuestionsADay12-7.webp

JDK1.7的ConcurrentHashMap:

InterviewForJavaByThreeQuestionsADay12-8.webp

JDK1.8的ConcurrentHashMap(TreeBin: 红黑二叉树节点 Node: 链表节点):

InterviewForJavaByThreeQuestionsADay12-9.webp

ConcurrentHashMap 底层具体实现

JDK7:

首先将数据分为一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据时,其他段的数据也能被其他线程访问。

在JDK1.7中,ConcurrentHashMap采用Segment + HashEntry的方式进行实现,结构如下:

一个 ConcurrentHashMap 里包含一个 Segment 数组。Segment 的结构和HashMap类似,是一种数组和链表结构,一个 Segment 包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素,每个 Segment 守护着一个HashEntry数组里的元素,当对 HashEntry 数组的数据进行修改时,必须首先获得对应的 Segment的锁。

  • 该类包含两个静态内部类 HashEntry 和 Segment ;前者用来封装映射表的键值对,后者用来充当锁的角色;
  • Segment 是一种可重入的锁 ReentrantLock,每个 Segment 守护一个HashEntry 数组里得元素,当对 HashEntry 数组的数据进行修改时,必须首先获得对应的 Segment 锁。

InterviewForJavaByThreeQuestionsADay12-10.webp

JDK8:

在JDK1.8中,放弃了Segment臃肿的设计,取而代之的是采用Node + CAS + Synchronized来保证并发安全进行实现,synchronized只锁定当前链表或红黑二叉树的首节点,这样只要hash不冲突,就不会产生并发,效率又提升N倍。

InterviewForJavaByThreeQuestionsADay12-11.webp

你能给我说说ConcurrentHashMap分段锁的实现原理吗?

ConcurrentHashMap中put()是线程安全的。但是很多时候, 由于业务需求, 需要先 get() 操作再 put() 操作,这2个操作无法保证原子性,这样就会产生线程安全问题了。大家在开发中一定要注意。

ConcurrentHashMap的结构示意图如下:

InterviewForJavaByThreeQuestionsADay12-8.webp

在进行数据的定位时,会首先找到 segment, 然后在 segment 中定位 bucket。如果多线程操作同一个 segment, 就会触发 segment 的锁 ReentrantLock, 这就是分段锁的基本实现原理。

源码分析

HashEntry

HashEntry 是 ConcurrentHashMap 的基础单元(节点),是实际数据的载体。

static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;

        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }

        /**
         * Sets next field with volatile write semantics.  (See above
         * about use of putOrderedObject.)
         */
        final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);
        }

        // Unsafe mechanics
        static final sun.misc.Unsafe UNSAFE;
        static final long nextOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = HashEntry.class;
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
}

Segment

Segment 继承 ReentrantLock 锁,用于存放数组 HashEntry[]。在这里可以看出, 无论1.7还是1.8版本, ConcurrentHashMap 底层并不是对 HashMap 的扩展, 而是同样从底层基于数组+链表进行功能实现。

static final class Segment<K,V> extends ReentrantLock implements Serializable {

        private static final long serialVersionUID = 2249069246763182397L;

        static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

        // 数据节点存储在这里(基础单元是数组)
        transient volatile HashEntry<K,V>[] table;

        transient int count;

        transient int modCount;

        transient int threshold;

        final float loadFactor;

        Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        }
        // 具体方法不在这里讨论...
}

构造方法

InterviewForJavaByThreeQuestionsADay12-12.webp

public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        // 对于concurrencyLevel的理解, 可以理解为segments数组的长度,即理论上多线程并发数(分段锁), 默认16
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        // 默认concurrencyLevel = 16, 所以ssize在默认情况下也是16,此时 sshift = 4
        // ssize = 2^sshift 即 ssize = 1 << sshift
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        // 段偏移量,32是因为hash是int值,int值32位,默认值情况下此时segmentShift = 28
        this.segmentShift = 32 - sshift;
        // 散列算法的掩码,默认值情况下segmentMask = 15, 定位segment的时候需要根据segment[]长度取模, 即hash(key)&(ssize - 1)
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        // 计算每个segment中table的容量, 初始容量=16, 并发数=16, 则segment中的Entry[]长度为1。
        int c = initialCapacity / ssize;
        // 处理无法整除的情况,取上限
        if (c * ssize < initialCapacity)
            ++c;
        // MIN_SEGMENT_TABLE_CAPACITY默认时2,cap是2的n次方
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        // 创建segments并初始化第一个segment数组,其余的segment延迟初始化
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        // 默认并发数=16
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
}

由图和源码可知,当用默认构造函数时,最大并发数是16,即最大允许16个线程同步写操作,且无法扩展。所以如果我们的场景数据量比较大时,应该设置合适的并发数,避免频繁锁冲突。

put()操作

public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        // 根据key的hash再次进行hash运算
        int hash = hash(key.hashCode());
        // 基于hash定位segment数组的索引。
        // hash值是int值,32bits。segmentShift=28,无符号右移28位,剩下高4位,其余补0。
        // segmentMask=15,二进制低4位全部是1,所以j相当于hash右移后的低4位。
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        // 找到对应segment
            s = ensureSegment(j);
        // 将新节点插入segment中
        return s.put(key, hash, value, false);
}

InterviewForJavaByThreeQuestionsADay12-13.webp

找出对应segment,如果不存在就创建并初始化

@SuppressWarnings("unchecked")
    private Segment<K,V> ensureSegment(int k) {
        // 当前的segments数组
        final Segment<K,V>[] ss = this.segments;
        // 计算原始偏移量,在segments数组的位置
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        // 判断没有被初始化
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            // 获取第一个segment ss[0]作为原型
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
            int cap = proto.table.length; // 容量
            float lf = proto.loadFactor; // 负载因子
            int threshold = (int)(cap * lf); // 阈值
            // 初始化ss[k] 内部的tab数组 // recheck
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            // 再次检查这个ss[k]  有没有被初始化
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                // 自旋。getObjectVolatile 保证了读的可见性,所以一旦有一个线程初始化了,那么就结束自旋
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
}

segment插入节点

上一步找到segment位置后计算节点在segment中的位置。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            // 是否获取锁,失败自旋获取锁(直到成功)
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value); // 失败了才会scanAndLockForPut
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                // 获取到bucket位置的第一个节点
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    // hash冲突
                    if (e != null) {
                        K k;
                        // key相等则覆盖
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        // 不相等则遍历链表
                        e = e.next;
                    }
                    else {
                        if (node != null)
                            // 将新节点插入链表作为表头
                            node.setNext(first);
                        else
                            // 创建新节点并插入表头
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        // 判断元素个数是否超过了阈值或者segment中数组的长度超过了MAXIMUM_CAPACITY,如果满足条件则rehash扩容!
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            // 扩容
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                // 解锁
                unlock();
            }
            return oldValue;
}

如果加锁失败则先走 scanAndLockForPut() 方法。

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            // 根据hash获取头结点
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // negative while locating node
            // 尝试获取锁,成功就返回,失败就开始自旋
            while (!tryLock()) {
                HashEntry<K,V> f; // to recheck first below
                if (retries < 0) {
                    // 如果头结点不存在
                    if (e == null) {
                        if (node == null) // speculatively create node
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    // 和头结点key相等
                    else if (key.equals(e.key))
                        retries = 0;
                    else
                        // 下一个节点 直到为null
                        e = e.next;
                }
                // 达到自旋的最大次数
                else if (++retries > MAX_SCAN_RETRIES) {
                    // lock()是阻塞方法。进入加锁方法,失败进入队列,阻塞当前线程
                    lock();
                    break;
                }
                // TODO (retries & 1) == 0 没理解
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    // 头结点变化,需要重新遍历,说明有新的节点加入或者移除
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
}

本站由 新·都在 使用 Stellar 创建。