2.38 Java并发笔记
2.38.1 synchronized实现原理
- 实现原理
- 同步方法, 锁的是实例对象, 即对象锁, 方法表中的ACC_SYNCHRONIZED字段标识, 并使用该方法的对象在JVM内部对象表示的Klass作为锁对象
- 静态同步方法, 锁的是类对象, 即类锁, 方法表中的ACC_SYNCHRONIZED字段标识, 并使用该方法所属的Class在JVM内部对象表示的Klass作为锁对象
- 同步方法块, 锁的是括号指定的对象, 同步方法块在字节码层面使用
monitorenter
和monitorexit
实现
- Java对象头
- Mark Word(标记字段)
Mark Word字段图 来源于【死磕Java并发】
- Klass Pointer(类型指针)
-
monitor(ObjectMonitorC++层实现)
Java Monitor图解 来源于探索 Java 同步机制
- 锁优化
- 自旋锁, 线程自旋等待一段时间, 不会立即挂起, 看持有锁的线程是否快速释放锁, 但它缺点是会占用CPU时间, 避免CPU浪费, 加入适应自旋锁优化
- 自适应自旋锁, 由上一次同一个锁的自旋时间及锁的拥有者的状态决定, 线程自旋成功则自旋次数增加, 否则, 减少自旋次数或省略自旋过程
- 锁消除, 不存在竞争则把锁消除
- 轻量级锁, 没有多线程竞争的前提下, 减少传统的重量级锁使用操作系统互斥量产生的性能消耗
- 偏向锁, 线程A获得锁, 锁即进入偏向模式,
Mark Word
变为偏向锁结构, 线程A再次请求锁, 无需操作即获得锁
2.38.2 volatile实现原理
- 缓存一致性协议, 当某个CPU写数据时, 如果操作的变量是共享变量, 则通知其它CPU该变量的缓存行是无效; 其它CPU读取变量后发现无效则从主存加载数据
- 保证可见性, 变量被volatile修饰, 则该变量在线程的工作内存无效, 线程修改共享变量后立即同步到主内存, 其它线程读取共享变量从主内存读取
- 禁止重排序(volatile修饰)
- 编译器重排序, 编译器不改变单线程的语义下, 重排语句
- 处理器重排序, 如果不存在数据依赖, 处理器可以改变对机器指令的执行顺序
- happens-before原则, 无法从happens-before原则推出来, 均可能被随意重排序
- 内存屏障
- 加入volatile关键字, 会多出load前缀的指令, 该指令相当于内存屏障, 内存屏障是一组处理指令, 用于实现对内存操作的顺序限制; volatile的底层即使用内存屏障实现
Java内存模型之Happens-before
- 程序次序规则(Program Order Rule): 在一个线程内, 按照程序代码顺序, 书写在前面的操作先行发生于(happens-before)书写在后面的操作。(更确切地说是控制流顺序而不是程序代码顺序)
- 管程锁定规则(Monitor Lock Rule): 一个unlock操作先行发生于(happens-before)后面对同一个锁的lock操作
- volatile变量规则(Volatile Variable Rule): 对于一个volatile变量的写操作先行发生于(happens-before)后面对这个变量的读操作
- 线程启动规则(Thread Start Rule): Thread对象的start()方法线性发生于(happens-before)此线程的每一个动作
- 线程终止规则(Thread Termination Rule): 对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生, 可通过Thread.interrupted()方法检查是否有中断
- 对象终结规则(Finalizer Rule): 一个对象的初始化完成(构造函数执行结束)先行发生于(happens-before)它的finalize()方法的开始
- 传递性(Transitivity): 如果操作A先行发生于(happens-before)操作B, 操作B先行发生于(happens-before)操作C, 那么操作A先行发生于(happens-before)操作C
Java内存之重排序
- 在单线程环境下不能改变程序的运行结果
- 存在数据依赖关系的不允许重排序
- as-if-serial, 重排序后单线程的执行结果不能改变, 编译器, runtime, 处理器必须遵循as-if-serial
Java内存模型之分析volatile
- 在每一个volatile写操作前面插入一个StoreStore屏障, StoreStore屏障可以保证在volatile写之前, 其前面的所有普通写操作都已经刷新到主内存中
- 在每一个volatile写操作后面插入一个StoreLoad屏障, StoreLoad屏障的作用是避免volatile写与后面可能的volatile读/写操作重排序
- 在每一个volatile读操作后面插入一个LoadLoad屏障, LoadLoad屏障用于禁止处理器把上面的volatile读与下面的普通读重排序
- 在每一个volatile读操作后面插入一个LoadStore屏障, LoadStore屏障用于禁止处理器把上面的volatile读与下面的普通写重排序
AQS-CLH同步队列
- CLH同步队列是FIFO双向队列, AQS依赖它完成同步状态管理, 当前线程如果获得同步状态失败时, AQS则将当前线程已经等待状态等信息构成一个节点(Node)并加入到CLH队列, 同时阻塞当前线程, 当同步释放时, 首先唤醒(公平锁)头节点, 使其再次尝试获得同步状态; 结点表示线程, 保存线程的引用(thread), 状态(waitStatus), 前驱节点(prev), 后继节点(next)
static final class Node {
// 共享
static final Node SHARED = new Node();
// 独占
static final Node EXCLUSIVE = null;
// 因为超时或者中断, 节点会被设置成取消状态, 被取消节点不会参与竞争中, 且一直保持取消状态不会转变为其它状态
static final int CANCELLED = 1;
// 后续节点的线程处于等待状态, 而当前节点的线程如果释放了同步状态或者取消, 将会通知后继节点, 使后继节点的线程得以运行
static final int SIGNAL = -1;
// 节点在等待队列中, 节点线程等待在Condition上, 当其它线程对Condition调用signal()后, 该节点将会从等待队列移除到同步队列, 加入同步状态的获取中
static final int CONDITION = -2;
// 表示下一次共享式同步状态获取会无条件传播下去
static final int PROPAGATE = -3;
// 等待状态
volatile int waitStatus;
// 前驱节点
volatile Node prev;
//后继节点
volatile Node next;
// 获取同步状态的线程
volatile Thread thread;
...
}
AQS-同步状态的获取与释放
- 独占锁
acquire(int arg)
独占式同步状态获取, 该方法对中断不敏感, 即线程获取状态失败后加入CLH队列, 后续对线程中断操作时, 线程不会从队列移除
public final void acquire(int arg) { // tryAcquire即尝试获取锁, 获取成功则设置锁状态并返回true, 否则返回false; 该方法自定义同步组件自己实现, 且必须保证线程安全地获取同步状态 // addWaiter, 如果tryAcquire返回false则调用此方法将当前线程加入CLH同步队列尾部 // acquireQueued, 当前线程会根据公平性原则来进行阻塞等待(自旋), 直到获取到锁为止 // selfInterrupt产生一个中断 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // acquireQueued为自旋过程, 当线程(node)进入到同步队列后进入自旋过程, 每个节点都会观察, 当条件满足获取同步状态后, 则从自旋退出, 否则一直自旋 final boolean acquireQueued(final Node node, int arg) { ... // 自旋 for(;;) { final Node p = node.predecessor(); // 当前线程的前驱节点是头结点, 且获取同步状态成功 if (p == head && tryAcquire(arg)) { setHead(node); ... } // 获取失败, 线程等待 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { interrupt = true; } } ... }
-
独占式获取响应中断
// 该方法在等待获取同步状态时, 如果当前线程被中断, 则响应中断抛出InterruptedException public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
-
独占式超时获取
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
-
独占式同步释放
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) { unparkSuccessor(h); } ... } ... }
- 共享式同步状态获取(共享式允许同一时刻有多个线程获取同步状态)
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED)
...
for(;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(arg);
...
}
...
}
}
...
}
-
共享式同步释放
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
AQS-阻塞和唤醒线程
- 判断当前线程是否需要阻塞
final boolean acquireQueued(final Node node, int arg) {
...
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupt = true;
...
}
// 获取同步失败后, 先检查线程状态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驱节点的状态
int ws = pred.waitStatus;
// 状态为signal, 表示当前结点处于等待状态, 直接返回true
if (ws == Node.SIGNAL) return true;
if (ws > 0) {
// 前驱节点状态 > 0, 则为Cancelled, 表明该节点已经超时或者中断, 需要从同步队列移除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 前驱节点非cancelled, 非signal, 则通过CAS的方式将前驱节点设置为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
...
}
// 阻塞当前线程
private final boolean parkAndCheckInterrupt() {
// 调用Unsafe#park();
LockSupport.park(this);
return Thread.interrupted();
}
-
唤醒后续节点
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 唤醒后续节点 unparkSuccessor(h); ... } ... } private void unparkSuccessor(Node node) { // 当前节点状态 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); ... // 唤醒后续节点 LockSupport.unpark(s.thread); }
ReentrantLock
- 公平锁和非公平锁的区别在于公平锁的获取是有顺序的
-
获取锁
// 非公平锁 ReentrantLock lock = new ReentrantLock(); lock.lock();
// ReentrantLock#lock(); // Sync为ReentrantLock的内部类, 它继承AQS(AbstractQueuedSynchronizer), 它包括两个子类: 公平锁FairSync和非公平锁NonFairSync, ReentrantLock大部分功能委托给Sync实现 public void lock() { sync.lock(); } // NonfairSync.java final void lock() { // 尝试获取锁 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else // 获取失败, 调用AQS的acquire(int)方法 acquire(1); } // AQS#acquire() public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) ... } // NonFairSync.java protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } // AQS#nonfairTryAcquire() final boolean nonfairTryAcquire(int acquires) { // 当前线程 final Thread current = Thread.currentThread(); int c = getState(); // 0表示该锁处于空闲状态 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 线程重入, 判断线程锁持有的线程是否为当前线程 ... setState(nextc); ... } ... }
-
释放锁
// ReentrantLock#unlock() public void unlock() { sync.release(1); } // AQS#release() public final boolean release(int arg) { if (tryRelease(arg)) { ... // LockSupport#unpark(); unparkSuccessor(h); ... } ... } protected final boolean tryRelease(int releases) { ... // state == 0表示已经释放完全, 其它线程可以获取同步状态 if (c == 0) { setExclusiveOwnerThread(null); } setState(c); ... }
-
公平锁获取
protected final boolean tryAcquire(int acquires) { ... // 实现和非公平锁基本一致, 只加了hasQueuedPredecessors() if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { ... } ... } // 判断当前线程是否位于CLH同步队列中的第一个 public final boolean hasQueuedPredecessors() { Node t = tail; // 尾节点 Node h = head; // 头节点 Node s; // 当前线程是否同步队列第一个点 return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
- ReentrantLock和synchronized的区别
- 与synchronized相比, ReentrantLock提供更全面的功能, 具备更强的扩展性; 例如: 时间等候, 可中断等候, 锁投票
- ReentrantLock提供条件Condition, 对线程的等待, 唤醒更灵活
- ReentrantLock提供可轮询的锁请求
- ReentrantLock支持更灵活的同步代码块
- ReentrantLock支持中断处理, 且性能synchronized好些(«Java并发编程实战»中提到synchronized已经做了很多优化, 性能和ReentrantLock差不多, 大部分场景使用synchronized即可)
ReentrantReadWriteLock
#readLock()
, 读锁为共享锁, 支持多个reader线程同时持有#writeLock()
, 写锁为独占式, 只支持一个线程持有
CountDownLatch(等待多线程完成)
- 场景: 多线程解析Excel多个sheet的数据, 等所有的sheet解析完后, 程序需要提示解析完成
- 使用
Thread#join()
public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(new Runnable() { public void run() { ... } }); Thread t2 = new Thread(new Runnable() { public void run() { ... } }); t1.start(); t2.start(); // join实现原理, 不断while循环检查join线程是否存活 t1.join(); t2.join(); }
- CountDownLatch(使用AQS实现)
static CountDownLatch c = new CountDownLatch(2); public static void main(String[] args) { new Thread(new Runnable() { public void run() { c.countDown(); ... c.countDown(); } }).start(); // 阻塞等待count == 0. c.await(); }
CyclicBarrier
- 使用
// 参数2表示屏蔽拦截线程的数量
// 参数是2, 一定要在线程调用几次await();
static CyclicBarrier c = new CyclicBarrier(2);
public static void main(String[] args) {
new Thread(new Runnable() {
public void run() {
try {
c.await();
} (Exception e) {
}
}
}).start();
try {
c.await();
} (Exception e) {
}
}
Semaphore
-
用于控制同时访问特定资源的线程数量; 比如控制数据库连接资源访问等
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); private static Semaphore semaphore = new Semaphore(10); public staic void main(String[] args) { for (int i = 0; i < THREAD_COUNT; i++) { threadPool.execute(new Runnable() { public void run() { try { semaphore.acquire(); ... semaphore.release(); } catch(InterruptedException e) { } } }); } }
Exchanger
- 线程数据交换, 通过
Exchanger#exchange()