线程池
在常见的应用开发过程中手动操作线程的场景并不是很多,特别是框架满天飞的今天。虽然和常见的业务处理相比手动使用多线程的场景不多,虽然不多但总是有的。线程的开销主要花费在创建和销毁的过程,所以考虑到线程的复用问题。如果要基于这个目标,自定义一个线程池需要考虑哪些问题?拍脑袋的想法是维护一定数量的线程保持着不被回收,有任务需要处理的时候找个空闲的线程去执行,执行完成后不销毁继续保持空闲存活状态。基于这个想法先尝试着弄个简单的线程池玩一下:
自定义线程池
MyPersistentThread - 一个不会被回收的线程1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65package threadDemo;
/**
* 一个不会被回收的线程
* Created by xiehui1956(@)gmail.com on 2020/3/25
*/
public class MyPersistentThread extends Thread {
/**
* 线程池
*/
private MyThreadPool myThreadPool;
/**
* 任务
*/
private Runnable runnable;
/**
* 是否关闭
*/
private boolean isShutDown = false;
public MyPersistentThread(Runnable runnable, String threadName, MyThreadPool myThreadPool) {
super(threadName);
this.myThreadPool = myThreadPool;
this.runnable = runnable;
}
/**
* 重写run方法
*/
@Override
public void run() {
//不关闭,就回收
while (!isShutDown) {
if (null != runnable) {
runnable.run();
}
myThreadPool.add(this);
try {
synchronized (this) {
//线程空闲,等待任务
wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public synchronized void setTarget(Runnable runnable) {
this.runnable = runnable;
//通知run方法,接客
notifyAll();
}
public synchronized void shutDown() {
isShutDown = true;
//通知run方法销毁线程
notifyAll();
}
}
MyThreadPool - 线程池1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109package threadDemo;
import java.util.List;
import java.util.Vector;
/**
* 线程池
* Created by xiehui1956(@)gmail.com on 2020/3/25
*/
public class MyThreadPool {
private MyThreadPool() {
idleThreads = new Vector(5);
threadCounter = 0;
}
private static class SingleMyThreadPool {
public static final MyThreadPool myThreadPool = new MyThreadPool();
}
public static MyThreadPool getInstance() {
return SingleMyThreadPool.myThreadPool;
}
/**
* 空闲线程放入线程池或关闭
*
* @param myPersistentThread
*/
public synchronized void add(MyPersistentThread myPersistentThread) {
if (!isShutDown) {
idleThreads.add(myPersistentThread);
} else {
myPersistentThread.shutDown();
}
}
/**
* 关闭所有线程任务
*/
public synchronized void shutDownAllThread() {
isShutDown = true;
idleThreads.forEach(myPersistentThread -> {
myPersistentThread.shutDown();
});
}
/**
* 执行任务
*
* @param runnable
*/
public synchronized void start(Runnable runnable) {
MyPersistentThread myPersistentThread;
if (0 < idleThreads.size()) {
int lastOne = idleThreads.size() - 1;
myPersistentThread = idleThreads.get(lastOne);
idleThreads.remove(myPersistentThread);
myPersistentThread.setTarget(runnable);
} else { //没有空闲线程,则新建
threadCounter++;
myPersistentThread = new MyPersistentThread(runnable, "MPThread #" + threadCounter, this);
myPersistentThread.start();
}
}
/**
* 空闲的线程队列
*/
private List<MyPersistentThread> idleThreads;
/**
* 当前线程总数
*/
private int threadCounter;
/**
* 是否关闭
*/
private boolean isShutDown = false;
public List<MyPersistentThread> getIdleThreads() {
return idleThreads;
}
public void setIdleThreads(List<MyPersistentThread> idleThreads) {
this.idleThreads = idleThreads;
}
public int getThreadCounter() {
return threadCounter;
}
public void setThreadCounter(int threadCounter) {
this.threadCounter = threadCounter;
}
public boolean isShutDown() {
return isShutDown;
}
public void setShutDown(boolean shutDown) {
isShutDown = shutDown;
}
}
为了测试方便,简单封装了下统计时间的模板:1
2
3
4
5
6
7
8package threadDemo;
/**
* Created by xiehui1956(@)gmail.com on 2020/3/25
*/
public interface TimeHandler {
void doSomething(TimeHandler timeHandler);
}
1 | package threadDemo; |
1 | package threadDemo; |
测试代码:
1 | package threadDemo; |
测试结果:
- 当times为10的时候:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43int times = 10;
@Test
public void testNoThreadPool() {
timeHandlerAdapter.doSomething(new MyTest() {
@Override
public void doSomething(TimeHandler timeHandler) {
for (int i = 0; i < times; i++) {
new Thread(() -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
});
}
77
@Test
public void testUseThreadPool() {
timeHandlerAdapter.doSomething(new MyTest() {
@Override
public void doSomething(TimeHandler timeHandler) {
for (int i = 0; i < times; i++) {
MyThreadPool.getInstance().start(() -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
});
}
81
以上,使用线程池执行任务需要花费更多时间。
- 当times为1000的时候:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41@Test
public void testNoThreadPool() {
timeHandlerAdapter.doSomething(new MyTest() {
@Override
public void doSomething(TimeHandler timeHandler) {
for (int i = 0; i < times; i++) {
new Thread(() -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
});
}
198
@Test
public void testUseThreadPool() {
timeHandlerAdapter.doSomething(new MyTest() {
@Override
public void doSomething(TimeHandler timeHandler) {
for (int i = 0; i < times; i++) {
MyThreadPool.getInstance().start(() -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
});
}
149
以上,不使用线程池执行任务需要花费更多时间。
总结,并不是所有场景下使用线程池都可以带来性能优化。上面封装的线程池虽然不怎么样,但是用来屡屡思路还是可以的。幸运的是JDK也封装了一套线程池。
Executor
总结下JDK提供的Executor框架,使用这个框架可以很好的进行线程控制。Executor使用的设计模式是工厂模式,工厂模式常分为三种:1. 简单工厂;2. 工厂方法;3. 抽象工厂。严格的说Executor使用的是抽象工厂模式,工厂模式的这三种细分对产品分层的设计是逐层细化,这样更加的灵活增加了内聚性减少了耦合性。当然对应的编码量也是逐步增多的。
Executor主要提供的方法有:
- newFixedThreadPool()-这个方法创造的是一个有固定线程数量的线程池。有新任务提交时,如果有空闲线程就立即执行。如果没有就任务将被存放到任务队列中等待空闲线程。
- newSingleThreadExecutor()-这个方法创造的是单个线程的线程池,如果提交的任务大于1就会被保存到任务队列中等待空闲线程,按照先进先出的顺序执行。这个线程池在有需要保证线程执行顺序的任务场景下可以使用。
- newCachedThreadPool()-这个方法创造的是一个没有固定线程数量的线程池。有新任务提交时有空闲的线程就复用,没有空闲线程就创建。
- newSingleThreadScheduledExecutor()-这个方法创造的是一个单个线程的线程池,和newSingleThreadExecutor()不同之处在于加入了Scheduled定时任务功能。所以这个线程可以设置到某个固定的时间执行任务,也可以设置周期性执行任务。
- newScheduledThreadPool()-这个方法创造一个可以指定线程数量又带有Scheduled定时任务的线程池。
方法使用实例
newFixedThreadPool
1 | ExecutorService service = Executors.newFixedThreadPool(3); |
newSingleThreadExecutor
1 | ExecutorService service = Executors.newSingleThreadExecutor(); |
newCachedThreadPool
1 | ExecutorService service = Executors.newCachedThreadPool(); |
newSingleThreadScheduledExecutor
1 | long start = System.currentTimeMillis(); |
newScheduledThreadPool
1 | long start = System.currentTimeMillis(); |
Executors实现逻辑
从具体方法入口,可以发现不同的方法都是基于ThreadPoolExecutor的不同实现,往上翻可以发现这个构造方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize - 核心线程数
maximumPoolSize - 最大线程数
keepAliveTime - 大于核心线程数的线程存活时间,需要执行的任务超过核心线程数后会进入任务队列,队列放满以后会扩充线程到最大线程数。如果超过核心线程数的那部分线程空闲了这个参数设置的时间会被回收。
unit - 时间单位
workQueue - 任务队列
threadFactory - 线程工厂
handler - 拒绝策略,服务器能处理的任务到达阀值的时候后续任务的拒绝策略。
任务队列常见实现:
- 直接提交的队列:SynchronousQueue - 没有容量,每一个插入操作都要等待一个相应的删除操作,不保存任务。没有空闲线程就创建线程,如果到达最大线程数就执行拒绝策略。
- 有界任务队列:ArrayBlockingQueue - 带有容量,到达最大容量执行新建线程。到达最大线程数执行拒绝策略。
- 无解任务队列:LinkedBlockingQueue - 和有界队列相比,无解队列上不封顶。无法执行的任务就放到队列,任务到达核心线程数也不会新建线程。
- 优先任务队列:PriorityBlockingQueue - 带有先后顺序的线程队列,是一个特殊的无解队列。无论是ArrayBlockingQueue还是LinkedBlockingQueue都是按照先进先出算法处理任务,PriorityBlockingQueue可以根据任务自身的优先级顺序先后执行。
拒绝策略:
- AbortPolicy - 直接抛出异常,组织系统正常运行。
- CallerRunsPolic - 当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务。
- DiscardOledestPolicy - 丢弃即将被执行的任务,并尝试提交当前任务。
- DiscardPolicy - 抛弃当前的任务。