为什么使用线程池?如何合理使用线程池?

new Thead的弊端

  1. 项目中每次使用的线程的时候new Thread都需要创建对象,性能差.
  2. 缺乏统一管理,不断的new Thread会导致竞争激烈,导致系统性能下降OOM.

线程池的好处

  1. 对线程资源缓存,线程资源可复用,减少线程对象创建,回收的开销,性能更好.
  2. 可以控制并发数量,减少竞争,提高系统资源利用率.

线程池参数

以最全的线程池创建参数为例:

1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler){
}
  1. corePoolSize核心线程的数量.当核心数量的线程是空闲的时候也不会被销毁.除非我们设置了allowCoreThreadTimeOut为true. 保证核心数量的线程存活,能帮助应对突发流量.线程池的初始线程数量是0.线程池在接收任务的时候,如果当前线程数量小于核心线程数量并且没有空闲线程资源, 这时候就会创建新的线程来处理任务.

  2. maximumPoolSize最大线程数量.当工作队列已经满了并且核心线程数量不够用的时候,就考虑是否创建更多的线程来处理工作,但是线程池中线程数量不能超过最大线程数量.

    • workQueue设置成容量固定的队列的时候,当工作队列已经满了并且核心线程数量不够用的时候会创建更多的线程.
    • workQueue设置成无界队列的时候,当核心线程数量不够用的时候并不会创建更多的线程.
  3. keepAliveTime线程存活时间.当线程池中线程数量大于核心线程的数量的时候,超过空闲时间的非核心线程就会被销毁.

  4. unitkeepAliveTime的时间单位.

  5. workQueue是线程的工作队列.等待执行的任务会存放在阻塞队列中.workQueue的设置会影响maximumPoolSize是否生效.

  6. threadFactory 在创建新的线程的时候被使用.通常使用传入threadFactory来定义新创建线程的名称.

1
2
3
4
5
6
        // 自定义线程池命名
        StringBuilder threadPoolName = new StringBuilder(namePrefix);
        threadPoolName.append("-%d");
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                                        .setNameFormat(threadPoolName.toString())
                                        .build();
  1. handler拒绝策略.当工作队列已满,线程池的线程数量达到最大线程数量,没有空闲线程的时候来了新的任务,这时候就需要使用拒绝策略来进行处理. 拒绝策略包含一下四种.
    • ThreadPoolExecutor.AbortPolicy 放弃任务执行,并且抛出异常(默认的拒绝策略)
    • ThreadPoolExecutor.CallerRunsPolicy 用调用线程池的线程执行当前任务
    • ThreadPoolExecutor.DiscardPolicy 静默的抛弃任务执行
    • ThreadPoolExecutor.DiscardOldestPolicy 抛弃工作队列中最老的任务,执行当前任务.

创建线程池

理解了创建线程池参数的含义,可以自已定义一个线程池. 创建一个核心线程数量是4,最大线程数量是8,线程空闲存活时间是4s,工作队列大小是100,自定义线程创建类,拒绝策略是静默的抛弃任务执行的线程池.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ThreadPoolTest {

    @Test
    public void test1() throws InterruptedException {
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("文件上传-%d")
                .build();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4,
                8,
                4,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                threadFactory,
                new ThreadPoolExecutor.DiscardPolicy());
        System.out.println(threadPoolExecutor.getPoolSize());
        // 执行108个任务,使得工作队列已经满的时候,核心线程不够用,创建到最大线程数量8
        for (int i = 0; i < 108; i++) {
            threadPoolExecutor.execute(() -> {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        // 刚执行108个任务后线程池线程数量
        Thread.sleep(1090);
        System.out.println(threadPoolExecutor.getPoolSize());

        // 执行完任务后线程超过空闲时间后线程池线程的数量
        Thread.sleep(4000);
        System.out.println(threadPoolExecutor.getPoolSize());
    }
}

输出预期结果:

1
2
3
0
8
4

执行任务

  1. 使用线程池执行任务不需要根据异步任务的执行结果进行某些处理
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
    /**
     * 执行任务,不需要等待线程执行的结果
     */
    @Test
    public void test2(){
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("test2-%d")
                .build();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4,
                8,
                4,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                threadFactory,
                new ThreadPoolExecutor.DiscardPolicy());

        threadPoolExecutor.execute(()->{
            System.out.println("thread execute ....");
        });
    }
  1. 执行任务,需要在异步任务处理完成之后进行某些操作.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
    /**
     * 执行任务,需要在异步任务处理完成之后进行某些操作
     */
    @Test
    public void test3(){
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("test2-%d")
                .build();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4,
                8,
                4,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                threadFactory,
                new ThreadPoolExecutor.DiscardPolicy());

        Future<String> submit = threadPoolExecutor.submit(() -> {
            System.out.println("thread execute do something one....");
            return "task complete can do something";
        });

        // do something two

        try {
            // 等待异步任务完成.
            String result = submit.get();
            // do something
            System.out.println(result);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

在实际开发中,用户进行请求,我们需要处理something one和something two. 我们需要在处理完something one和something two两个任务之后给用户返回.如果两个事件一个耗时100ms一个耗时80ms并且没有因果关系.那么用上述写法整个接口 的耗时就是约为100ms.这是我们优化接口性能的一种思路.将接口中没有因果关系的处理操作,放入submit里面去执行.将接口时间优化为最长因果关系路径的执行时间.

关闭线程池

  1. 关闭线程池.等待正在执行和工作队列中的任务执行完成后关闭
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
    /**
     * 关闭线程池.等待正在执行和工作队列中的任务执行完成.
     */
    @Test
    public void test4(){
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("test4-%d")
                .build();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4,
                8,
                4,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                threadFactory,
                new ThreadPoolExecutor.DiscardPolicy());

        threadPoolExecutor.execute(()->{
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("thread execute ....");
        });

        threadPoolExecutor.shutdown();
        try {
            Thread.sleep(2100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

输出:

1
thread execute ....
  1. 关闭线程池.等待正在执行的任务完成和抛弃工作队列中的任务
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
 /**
     * 关闭线程池.等待正在执行的任务完成和抛弃工作队列中的任务.
     */
    @Test
    public void test5(){
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("test5-%d")
                .build();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4,
                8,
                4,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                threadFactory,
                new ThreadPoolExecutor.DiscardPolicy());

        threadPoolExecutor.execute(()->{
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("thread execute ....");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("thread execute ....");
        });

        threadPoolExecutor.shutdownNow();
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

输出:

1
2
3
4
5
6
7
8
9

java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
thread execute ....
	at com.kklt.test.juc.ThreadPoolTest.lambda$test5$4(ThreadPoolTest.java:150)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
thread execute ....

我们可以看到 threadPoolExecutor.shutdownNow();的调用把正在休眠的线程唤醒把任务执行完成.

监控线程池

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
     * 常用的监控线程池的方法
     */
    @Test
    public void test6() throws InterruptedException {
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("test6-%d")
                .build();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4,
                8,
                4,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                threadFactory,
                new ThreadPoolExecutor.DiscardPolicy());
        System.out.println("线程池中线程数量:"+threadPoolExecutor.getPoolSize());
        System.out.println("已经执行和未执行任务数量:"+threadPoolExecutor.getTaskCount());
        System.out.println("执行完成任务数量:"+threadPoolExecutor.getCompletedTaskCount());
        System.out.println("正在执行任务线程数量:"+threadPoolExecutor.getActiveCount());
        System.out.println("--------------------------------------------------------");
        // 执行108个任务,使得工作队列已经满的时候,核心线程不够用,创建到最大线程数量8
        for (int i = 0; i < 108; i++) {
            threadPoolExecutor.execute(() -> {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        System.out.println("线程池中线程数量:"+threadPoolExecutor.getPoolSize());
        System.out.println("已经执行和未执行任务数量:"+threadPoolExecutor.getTaskCount());
        System.out.println("执行完成任务数量:"+threadPoolExecutor.getCompletedTaskCount());
        System.out.println("正在执行任务线程数量:"+threadPoolExecutor.getActiveCount());
        System.out.println("--------------------------------------------------------");
        // 刚执行108个任务后线程池线程数量
        Thread.sleep(1090);
        System.out.println("线程池中线程数量:"+threadPoolExecutor.getPoolSize());
        System.out.println("已经执行和未执行任务数量:"+threadPoolExecutor.getTaskCount());
        System.out.println("执行完成任务数量:"+threadPoolExecutor.getCompletedTaskCount());
        System.out.println("正在执行任务线程数量:"+threadPoolExecutor.getActiveCount());
        System.out.println("--------------------------------------------------------");

        // 执行完任务超过空闲时间后线程池线程的数量
        Thread.sleep(4000);
        System.out.println("线程池中线程数量:"+threadPoolExecutor.getPoolSize());
        System.out.println("已经执行和未执行任务数量:"+threadPoolExecutor.getTaskCount());
        System.out.println("执行完成任务数量:"+threadPoolExecutor.getCompletedTaskCount());
        System.out.println("正在执行任务线程数量:"+threadPoolExecutor.getActiveCount());
        System.out.println("--------------------------------------------------------");
    }

输出:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
线程池中线程数量:0
已经执行和未执行任务数量:0
执行完成任务数量:0
正在执行任务线程数量:0
--------------------------------------------------------
线程池中线程数量:8
已经执行和未执行任务数量:108
执行完成任务数量:0
正在执行任务线程数量:8
--------------------------------------------------------
线程池中线程数量:8
已经执行和未执行任务数量:108
执行完成任务数量:108
正在执行任务线程数量:0
--------------------------------------------------------
线程池中线程数量:4
已经执行和未执行任务数量:108
执行完成任务数量:108
正在执行任务线程数量:0
--------------------------------------------------------

线程池数配置

  1. 对于计算密集型任务,建议线程的数量设置为: cpu数量*单个cpu线程数+1
  2. 对于IO密集型任务,建议线程的数量设置为: cpu数量单个cpu线程数2

需要注意的是:当任务的执行时间很短的时候.短到线程切换的代价更大,不建议使用多线程处理.

jdk线程池方法

jdk为我们提供了一些创建线程池的方法.不建议使用.

1
2
3
4
5
6
7
    @Test
    public void test7(){
        Executors.newSingleThreadExecutor();
        Executors.newCachedThreadPool();
        Executors.newFixedThreadPool(4);
        Executors.newScheduledThreadPool(4);
    }

这些方法最终都会调用我们上述的创建线程池的参数最多的构造方法.因此理解了参数的意义,完全可以创建出和jdk自带方法等价的线程池.