JDK源码解析实战 – AbstractQueuedSynchronizer源码解析
AbstractQueuedSynchronizer 笼统同步队列简称 AQS ,它是实现同步器的基础组件,
并发包中锁的底层就是使用 AQS 实现的.
大多数开发者可能永远不会直接使用AQS ,但是知道其原理对于架构设计还是很有帮助的,而且要了解ReentrantLock、CountDownLatch等高级锁我们必需搞懂 AQS.
1 整体感知
1.1 架构图
image
AQS框架大致分为五层,自上而下由浅入深,从AQS对外暴露的API究竟层基础数据.
当有自己设置同步器接入时,只要重写第一层所需要的部分方法就可,不需要关注底层具体的实现流程。当自己设置同步器进行加锁或者者解锁操作时,先经过第一层的API进入AQS内部方法,而后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列解决,而这些解决方式均依赖于第五层的基础数据提供层。
AQS 本身就是一套锁的框架,它定义了取得锁和释放锁的代码结构,所以假如要新建锁,只需继承 AQS,并实现相应方法就可。
1.2 类设计
该类提供了一种框架,用于实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关的同步器(信号量,事件等)。此类的设计旨在为大多数依赖单个原子int值表示 state 的同步器提供切实有用的基础。子类必需定义更改此 state 的 protected 方法,并定义该 state 对于 acquired 或者 released 此对象而言意味着什么。鉴于这些,此类中的其余方法将执行全局的排队和阻塞机制。子类可以维护其余状态字段,但是就同步而言,仅跟踪使用方法 getState,setState 和 compareAndSetState 操作的原子升级的int值。
子类应定义为用于实现其所在类的同步属性的非公共内部帮助器类。
子类应定义为用于实现其所在类的同步属性的非 public 内部辅助类。类AbstractQueuedSynchronizer不实现任何同步接口。 相反,它定义了诸如acquireInterruptible之类的方法,可以通过具体的锁和相关的同步器适当地调用这些方法来实现其 public 方法。
此类支持默认的排他模式和共享模式:
- 当以独占方式进行获取时,其余线程尝试进行的获取将无法成功
- 由多个线程获取的共享模式可能(但不肯定)成功
该类不了解这些差异,只是从机制的意义上说,当共享模式获取成功时,下一个等待线程(假如存在)也必需确定它能否也可以获取。在不同模式下等待的线程们共享相同的FIFO队列。 通常,实现的子类仅支持这些模式之一,但也可以同时出现,比方在ReadWriteLock.仅支持排他模式或者共享模式的子类无需定义支持未使用模式的方法.
此类定义了一个内嵌的 ConditionObject 类,可以由支持排他模式的子类用作Condition 的实现,该子类的 isHeldExclusively 方法报告相对于当前线程能否独占同步,使用当前 getState 值调用的方法 release 会完全释放此对象 ,并取得给定的此保存状态值,最终将该对象恢复为其先前的获取状态。否则,没有AbstractQueuedSynchronizer方***创立这样的条件,因而,假如无法满足此束缚,请不要使用它。ConditionObject的行为当然取决于其同步器实现的语义。
此类提供了内部队列的检查,检测和监视方法,以及条件对象的相似方法。 可以根据需要使用 AQS 将它们导出到类中以实现其同步机制。
此类的序列化仅存储基础原子整数维护状态,因而反序列化的对象具备空线程队列。 需要序列化性的典型子类将定义一个readObject方法,该方法在反序列化时将其恢复为已知的初始状态。
2 用法
要将此类用作同步器的基础,使用getState setState和/或者compareAndSetState检查和/或者修改同步状态,以重新定义以下方法(如适用)
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
默认情况下,这些方法中的每一个都会抛 UnsupportedOperationException。
这些方法的实现必需在内部是线程安全的,并且通常应简短且不阻塞。 定义这些方法是使用此类的唯一受支持的方法。 所有其余方法都被公告为final,由于它们不能独立变化。
从 AQS 继承的方法对跟踪拥有排他同步器的线程很有用。 鼓励使用它们-这将启用监视和诊断工具,以帮助客户确定哪些线程持有锁。
尽管此类基于内部的FIFO队列,它也不会自动执行FIFO获取策略。 独占同步的核心采用以下形式:
- Acquire
while (!tryAcquire(arg)) { 假如线程尚未入队,则将其加入队列; 可能阻塞当前线程;}
- Release
if (tryRelease(arg)) 取消阻塞第一个入队的线程;
共享模式与此类似,但可能涉及级联的signal.
由于 acquire 中的检查是入队前被调用的,所以新获取的线程可能会在被阻塞和排队的其余线程之前插入。 但是,假如需要,可以定义tryAcquire和/或者tryAcquireShared以通过内部调用一种或者多种检查方法来禁用插入,从而提供公平的FIFO获取顺序。 特别是,假如hasQueuedPredecessors()(公平同步器专门设计的一种方法)返回true,则大多数公平同步器都可以定义tryAcquire返回false.
对于默认的插入(也称为贪婪,放弃和convoey -avoidance)策略,吞吐量和可伸缩性通常最高。 虽然不能保证这是公平的或者避免饥饿,但允许较早排队的线程在较晚排队的线程之前进行重新竞争,并且每个重新争用都有一次机会可以毫无偏向地成功竞争过进入的线程。 同样,虽然获取通常不需要自旋,但在阻塞之前,它们可能会执行tryAcquire的屡次调用,并插入其余任务。 假如仅短暂地保持排他同步,则这将带来自旋的大部分好处,而假如不进行排他同步,则不会带来很多负担。 假如需要的话,可以通过在调用之前使用“fast-path”检查来获取方法来加强此功能,并可能预先检查hasContended()和/或者hasQueuedThreads(),以便仅在同步器可能不存在争用的情况下这样做。
此类为同步提供了有效且可扩展的基础,部分是通过将其使用范围规范化到可以依赖于int状态,acquire 和 release 参数以及内部的FIFO等待队列的同步器。 当这还不够时,可以使用原子类、自己设置队列类和锁支持阻塞支持从较低级别构建同步器。
3 使用案例
这里是一个不可重入的排他锁,它使用值0表示解锁状态,使用值1表示锁定状态。尽管不可重入锁并不严格要求记录当前所有者线程,但是这个类这样做是为了更容易监视使用情况。它还支持条件,并暴露其中一个检测方法:
class Mutex implements Lock, java.io.Serializable { // 我们内部的辅助类 private static class Sync extends AbstractQueuedSynchronizer { // 报告能否处于锁定状态 protected boolean isHeldExclusively() { return getState() == 1; } // 假如 state 是 0,获取锁 public boolean tryAcquire(int acquires) { assert acquires == 1; // Otherwise unused if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 通过将 state 置 0 来释放锁 protected boolean tryRelease(int releases) { assert releases == 1; // Otherwise unused if (getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0); return true; } // 提供一个 Condition Condition newCondition() { return new ConditionObject(); } // 反序列化属性 private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // 重置到解锁状态 } } // 同步对象完成所有的工作。我们只是期待它. private final Sync sync = new Sync(); public void lock() { sync.acquire(1); } public boolean tryLock() { return sync.tryAcquire(1); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newCondition(); } public boolean isLocked() { return sync.isHeldExclusively(); } public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } }
这是一个闩锁类,它相似于CountDownLatch,只是它只要要一个单信号即可以触发。由于锁存器是非独占的,所以它使用共享的获取和释放方法。
class BooleanLatch { private static class Sync extends AbstractQueuedSynchronizer { boolean isSignalled() { return getState() != 0; } protected int tryAcquireShared(int ignore) { return isSignalled() ? 1 : -1; } protected boolean tryReleaseShared(int ignore) { setState(1); return true; } } private final Sync sync = new Sync(); public boolean isSignalled() { return sync.isSignalled(); } public void signal() { sync.releaseShared(1); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } }
4 基本属性与框架
4.1 继承体系图
image
4.2 定义
image
可知 AQS 是一个笼统类,生来就是被各种子类锁继承的。继承自AbstractOwnableSynchronizer,其作用就是为了知道当前是哪个线程取得了锁,便于后续的监控
image
4.3 属性
4.3.1 状态信息
volatile 修饰,对于可重入锁,每次取得锁 +1,释放锁 -1
image
可以通过 getState 得到同步状态的当前值。该操作具备 volatile 读的内存语义。
image
setState 设置同步状态的值。该操作具备 volatile 写的内存语义
image
compareAndSetState 假如当前状态值等于期望值,则以原子方式将同步状态设置为给定的升级值。此操作具备 volatile 读和写的内存语义
image
自旋比使用定时挂起更快。粗略预计足以在非常短的超时时间内提高响应能力,当设置等待时间时才会用到这个属性
image
这写方法都是Final的,子类无法重写。
独占模式
image
共享模式
image
4.3.2 同步队列
- 作用
阻塞获取不到锁(独占锁)的线程,并在适当时机从队首释放这些线程
同步队列底层数据结构是个双向链表
等待队列的头,推迟初始化。 除初始化外,只能通过 setHead 方法修改
image
注意:假如head存在,则其waitStatus保证不会是 CANCELLED等待队列的尾部,推迟初始化。 仅通过方法 enq 修改以增加新的等待节点
image
4.3.4 条件队列
为什么需要条件队列
个同步队列并不是所有场景都能搞定,在遇到锁 + 队列结合的场景时,就需要 Lock + Condition,先使用 Lock 决定
- 哪些线程可以取得锁
- 哪些线程需要到同步队列里面排队阻塞
取得锁的多个线程在碰到队列满或者者空的时候,可以使用 Condition 来管理这些线程,让这些线程阻塞等待,而后在合适的时机后,被正常唤醒。
同步队列 + 条件队列联手使用的场景,最多被使用到锁 + 队列的场景中。
作用
AQS 的内部类,结合锁实现线程同步。存放调用条件变量的 await 方法后被阻塞的线程
实现了 Condition 接口,而 Condition 接口就相当于 Object 的各种监控方法
image
需要使用时,直接 new ConditionObject()。
4.3.5 Node
同步队列和条件队列的共用节点。
入队时,用 Node 把线程包装一下,而后把 Node 放入两个队列中,我们看下 Node 的数据结构,如下:
4.3.5.1 模式
共享模式
image
独占模式
image
4.3.5.2 等待状态
image
注意等待状态仅能为如下值
SIGNAL
- 同步队列中的节点在自旋获取锁时,假如前一个节点的状态是
SIGNAL
,那么自己就直接被阻塞,否则会一直自旋 该节点的后继节点会被(或者很快)阻塞(通过park),因而当前节点释放或者取消时必需unpark其后继节点。为了避免竞争,acquire方法必需首先指示它们需要一个 signal,而后重试原子获取,而后在失败时阻塞。
image
CANCELLED
指示线程已被取消
image
取消:因为超时或者中断,该节点被取消。
节点永远不会离开此状态,即此为一种终极状态。特别是,具备 cancelled 节点的线程永远不会再次阻塞。
CONDITION
该节点当前在条件队列中,当节点从同步队列被转移到条件队列时,状态就会被更改成 CONDITION
image
在被转移之前,它不会用作同步队列的节点,此时状态将设置为0(该值的使用与该字段的其余用途无关,仅仅是简化了机制)。
PROPAGATE
线程处在 SHARED
情景下,该字段才会启用。
- 指示下一个acquireShared应该无条件传播,共享模式下,该状态的进程处于 Runnable 状态
image
releaseShared 应该传播到其余节点。 在doReleaseShared中对此进行了设置(仅适用于头节点),以确保传播继续进行,即便此后进行了其余操作也是如此。
0
以上都不是,初始化状态。
小结
这些值是以数字方式排列,极大方便了开发者的使用。我们在平常开发也可以定义少量有特殊意义的常量值。
非负值表示节点不需要 signal。 因而,大多数代码并不需要检查特定值,检查符号就可。
- 对于普通的同步节点,该字段初始化为0
- 对于条件节点,该字段初始化为
CONDITION
使用CAS(或者在可能的情况下进行无条件的 volatile 写)对其进行修改。
注意两个状态的区别
- state 是锁的状态,int 型,子类继承 AQS 时,都是要根据 state 字段来判断有无得到锁
- waitStatus 是节点(Node)的状态
4.3.5.3 数据结构
前驱节点
- 链接到当前节点/线程所依赖的用来检查 waitStatus 的前驱节点
image
在入队期间赋值,并且仅在出队时将其清空(为了GC)。
此外,在取消一个前驱结点后,在找到一个未取消的节点后会短路,这将始终存在,由于头节点永远不会被取消:只有成功 acquire 后,一个节点才会变为头。
取消的线程永远不会成功获取,并且线程只会取消自身,不会取消任何其余节点。
后继节点
链接到后继节点,当前节点/线程在释放时将其unpark。 在入队时赋值,在绕过已取消的前驱节点时进行调整,在出队时清零(为了GC)。 入队操作直到附加后才赋值前驱节点的下一个字段,因而看到 null 的下一个字段并不肯定意味着该节点位于队列末尾。 但是,假如下一个字段显示为空,则我们可以从尾部扫描上一个以进行再次检查。 已取消节点的下一个字段设置为指向节点本身而不是null,以使isOnSyncQueue的工作更轻松。
image
使该节点入队的线程。 在构造时初始化,使用后消亡。
image
在同步队列中,nextWaiter 表示当前节点是独占模式还是共享模式
在条件队列中,nextWaiter 表示下一个节点元素
链接到在条件队列等待的下一个节点,或者者链接到特殊值SHARED
。 因为条件队列仅在以独占模式保存时才被访问,因而我们只要要一个简单的链接队列就可在节点等待条件时保存节点。 而后将它们转移到队列中以重新获取。 并且因为条件只能是独占的,因而我们使用特殊值来表示共享模式来保存字段。
image
5 Condition 接口
JDK5 时提供。
条件队列 ConditionObject 实现了 Condition 接口
image
本节就让我们一起来研究之
image
Condition 将对象监视方法(wait,notify和notifyAll)分解为不同的对象,从而通过与任意Lock实现结合使用,从而使每个对象具备多个wait-sets。 当 Lock 替换了 synchronized 方法和语句的使用,Condition 即可以替换了Object监视器方法的使用。
Condition 的实现可以提供与 Object 监视方法不同的行为和语义,例如保证通知的顺序,或者者在执行通知时不需要保持锁定。 假如实现提供了这种专门的语义,则实现必需记录这些语义。
请注意,Condition实例只是普通对象,它们本身可以用作 synchronized 语句中的目标,并且可以调用自己的监视器 wait 和 notification 方法。 获取 Condition 实例的监视器锁或者使用其监视器方法与获取与该条件相关联的锁或者使用其 await 和 signal 方法没有特定的关系。 建议避免混淆,除非可能在自己的实现中,否则不要以这种方式使用 Condition 实例。
class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
(ArrayBlockingQueue类提供了此功能,因而没有理由实现此示例用法类。)
定义出少量方法,这些方法奠定了条件队列的基础
API
await
使当前线程等待,直到被 signalled 或者被中断
image
与此 Condition 相关联的锁被原子释放,并且出于线程调度目的,当前线程被禁用,并且处于休眠状态,直到发生以下四种情况之一:
- 其它线程为此 Condition 调用了 signal 方法,并且当前线程刚好被选择为要唤醒的线程
- 其它线程为此 Condition 调用了 signalAll 方法
- 其它线程中断了当前线程,并且当前线程支持被中断
- 要么发生“虚假唤醒”。
在所有情况下,在此方法可以返回之前,必需重新获取与此 Condition 关联的锁,才能真正被唤醒。当线程返回时,可以保证保持此锁。
await 超时时间
使当前线程等待,直到被 signal 或者中断,或者经过指定的等待时间
image
此方法在行为上等效于:
awaitNanos(unit.toNanos(time)) > 0
所以,尽管入参可以是任意单位的时间,但其实仍会转化成纳秒
awaitNanos
image
注意这里选择纳秒是为了避免计算剩余等待时间时的截断误差
signal()
唤醒条件队列中的一个线程,在被唤醒前必需先取得锁
image
signalAll()
唤醒条件队列中的所有线程
image
6 锁的获取
获取锁显式的方法就是 Lock.lock () ,最终目的其实是想让线程取得对资源的访问权。而 Lock 又是 AQS 的子类,lock 方法根据情况一般会选择调用 AQS 的 acquire 或者 tryAcquire 方法。
acquire 方法 AQS 已经实现了,tryAcquire 方法是等待子类去实现,acquire 方法制定了获取锁的框架,先尝试使用 tryAcquire 方法获取锁,获取不到时,再入同步队列中等待锁。tryAcquire 方法 AQS 中直接抛出一个异常,表明需要子类去实现,子类可以根据同步器的 state 状态来决定能否能够取得锁,接下来我们详细看下 acquire 的源码解析。
acquire 也分两种,一种是独占锁,一种是共享锁
6.1 acquire 独占锁
独占模式下,尝试取得锁
image
在独占模式下获取,忽略中断。 通过至少调用一次 tryAcquire(int) 来实现,并在成功后返回。 否则,将线程排队,并可能反复阻塞和解除阻塞,并调用 tryAcquire(int) 直到成功。 该方法可用于实现方法 Lock.lock()。
对于 arg 参数,该值会传送给 tryAcquire,但不会被解释,可以实现你喜欢的任何内容。
- 看一下 tryAcquire 方法
imageAQS 对其只是简单的实现,具体获取锁的实现方法还是由各自的公平锁和非公平锁单独实现,实现思路一般都是 CAS 赋值 state 来决定能否能取得锁(阅读后文的 ReentrantLock 核心源码解析就可)。
执行流程
- 尝试执行一次 tryAcquire
- 成功直接返回
- 失败走 2
线程尝试进入同步队列,首先调用 addWaiter 方法,把当前线程放到同步队列的队尾
接着调用 acquireQueued 方法
- 阻塞当前节点
- 节点被唤醒时,使其能够取得锁
- 假如 2、3 失败了,中断线程
6.1.1 addWaiter
image
执行流程
- 通过当前的线程和锁模式新建一个节点
- pred 指针指向尾节点tail
- 将Node 的 prev 指针指向 pred
通过compareAndSetTail方法,完成尾节点的设置。该方法主要是对tailOffset和Expect进行比较,假如tailOffset的Node和Expect的Node地址是相同的,那么设置Tail的值为Update的值。
image
假如 pred 指针为 null(说明等待队列中没有元素),或者者当前 pred 指针和 tail 指向的位置不同(说明被别的线程已经修改),就需要 enq
image
把新的节点增加到同步队列的队尾。
假如没有被初始化,需要进行初始化一个头结点出来。但请注意,初始化的头结点并不是当前线程节点,而是调用了无参构造函数的节点。假如经历了初始化或者者并发导致队列中有元素,则与之前的方法相同。其实,addWaiter就是一个在双端链表增加尾节点的操作,需要注意的是,双端链表的头结点是一个无参构造函数的头结点。
线程获取锁的时候,过程大体如下:
- 当没有线程获取到锁时,线程1获取锁成功
线程2申请锁,但是锁被线程1占有
image
假如再有线程要获取锁,依次在队列中往后排队就可。
在 addWaiter 方法中,并没有进入方法后立马就自旋,而是先尝试一次追加到队尾,假如失败才自旋,由于大部分操作可能一次就会成功,这种思路在自己写自旋的时候可以多多参考哦。
6.1.2 acquireQueued
阻塞当前线程。
- 自旋使前驱结点的 waitStatus 变成
signal
,而后阻塞自身 - 取得锁的线程执行完成后,释放锁时,会唤醒阻塞的节点,之后再自旋尝试取得锁
final boolean acquireQueued(final Node node, int arg) { // 标识能否成功获得资源 boolean failed = true; try { // 标识能否在等待过程被中断过 boolean interrupted = false; // 自旋,结果要么获取锁或者者中断 for (;;) { // 获取当前节点的前驱节点 final Node p = node.predecessor(); // 若 p 是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(此前的头结点还只是虚节点) if (p == head && tryAcquire(arg)) { // 获取锁成功,将头指针移动到当前的 node setHead(node); p.next = null; // 辅助GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
看其中的具体方法:
setHead
image
方法的核心:
shouldParkAfterFailedAcquire
依据前驱节点的等待状态判断当前线程能否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取头结点的节点状态 int ws = pred.waitStatus; // 说明头结点处于唤醒状态 if (ws == Node.SIGNAL) /* * 该节点已经设置了状态,要求 release 以 signal,以便可以安全park */ return true; // 前文说过 waitStatus>0 是取消状态 if (ws > 0) { /* * 跳过已被取消的前驱结点并重试 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus 必需为 0 或者 PROPAGATE。 表示我们需要一个 signal,但不要 park。 调用者将需要重试以确保在 park 之前还无法获取。 */ // 设置前驱节点等待状态为 SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
为避免自旋导致过度消费 CPU 资源,以判断前驱节点的状态来决定能否挂起当前线程
挂起流程图
image
如下解决 prev 指针的代码。shouldParkAfterFailedAcquire 是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已被获取,当前节点之前的节点都不会出现变化,因而这个时候变更 prev 指针较安全。
image
parkAndCheckInterrupt
将当前线程挂起,阻塞调用栈并返回当前线程的中断状态
image
一图小结该方法流程
image
从上图可以看出,跳出当前循环的条件是当“前驱节点是头结点,且当前线程获取锁成功”。
6.1.3 cancelAcquire
shouldParkAfterFailedAcquire中取消节点是怎样生成的呢?什么时候会把一个节点的waitStatus设置为-1?又是在什么时间释放节点通知到被挂起的线程呢?来一起研究本小节源码。
image
private void cancelAcquire(Node node) { // 假如节点不存在,无视该方法 if (node == null) return; // 设置该节点不关联任何线程,即虚节点 node.thread = null; // 跳过被取消的前驱结点们 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext 是要取消拼接的显著节点。假如没有,以下情况 CAS 将失败,在这种情况下,我们输掉了和另一个cancel或者signal的竞争,因而无需采取进一步措施。 // 通过前驱节点,跳过取消状态的node Node predNext = pred.next; // 这里可以使用无条件写代替CAS,把当前node的状态设置为CANCELLED // 在这个原子步骤之后,其余节点可以跳过我们。 // 在此之前,我们不受其余线程的干扰。 node.waitStatus = Node.CANCELLED; // 假如是 tail 节点, 移除自身 // 假如当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点 // 升级失败的话,则进入else,假如升级成功,将tail的后继节点设置为null if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // 辅助 GC } }
当前的流程:
- 获取当前节点的前驱节点,假如前驱节点的状态是
CANCELLED
,那就一直往前遍历,找到第一个waitStatus <= 0
的节点,将找到的Pred节点和当前Node关联,将当前Node设置为CANCELLED
。 - 根据当前节点的位置,考虑以下三种情况:
(1) 当前节点是尾节点。
(2) 当前节点是Head的后继节点。
(3) 当前节点不是Head的后继节点,也不是尾节点。
根据(2),来分析每一种情况的流程。
当前节点是尾节点
image
当前节点是Head的后继节点
image
- 当前节点不是Head的后继节点,也不是尾节点
image
通过上面的流程,我们对于CANCELLED
节点状态的产生和变化已经有了大致的理解,但是为什么所有的变化都是对Next指针进行了操作,而没有对Prev指针进行操作呢?什么情况下会对Prev指针进行操作?
执行
cancelAcquire
时,当前节点的前驱节点可能已经出队(已经执行过try代码块中的shouldParkAfterFailedAcquire),假如此时修改 prev 指针,有可能会导致 prev 指向另一个已经出队的 Node,因而这块变化 prev 指针不安全。
6.2 tryAcquireNanos
尝试以独占模式获取,假如中断将停止,假如超过给定超时将直接失败。首先检查中断状态,而后至少调用一次#tryAcquire,成功后返回。否则,线程将排队,可能会反复地阻塞和取消阻塞,调用#tryAcquire,直到成功或者线程中断或者超时结束。此方法可用于实现方法 Lock#tryLock(long, TimeUnit)。
image
尝试性的获取锁, 获取锁不成功, 直接加入到同步队列,加入操作即在doAcquireNanos
doAcquireNanos
以独占限时模式获取。
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; // 截止时间 final long deadline = System.nanoTime() + nanosTimeout; // 将当前的线程封装成 Node 加入到同步对列里面 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { // 获取当前节点的前驱节点(当一个n在同步对列里, 并且没有获取 // lock 的 node 的前驱节点不可能是 null) final Node p = node.predecessor(); // 判断前驱节点能否为 head // 前驱节点是 head, 存在两种情况 // (1) 前驱节点现在持有锁 // (2) 前驱节点为 null, 已经释放锁, node 现在可以获取锁 // 则再调用 tryAcquire 尝试获取 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // 辅助GC failed = false; return true; } // 计算剩余时间 nanosTimeout = deadline - System.nanoTime(); // 超时,直接返回 false if (nanosTimeout <= 0L) return false; // 调用 shouldParkAfterFailedAcquire 判断能否需要阻塞 if (shouldParkAfterFailedAcquire(p, node) && // 若未超时, 并且大于 spinForTimeoutThreshold, 则将线程挂起 nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { // 在整个获取中出错(中断/超时等),则清理该节点 if (failed) cancelAcquire(node); }}
6.3 acquireSharedInterruptibly
以共享模式获取,假如中断将停止。
image
首先检查中断状态,而后至少调用一次 tryAcquireShared(int),成功后返回。否则,线程将排队,可能会反复阻塞和取消阻塞,调用 tryAcquireShared(int),直到成功或者线程被中断。
arg 参数,这个值被传递给 tryAcquireShared(int),但未被解释,可以代表你喜欢的任何东西。假如当前线程被中断,则抛 InterruptedException。
doAcquireSharedInterruptibly
共享可中断模式的获取锁
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 创立"当前线程"的 Node 节点,且其中记录的共享锁 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { // 获取前驱节点 final Node p = node.predecessor(); // 假如前驱节点是头节点 if (p == head) { // 尝试获取锁(因为前驱节点为头节点,所以可能此时前驱节点已经成功获取了锁,所以尝试获取一下) int r = tryAcquireShared(arg); // 获取锁成功 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // 辅助 GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); }}
7 锁的释放
7.1 release
以独占模式释放。 假如 tryRelease 返回true,则通过解锁一个或者多个线程来实现。此方法可用于实现方法 Lock#unlock
arg 参数将传送到 tryRelease,并且可以表示你自己喜欢的任何内容。
自己设置实现的 tryRelease 假如返回 true,说明该锁没有被任何线程持有
image
头结点不为空并且头结点的waitStatus不是初始化节点情况,解除线程挂起状态
image
h == null
Head还没初始化。初始时 head == null,第一个节点入队,Head会被初始化一个虚节点。所以说,这里假如还没来得及入队,就会出现head == nullh != null && waitStatus == 0
后继节点对应的线程仍在运行中,不需要唤醒h != null && waitStatus < 0
后继节点可能被阻塞了,需要唤醒
unparkSuccessor
private void unparkSuccessor(Node node) { /* * 假如状态是负数的(就可能需要signal),请尝试清理预期的signal。 假如失败或者状态被等待线程更改,则OK。 */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * 要unpark的线程保留在后继线程中,后者通常就是下一个节点。 但是,假如取消或者显然为空,从尾部逆向移动以找到实际的未取消后继者。 */ Node s = node.next; // 假如下个节点为 null 或者者 cancelled,就找到队列最开始的非cancelled 的节点 if (s == null || s.waitStatus > 0) { s = null; // 从尾部节点开始到队首方向查找,寻得队列第一个 waitStatus<0 的节点。 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 假如下个节点非空,而且unpark状态<=0的节点 if (s != null) LockSupport.unpark(s.thread); }
- 之前的addWaiter方法的节点入队并不是原子操作
image
标识部分可以看做是 tail 入队的原子操作,但是此时pred.next = node;
尚未执行,假如这个时候执行了unparkSuccessor
,就无法从前往后找了 - 在产生
CANCELLED
状态节点的时候,先断开的是 next 指针,prev 指针并未断开,因而也是必需要从后往前遍历才能够遍历完
7.2 releaseShared
以共享模式释放。 假如 tryReleaseShared(int) 返回true,则通过解除一个或者多个线程的阻塞来实现。
arg 参数 – 该值传送给 tryReleaseShared(int),但并未实现,可以自己设置喜欢的任何内容。
image
执行流程
- tryReleaseShared 尝试释放共享锁,失败返回 false,true 成功走2
- 唤醒当前节点的后续阻塞节点
doReleaseShared
共享模式下的释放动作 – 表示后继信号并确保传播(注意:对于独占模式,假如需要signal,释放仅相当于调用head的unparkSuccessor)。
private void doReleaseShared() { /* * 即便有其余正在进行的acquire/release,也要确保 release 传播。 * 假如需要signal,则以尝试 unparkSuccessor head节点的常规方式进行。 * 但假如没有,则将状态设置为 PROPAGATE,以确保释放后继续传播。 * 此外,在执行此操作时,必需循环以防增加新节点。 * 另外,与unparkSuccessor的其余用法不同,我们需要知道CAS重置状态能否失败,假如重新检查,则失败。 */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 循环以重新检查 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // 在失败的CAS上循环 } if (h == head) // 假如头结点变了则循环 break; } }
8 中断解决
唤醒对应线程后,对应的线程就会继续往下执行。继续执行acquireQueued
方法以后,中断如何解决?
8.1 parkAndCheckInterrupt
park 的便捷方法,而后检查能否中断
image
- 再看回 acquireQueued 代码,不管 parkAndCheckInterrupt 返回什么,都会执行下次循环。若此时获取锁成功,就返回当前的 interrupted。
image
- acquireQueued 为True,就会执行 selfInterrupt
image
8.2 selfInterrupt
image
该方法是为了中断线程。
获取锁后还要中断线程的起因:
- 当中断线程被唤醒时,并不知道被唤醒的起因,可能是当前线程在等待中被中断,也可能释放锁后被唤醒。因而通过 Thread.interrupted() 检查中断标识并记录,假如发现该线程被中断过,就再中断一次
- 线程在等待资源的过程中被唤醒,唤醒后还是会不断尝试获取锁,直到抢到锁。即在整个流程中,并不响应中断,只是记录中断的记录。最后抢到锁返回了,那么假如被中断过的话,就需要补充一次中断
总结
AQS 的源码实在是太多了,我们只研究核心源码,其余部分源码都可以参考研究。
作为后面的读写等高级锁的基础,务必要搞懂 AQS 的设计哲学。
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » JDK源码解析实战 – AbstractQueuedSynchronizer源码解析