池化技术思想主要是提前缓存大量的资源,减少每次获取资源的消耗,提高对资源的利用率,特别是在高并发场景这种提高更加明显。常见的池化技术包含线程池、数据库连接池、http连接池、对象池、内存池、代理池等等。本文就分析并实现线程池。
问题
(0)为什么要使用线程池技术?
new Thread的优缺点
1 | 优点:通过new Thread()创建线程的API简单易用,结构清晰,对于执行单一的一次性任务十分便利。 |
线程池的优点:
1 | 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。 |
(1)自己动手写一个线程池需要考虑哪些因素?
(2)自己动手写的线程池如何测试?
简介
线程池是Java并发编程中经常使用到的技术,那么自己如何动手写一个线程池呢?
属性分析
线程池,顾名思义它首先是一个“池”,这个池里面放的是线程,线程是用来执行任务的。
首先,线程池中的线程应该是有类别的,有的是核心线程,有的是非核心线程,所以我们需要两个变量标识核心线程数量coreSize和最大线程数量maxSize。
为什么要区分是否为核心线程呢?这是为了控制系统中线程的数量。
当线程池中线程数未达到核心线程数coreSize时,来一个任务加一个线程是可以的,也可以提高任务执行的效率。
当线程池中线程数达到核心线程数后,得控制一下线程的数量,来任务了先进队列,如果任务执行足够快,这些核心线程很快就能把队列中的任务执行完毕,完全没有新增线程的必要。
当队列中任务也满了,这时候光靠核心线程就无法及时处理任务了,所以这时候就需要增加新的线程了,但是线程也不能无限制地增加,所以需要控制其最大线程数量maxSize。
其次,我们需要一个任务队列来存放任务,这个队列必须是线程安全的,我们一般使用BlockingQueue阻塞队列来充当,当然使用ConcurrentLinkedQueue也是可以的(注意ConcurrentLinkedQueue不是阻塞队列,不能运用在jdk的线程池中)。
最后,当任务越来越多而线程处理却不及时,迟早会达到一种状态,队列满了,线程数也达到最大线程数了,这时候怎么办呢?这时候就需要走拒绝策略了,也就是这些无法及时处理的任务怎么办的一种策略,常用的策略有丢弃当前任务、丢弃最老的任务、调用者自己处理、抛出异常等。
根据上面的描述,我们定义一个线程池一共需要这么四个变量:核心线程数coreSize、最大线程数maxSize、阻塞队列BlockingQueue、拒绝策略RejectPolicy。
另外,为了便于给线程池一个名称,我们再加一个变量:线程池的名称name。
所以我们得出了线程池的属性及构造方法大概如下:
1 | public class MyThreadPoolExecutor implements Executor { |
任务流向分析
根据上面的属性分析,基本上我们已经得到了任务流向的完整逻辑:
首先,如果运行的线程数小于核心线程数,直接创建一个新的核心线程来运行新的任务。
其次,如果运行的线程数达到了核心线程数,则把新任务入队列。
然后,如果队列也满了,则创建新的非核心线程来运行新的任务。
最后,如果非核心线程数也达到最大了,那就执行拒绝策略。
代码逻辑大致如下:
1 |
|
创建线程逻辑分析
首先,创建线程的依据是正在运行的线程数量有没有达到核心线程数或者最大线程数,所以我们还需要一个变量runningCount用来记录正在运行的线程数。
其次,这个变量runningCount需要在并发环境下加加减减,所以这里需要使用到Unsafe的CAS指令来控制其值的修改,用了CAS就要给这个变量加上volatile修饰,为了方便我们这里直接使用AtomicInteger来作为这个变量的类型。
然后,因为是并发环境中,所以需要判断runningCount < coreSize(或maxSize)(条件一)的同时修改runningCount CAS加一(条件二)成功了才表示可以增加一个线程,如果条件一失败则表示不能再增加线程了直接返回false,如果条件二失败则表示其它线程先修改了runningCount的值,则重试。
最后,创建一个线程并运行新任务,且不断从队列中拿任务来运行【本篇文章由公众号“彤哥读源码”原创】。
代码逻辑如下:
1 | private boolean addWorker(Runnable newTask, boolean core) { |
取任务逻辑分析
从队列中取任务应该使用take()方法,这个方法会一直阻塞直至取到任务或者中断,如果中断了就返回null,这样当前线程也就可以安静地结束了,另外还要注意中断了记得把runningCount减一。
1 | private Runnable getTask() { |
好了,到这里我们自己的线程池就写完了,下面我们一起来想想怎么测试呢?
测试逻辑分析
我们再来回顾下自己的写的线程池的构造方法:
1 | public MyThreadPoolExecutor(String name, int coreSize, int maxSize, |
name,这个随便传;
coreSize,我们假设为5;
maxSize,我们假设为10;
taskQueue,任务队列,既然我们设置的是有边界的,我们就用最简单的ArrayBlockingQueue好吧,容量设置为15,这样里面最多可以存储15条任务;
rejectPolicy,拒绝策略,我们假设使用丢弃当前任务的策略,OK,我们来实现一个。
1 | public class DiscardRejectPolicy implements RejectPolicy { |
OK,这样一个线程池就创建完成了,下面就是执行任务了,我们假设通过for循环连续不断地添加100个任务好不好。
1 | public static void main(String[] args) { |
我们分析下这段程序:
(1)先连续创建了5个核心线程,并执行了新任务;
(2)后面的15个任务进了队列;
(3)队列满了,又连续创建了5个线程,并执行了新任务;
(4)后面的任务就没得执行了,全部走了丢弃策略;
(5)所以真正执行成功的任务应该是 5 + 15 + 5 = 25 条任务;
运行之:
1 | thread name: core_test1 |
可以看到,创建了5个核心线程、5个非核心线程,成功执行了25条任务,完成没问题,完美^^。
总结
(1)自己动手写一个线程池需要考虑的因素主要有:核心线程数、最大线程数、任务队列、拒绝策略。
(2)创建线程的时候要时刻警惕并发的陷阱;
完整源码
Executor接口
1 | public interface Executor { |
MyThreadPoolExecutor线程池实现类
1 | package com.funzzz.pool; |
RejectPolicy拒绝策略接口
1 | public interface RejectPolicy { |
DiscardRejectPolicy丢弃策略实现类
1 | public class DiscardRejectPolicy implements RejectPolicy { |
测试类
1 | public static void main(String[] args) { |
参考
1 | https://mp.weixin.qq.com/s/Mbh6OuHnC4vhrca5wfvsdw |
- 本文作者: 初心
- 本文链接: http://funzzz.fun/2021/03/05/手写线程池-1-无返回值线程池/
- 版权声明: 本博客所有文章除特别声明外,均采用 MIT 许可协议。转载请注明出处!