Concurrent包(一):深入AQS
concurrent包
Java.util.concurrent 是在并发编程中比较常用的工具类,里面包含很多用来在并发场景中使用的组件。比如线程池、阻塞队列、计时器、同步器、并发集合等等。
Lock简介
在 Lock 接口出现之前,Java 中的应用程序对于多线程的并发安全处理只能基于synchronized 关键字来解决。但是在 Java5 以后,Lock 的出现可以解决synchronized 在某些场景中的短板,虽然它失去了像synchronize关键字隐式加锁解锁的便捷性,但是却拥有了锁获取和释放的可操作性,可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性。
Lock是一个接口,在Concurrent包中只有ReentrantLock实现了Lock接口。ReentrantLock表示重入锁,重入锁指的是线程在获得锁之后,再次获取该锁不需要阻塞,而是直接关联一次计数器增加重入次数。
简单看一下ReentrantLock,大部分方法都使用到了Sync这个属性,而Sync继承了AbstractQueuedSynchronizer,这就是我们今天要说的AQS。
AQS
AQS的功能:独占和共享
独占锁:每次只能有一个线程持有锁,比如 ReentrantLock。
共享锁:允许多个线程同时获取锁,并发访问共享资源,比如ReentrantReadWriteLock。(但是在写线程访问时,所有读线程和其他写线程都会被阻塞)
同步组件主要是通过重写AQS的几个protected方法来表达自己的同步语义。
1. 同步队列
1.1 结构
AQS 队列内部维护的是一个 FIFO( First Input First Output) 的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的后继节点和直接前驱节点。所以双向链表可以从任意一个节点开始很方便的访问前驱和后继。
1.2 入列:
1.3 出列:
Node的源码
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();// 共享锁标志
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;// 独占锁标志
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;// 前驱节点
volatile Node next;// 后继节点
volatile Thread thread;// 当前线程
Node nextWaiter;// 存储在condition队列中的后继节点
// 是否为共享节点
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter:将线程构造成一个node,添加到等待队列
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition:在condition队列中使用
this.waitStatus = waitStatus;
this.thread = thread;
}
}
2. 独占锁
2.1 获取独占锁
2.1.1 acquire()方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先尝试获取锁,若成功获取到,则继续去业务代码执行。若未获取到锁,则调用addWaiter()方法和acquireQueued()方法来加入队列。
2.1.2 addWaiter()方法
/**
* Creates and enqueues node for current thread and given mode.
* 为当前线程和给定的模式创建和入队一个node
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
// 将当前线程构建为Node类型。
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;// 获取到原tail节点(原尾节点)
if (pred != null) {// 原尾节点不为空时,说明队列中存在节点
node.prev = pred;// 把当前线程的Node的prev指向原尾节点
if (compareAndSetTail(pred, node)) {// 通过cas把node加入到AQS队列,也就是将其设置为tail
pred.next = node;// 设置成功以后,把原尾节点的 next 指向当前 node
return node;
}
}
enq(node);// tail=null,把 node 添加到同步队列
return node;
}
参数中的mode:Node.EXCLUSIVE 表示独占锁,Node.SHARED 表示共享锁。
入队时分两种情况:
-
队列中已经存在node时:即原尾节点
pred != null
时,把新增的node的prev指向原尾节点(node.prev = pred
),并通过cas将新增node设置为tail,再将原尾节点的next指向新增node即新尾节点。 -
队列中没有node时:通过
enq(node)
把当前节点加入到队列中。
enq()方法:
private Node enq(final Node node) {
for (;;) {
Node t = tail;// 第一次循环尾节点为空,第二次循环尾节点会指向一个空node
if (t == null) { // Must initialize 若队列未初始化,则必须先初始化
if (compareAndSetHead(new Node())) // 新建一个空node,将其设置为head
tail = head;// 将tail也指向这个空node
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
此时队列为空,还是分两种情况:
-
第一次循环,如果当前线程是第一个加入同步队列,即队列未初始化时:新建一个空node,将其设置为head,同时将tail也指向这个空node,完成队列的初始化,然后继续循环。
-
继续循环时,队列已初始化:则步骤同addWaiter,此时的原尾节点即为初始化时新建的那个空node。
2.1.3 acquireQueued()
通过 addWaiter() 方法把线程添加到链表后,会接着把 Node 作为参数传递给acquireQueued()方法去竞争锁。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();// 获取当前节点的前一个节点
if (p == head && tryAcquire(arg)) { // 如果是head节点,说明当前节点有资格去竞争锁,并tryAcquire()尝试获取锁
setHead(node); // 获取锁成功后,设置head为当前线程的node,当前线程获得执行权限
p.next = null; // 把原 head 节点从链表中移除(将其next指向null)
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())//若tryAcquire()尝试获取锁失败,则说明前一个线程还未释放锁
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);// 最终如果获取锁失败,则取消获得锁的操作
}
}
若前一个线程还未释放锁,则会通过shouldParkAfterFailedAcquire()方法根据waitStatus来判断是否需要挂起这个线程,若需要挂起,则通过parkAndCheckInterrupt()挂起。
同步组件可以通过重写tryAcquire()方法来获取锁。
获取到锁之后的setHead()方法:
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
shouldParkAfterFailedAcquire()方法:根据waitStatus来判断是否需要挂起这个线程,Node 有 5 中状态:
-
CANCELLED(1):在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该线程的Node。其结点的 waitStatus 为 CANCELLED,即结束状态,进入该状 态后的结点将不会再变化。
-
SIGNAL(-1):只要前置节点释放锁,就会通知标识为 SIGNAL 状态的后续节点的线程。
-
CONDITION(- 2):和 Condition 有关系。
-
PROPAGATE(-3):共享模式下,PROPAGATE 状态的线程处于可运行状态。
-
默认初始状态(0)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; //获取前一个线程node的waitStatus状态
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;// 为-1则表示可以放心挂起当前线程
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.意味着prev节点取消了排队,直接移除这个节点
*/
do {
node.prev = pred = pred.prev; // pred=pred.prev;node.prev=pred;
} while (pred.waitStatus > 0);// 循环把 CANCELLED 状态的节点移除
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);// 修改 pred 节点的状态为 SIGNAL(-1)
}
return false;
}
parkAndCheckInterrupt()方法:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
通过 LockSupport.park 挂起当前线程。
2.2 释放独占锁
2.2.1 release()方法
public final boolean release(int arg) {
if (tryRelease(arg)) {// tryRelease()释放锁
Node h = head; //获取head节点
if (h != null && h.waitStatus != 0)// 前头节点不为空且waitStatus不是0的话
unparkSuccessor(h);// 唤起head的后继节点的线程
return true;
}
return false;
}
2.2.2 unparkSuccessor()方法
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;//head节点的后继节点
if (s == null || s.waitStatus > 0) { //如果后继节点为null或者status>0(cancelled)状态
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
//从尾部节点开始扫描,找到距离head最近的一个waitStatus<=0的节点
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//next节点不为空,直接唤醒这个线程
}
为什么从尾部节点开始扫描:
在线程unparkSuccessor()的同时,若此时发生了入队操作:
node.prev = pred;// 把当前线程的Node的prev指向原尾节点
if (compareAndSetTail(pred, node)) {// 通过cas把node加入到AQS队列,也就是将其设置为tail
pred.next = node;// 设置成功以后,把原尾节点的 next 指向当前 node
return node;
}
在CAS操作compareAndSetTail(pred, node)
之后,新增的node已经成为新的tail,但是老的tail又为将它的next指向新增的node(即pred.next = node;
还未执行),此时若由head节点开始循环,那么新增的tail则有可能循环扫描不到。所以要从tail开始循环。
2.3 可中断式获取锁
Lock相对于synchronized有一些更方便的特性,比如响应中断、超时等待等。响应中断式表示线程中断时会抛出异常,其源码就是AQS中的
acquireInterruptibly方法:
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))//线程获取锁失败
doAcquireInterruptibly(arg);
}
doAcquireInterruptibly(arg)方法:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这个方法和之前的 acquireQueued()大致相同,区别在于获取锁失败后,挂起线程时(此时会有线程中断),中断后会抛出异常。
2.4 超时等待式获取锁
在AQS中体现在tryAcquireNanos()方法:
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
doAcquireNanos方法:同样和acquireQueued() 方法类似,但是多了nanosTimeout计时。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;//计算截止时间
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { //同acquireQueued()类似,当前线程获得锁出队列
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)//判断是否已超时
return false;//已经超时返回false
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);//线程未超时则阻塞等待
if (Thread.interrupted())
throw new InterruptedException();//线程被中断则抛出被中断异常
}
} finally {
if (failed)
cancelAcquire(node);
}
}
LockSupport.parkNanos(this, nanosTimeout)
阻塞等待时间结束。
3. 共享锁
3.1 共享锁的获取
共享锁在同一时刻可以允许多个线程访问,但是在写线程访问时,所有的读线程和其他写线程都会被阻塞。共享锁维护了一个读锁、一个写锁。当执行读操作的时候,获取读锁,读锁不会被阻塞,因为读操作不会影响执行结果。在执行写操作时,获取写锁,若已经有线程持有写锁的情况下,当前线程会被阻塞,只有当写锁释放以后,其他读写操作才能继续执行。使用读写锁提升读操作的并发性,也保证每次写操作对所有的读写操作的可见性,比如ReentrantReadWriteLock就是读写锁。
- 读锁与读锁可以共享
- 读锁与写锁不可以共享(排他)
- 写锁与写锁不可以共享(排他)
3.1.1 acquireShared()方法
AQS通过acquireShared()方法来获取共享锁:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
首先调用tryAcquireShared方法,tryAcquireShared返回值是一个int类型,当返回值为大于等于0的时候方法结束说明获得成功获取锁,否则,表明获取锁失败,会执行doAcquireShared方法:
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//获取前一个结点
if (p == head) {//如果前一个节点是head节点的话,则说明当前节点有资格获取锁
int r = tryAcquireShared(arg);//获取锁
if (r >= 0) {//获取到了锁
setHeadAndPropagate(node, r);//设置当前结点为头结点,然后去唤醒后续结点
p.next = null; // help GC 释放头结点,等待GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
addWaiter(Node.SHARED)和独占锁的addWaiter类似,区别是Node的模式不同。
3.2 共享锁释放
3.2.1 releaseShared()方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared尝试释放锁,成功则执行doReleaseShared方法:
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);//唤醒后继节点
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS CAS失败则重试
}
if (h == head) // loop if head changed
break;
}
}
在共享式锁的释放过程中,对于能够支持多个线程同时访问的并发组件,必须保证多个线程能够安全的释放同步状态,这里采用的CAS保证,当CAS操作失败continue,在下一次循环中进行重试。
参考:
《java并发编程的艺术》