2.38 Java并发笔记

5 minute read

2.38.1 synchronized实现原理

  • 实现原理
    • 同步方法, 锁的是实例对象, 即对象锁, 方法表中的ACC_SYNCHRONIZED字段标识, 并使用该方法的对象在JVM内部对象表示的Klass作为锁对象
    • 静态同步方法, 锁的是类对象, 即类锁, 方法表中的ACC_SYNCHRONIZED字段标识, 并使用该方法所属的Class在JVM内部对象表示的Klass作为锁对象
    • 同步方法块, 锁的是括号指定的对象, 同步方法块在字节码层面使用monitorentermonitorexit实现
  • Java对象头
    • Mark Word(标记字段)

    Mark Word字段图 来源于【死磕Java并发】 Mark Word字段图 来源于【死磕Java并发】

    • Klass Pointer(类型指针)
  • monitor(ObjectMonitorC++层实现)

    Java Monitor图解 来源于探索 Java 同步机制 Java Monitor图解 来源于探索 Java 同步机制

  • 锁优化
    • 自旋锁, 线程自旋等待一段时间, 不会立即挂起, 看持有锁的线程是否快速释放锁, 但它缺点是会占用CPU时间, 避免CPU浪费, 加入适应自旋锁优化
    • 自适应自旋锁, 由上一次同一个锁的自旋时间及锁的拥有者的状态决定, 线程自旋成功则自旋次数增加, 否则, 减少自旋次数或省略自旋过程
    • 锁消除, 不存在竞争则把锁消除
    • 轻量级锁, 没有多线程竞争的前提下, 减少传统的重量级锁使用操作系统互斥量产生的性能消耗
    • 偏向锁, 线程A获得锁, 锁即进入偏向模式, Mark Word变为偏向锁结构, 线程A再次请求锁, 无需操作即获得锁

2.38.2 volatile实现原理

  • 缓存一致性协议, 当某个CPU写数据时, 如果操作的变量是共享变量, 则通知其它CPU该变量的缓存行是无效; 其它CPU读取变量后发现无效则从主存加载数据
  • 保证可见性, 变量被volatile修饰, 则该变量在线程的工作内存无效, 线程修改共享变量后立即同步到主内存, 其它线程读取共享变量从主内存读取
  • 禁止重排序(volatile修饰)
    • 编译器重排序, 编译器不改变单线程的语义下, 重排语句
    • 处理器重排序, 如果不存在数据依赖, 处理器可以改变对机器指令的执行顺序
    • happens-before原则, 无法从happens-before原则推出来, 均可能被随意重排序
  • 内存屏障
    • 加入volatile关键字, 会多出load前缀的指令, 该指令相当于内存屏障, 内存屏障是一组处理指令, 用于实现对内存操作的顺序限制; volatile的底层即使用内存屏障实现

Java内存模型之Happens-before

  • 程序次序规则(Program Order Rule): 在一个线程内, 按照程序代码顺序, 书写在前面的操作先行发生于(happens-before)书写在后面的操作。(更确切地说是控制流顺序而不是程序代码顺序)
  • 管程锁定规则(Monitor Lock Rule): 一个unlock操作先行发生于(happens-before)后面对同一个锁的lock操作
  • volatile变量规则(Volatile Variable Rule): 对于一个volatile变量的写操作先行发生于(happens-before)后面对这个变量的读操作
  • 线程启动规则(Thread Start Rule): Thread对象的start()方法线性发生于(happens-before)此线程的每一个动作
  • 线程终止规则(Thread Termination Rule): 对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生, 可通过Thread.interrupted()方法检查是否有中断
  • 对象终结规则(Finalizer Rule): 一个对象的初始化完成(构造函数执行结束)先行发生于(happens-before)它的finalize()方法的开始
  • 传递性(Transitivity): 如果操作A先行发生于(happens-before)操作B, 操作B先行发生于(happens-before)操作C, 那么操作A先行发生于(happens-before)操作C

Java内存之重排序

  • 在单线程环境下不能改变程序的运行结果
  • 存在数据依赖关系的不允许重排序
  • as-if-serial, 重排序后单线程的执行结果不能改变, 编译器, runtime, 处理器必须遵循as-if-serial

Java内存模型之分析volatile

  • 在每一个volatile写操作前面插入一个StoreStore屏障, StoreStore屏障可以保证在volatile写之前, 其前面的所有普通写操作都已经刷新到主内存中
  • 在每一个volatile写操作后面插入一个StoreLoad屏障, StoreLoad屏障的作用是避免volatile写与后面可能的volatile读/写操作重排序
  • 在每一个volatile读操作后面插入一个LoadLoad屏障, LoadLoad屏障用于禁止处理器把上面的volatile读与下面的普通读重排序
  • 在每一个volatile读操作后面插入一个LoadStore屏障, LoadStore屏障用于禁止处理器把上面的volatile读与下面的普通写重排序

AQS-CLH同步队列

  • CLH同步队列是FIFO双向队列, AQS依赖它完成同步状态管理, 当前线程如果获得同步状态失败时, AQS则将当前线程已经等待状态等信息构成一个节点(Node)并加入到CLH队列, 同时阻塞当前线程, 当同步释放时, 首先唤醒(公平锁)头节点, 使其再次尝试获得同步状态; 结点表示线程, 保存线程的引用(thread), 状态(waitStatus), 前驱节点(prev), 后继节点(next)
static final class Node {
  // 共享
  static final Node SHARED = new Node();
  // 独占
  static final Node EXCLUSIVE = null;
  // 因为超时或者中断, 节点会被设置成取消状态, 被取消节点不会参与竞争中, 且一直保持取消状态不会转变为其它状态
  static final int CANCELLED = 1;
  // 后续节点的线程处于等待状态, 而当前节点的线程如果释放了同步状态或者取消, 将会通知后继节点, 使后继节点的线程得以运行
  static final int SIGNAL = -1;
  // 节点在等待队列中, 节点线程等待在Condition上, 当其它线程对Condition调用signal()后, 该节点将会从等待队列移除到同步队列, 加入同步状态的获取中
  static final int CONDITION = -2;
  // 表示下一次共享式同步状态获取会无条件传播下去
  static final int PROPAGATE = -3;
  // 等待状态
  volatile int waitStatus;
  // 前驱节点
  volatile Node prev;
  //后继节点
  volatile Node next;
  // 获取同步状态的线程
  volatile Thread thread;

  ...
}

AQS-同步状态的获取与释放

  • 独占锁
    • acquire(int arg)独占式同步状态获取, 该方法对中断不敏感, 即线程获取状态失败后加入CLH队列, 后续对线程中断操作时, 线程不会从队列移除
      public final void acquire(int arg) {
        // tryAcquire即尝试获取锁, 获取成功则设置锁状态并返回true, 否则返回false; 该方法自定义同步组件自己实现, 且必须保证线程安全地获取同步状态
        // addWaiter, 如果tryAcquire返回false则调用此方法将当前线程加入CLH同步队列尾部
        // acquireQueued, 当前线程会根据公平性原则来进行阻塞等待(自旋), 直到获取到锁为止
        // selfInterrupt产生一个中断
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
      }
    
      // acquireQueued为自旋过程, 当线程(node)进入到同步队列后进入自旋过程, 每个节点都会观察, 当条件满足获取同步状态后, 则从自旋退出, 否则一直自旋
      final boolean acquireQueued(final Node node, int arg) {
        ...
          // 自旋
          for(;;) {
            final Node p = node.predecessor();
            // 当前线程的前驱节点是头结点, 且获取同步状态成功
            if (p == head && tryAcquire(arg)) {
              setHead(node);
              ...
            }
    
            // 获取失败, 线程等待
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
              interrupt = true;
            }
          }
        ...
      }
    
  • 独占式获取响应中断

      // 该方法在等待获取同步状态时, 如果当前线程被中断, 则响应中断抛出InterruptedException
      public final void acquireInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted()) throw new InterruptedException();
        if (!tryAcquire(arg)) doAcquireInterruptibly(arg);
      }
    
  • 独占式超时获取

      public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
          if (Thread.interrupted()) throw new InterruptedException();
          return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
      }
    
  • 独占式同步释放

      public final boolean release(int arg) {
        if (tryRelease(arg)) {
          Node h = head;
          if (h != null && h.waitStatus != 0) {
            unparkSuccessor(h);
          }
          ...
        }
        ...
      }
    
  • 共享式同步状态获取(共享式允许同一时刻有多个线程获取同步状态)
  public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
    doAcquireShared(arg);
  }

  private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED)
    ...
    for(;;) {
      final Node p = node.predecessor();
      if (p == head) {
        int r = tryAcquireShared(arg);
        if (r >= 0) {
          setHeadAndPropagate(arg);
          ...
        }
        ...
      }
    }
    ...
  }
  • 共享式同步释放

      public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
          doReleaseShared();
          return true;
        }
        return false;
      }
    

AQS-阻塞和唤醒线程

  • 判断当前线程是否需要阻塞
  final boolean acquireQueued(final Node node, int arg) {
    ...
      if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
        interrupt = true;
    ...
  }

  // 获取同步失败后, 先检查线程状态
  private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 前驱节点的状态
    int ws = pred.waitStatus;
    // 状态为signal, 表示当前结点处于等待状态, 直接返回true
    if (ws == Node.SIGNAL) return true;

    if (ws > 0) {
      // 前驱节点状态 > 0, 则为Cancelled, 表明该节点已经超时或者中断, 需要从同步队列移除
      do {
        node.prev = pred = pred.prev;
      } while (pred.waitStatus > 0);
      pred.next = node;
    } else {
      // 前驱节点非cancelled, 非signal, 则通过CAS的方式将前驱节点设置为SIGNAL
      compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    ...
  }

  // 阻塞当前线程
  private final boolean parkAndCheckInterrupt() {
    // 调用Unsafe#park();
    LockSupport.park(this);
    return Thread.interrupted();
  }
  • 唤醒后续节点

      public final boolean release(int arg) {
        if (tryRelease(arg)) {
          Node h = head;
          if (h != null && h.waitStatus != 0)
            // 唤醒后续节点
            unparkSuccessor(h);
          ...
        }
        ...
      }
    
      private void unparkSuccessor(Node node) {
        // 当前节点状态
        int ws = node.waitStatus;
        if (ws < 0)
          compareAndSetWaitStatus(node, ws, 0);
        ...
        // 唤醒后续节点
        LockSupport.unpark(s.thread);
      }
    

ReentrantLock

  • 公平锁和非公平锁的区别在于公平锁的获取是有顺序的
  • 获取锁

      // 非公平锁
      ReentrantLock lock = new ReentrantLock();
      lock.lock();
    
      // ReentrantLock#lock();
      // Sync为ReentrantLock的内部类, 它继承AQS(AbstractQueuedSynchronizer), 它包括两个子类: 公平锁FairSync和非公平锁NonFairSync, ReentrantLock大部分功能委托给Sync实现
      public void lock() {
        sync.lock();
      }
    
      // NonfairSync.java
      final void lock() {
        // 尝试获取锁
        if (compareAndSetState(0, 1))
          setExclusiveOwnerThread(Thread.currentThread());
        else
          // 获取失败, 调用AQS的acquire(int)方法
          acquire(1);
      }
    
      // AQS#acquire()
      public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
          ...
      }
    
      // NonFairSync.java
      protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
      }
    
      // AQS#nonfairTryAcquire()
      final boolean nonfairTryAcquire(int acquires) {
        // 当前线程
        final Thread current = Thread.currentThread();
        int c = getState();
        // 0表示该锁处于空闲状态
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
              setExclusiveOwnerThread(current);
              return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
          // 线程重入, 判断线程锁持有的线程是否为当前线程
          ...
          setState(nextc);
          ...
        }
        ...
      }
    
  • 释放锁

      // ReentrantLock#unlock()
      public void unlock() {
        sync.release(1);
      }
    
      // AQS#release()
      public final boolean release(int arg) {
        if (tryRelease(arg)) {
          ...
          // LockSupport#unpark();
          unparkSuccessor(h);
          ...
        }
        ...
      }
    
      protected final boolean tryRelease(int releases) {
        ...
        // state == 0表示已经释放完全, 其它线程可以获取同步状态
        if (c == 0) {
            setExclusiveOwnerThread(null);
        }
        setState(c);
        ...
      }
    
  • 公平锁获取

      protected final boolean tryAcquire(int acquires) {
        ...
        // 实现和非公平锁基本一致, 只加了hasQueuedPredecessors()
        if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
          ...
        }
        ...
      }
    
      // 判断当前线程是否位于CLH同步队列中的第一个
      public final boolean hasQueuedPredecessors() {
        Node t = tail; // 尾节点
        Node h = head; // 头节点
        Node s;
    
        // 当前线程是否同步队列第一个点
        return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
      }
    
  • ReentrantLock和synchronized的区别
    • 与synchronized相比, ReentrantLock提供更全面的功能, 具备更强的扩展性; 例如: 时间等候, 可中断等候, 锁投票
    • ReentrantLock提供条件Condition, 对线程的等待, 唤醒更灵活
    • ReentrantLock提供可轮询的锁请求
    • ReentrantLock支持更灵活的同步代码块
    • ReentrantLock支持中断处理, 且性能synchronized好些(«Java并发编程实战»中提到synchronized已经做了很多优化, 性能和ReentrantLock差不多, 大部分场景使用synchronized即可)

ReentrantReadWriteLock

  • #readLock(), 读锁为共享锁, 支持多个reader线程同时持有
  • #writeLock(), 写锁为独占式, 只支持一个线程持有

CountDownLatch(等待多线程完成)

  • 场景: 多线程解析Excel多个sheet的数据, 等所有的sheet解析完后, 程序需要提示解析完成
    • 使用Thread#join()
      public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(new Runnable() {
          public void run() {
            ...
          }
        });
    
        Thread t2 = new Thread(new Runnable() {
          public void run() {
            ...
          }
        });
    
        t1.start();
        t2.start();
        // join实现原理, 不断while循环检查join线程是否存活
        t1.join();
        t2.join();
      }
    
    • CountDownLatch(使用AQS实现)
      static CountDownLatch c = new CountDownLatch(2);
    
      public static void main(String[] args) {
        new Thread(new Runnable() {
          public void run() {
            c.countDown();
            ...
            c.countDown();
          }
        }).start();
        // 阻塞等待count == 0.
        c.await();
      }
    
    

    CyclicBarrier

  // 参数2表示屏蔽拦截线程的数量
  // 参数是2, 一定要在线程调用几次await();
  static CyclicBarrier c = new CyclicBarrier(2);

  public static void main(String[] args) {
    new Thread(new Runnable() {
      public void run() {
        try {
          c.await();
        } (Exception e) {

        }
      }
    }).start();
    try {
      c.await();
    } (Exception e) {

    }
  }

Semaphore

  • 用于控制同时访问特定资源的线程数量; 比如控制数据库连接资源访问等

      private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
      private static Semaphore semaphore = new Semaphore(10);
    
      public staic void main(String[] args) {
    
        for (int i = 0; i < THREAD_COUNT; i++) {
          threadPool.execute(new Runnable() {
            public void run() {
              try {
                semaphore.acquire();
                ...
                semaphore.release();
              } catch(InterruptedException e) {
    
              }
            }
          });
        }
      }
    

Exchanger

  • 线程数据交换, 通过Exchanger#exchange()

参考