本文主要来自于孤尽老师的《码出高效》和方腾飞老师等《Java 并发编程的艺术》两本书中关于线程池的笔记总结,侵删。
一、进程、线程、线程池的定义
进程
定义
进程是程序的一次执行,是一个程序及其数据处理在处理机上顺序执行时所发生的活动,是具有独立功能的程序在一个数据集合上运行的过程,它是系统进行资源分配和调度的一个独立单位。
进程的三种基本状态
- 就绪状态
- 执行状态
- 阻塞状态
线程
创建线程的方式
从比较广的范围上来说,可以有三种:
- 继承自 Thread 类(不符合里式代换原则)
- 实现 Runnable 接口(推荐)
- 实现 Callable 接口(可以获得返回值)
如果你在面试或者笔试中,你也可以从细粒度角度来划分线程的创建方式,大概有七种:
- 继承 Thread 类
- 实现 Runnable 接口
- 使用内部类的方式(lambda 表达式)
- 定时器
- 实现 Callable 接口
- 基于线程池的方式
- 使用 Spring 来实现多线程(@EnableAsync 和 @Async 注解)
线程的生命周期
- NEW(新建状态、出生状态)
- RUNNABLE(就绪状态)
- RUNNING(运行状态)
- BLOCKED(阻塞状体)
- DEAD(终止状态)
以上参考孤尽老师的《码出高效》是生命周期里存在五种状态,其实也可以在此基础上多加两个状态:线程的等待状态(wait)和休眠状态(sleep)。
线程的相关方法
- wait(),Object 中的方法,置线程为主动等待
- notify(),Object 中的方法,线程唤醒
- notifyAll(),Object 中的方法,唤醒所有所有处于等待状态的线程
- sleep(long millis),置线程为休眠状态,单位:毫秒
- 输入/输出请求,I/O 操作置线程为阻塞状态
- 等待输入/输出结束时,置线程为就绪状态
- run(),置线程为运行状态
- run() 执行完毕,置线程为死亡(终止)状态
- interrupt(),线程中断,置为死亡状态
进程与线程的区别
- 进程是一个具有独立功能的程序,是关于某个数据集合的执行活动,不同的进程拥有独立的内存空间;
- 线程是程序执行的最小单位,一个或多个线程组成一个进程,同一个进程中的所有线程共享相同的内存空间,运行时都有一个线程栈来保存变量值信息。
- JVM 中,线程的创建需要开辟虚拟机栈、本地方法栈、程序计数器等线程私有的内存空间。在线程销毁时需要回收这些系统资源。
线程池
就像我们企业项目中,连接 MySQL 需要有数据库连接池,它是为了避免每次进行数据库交互都要进行数据库连接的初始化到执行到销毁的繁琐且资源消耗过程,所有建立一个数据库连接池的概念,在这个池子中提前建立数据库连接,不断执行业务请求过来的 SQL 语句,然后通过池子统一管理数据库的连接和销毁工作,从而提高资源利用率,提高响应速度,降低资源消耗。
同理,Java 并发中对于多线程的利用也有线程池的概念,频繁地创建和销毁线程会浪费大量的系统资源,增加并发编程风险。并且,线程本身无法支持阻塞或者服务拒绝等一系列业务化的操作。这些东西我们需要通过线程池来解决。
线程池从字面上来理解,其实就是一个池子放了 N 个线程供调用。其实线程池是与工作队列密切相关的,是指管理一组同构工作线程的资源池。工作者线程从工作队列中获取任务,执行任务,然后返回线程池等待下一个任务。
二、线程池相关类
线程池相关类的设计属于 Executor 框架的一部分,Executor 框基于生产者-消费者模式来设计,提供一种标准的方法将任务的提交过程与执行过程解耦,从而为灵活且强大的异步任务执行框架提供基础。其中,提交任务的操作就相当于生产者,执行任务的线程就相当于消费者。相关类图如下:
用文字叙述一遍:
- Executor 作为顶级接口,只定义了一个任务执行的方法 execute。
- ExecutorService 作为自己接口实现了 Executor 并且丰富了任务执行的各种行为声明,比如如何提交一个任务、关闭线程池等。
- ScheduledExecutorService 接口继续继承 ExecutorService 接口并且声明一些定时、周期性任务执行的方法,最后 ScheduledThreadPoolExecutor 实现了这个接口(他也继承了线程池执行类 ThreadPoolExecutor)。
- 抽象类 AbstractExecutorService 是直接实现了 ExecutorService 接口,最后 ForkJoinPool 和 ThreadPoolExecutor 继承了这个抽象类,ForkJoinPool 主要体现工作窃取算法,ThreadPoolExecutor 就是我们用到的线程池管理类。
三、线程池的创建线程的主要处理流程
在 ThreadPoolExecutor 中对于线程的管理和创建主要体现那几个构造方法中。对于线程池中管理任务线程的步骤及思路如下:
- 如果当前运行的线程少于 corePoolSize(常驻核心线程数),则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)
- 如果运行的线程等于或多于 corePoolSize,则将任务加入 BlockingQueue 队列
- 如果队列已满,则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)
- 如果创建新线程将使当前运行的线程超出 maximumPoolSize(线程池能够容纳同时执行的最大线程数),任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution() 方法执行拒绝策略。
相关源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
/** * 线程池构造方法 */ public ThreadPoolExecutor( // 常驻核心线程数 // 如果等于 0,则任务执行完之后,没有任何请求进入时销毁线程池的线程 // 如果大于 0,即使本地任务执行完毕,核心线程也不会被销毁 // 设置过大会浪费资源,设置过小会导致线程频繁的创建或销毁 int corePoolSize, // 线程池能够容纳同时执行的最大线程数 // 必须大于等于 1 // 如果待执行的线程数大于次数,需要借助 workQueue 参数,缓存在队列中 // 如果此数与 corePoolSize 相等,表示线程池的大小是固定的 int maximumPoolSize, // 线程池中的线程空闲时间 // 当空闲时间达到这个值的时候,线程会被销毁,知道只剩下 corePoolSize 个线程为止,避免浪费内存和句柄资源 // 默认当线程池的线程数大于 corePoolSize 时,此参数才会起作用 // 当 变量设置为 true 时,核心线程超时也会被回收 long keepAliveTime, TimeUnit unit, // 时间单位,通常是 TimeUnit.SECONDS // 缓存队列 // 当请求的线程数大于 maximumPoolSize 时,线程进入 BlockingQueue 阻塞队列 BlockingQueue<Runnable> workQueue, // 线程工厂 // 用来生产一组相同任务的线程 // 线程池的命名就是通过给这个 factory 增加组名前缀实现 ThreadFactory threadFactory, // 执行拒绝策略的对象,限流保护 // 当 workQueue 的任务缓存区上限的时候,就通过该策略处理请求 RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) { throw new IllegalArgumentException(); } // 要求队列、线程工厂、拒绝处理服务必须有实例对象 if (workQueue == null || threadFactory == null || handler == null) { throw new NullPointerException(); } this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } /** * 线程池的执行 * 步骤及思路 * 1.如果当前运行的线程少于 corePoolSize(常驻核心线程数),则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁) * 2.如果运行的线程等于或多于 corePoolSize,则将任务加入 BlockingQueue 队列 * 3.如果队列已满,则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁) * 4.如果创建新线程将使当前运行的线程超出 maximumPoolSize(线程池能够容纳同时执行的最大线程数),任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution() 方法执行拒绝策略 */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 返回包含线程数及线程池状态的 Integer 类型数值 int c = ctl.get(); // 如果工作线程数小于核心线程数,则创建线程任务并执行 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) { return; } // 如果创建失败,防止外部已经在线程池中加入新任务,重新获取线程数及线程池状态 c = ctl.get(); } // 只有线程池处于 RUNNING(线程池可以接受新任务) 状态,才执行置入队列的操作 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 如果线程池不是 RUNNING 状态,则将刚加入队列的任务移除 if (! isRunning(recheck) && remove(command)) { reject(command); } // 如果之前的线程已被消费完,新建一个线程 else if (workerCountOf(recheck) == 0) { addWorker(null, false); } } // 核心池和队列都已经满了,尝试创建一个新线程 else if (!addWorker(command, false)) { // 如果 addWorker 返回的是 false,表示创建失败,就会唤醒拒绝策略 reject(command); } } |
这里面还有一个 addWorker 方法的源码,负责线程的创建,老四没有放上来。不过你可以在的 GitHub 上 JDK 源码中文注释项目中看到所有的关于线程池底层代码的中文注释,JDK 源码的阅读以及学习是一个漫长而又困难的过程,我也会一直持续更新自己的学习进度,很多学习笔记都会记录在各个源码类和方法上,可以一起学习和进步。
四、通过线程池静态工厂 Executors 创建线程池
Executors 属于一个静态工厂类,主要是帮助我们实例化线程池的包装对象,所以这里面自然包括 ThreadPoolExecutor 的创建(另外两个分别是:ScheduledThreadPoolExecutor、ForkJoinPool),一共有五种方式:
- Executors. newFixedThreadPool(int nThreads):nThreads参数代表固定线程数,既是核心线程数也是最大线程数,所以不存在空闲线程,keepAliveTime = 0;使用 LinkedBlockingQueue 队列,基于链表结构的阻塞队列,此队列按 FIFO 排序元素,吞吐量通常比较高。
- Executors. newWorkStealingPool():JDK8 新引入的工作窃取式线程池,创建持有足够线程的线程池支持给定的并行度,并通过使用多个队列减少竞争,CPU 数量会被设置为默认的并行度。返回 ForkJoinPool(JDK7 引入)对象,AbstractExecutorService 的子类。
- Executors. newSingleThreadExecutor():创建一个单线程的线程池,相当于但线程串行执行所有任务,保证按任务的提交顺序依次执行。使用 LinkedBlockingQueue 阻塞队列。
- Executors. newCachedThreadPool():这是高度可伸缩的线程池,如果工作线程处于空闲状态,就回收工作线程。不过 maximumPoolSize 最大可以到整型最大数,存在 OOM 风险。使用 SynchronousQueue 队列,一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQueue。
- Executors. newScheduledThreadPool(int corePoolSize):定时及周期性任务执行线程池,相比 Timer,ScheduledExecutorService 更安全,功能更强大。特点是不回收线程,而 newCachedThreadPool 回收工作线程,不过和 newCachedThreadPool 一样,maximumPoolSize 最大可以到整型最大数,存在 OOM 风险。
五、如何向线程池提交任务
有两种方式向线程池提交任务,一个就是 Executor 框架的中基本接口 Executor 声明的 execute() 方法,如上源码所示,具体实现在 ThreadPoolExecutor 中,不过 execute() 方法用于提交不需要返回值的任务,所以我们无法判断任务是否被线程池执行成功。
另一种方式实现在 AbstractExecutorService 抽象类中,它是 ExecutorService 接口声明的 submit() 方法,该方法可以用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功。县官源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
/** * 用于提交需要返回值的任务。 * 线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功。 * 也可以通过 future.get() 方法来获取返回值。 * 不过 get() 方法会阻塞当前线程直到任务完成。 * 也可以使用 get(long timeout,TimeUnit unit) 方法,该方法会阻塞当前线程一段时间后立即返回,不过这时候有可能任务没有执行完。 */ public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } |
六、关闭线程池的两种方法
关闭线程池的原理
遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,不过无法响应中断的任务可能永远无法终止。
关闭线程池的两种方式及区别
线程池的关闭有 shutdown() 和 shutdownNow() 两种方式,前者只是将线程池的状态设置成 SHUTDOWN(不再接受新任务,但可以继续执行队列中的任务) 状态,然后再中断所有没有正在执行任务的线程。
而后者首先将线程池的状态设置成 STOP(全面拒绝,并中断正在处理的任务),然后尝试停止所有的正在执行或暂停任务的线程,并且返回等待执行任务的列表。
七、线程池的使用场景
如果按照《手册》的要求,线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。但是实际中,线程适合并发任务量大且单个线程执行时间短的场景。
八、如何合理的配置线程池
任务的性质
任务的分类
- CPU密集型任务
- IO密集型任务
- 混合型任务(上面二者均有)
任务的优先级:高、中和低
任务的执行时间:长、中和短
任务的依赖性:是否依赖其他系统资源,如数据库连接。
根据任务的性质划分线程池配置的重点
- CPU密集型任务应配置尽可能小的线程,一般配置 Ncpu+1 个线程的线程池。
- 由于 I/O 密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如 2 * Ncpu。业界有一个计算公式:Nthreads = Ncpu * Ucpu * (1 + W/C),Ucpu取值在 0 到 1 之间,代表 cpu 的使用率,W/C 代表计算时间等待的比率。
- 混合型的任务,如果可以拆分,将其拆分成一个 CPU 密集型任务和一个 I/O 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。
- 优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可以让优先级高的任务先执行。
- 执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。
- 依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果,等待的时间越长,CPU 空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用 CPU。
- 一定要使用有界队列,避免 OOM 风险。
小结
影响线程池线程大小配置的因素
- CPU 的个数
- 内存的大小
- 任务的性质(分类、优先级、执行时间、依赖性)
- 网络连接,比如 JDBC(属于任务性质中依赖性的分类)等
获取当前设备 CPU 数量
Runtime.getRuntime().availableProcessors();
九、线程池的优势、作用总结
- 降低资源消耗:利用线程池管理并复用线程、控制最大并发数等降低线程创建和销毁造成的资源消耗
- 提高响应速度:当请求到达,池中的线程已经蓄势待发,避免了创建线程带来的开销
- 提高线程的可管理性,隔离线程环境
- 实现任务线程队列缓存策略和拒绝机制
- 实现某些与时间相关的功能,如定时执行、周期执行等
- 合适的线程池容量可以有效的防止多线程相互竞争资源而使内存溢出
十、线程池使用相关注意事项及约束
托管线程池中的线程都是后台线程,使用 new Thread 方式创建的线程默认都是前台线程。
创建线程或线程池时请指定有意义的线程名称,方便出错时回溯。
这一条在本博客已经讨论过多次,可以参考文末下方的相关阅读文章。
线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
企业级项目最好是通过线程池来处理并发任务。
线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的构造方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
Executors 提供的集中线程池其实是各有各的特点,这点的强制性要求其实要求我们在使用中注意自己的业务关联,配置自己最合理的线程池而不是随意拿一个原生的底层线程池就使用,比如 newFixedThreadPool 和 newSingleThreadExecutor 允许队列的长度是整形的最大值,相当于无界队列,这是有资源耗尽、 OOM 风险的;同样 newCachedThreadPool 和 newScheduledThreadPool 都允许创建整形最大值的线程数量,同样也是 OOM 风险。
不过其实不允许使用有点夸张了,对于基本的业务操作直接使用恰当的原生线程池创建方法即可,不然放在那里干嘛呢?
十一、相关文章阅读
- 阿里巴巴Java开发规约第一章-并发处理篇 吐血浅析
- [笔记]琐碎的Java基础知识不整理版随手记 持续更新
- Java 并发编程之死锁的简单总结
- [转载]创建线程或线程池时请指定有意义的线程名称,方便出错时回溯 并发编程网 – ifeve.com
- Java十一道由浅入深的笔面试题第二期 详细解析
- 浅析Java中的Fork和Join并发编程框架
更博不易,如果觉得文章对你有帮助并且有能力的老铁烦请捐赠盒烟钱,点我去赞助。或者扫描文章下面的微信/支付宝二维码打赏任意金额(点击「给你买杜蕾斯」),也可扫描小站放的支付宝领红包二维码,线下支付享受优惠的同时老四也可以获得对应赏金,老四这里抱拳谢谢诸位了。捐赠时请备注姓名或者昵称,因为您的署名会出现在赞赏列表页面,您的捐赠钱财也会被用于小站的服务器运维上面,再次抱拳感谢。