听说你想学Java并发编程?先把这个学了(3) -- 20210714

大家好,我是指北君。

这是AQS系列的第三篇文章,也是最后一篇,如果没看过AQS系列前两篇的童鞋,建议先去公号中的”Java并发编程”专栏中把前两篇看完。这篇主要是讲AQS是如何解决线程同步通信问题的。

我们在第一篇中说到AQS使用的是管程模型,而管程模型是使用条件变量来解决同步通信问题的。条件变量会有两个方法,唤醒和等待。当条件满足时,我们会通过唤醒方法将条件队列中的线程放入第二篇所说的同步队列中;如果不满足条件,则会通过等待方法将线程阻塞放入条件队列中。而AQS中通过ConditionObject类实现了条件变量,所以接下来我们就具体看看ConditionObject类吧。


一 属性

我们先看下ConditionObject中的属性

1
2
3
4
/** 链表头节点 */
private transient Node firstWaiter;
/** 链表尾节点 */
private transient Node lastWaiter;

开头说了,条件变量中会有一个条件队列,ConditionObject中的条件队列使用的是单向链表,firstWaiter和lastWaiter为头尾节点,节点也是使用AQS的内部类Node,但同步队列是个双向链表,条件队列是单向链表,所以条件队列使用的是Node类中的nextWaiter属性作为下一个节点的链接指针。

1
2
3
volatile Node prev;
volatile Node next;
Node nextWaiter;

我们可以注意到nextWaiter是没用volatile修饰的,这是因为线程在调用await方法进入条件队列时,是已经拥有了锁的。还有一点需要注意是,条件队列里面的Node只会存在CANCELLED和CONDITION的状态,有别于同步队列。


二 唤醒方法

2.1 signalAll

此方法是唤醒所有条件队列中的节点,即将条件队列中的所有节点都移动到我们第二篇所说的同步队列中,然后再去竞争锁,具体源码如下:

1
2
3
4
5
6
7
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

上面我们说了,要调用唤醒和等待方法,都需要此线程获取锁,首先我们会通过子类复写的方法isHeldExclusively来看此时的线程是否已经获得了锁。如果获得了锁,我们会判断条件队列的头节点是否为null,为null则说明条件队列中没有阻塞的Node;如果不为null,则会通过doSignalAll方法来将条件队列中的所有Node移动到同步队列中

2.1.1 doSignalAll

doSignalAll方法主要功能就是遍历条件队列里面的节点Node,然后通过transferForSignal方法将Node移动到同步队列中,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
       // 将next指向first的后继Node
        Node next = first.nextWaiter;
       // 切断first与后继Node的联系
        first.nextWaiter = null;
       // 将此node转移到同步队列中
        transferForSignal(first);
        // 将first指向first的后继Node
        first = next;
    // 在判断此时的first是否为null,不是则继续循环
    } while (first != null);
}

2.1.2 transferForSignal

transferForSignal主要功能就是将条件队列中的节点Node转移到同步队列中,源码如下:

1
2
3
4
5
6
7
8
9
10
11
final boolean transferForSignal(Node node) {
    // 说明此节点状态为CANCELLED,所以跳过该节点(GC会回收)
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // 入队方法(独占锁获取中详细阐述过)
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread); 
    return true;
}

首先通过CAS来将Node的状态置为0,如果失败,则说明此时Node状态是CANCELLED,则直接返回false;如果Node状态成功置为了0,我们就通过enq方法将此节点入队到同步队列中,enq方法已经在第二篇文章中讲过,这里就不再复述了。enq方法执行完成后,说明node已经成功进入同步队列了,然后其返回的是入队的前驱节点,如果前驱节点是CANCELLED状态,或者我们将前驱节点的状态变为SIGNAL失败,则我们就需要唤醒此节点去抢锁。这个如果你看了第二篇文章,你肯定是能够想到的。

2.2 signal

看名字也能大概猜到,因为signalAll是将条件队列中所有的Node转移到同步队列中,所以signal肯定是转移单个Node。

1
2
3
4
5
6
7
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

我们可以对比上面的signalAll方法,其唯一不同点就是signalAll内部调用的是doSignalAll方法,而signal内部调用的是doSignal方法,我们接着来看doSignal:

1
2
3
4
5
6
7
8
9
10
private void doSignal(Node first) {
    do {
        // 将firstWaiter指向传入的first的后继节点,
        // 然后判断firstWaiter是否为null,
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

我们可以看到方法里面是个do-While的循环,我们首先将firstWaiter指向first的后继节点并判断是否为null,如果为空,则说明条件队列中只有first这一个节点,所以我们将整个队列清空。然后我们再将first的的nextWaiter指向null断开连接,进入while条件语句中。while条件语句中,会先调transferForSignal来转移Node,如果返回为false,即转移失败,我们会判断此节点下一个节点是否为null,不为null则又进入循环。


三 等待方法

唤醒方法wait,就是将线程阻塞包装成节点放入条件队列中,等到其他线程唤醒(signal)或者自身中断后再重新去获取锁。所以其又可以大致分为两个阶段,线程阻塞前和阻塞后。

3.1 await—阻塞前

我们先来看下await的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public final void await() throws InterruptedException {
    // 如果此线程被中断过,直接抛中断异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 将当前线程包装成节点放入条件队列
    Node node = addConditionWaiter();
    // 释放当前线程持有的锁
    long savedState = fullyRelease(node);
    // 初始化中断模式参数
    int interruptMode = 0;
    // 检查节点是否在同步队列中
    while (!isOnSyncQueue(node)) {
       // 不在同步队列中则阻塞此线程
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 被唤醒后再去获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 当线程是被中断唤醒,node和后继节点是没有断开的
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();
    // 根据异常标志位对异常进行处理
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
3.1.1 addConditionWaiter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Node addConditionWaiter() {
    Node t = lastWaiter;
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

addConditionWaiter的大致逻辑为:lastWaiter不是null且它的等待状态不是CONDITION,说明lastWaiter的状态是CANCELLED,所以我们会通过unlinkCancelledWaiters方法来移除条件队列中所有CANCELLED的节点。然后我们会将当前线程包装成一个节点,我们再会判断尾节点是否为null,为null说明条件队列为空,所以我们就将firstWaiter指向新的节点;如果不为null,就将尾节点的后继节点指向新节点,然后再重置lastWaiter。最后将新节点返回。

3.1.2 fullyRelease

此时入队成功后,我们就会调用fullyRelease方法来释放当前线程所持有的锁了,我们具体看下源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final long fullyRelease(Node node) {
    boolean failed = true;
    try {
        long savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

其中释放锁成功调用的是release方法,这个方法在第二篇文章中详述过。如果释放锁成功,则将failed状态置为false,然后返回savedState状态,否则我们就会抛出异常。其中savedState是重入锁的数量,release方法会一起释放掉。

再看下finally,如果释放锁失败,我们此线程会抛异常终止,然后在finally将waitStatus置为CANCELLED,然后等待后面被移出条件队列。

3.1.3 isOnSyncQueue

isOnSyncQueue方法是检查此节点是否在同步队列中,具体源码如下:

1
2
3
4
5
6
7
final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;        
    return findNodeFromTail(node);
}

先看第一个if语句,如果状态是CONDITION或者prev参数是null,说明此节点是在条件队列中,返回为false。再来看第二个if,我们知道,prev和next都是同步队列中的节点连接是用的prev和next,所以如果两个属性不为null,说明此节点是在同步队列中,所以node.next不为null则需要返回true。如果两个if都不成立,说明这个节点状态是0且prev不为null,即属于我们中CAS进入同步队列的情况,则我们会通过findNodeFromTail方法来确认是不是这种情况

3.1.3.1 findNodeFromTail
1
2
3
4
5
6
7
8
9
10
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

如果此时tail就是node的话,说明node在同步队列中,如果不是就像前遍历。我们再回到await方法:

1
2
3
4
5
6
7
// 省略
while (!isOnSyncQueue(node)) {
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}
// 省略

如果不在同步队列中,则此线程就被park方法阻塞了,只有当线程被唤醒才会在这里开始继续执行下面代码。

3.2 wait—唤醒后

我们再来看看await唤醒后的情形:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final void await() throws InterruptedException {
    // 省略。。。。
    while (!isOnSyncQueue(node)) {
       // 不在同步队列中则阻塞此线程
        LockSupport.park(this); // <----- 被唤醒后从下面开始
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 被唤醒后再去获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 当线程是被中断唤醒时,node和后继节点是没有断开的
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 根据异常标志位对异常进行处理
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

我们需要注意的是,线程在这里被唤醒有两种情况:

  1. 其他线程调用了doSignal或doSignalAll,
  2. 线程被中断。

我们需要确定我们被唤醒的情况是哪种,这里是通过checkInterruptWhileWaiting方法来判断。但在讲这个方法前,我们需先了解这个interruptMode有几种状态:

1
2
3
4
/** wait方法退出时,会重新再中断一次 */
private static final int REINTERRUPT =  1;
/** wait方法退出时,会抛出InterruptedException异常 */
private static final int THROW_IE    = -1;

除了上面两种,还有一种初始态0,它代表线程没有被中断过,不做任何处理。

3.2.1 checkInterruptWhileWaiting
1
2
3
4
5
private int checkInterruptWhileWaiting(Node node) {
   return Thread.interrupted() ?
       (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
       0;
}

我们看下代码,首先我们会检查中断标志位,如果interrupted方法返回false,说明没发生中断,方法最终返回0;如果返回了true,则说明中断了,则我们需要通过transferAfterCancelledWait方法进一步检查其他线程是否执行了唤醒操作。

3.2.1.1 transferAfterCancelledWait
1
2
3
4
5
6
7
8
9
10
11
final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { 
        enq(node);
        return true;
    }

    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;

}

我们先看第一个if条件,如果条件中的CAS操作成功,说明此时的节点肯定是在条件队列中,则我们调动 enq 方法将此节点放入到同步队列中,然后返回true。但是这里需要特别注意,这个节点的nextWaiter还没置为null;如果CAS失败,说明这个节点可能已经在同步队列中或者在入队的过程中,所以我们通过while循环等待此节点入队后返回false。

我们再回到调用transferAfterCncelled 的 checkInterruptWhileWaiting方法中,根据transferAfterCancelledWait方法返回值我们最终会返回REINTERRUPT或THROW_IE。

然后我们返回到调用checkInterruptWhileWaiting方法的await方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public final void await() throws InterruptedException {
    // 代码省略
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); 
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 我们现在在这里!!!
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

我们可以看到,如果返回值不为0,则直接break跳出循环,如果为0,则再次回到while条件检查是否在同步队列中。最后我们看最后剩下的三个if语句:

  1. 通过acquireQueued方法来获取锁,这个方法在第二篇中详细讲过,acquireQueued返回true(即获取锁的的过程中被中断了),我们再将interruptMode为0置为REINTERRUPT。
  2. 如果node的nextWaiter不是null。我们会通过unlinkCancelledWaiters方法将条件队列中所有不为CONDITION的节点移除。
  3. 最后一个if,线程拿到锁了,且节点没在同步队列和条件队列中,await方法其实算完成了,我们这时候只需要对中断进行善后处理。如果interruptMode不为0,说明线程是被中断过的,需要通过reportInterruptAfterWait对中断进行处理。
3.2.1.2 reportInterruptAfterWait
1
2
3
4
5
6
7
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

如果是THROW_IE,就是抛异常,如果是REINTERRUPT,就再自我中断一次。


四 总结

好了,AQS如何解决线程同步与通信问题,指北君就分析完了,这里我再总结一下:

AQS通过ConditionObject类来实现条件变量,并通过其唤醒方法、阻塞方法来进行线程的通信。当线程获取锁之后,可以通过signal、signalAll等唤醒方法将条件队列中被阻塞的线程节点转移到同步队列中,然后唤醒去竞争锁;也可以通过wait方法将自己包装成节点并放入条件队列中,然后等待被其他线程唤醒或中断。

到这里,AQS系列就写完了,希望大家能好好体会AQS设计的精妙~

Java Geek Tech wechat
欢迎订阅 Java 技术指北,这里分享关于 Java 的一切。