查看: 110|回复: 0

并发队列之ConcurrentLinkedQueue

[复制链接]
发表于 2020-2-16 05:55:29 | 显示全部楼层 |阅读模式
  本来想着直接说线程池的,不过在说线程池之前,我们必须要知道并发安全队列;因为一般情况下线程池中的线程数目是一定的,肯定不会超过某个阈值,那么当任务太多了的时间,我们必须把多余的任务保存到并发安全队列中,当线程池中的线程空闲下来了,就会到并发安全队列中拿任务;
  那么什么是并发安全队列呢?实在可以简单看作是一个链表,然后我们先办法去存取节点;总的来说,并发安全队列分为两种,一种是壅闭的,一种是非壅闭的,前者是用锁来实现的,后者用CAS实现的;

一.简单介绍ConcurrentLinkedQueue
  这个队列用法没什么好说的,就雷同LinkedList的用法,怎么对一个链表继承增编削查,不多说,我们就说一下其中几个关键的方法;
  首先,这个队列是一个线程安全的无界非壅闭队列,实在就是一个单向链表,无界的意思就是没有限制最大长度,非壅闭表示用CAS实现入队和出队操作,我们打开这个类就可以知道,有一个内部类Node,其中重要的属性如下所示:
  1. //用于存放节点的值volatile E item;//指向下一个节点volatile Node next;//这里也是用的是UNSAFE类,前面说过了,这个类直接提供CAS操作private static final sun.misc.Unsafe UNSAFE;//item字段的偏移量private static final long itemOffset;//next的偏移量private static final long nextOffset;
复制代码


  然后ConcurrentLinkedQueue中几个重要的属性,好像也没什么重要的,就保存了头节点和尾节点,留意,默认情况下头节点和尾节点都是哨兵节点,也就是一个存null的Node节点
  1. //存放链表的头节点private transient volatile Node head;//存放链表的尾节点private transient volatile Node tail;//UNSAFE对象private static final sun.misc.Unsafe UNSAFE;//head字段的偏移量private static final long headOffset;//tail字段偏移量private static final long tailOffset;
复制代码





  下面我们直接看一些重要方法吧!慢慢分析其中的算法才是关键的

二.offer方法
  这个方法的作用就是在队列末端添加一个节点,假如通报的参数是null,就抛出空指针异常,否则由于该队列是无界队列,该方法会一直返回true,而且该方法使用CAS算法实现的,以是不会壅闭线程;
  1. //队列末端添加一个节点public boolean offer(E e) {    //假如e为空,那么抛出空指针异常    checkNotNull(e);    //将传进来的元素封装成一个节点,Node的构造器中调用UNSAFE.putObject(this, itemOffset, item)把e赋值给节点中的item    final Node newNode = new Node(e);    //[b][1]    [/b]//这里的for循环从最后的节点开始    for (Node t = tail, p = t;;) {      Node q = p.next;      //[b][2][/b]假如q为null,说明p就是最后的节点了        if (q == null) {            //[b][3][/b]CAS更新:假如p节点的下一个节点是null,就把写个节点更新为newNode            if (p.casNext(null, newNode)) {                //[b][4][/b]CAS乐成,但是这时p==t,以是不会进入到这里的if内里,直接返回true                //那么什么时间会走到这内里来呢?实在是要有另外一个线程也在调用offer方法的时间,会进入到这内里来                if (p != t)                     casTail(t, newNode);                  return true;            }        }        else if (p == q) //[b][5][/b]                        p = (t != (t = tail)) ? t : head;        else //[b][6][/b]            p = (p != t && t != (t = tail)) ? t : q;    }}
复制代码

  
  上面实行到[3]的时间,由于头节点和尾节点默认都是指向哨兵节点的,由于这个时间p的下一个节点为null,以是当前线程A实行CAS会乐成,下图所示;



  假如此时尚有一个线程B也来尝试[3]中CAS,由于此时p节点的下一个节点不是null了,于是线程B会跳到[1]出进行第二次循环,然后会到[6]中,由于p和t此时是相等的,以是这里是false,即p=q,下图所示:



  然后线程B又会跳到[1]处进行第三次循环,由于实行了Node q = p.next,以是此时q指向最后的null,就到了[3]处进行CAS,这次是可以乐成的,乐成之后如下图所示:


  
  这个时间因为p!=t,以是可以进入到[4],这里又会进行一个CAS:假如tail和t指向的节点一样,那么就将tail指向新添加的节点,如图所示,这个时间线程B也就实行完了;


  
  实在尚有[5]没有走到,这个是在poll操作之后才实行的,我们先跳过,等说完poll方法之后再回头看看;另外说一下,add方法实在就是调用的是offer方法,就不多说了;



三.poll方法
  这个方法是获取头部的这个节点,假如队列为空则返回null;
  1. public E poll() {    //这里实在就是一个goto的标记,用于跳出for循环    restartFromHead:    //[b][1][/b]    for (;;) {        for (Node h = head, p = h, q;;) {            E item = p.item;            //[b][2][/b]假如当前节点中存的值不为空,则CAS设置为null            if (item != null && p.casItem(item, null)) {                //[b][3][/b]CAS乐成就更新头节点的位置                if (p != h)                     updateHead(h, ((q = p.next) != null) ? q : p);                return item;            }            //[b][4][/b]当前队列为空,就返回null            else if ((q = p.next) == null) {                updateHead(h, p);                return null;            }            //[b][5][/b]当前节点和下一个节点一样,说明节点自引用,则重新找头节点            else if (p == q)                continue restartFromHead;            //[b][6][/b]            else                p = q;        }    }}final void updateHead(Node h, Node p) {    if (h != p && casHead(h, p))        h.lazySetNext(h);}
复制代码
  
  分为几种情况,第一种情况是线程A调用poll方法的时间,发现队列是空的,即头节点和尾节点都指向哨兵节点,就会直接到[4],返回null
  第二种情况,线程A实行到了[4],此时有一个线程却调用offer方法添加了一个节点,下图所示,那么此时线程A就不会走[4]了,[5]也不满足,于是会到[6]这里来,然后线程A又会跳到[1]处进行循环,此时p指向的节点中item不为null,以是会到[2]中;


  
  到了[2]中将p指向的节点中item用CAS设置为null,然后就到了[3],下面第一个图,由于p!=h,q=null,以是最后调用的是updateHead(h,p),这方法:假如头节点和h指向的是一样的,就将头节点指向p,我们还能看到updateHead方法中h.lazySetNext(h)表示h的下一个节点指向自己,下面图二


  到了这里还没完,还记不记得offer方法中有一个地方的代码没有实行的啊!就是这种情况,尾节点自己引用自己,我们再调用offer会怎么样呢?
  回到offer方法,先会到[1],然后q指向自己这个哨兵节点(留意,此时虽然p指向的节点中存的是null,但是p!=null},于是再到[5],此时的图如下左图所示;此时由于t==tail,以是p=head;


  再在offer方法循环一次,此时q指向null,下面左图所示,然后就可以进入[2]中进行CAS,CAS乐成,因为此时p!=t,以是还要进行CAS将tail指向新节点,下面右图所示,可以让GC回收那个垃圾!
妈耶,这里比较绕!哈哈哈哈哈哈哈哈哈哈哈



四.peek方法
  这个方法的作用就是获取队列头部的元素,只获取不移除,留意这个方法和上面的poll方法的区别啊!
  1. public E peek() {    //[1]goto标志    restartFromHead:    for (;;) {        for (Node h = head, p = h, q;;) {            //[2]            E item = p.item;            //[3]            if (item != null || (q = p.next) == null) {                updateHead(h, p);                return item;            }            //[4]            else if (p == q)                continue restartFromHead;            else//[5]                p = q;        }    }}
复制代码
  final void updateHead(Node h, Node p) {
            if (h != p && casHead(h, p))
                    h.lazySetNext(h);
  }
复制代码

  
  假如队列中为空的时间,走到[3]的时间,就会如下图所示,由于h==p,以是updateHead方法啥也不做,然后返回就返回item为null



  假如队列不为空,那么如下左图所示,此时进入循环内不满足条件,会到[5]这里,将p指向q,然后再进行一次循环到[3],将q指向p的后一个节点,下面右图所示;



  然后调用updateHead方法,用CAS将头节点指向p这里,然后将h自己指向自己,下图所示,最后返回item



五.总结
  实在尚有几个方法没说,但是感觉比较容易就不浪费篇幅了,有兴趣的可以看看:size方法用于盘算队列中节点的数目,可是由于没有加锁,在并发的条件下不正确;remove方法删除某个节点,实在就是遍历然后用equals方法比较item是不是一样,只不过假如存在多个符合条件的节点只删除第一个,然后返回true,否则返回false;contains方法判断队列中是否包罗指定item的节点,也就是遍历,很容易;
  最麻烦的就是offer方法和poll方法,offer方法是在队列的最背面添加节点,而poll是获取头节点,并且删除第一个真正的队列节点(留意,节点分为两种,一种是哨兵节点,一种是真正的存了数据的节点啊),还简单的说了一下poll方法和peek方法的区别,后者只是获取,而不删除啊!用下面这个图资助记忆一下;

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?用户注册

x

相关技术服务需求,请联系管理员和客服QQ:2753533861或QQ:619920289
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

帖子推荐:
客服咨询

QQ:2753533861

服务时间 9:00-22:00

快速回复 返回顶部 返回列表