Java JUC源码分析
Java JUC源码分析
概念
三大特性
原子性
一个或者多个操作在CPU执行的过程中不被中断的特性
可见性
一个线程修改一个变量后,其他线程能够立即得知这个修改
各自的线程有其工作内存,即保存时的副本存储的空间
有序性
代码的执行顺序
编译器 和 处理器执行指令时会进行从排序,改变语序 这里注意什么时候能改什么时候不能改!
Volatile
保证不同线程对变量操作的内存可见性(修改变量直接刷新回主存),使缓存行无效
禁止指令重排序 注意是并不是完全就是一样的顺序,只保证先后顺序
为什么不能保证 count++ 原子性?
单个变量的读写 具有原子性 i = 100
但复合操作则不行 i++
底层机制:内存屏障
使用 : 状态标记、单例模式
锁
锁的分类
- 可重入、不可重入
- 悲观锁、乐观锁
- 公平锁、非公平锁
- 互斥锁、共享锁
结构 可重入锁 读锁 写锁
将大的方法加入同步锁会影响执行效率
sleep 抱着锁睡觉
锁到底锁什么
synchronized 默认锁的是this 方法的调用者
但是this 中的对象则不能同步 ,要同步哪个对象 就锁哪个
死锁
两个或多个线程在等待对方释放资源;某个同步块同时拥有两个以上对象的锁
产生死锁的条件
1.互斥条件:一个资源每次只能被一个进程使用。
2.请求与保持条件:一个进程因请求资源而阻塞时,对已获得的资源保持不放。
3.不剥夺条件:进程已获得的资源,在末使用完之前,不能强行剥夺。
4.循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系。
Synchronized和Lock的区别
- Lock可以判断是否获取到了锁
- lock必须要手动释放,如果不释放锁,则出现死锁
- Lock不一定会等待下去
- 可重入锁,不可中断,非公平
- Lock适合锁大量的代码
虚假唤醒
Synchronized
synchronized到底是什么
synchronized是一个重量级锁,实现依赖于JVM 的 monitor 监视器锁。主要使用monitorenter和monitorexit指令来实现方法同步和代码块同步。
在编译时,会将monitorexit指令插入到同步代码块的开始位置,而monitorexit插入方法结束处和异常处,并且每一个monitorenter都有一个与之对应的monitorexit。
任何对象都有一个monitor与之关联,当一个monitor被持有后,它将被处于锁定状态,线程执行到monitorenter指令时间,会尝试获取对象所对应的monitor的所有权,即获取获得对象的锁,由于在编译期会将monitorexit插入到方法结束处和异常处,所以在方法执行完毕或者出现异常的情况会自动释放锁
synchronized一段代码块或者方法后,其实是将内部设涉及到的变量从CPU缓存中移除,必须去主存中拿数据。
Java中的锁类型
可重入锁、公平锁、互斥锁、自旋锁、偏向锁、轻量级锁、读写锁、共享锁
ReentrantLock 加锁原理: 更改标识符的值
synchronized 到底改变的是对象的什么属性???
对象头
隐式
synchronized方法 monitorexit
JVM可以从方法常量池中的方法表结构(method_info Structure) 中的 ACC_SYNCHRONIZED 访问标志区分一个方法是否同步方法
当方法调用时,调用指令将会 检查方法的 ACC_SYNCHRONIZED 访问标志是否被设置,如果设置了,执行线程将先持有monitor(虚拟机规范中用的是管程一词), 然后再执行方法,最后再方法完成(无论是正常完成还是非正常完成)时释放monitor
为何class文件会有两个monitorexit
synchronized代码块 会有monitorenter...
为了保证在方法异常完成时 monitorenter 和 monitorexit 指令依然可以正确配对执行,编译器会自动产生一个异常处理器,这个异常处理器声明可处理所有的异常,它的目的就是用来执行 monitorexit 指令。从字节码中也可以看出多了一个monitorexit指令,它就是异常结束时被执行的释放monitor 的指令。
为什么早期是重量级锁
同时我们还必须注意到的是在Java早期版本中,synchronized属于重量级锁,效率低下,因为监视器锁(monitor)是依赖于底层的操作系统的Mutex Lock来实现的,而操作系统实现线程之间的切换时需要从用户态转换到核心态,这个状态之间的转换需要相对比较长的时间,时间成本相对较高
JVM对锁的优化
锁升级
锁的升级: 无锁--偏向--轻量--重量
为什么升级: 1.6前只要拿不到锁,就要挂起当前线程,性能差
无锁、匿名偏向锁:当前对象没有作为锁存在
偏向锁
在大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得,因此为了减少同一线程获取锁(会涉及到一些CAS操作,耗时)的代价而引入偏向锁。偏向锁的核心思想是,如果一个线程获得了锁,那么锁就进入偏向模式,此时Mark Word 的结构也变为偏向锁结构,当这个线程再次请求锁时,无需再做任何同步操作,即获取锁的过程,这样就省去了大量有关锁申请的操作,从而也就提供程序的性能。当一个线程访问同步代码块并获取锁时,会在Mark Word里存储锁偏向的线程ID。在线程进入和退出同步块时不再通过CAS操作来加锁和解锁,而是检测Mark Word里是否存储着指向当前线程的偏向锁。引入偏向锁是为了在无多线程竞争的情况下尽量减少不必要的轻量级锁执行路径,因为轻量级锁的获取及释放依赖多次CAS原子指令,而偏向锁只需要在置换ThreadID的时候依赖一次CAS原子指令即可。偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动释放偏向锁。所以,对于没有锁竞争的场合,偏向锁有很好的优化效果,毕竟极有可能连续多次是同一个线程申请相同的锁。但是对于锁竞争比较激烈的场合,偏向锁就失效了,因为这样场合极有可能每次申请锁的线程都是不相同的,因此这种场合下不应该使用偏向锁,否则会得不偿失,需要注意的是,偏向锁失败后,并不会立即膨胀为重量级锁,而是先升级为轻量级锁
轻量级锁
倘若偏向锁失败,虚拟机并不会立即升级为重量级锁,它还会尝试使用一种称为轻量级锁的优化手段(1.6之后加入的),此时Mark Word 的结构也变为轻量级锁的结构。
采用自旋的方式,以CAS的形式获取锁资源轻量级锁能够提升程序性能的依据是“对绝大部分的锁,在整个同步周期内都不存在竞争”,注意这是经验数据。需要了解的是,轻量级锁所适应的场景是线程交替执行同步块的场合,如果存在同一时间访问同一锁的场合,就会导致轻量级锁膨胀为重量级锁。
自旋锁
轻量级锁失败后,虚拟机为了避免线程真实地在操作系统层面挂起,还会进行一项称为自旋锁的优化手段。这是基于在大多数情况下,线程持有锁的时间都不会太长,如果直接挂起操作系统层面的线程可能会得不偿失,毕竟操作系统实现线程之间的切换时需要从用户态转换到核心态,这个状态之间的转换需要相对比较长的时间,时间成本相对较高,因此自旋锁会假设在不久将来,当前的线程可以获得锁,因此虚拟机会让当前想要获取锁的线程做几个空循环(这也是称为自旋的原因),一般不会太久,可能是50个循环或100循环,在经过若干次循环后,如果得到锁,就顺利进入临界区。重量级锁
如果还不能获得锁,那就会将线程在操作系统层面挂起,这就是自旋锁的优化方式,这种方式确实也是可以提升效率的。最后没办法也就只能升级为重量级锁了
锁消除
虚拟机在JIT编译时(可以简单理解为当某段代码即将第一次被执行时进行编译,又称即时编译),通过对运行上下文的扫描,去除不可能存在共享资源竞争的锁,通过这种方式消除没有必要的锁,可以节省毫无意义的请求锁时间,如下StringBuffer的append是一个同步方法,但是在add方法中的StringBuffer属于一个局部变量,并且不会被其他线程所使用,因此StringBuffer不可能存在共享资源竞争的情景,JVM会自动将其锁消除
锁膨胀
一个循环中频繁的获取和释放资源,开销很大,锁会扩大到循环外。
自旋锁
CAS 没有获取到锁的线程是不会阻塞的,要通过控制循环值不断获取
3个操作数 内存值 V(共享) 旧的预期值E(缓存) 要修改的新值N,仅当E == N时,才会修改 E!=N时,则再次查询主存中的V 使 E == N
底层:硬件
wait和notify
首先需要理解锁池和等待池这两个概念
notify和notifyall区别
为什么使用notify可能产生死锁而使用notifyAll则不会
AQS
AbstractQueuedSynchronizer
AbstractQueuedSynchronizer抽象类, JUC包下的一个基类。是一个用于构建锁、同步器等线程协作工具类的框架ReentrantLock、ThreadPoolExecutor、Semaphore等。有了 AQS 以后,很多用于线程协作的工具类就都可以很方便的被写出来,有了 AQS 之后,可以让更上层的开发极大的减少工作量,避免重复造轮子,同时也避免了上层因处理不当而导致的线程安全问题,因为 AQS 把这些事情都做好了
特点
- 提供volatile修饰的state对象,采用CAS方式修改。支持可重入的特点
- 维护FIFO双向链表Node表示等待队列
- 采用模板模式(protected方法)
- 主要方法tryAcquire/acquire/addWaiter/tryRelease(CAS轮训、挂起线程)
- 判断当前线程是否应该被挂起:看前一个节点的状态
- Release时特点:从尾到头释放
方法解读
- tryAcquire:尝试获取锁,立即返回结果
- acquire: 进入队列等待,直到获取
// 供上层编写, 模板方式
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 完整入队
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 该编写逻辑决定了后续为何要从尾到头遍历
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
aquire方法
- 先进行一次基于派生类实现的tryAcquire(有各自不同的特点)获取,若没有获取到锁
- 将当前等待获取锁的线程封装为一个Node,将其插入到FIFO的双向队列中。但这里插入的方式有快速插入和完全插入两种:
- 快速插入:一次CAS比较
- 完全插入:自旋 + CAS方式
- 插入成功后,就要根据当前队列的状态判断该线程是否需要被挂起
- 如果当前线程节点处于头节点的后一个(头节点只占位用),则通过自旋的方式去拿锁
- 否则判断该线程是否应该被挂起(以避免无用自旋消耗CPU),判断当前节点的前置节点的waitStatus
- SIGNAL:前置节点也在等待拿锁,当前节点肯定也要挂起
- 大于0: 前置节点被取消,删除前面所有ws大于0的节点
- 其他状态: 尝试将前置节点ws置为SIGNAL
AQS Node 中SIGNAL状态的意义:只要前置节点释放锁,则就会通知标识为SIGNAL状态的后续节点的线程
CANCELLED状态的意义:在同步队列等待的线程等待超时或被中断,则需要从同步队列中取消该Node节点
LockSupport.part作用是什么?为什么又有Thread.intterupted()?
先说结论:处于等待队列中的线程无法响应外部的中断请求,只有当队列中的线程拿到锁后才去响应之前的中断请求
基础:当其他线程试图通过interrupt去中断一个处于wait或sleep的线程时则会产生异常。而LockSupport.part可以让线程进入waiting(jvm)/sleep(os)的状态,只能通过LockSupport.unpark或者interrupt方法去唤醒该线程。
所以等待线程在park后就被挂起了,如果有其他线程对其进行interrupt中断,则会return中断标志位(此时就为true) (但线程此时没有被中断)。在acquireQueued方法中通过成员变量interrupted存储了这个中断标志值,当线程获取到锁的时候,则会返回这个值(其意义就是,该线程获取到锁了,但需要被中断)。所以就会运行到acquire方法下的selfInterrupt方法中进行此线程的interrupt()
release方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 将当前线程ws状态置为0以便不影响其他判断
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
- 尝试释放当前线程持有的锁,如果成功释放,则
- CAS操作改变当前线程ws 以便不影响并发情况下其他线程的判断
- 判断下一个节点是否有效
- 无效:从尾到头去找有效节点(从尾找到头,而不会在中途停止) 为什么要从尾到头
- 有效:通过LockSupport.unpark去释放锁
为什么这里是从尾到头去寻找有效节点?
关键还要看enq方法,enq方法中的关键语句:
node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t;
CAS操作不能保证if代码块的线程安全,也就是在CAS操作与t.next=node之间可能有其他线程进行操作,进而导致 tail指向prev但prev未指向tail(尽管最后都会顺利连接上),在这个过程中,如果有线程执行unparkSuccessor且按照从头到尾的顺序去遍历,就无法遍历到所有的节点,但从尾到头遍历是可以保证的,这主要是取决于enq方法中代码编写逻辑来决定的。
ReentrantLock
基于AQS,JUC编程中可以实现公平锁和非公平锁来对共享资源进行同步,与synchronized一样支持可重入,在调度上更灵活,支持更多功能.同时还可以支持可中断锁机制的实现实现
实现Lock接口,具有lock tryLock unlock newCondition等方法
核心类 Sync 派生出公平锁FairSync类与非公平锁NonfairSync类
Sync
继承自AQS等抽象类,暴露lock方法用于派生类实现
NonfairSync
为什么会有非公平锁?为什么非公平锁的效率比公平锁更高?
个人理解:因为线程的挂起和唤醒需要一段时间,有一定开销,那么如果按照公平锁的方式,那肯定是当前线程释放锁后,肯定还需要唤醒下一个线程。但如果按照非公平锁的方式,如果当前线程释放锁后,存在一个刚开启的线程,它刚好被创建,则这个时候就不用被唤醒而直接进入就绪状态了。
主要就两个方法:基于对Sync抽象父类实现的lock方法以及对AQS抽象父类实现的tryAcquire方法
// NonfairSync
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
lock方法
lock方法实质就是先来一次CAS看锁是否被占用(不管前面是否有其他线程在等待),如果没有被占用,则占用该锁。如果这次没有获得锁,则会进入acquire方法。而aquire方法是AQS中的final修饰已实现的方法,内部会调用到tryAcquire这个方法,而NonfairSync中则重写了这个protected的方法
// AQS
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// Sync
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 可重入性的体现
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
通过NonfairSync的lock方法顺序是:
- 先进行CAS来一次插队的非公平查看锁,如果锁被占用
- 再来一次非公平的CAS,并且这次是支持可重入的,如果锁被其他线程占用
- 走公平机制,进行排队获取
实现可重入方式的时候为什么要比较nextc正负?
因为state是基于int的,而当nextc正溢出时,则会产生负数,所以需要进行非负判断。也就可以说,可重入最大数目其实是有符号int表示的正数范围
FairSync
// FairSync
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 满足可重入特点
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
FairSync要求线程在获取锁时:
- 如果锁没被占用,则查看是否有排在当前线程前等待获取锁的线程,如果没有
- 如果当前锁正在被当前线程持有,还要满足可重入的特点
- 否则,则返回false,进行排队
ReentrantLock
默认的无参构造是非公平锁机制
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
sync.lock();
}
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
// 实现可中断锁机制
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
值得注意的是,不管Sync是公平还是非公平的,这里tryLock其实是非公平获取锁的机制,这也就是为什么nonfairTryAcquire方法要在Sync中进行实现了
可中断锁机制的实现
Sync实现Lock接口提供了lock和lockInterruptibly方法。
lock方法与lockInterruptibly区别在于:
- lock考虑获取锁,获取锁后才响应之前的中断
- lockInterruptibly优先考虑响应中断
最终会进入到AQS到方法中。与之前的acquireQueued方法大同小异,作用就是线程通过这种方式等待获取锁时,在LockSupport.park挂起后,可以经过其他线程的interrupt唤醒,然后抛出异常。最终效果,也就是使得让等待中的线程立即中止。
// AQS
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 关键语句
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
Condition
对于synchornized关键字,其内部可以用wait 和 notify进行线程等待或唤醒(wait必须在线程同步的代码块中使用)。但在Lock实现类例如ReentrantLock中,则可以通过Condition的await、signal来进行等待和唤醒
且ReentrantLock可以唤醒指定条件的线程,而Object的唤醒是随机的
与Synchronized区别
共同点:
- 用来协调多进程对共享对象的访问
- 可重入锁,保证可见性和互斥性
不同点:
- Re显示获得锁释放锁,sy隐式获得锁
- Re属于API级别锁,sy属于JVM层的锁
- Re可响应中断,sy不可以响应中断
- Re实现公平锁
- Re可通过Condition绑定多个条件
- 发生异常时 Re如果没有unlock去释放锁,则可能造成死锁,而sy会自动释放锁
- Lock可以让等待锁的线程响应中断,而sy不会,使用sy时等待锁的线程会一直等待而不会响应线程
- 通过Lock可以知道线程是否获得了锁,而sy无法的得知
Map
HashMap存在的线程问题:插入时可能出现环形链表
使HM的线程安全方式
- Collections.synchronizedMap()返回一个新的Map:使用synchronized进行互斥。使用代理new了一个新的🥱,直接锁住方法
- ConccurentHashMap:使用新的锁机制,将HM进行拆分。
Hashtable
与HashTable区别
- HT线程同步,HM线程不同步(实质就是每个方法都加了synchronized锁)
- HT不允许有键值有空值
- HM默认数量是16, 2倍增长库容,HT数组默认为11,增长 2x+1
既不支持Null key也不支持Null value
这是因为Hashtable使用的是安全失败机制(fail-safe),这种机制会使你此次读到的数据不一定是最新的数据。
如果你使用null值,就会使得其无法判断对应的key是不存在还是为空,因为你无法再调用一次contain(key)来对key是否存在进行判断,ConcurrentHashMap同理。
解决问题
多线程下添加元素的安全性
何为安全?多线程插入时的线程不安全问题 -- 用hashTable可解决
如何实现
synchronized 锁方法
ConcurrentHashMap
注意⚠️: JDK版本 1.7
Hashtable存在的问题:性能差,一把锁锁住了整个资源
采用策略:分段锁
ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable
核心内部类: Segment、HashEntry
内部变量
- DEFAULT_CONCURRENCY_LEVEL,CHM的并发程度。默认是 16,也就是说 ConcurrentHashMap 有 16 个 Segments,所以理论上,这个时候,最多可以同时支持 16 个线程并发写,只要它们的操作分别分布在不同的 Segment 上。这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以扩容的。
- loadFactor:负载因子,给每个Segment内部使用
HashEntry
类似Node的链表节点,但与HM的Entry不同,这里在初始化时还有一段代码
static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = HashEntry.class;
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
这里主要是获取next相对于此对象内存地址的偏移量,为了后续能在CAS操作中直接对next进行操作
Segment
Segment继承自ReentrantLock
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
transient volatile HashEntry<K,V>[] table;
/**
* The number of elements. Accessed only either within locks
* or among other volatile reads that maintain visibility.
*/
transient int count;
/**
* The total number of mutative operations in this segment.
* Even though this may overflows 32 bits, it provides
* sufficient accuracy for stability checks in CHM isEmpty()
* and size() methods. Accessed only either within locks or
* among other volatile reads that maintain visibility.
*/
transient int modCount;
/**
* 下一次需要扩容的阈值
* The table is rehashed when its size exceeds this threshold.
* (The value of this field is always <tt>(int)(capacity *
* loadFactor)</tt>.)
*/
transient int threshold;
操作
插入
// CHM
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
// 实际就是取hash的高4位
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
第一层Put: 这里也就是取key hash的高x位 (比如concurrencyLevel为2^n,则就取高n位,这里取决于segment初始设置成多少)作为所在segment的索引,如果所在segment为null,则初始化这个segment。
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
初始化的过程中会使用到UNSAFE借助offset去安全地检查值,为什么?因为可能有很多线程在并发地创建同一个segment,因此需要判断。在最终通过CAS操作去完成Segment的创建
而这里为什么会在CHM初始化时去初始化第一个segment呢?因为在这里接住segment[0]处的数组长度和负载因子来初始化segment[k]了,而为什么使用“当前”的segment[0]呢?因为segment[0]可能已经被扩容过了
第二层Put:
// Segment
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
这里先尝试通过tryLock快速获取该Segment的锁,如果获取到了
- 通过key到hash与该segment HashEntry数组长度-1进行与运算得到该元素在该segment下的HashEntry位置
- 拿到该HashEntry下的第一个元素(采用UNSAFE 线程安全方式)
- 此时就是在HashEntry中,也就是链表结构了。从前往后遍历,如果key值相等 或者hash相等且equals,则改变其值
- 如果遍历到最后还是没找到满足条件的node,则采用头插法,把node插入到该HashEntry中(判断是否需要对该Segment进行扩容)
scanAndLockForPut
如果第二层Put尝试获取锁失败,则会用该方法来获取锁
// Segment
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
先自己再进行一次tryLock,拿到锁了,直接返回null。如果没拿到锁:
- 如果该位置的HashEntry为空,则先创建一个HashEntry
- 如果不为空,则尝试去从前往后遍历key值相同的。如果没找到,最终会回到上一步。(可以理解为预先做一些事情,这样在第二层Put中就不用再次初始化node了)。
- 如果这时候出现了并发情况,也就是有其他的元素插入到了这个Segment的HashEntry数组的指定位置中,那就需要从该HashEntry的头开始,再去进行遍历
- 如果通过循环尝试获取锁次数超过指定值(单核次数为1多核次数为64),则lock阻塞自己,直到获取锁后返回
查询
根据hash和segmentShift、segmentMask等通过 UNSAFE类+offset 找到元素所在的segment,再去找元素所在的HashEntry索引,然后遍历此Entry链表。
需要注意的是查询操作是没有加锁的
扩容
当插入节点时,这个Segment中总节点数count大于该Segment可容量节点数目threshold时,将触发扩容操作。
// Segment
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
大致思想还是与HM一致,由于Segment是不能扩容的,只能去扩容Segment的HashEntry数组长度(2倍)。然后需要重新去计算Mask并对数组中的元素进行移位:最高位与1比较判断是否需要进行移位(idx值要么保持不变,要么为idx+oldCapcity).对原来HashEntry链表中的元素进行判断。JDK1.7的思想我认为是有一点优化的思想在里面:即从前往后遍历,试图找到一个节点lastRun,该节点到尾节点更新后idx值都是一样的,然后将lastRun直接插入到指定索引的Entry数组中,对于在lastRun之前的节点,那其值可能是idx也可能是idx+oldCapacity,就对每个节点的idx进行单独计算再头插。最终将插入导致rehash的这个新节点再头插入到指定的HashEntry中
⚠️:不同的Segment下的HashEntry数组长度可能是不同的
CAS
原理
乐观锁:假设程序汇总并发少,让线程不断尝试更新
若:内存位置的值 == 预期原值 -> 则赋予新值。通过while循环完成。
优点:不用进行用户态 内核态的切换
缺点:自旋时间不确定、开销大、只能保证一个共享变量的原子操作(多个共享变量可进行合并然后使用CAS--AtomicRefernce)
CAS是如何保证线程安全的?
如何保证Compare和Swap的原子性:通过UNSAFE类实现,提供硬件级别的原子操作(native方法)
ABA问题
CAS在操作时会检查值是否改变,但如果原来值为A,被其他线程通过CAS改成B,然后又被改成A,这个时候去进行CAS,发现值还是A,则不会更新值,但其实值已经发生了改变。如何解决这个问题,就需要使用版本号作为每次更改的依据,相当于(ArrayList中的modCount)。增加版本控制: AtomicStampedReference,不但会判断原值,还会比较版本信息
两种实现线程安全的方式
- 加锁: Synchronized/Lock
- 不加锁: CAS Loop & Volatile (轻量)
FutureTask详解
FutureTask为Future提供了基础实现,线程安全由CAS保证。常用来封装Callable和Runnable,作为一个任务提交到线程池中
Future接口
future提供了五个方法,用于cancel isCancel isDone
get get(timeout)
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
// 任务被取消、执行过程中发生异常也属于完成
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Thread只支持Runnable接口
通过线程池方式,submit一个callable ,同时线程池也i支持runnable形式,也一个 支持runnbale 的 返回一个future
源码解读
FutureTask实现了Runnable和Callable接口,所以FutureTask既能够当作一个Runnable被Thread执行,也能作为Future来得到Callable到计算结果(Callable作为其成员变量)。
Callable接口
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
成员变量
// volatile修饰, 保证在线程间的可见性
private volatile int state;
private static final int NEW = 0; // 新的任务或正在被执行的任务
private static final int COMPLETING = 1; // 任务完成或有异常,但结果还没有保存到outcome中
private static final int NORMAL = 2; // 任务已经执行完成且结果保存到outcome中
private static final int EXCEPTIONAL = 3; // 任务发生异常且原因保存到outcome中
private static final int CANCELLED = 4; // 调用cancel(false),任务被取消且不中断任务执行线程
private static final int INTERRUPTING = 5; // 调用cancel(true),任务被取消并且要中断任务执行线程但是还没有中断任务执行线程之前
private static final int INTERRUPTED = 6; // 调用cancel(true),任务被取消并且要中断任务执行线程后
/** 内置Callable变量,在完成后置为空*/
private Callable<V> callable;
/** 执行结果 */
private Object outcome; // non-volatile, protected by state reads/writes
// 跑Callable的线程
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
构造FutureTask
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
// 封装后的Callable
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
- 如果是传入Callable,则直接赋给内置的callable变量并更新state
- 如果传入Runnbale,则需要封装成实现了Callable接口的RunnableAdapter再赋值给callable变量
run方法
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
// 后续会讲这里为什么要讲runner置空
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
// 此处不太理解
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
// CAS方式移除等待线程变量
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 后续会降到这个unpark
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // 置空callable
}
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
}
- 新建任务,将runner通过CAS替换为当前线程(CAS避免并发下多进程启动FutureTask造成run多次运行)
- 调用Callable接口的call方法,并捕获异常。
- 如果成功执行不抛出异常:CAS设置state状态为COMPLETING,赋值outcome,CAS设置state状态为NORMAL,并调用finishCompletion方法唤醒等待线程
- 如果执行失败抛出异常:CAS设置state状态为COMPLETING,赋值outcome为具体的异常Throwable,CAS设置state状态EXCEPTIONAL,调用finishCompletion方法唤醒等待线程。若线程未被挂起,则表明希望当前线程挂起
get方法
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
// 被取消,则返回取消异常
if (s >= CANCELLED)
throw new CancellationException();
// 产生异常则抛出指定异常
throw new ExecutionException((Throwable)x);
}
核心方法awaitDone
理解awaitDone需要理解FutureTask中的链表结构成员WaitNode,维护的是通过get方法等待该FutureTask结果的线程队列。所以实际情况中会有多个线程会调用到FutureTask到get方法,所以也就需要一个成员变量runner去区别当前线程,所以这也是为什么run方法中要进行runner置空的操作
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
// 注意这里的写法. q.next = waiters 返回的还是waiters(只是相当于简化了代码)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
// 注意这里的阻塞
LockSupport.park(this);
}
}
- 先看一下有没有等待限制,如果有就算一下截止时间
- 看该线程有没有被中断,如果被中断,则移除等待节点并抛出异常
- 如果当前状态为结束状态 state>COMPLETING,则置空等待节点的线程,并返回状态
- 如果当前状态为COMPLETING,则应该让出CPU 等待其执行好完毕
- 如果当前状态为NEW,表示正在执行:
- 如果等待节点为null,则创建一个等待节点(初始情况一定会被创建)
- 如果等待节点已被创建但未进入到等待中,则就排进链表中,采用头插法
- 如果等待节点已经被排进链表中,则可以挂起了
- 如果设置了超时时间,则看是否超时,如果超时了,就把当前等待线程移除链表并返回当前FutureTask状态(超时后会在get方法中通过该state判断并抛出超时异常)
- 如果设置了超时但尚未超时,则park进行有时限的阻塞
这里通过park阻塞了该线程,那么在哪里进行唤醒呢?
解铃还须系铃人,在run方法中,顺利完成Callable后会进行finishCompletion,这一步就是通过LockSupport.unpark依次去唤醒所有的等待线程
cancel
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
- 检查当前状态,如果为NEW,则通过CAS根据是否可被中断将其改为CANCELLED或者INTERRUPTING
- 如果是可被中断的cancel,则对当前线程进行打断,并设置线程已中断的状态
- 唤醒所有等待线程
启动
为什么不能直接调用futureTask的get方法?为什么会一直阻塞
FutureTask get 中涉及awaitDone的方法有死循环,这时候Callable都没有run起来,所以会进行park而无法被唤醒
*因此FutureTask有两种启动方式:
- 通过ExecutorService启动
- 通过Thread包装进行启动
futuretask必须放在thread中 ,start后才能进行isdone等的操作
(如何判断isDone isCancel ? 状态量)
threadpool无法执行futuetask,会返回null
但是
其实executor.submit a Future 其实也是跑了一个futuretask
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null); // 返回一个futuretask
execute(ftask);
return ftask;
}
延伸
Thread.start如何启动?Threadgroup?
如何停止线程
stop方法
立即停止线程 释放锁
为什么stop方法是Deprecated的?数据不一致问题
interrupt方法
设置中断标志位,在外部配合isInterrupted()或者interrupted方法进行判断
两者区别: interrupted会把标志位清空(所以:连续两次调用该方法时第二次返回false)
ThreadPoolExecutor
意义
减少开销,创建和销毁资源存在时间成本,进行一组线程的管理
父级结构
原理概述
提交一个任务到线程池后
- 当前线程池中在运行的线程数量是否少于corePoolSize
- 是:创建一个新的工作线程
- 否:判断BlockingQueue是否满了
- 没满:将线程放进BQ中
- 满了:如果创建一个新的工作线程使当前运行的线程数量数目超过maximumPoolSize,则交给RejectHandler来处理
源码分析
构造
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize: 线程池的核心线程数,就算是空的也不会被销毁
- maximumPoolSize:线程池能容纳的最大线程数目
- TimeUnit + keepAliveTime:空闲线程最大等待新线程进入的时间(超时将会被销毁)
- workQueue:保存等待被执行的任务的阻塞队列。BlockingQueue类型
- threadFactory: 创建线程的工厂,自定义的线程工厂可以给每个新建的线程设置线程名
- handler:线程的饱和策略,如果阻塞队列满了且没有空闲的工作线程。则会采用一以下的策略:
- AbortPolicy:直接抛出异常
- CallerRunsPolicy:用调用者所在的线程执行任务
- DiscardOldestPolicy:丢弃阻塞队列中最靠前的任务,并执行当前任务
- DiscardPolicy:直接丢弃该任务
三种预制的线程池类型
Executors.newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
线程池线程数量达到corePoolSize后,即使线程池没有可执行的任务,也不会释放线程.
工作队列为无界队列LinkedBlockingQueue,队列容量为Integer.MAX_VALUE,因此keepAliveTime和饱和策略handler基本上是不会作用的
Executors.newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
初始化的线程池中只能有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务。使用无界队列,所以handler和keepAlive基本失效
Executors.newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
最大线程数达到Integer.MAX_VALUE,使用SynchronousQueue作为阻塞队列,keepAlive会作用
关键属性
// 状态ctl是由Atmoic类管理的,高三位表示状态, 低29位表示线程数目
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 111 0000000..000
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 存放任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
// worker的集合
private final HashSet<Worker> workers = new HashSet<Worker>();
// 历史达到的worker最大值
private int largestPoolSize;
// 采用AQS锁
private final ReentrantLock mainLock = new ReentrantLock();
// 饱和策略
private volatile RejectedExecutionHandler handler;
// 常驻线程数目(不会被销毁)
private volatile int corePoolSize;
private volatile long keepAliveTime;
private volatile int maximumPoolSize;
- Running 111:接受新任务,也能处理阻塞队列的任务
- SHUTDOWN 000:不接受新任务但可以处理阻塞队列的任务
- STOP 001:不接受新任务,不处理阻塞队列里的任务,并且会中断处理过程中的任务
- TIDYING 010:当前线程池没有工作线程,调用terminated方法
- TERMINATED 011: terminated方法调用完成
工作流程
核心类
Worker
基于AQS并实现Runnable接口,实质是线程池的工作线程的实现。
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
Execute
// ThreadPoolExecutor
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
- 通过ctl去获取当前线程数,如果小于核心线程数,添加一个工作线程
- 如果当前线程池还在运行中且阻塞队列中还能容纳这个任务(就把这个任务放进队列中)。则再次判断状态,如果当前线程没在运行就把这个任务移除,并进行reject策略
- 如果当前线程池处于Running状态但没有工作线程,则创建一个工作线程Worker
- 如果工作队列满了,则 尝试去创建一个工作线程,如果失败了,就执行reject
这里为什么需要double check ctl?
第一步判断了线程池正在运行且阻塞队列成功的放入了该任务。但可能下一个时刻线程池的状态就改变,所以也就是如果改变过程发生在检查线程池状态之前 加入任务到阻塞队列之后的话,这样就会出问题,所以要在成功插入队列后再进行一次检查,验证加入到队列之前ctl的状态都是没问题的。
AddWorker
private boolean addWorker(Runnable firstTask, boolean core)
- 先判断线程池是否在正常RUNNING态,如果是在SHUTDOWN态且
- 判断当前工作线程数是否超出规定(core为true则采用corePoolSize, false采用maximumPoolSize为依据)
- 通过CAS方式改变ctl来增加工作线程数目,如果CAS失败,则说明可能是ctl发生改变,重新再循环一次。如果成功:
- 初始化一个Worker,调用mainLock,上锁,再次检查线程池状态。检查线程是否被启动,若没有
- workers集合中添加这个工作线程(这也是为什么要上锁的主要原因),看是否需要更新下历史最大线程数,启动这个Worker线程。
- 如果添加这个工作线程失败,则将其从workers中移除,并减少工作线程数
这里出现了一种语法,之前没见过。其实是类似goto语句,这里作者可能只是不想设置标志位选择是彻底跳出上层循环还是从第二层循环返回第一层循环重新开始选择了使用这个语法。
tag: for(;;) { // .... break tag; // ... continue retry; }
需要注意的是,break tag表示跳到tag出且不再执行循环,continue是跳到tag处重新执行循环
runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
- Worker先取出当前的第一个任务,并释放Worker自己的锁
- 判断该任务是否为空,如果为空则通过getTask方法从阻塞队列中拿一个任务过来(getTask可能会阻塞)。如果没有任务,则会通过while循环一直获取任务
- 对Worker上锁,保证线程不被其他线程中断
- 检查ctl,若线程池处于中断状态,则将线程中断
- 执行beforeExecute切面方法(ThreadPoolExecutor预留)
- run这个task,并捕获可能出现的异常
- 执行afterExecute方法(ThreadPoolExecutor预留)
- 完成收尾工作:置空task、completedTasks++,解锁Worker
- 如果产生异常,则会执行processWorkerExit,依据情况进行Worker的销毁或重建
getTask
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
- 如果当前线程池状态为STOP或者已经在 SHUTDOWN状态中且阻塞队列中没有等待的线程,则会销毁该线程
- 如果(当前线程数目>线程池最大可容量线程数 || 设置了核心线程空闲一段时间后销毁且当前已经超时) && (当前工作线程>1 || 阻塞队列为空),则CAS减少当前线程数目并且销毁
- 根据是否允许核心线程idle超时后不被销毁采取从阻塞队列中用poll有限等待或take无限等待,如果超时则会设置超时标志。keepAliveTime也就是在此处生效
submit
由AbstractExecutorService实现,实质就是将Runnable、Callable等包装成一个RunnableFuture(实质上是一个FutureTask)并execute
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
ThreadLocal
为使用相同变量的每个线程都创建不同的存储。当使用ThreadLocal来维护变量时, ThreadLocal会为每个线程创建单独的变量副本, 避免因多线程操作共享变量而导致的数据不一致的情况。
不是用于解决共享变量的问题,也不是为了协调线程同步,而是为了方便每个线程处理自己的状态而引入的机制
应用:session管理、数据库链接管理
为什么会使用ThreadLocal呢?
考虑以下场景:
- 用户在打开Database连接或使的时候,以前写的代码往往是调用static Connection getConnection()这种方法去获取一个静态的connection,多个线程使用一个connection,在高并发的情况下很有可能一个线程准备关闭该connection,而另外一个线程正准备使用该connection,如果没有做额外的同步操作的话就会出问题。
- 如果将Connection变为普通类,则每个线程创建线程时都需要new一次,这能保证并发问题不发生,但同时,每个线程都要开启和关闭一个connection,这对服务器的压力是很大的。
源码梳理
Get & Set
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
map.set(this, value);
} else {
createMap(t, value);
}
}
Thread类中有ThreadLocalMap的成员变量threadLocals。这里主要通过根据当前线程拿到threadLocals, 然后再从该线程的ThreadLocalMap通过当前线程key拿到Entry,进而拿到key和value。如果map为空的话,则需要初始化
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
map.set(this, value);
} else {
createMap(t, value);
}
if (this instanceof TerminatingThreadLocal) {
TerminatingThreadLocal.register((TerminatingThreadLocal<?>) this);
}
return value;
}
// 模板方式
protected T initialValue() {
return null;
}
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
初始化时通过initialValue设置key的value(如果有重写这个方法,就不为null),然后再创建map,并插入这个entry
总结一下:通过静态的方式获得线程隔离的变量
ThreadLocalMap
特点
- ThreadLocalMap没有实现Map接口,更没有继承AbstractMap,可以说它是Map但又不完全像Map那样强大
- 存放的key都是ThreadLocal<?>类型,计算hashCode由threadlocal去决定
- ThreadLocalMap的Entry基于 WeakReference
- 该方法仅用一个Entry数组来存储Key Value且Entry也不是链表形式,每个数组的slot中只存一个Entry。如果存在index相同或者容量超过threshold则需要rehash扩容(2倍扩容)。需要注意的是,如果扩容时idx还是相同,则会尝试把当前元素放在该idx后续位置。这也就造成了在getEntry的时候如果发现当前idx的key不等时,则需要依次比较后续位置的key
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
if (k == key) {
e.value = value;
return;
}
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
这里还需要注意的一点是:ThreadLocalMap的Entry是WeakReference类型的,所谓WeakReference,表明其只能生存到下一次gc之前,所以当k==null时有可能是根本没有这个key也有可能是该Entry已经被回收了,则要调用replaceStableEntry替换过期的数据,做了什么还不知道。
内存泄漏
在使用线程池操作ThreadLocal的情况下可能出现问题:线程池中的线程可能工作完后不会停止,处于idle态,那么这时候进行gc,有WeakReference的ThreadLocal自然就被清理,但是这时候还存在Thread Ref -> Thread(Heap) -> ThreadLocalMap -> Entry -> Value的强连接引用,最终就导致key为null但value不为null
为避免出现内存泄漏,ThreadLocalMap提供remove方法,也就是找到key对应的idx位置并且清理掉这个entry
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
// 清除对ThreadLocal的弱引用, 置key为null
e.clear();
// 清除key为null的元素
expungeStaleEntry(i);
return;
}
}
}
正确使用ThreadLocal
- 每次调用完ThreadLocal都调用其remove方法清除数据
- 将ThreadLocal定义为private static,这样就一直存在ThreadLocal的强引用,无法被gc回收,也就能保证任何时候都能通过ThreadLocal的弱引用访问到Entry到value置
BlockingQueue
BlockingQueue
方法 | 立即抛异常 | 立即返回特定值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add | offer | put | offer |
移除 | remove | poll | take | poll |
检查 | element | peek |
各种阻塞队列
- ArrayBlockingQueue:有界的阻塞队列,内部实现是将对象放进数组中的,所以需要指定数组的初始大小,且**不支持扩容!**使用takeIndex和putIndex维护当前存在数组的元素索引区间
- LinkedBlockingQueue:使用链表维护的阻塞队列,可以存放无限多个元素(理论上)
- DelayQueue:对元素进行持有直到一个特定的延迟过期
- SynchronousQueue:内部同时只能容单个元素
BlockingDeque
表示一个线程放入和提取的双端队列,可以在任意一端插入或者拿走元素
方法 | 立即抛异常 | 立即返回特定值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | addFirst/addLast | offerFirst/offerLast | putFirst/putLast | offerFirst/offerLast |
移除 | removeFirst/removeLast | pollFirst/pollLast | takeFirst/takeLast | pollFirst/pollLast |
检查 | getFirst/getLast | peekFirst/peekLast |
应用
生产者消费者实现
基于Condition和Lock实现
public class Main {
static final int MAX_CAPACITY = 10;
static Lock lock = new ReentrantLock();
// 缓冲池不为空
static Condition notEmpty = lock.newCondition();
// 缓冲池不为满
static Condition notFull = lock.newCondition();
static Queue<Object> pool = new LinkedList<>();
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 300, TimeUnit.SECONDS,new LinkedBlockingQueue<>());
for (int i = 0; i < 5; i++) {
threadPoolExecutor.submit(new Producer("s" + i));
threadPoolExecutor.submit(new Consumer("s" + i));
}
Thread.sleep(50000);
}
static class Producer implements Runnable {
private String name;
public Producer(String name) {
this.name = name;
}
@Override
public void run() {
while (true) {
lock.lock();
try {
while (pool.size() >= MAX_CAPACITY) {
// wait
System.out.println("Producer-" + name + " waiting");
notFull.await();
}
pool.add(new Object());
System.out.println("Producer-" + name + "add a object to pool, remain: " + pool.size());
notEmpty.signalAll();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
static class Consumer implements Runnable {
private String name;
public Consumer(String name) {
this.name = name;
}
@Override
public void run() {
while (true) {
lock.lock();
try {
while (pool.size() <= 0) {
// wait
System.out.println("Consumer-" + name + " waiting");
notEmpty.await();
}
pool.poll();
System.out.println("Consumer-" + name + "consume a object to pool, remain: " + pool.size());
notFull.signalAll();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
try {
Thread.sleep(300);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
}