2.38 Java并发笔记
2.38.1 synchronized实现原理
- 实现原理
    - 同步方法, 锁的是实例对象, 即对象锁, 方法表中的ACC_SYNCHRONIZED字段标识, 并使用该方法的对象在JVM内部对象表示的Klass作为锁对象
- 静态同步方法, 锁的是类对象, 即类锁, 方法表中的ACC_SYNCHRONIZED字段标识, 并使用该方法所属的Class在JVM内部对象表示的Klass作为锁对象
- 同步方法块, 锁的是括号指定的对象, 同步方法块在字节码层面使用monitorenter和monitorexit实现
 
- Java对象头
    - Mark Word(标记字段)
 Mark Word字段图 来源于【死磕Java并发】  - Klass Pointer(类型指针)
 
- 
    monitor(ObjectMonitorC++层实现) Java Monitor图解 来源于探索 Java 同步机制  
- 锁优化
    - 自旋锁, 线程自旋等待一段时间, 不会立即挂起, 看持有锁的线程是否快速释放锁, 但它缺点是会占用CPU时间, 避免CPU浪费, 加入适应自旋锁优化
- 自适应自旋锁, 由上一次同一个锁的自旋时间及锁的拥有者的状态决定, 线程自旋成功则自旋次数增加, 否则, 减少自旋次数或省略自旋过程
- 锁消除, 不存在竞争则把锁消除
- 轻量级锁, 没有多线程竞争的前提下, 减少传统的重量级锁使用操作系统互斥量产生的性能消耗
- 偏向锁, 线程A获得锁, 锁即进入偏向模式, Mark Word变为偏向锁结构, 线程A再次请求锁, 无需操作即获得锁
 
2.38.2 volatile实现原理
- 缓存一致性协议, 当某个CPU写数据时, 如果操作的变量是共享变量, 则通知其它CPU该变量的缓存行是无效; 其它CPU读取变量后发现无效则从主存加载数据
- 保证可见性, 变量被volatile修饰, 则该变量在线程的工作内存无效, 线程修改共享变量后立即同步到主内存, 其它线程读取共享变量从主内存读取
- 禁止重排序(volatile修饰)
    - 编译器重排序, 编译器不改变单线程的语义下, 重排语句
- 处理器重排序, 如果不存在数据依赖, 处理器可以改变对机器指令的执行顺序
- happens-before原则, 无法从happens-before原则推出来, 均可能被随意重排序
 
- 内存屏障
    - 加入volatile关键字, 会多出load前缀的指令, 该指令相当于内存屏障, 内存屏障是一组处理指令, 用于实现对内存操作的顺序限制; volatile的底层即使用内存屏障实现
 
Java内存模型之Happens-before
- 程序次序规则(Program Order Rule): 在一个线程内, 按照程序代码顺序, 书写在前面的操作先行发生于(happens-before)书写在后面的操作。(更确切地说是控制流顺序而不是程序代码顺序)
- 管程锁定规则(Monitor Lock Rule): 一个unlock操作先行发生于(happens-before)后面对同一个锁的lock操作
- volatile变量规则(Volatile Variable Rule): 对于一个volatile变量的写操作先行发生于(happens-before)后面对这个变量的读操作
- 线程启动规则(Thread Start Rule): Thread对象的start()方法线性发生于(happens-before)此线程的每一个动作
- 线程终止规则(Thread Termination Rule): 对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生, 可通过Thread.interrupted()方法检查是否有中断
- 对象终结规则(Finalizer Rule): 一个对象的初始化完成(构造函数执行结束)先行发生于(happens-before)它的finalize()方法的开始
- 传递性(Transitivity): 如果操作A先行发生于(happens-before)操作B, 操作B先行发生于(happens-before)操作C, 那么操作A先行发生于(happens-before)操作C
Java内存之重排序
- 在单线程环境下不能改变程序的运行结果
- 存在数据依赖关系的不允许重排序
- as-if-serial, 重排序后单线程的执行结果不能改变, 编译器, runtime, 处理器必须遵循as-if-serial
Java内存模型之分析volatile
- 在每一个volatile写操作前面插入一个StoreStore屏障, StoreStore屏障可以保证在volatile写之前, 其前面的所有普通写操作都已经刷新到主内存中
- 在每一个volatile写操作后面插入一个StoreLoad屏障, StoreLoad屏障的作用是避免volatile写与后面可能的volatile读/写操作重排序
- 在每一个volatile读操作后面插入一个LoadLoad屏障, LoadLoad屏障用于禁止处理器把上面的volatile读与下面的普通读重排序
- 在每一个volatile读操作后面插入一个LoadStore屏障, LoadStore屏障用于禁止处理器把上面的volatile读与下面的普通写重排序
AQS-CLH同步队列
- CLH同步队列是FIFO双向队列, AQS依赖它完成同步状态管理, 当前线程如果获得同步状态失败时, AQS则将当前线程已经等待状态等信息构成一个节点(Node)并加入到CLH队列, 同时阻塞当前线程, 当同步释放时, 首先唤醒(公平锁)头节点, 使其再次尝试获得同步状态; 结点表示线程, 保存线程的引用(thread), 状态(waitStatus), 前驱节点(prev), 后继节点(next)
static final class Node {
  // 共享
  static final Node SHARED = new Node();
  // 独占
  static final Node EXCLUSIVE = null;
  // 因为超时或者中断, 节点会被设置成取消状态, 被取消节点不会参与竞争中, 且一直保持取消状态不会转变为其它状态
  static final int CANCELLED = 1;
  // 后续节点的线程处于等待状态, 而当前节点的线程如果释放了同步状态或者取消, 将会通知后继节点, 使后继节点的线程得以运行
  static final int SIGNAL = -1;
  // 节点在等待队列中, 节点线程等待在Condition上, 当其它线程对Condition调用signal()后, 该节点将会从等待队列移除到同步队列, 加入同步状态的获取中
  static final int CONDITION = -2;
  // 表示下一次共享式同步状态获取会无条件传播下去
  static final int PROPAGATE = -3;
  // 等待状态
  volatile int waitStatus;
  // 前驱节点
  volatile Node prev;
  //后继节点
  volatile Node next;
  // 获取同步状态的线程
  volatile Thread thread;
  ...
}
AQS-同步状态的获取与释放
- 独占锁
    - acquire(int arg)独占式同步状态获取, 该方法对中断不敏感, 即线程获取状态失败后加入CLH队列, 后续对线程中断操作时, 线程不会从队列移除
 public final void acquire(int arg) { // tryAcquire即尝试获取锁, 获取成功则设置锁状态并返回true, 否则返回false; 该方法自定义同步组件自己实现, 且必须保证线程安全地获取同步状态 // addWaiter, 如果tryAcquire返回false则调用此方法将当前线程加入CLH同步队列尾部 // acquireQueued, 当前线程会根据公平性原则来进行阻塞等待(自旋), 直到获取到锁为止 // selfInterrupt产生一个中断 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // acquireQueued为自旋过程, 当线程(node)进入到同步队列后进入自旋过程, 每个节点都会观察, 当条件满足获取同步状态后, 则从自旋退出, 否则一直自旋 final boolean acquireQueued(final Node node, int arg) { ... // 自旋 for(;;) { final Node p = node.predecessor(); // 当前线程的前驱节点是头结点, 且获取同步状态成功 if (p == head && tryAcquire(arg)) { setHead(node); ... } // 获取失败, 线程等待 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { interrupt = true; } } ... }
- 
    独占式获取响应中断 // 该方法在等待获取同步状态时, 如果当前线程被中断, 则响应中断抛出InterruptedException public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
- 
    独占式超时获取 public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
- 
    独占式同步释放 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) { unparkSuccessor(h); } ... } ... }
- 共享式同步状态获取(共享式允许同一时刻有多个线程获取同步状态)
  public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
    doAcquireShared(arg);
  }
  private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED)
    ...
    for(;;) {
      final Node p = node.predecessor();
      if (p == head) {
        int r = tryAcquireShared(arg);
        if (r >= 0) {
          setHeadAndPropagate(arg);
          ...
        }
        ...
      }
    }
    ...
  }
- 
    共享式同步释放 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
AQS-阻塞和唤醒线程
- 判断当前线程是否需要阻塞
  final boolean acquireQueued(final Node node, int arg) {
    ...
      if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
        interrupt = true;
    ...
  }
  // 获取同步失败后, 先检查线程状态
  private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 前驱节点的状态
    int ws = pred.waitStatus;
    // 状态为signal, 表示当前结点处于等待状态, 直接返回true
    if (ws == Node.SIGNAL) return true;
    if (ws > 0) {
      // 前驱节点状态 > 0, 则为Cancelled, 表明该节点已经超时或者中断, 需要从同步队列移除
      do {
        node.prev = pred = pred.prev;
      } while (pred.waitStatus > 0);
      pred.next = node;
    } else {
      // 前驱节点非cancelled, 非signal, 则通过CAS的方式将前驱节点设置为SIGNAL
      compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    ...
  }
  // 阻塞当前线程
  private final boolean parkAndCheckInterrupt() {
    // 调用Unsafe#park();
    LockSupport.park(this);
    return Thread.interrupted();
  }
- 
    唤醒后续节点 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 唤醒后续节点 unparkSuccessor(h); ... } ... } private void unparkSuccessor(Node node) { // 当前节点状态 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); ... // 唤醒后续节点 LockSupport.unpark(s.thread); }
ReentrantLock
- 公平锁和非公平锁的区别在于公平锁的获取是有顺序的
- 
    获取锁 // 非公平锁 ReentrantLock lock = new ReentrantLock(); lock.lock();// ReentrantLock#lock(); // Sync为ReentrantLock的内部类, 它继承AQS(AbstractQueuedSynchronizer), 它包括两个子类: 公平锁FairSync和非公平锁NonFairSync, ReentrantLock大部分功能委托给Sync实现 public void lock() { sync.lock(); } // NonfairSync.java final void lock() { // 尝试获取锁 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else // 获取失败, 调用AQS的acquire(int)方法 acquire(1); } // AQS#acquire() public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) ... } // NonFairSync.java protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } // AQS#nonfairTryAcquire() final boolean nonfairTryAcquire(int acquires) { // 当前线程 final Thread current = Thread.currentThread(); int c = getState(); // 0表示该锁处于空闲状态 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 线程重入, 判断线程锁持有的线程是否为当前线程 ... setState(nextc); ... } ... }
- 
    释放锁 // ReentrantLock#unlock() public void unlock() { sync.release(1); } // AQS#release() public final boolean release(int arg) { if (tryRelease(arg)) { ... // LockSupport#unpark(); unparkSuccessor(h); ... } ... } protected final boolean tryRelease(int releases) { ... // state == 0表示已经释放完全, 其它线程可以获取同步状态 if (c == 0) { setExclusiveOwnerThread(null); } setState(c); ... }
- 
    公平锁获取 protected final boolean tryAcquire(int acquires) { ... // 实现和非公平锁基本一致, 只加了hasQueuedPredecessors() if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { ... } ... } // 判断当前线程是否位于CLH同步队列中的第一个 public final boolean hasQueuedPredecessors() { Node t = tail; // 尾节点 Node h = head; // 头节点 Node s; // 当前线程是否同步队列第一个点 return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
- ReentrantLock和synchronized的区别
    - 与synchronized相比, ReentrantLock提供更全面的功能, 具备更强的扩展性; 例如: 时间等候, 可中断等候, 锁投票
- ReentrantLock提供条件Condition, 对线程的等待, 唤醒更灵活
- ReentrantLock提供可轮询的锁请求
- ReentrantLock支持更灵活的同步代码块
- ReentrantLock支持中断处理, 且性能synchronized好些(«Java并发编程实战»中提到synchronized已经做了很多优化, 性能和ReentrantLock差不多, 大部分场景使用synchronized即可)
 
ReentrantReadWriteLock
- #readLock(), 读锁为共享锁, 支持多个reader线程同时持有
- #writeLock(), 写锁为独占式, 只支持一个线程持有
CountDownLatch(等待多线程完成)
- 场景: 多线程解析Excel多个sheet的数据, 等所有的sheet解析完后, 程序需要提示解析完成
    - 使用Thread#join()
 public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(new Runnable() { public void run() { ... } }); Thread t2 = new Thread(new Runnable() { public void run() { ... } }); t1.start(); t2.start(); // join实现原理, 不断while循环检查join线程是否存活 t1.join(); t2.join(); }- CountDownLatch(使用AQS实现)
 static CountDownLatch c = new CountDownLatch(2); public static void main(String[] args) { new Thread(new Runnable() { public void run() { c.countDown(); ... c.countDown(); } }).start(); // 阻塞等待count == 0. c.await(); }CyclicBarrier
- 使用
  // 参数2表示屏蔽拦截线程的数量
  // 参数是2, 一定要在线程调用几次await();
  static CyclicBarrier c = new CyclicBarrier(2);
  public static void main(String[] args) {
    new Thread(new Runnable() {
      public void run() {
        try {
          c.await();
        } (Exception e) {
        }
      }
    }).start();
    try {
      c.await();
    } (Exception e) {
    }
  }
Semaphore
- 
    用于控制同时访问特定资源的线程数量; 比如控制数据库连接资源访问等 private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); private static Semaphore semaphore = new Semaphore(10); public staic void main(String[] args) { for (int i = 0; i < THREAD_COUNT; i++) { threadPool.execute(new Runnable() { public void run() { try { semaphore.acquire(); ... semaphore.release(); } catch(InterruptedException e) { } } }); } }
Exchanger
- 线程数据交换, 通过Exchanger#exchange()
