八股篇(1):LocalThread、CAS和AQS

ThreadLocal

ThreadLocal 的作用

  • 线程隔离:ThreadLocal 为每个线程提供了独立的变量副本,这意味着线程之间不会互相影响,可以安全地在多线程环境中使用这些变量。
  • 降低耦合度:在同一个线程内的多个函数或组件之间,使用 ThreadLocal 可以减少参数的传递,降低代码之间的耦合度,使代码更加清晰和模块化。
  • 性能优势:由于 ThreadLocal 避免了线程间的同步开销,所以在大量线程并发执行时,相比传统的锁机制,它可以提供更好的性能。
public class ThreadLocalExample {
    private static ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);

    public static void main(String[] args) {
        Runnable task = () -> {
            int value = threadLocal.get();
            value += 1;
            threadLocal.set(value);
            System.out.println(Thread.currentThread().getName() + " Value: " + threadLocal.get());
        };

        Thread thread1 = new Thread(task, "Thread-1");
        Thread thread2 = new Thread(task, "Thread-2");

        thread1.start(); // 输出: Thread-1 Value: 1
        thread2.start(); // 输出: Thread-2 Value: 1
    }
}

ThreadLocal 原理了解吗?

public class Thread implements Runnable {
    //......
    //与此线程有关的ThreadLocal值。由ThreadLocal类维护
    ThreadLocal.ThreadLocalMap threadLocals = null;

    //与此线程有关的InheritableThreadLocal值。由InheritableThreadLocal类维护
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
    //......
}

从上面的 Thread 类源代码可以看出 Thread 中有一个 threadLocals 和一个 inheritableThreadLocals 变量,它们都是 ThreadLocalMap 类型的变量。默认情况下这两个变量都是 null,只有当前线程调用 ThreadLocal 类的 setget 方法时才创建它们,实际上调用者两个方法的时候,我们调用的是 ThreadLocalMap 类对应的 get()set() 方法。

ThreadLocal 类的 set() 方法

public void set(T value) {
    //获取当前请求的线程
    Thread t = Thread.currentThread();
    //取出 Thread 类内部的 threadLocals 变量(哈希表结构)
    ThreadLocalMap map = getMap(t);
    if (map != null)
        // 将需要存储的值放入到这个哈希表中
        map.set(this, value);
    else
        createMap(t, value);
}
ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}

通过上面内容,我们足以通过猜测得出结论:最终的变量是放在了当前线程的 ThreadLocalMap 中,并不是存在 ThreadLocal 上,ThreadLocal 可以理解为只是 ThreadLocal 的封装,传递了变量值。ThreadLocal 类中可以通过 ThreadLocal.currentThread() 获取到当前线程对象后,直接通过 getMap(Thread t) 可以访问到该线程的 ThreadLocal 对象。

每个 Thread 中都具有一个 ThreadLocalMap,而 ThreadLocalMap 可以存储以 ThreadLocal 为 key,Object 对象为 value 的键值对。

ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
    //......
}

比如我们在同一个线程中声明了两个 ThreadLocal 对象的话,Thread 内部都是使用仅有的那个 ThreadLocalMap 存放数据的,ThreadLocalMap 的 key 就是 ThreadLocal 对象,value 就是 ThreadLocal 对象调用 set 方法设置的值。

ThreadLocalMapThreadLocal 的静态内部类。

ThreadLocal 内存泄漏问题怎么导致的?

ThreadLocal 内存泄漏的根本原因在于其内部实现机制。
因为每个线程维护一个名为 ThreadLocalMap 的 map。当你使用 ThreadLocal 存储值时,实际上是将值存储在当前下称的 ThreadLocalMap 中,其中 ThreadLocal 实例本身作为 key,而要存储的值作为 value。

ThreadLocalMapEntry 定义如下:

static class Entry extends WeakReference<ThreadLocal<?>> {
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

ThreadLocalMap 的 key 和 value 引用机制:

  • key 是弱引用ThreadLocalMap 中的 ThreadLocal 的弱引用(WeakReference<ThreadLocal<>>)。这意味着,如果 ThreadLocal 实例不再被任何强引用指向,垃圾回收器会在下次 GC 时回收该实例,导致 ThreadLocalMap 中对应的 key 变为 null。
  • value 是强引用:即时 key 被 GC 回收,value 仍然被 ThreadLocalMap.Entry 强引用存在,无法被 GC 回收。

ThreadLocal 实例失去强引用后,其对应的 value 仍然存在于 ThreadLocalMap 中,因为 Entry 对象强引用了它。如果线程持续存活(例如线程池中的线程),ThreadLocalMap 也会一直存在,导致 key 为 null 的 entry 无法被垃圾回收,造成内存泄漏。
虽然 ThreadLocalMapget()set()remove() 操作时会尝试清理 key 为 null 的 entry,但这种清理机制是被动的,并不完全可靠。

如何避免内存泄漏的发生?

  1. 在使用完 ThreadLocal 后,必须调用 remove() 方法。这是最安全和最推荐的做法。remove() 方法会从 ThreadLocalMap 中显式地移除对应的 entry,彻底解决内存泄漏的风险。即使将 ThreadLocal 定义为 static final,也强烈建议在每次使用后调用 remove()
  2. 在线程池等线程复用的场景下,使用 try-finally 块可以确保即使发生异常,remove() 方法也一定会被执行。

CAS

什么是CAS?

CAS 即比较并交换(CompareAndSwap),它包含三个操作数:

  1. 变量内存地址,V表示
  2. 旧的预期值,A表示
  3. 准备设置的新值,B表示

执行 CAS 操作的时候,只有当V=A时,才会去用B去更新V的值,否则不会执行更新操作。CAS 是一条 CPU 的原子指令(cmpxchg),不会造成数据不一致的问题。Java 的 Unsafe 提供的 CAS 操作(CompareAndSwapXXX)底层实现即为CPU指令 cmpxchg

CAS有什么缺点?

  • ABA 问题:变量值在操作过程中先被其他线程由 A 修改为 B,又被改回 A,CAS无法感知中途变化,导致操作为误判为“未变更”。比如:

      线程1读取变量为`A`,准备改为`C`。
      此时线程2将变量`A` -> `B` -> `A`。
      线程1的 CAS 操作执行时发现变量仍为 `A`,单状态已丢失中间变化。
    

如何解决? Java 提供的工具类会在 CAS 操作中增加版本号(Stamp)或标记,每次修改都更新版本号,使得即使值相同也能识别变更历史。比如,可以用 AtomicStampedReference 来解决 ABA 问题,通过比对值和版本号识别 ABA 问题。

AtomicStampedReference<Integer> ref = new AtomicStampedReference<>(100, 0);

// 尝试修改并更新版本号
boolean success = ref.compareAndSet(100, 200, 0, 1);
// 前提:当前值等于100,且版本号等于0,才会更新为(200, 1),并返回 true
  • 循环时间长开销大:自旋 CAS 的方式如果长时间不成功,会给 CPU 带来很大的开销。
  • 只能保证一个共享变量的原子操作:只对一个共享变量操作可以保证原子性,但是多个则不行,多个可以通过 AtomicReference 或者锁 Synchronized 实现。

为什么不能所有的锁都用 CAS?

CAS 操作是基于循环重试的机制,如果 CAS 操作一直未成功,线程会一直自旋重试,占用 CPU 资源。在高并发场景下,大量线程自旋会导致 CPU 资源被浪费。

典型应用

在Unsafe类中,提供了 compareAndSwapObjectcompareAndSwapIntcompareAndSwapLong方法来实现对 Objectintlong 类型的CAS操作。以compareAndSwapInt为例:

public final native boolean compareAndSwapInt(Object o, long offset,int expected,int x);
参数中o为需要更新的对象,offset为这个对象中整形字段的偏移量,如果这个值与expected相同,则将字段的值设为x这个新值,并且此更新是不可中断的,也就是一个原子操作。下面是一个使用compareAndSwapInt的例子:
private volatile int a = 0; // 共享变量,初始值设为 0
private static final Unsafe unsafe;
private static final long fieldOffset;

static {
    try {
        // 获取 Unsafe 实例
        Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
        theUnsafe.setAccessible(true);
        unsafe = (Unsafe) theUnsafe.get(null);
        // 获取字段 a 的偏移量
        fieldOffset = unsafe.objectFieldOffset(CasTest.class.getDeclaredField("a"));
    } catch (Exception e) {
        throw new RuntimeException("Failed to initialize Unsafe or field offset", e);
    }
}

public static void main(String[] args) {
    CasTest casTest = new CasTest();

    Thread t1 = new Thread(() -> {
        for (int i = 1; i <= 4; i++) {
            casTest.incrementAndPrint(i);
        }
    });

    Thread t2 = new Thread(() -> {
        for (int i = 5; i <= 9; i++) {
            casTest.incrementAndPrint(i);
        }
    });

    t1.start();
    t2.start();

    // 等待线程结束,以便观察完整输出
    try {
        t1.join();
        t2.join();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

// 将递增和打印封装在一个强原子性的方法内
private void incrementAndPrint(int targetValue) {
    while(true) {
        int currentValue = a;   // 读取当前 a 的值
        if (currentValue == targetValue - 1) {
            if (unsafe.compareAndSwapInt(this, fieldOffset, currentValue, targetValue)) {
                // CAS 成功,则将 a 的值设置为 targetValue
                System.out.println(targetValue + " ");
                break;  // 成功更新并打印后跳出循环
            }
            // 如果 CAS 失败,意味着在读取 currentValue 和执行 CAS 之间,a 的值被其他线程修改了,
            // 此时 currentValue 已经不是 a 的最新值,需要重新读取并重试。
        }
        // 如果 currentValue != targetValue - 1,说明还没轮到当前线程更新
        // 或者已经被其他线程更新超过了,让出 CPU 给其他线程机会
        Thread.yield(); // 提示 CPU 调度器可以切换线程,减少无效自旋
    }
}

在上述例子中,我们创建了两个线程,他们都尝试修改共享变量 a。每个线程在调用 incrementAndPrint(targetValue)方法时:

  1. 会先读取当前 a 的值。
  2. 判断 currentValue 是否等于 targetValue - 1(即期望值的前一个值)。
  3. 如果条件满足,则调用 unsafe.compareAndSwapInt() 尝试将 a 从 currentValue 更新到 targetValue
  4. 如果 CAS 操作成功(返回 true),打印 targetValue 并退出循环。
  5. 如果 CAS 失败,或者 currentValue 不满足条件,则当前线程会继续循环(自旋),并通过 Thread.yield() 尝试让出 CPU,直到成功更新并打印或者条件满足。
    这种机制确保了每个数字(从 1 到 9)只会被成功设置并打印一次,并且是按顺序进行的。
    在这里插入图片描述

需要注意的是:

  1. compareAndSwapInt 本身是只执行一次比较和交换操作,并立即返回结果。因此,为了确保操作最终成功,我们需要在代码中显示的实现自旋逻辑(如 while(true) 循环),不断尝试知道 CAS 操作成功。
  2. AtomicInteger 的实现:JDK 中的 java.util.concurrent.atomic.AtomicInteger 类内部正是用了类似的 CAS 操作和自旋逻辑来实现其原子性的 getAndIncrement()compareAndSet() 等方法。直接使用 AtomicInteger 通常是更安全的做法,因为它封装了底层的复杂性。
  3. CPU 消耗:长时间的自旋会消耗 CPU 的资源。在竞争激烈或条件长时间不满足的情况下,可以考虑加入更复杂的退避策略(Thread.sleep()LockSupport.parkNanos())来优化。

AQS

AQS 是什么?

AQS(AbstractQueuedSynchronizer,抽象队列同步器)是从 JDK1.5 开始提供的 Java 并发核心组件。

AQS 解决了开发者在实现同步器时的复杂性问题。它提供了一个通用框架,用于实现各种同步器,例如 可重入锁ReentrantLock)、信号量Semaphore)和 倒计时器CountDownLatch)。通过封装底层的线程同步机制,AQS 将复杂的线程管理逻辑隐藏起来,使开发者只需要专注于具体的同步逻辑。

简单来说,AQS 是一个抽象类,为同步器提供了通用的 执行框架。它定义了 资源获取和释放的通用流程,而具体的资源获取逻辑则由具体同步器通过重写模板方法来实现。因此,可以将 AQS 看作是同步器的 基础"底座",而同步器则是基于 AQS 实现的 具体"应用"

AQS 的原理是什么?

AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时所分配的机制,这个机制 AQS 是基于 CLH 锁 进一步优化实现的。

CLH 锁 对自旋锁进行了改进,是基于单链表的自旋锁。在多线程场景下,会将请求获取锁的线程组织成一个单项队列,每个等待的线程会通过自旋访问前一个线程节点的状态,前一个节点释放锁之后,当前节点才可以获取锁。CLH 锁 的队列结构如下图所示。
在这里插入图片描述

AQS 中使用的 等待队列 是 CLH 锁队列的变体。

AQS 的 CLH 变体队列是一个双向队列,会将暂时获取不到锁的线程加入到该队列中,CLH 变体队列和原本的 CLG 锁队列的区别主要有两点:

  • 自旋 优化为 自旋 + 阻塞:自旋操作的性能很高,但大量的自选操作比较占用 CPU 资源,因此在 CLH 变体队列中会优先通过自旋锁尝试获取锁,如果失败再进行阻塞等待。
  • 单项队列 优化为 双向队列:在 CLH 变体队列中,会对等待的线程进行阻塞操作,当队列前边的线程释放锁之后,需要对后边的线程进行唤醒,因此增加了 next 指针,成为了双向队列。

AQS 将每条请求共享资源的线程封装成一个 CLH 变体队列的一个节点(Node)来实现锁的分配。在 CLH 变体队列中,一个节点表示一个线程,它保存着线程的引用(thread)、当前节点在队列中的状态(waitStatus)、前驱节点(prev)、后继节点(next)。

AQS 中的 CLH 变体队列结构如下图所示:
在这里插入图片描述

AQS(AbstractQueueSynchronized)的核心原理图:
在这里插入图片描述

AQS 使用 init 成员变量 state 表示同步状态,通过内置的 线程等待队列 来完成获取资源线程的排队工作。

state 变量由 volatile 修饰,用于展示当前临界资源的获锁情况。

// 共享变量,使用volatile修饰保证线程可见性
private volatile int state;

另外,状态信息 state 可以通过 protected 类型的 getState()setState()compareAndSwap() 进行操作。并且,这几个方法都是 final 修饰的,在子类中无法被重写。

//返回同步状态的当前值
protected final int getState() {
     return state;
}
 // 设置同步状态的值
protected final void setState(int newState) {
     state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
      return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

ReentrantLock 为例,state 初始值为 0,表示未锁定状态。A 线程 lock() 时,会调用 tryAcquire() 独占该锁并将 state + 1。此后,其他线程再 tryAcquire() 就会失败,知道 A 线程 unlock()state = 0(即释放锁)为止,其他线程才有机会获取该锁。当然,释放锁之前,A 线程自己是可以重复获取锁的,此时 state 会累加,这就是可重入的概念。但是获取多少次就要释放多少次,这样才能保证 state 是能回到零态的。

再以 CountDownLatch 以例,任务分为 N 个子线程去执行,state 也初始化 N(注意 N 要与线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后 countDown() 一次,state 会 CAS 减 1。等到所有子线程都执行完后(即 state = 0),会 unpark() 主调用线程,然后主调用线程就会从 wait() 函数返回,继续后续动作。

Logo

开源鸿蒙跨平台开发社区汇聚开发者与厂商,共建“一次开发,多端部署”的开源生态,致力于降低跨端开发门槛,推动万物智联创新。

更多推荐