CopyOnWriteArrayList
CopyOnWriteArrayList是ArrayList的线程安全版本,内部也是通过数组实现,每次对数组的修改都完全拷贝一份新的数组来修改,修改完了再替换掉老数组,这样保证了只阻塞写操作,不阻塞读操作,实现读写分离。
(1)CopyOnWriteArrayList使用ReentrantLock
重入锁加锁,保证线程安全;
(2)CopyOnWriteArrayList的写操作都要先拷贝一份新数组,在新数组中做修改,修改完了再用新数组替换老数组,所以空间复杂度是O(n),性能比较低下;
(3)CopyOnWriteArrayList的读操作支持随机访问,时间复杂度为O(1);
(4)CopyOnWriteArrayList采用读写分离的思想,读操作不加锁,写操作加锁,且写操作占用较大内存空间,所以适用于读多写少
的场合;
(5)CopyOnWriteArrayList只保证最终一致性,不保证实时一致性;
彩蛋
为什么CopyOnWriteArrayList没有size属性?
因为每次修改都是拷贝一份正好可以存储目标个数元素的数组,所以不需要size属性了,数组的长度就是集合的大小,而不像ArrayList数组的长度实际是要大于集合的大小的。
比如,add(E e)操作,先拷贝一份n+1个元素的数组,再把新元素放到新数组的最后一位,这时新数组的长度为len+1了,也就是集合的size了。
ConcurrentHashMap
putVal()
(1)如果桶数组未初始化,则初始化。通过CAS把sizeCtl原子更新为-1,则当前线程进入初始化;
(2)如果待插入的元素所在的桶为空,则尝试把此元素CAS插入到桶的第一个位置;
(3)如果正在扩容,则当前线程一起加入到扩容的过程中;
(4)如果待插入的元素所在的桶不为空且不在迁移元素,则锁住这个桶(分段锁);
(5)如果当前桶中元素以链表方式存储,则在链表中寻找该元素或者插入元素;
(6)如果当前桶中元素以红黑树方式存储,则在红黑树中寻找该元素或者插入元素;
(7)如果元素存在,则返回旧值;
(8)如果元素不存在,整个Map的元素个数加1,并检查是否需要扩容;
添加元素操作中使用的锁主要有(自旋锁 + CAS + synchronized + 分段锁)。
addCount() 判断是否需要扩容
每次添加元素后,元素数量加1,并判断是否达到扩容门槛,达到了则进行扩容或协助扩容。
(1)元素个数的存储方式类似于LongAdder类,存储在不同的段上,并且有一个baseCount。
- 优先更新baseCount
- 如果失败了再更新不同线程对应的段(通过线程的随机probe来计算对应的段),减少不同线程同时更新size时的冲突;
- 如果还是更新失败,则说明发生了冲突,需要扩容counterCells
(2)计算元素个数时把这些段的值及baseCount相加算出总的元素个数;
(3)接下来会根据元素个数判断是否需要扩容,扩容门槛默认为容量的0.75倍;
(4)扩容时sizeCtl高位存储扩容邮戳(resizeStamp),低位存储扩容线程数加1(1+nThreads);
helpTransfer() 协助扩容(迁移元素)
线程添加元素时发现正在扩容且当前元素所在的桶元素已经迁移完成了,则协助迁移其它桶的元素。
transfer() 迁移元素
扩容时容量变为两倍,并把部分元素迁移到其它桶中。
(1)新桶数组大小是旧桶数组的两倍;
(2)迁移元素先从靠后的桶开始;
(3)迁移完成的桶会将Node里的hash标志位设置为MOVED
,标记该桶迁移完成;
(4)迁移时根据hash&n
是否等于0把桶中元素分化成两个链表或树
;
(5)低位链表(树)存储在原来的位置;
(6)高位链表(树)存储在原来的位置加n的位置;
(7)迁移元素时会利用synchronize锁住当前桶,也是分段锁的思想;
remove() 删除元素
删除元素跟添加元素一样,都是先找到元素所在的桶,然后采用分段锁的思想锁住整个桶,再进行操作。
(1)计算hash;
(2)如果所在的桶不存在,表示没有找到目标元素,返回;
(3)如果正在扩容,则协助扩容完成后再进行删除操作;
(4)如果是以链表形式存储的,则遍历整个链表查找元素,找到之后再删除;
(5)如果是以树形式存储的,则遍历树查找元素,找到之后再删除;
(6)如果是以树形式存储的,删除元素之后树较小,则退化成链表;
(7)如果确实删除了元素,则整个map元素个数减1,并返回旧值;
(8)如果没有删除元素,则返回null;
get() 获取元素
获取元素,根据目标key所在桶的第一个元素的不同采用不同的方式获取元素,关键点在于find()方法的重写。
(1)hash到元素所在的桶;
(2)如果桶中第一个元素就是该找的元素,直接返回;
(3)如果是树或者正在迁移元素,则调用各自Node子类的find()方法寻找元素;
(4)如果是链表,遍历整个链表寻找元素;
(5)获取元素没有加锁;
size() 获取元素个数
元素个数的存储也是采用分段的思想,获取元素个数时需要把所有段加起来。
(1)元素的个数依据不同的线程存在在不同的段里;(见addCounter()分析)
(2)计算CounterCell所有段及baseCount的数量之和;
(3)获取元素个数没有加锁;
sizeCtl变量
ConcurrentHashMap中没有threshold和loadFactor这两个字段,而是采用sizeCtl来控制
- 0,代表数组未初始化,且数组的初始容量为16
- 正数,如果数组未初始化,纪录数组的初始容量,如果已初始化,纪录的是数组的扩容阈值
- -1,说明正在初始化
- 扩容时sizeCtl高位存储扩容邮戳(resizeStamp),低位存储扩容线程数加1(1+nThreads)
总结
(1)ConcurrentHashMap是HashMap的线程安全版本;
(2)ConcurrentHashMap采用(数组 + 链表 + 红黑树)的结构存储元素;
(3)ConcurrentHashMap相比于同样线程安全的HashTable,效率要高很多;
(4)ConcurrentHashMap采用的锁有 synchronized,CAS,自旋锁,分段锁,volatile等;
(10)更新操作时如果正在进行扩容,当前线程协助扩容;
(11)更新操作会采用synchronized锁住当前桶的第一个元素,这是分段锁的思想;
(12)整个扩容过程都是通过CAS控制sizeCtl这个字段来进行的,这很关键;
(13)迁移完元素的桶会放置一个ForwardingNode节点,以标识该桶迁移完毕;
(14)元素个数的存储也是采用的分段思想,类似于LongAdder的实现;
(15)元素个数的更新会把不同的线程hash到不同的段上,减少资源争用;
(16)元素个数的更新如果还是出现多个线程同时更新一个段,则会扩容段(CounterCell);
(17)获取元素个数是把所有的段(包括baseCount和CounterCell)相加起来得到的;
(18)查询操作是不会加锁的,所以ConcurrentHashMap不是强一致性的;
(19)ConcurrentHashMap中不能存储key或value为null的元素;
ConcurrentSkipListMap
跳表是一个随机化的数据结构,实质就是一种可以进行二分查找的有序链表。
跳表在原有的有序链表上面增加了多级索引,通过索引来实现快速查找。
跳表不仅能提高搜索性能,同时也可以提高插入和删除操作的性能。
添加元素
总结起来,一共就是三大步:
(1)插入目标节点到数据节点链表中;
(2)建立竖直的down链表;
(3)建立横向的right链表;
Part I:找到目标节点的位置并插入
(1)这里的目标节点是数据节点,也就是最底层的那条链;
(2)寻找目标节点之前最近的一个索引对应的数据节点(数据节点都是在最底层的链表上);
(3)从这个数据节点开始往后遍历,直到找到目标节点应该插入的位置;
(4)如果这个位置有元素,就更新其值(onlyIfAbsent=false);
(5)如果这个位置没有元素,就把目标节点插入;
(6)至此,目标节点已经插入到最底层的数据节点链表中了;
Part II:随机决定是否需要建立索引及其层次,如果需要则建立自上而下的索引
(1)取个随机数rnd,计算(rnd & 0x80000001),即正偶数;
(2)如果等于0,进入创建索引的过程(只要正偶数才会等于0);
(3)计算 while(((rnd>>>=1)&1)!=0)
,决定层级数,默认level为1,从最低位的第二位开始,有几个连续的1则level就加几;
(4)建立竖直的down链表
Part III:建立横向的right链表
(1)从最高层级的头索引节点开始,向右遍历,找到目标索引节点的位置;
(2)如果当前层有目标索引,则把目标索引插入到这个位置,并把目标索引前一个索引向下移一个层级;
(3)如果当前层没有目标索引,则把目标索引位置前一个索引向下移一个层级;
(4)同样地,再向右遍历,寻找新的层级中目标索引的位置,回到第(2)步;
(5)依次循环找到所有层级目标索引的位置并把它们插入到横向的索引链表中;
删除元素
假如初始跳表如下图所示,我们要删除9这个元素。
(1)找到9这个数据节点;
(2)把9这个节点的value值设置为null;
(3)在9后面添加一个marker节点,标记9已经删除了;
(4)让8指向12;
(5)把索引节点与它前一个索引的right断开联系;
(6)跳表高度降级;
至于,为什么要有(2)(3)(4)这么多步骤呢,因为多线程下如果直接让8指向12,可以其它线程先一步在9和12间插入了一个元素10呢,这时候就不对了。
所以这里搞了三步来保证多线程下操作的正确性。
如果第(2)步失败了,则直接重试;
如果第(3)或(4)步失败了,因为第(2)步是成功的,则通过helpDelete()不断重试去删除;
其实helpDelete()里面也是不断地重试(3)和(4);
只有这三步都正确完成了,才能说明这个元素彻底被删除了。
CopyOnWriteArraySet
CopyOnWriteArraySet底层是使用CopyOnWriteArrayList
存储元素的,所以它并不是使用Map来存储元素的。
在添加元素时调用了CopyOnWriteArrayList的addIfAbsent()
方法来保证元素不重复。
ArrayBlockingQueue
ArrayBlockingQueue是java并发包下一个以数组实现的阻塞队列,它是线程安全的.
入队
(1)add(e)时如果队列满了则抛出异常;
(2)offer(e)时如果队列满了则返回false;
(3)put(e)时如果队列满了则使用notFull条件锁
等待;
(4)offer(e, timeout, unit)时如果队列满了则等待一段时间后如果队列依然满就返回false;
(5)利用放指针循环使用数组来存储元素;
出队
(1)remove()时如果队列为空则抛出异常;
(2)poll()时如果队列为空则返回null;
(3)take()时如果队列为空则阻塞等待在条件锁notEmpty
上;
(4)poll(timeout, unit)时如果队列为空则阻塞等待一段时间后如果还为空就返回null;
(5)利用取指针循环从数组中取元素;
总结
(1)ArrayBlockingQueue不需要扩容,因为是初始化时指定容量,并循环利用数组;
(2)ArrayBlockingQueue利用takeIndex和putIndex循环利用数组;
(3)入队和出队各定义了四组方法为满足不同的用途;
(4)利用重入锁和两个条件锁保证并发安全;
LinkedBlockingQueue
LinkedBlockingQueue是java并发包下一个以单链表实现的阻塞队列,它是线程安全的。
入队 put()
(1)使用putLock加锁;
(2)如果队列满了就阻塞在notFull条件上;
(3)否则就入队;
(4)如果入队后元素数量小于容量,唤醒其它阻塞在notFull条件上的线程;
(5)释放锁;
(6)如果放元素之前队列长度为0,就唤醒notEmpty条件;
出队 take()
(1)使用takeLock加锁;
(2)如果队列空了就阻塞在notEmpty条件上;
(3)否则就出队;
(4)如果出队前元素数量大于1,唤醒其它阻塞在notEmpty条件上的线程;
(5)释放锁;
(6)如果取元素之前队列长度等于容量,就唤醒notFull条件;
总结
(1)LinkedBlockingQueue采用单链表的形式实现;
(2)LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞;
(3)LinkedBlockingQueue是有界队列,不传入容量时默认为最大int值;
1、LinkedBlockingQueue与ArrayBlockingQueue对比?
a)后者入队出队采用一把锁,导致入队出队相互阻塞,效率低下;
b)前才入队出队采用两把锁,入队出队互不干扰,效率较高;
c)二者都是有界队列,如果长度相等且出队速度跟不上入队速度,都会导致大量线程阻塞;
d)前者如果初始化不传入初始容量,则使用最大int值,如果出队速度跟不上入队速度,会导致队列特别长,占用大量内存;
SynchronousQueue
-
SynchronousQueue是java并发包下无缓冲阻塞队列,它用来在两个线程之间移交元素。
-
有两种模式,公平模式(Queue)与非公平模式(Stack)
-
queue和stack用于存放阻塞的线程,其上节点有三种模式生产者、消费者、正在匹配中
-
线程会先cas自旋,自旋超过一定次数阻塞。消费者与生产者互相匹配,queue先来先匹配,stack后来先匹配。
PriorityBlockingQueue
PriorityBlockingQueue是java并发包下的优先级阻塞队列。
(1)PriorityBlockingQueue整个入队出队的过程与PriorityQueue基本是保持一致的,采用数组实现的二叉堆;
(2)PriorityBlockingQueue使用一个重入锁+一个notEmpty条件锁控制并发安全;
(3)PriorityBlockingQueue扩容时使用一个单独变量的CAS操作来控制只有一个线程进行扩容;
LinkedTransferQueue
LinkedTransferQueue是LinkedBlockingQueue、SynchronousQueue(公平模式)、ConcurrentLinkedQueue三者的集合体,它综合了这三者的方法,并且提供了更加高效的实现方式。
LinkedTransferQueue使用了一个叫做 dual data structure
的数据结构,或者叫做 dual queue
,译为双重数据结构或者双重队列。
双重队列是什么意思呢?
放取元素使用同一个队列,队列中的节点具有两种模式,一种是数据节点,一种是非数据节点。
放元素时先跟队列头节点对比,如果头节点是非数据节点,就让他们匹配,如果头节点是数据节点,就生成一个数据节点放在队列尾端(入队)。
取元素时也是先跟队列头节点对比,如果头节点是数据节点,就让他们匹配,如果头节点是非数据节点,就生成一个非数据节点放在队列尾端(入队)。
不管是放元素还是取元素,都先跟头节点对比,如果二者模式不一样就匹配它们,如果二者模式一样,就入队。
(1)LinkedTransferQueue可以看作LinkedBlockingQueue、SynchronousQueue(公平模式)、ConcurrentLinkedQueue三者的集合体;
(2)LinkedTransferQueue的实现方式是使用一种叫做 双重队列
的数据结构;
(3)不管是取元素还是放元素都会入队;
(4)先尝试跟头节点比较,如果二者模式不一样,就匹配它们,组成CP,然后返回对方的值;
(5)如果二者模式一样,就入队,并自旋或阻塞等待被唤醒;
(6)至于是否入队及阻塞有四种模式,NOW、ASYNC、SYNC、TIMED;
(7)LinkedTransferQueue全程都没有使用synchronized、重入锁等比较重的锁,基本是通过 自旋+CAS 实现;
(8)对于入队之后,先自旋一定次数后再调用LockSupport.park()或LockSupport.parkNanos阻塞;
LinkedTransferQueue与SynchronousQueue(公平模式)有什么异同呢?
(1)在java8中两者的实现方式基本一致,都是使用的双重队列;
(2)前者完全实现了后者,但比后者更灵活;
(3)后者不管放元素还是取元素,如果没有可匹配的元素,所在的线程都会阻塞;
(4)前者可以自己控制放元素是否需要阻塞线程,比如使用四个添加元素的方法就不会阻塞线程,只入队元素,使用transfer()会阻塞线程;
(5)取元素两者基本一样,都会阻塞等待有新的元素进入被匹配到;
ConcurrentLinkedQueue
(1)ConcurrentLinkedQueue不是阻塞队列;
(2)ConcurrentLinkedQueue不能用在线程池中;
(3)ConcurrentLinkedQueue使用(CAS+自旋)更新头尾节点控制出队入队操作;
DelayQueue
DelayQueue是java并发包下的延时阻塞队列,常用于实现定时任务。
DelayQueue实现了BlockingQueue
,所以它是一个阻塞队列
。
入队
因为DelayQueue是阻塞队列,且优先级队列是无界的,所以入队不会阻塞不会超时,因此它的四个入队方法是一样的。
(1)加锁;
(2)添加元素到优先级队列中;
(3)如果添加的元素是堆顶元素,就把leader置为空,并唤醒等待在条件available上的线程;
(4)解锁;
出队
因为DelayQueue是阻塞队列,所以它的出队有四个不同的方法,有抛出异常的,有阻塞的,有不阻塞的,有超时的。
**poll()**方法比较简单:
(1)加锁;
(2)检查第一个元素,如果为空或者还没到期,就返回null;
(3)如果第一个元素到期了就调用优先级队列的poll()弹出第一个元素;
(4)解锁。
**take()**方法稍微要复杂一些:
(1)加锁;
(2)判断堆顶元素是否为空,为空的话直接阻塞等待;
(3)判断堆顶元素是否到期,到期了直接调用优先级队列的poll()弹出元素;
(4)没到期,再判断前面是否有其它线程在等待,有则直接等待;
(5)前面没有其它线程在等待,则把自己当作第一个线程等待delay时间后唤醒,再尝试获取元素;
(6)获取到元素之后再唤醒下一个等待的线程;
(7)解锁;
即如果堆顶的任务到时间了,就出队;如果堆顶的任务还没到时间,就看它还有多久到时间,利用条件锁等待这段时间,待时间到了后出队;
总结
(1)DelayQueue是阻塞队列;
(2)DelayQueue内部存储结构使用优先级队列;
(3)DelayQueue使用重入锁和条件锁来控制并发安全;
(4)DelayQueue常用于定时任务;
java中的线程池实现定时任务是直接用的DelayQueue吗?
当然不是,ScheduledThreadPoolExecutor中使用的是它自己定义的内部类DelayedWorkQueue,其实里面的实现逻辑基本都是一样的,只不过DelayedWorkQueue里面没有使用现成的PriorityQueue,而是使用数组又实现了一遍优先级队列,本质上没有什么区别。
Comments | 0 条评论