码农翻身

深入AQS共享模式实现原理

- by MRyan, 2023-04-02


文章已收录至精进并发编程系列 此系列其它文章 https://www.wormholestack.com/tag/并发/

阅读本文之前建议先阅读《深入AQS独占模式实现原理》一文,掌握 AQS 的基本结构与方法功能。

0. 序

我们都知道 Semaphore 是一种共享锁,所以本文会通过 Semaphore 为切入点,来了解 AQS 是如何被使用的,AQS 的共享模式是如何实现的,接着通过解读源码来剖析 AQS 的实现原理,深入 AQS,吃透 AQS。

1. 前置知识

阅读本文之前建议先阅读《深入AQS独占模式实现原理》一文,掌握 AQS 的基本结构与方法功能。

2. Semaphore

Semaphore 称为计数信号量,它允许 N 个任务同时访问某个资源,可以将信号量看做是在向外分发使用资源的许可证,只有成功获取许可证,才能使用资源。

简单来说 Semaphore 是一个控制访问多个共享资源的计数器,其本质上是一个共享锁

举一个生活中的例子帮助各位读者理解 Semaphore,停车场有 10 个停车位,已经占满了 9 个停车位,现在有 2 辆车来到了停车场门口,因为还剩 1 个停车位,所以只会有 1 辆车成功进入停车场(这就取决于公平策略还是非公平策略),另一辆则需要在外面等待,直到停车场有车开出去,里面有空位了,才会被放进去。

停车场就好比 Semaphore,停车场 10 个停车位就好比 Semaphore 的许可证有 10 个,车辆好比线程,每当车辆进入停车场,许可证就会 - 1,当许可证为 0,这表示停车场没有车位了,就需要剩余的车辆在外面等待,每当车辆驶出停车场,许可证就会 + 1,表示有了一个空车位。

Semaphore 就是通过 acquire() 方法获取一个许可,通过 release() 释放一个许可。

2.1 类结构

Semaphore 实现了 AQS 的共享模式,同 ReentrantLock 类结构相同,内部类有 Sync、NonfairSync、FairSync 三个,拥有公平锁和非公平锁的实现,NonfairSync(非公平锁) 与 FairSync(公平锁) 类继承自 Sync 类,Sync 类继承自 AbstractQueuedSynchronizer 抽象类。

Semaphore 默认构造器选择非公平锁 创建具有给定的许可数的 Semaphore。

 public Semaphore(int permits) {
        sync = new NonfairSync(permits); 
    }

2.2 许可获取

Semaphore 提供了 acquire() 方法,来获取一个许可。

 public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

调用 AQS 的 acquireSharedInterruptibly(int arg) 方法,该方法以共享模式获取同步状态

 public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
               // 线程中断 抛出异常
        if (Thread.interrupted())
            throw new InterruptedException();
        // tryAcquireShared < 0 会阻塞等待
        if (tryAcquireShared(arg) < 0) 
            doAcquireSharedInterruptibly(arg);
    }

tryAcquireShared(arg) 方法由子类实现,这取决于选择了公平锁 or 非公平锁实现,该方法返回值 < 0,表示许可不足,会阻塞等待,执行 doAcquireSharedInterruptibly(); 方法。

公平锁实现

我们先来看 FairSync(公平锁)是如何实现 tryAcquireShared(arg) 方法的

    protected int tryAcquireShared(int acquires) {
            for (;;) {
              // 阻塞队列中有没有其他线程在排队 返回true 代表有其它线程在当前线程之前优先排队,返回false 表示可以获取锁
                if (hasQueuedPredecessors()) 
                    return -1;
                   // 获取当前的信号量许可
                int available = getState();
              // 设置获得acquires个信号量许可之后,剩余的信号量许可数
                int remaining = available - acquires; 
                if (remaining < 0 ||
                     // CAS设置信号量
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

从上述方法中我们看到了熟悉的方法 getState(),它是获取 state 变量的方法,state 代表的是,剩余可获取的许可数。

tryAcquireShared(arg) 方法通过自旋获取剩余信号量许可,首先通过 hasQueuedPredecessors() 判断阻塞队列中有没有其他线程在排队,如果有排队的线程则获取锁失败,因为它是公平锁。

hasQueuedPredecessors() 实现如下:

// 判断当前线程是否位于CLH同步队列的第一个,也就是阻塞队列中有没有其他线程在排队 返回true 代表有其它线程在当前线程之前优先排队  
public final boolean hasQueuedPredecessors() { 
              // 尾结点
        Node t = tail; 
        // 头节点
        Node h = head; 
        Node s;
     // 头节点!=尾结点,同步队列第一个节点不为null,当前线程不是同步队列第一个节点
        return h != t && 
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

非公平锁实现

说完公平锁实现,我们来看NonfairSync(非公平锁)是如何实现 tryAcquireShared(arg) 方法的

protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }

nonfairTryAcquireShared 代码实现如下:

   final int nonfairTryAcquireShared(int acquires) {
        // 不需要判断当前线程是否位于 CLH 同步队列列头
            for (;;) { 
              // 获取当前的信号量许可
                int available = getState(); 
              // 设置获得acquires个信号量许可之后,剩余的信号量许可数
                int remaining = available - acquires; 
                if (remaining < 0 ||
                    // CAS设置信号量
                    compareAndSetState(available, remaining)) 
                    return remaining;
            }
        }

和公平锁不同,非公平锁不需要判断阻塞队列中是否有其他线程,而是直接通过 CAS 设置信号量许可。


继续回到 acquireSharedInterruptibly(int arg) 方法,当 tryAcquireShared(arg) 返回值为负数时,代表需要阻塞等待。

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 共享模式 入队列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
              // 获取当前节点的前驱节点
                final Node p = node.predecessor(); 
                if (p == head) {
                   // 共享模式获取许可
                    int r = tryAcquireShared(arg);
                  // r>=0说明此时还有锁资源(等于0说明锁资源被当前线程拿走后就没了),设置头节点,并且通知后面的节点也获取锁资源。独占锁和共享锁的差异点就在于此,共享锁在前一个节点获取资源后,会通知后续的节点也一起来获取
                    if (r >= 0) { 
                      // 设置当前节点为头结点并且唤醒后继节点
                        setHeadAndPropagate(node, r); 
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
              // 对于 Semaphore 而言,如果 tryAcquireShared 返回小于 0 时,则会阻塞等待。
                if (shouldParkAfterFailedAcquire(p, node) && 
                    // 和独占模式一样,将CLH队列中当前节点之前的一些CANCELLED状态的节点剔除;前一个节点状态如果 为SIGNAL时,就会阻塞当前线程。不同的是,这里会抛出异常,而不是独占模式的会设定中断位为true, 即响应中断模式,如果线程被中断了会抛出InterruptedException
                    parkAndCheckInterrupt()) 
                    throw new InterruptedException();
            }
        } finally {
           // 如果线程被中断后唤醒,就会取消当前线程获取锁资源的请求 
          if (failed)
            // 如果获取到同步状态,则将节点移出同步队列
                cancelAcquire(node); 
        }
    }

来看上述代码的实现

首先 addWaiter(Node.SHARED) 以共享模式将把当前线程包装成 Node 节点,并把 Node 加到 CLH 阻塞队列的最后面,至于 addWaiter() 方法的实现原理在上节《深入AQS独占模式实现原理》中讲过,这里就不在详细阐述了。

private Node addWaiter(Node mode) {
        // 创建一个 Node 节点,将线程对象存储在 Node 节点中
        Node node = new Node(Thread.currentThread(), mode); 
        // 记录原尾节点
        Node pred = tail; 
          // 说明有线程已经在该锁上等待
        if (pred != null) { 
          // 添加新节点到队列尾部
            node.prev = pred; 
              // CAS把自己设置为队尾, 如果成功后,tail == node 了,这个节点成为阻塞队列新的尾巴
            if (compareAndSetTail(pred, node)) { 
              // cas成功,原尾结点的下一个节点为新节点
                pred.next = node; 
              // 线程入队了,返回节点
                return node; 
            }
        }
       // 直到 enq 方法的场景等待队列为空,或者cas失败有线程竞争入队。那么自旋入阻塞队列 则重试直到成功
        enq(node); 
        return node;
    }

这里需要注意的是链表的操作在多线程环境中是不安全的,所以这里引入了 CAS 机制,通过 if (compareAndSetTail(pred, node)) 来将自己设置为队尾,更新成功则结束,否则 enq(node) 无限重试直到更新成功。

接着我们回到 doAcquireSharedInterruptibly(int arg) 方法中继续解析,获取当前节点的前驱节点,只有前一个节点是头节点,也就是当前节点是实际上的第一个等待着的节点的时候才尝试获取同步状态,仍然是 int r = tryAcquireShared(arg); 通过 tryAcquireShared()方法共享模式获取同步状态,只有 当 r >= 0 时,才调用 setHeadAndPropagate(node, r); 设置头节点,并通知后面的节点也获取锁资源。


我们来看 setHeadAndPropagate(node, r); 这个关键代码的实现:

   private void setHeadAndPropagate(Node node, int propagate) {
     // 记录旧head节点
        Node h = head; 
     // 执行完setHead方法后,node节点成为新的head节点
        setHead(node); 
        // 唤醒当前 node 之后的节点,即 t2 已经醒了,马上唤醒 t3,如果 t3 后面还有 t4,那么 t3 醒了以后,马上将 t4 给唤醒了       
          // 两种情况,一是propagate > 0 表示还有剩余锁资源,说明同步状态还能被其他线程获取
        // 判断原来的或者新的首节点,等待状态为 Node.PROPAGATE 或者 Node.SIGNAL 时,可以继续向下唤醒。
        if (propagate > 0 || h == null || h.waitStatus < 0 || 
            (h = head) == null || h.waitStatus < 0) { 
            Node s = node.next;
          // 没有后继节点或者后继节点是共享类型,进行唤醒
            if (s == null || s.isShared()) 
              // 唤醒后续的共享式获取同步状态的节点
                doReleaseShared(); 
        }
    

具体是否会调用 doReleaseShared 方法还需要判断 node 是否是最后一个节点或者 node 的下一个节点是否是共享节点,才去唤醒。

判断 s 是否为 null 一方面是防止空指针异常,另一方面是如果 node 是 CLH 队列中的最后一个节点的话,这个时候虽然拿到的 s 是 null,但如果此时有其他的线程在 CLH 队列中新添加了一个节点后,此处并不能及时感知到这个变化,于是此时也会走进 doReleaseShared 方法中去处理这种情况。

同时这里会特殊判断共享节点是因为 CLH 队列中可能会存在独占节点和共享节点共存的场景出现,也就是ReentrantReadWriteLock 读写锁的场景,这里会一直传播唤醒共享节点直到遇到一个独占节点为止,后面的节点不管是独占或共享状态都不会再被唤醒了。


继续来看 doReleaseShared() 方法的实现,这个方法实际就是唤醒需要被唤醒的后续节点。

  private void doReleaseShared() {
        for (;;) {
          // 唤醒操作从头结点开始
            Node h = head; 
           // h != null && h != tail说明此时CLH队列中至少有两个节点(包括空节点),即至少含有一个真正在等待着的节点
            if (h != null && h != tail) {
                int ws = h.waitStatus;
               // 如果后继节点需要被唤醒,所以将头节点的状态SIGNAL改为0(因为SIGNAL表示的是下一个节点是阻塞状态)
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;           
                  // 唤醒下一个可以被唤醒的节点
                    unparkSuccessor(h); 
                }
              // head节点的状态为0就说明此时是一个中间过渡状态 说明此时在doReleaseShared方法也就是本方法中有多个线程在同时调用着
                else if (ws == 0 &&
                         // 引入了 PROPAGATE 节点状态 
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;               
            }
            if (h == head)              
                break;
        }
    }

你会发现这里出现了一个陌生的节点状态 Node.PROPAGATE,我们先保持疑问,往下看,下文会解释。

// waitStatus状态为 -3,共享模式释放锁才会使用
static final int PROPAGATE = -3; 

2.3 许可释放

获取了许可,当用完之后就需要释放,Semaphore 提供 release() 方法,来释放许可。

代码如下:

  public void release() {
    // 获取了许可,当用完之后就需要释放
        sync.releaseShared(1); 
    }
// 会调用 Semaphore 内部类 Sync 的 #tryReleaseShared(int arg) 方法,释放同步状态
public final boolean releaseShared(int arg) { 
  // 释放锁资源,也就是做state+1的操作
        if (tryReleaseShared(arg)) { 
          // 唤醒后续可以被唤醒的节点 
            doReleaseShared(); 
            return true;
        }
        return false;
    }

releaseShared(int arg) 方法,会调用 Semaphore 内部类 Sync 的 tryReleaseShared(int arg) 方法,释放同步状态。

protected final boolean tryReleaseShared(int releases) { 
            for (;;) {
                int current = getState();
              // 信号量的许可数 = 当前信号许可数 + 待释放的信号许可数
                int next = current + releases; 
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
              // 设置可获取的信号许可数为next
                if (compareAndSetState(current, next)) 
                    return true;
            }
        }

如该方法返回 true 时,代表释放同步状态成功,从而在 releaseShared(int args) 方法中,调用 doReleaseShared() 方法,可唤醒阻塞等待 Semaphore 的许可的线程,从这里就可以看出,在共享锁模式下,不仅释放锁的方法可以唤醒节点,加锁的方法也会触发唤醒后续节点的操作。

2.4 引入PROPAGATE 状态解决 BUG

纵观整个 AQS 的源码,只有在 doReleaseShared 方法中具体用到了 PROPAGATE 这个状态,不是很准确的说 PROPAGATE 就是为共享模式准备的。

你也许会惊讶的一点,其实在早期版本的 AQS 源码中是没有 PROPAGATE 这个状态的,之所以要引入它是为了解决一个 bug(JDK-6801020)

下面让我们来分析下为什么当年会产生 bug,又为什么会引入 PROPAGATE 这个状态呢,引入了 PROPAGATE 状态是如何解决这个 bug 的。

这个链接就是当时的 bug 记录《BUG JDK-6801020》,也就是并发调用 Semaphorerelease 方法,某些情况下同步队列中排队的线程仍不会被唤醒。

这里我列举了早期版本的实现,我们来对比下差异,感兴趣的小伙伴可以看这个《早期JDK AQS 代码差异》

  private void setHeadAndPropagate(Node node, int propagate) {
    setHead(node);
    if (propagate > 0 && node.waitStatus != 0) {
        Node s = node.next;
        if (s == null || s.isShared())
           unparkSuccessor(node);
    }
  }

   public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
       return true;
   }
   return false;
 }

早期版本的实现相比于现在的实现来说简单,总结起来最主要区别有:

  1. 在 setHeadAndPropagate 方法中,对比早期版本节点 waitStatus 状态的判断改成了 < 0(这个是解决 bug 的关键),而早期版本是 != 0,这样可以让首节点的等待状态为 Node.PROPAGATE 或者 Node.SIGNAL 时,可以继续向下唤醒节点。
  2. 早期版本的 releaseShared 方法中的执行逻辑和独占锁下的 release 方法是一样的,而现在将具体的唤醒逻辑写在了 doReleaseShared 方法里面。

我们来看 JDK 官方给的能检测出 bug 的 case 代码,我们来分析下产生 bug 的原因。

import java.util.concurrent.Semaphore;

public class TestSemaphore {

    /**
     * Semaphore 初始状态为 0
     */
    private static final Semaphore SEM = new Semaphore(0);

    private static class Thread1 extends Thread {
        @Override
        public void run() {
            // 获取 1 个许可,会阻塞等待其他线程释放许可,可被中断
            SEM.acquireUninterruptibly();
        }
    }

    private static class Thread2 extends Thread {
        @Override
        public void run() {
            // 释放 1 个许可
            SEM.release();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10000000; i++) {
            Thread t1 = new Thread1();
            Thread t2 = new Thread1();
            Thread t3 = new Thread2();
            Thread t4 = new Thread2();
            t1.start();
            t2.start();
            t3.start();
            t4.start();
            t1.join();
            t2.join();
            t3.join();
            t4.join();
            System.out.println(i);
        }
    }
}

简单分析下上述 case 代码的作用:

Semaphore 初始许可为 0,同时运行 4 个子线程,分别为 t1,t2,t3,t4。

线程 t1 和 t2 用 acquireUninterruptibly() 获取信号量,而线程 t3 和 t4 用于 release() 释放信号量,其中 10000000 次的循环是为了放大出现 bug 的几率,join 操作是为了阻塞主线程。

我们假设线程 t1 和 t2 先获取许可,因为初始许可为 0,所以 t1 和 t2 想要获取信号量但获取不到被阻塞在 CLH 阻塞队列中,假设此刻的阻塞队列是这样的:

head <=> node1(t1) <=> node2(t2)<=> tail

下面线程开始并发执行

  1. 时刻 1:线程 t3 调用 release 方法释放许可,将 state + 1 变为 1,同时发现此时的 head 节点不为 null 并且 waitStatus 为 -1,于是继续调用 unparkSuccessor 方法,在该方法中会将 head 的 waitStatus 改为 0。
  2. 时刻 2:线程 t1 被上面 线程 t3 调用的 unparkSuccessor 方法所唤醒,调用了 tryAcquireShared 方法,将 state - 1 又变为了 0。注意,此时还没有调用接下来的 setHeadAndPropagate 方法。
  3. 时刻 3:线程 t4 调用 release 方法释放许可,将 state + 1 变为 1,同时发现此时的 head 节点虽然不为 null,但是 waitStatus 为 0,所以就不会执行 unparkSuccessor 方法。
  4. 时刻 4:线程 t1 执行 setHeadAndPropagate -> setHead,将头节点置为自己。但在此时 propagate(propagate 是在时刻 2 时通过传参的方式传进来的,那个时候 -1),也就是剩余的 state 已经为 0 了,所以不会执行 unparkSuccessor 方法。

至此可以发现一轮循环走完后,我们发现阻塞队列中的 线程 t2 永远不会被唤醒,主线程也就会卡在 t2.join(); 处一直在阻塞中。


后来 AQS 引入了 PROPAGATE 状态后,在面对同样的场景下我们来看看它是如何解决这个问题的:

  1. 时刻 1:线程 t3 调用 release 方法释放许可,将 state + 1 变为 1,继续调用doReleaseShared 方法,将 head 的 waitStatus 改为 0,同时调用 unparkSuccessor 方法;
  2. 时刻 2:线程 t1 被上面线程 t3调用的 unparkSuccessor 方法所唤醒,调用了tryAcquireShared,将 state - 1 又变为了 0。注意,此时还没有调用接下来的setHeadAndPropagate 方法;
  3. 时刻 3:线程 t4 调用 release 方法释放许可,将 state + 1 变为1,同时继续调用doReleaseShared 方法,此时会将 head 的 waitStatus 改为 PROPAGATE
  4. 时刻 4:线程 t1 执行 setHeadAndPropagate -> setHead,将新的 head 节点置为自己。虽然此时 propagate 依旧是 0,但是 h 现在是 PROPAGATE 状态所以 h.waitStatus < 0 这个条件是满足的,同时下一个节点也就是线程 t2 也是共享节点,所以会执行 doReleaseShared 方法,将新的 head节点(线程 t1)的 waitStatus 改为 0,同时调用 unparkSuccessor 方法,此时也就会唤醒线程 t2了。

这样其实就在引入了 PROPAGATE 状态后,有效避免在高并发场景下可能出现线程没有被成功唤醒的问题。


这里总结下其实 PROPAGATE 状态的出现,是为了创造出一种区别于 SIGNAL 状态的另外一种状态。

这是因为 SIGNAL 状态的定义是代表后驱节点是阻塞状态,所以这里不能用 SIGNAL 状态来代替。

这个时候将 head 节点由原来的 0 置为 PROPAGATE 状态,以此来保证之前的那些线程也可以读取到此时旧的 head 节点状态是 PROPAGATE,因为 PROPAGATE = -3,它是 < 0 的依然可以调用到 doReleaseShared 方法继续去唤醒下一个节点。

3. 参考资料

https://www.zhihu.com/question/295925198?sort=created
https://www.cnblogs.com/sunddenly/articles/15186083.html

作者:MRyan


本文采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。
转载时请注明本文出处及文章链接。本文链接:https://www.wormholestack.com/archives/708/
2024 © MRyan 61 ms