ConcurrentLinkedQueue的源码解析(基于JDK1.8)

ConcurrentLinkedQueue的定义

ConcurrentLinkedQueue是 非阻塞的单端队列,其是一个通过链表实现的并发安全的队列。是java中并发环境下性能最好的队列,它是使用非阻塞算法(CAS)来实现线程安全的。它采用先进先出的规则对节点进行排序,当我们添加一个元素时,它会添加到队列的尾部;当我们获取一个元素时,它会返回队列头部的元素。

ConcurrentLinkerQueue 不变式

在后面的源码分析中,我们将会看到队列里有时会处于不一致状态,为此,ConcurrentLinkedQueue使用三个不变式(基本不变式,head的不变式和tail的不变式),来约束队列中方法的执行。通过这三个不变式来维护非阻塞算法的正确性。

基本不变式

在执行方法之前和之后,队列必须要保持的不变式;
当入队插入节点之后,队列中有一个next域为null的(最后)节点
从head开始遍历队列,可以访问所有item域不为null的节点。

head的不变式和可变式

不变式(invariants)
1.在执行方法之前和之后,head必须保持的不变式;
2. 所有"活着"的节点(只未删除节点)都能从head通过调用succ()方法遍历可达。
3.head不能为null。
可变式(Non-invariants)
1.head节点的item域可能为null,也可能不为null。
2.允许tail之后与head,也就是调用succ()方法,从head不可达tail。

tail的不变式和可变式

不变式

  1. tail 节点通过succ()方法一定到达队列中的最后一个节点(node.next=null)
  2. tail 不能为null
    可变式
  3. tail的item可能是null,也可能不是null。
  4. 允许tail滞后于head,也就是调用succ()方法,从head不可达tail
  5. tail的next指针可以指向自身。

内部节点Node

 private static class Node<E> {
 		//存储的数据
        volatile E item;
		//下一个节点引用
        volatile Node<E> next;

   		//构造一个node节点,itemOffset 偏移量设置,一个引用值
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }
        boolean casItem(E cmp, E val) {
		    //原子性的更新item值
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        void lazySetNext(Node<E> val) {
			/*
				调用这个方法和putObject差不多,只是这个方法设置后对应的值的可见性不一定得到保证,这个方法能起到作用,通常是作用在 volatile field上,也就是说,下面的参数val 是被volatile修饰
			*/
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }
        //原子性的更新 nextOffset 上的值
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;
		
    }

head头节点

    private transient volatile Node<E> head;

tail尾节点

private transient volatile Node<E> tail;

节点的说明

1.有效节点:从head向后遍历可达的节点当中,item域不为null的节点
2.无效节点:从head向后遍历可达的节点当中,item域为null的节点
3.已删除节点:从head向后遍历不可达的节点
4.哨兵节点:链接到自身的节点(哨兵节点同样是已删除节点)
5.头节点:队列中第一个有效节点
6.尾节点:队列中next域为null的节点(可以是无效节点)
如下图所示:

在这里插入图片描述

常用方法解析

无参构造方法

    public ConcurrentLinkedQueue() {
		/**
			默认会构造一个dummy节点,dummy的存在是防止一些特殊复杂代码的存在
		*/
        head = tail = new Node<E>(null);
    }

其实就是构造了一个node的item为null的节点,然后head和tail指向这个节点,如下图所示:
在这里插入图片描述

查询后继节点方法 succ()

// 获取p的后继节点,若p.next=p(updateHead 操作导致的),则说明p已经fall off queue,需要jump到head
final Node<E> succ(Node<E> p){
    Node<E> next = p.next;
    return (p == next)? head : next;
}

获取一个节点的后继节点不是node.next吗?在特殊的情况,就是tail指向一个哨兵节点(node.next=node);代码的注释中我提到的是updateHead导致的,那我们接着来看看updateHead方法。

特别的更新头节点方法 updateHead

	/**
	将节点p设置为新的节点(这是原子操作)
	之后将原节点的next指向自己,直接变成一个哨兵节点
	*/
    final void updateHead(Node<E> h, Node<E> p) {
        if (h != p && casHead(h, p))
            h.lazySetNext(h);
    }

主要是这个h.lazySetNext(h),将h.next->h直接变成了一个哨兵节点,这种lazySetNext主要用于无阻塞数据结构的 nulling out。

入队方法

    public boolean offer(E e) {
	    //入队元素不能为空
        checkNotNull(e);
		//创建新的节点
        final Node<E> newNode = new Node<E>(e);
		//死循环,设置节点,p获取尾节点
        for (Node<E> t = tail, p = t;;) {
			//q是p的next节点
            Node<E> q = p.next;
			//获取尾节点的next节点,尾节点没有下一个节点
            if (q == null) {
                // p is last node
				//这一步说明p是尾节点,对p进行cas操作,newNode -> p.next
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
					//设置尾节点,当之前的尾节点和现在插入的节点之间有一个节点时,
					//并不是每一次都cas设置尾节点
                    if (p != t) // hop two nodes at a time
						//cas的设置尾节点,可能会失败
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
			//多线程操作时候,由于poll时候会把旧的head变为自引用,然后将head的next设置为新的head
           	//所以这里需要重新找新的head
		   else if (p == q)
    			/**
				   1. 大前提p是已经被删除的节点
				   2. 判断tail是否已经改变
					   1. tail 已经变化,则说明tail已经重新定位
					   2. tail未变化,而tail指向的节点是要删除的节点,所以让p指向head
				   判断尾节点是否有变化
				   1. 尾节点变化,则用新的尾节点
				   2. 尾节点没变化,将tail指向head
				*/
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
				//7. (p!=t)->说明执行过p=q 操作(向后遍历操作),"(t!=(t=tail)))"  -> 说明尾节点在其他的线程发生变化
				//为什么"(t!=(t=tail)))" 一定要满足呢,因为tail变更,节省了(p=q)后loop 中的无谓操作,tail
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

分析插入过程,我们插入使用3个线程来调用offer方法,ThreadA,ThreaB同时运行,ThreadC最后插入,分析一下offer方法的流程。
第一步,队列属于初始化状态,ThreadA,TheadB同时调用offer方法;创建节点,死循环设置节点,获取尾节点的next节点,此时q==null,两个线程都同时可能看见,然后cas设置尾节点next节点(队列状态如图A所示),我们假设是ThreadA线程cas设置成功了,然后p==t此时的尾节点其实没有发生变化;此时我们来看ThreadB由于A成功了,所以ThreadB cas失败了,重新循环,此时q!=null了,p==q显然不等于,在看下一个else判断p!=t,此时显然p==t,所以才是p=q,然后再次循环,此时的q==null,我们假设没有线程来和ThreadB竞争,所以,cas设置成功,然后p!=t嘛,显然满足所以设置尾节点,此时的设置尾节点的节点和之前的尾节点之间刚刚好有一个节点。
图A
图B

第二步,ThreadC插入,此时的尾节点是ThreadB插入的节点假设是B,获取B的next节点,q==null,然后cas设置节点,完成,p==t,所以不用更新尾节点
图C

出队方法

  public E poll() {
        restartFromHead:
		//循环,为啥这里是两个for循环?其实主要是为了"continue restartFromHead" 后进行第二个
		//for loop中的初始化
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {  //1.进行变量的初始化 p=h=head
                E item = p.item;
				
				//2. 若node.item!=null,则进行cas操作,cas成功则返回值
                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
					//3. 若此时的p!=h,则更新head(当执行到第8步后会 p!=h)
                    if (p != h) // hop two nodes at a time
						//4.进行cas更新head;"(q=p.next)!=null" 怕出现p此时是尾节点;在ConcurrentLinkedQueue中真正的尾节点只有1个(必须满足node.next=null)
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
				//5.queue是空的,p是尾节点
                else if ((q = p.next) == null) {
					//6.这一步除了更新head外,还是helpDelete删除对了操作,删除p之前的节点
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)  //7. p==q  说明p节点已经删除了的head节点,为啥呢?(见updateHead方法)
                    continue restartFromHead;
                else
                    p = q;   //8. 将q赋值给p,进行下个节点的poll操作(初始化一个dummy节点,在单线程情况下,这个if判断是第一个执行的。)
            }
        }
    }

分析poll方法,ThreadD和ThreadE同时执行来分析下队列的变化(主要是p==q的产生)
初始状态

在这里插入图片描述
第一步,ThreadD和ThreadE执行poll操作,item等于null,所以执行下面的操作(q=p.next)==null不等于,p==q不等于,所以p=q,其实就是上图的ThreadA插入的节点,此时item已经不为null了,所以执行cas设置item为null的操作,假设ThreaD执行成功了,那么此时p!=h就满足了,所以此时要更新头节点调用updateHead,这个方法会更新头结点,并且把原来的头结点设置为自己,(如图D所示);接下来我们分析ThreadE,cas失败了需要重新执行,此时的item已经不为null,所以执行下面的操作(q=p.next)==null不等于,p==q 这时其实已经是等于了,因为ThreadD改变了以前头节点的next节点为自己,所以需要重新遍历,最终p就是p.next节点也就是ThreadB节点,然后cas设置item为null,由于p==p.next,所以p发生了变化,所以需要设置ThreadB为头节点。

图D
图E
看到上面的执行流程可能就有人有疑问了,这不是每次都更新头结点吗,没有优化啊,只看poll方法确实是这样,那什么时候会产生不是每次都更新头节点了,那就是当头节点的item不为null的时候,但是如果按初始化的状况来看,头结点的item一直是null,但是当我看了peek方法之后才发现,peek可以改变这个情况,可以设置item不为null的头结点,其实我们可以在poll方法前调用下peek方法,其实就启动了优化策略

总结

ConcurrentLinkedQueue是非阻塞的通过链表实现的队列,它使用CAS原子指令来处理对数据的并发访问。同时,它允许队列处于不一致的状态。这个特性分离了入队/出队操作中包含的两个需要一起原子执行的步骤。从而有效的缩小了入队/出队时的原子化(更新值的)范围为唯一变量。还有在执行size方法时一定要注意这个是不准确的值,在学poll和offer方法时,一定要理解更新head和tail节点的时机,这种优化手段值得我们去学习。

参考

ConcurrentLinkedQueue 源码分析 (基于Java 8)
ConcurrentLinkedQueue 1.8 源码浅析
Java并发中的ConcurrentLinkedQueue源码分析

(完)