AQS
即AbstractQueuedSynchronizer
类称作队列同步器,是构建其他同步器的一个重要的基础框架,同步器自身是没有实现任何同步接口。它是通过控制一个int
类型的state
变量来表示同步状态,使用一个内置的FIFO
(先进先出)队列来构建工作队列操作。
同步器定义有两种资源共享方式:Exclusive
(独占式)和Share
(共享式)的获取同步状态。
独占式:一个时间点只能执行一个线程。
共享式:一个时间点可多个线程同时执行。
使用方式
同步器的设计采用模板模式,要实现一个同步组件得先继承AbstractQueuedSynchronizer
类,通过调用同步器提供的方法和重写同步器的方法来实现。
调用同步器中的方法就是调用前面提到的通过state
变量值的操作来表示同步操作,state
是被volatile
修饰来保证线程可见性。
方法名 | 描述 |
---|---|
getState() | 获取当前线程同步状态值。 |
setState(int newState) | 设置当前同步状态值。 |
compareAndSetState(int expect, int update) | 通过CAS 设置state 的值。 |
为了避免被重写,以上方法都被final
修饰了。
实现同步组件,需要自己根据自己定制化的需求进行处理,所以需要自己重写同步器提供的方法,要重写的方法主要是独占式获取与释放同步状态、共享式获取与释放同步状态。
tryAcquire(int arg) 独占式获取同步状态,返回值为boolean
类型,获取成返回true
,获取失败返回false
。
tryRelease(int arg) 独占式释放同步状态,返回值为boolean
类型,释放成返回true
,释放失败返回false
。
tryAcquireShared(int arg) 共享式获取同步状态,返回值为int
类型,获取成功返回大于 0 的值。
tryReleaseShared(int arg) 共享式释放同步状态,返回值为boolean
类型,释放成返回true
,释放失败返回false
。
isHeldExclusively() 独占模式下是否被当前前程独占,返回值为boolean
类型,已被当前线程所独占返回true
,反之为false
。
同步器队列
一个同步器里面拥有一个同步队列和多个等待队列。
同步队列
在AbstractQueuedSynchronizer
类中,有一个内部类Node
,通过该类构造一个内部的同步队列,这是一个FIFO 双向队列。
当前运行线程回去同步状态时,如果获取失败,则将当前线程信息创建一个Node
追加到同步队列尾部,然后阻塞当前线程,直到队列的上一个节点的同步状态释放,再唤醒当前线程尝试重新获取同步状态。这个重新获取同步状态操作的节点,一定要是同步队列中第一节点。
Node 源码如下:
1 | static final class Node { |
通过以上代码,可以看到节点中保存了节点模式、等待状态、线程引用、前驱和后继节点,构造节点。
同步队列中被阻塞的线程的等待状态包含有四个常量值:CANCELLED、SIGNAL、CONDITION、PROPAGATE ,它们对应的被阻塞原因如下:
CANCELLED
同步队列中当前节点的线程等待超时或被中断,需要从同步队列中取消等待。SIGNAL
当前节点释放同步状态或被取消后,通知后继节点的线程运行。CONDITION
当前节点在 Condition 上等待,当其他线程对 Condition 调用了 signal() 方法后,该节点将添加到同步队列中。PROPAGATE
该状态存在共享模式的首节点中,当前节点唤醒后将传播唤醒其他节点。
同步器中持有同步队列的首节点和尾节点的引用,在AbstractQueuedSynchronizer
中分别对应head
和tail
字段。
所以同步队列的基本结构如图:
等待队列
AbstractQueuedSynchronizer
类中包含一个内部类ConditionObject
,该类实现了Condition
的接口。一个Condition
对象包含一个等待队列,同时Condition
对象可以实现等待/通知功能。
Condition
持有等待队列中的首节点(firstWaiter)和尾节点(lastWaiter),如下图代码所示:
如果当前线程调用Condition.await()
时,会将当前线程信息构建一个 Node 节点,因为Condition
持有等待队列中的首尾节点,所以将当前等待队列中的尾节点的nextWaiter
指向当前线程构建的节点,同时更新lastWaiter
的引用节点。
上述过程中的节点、队列的操作,是获取到锁的线程来调用Condition.await()
的,所以整个执行过程在没有基于 CAS 的情况下,也是线程安全的。
通过以上的描述,可以知道一个同步器中同步队列、等待队列构成的示意图:
当调用Condition.await()
时,同步队列中的首节点,也就是当前线程所创建的节点,会加入到等待队列中的尾部,释放当前线程的同步状态并且唤醒同步队列的后继节点,当前线程也就进入等待状态,这个先后顺序不能颠倒。这个过程相当于同步队列的首节点的线程构造新的节点加入到等待队列的尾部。
当调用Condition.signal()
方法时,会先将等待队列中首节点转移到同步队列尾部,然后唤醒该同步队列中的线程,该线程从Condition.await()
中自旋退出,接着在在同步器的acquireQueued()
中自旋获取同步状态。
当调用Condition.wait()
方法,同步队列首节点转移到等待队列方法:
1 | public final void await() throws InterruptedException { |
上面addc
方法是向等待队列中添加一个新的节点。
1 | private Node addConditionWaiter() { |
当调用Condition.signal()
方法,等待队列首节点转移到同步队列方法:
1 | public final void signal() { |
转移同步队列首节点到同步队列,并唤醒该节点方法doSignal()
1 | private void doSignal(Node first) { |
转移等待队列到同步队列方法transferForSignal(Node node)
1 | final boolean transferForSignal(Node node) { |
等待队列中的头结点线程安全移动到同步队列方法enq(final Node node)
1 | private Node enq(final Node node) { |
独占式同步状态
独占式同步状态获取和释放是线程安全的操作,一个时间点确保只有一个线程获取到同步状态。
独占式同步状态获取
acquire(int arg)
方法是获取独占式同步状态的方法,当线程获取同步失败时,会加入到同步队列中。
1 | public final void acquire(int arg) { |
上述代码中,当执行tryAcquire(int arg)
方法获取同步状态失败时,接着通过addWaiter(Node.EXCLUSIVE)
构造当前线程信息的节点,随后将新构造的节点通过acquireQueued(final Node node, int arg)
方法加入到同步队列中,节点在同步队列中自旋等待获取同步状态。
tryAcquire(int arg)
是自定义同步器实现的,实现该方法需要保证线程安全获取同步状态,前面讲到AQS
提供的compareAndSetState(int expect, int update)
方法通过CAS
设置state
值来保证线程安全。
上面获取独占式同步状态时,主要分析acquireQueued(final Node node, int arg)
方法,节点加入队列并自旋等待。
1 | final boolean acquireQueued(final Node node, int arg) { |
在首节点释放同步状态后,同时唤醒后继节点。后继节点通过自旋的方式(这里利用死循环方式)也会检查自己的前驱节点是否为首节点,如果是前驱节点则会尝试获取同步状态。获取成功则返回,否则判断是否被中断或者继续自旋上述获取同步状态操作。
独占式同步状态释放
release(int arg)
方法是释放同步状态,当释放同步状态后会唤醒后继节点。
1 | public final boolean release(int arg) { |
tryRelease(int arg)
方法同样也是自定义同步器实现。当首节点不为空且处于等待状态时,那么调用unparkSuccessor(Node node)
方法唤醒后继节点。
1 | private void unparkSuccessor(Node node) { |
释放同步状态的整个过程就是:释放同步状态,唤醒后继节点。这个后继节点必须满足,非空、非当前节点、等待状态小于或等于 0 ,即SIGNAL
、CONDITION
、PROPAGATE
和初始化状态。
独占式资源共享方式除了上面的同步状态获取,还有独占式超时获取使用的方法是doAcquireNanos(int arg, long nanosTimeout)
、独占式可中断获取使用的方法是acquireInterruptibly(int arg)
。
共享式同步状态
共享式同步状态同一时间点可以有多个线程获取到同步状态。
共享式同步状态获取
acquireShared(int arg)
方法是共享式同步状态获取的方法。
1 | public final void acquireShared(int arg) { |
tryAcquireShared(int arg)
方法是自定义同步器实现的,返回大于或等于 0 时,表示获取成功。如果小于 0 时,获取同步状态失败后会调用doAcquireShared(int arg)
方法进行再次尝试获取。
1 | private void doAcquireShared(int arg) {、 |
上面代码中,当获取同步状态失败后,则创建一个共享模式类型的节点,然后自旋式获取同步状态,如果前驱节点为首节点时则尝试再次获取同步状态,获取同步状态成功后退出当前自旋。
共享式释放同步状态
releaseShared(int arg)
方法来释放共享式同步状态。
1 | public final boolean releaseShared(int arg) { |
上面tryReleaseShared(int arg)
释放同步状态方法必须保证线程安全,因为它多个线程获取到同步状态时会引发并发操作,可以通过循环操作和 CAS 来确保安前行。
doReleaseShared()
方法唤醒后续等待状态的节点。
1 | private void doReleaseShared() { |
共享同步状态释放后,自旋式依次唤醒队列中节点。
总结
从 AQS 中可以借鉴它利用循环和 CAS 来确保并发的安全性的思路,同时它采用模板设计模式定义一个处理逻辑,将具体的特定处理逻辑交由子类自定义实现。在 ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch 以及 Tomncat 的 LimitLatch 都有用其作为同步器。