Concurrent包(一):深入AQS

Concurrent包(一):深入AQS

concurrent包

Java.util.concurrent 是在并发编程中比较常用的工具类,里面包含很多用来在并发场景中使用的组件。比如线程池、阻塞队列、计时器、同步器、并发集合等等。

Lock简介

在 Lock 接口出现之前,Java 中的应用程序对于多线程的并发安全处理只能基于synchronized 关键字来解决。但是在 Java5 以后,Lock 的出现可以解决synchronized 在某些场景中的短板,虽然它失去了像synchronize关键字隐式加锁解锁的便捷性,但是却拥有了锁获取和释放的可操作性,可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性。

Lock是一个接口,在Concurrent包中只有ReentrantLock实现了Lock接口。ReentrantLock表示重入锁,重入锁指的是线程在获得锁之后,再次获取该锁不需要阻塞,而是直接关联一次计数器增加重入次数。

简单看一下ReentrantLock,大部分方法都使用到了Sync这个属性,而Sync继承了AbstractQueuedSynchronizer,这就是我们今天要说的AQS。

AQS

AQS的功能:独占和共享
独占锁:每次只能有一个线程持有锁,比如 ReentrantLock。
共享锁:允许多个线程同时获取锁,并发访问共享资源,比如ReentrantReadWriteLock。(但是在写线程访问时,所有读线程和其他写线程都会被阻塞)

同步组件主要是通过重写AQS的几个protected方法来表达自己的同步语义。

1. 同步队列

1.1 结构

AQS 队列内部维护的是一个 FIFO( First Input First Output) 的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的后继节点和直接前驱节点。所以双向链表可以从任意一个节点开始很方便的访问前驱和后继。

1.2 入列:

1.3 出列:

Node的源码

static final class Node {

  /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();// 共享锁标志
  /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;// 独占锁标志
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
    volatile int waitStatus;
    volatile Node prev;// 前驱节点
    volatile Node next;// 后继节点
    volatile Thread thread;// 当前线程
    Node nextWaiter;// 存储在condition队列中的后继节点
        // 是否为共享节点
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter:将线程构造成一个node,添加到等待队列
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition:在condition队列中使用
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

2. 独占锁

2.1 获取独占锁

2.1.1 acquire()方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

首先尝试获取锁,若成功获取到,则继续去业务代码执行。若未获取到锁,则调用addWaiter()方法和acquireQueued()方法来加入队列。

2.1.2 addWaiter()方法

/**
 * Creates and enqueues node for current thread and given mode.
 * 为当前线程和给定的模式创建和入队一个node
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    // 将当前线程构建为Node类型。
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure

    Node pred = tail;// 获取到原tail节点(原尾节点)

    if (pred != null) {// 原尾节点不为空时,说明队列中存在节点
        node.prev = pred;// 把当前线程的Node的prev指向原尾节点
        if (compareAndSetTail(pred, node)) {// 通过cas把node加入到AQS队列,也就是将其设置为tail
            pred.next = node;// 设置成功以后,把原尾节点的 next 指向当前 node
            return node;
        }
    }
    enq(node);// tail=null,把 node 添加到同步队列
    return node;
}

参数中的mode:Node.EXCLUSIVE 表示独占锁,Node.SHARED 表示共享锁。

入队时分两种情况:

  • 队列中已经存在node时:即原尾节点pred != null时,把新增的node的prev指向原尾节点(node.prev = pred),并通过cas将新增node设置为tail,再将原尾节点的next指向新增node即新尾节点。

  • 队列中没有node时:通过enq(node)把当前节点加入到队列中。

enq()方法:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;// 第一次循环尾节点为空,第二次循环尾节点会指向一个空node
        if (t == null) { // Must initialize 若队列未初始化,则必须先初始化
            if (compareAndSetHead(new Node())) // 新建一个空node,将其设置为head
                tail = head;// 将tail也指向这个空node
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

此时队列为空,还是分两种情况:

  • 第一次循环,如果当前线程是第一个加入同步队列,即队列未初始化时:新建一个空node,将其设置为head,同时将tail也指向这个空node,完成队列的初始化,然后继续循环。

  • 继续循环时,队列已初始化:则步骤同addWaiter,此时的原尾节点即为初始化时新建的那个空node

2.1.3 acquireQueued()

通过 addWaiter() 方法把线程添加到链表后,会接着把 Node 作为参数传递给acquireQueued()方法去竞争锁。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();// 获取当前节点的前一个节点
            if (p == head && tryAcquire(arg)) { // 如果是head节点,说明当前节点有资格去竞争锁,并tryAcquire()尝试获取锁
                setHead(node); // 获取锁成功后,设置head为当前线程的node,当前线程获得执行权限
                p.next = null; // 把原 head 节点从链表中移除(将其next指向null)
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())//若tryAcquire()尝试获取锁失败,则说明前一个线程还未释放锁
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);// 最终如果获取锁失败,则取消获得锁的操作
    }
}

若前一个线程还未释放锁,则会通过shouldParkAfterFailedAcquire()方法根据waitStatus来判断是否需要挂起这个线程,若需要挂起,则通过parkAndCheckInterrupt()挂起。

同步组件可以通过重写tryAcquire()方法来获取锁。

获取到锁之后的setHead()方法:

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

shouldParkAfterFailedAcquire()方法:根据waitStatus来判断是否需要挂起这个线程,Node 有 5 中状态:

  • CANCELLED(1):在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该线程的Node。其结点的 waitStatus 为 CANCELLED,即结束状态,进入该状 态后的结点将不会再变化。

  • SIGNAL(-1):只要前置节点释放锁,就会通知标识为 SIGNAL 状态的后续节点的线程。

  • CONDITION(- 2):和 Condition 有关系。

  • PROPAGATE(-3):共享模式下,PROPAGATE 状态的线程处于可运行状态。

  • 默认初始状态(0)

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; //获取前一个线程node的waitStatus状态
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;// 为-1则表示可以放心挂起当前线程
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.意味着prev节点取消了排队,直接移除这个节点
         */
        do {
            node.prev = pred = pred.prev; // pred=pred.prev;node.prev=pred;
        } while (pred.waitStatus > 0);// 循环把 CANCELLED 状态的节点移除
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);// 修改 pred 节点的状态为 SIGNAL(-1)
    }
    return false;
}

parkAndCheckInterrupt()方法:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

通过 LockSupport.park 挂起当前线程。

2.2 释放独占锁

2.2.1 release()方法

public final boolean release(int arg) {
    if (tryRelease(arg)) {// tryRelease()释放锁
        Node h = head; //获取head节点
        if (h != null && h.waitStatus != 0)// 前头节点不为空且waitStatus不是0的话
            unparkSuccessor(h);// 唤起head的后继节点的线程
        return true;
    }
    return false;
}

2.2.2 unparkSuccessor()方法

private void unparkSuccessor(Node node) {

    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;//head节点的后继节点
    if (s == null || s.waitStatus > 0) { //如果后继节点为null或者status>0(cancelled)状态
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
          //从尾部节点开始扫描,找到距离head最近的一个waitStatus<=0的节点
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//next节点不为空,直接唤醒这个线程
}

为什么从尾部节点开始扫描:

在线程unparkSuccessor()的同时,若此时发生了入队操作:

node.prev = pred;// 把当前线程的Node的prev指向原尾节点
if (compareAndSetTail(pred, node)) {// 通过cas把node加入到AQS队列,也就是将其设置为tail
  pred.next = node;// 设置成功以后,把原尾节点的 next 指向当前 node
  return node;
}

在CAS操作compareAndSetTail(pred, node)之后,新增的node已经成为新的tail,但是老的tail又为将它的next指向新增的node(即pred.next = node;还未执行),此时若由head节点开始循环,那么新增的tail则有可能循环扫描不到。所以要从tail开始循环。

2.3 可中断式获取锁

Lock相对于synchronized有一些更方便的特性,比如响应中断、超时等待等。响应中断式表示线程中断时会抛出异常,其源码就是AQS中的

acquireInterruptibly方法:

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))//线程获取锁失败
        doAcquireInterruptibly(arg);
}

doAcquireInterruptibly(arg)方法:

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

这个方法和之前的 acquireQueued()大致相同,区别在于获取锁失败后,挂起线程时(此时会有线程中断),中断后会抛出异常。

2.4 超时等待式获取锁

在AQS中体现在tryAcquireNanos()方法:

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

doAcquireNanos方法:同样和acquireQueued() 方法类似,但是多了nanosTimeout计时。

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;//计算截止时间
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) { //同acquireQueued()类似,当前线程获得锁出队列
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)//判断是否已超时
                return false;//已经超时返回false
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);//线程未超时则阻塞等待
            if (Thread.interrupted())
                throw new InterruptedException();//线程被中断则抛出被中断异常
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

LockSupport.parkNanos(this, nanosTimeout)阻塞等待时间结束。

3. 共享锁

3.1 共享锁的获取

共享锁在同一时刻可以允许多个线程访问,但是在写线程访问时,所有的读线程和其他写线程都会被阻塞。共享锁维护了一个读锁、一个写锁。当执行读操作的时候,获取读锁,读锁不会被阻塞,因为读操作不会影响执行结果。在执行写操作时,获取写锁,若已经有线程持有写锁的情况下,当前线程会被阻塞,只有当写锁释放以后,其他读写操作才能继续执行。使用读写锁提升读操作的并发性,也保证每次写操作对所有的读写操作的可见性,比如ReentrantReadWriteLock就是读写锁。

  • 读锁与读锁可以共享
  • 读锁与写锁不可以共享(排他)
  • 写锁与写锁不可以共享(排他)

3.1.1 acquireShared()方法

AQS通过acquireShared()方法来获取共享锁:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

首先调用tryAcquireShared方法,tryAcquireShared返回值是一个int类型,当返回值为大于等于0的时候方法结束说明获得成功获取锁,否则,表明获取锁失败,会执行doAcquireShared方法

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();//获取前一个结点
            if (p == head) {//如果前一个节点是head节点的话,则说明当前节点有资格获取锁
                int r = tryAcquireShared(arg);//获取锁
                if (r >= 0) {//获取到了锁
                    setHeadAndPropagate(node, r);//设置当前结点为头结点,然后去唤醒后续结点
                    p.next = null; // help GC 释放头结点,等待GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

addWaiter(Node.SHARED)和独占锁的addWaiter类似,区别是Node的模式不同。

3.2 共享锁释放

3.2.1 releaseShared()方法

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

tryReleaseShared尝试释放锁,成功则执行doReleaseShared方法:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);//唤醒后继节点
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS CAS失败则重试
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

在共享式锁的释放过程中,对于能够支持多个线程同时访问的并发组件,必须保证多个线程能够安全的释放同步状态,这里采用的CAS保证,当CAS操作失败continue,在下一次循环中进行重试。


参考:

并发编程

《java并发编程的艺术》

文章已创建 17

相关文章

开始在上面输入您的搜索词,然后按回车进行搜索。按ESC取消。

返回顶部