部分内容来源:JavaGuide
使用ThreadPoolExecutor代码示例
package com.example.threadpool.Test;
import org.apache.catalina.filters.RemoteIpFilter;
import org.springframework.stereotype.Component;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
//用来测试使用我们的ThreadPoolExecutor
@Component
public class ThreadPoolExecutorTest {
public static final int CORE_POOL_SIZE = 5;//核心线程数
public static final int MAX_POOL_SIZE = 10;//最大线程数
public static final int QUEUE_CAPACITY = 100;//队列的最大数量
public static final Long KEEP_ALIVE_TIME = 1L;//保持连接的时间
public static final ThreadPoolExecutor kiraExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.AbortPolicy()
);
public static void main(String[] args) {
for (int i = 0; i <100 ; i++) {
kiraExecutor.submit(new Runnable() {
@Override
public void run() {
System.out.println("niubi");
}
});
}
}
}
package com.example.threadpool.Test;
import org.apache.catalina.filters.RemoteIpFilter;
import org.springframework.stereotype.Component;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
//用来测试使用我们的ThreadPoolExecutor
@Component
public class ThreadPoolExecutorTest {
public static final int CORE_POOL_SIZE = 5;//核心线程数
public static final int MAX_POOL_SIZE = 10;//最大线程数
public static final int QUEUE_CAPACITY = 100;//队列的最大数量
public static final Long KEEP_ALIVE_TIME = 1L;//保持连接的时间
public static final ThreadPoolExecutor kiraExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.AbortPolicy()
);
public static void main(String[] args) {
for (int i = 0; i <100 ; i++) {
kiraExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("niubi");
}
});
}
}
}
我们用submit()或者executor()来提交我们的任务
我们都是提交我们的异步任务,既然是异步任务我们就要弄一个线程
要不就连接Runnable接口,要不就继承Thread来实现一个任务类
要不就直接new 一个Runnable,
还可以使用Callable,Future,FutureTask
package com.example.threadpool.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
public class MyTest1 {
public static void main(String[] args) {
test1();
}
private static void test1() {
ExecutorService es = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
es.submit(new MyRunnable(i));
}
}
private static void test2() {
ExecutorService es = Executors.newCachedThreadPool(new ThreadFactory() {
int n=1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"自定义线程名称"+n++);
}
});
for (int i = 0; i < 10; i++) {
es.submit(new MyRunnable(i));
}
}
private static void test3() {
ExecutorService es = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
es.submit(new MyRunnable(i));
}
}
}
//任务类,包含一个任务编号
class MyRunnable implements Runnable{
private int id;
public MyRunnable(int id)
{
this.id=id;
}
@Override
public void run() {
String name=Thread.currentThread().getName();
System.out.println(name+"执行了任务..."+id);
}
}
为什么要用线程池
1.降低资源消耗:通过重复利用已创建的线程来降低线程创建造成的消耗
2.提高响应速度:任务到达时,任务不需要等到线程创建就能立即执行
3.提高线程的可管理性:线程是稀缺资源,如果无限制创建不仅会消耗系统资源还会降低系统的稳定性,我们可以通过线程池进行统一的分配,调优和监控
说一下线程池的执行流程
有一个核心线程数和一个最大线程数
- 如果当前运行的线程数小于核心线程数,那么就会新建一个线程来执行任务。
- 如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行。
- 如果向任务队列投放任务失败(任务队列已经满了),但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务。
- 如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么当前任务会被拒绝,调用拒绝策略会调用RejectedExecutionHandler.rejectedExecution()方法。
为什么我们要在核心线程数后放入队列而不是在达到最大线程数的时候放入队列
线程创建的开销
- 创建和销毁线程是有一定开销的(CPU 资源、内存分配等),所以我们尽量少创建一些线程,把线程尽量控制到一定数量
- 如果直接达到最大线程数再放入队列,会导致频繁创建和销毁线程,从而增加系统负担,降低性能
避免过多线程竞争资源
- 如果每次任务到来都优先创建新线程(直到最大线程数),可能导致线程池中线程数量迅速增长增加线程上下文切换的开销,影响整体性能。 (避免短时间内生成大量线程增加上下文切换的开销,同时也算是作为一种缓冲机制来防止OOM)
- 优先使用任务队列可以避免线程池在短时间内生成过多线程,从而提升线程池的稳定性
我们的线程池为什么要队列
防止任务丢失
线程池中线程的数量有限,当任务数量超过线程池能够立即处理的范围时,队列提供了一种机制来保存多余的任务,防止任务丢失
提高任务处理稳定性(类似于限流,削峰)
在突发任务高峰时,任务到达的速度往往会超过线程的处理速度。队列可以将任务暂存,平滑任务到达和处理之间的差距,防止短时间内创建大量线程起到一个缓冲的作用
说一下线程池的参数
核心线程数 corePoolSize
最大线程数 maximumPoolSize
任务队列(全局共享) taskQueue
最大空闲时间 keepAliveTime
时间单位 Unit
线程工厂 threadFactory 可以用来给线程取名字
拒绝策略 handler
介绍一下线程池的几种工作队列
1.直接提交队列(SynchronousQueue
)
特点:
不存储任务,任务提交后必须直接交给线程处理。
如果线程池中没有空闲线程来处理任务,任务会被拒绝。
每个任务提交时都需要找到一个线程来执行,否则会阻塞提交方。
适用场景:
适用于任务量大且执行速度快的场景。
常用于需要最大化线程池并发性能的情况。
线程池配置建议:
通常搭配无界线程池(如 Executors.newCachedThreadPool()
)使用,线程池可以动态扩展线程数量。
优点:
简化设计,适合处理短生命周期任务。
缺点:
在高并发且线程数有限的情况下容易导致任务被拒绝。
2. 无界队列(LinkedBlockingQueue
)
可做无界和有界队列,默认情况下其容量为Integer.MAX_VALUE所以可以看成无界队列
特点:
是一个无界阻塞队列,队列长度没有限制(理论上是 Integer.MAX_VALUE
)。
如果线程池的线程全部繁忙,新提交的任务会进入队列等待,而不会被拒绝。
线程池中的线程数不会超过核心线程数,因为多余的任务都进入了队列。
适用场景:
适用于任务执行速度较慢且需要限制线程数的场景。
常用于固定线程池(如 Executors.newFixedThreadPool()
)或需要稳定线程数的应用。
线程池配置建议:
核心线程数应配置较大,最大线程数没有意义,因为线程池永远不会创建超过核心线程数的线程。
优点:
避免任务丢失,适合高负载的场景。
缺点:
如果任务量过多可能导致内存占用过高。
3. 有界队列(ArrayBlockingQueue
)
特点:
是一个有界阻塞队列,需要指定队列的容量。
当队列已满且线程池线程数已达到最大值时,提交任务会触发拒绝策略。
适用场景:
适用于需要限制任务队列长度的场景,以防止过多任务导致内存溢出。
线程池配置建议:
核心线程数和队列长度要根据实际任务量合理配置,避免过早触发拒绝策略。
优点:
可以通过设置队列大小限制任务数量,防止系统资源被过多任务耗尽。
缺点:
当队列满时,新任务可能会被拒绝。
4. 优先队列(PriorityBlockingQueue
)
特点:
是一个基于优先级的无界队列,任务会根据优先级排序(需要任务实现 Comparable
接口或传入 Comparator
)。
优先级高的任务会优先被线程执行。
适用场景:
适用于需要对任务排序并优先处理某些任务的场景。
线程池配置建议:
与无界队列类似,最大线程数可能没有意义。
优点:
任务按优先级执行,灵活性高。
缺点:
优先级排序可能会带来额外的性能开销。
5. 延迟队列(DelayQueue
)
特点:
是一个无界阻塞队列,任务必须实现 Delayed
接口。
只有延迟时间到期的任务才会被线程执行。
适用场景:
适用于需要延迟执行任务的场景,比如定时任务调度。
线程池配置建议:
常用于自定义线程池,专门处理定时任务。
优点:
支持任务延迟,适合定时和调度场景。
缺点:
由于是无界队列,可能导致内存问题。
线程池常用的阻塞队列有哪些
有界阻塞队列 LinkedBlockingQueue(链表实现)
同步队列 SynchronizedQueue
延迟队列 DelayedWorkQueue
有界阻塞队列 ArrayBlokcingQueue(数组实现,容量一旦创建就不能修改)
说一下线程池的拒绝策略
此时线程数已经达到我们的最大线程数,然后我们的队列也放满了任务
我们有4种拒绝策略
1.AbortPolicy(默认的拒绝策略)
2.CallerRunsPolicy
3.DiscardPolicy
4.DiscardOldestPolicy
1.AbortPolicy抛出一个RejectException,拒绝新任务
AbortPolicy是我们的默认的拒绝策略,线程池会直接抛出 RejectedExecutionException
异常
2.CallerRunsPolicy我们提交这个任务的线程来执行这个任务,也就是我们调用execute方法的线程(如果此时执行程序关闭,就会丢弃该任务)
3.DiscardPolicy不处理新任务,直接丢弃掉
线程池会直接丢弃该新任务,不会抛出任何异常,也不会有任何提示信息
4.DiscardOldestPolicy丢弃最早的未处理的任务请求,然后提交最新的任务
什么是CPU密集型?什么是IO密集型?
1. CPU密集型
定义:
CPU密集型任务主要消耗的是CPU的计算能力,需要大量的计算操作,例如浮点运算、复杂的数学计算、加密解密、数据压缩等。任务的性能主要取决于CPU的计算速度和并行处理能力(多核、多线程)。
特征:
CPU使用率很高,几乎总是接近100%。
内存、磁盘、网络等IO操作相对较少。
CPU是主要的性能瓶颈。
常见的场景:
图像和视频处理(如编码、解码、渲染)。
科学计算(如模拟、数值分析)。
大规模机器学习训练(如深度学习模型训练)。
游戏引擎的物理计算或AI逻辑
2. IO密集型
定义:
IO密集型任务主要消耗的是IO设备的访问能力,例如磁盘IO、网络IO或内存IO。任务的性能主要取决于IO操作的速度和吞吐量。
特征:
CPU利用率较低,大部分时间处于等待IO完成的状态(如磁盘读写、网络数据传输)。
程序运行时间受限于IO操作的延迟,而不是CPU的处理速度。
IO是主要的性能瓶颈。
常见的场景:
文件读写(如日志记录、大文件处理)。
网络服务(如Web服务器、数据库查询)。
数据传输任务(如爬虫下载、视频流)。
API调用(需要访问外部系统或服务)
如果每个业务各开一个线程池,那么线程池里面的参数是怎么考虑的?核心线程数最大线程数怎么设置?任务队列怎么考虑?拒绝策略怎么考虑?
1. 核心线程数(corePoolSize)
核心线程数是线程池在空闲时仍然保留的线程数量
cpu密集型是为了高效
io密集型是为了更多的线程能够io
如何设置:
CPU密集型任务:
如果任务主要消耗CPU资源(例如复杂计算),核心线程数应接近CPU核心数或稍低。
推荐公式:corePoolSize = CPU核心数 + 1
理由:线程数等于CPU核心数可以最大化利用CPU,同时加上1是为了处理偶尔的上下文切换或其他轻量任务。
IO密集型任务:
如果任务涉及大量的IO操作(如网络请求、数据库操作、文件读写),线程会因IO等待而阻塞,可以使用更多的线程以提高吞吐量。
推荐公式:corePoolSize = CPU核心数 * 2
或 CPU核心数 / (1 - 阻塞系数)
阻塞系数 = IO等待时间 / (IO等待时间 + CPU计算时间)。例如,如果阻塞系数是0.8,则corePoolSize ≈ CPU核心数 * 5
。
混合型任务:
如果任务既有CPU密集部分又有IO操作,需要分析两部分的比例并综合考虑,或将混合任务分解到不同线程池中分别处理。
额外建议:
如果任务需要快速响应,可以适当提高corePoolSize
,确保有足够的线程来处理任务。
根据实际场景,可以通过压测调整,找到最优核心线程数。
2. 最大线程数(maximumPoolSize)
最大线程数是线程池能容纳的最大线程数量,当任务队列已满时会创建非核心线程
如何设置:
最大线程数一般是核心线程数的2~5倍。
任务突增: 如果业务可能出现流量高峰(例如秒杀、抢购场景),可以设置一个较高的最大线程数来承载突增流量。
有限资源保护: 如果系统资源有限,不建议设置过高,防止线程过多导致上下文切换或OOM(内存溢出)。
回收策略: 非核心线程会在keepAliveTime
后被回收,因此最大线程数只在任务爆发时起作用
如果用了每个业务各开一个线程池导致服务器挂了怎么办
共享线程池减少线程池数量
因为我们为每个业务都开一个线程池
我们线程池过多造成服务压力过大从而导致了OOM(内存耗尽,OUT OF MEMORYERRO)
所以我们最简单的一点可以想到
从业务本身来考虑
1.我们可以优化我们的业务逻辑,让我们的业务逻辑处理更快,占用线程的时间更少
2.我们可以对我们的Java服务的接口进行熔断限流
限流,例如Sentinel
熔断,例如Hystrix
从线程池本身来考虑
我们可以监控线程池,美团的动态线程池里面就实现了我们的监控告警功能
如果让你设计一个多线程导致服务器挂掉的预警工具该怎么设计?
我们肯定要监控线程池里面你的具体细节
1.检查当前活跃线程数
2.检查队列里面的等待的任务的个数
3.检查CPU状态
4.邮件通知告警模块,通知到微信钉钉进行警告,让工作人员即使处理。及时通过通知告警错误
5.记录日志,记录线程的状态,CPU占用,栈堆信息。并且使用我们的日志分析工具,例如我们的ELK可以可视化日志信息然后我们来进行分析。及时通过日志定位错误
核心线程数可不可以设置为0
可以
核心线程数为0的时候,会创建一个非核心线程来执行
从源码角度进行分析
当核心线程数为 0 时,来了一个任务之后,会先将任务添加到任务队列,同时也会判断当前工作的线程数是否为 0,如果为 0,则会创建线程来执行线程池的任务
说一下Java内置线程池的种类
Cache 缓存线程池
使用SynchronousQueue直接提交队列
队列的容量为0,实际不存储任何任务,它只负责对任务进行中转和传递,所以效率比较高
它的特点在于线程数是几乎可以无限增加的,队列的容量为0,实际不存储任何任务,它只负责对任务进行中转和传递,所以效率比较高。
Fixed 固定大小线程池线程池
使用LinkedBlockingQueue
核心线程数和最大线程数是一样的
而且就算任务队列满了,到了本该继续增加线程数的时候,由于它的最大线程数和核心线程数是一样的,所以也无法再增加新的线程了
Single 单例线程池
使用LinkedBlockingQueue
它会使用唯一的线程去执行任务,原理和FixedThreadPool 是一样的,只不过这里线程只有一个
也就是最大线程数和核心线程数都是1
Shcedule 延迟线程池
使用DelayedWorkQueue
可以设置定期的执行任务,它支持定时或周期性执行任务,比如每隔 10 秒钟,执行一次任务,我通过这个实现类设置定期执行任务的策略
我们该如何创建使用我们的线程池
如何创建使用线程池
1.使用Java内置线程池
2.使用ThreadPoolExecutor手动指定参数创建线程池
阿里巴巴开发规范
Java 中的 Executors 类定义了一些快捷的工具方法,来帮助我们快速创建线程池
《阿里巴巴 Java 开发手册》中提到,禁止使用这些方法来创建线程池
这一条规则的背后,是大量血淋淋的生产事故
最典型的就是 newFixedThreadPool和 newCachedThreadPool可能因为资源耗尽导致 OOM 问题
所以,不建议使用 Executors 提供的两种快捷的线程池,原因如下:
1.我们需要根据自己的场景,并发情况来评估线程池的几个核心参数,包括核心线程数、最大线程数、线程回收策略、工作队列的类型,以及拒绝策略,确保线程池的工作行为符合需求。一般都需要设置有界的工作队列和可控的线程数。
2.任何时候,都应该为自定义线程池指定有意义的名称,以方便排查问题。当出现线程数暴增、线程死锁、线程占用大量CPU、线程执行出现异常等问题时,我们往往会抓取线程栈。此时,有意义的线程名称,就可以方便我们定位问题。
除了建议手动命名线程池以外,我还建议使用一些监控手段来观察线程池的状态。线程池池化组件往往会表现得很节约,默默无闻,除非出现了拒绝策略,否则压力大时不会抛出一个异常。如果我们能提前观察到线程池队列的积压,或者线程数量的快速膨胀,往往可以提早发现并解决问题。
线程池的shutDown()和shutDownNow()方法有什么用
shutDown
用了以后会置状态为SHUTDOWN,正在执行的任务会继续执行下去,没有被执行的任务则中断
此时,则不能再往线程池中添加任何任务,否则将会抛出RejectedExecutionException 异常
shutDownNow
shutdownNow 为STOP,并试图停止所有正在执行的线程,不再处理还在池队列中等待的任务,当然,它会返回那些未执行的任务。
它试图终止线程的方法是通过调用 Thread.interrupt() 方法来实现的,
但是这种方法的作用有限,如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。所以,ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出
线程池的核心线程会被回收吗
不会被回收
但我们设置为true就可以回收空闲的核心线程了
如果不允许丢弃任务,应该选择哪个拒绝策略
只要程序不关闭,就会使用执行execute方法的线程执行该任务
也就是我们的CallerRunsPolicy拒绝策略
CallerRunsPolicy拒绝策略有什么风险?应该如何解决?
风险
提交任务的是主线程,可能会导致主线程阻塞
从输出结果可以看出,因为CallerRunsPolicy
这个拒绝策略,导致耗时的任务用了主线程执行,导致线程池阻塞,进而导致后续任务无法及时执行,严重的情况下很可能导致 OOM(内存耗尽)
如何解决
从问题的本质入手
我们可以增加阻塞队列的大小
我们也可以调整线程池的最大线程数量
任务持久化思路
设计一个MYSQL的任务表
用Redis缓存任务
将任务提交到消息队列中
实现逻辑
核心在于自定义拒绝策略和阻塞队列
1.实现RejectedExecutionHandler
接口自定义拒绝策略
2.继承BlockingQueue
实现一个混合式阻塞队列,重写take()
方法,取任务时优先从数据库中读取最早的任务,数据库中无任务时再从 ArrayBlockingQueue
中去取任务
我们队列满的情况下,取任务优先从数据库中读取最早的任务,数据库中没任务了我们再从列队里面去取任务
线程池中线程异常后,销毁还是复用
1.销毁:使用execute()提交任务
如果任务是execute()提交到线程池,然后执行过程中抛出异常,
如果这个异常没有被捕获,那么该异常就会导致线程终止,并且异常会被打印出来(打印到控制台或者日志文件)
线程池会检测到这种线程终止,并创建一个新的线程替换它,从而保持配置的线程数不变
2.复用:使用submit()提交任务
如果在任务执行中发生异常,这个异常不会被打印出来
异常会被封装到由submit()返回的Future对象中,我们可以调用Future.get()方法,来捕获一个ExecutionException。
这种情况下,线程不会因为异常而终止,它会继续存在于线程池中,准备执行后续任务
使用Submit()和execute()提交任务有什么区别?
1.使用execute()提交任务:
如果任务是execute()提交到线程池,然后执行过程中抛出异常,
如果这个异常没有被捕获,那么该异常就会导致线程终止,并且异常会被打印出来(打印到控制台或者日志文件)
线程池会检测到这种线程终止,并创建一个新的线程替换它,从而保持配置的线程数不变
2.使用submit提交任务:
如果在任务执行中发生异常,这个异常不会被打印出来
异常会被封装到由submit()返回的Future对象中,我们可以调用Future.get()方法,来捕获一个ExecutionException。
这种情况下,线程不会因为异常而终止,它会继续存在于线程池中,准备执行后续任务
如何给线程池命名
1.利用guava
2.自己实现ThreadFactory
如何设计一个能根据任务的优先级来执行的线程池
使用PriorityBlockingQueue作为任务队列
两种方式
一:提交到线程池的任务实现Comoarable接口,并重写copareTo方法来指定任务之间的优先级比较规则
二:创建PriorityBlockingQueue时,传入一个Comparator对象来指定任务之间的排序规则