一、什么是AQS

AQS AbstractQueuedSynchronizer

AQS 是一种抽象的队列同步器, 它是 java 除了 synchroized 之外自带的一种锁机制, 底层使用了大量的 CAS 进行互斥.

  • 共享锁
  • 互斥锁

AQS原理

以ReentrantLock为例, 在我们执行lock()方法进行上锁的时候, 如果有多个线程同时访问上锁的资源, 其底层会对AQS中的共享变量state进行一个CAS操作, 该操作是具有原子性, 如果线程A通过CAS将state状态修改成功就会获取到锁, 并记录一个共享变量设为A线程独享,则线程B尝试获取锁时由于锁状态state已经发生改变不再为0, 则就会把线程B存入一个FIFO的双向链表中, 该链表主要存储的是没有获取到锁的线程, 然后将里面的线程阻塞, 等到线程A释放锁完毕之后, 再唤醒队列中等待的线程.

由于AQS是自旋锁, 在等待唤醒的时候, 会不停的使用while(cas())的方式, 不停的尝试获取锁.

AQS中为什么采用双向链表,它和单向链表相比,有什么优势?

ASQ中使用双向链表更容易访问相邻的节点.

二、Lock

重入锁 -> 互斥锁

lock()

  • 公平锁和非公平锁

    • 公平锁

      final void lock() {
      acquire(1);
      //抢占1把锁 .
      }
      public final void acquire(int arg) {
      -> AQS里面的方法
      if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      selfInterrupt();
      }
      protected final Boolean tryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) {
       //表示无锁状态
       if (!hasQueuedPredecessors() &&
       compareAndSetState(0, acquires)) {
           //CAS(#Lock) -> 原子操作| 实现互斥 的判断
           setExclusiveOwnerThread(current);
           //把获得锁的线程保存到 exclusiveOwnerThread中
           return true;
       }
      }
      //如果当前获得锁的线程和当前抢占锁的线程是同一个,表示重入 else if (current == getExclusiveOwnerThread()) {
       int nextc = c + acquires;
       //增加重入次数 .      if (nextc < 0)
       throw new Error("Maximum lock count exceeded");
       setState(nextc);
       //保存state
       return true;
      }
      return false;
      }
    • 非公平锁

      final void lock() {
        //不管当前AQS队列中是否有排队的情况,先去插队
        if (compareAndSetState(0, 1)) //返回false表示抢占锁失败 setExclusiveOwnerThread(Thread.currentThread()); else
        acquire(1);
      }
      public final void acquire(int arg) {
        --AQS
        if (!tryAcquire(arg) &&                             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
          selfInterrupt();
      }
      protected final Boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
      }
      final Boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            //hasQueuedPredecessors
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
      }

加入队列并进行自旋等待

  • acquire(int arg)

    public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
    }
  • acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

    • addWaiter(Node.EXCLUSIVE) -> 添加一个互斥锁的节点

      private Node addWaiter(Node mode) {
          //把当前线程封装成一个Node节点。
          Node node = new Node(Thread.currentThread(), mode);
          //后续唤醒线程的时候,需要 得到被唤醒的线程 .
          // Try the fast path of enq; backup to full enq on failure
          Node pred = tail;
          //假设不存在竞争的情况
          if (pred != null) {
              node.prev = pred;
              if (compareAndSetTail(pred, node)) {
                  pred.next = node;
                  return node;
              }
          }
          enq(node);
          return node;
      }
      private Node enq(final Node node) {
          for (;;) {
              //自旋
              Node t = tail;
              if (t == null) {
                  // Must initialize //初始化一个head节点
                  if (compareAndSetHead(new Node())) tail = head;
              } else {
                  node.prev = t;
                  if (compareAndSetTail(t, node)) {
                      = node;
                      t;
                  }
              }
          }
      }
    • acquireQueued() -> 自旋锁和阻塞的操作

      //node表示当前来抢占锁的线程,有可能是ThreadB、 ThreadC。。
      final Boolean acquireQueued(final Node node, int arg) {
        Boolean failed = true;
        try {
            Boolean interrupted = false;
            for (;;) {
                //自旋
                //begin  ->尝试去获得锁(如果是非公平锁)
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    //如果返回true,则不需要等待,直接返 回。
                    setHead(node);
                    p.next = null;
                    // help GC
                    failed = false;
                    return interrupted;
                }
                //end
                //否则,让线程去阻塞(park)
                if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) //LockSupport.park
                interrupted = true;
            }
        }
        finally {
            if (failed)
            cancelAcquire(node);
        }
      }
      //ThreadB、 ThreadC、ThreadD、ThreadE  -> 都会阻塞在下面这个代码的位置 .
      private final Boolean parkAndCheckInterrupt() {
        LockSupport.park(this); //被唤醒 . (interrupt()->)
        return Thread.interrupted(); //中断状态(是否因为中断被唤醒的 .)
      }

unlock

  • release(int arg)

    public final Boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        //得到当前AQS队列中的head节点。
        if (h != null && h.waitStatus != 0) //head节点不为空
        unparkSuccessor(h);
        //
        return true;
    }
    return false;
    }
  • unparkSuccessor(Node node)

    private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0) //表示可以唤醒状态
    compareAndSetWaitStatus(node, ws, 0);
    //恢复成0
    Node s = node.next;
    if (s == null | | s.waitStatus > 0) {
        //说明ThreadB这个线程可能已经被销毁,或 者出现异常 ...
        s = null;
        //从tail -> head进行遍历 .
        for (Node t = tail; t != null && t != node; t = t.prev)
        if (t.waitStatus <= 0) //查找到小于等于0的节点
        s = t;
    }
    if (s != null)
    LockSupport.unpark(s.thread);
    //封装在Node中的被阻塞的线程。ThreadB、
    ThreadC。
    }

1. ReentrantLock

ReentrantLock的实现原理

满足线程的互斥特性

意味着同一个时刻,只允许一个线程进入到加锁的代码中。 -> 多线程环境下,线程的顺序访问。

锁的设计猜想(如果我们自己去实现)

  • 一定会设计到锁的抢占 , 需要有一个标记来实现互斥。 全局变量(0,1)
  • 抢占到了锁,怎么处理(不需要处理.)
  • 没抢占到锁,怎么处理

    • 需要等待(让处于排队中的线程,如果没有抢占到锁,则直接先阻塞->释放CPU资源)。

      • 如何让线程等待?
      • wait/notify(线程通信的机制,无法指定唤醒某个线程)
      • LockSupport.park/unpark(阻塞一个指定的线程,唤醒一个指定的线程)
      • Condition
    • 需要排队(允许有N个线程被阻塞,此时线程处于活跃状态)。

      • 通过一个数据结构,把这N个排队的线程存储起来。
  • 抢占到锁的释放过程,如何处理

    • LockSupport.unpark() -> 唤醒处于队列中的指定线程.\
  • 锁抢占的公平性(是否允许插队)

    • 公平
    • 非公平

ReentrantLock的实现原理分析

2. Wait/Notify

线程的通信 -> 共享内存

Wait/Notify 不是 J.U.C 包下的 -> 基于某个条件来等待或者唤醒

基于Wait/Notify 实现生产者/消费者

public class Consumer implements Runnable{
    private Queue<String> bags;
    private int maxSize;
    public Consumer(Queue<String> bags, int maxSize) {
        this.bags = bags;
        this.maxSize = maxSize;
    }
    @Override
    public void run() {
        while(true){
            synchronized (bags){
                if(bags.isEmpty()){
                    System.out.println("bags为空");
                    try {
                        bags.wait();
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                try {
                    Thread.sleep(1000);
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String bag=bags.remove();
                System.out.println("消费者消费: "+bag);
                bags.notify(); //这里只是唤醒Producer线程,但是Producer线程并不能马上执行。
            } //同步代码块执行结束, monitorexit指令执行完成
        }
    }
}
public class Producer implements Runnable {
    private Queue<String> bags;
    private int maxSize;
    public Producer(Queue<String> bags, int maxSize) {
        this.bags = bags;
        this.maxSize = maxSize;
    }
    @Override
    public void run() {
        int i=0;
        while(true){
            i++;
            synchronized (bags){
                //抢占锁
                if(bags.size()==maxSize){
                    System.out.println("bags 满了");
                    try {
                        //park(); ->JVM ->Native
                        bags.wait();
                        //满了,阻塞当前线程并且释放Producer抢到的锁 } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                try {
                    Thread.sleep(1000);
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("生产者生产:ag"+i);
                bags.add("bag"+i);
                //生产bag
                bags.notify();
                //表示当前已经生产了数据,提示消费者可以消费了 } //同步代码快执行结束
            }
        }
    }
}
public static void main(String[] args){
    Queue<String> message = new LinkedList<>();
    Producer producer = new Producer(message, 10);
    Consumer consumer = new Consumer(message, 10);
    producer.start();
    try {
        Thread.sleep(100);
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    }
    consumer.start();
}

3. join

join 也是基于 wait/notify来实现,notify是在线程销毁之后调用的,代码如下。

static void ensure_join(JavaThread* thread) {
    // We do not need to grap the Threads_lock, since we are operating on ourself.
    Handle threadObj(thread, thread->threadObj());
    assert(threadObj.not_null(), "java thread object must exist");
    ObjectLocker lock(threadObj, thread);
    // Ignore pending exception (ThreadDeath), since we are exiting anyway
    thread->clear_pending_exception();
    // Thread is exiting. So set thread_status field in  java.lang.Thread class to TERMINATED.
    java_lang_Thread::set_thread_status(threadObj(),
    java_lang_Thread::TERMINATED);
    // Clear the native thread instance - this makes isAlive return false and allows the join()
    // to complete once we've done the notify_all below
    java_lang_Thread::set_thread(threadObj(), NULL);
    lock.notify_all(thread);
    // Ignore pending exception (ThreadDeath), since we are exiting anyway
    thread->clear_pending_exception();
}

4. Condition

Condition 实际上就是 J.U.C 版本的 wait/notify 。可以让线程基于某个条件去等待和唤醒。

public class ConditionDemoWait implements Runnable{
    private Lock lock;
    private Condition condition;
    public ConditionDemoWait(Lock lock, Condition condition) {
        this.lock = lock;
        this.condition = condition;
    }
    @Override
    public void run() {
        System.out.println("begin - ConditionDemoWait");
        lock.lock();
        try{
            condition.await();
            //让当前线程阻塞,Object.wait();
            System.out.println("end - ConditionDemoWait");
        }
        catch (Exception e){
            e.printStackTrace();
        }
        finally {
            lock.unlock();
        }
    }
}
public class ConditionDemeNotify implements Runnable{
    private Lock lock;
    private Condition condition;
    public ConditionDemeNotify(Lock lock, Condition condition) {
        this.lock = lock;
        this.condition = condition;
    }
    @Override
    public void run() {
        System.out.println("begin - ConditionDemeNotify");
        lock.lock();
        //synchronized(lock)
        try{
            condition.signal();
            //让当前线程唤醒  Object.notify(); //因为任何对象都会
            有monitor
            System.out.println("end - ConditionDemeNotify");
        }
        catch (Exception e){
            e.printStackTrace();
        }
        finally {
            lock.unlock();
        }
    }
}

Condition设计猜想

  • 作用: 实现线程的阻塞和唤醒
  • 前提条件: 必须先要获得锁
  • await/signal;signalAll

    • await -> 让线程阻塞, 并且释放锁
    • signal -> 唤醒阻塞的线程
  • 加锁的操作,必然会涉及到AQS的阻塞队列
  • await 释放锁的时候, -> AQS队列中不存在已经释放锁的线程 -> 这个被释放的线程去了哪里?
  • signal 唤醒被阻塞的线程 -> 从哪里唤醒?

通过await方法释放的线程,必须要有一个地方来存储,并且还需要被阻塞; -> 会存在一个等待队列, LockSupport.park阻塞

signal -> 上面 猜想到的 等待队列中,唤醒一个线程,放哪里去?是不是应该再放到AQS队列?

5. CountDownLatch (倒计时器)

① 某一线程在开始运行前等待n个线程执行完毕。将 CountDownLatch 的计数器初始化为n :new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减1 countdownlatch.countDown(),当计数器的值变为0时,在CountDownLatch上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。

②实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 CountDownLatch 对象,将其计数器初始化为 1 :new CountDownLatch(1),多个线程在开始执行任务前首先 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为0,多个线程同时被唤醒。

CountDownLatch 的三种典型用法

  • ① 某一线程在开始运行前等待n个线程执行完毕。将 CountDownLatch 的计数器初始化为n :new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减1 countdownlatch.countDown(),当计数器的值变为0时,在CountDownLatch上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
  • ② 实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 CountDownLatch 对象,将其计数器初始化为 1 :new CountDownLatch(1),多个线程在开始执行任务前首先 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为0,多个线程同时被唤醒。
  • ③ 死锁检测:一个非常方便的使用场景是,你可以使用n个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁。

CountDownLatch 的使用示例

public class CountDownLatchExample1 {
  // 请求的数量
  private static final int threadCount = 550;

  public static void main(String[] args) throws InterruptedException {
    // 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
    ExecutorService threadPool = Executors.newFixedThreadPool(300);
    final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
    for (int i = 0; i < threadCount; i++) {
      final int threadnum = i;
      threadPool.execute(() -> {// Lambda 表达式的运用
        try {
          test(threadnum);
        } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        } finally {
          countDownLatch.countDown();// 表示一个请求已经被完成
        }

      });
    }
    countDownLatch.await();
    threadPool.shutdown();
    System.out.println("finish");
  }

  public static void test(int threadnum) throws InterruptedException {
    Thread.sleep(1000);// 模拟请求的耗时操作
    System.out.println("threadnum:" + threadnum);
    Thread.sleep(1000);// 模拟请求的耗时操作
  }
}

上面的代码中,我们定义了请求的数量为550,当这550个请求被处理完成之后,才会执行System.out.println("finish");。

与CountDownLatch的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch.await()方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。

其他N个线程必须引用闭锁对象,因为他们需要通知CountDownLatch对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的count值就减1。所以当N个线程都调 用了这个方法,count的值等于0,然后主线程就能通过await()方法,恢复执行自己的任务。

CountDownLatch 的不足

CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。

CountDownLatch相常见面试题:

解释一下CountDownLatch概念?

CountDownLatch 和CyclicBarrier的不同之处?

给出一些CountDownLatch使用的例子?

CountDownLatch 类中主要的方法?

最后修改:2022 年 06 月 23 日
如果觉得我的文章对你有用,请点个赞吧~