AbstractQueuedSynchronizer(AQS)是许多Java并发控制类的基础,比如ReentrantLock、Semaphone、CountDownLatch等都是基于AQS来完成的,因此了解AQS的基本原理是了解Java并发控制类的基础。
AQS是一个抽象类,需要子类的实现其中的功能,所以我们选取ReentrantLock来分析AQS的工作原理。
ReentrantLock与AbstractQueuedSynchronizer
AbstractQueuedSynchronizer(AQS)是许多Java并发控制类的基础,比如ReentrantLock、Semaphone、CountDownLatch等都是基于AQS来完成的,因此了解AQS的基本原理是了解Java并发控制类的基础。
AQS是一个抽象类,需要子类的实现其中的功能,所以我们选取ReentrantLock来分析AQS的工作原理。
在构造ReentrantLock的时候可以指定是否公平,我们先以非公平锁为例讲解。
lock
当调用ReentrantLock的lock方法时,实际调用的是NonfairSync的lock方法:
1 | final void lock() { |
lock方法先通过CAS尝试将AQS中的state从0修改为1。这里的state其实表示的是当前持有锁的线程数量,如果state为0表示当前没有线程获得锁,可以直接将exclusiveOwnerThread
设置为当前线程,这样锁就获取成功了。
如果已经有线程取得了锁,则执行acquire(1)
:
1 | public final void acquire(int arg) { |
acquire
首先调用tryAcquire
,tryAcquire
有子类来实现,NonfairSync的实现方法是nonfairTryAcquire
:
1 | final boolean nonfairTryAcquire(int acquires) { |
首先获取state,如果state为0再次尝试将state从0设置为1,这个步骤在lock
中已经尝试过,这里再一次尝试,尽量快速的获得锁。
如果state不为0,说明有线程占有了锁,于是检查当前占有锁的线程是否就是当前线程,如果是,则简单地将state增加1,然后返回true。
如果state不为0,且其他的线程占有了锁,则返回false。
回到AQS的acquire
函数,如果tryAcquire
返回了true,则整个if条件不成立,后面的acquireQueued
方法不再执行,函数返回,当前线程获得了锁。
如果tryAcquire
返回false,接着执行后面的acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
。
首先通过addWaiter
增加一个排他类型的节点:
1 | private Node addWaiter(Node mode) { |
首先根据当前线程和mode创建一个Node对象。
因为一开始的tail肯定为null,之间进入enq(node)
1 | private Node enq(final Node node) { |
enq
函数的主要作用是将node节点添加到AQS管理的链表中。这是一个死循环,根据CAS操作来保证关键步骤的原子性,从而可以保证多线程访问时的线程安全。
当第一个线程进入enq
时,head和tail都是null。进入if(t == null)
的代码块,执行compareAndSetHead(new Node)
新建一个空的head。如果成功,将tail也指向head所指向的节点,然后继续循环。如果失败(被其他线程抢先新建了head),继续循环。
当链表完成初始化之后,循环进入else
代码块。首先将node.prev
指向tail,再将tail指向node,接着原先tail的后继指向node。即尝试将node挂到tail后面,tail指向node,这是一个在双向链表尾部增加节点的操作。因为有compareAndSetTail(t, node)
的存在,每次只有一个线程能在尾部成功添加节点,失败的线程继续循环。
回到前面的addWaiter
,这个函数的目的就是要在双向链表的尾部添加node节点,当链表初始化完成后,进入if (pred != null)
代码块,尝试一次添加节点的动作,成功则返回,如果失败进入enq
的死循环,直到node添加成功为止。
在回到前面的acquireQueued
:
1 | final boolean acquireQueued(final Node node, int arg) { |
acquireQueued
也是一个死循环,查看if (p == head && tryAcquire(arg))
,p是node节点的前一个节点,我们发现只有当前节点的前一个节点是head,且tryAcquire
成功获取锁(state为0且通过CAS设置state成功,或者锁的占有者是当前线程)时,进行进一步处理,然后跳出死循环。删除原来的头节点,重新设置头节点。
如果无法获取锁,执行后面的代码。
首先调用shouldParkAfterFailedAcquire(p, node)
判断是否应该挂起线程:
1 | private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { |
首先判断前一个节点的状态是否为Node.SIGNAL
,如果是则返回true,如果不是则返回false。
如果前一个节点的状态大于0(被CANCELLED
掉了),通过while循环向前寻找一个没有CANCELLED
的节点,删除那些CANCELLED
的节点。
如果前一个节点的值不大于0,则通过CAS将前一个节点的状态修改为Node.SIGNAL
。因为acquireQueued
方法是一个死循环,所以一定会有一个节点的状态为Node.SIGNAL
,然后返回true。
shouldParkAfterFailedAcquire
返回true,则执行parkAndCheckInterrupt()
方法,它通过LockSupport.park(this)
将当前线程挂起,它需要等待一个中断、unpark()
方法来唤醒它。
unlock
当调用ReentrantLock的lock方法时,实际调用的是AQS的release方法:
1 | public void unlock() { |
release
方法首先调用tryRelease
判定可以释放锁,以ReentrantLock为例:
1 | protected final boolean tryRelease(int releases) { |
首先计算锁释放之后AQS的state,如果state变为0,则表示锁已经完全释放,再将exclusiveOwnerThread
设置为null,返回true,这样其他线程就可以获得锁;否则仅仅是设置释放之后的state,返回false。
回到release
方法,如果tryRelease
函数返回true,也就是锁完全释放,则唤醒头节点的后继节点。
1 | private void unparkSuccessor(Node node) { |
首先获取head节点的next节点。
如果这个后继节点不为空,则通过LockSupport.unpark
方法来唤醒对应被挂起的线程,于是该节点从acquireQueued
方法的parkAndCheckInterrupt
处唤醒,继续循环。判断tryAcquire
是否能获得锁,能获得锁的话返回执行任务,如果获取锁又失败了,继续挂起等待。
如果这个后继节点为空或者它的状态为CANCELLED
,则从双向链表的尾部遍历寻找一个状态不为CANCELLED
的节点,唤醒它。
至于为什么从尾部开始向前遍历,需要看后文的lockInterruptibly
方法,其中的cancelAcquire
方法中只是设置了next的变化,没有设置prev的变化,在最后由这样一行代码node.next = node
,如果这时执行了unparkSuccessor
方法,并且向后遍历的话,就成了死循环了,所以这时只有prev是稳定的。
lockInterruptibly
该方法和lock
方法的区别是lockInterruptibly
会响应中断。也就是说,如果线程在lockInterruptibly
处等待,可以使用interrupt来使线程继续执行(抛出InterruptedException
),而如果线程在lock
处等待,线程不会响应中断。
在ReentrantLock中,lockInterruptibly
方法调用的是AQS中的acquireInterruptibly
:
1 | public void lockInterruptibly() throws InterruptedException { |
首先仍然调用tryAcquire
方法判断是否可以获得锁,如果无法获得锁,执行doAcquireInterruptibly
方法:
1 | private void doAcquireInterruptibly(int arg) throws InterruptedException { |
该方法和前面分析过的acquireQueued
方法差不多,真正的区别在于parkAndCheckInterrupt
被中断唤醒之后的操作,parkAndCheckInterrupt
被中断唤醒之后返回值为true,因此进入if代码块中:
acquireQueued
方法仅仅将变量interrupted
设为true,进入下一次循环,下一次循环中如果仍然获取不到锁,线程再一次进入等待状态,因此给程序的现象就是线程无法响应中断。
而doAcquireInterruptibly
方法抛出了InterruptedException
,线程得以退出等待状态。因为此时的failed
变量为true,所以最后还需执行cancelAcquire
方法,用于将该节点标记为取消状态:
1 | private void cancelAcquire(Node node) { |
首先设置该节点不再关联任何线程,跳过node前面被CANCELLED
的节点。
如果node是tail(尾节点),将tail指向的位置向前移动一个节点,即将node节点从链表的末尾中删除。
如果node不是tail。
1 | 1. 如果pred(node的前节点)不是head,且它的状态为SIGNAL(等待被唤醒),且pred的线程不为null,则将node的后节点赋值给pred的后节点,即将node节点从链表的中间删除。 |
然后将next指向了自己。
这里可能会有疑问,既然要删除节点,为什么都没有对prev进行操作,而仅仅是修改了next?
要明确的一点是,这里修改指针的操作都是CAS操作,在AQS中所有以compareAndSet
开头的方法都是尝试更新,并不保证成功。
那么在执行cancelAcquire方法时,当前节点的前继节点有可能已经执行完并移除队列了(参见setHead
方法),所以在这里只能用CAS来尝试更新,而就算是尝试更新,也只能更新next,不能更新prev,因为prev是不确定的,否则有可能会导致整个队列的不完整,例如把prev指向一个已经移除队列的node。
什么时候修改prev呢?其实prev是由其他线程来修改的。回到shouldParkAfterFailedAcquire
方法,该方法有这样一段代码:
1 | do { |
这段代码的作用就是通过prev遍历到第一个不是取消状态的node,并修改prev。
这里为什么可以更新prev?因为shouldParkAfterFailedAcquire
方法是在获取锁失败的情况下才能执行,因此进入该方法时,说明已经有线程获得锁了,并且在执行该方法时,当前节点之前的节点不会变化(因为只有当下一个节点获得锁的时候才会设置head),所以这里可以更新prev,而且不必用CAS来更新。
来源
1 | https://blog.wangqi.love/articles/Java/ReentrantLock%E4%B8%8EAbstractQueuedSynchronizer.html |
- 本文作者: 初心
- 本文链接: http://funzzz.fun/2021/05/14/ReentrantLock与AbstractQueuedSynchronizer/
- 版权声明: 本博客所有文章除特别声明外,均采用 MIT 许可协议。转载请注明出处!