理解线程池 ThreadPoolExecutor 参数和任务拒绝策略

HYF Lv3

在当今高度并发的软件开发世界中,多线程编程已成为必不可少的一部分。然而,多线程编程不仅提供了性能的提升,同时也带来了复杂性和挑战。想象一下,在一个需要同时处理数百甚至上千个任务的应用中,如何高效地管理线程、避免资源耗尽以及应对任务提交高峰。这就是线程池作为一种解决方案的价值所在。

本文将带您深入探讨线程池,这个在多线程应用中经常被提到却鲜为人知的概念。我们将解释为何需要线程池,它是如何工作的,以及如何通过配置参数和选择适当的任务拒绝策略来提高应用性能和可维护性。

1 为什么需要线程池

在现代软件开发中,我们经常需要处理大量并发任务,从用户请求的处理到后台任务的执行,都可能需要同时处理多个任务。多线程是一种常见的处理并发任务的方式,但直接创建和管理线程可能会导致一系列问题和挑战。

1.1 资源管理的挑战

直接创建线程会占用系统资源,每个线程都需要一定的内存和操作系统资源。大量的线程创建和销毁会导致资源的极度浪费。
如果同时创建大量线程,可能会导致操作系统资源不足,甚至引发系统崩溃。

1.2 线程生命周期管理

手动管理线程的生命周期(创建、启动、休眠、恢复、销毁等)非常复杂,容易出现错误。不正确的线程管理可能导致线程泄漏或线程过多,影响应用的稳定性。
如果不合理地维护线程,可能会导致线程资源未被释放,造成内存泄漏。

1.3 性能优化的需求

频繁地创建和销毁线程会带来性能开销,因为线程的创建和销毁本身就需要时间和资源。
对于任务处理频繁的场景,频繁地创建和销毁线程会导致应用的性能下降。

为了克服上述问题,引入了线程池作为一种解决方案。线程池是一个维护着多个工作线程的线程集合,这些线程可以重复使用,从而减少了线程的创建和销毁开销。通过线程池,我们可以更加有效地利用系统资源,更好地管理线程的生命周期,并且可以优化并发任务的处理性能。

在接下来的内容中,我们将深入探讨线程池的各个参数以及任务拒绝策略,帮助您更好地理解如何配置和使用线程池来满足不同场景的需求。

2 线程池的基本工作原理

线程池是一种用于管理和执行多线程任务的机制,它通过有效地重用线程、管理线程的生命周期以及调度任务的执行,从而提供了一种更加可控和高效的方式来处理并发任务。

2.1 线程池是如何工作的:创建、管理和执行线程任务

线程创建和管理:

在应用启动时,线程池会创建一定数量的工作线程,这些线程处于等待任务的状态。这些线程会被池化(保留在内存中)以供后续任务使用,避免了频繁地创建和销毁线程的开销。

任务提交:

当应用中需要执行一个任务时,可以将任务提交给线程池。这个任务可以是实现了 Runnable 或 Callable 接口的对象,代表需要执行的操作。

任务调度和执行:

线程池会从工作线程池中选择一个空闲的线程,将任务分配给它进行执行。如果没有空闲线程,任务可以进入任务队列等待。一旦线程完成任务执行,它可以被重新分配到新的任务,从而实现线程的重用。

2.2 线程池的核心组件:任务队列、工作线程、任务提交

任务队列:

任务队列是线程池的一个关键组件,用于存放待执行的任务。如果线程池中的所有线程都在工作,新提交的任务会被放入任务队列中等待执行。任务队列可以是有界队列(如数组、链表)或无界队列(如链表、优先队列)。

工作线程:

工作线程是线程池的实际执行者,它们从任务队列中获取任务并执行。线程池会根据需要创建、销毁和管理工作线程,以满足当前任务的处理需求。

任务提交:

任务提交是应用程序将任务交给线程池的过程。这些任务可以是需要并行执行的操作,如数据处理、网络请求等。任务可以由线程池的客户端(即应用程序代码)提交,触发线程池的任务调度和执行。

通过这种方式,线程池将任务的提交和执行解耦,使得应用程序能够更加高效地利用可用的系统资源,减少线程的创建和销毁开销,同时也确保了线程的安全性和管理。

3 常用线程池类: ThreadPoolExecutor和ThreadPooltaskExecutor

ThreadPoolExecutor 和 ThreadPoolTaskExecutor 都是与线程池相关的类,通常用于在多线程应用程序中管理和调度线程的执行。它们在 Java 中有不同的实现和用法。

3.1 ThreadPoolExecutor:

ThreadPoolExecutor 是 Java 标准库中提供的一个类,用于创建和管理线程池。它提供了非常灵活的配置选项,可以手动调整核心线程数、最大线程数、线程空闲时间等参数,以满足不同场景的需求。ThreadPoolExecutor 需要开发者自己实例化并配置,然后通过调用 execute 或 submit 方法来提交任务。示例:

1
2
3
4
5
6
7
8
9
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()
);

executor.execute(new RunnableTask());

简单介绍一下 ThreadPoolExecutor 的几个参数:

1.corePoolSize (int):

这是线程池的核心线程数,即线程池中始终保持活跃的线程数量。即使这些线程在执行任务时闲置,也不会被回收。如果任务数少于核心线程数,线程池会创建新线程来执行任务。

2.maximumPoolSize (int):

这是线程池的最大线程数,包括核心线程数在内。在线程池中,允许同时存在的最大线程数。如果任务数超过核心线程数,且线程池中的线程数还未达到最大线程数,线程池会创建新线程来执行任务。

3.keepAliveTime (long):

当线程池中的线程数超过核心线程数,并且这些线程在执行完任务后变得空闲,它们会被保留在池中一段时间,等待可能的新任务。keepAliveTime 表示空闲线程的存活时间,单位由下一个参数 unit 指定。

4.unit (TimeUnit):

这个参数与 keepAliveTime 一起使用,表示存活时间的单位,可以是 TimeUnit.MILLISECONDS、TimeUnit.SECONDS 等等。

5.workQueue (BlockingQueue):

这是用于存放待执行任务的队列。任务会在队列中等待被线程取出并执行。BlockingQueue 是一个阻塞队列,它可以是 LinkedBlockingQueue、ArrayBlockingQueue 等。

6.threadFactory (ThreadFactory):

这个参数是一个线程工厂,用于创建新的线程。通过指定不同的线程工厂,你可以自定义线程的创建方式,例如设置线程的名称、优先级等。

7.handler (RejectedExecutionHandler):

当线程池无法继续接受新任务并且已达到最大线程数时,会执行拒绝策略。RejectedExecutionHandler 定义了如何处理被拒绝的任务,可以选择使用默认的拒绝策略(抛出异常或忽略任务),或者自定义拒绝策略。

3.2 ThreadPoolTaskExecutor:

ThreadPoolTaskExecutor 是 Spring 框架中提供的一个用于管理线程池的类。它是对 ThreadPoolExecutor 的封装,提供了更多的功能和配置选项,同时也更易于集成到 Spring 应用程序中。ThreadPoolTaskExecutor 实现了 Spring 的TaskExecutor 接口,使得在 Spring 应用中更容易地配置和使用。示例:

1
2
3
4
5
6
7
8
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maximumPoolSize);
executor.setKeepAliveSeconds(keepAliveTime);
executor.setQueueCapacity(queueCapacity);
executor.initialize();

executor.execute(new RunnableTask());

ThreadPoolExecutor 是 Java 标准库提供的用于创建和管理线程池的类,更灵活但需要手动配置。ThreadPoolTaskExecutor 是 Spring 框架中对 ThreadPoolExecutor 的封装,提供更多集成和配置的便利。

3.3 ThreadPoolTaskExecutor 的优点

在Spring应用中,ThreadPoolTaskExecutor更常用,这是因为它是Spring框架对ThreadPoolExecutor的封装,提供了更多的功能和便利,同时也更容易与Spring的应用上下文集成。以下是一些使用ThreadPoolTaskExecutor的优点:

Spring集成:ThreadPoolTaskExecutor 是 Spring 框架的一部分,它与 Spring 的应用上下文无缝集成,可以通过配置文件或者 Java 代码进行初始化和管理。
配置选项丰富:ThreadPoolTaskExecutor 提供了丰富的配置选项,包括线程池的核心线程数、最大线程数、队列容量、线程存活时间等,使得配置更加灵活。
可监控和管理:由于与Spring集成,ThreadPoolTaskExecutor可以更容易地被监控和管理,例如通过Spring的管理界面或者其他监控工具。
简化配置:Spring提供了XML配置或者基于注解的方式来配置ThreadPoolTaskExecutor,使得配置变得简单和易于维护。

接下来将详细介绍 ThreadPoolTaskExecutor 线程池的参数和基本使用

4 线程池 ThreadPoolTaskExecutor 参数详解

线程池的性能和行为可以通过配置一系列参数来进行调整,以适应不同的应用场景和需求。接下来我们来解析线程池的各个参数,理解如何根据应用特点进行合理的配置。
我们使用情景代入的方式,来尝试更简单的去理解线程池的各参数。假设我们经营一家繁忙的餐厅,需要有效地管理顾客的用餐需求。在这个情境中,餐厅就相当于一个线程池,顾客的用餐需求就相当于任务,一名服务员只能同时服务一位顾客直到其用餐结束。

4.1 CorePoolSize:最小线程数

线程池中保持活动状态(长期存活)的最小线程数,即使没有任务需要执行。在业务中,应根据应用的并发需求,设置足够且适当的线程数,以保证核心任务可以得到及时处理。

情景:正式服务员是餐厅一直保留的服务员数量,无论是否有顾客需要服务。他们随时准备为顾客提供服务。
示例:你的餐厅始终雇佣了5名正式服务员,无论是否有顾客。这些服务员会一直待在餐厅,准备接待顾客。

4.2 MaxPoolSize:最大线程数

线程池允许创建的最大线程数,包括核心线程和非核心线程。应根据系统资源和任务负载,合理设置最大线程数,避免创建过多线程导致资源浪费。

情景:最多服务员数量是餐厅允许雇佣的最多服务员数量,包括正式服务员和临时服务员。如果顾客服务需求人数增加,你可以雇佣更多的临时服务员。
示例:虽然你有5名正式服务员,但如果突然有很多顾客涌入,你可以临时雇佣更多的服务员,但不会超过一个预定的上限,比如10名。

4.3 QueueCapacity:任务队列容量

存放等待执行的任务的队列容量。如果任务数超过线程池的最小线程数,超出的任务会被放入任务队列等待执行。应根据任务提交速率和线程处理速率,选择合适的队列容量。过小的容量可能导致任务被丢弃,过大的容量可能导致内存占用增加。

情景:等待区里等待服务的客人数量,超过正式服务员数的客人需要排队等待。
示例:餐厅的等待区有10个座位。如果现有5位顾客正在被5名正式服务员服务,再来的客人将会被安排在等待区等待。

4.4 KeepAliveSeconds:线程空闲时间

线程在空闲一段时间后会被销毁,以减少资源占用。应根据任务的性质,设置合理的空闲时间,避免频繁地创建和销毁线程。

情景:服务员在没有客人需要服务时保持工作状态的时间。
示例:服务员在没有客人需要服务时会保持工作状态5分钟,如果在这段时间内没有客人需要服务,临时服务员会被安排下班离开餐厅。

4.5 RejectedExecutionHandler:任务拒绝策略

当线程池饱和无法执行新的任务时,定义如何处理被拒绝的任务。应根据应用需求选择最合适的策略。例如 AbortPolicy、CallerRunsPolicy、DiscardPolicy 和 DiscardOldestPolicy 是常见的策略。

情景:当所有服务员都忙碌,而等待区已满时,需要决定如何处理新到来的客人。
示例:如果等待区已满且没有空闲服务员,拒绝策略可以是将新到来的客人告知他们需要等待,或者直接拒绝为他们提供服务。

4.6 WaitForTasksToCompleteOnShutdown:关闭等待任务完成

决定在关闭线程池时是否等待正在执行的任务完成。如果希望在关闭应用时确保任务都得到完成,设置为 true;否则,设置为 false,以快速关闭应用。

情景:在关闭餐厅之前,是否等待所有客人的服务完成。
示例:如果设置为true,餐厅在关闭前会等待所有客人的服务完成;如果设置为false,餐厅可能会立即关闭,尚未完成的客人服务可能会中断。

4.7 AwaitTerminationSeconds:等待任务完成时间

线程池关闭后等待任务完成的最大时间,防止应用被阻塞。需要确保等待时间足够长,以保证任务完成,但也不要设置得太长,以避免应用长时间无法关闭。

情景:在餐厅关闭后,服务员还可以继续服务的时间最长不超过该等待完成时间。
示例:如果通知十分钟后关闭餐厅,在下发关闭餐厅通知后,餐厅会等待最多10分钟,以确保所有服务员服务正常完成下班。

5 线程池拒绝策略

在线程池中,当任务提交超过线程池的最大容量,即当提交的任务数大于(queueCapacity.size() + maximumPoolSize ),就会触发线程池的拒绝策略。选择适当的拒绝策略对于应用的稳定性和性能至关重要。以下是常见的线程池拒绝策略,以及如何选择最适合您应用的方式。

5.1 AbortPolicy(默认策略)

当任务无法被线程池接受时,会抛出 RejectedExecutionException 异常,拒绝任务提交。适用于希望保证任务不会被丢弃,而是通知提交者任务无法执行的情况。适用于需要保障数据完整性的任务,如核心业务操作。

情景:如果餐厅已经满员,所有服务员都在为顾客服务,新到达的顾客可能会被告知餐厅已经满了,无法为他们提供服务。

5.2 CallerRunsPolicy(调用者执行策略)

当任务无法被线程池执行时,任务提交者线程会被调用来执行该任务。适用于短时任务的提交者能够承受执行任务的开销,避免抛弃任务,但可能会影响任务提交者的性能。

情景:如果餐厅已经很忙,没有多余的服务员,新的顾客会被告知暂时无法为他们提供服务,但他们可以选择自己去取餐,即自己执行任务。

5.3 DiscardPolicy(丢弃策略)

当任务无法被线程池执行时,任务将被丢弃,不做任何处理。适用于临时任务或日志记录等不影响核心业务的任务。如果任务丢失不会造成重大影响,可以选择此策略。

情景:如果餐厅已经满员,新的顾客到达时,餐厅可能会无法接待,但并不会告诉顾客,而是简单地忽略他们的到来。

5.4 DiscardOldestPolicy(丢弃最旧策略)

当任务无法被线程池执行时,会丢弃队列中最早的任务,然后尝试提交当前任务。适用于能够容忍一些任务丢失,但又不希望堆积太多任务在队列中的情况。保证新任务得到处理的同时,尽量减少任务堆积。

情景:如果餐厅已经很忙,有些顾客已经等待很长时间了,新的顾客到达时,餐厅可能会选择取消等待时间最长的顾客,以便为新顾客提供服务。

5.5 自定义策略

实现 RejectedExecutionHandler 接口,定义自己的拒绝策略。适用于特定业务需求,您可以根据任务的重要性和影响性,设计更复杂的拒绝策略,如将任务记录到日志、缓存中,或通过通知机制进行处理。
例如以下示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* @author -侑枫
* @date 2023/8/13 22:57:19
*/
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
// 在这里编写自定义拒绝策略逻辑
// 可以将被拒绝的任务记录到日志、缓存中,或通过通知机制进行处理
System.err.println("Task rejected: " + runnable.toString());
// 也可以抛出自定义异常来提供更多信息
throw new CustomRejectedTaskException("Task rejected: " + runnable.toString());
}
}

可以根据业务需求,将被拒绝的任务记录到日志、数据库或消息队列中,也可以发送通知给相关人员。自定义拒绝策略能够根据应用的特定需求,选择适合的处理方式,以确保任务的处理不会因为线程池饱和而受到影响。

6 示例和代码演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* @author -侑枫
* @date 2023/8/13 22:35:19
*/
public class ThreadPoolExample {
public static void main(String[] args) {
/**
* 创建线程池
* @Param CorePoolSize:核心线程2
* @Param MaxPoolSize:非核心线程4
* @Param QueueCapacity:任务队列容量10
* @Param KeepAliveSeconds:线程空闲时间10秒
* @Param RejectedExecutionHandler:拒绝策略 CallerRunsPolicy(调用者执行策略)
* @Param WaitForTasksToCompleteOnShutdown:等待线程任务执行完毕再关闭线程池
* @Param AwaitTerminationSeconds:线程池等待线程任务执行时间
*/
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
threadPool.setCorePoolSize(2);
threadPool.setMaxPoolSize(4);
threadPool.setQueueCapacity(10);
threadPool.setKeepAliveSeconds(10);
threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
threadPool.setWaitForTasksToCompleteOnShutdown(true);
threadPool.setAwaitTerminationSeconds(30);
// 线程池取名
threadPool.setThreadNamePrefix("MyThread-");
// 初始化线程池
threadPool.initialize();

// 提交任务到线程池
for (int i = 1; i <= 15; i++) {
final int taskId = i;
threadPool.execute(() -> {
System.out.println("Task " + taskId + " is being executed by " + Thread.currentThread().getName());
try {
// 模拟任务执行时间
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}

// 关闭线程池
threadPool.shutdown();
}
}

执行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
22:48:43.049 [main] DEBUG org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService
Task 1 is being executed by MyThread-1
Task 2 is being executed by MyThread-2
Task 13 is being executed by MyThread-3
Task 15 is being executed by main
Task 14 is being executed by MyThread-4
Task 3 is being executed by MyThread-4
22:48:46.095 [main] DEBUG org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Shutting down ExecutorService
Task 4 is being executed by MyThread-2
Task 6 is being executed by MyThread-1
Task 5 is being executed by MyThread-3
Task 8 is being executed by MyThread-2
Task 9 is being executed by MyThread-4
Task 7 is being executed by MyThread-3
Task 10 is being executed by MyThread-1
Task 12 is being executed by MyThread-3
Task 11 is being executed by MyThread-2

Process finished with exit code 0

通过观察任务执行的结果,从中体现出线程池参数的关系:

初始时,线程池会创建 CorePoolSize 个核心线程来执行任务。即线程 1 和 2(2 个核心线程).
如果任务数超过核心线程数,超出的任务会被放入任务队列等待执行。即任务 3-12(10 个任务队列)
如果任务队列也已满,且线程数未达到 MaxPoolSize,会创建非核心线程来处理任务。(2 个非核心线程上线,处理超出任务队列的任务 13,任务 14,而任务 15 则被拒绝策略的 CallerRunsPolicy——调用者执行策略所处理,即该调用者 main 方法线程自己执行。)
KeepAliveSeconds 定义了非核心线程的空闲时间,在空闲时间内,线程池会销毁多余的非核心线程。(销毁空闲时间超过10秒的非核心线程)
当线程池被关闭时,根据 WaitForTasksToCompleteOnShutdown 设置,线程池可能等待任务完成后再继续关闭。
如果等待时间超过 AwaitTerminationSeconds,线程池会强制关闭。

通过这个示例,我们可以更好地理解线程池参数的作用和他们执行顺序及关系。接下来使用 Spring Boot 实现 ThreadPoolTaskExecutor 多线程示例:
配置线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* @author -侑枫
* @date 2023/8/13 22:51:06
*/
@EnableAsync
@Configuration
public class ThreadPoolConfig {
@Bean("customThreadPool")
public ThreadPoolTaskExecutor customThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(10);
executor.setKeepAliveSeconds(60);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(30);
return executor;
}
}

Service 接口:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* @author -侑枫
* @date 2023/8/13 22:52:02
*/
public interface TaskService {
/**
* 测试方法
*
* @param taskId 任务id
*/
void processTask(int taskId);
}

Service 实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* @author -侑枫
* @date 2023/8/13 22:54:36
*/
@Service
public class TaskServiceImpl implements TaskService {
@Override
@Async("customThreadPool")
public void processTask(int taskId) {
System.out.println("Task " + taskId + " is being executed by " + Thread.currentThread().getName());
try {
// 模拟任务执行时间
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

Controller 层:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* @author -侑枫
* @date 2023/8/13 22:59:40
*/
@RestController
@RequestMapping("/tasks")
public class TaskController {
@Autowired
private TaskService taskService;

@GetMapping("/submit")
public R submitTasks() {
for (int i = 1; i <= 15; i++) {
taskService.processTask(i);
}
return Result.ok();
}
}

代码执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Task 1 is being executed by customThreadPool-1
Task 2 is being executed by customThreadPool-2
Task 3 is being executed by customThreadPool-3
Task 14 is being executed by customThreadPool-4
Task 15 is being executed by customThreadPool-5
Task 4 is being executed by customThreadPool-3
Task 8 is being executed by customThreadPool-1
Task 7 is being executed by customThreadPool-5
Task 6 is being executed by customThreadPool-4
Task 5 is being executed by customThreadPool-2
Task 9 is being executed by customThreadPool-5
Task 13 is being executed by customThreadPool-3
Task 12 is being executed by customThreadPool-4
Task 11 is being executed by customThreadPool-2
Task 10 is being executed by customThreadPool-1
流程:任务 1、2、3 由核心线程(CorePoolSize) 1、2、3 执行,任务 14、15 在任务队列(QueueCapacity)被任务 4-13 填充满后到来,非核心线程(MaxPoolSize-CorePoolSize) 4、5 上线,处理任务 14、15,此时达到最大核心数 MaxPoolSize,不再生产新的非核心线程。随后任务相继执行完毕,线程空闲出来,根据先进先出原则处理任务队列内的任务,直到非核心线程空闲时间超过 60 秒(KeepAliveSeconds),非核心任务被销毁。

结语

看到这里,想必读者对线程池 ThreadPoolTaskExecutor 基本工作原理、拒绝策略含义、参数含义及关系都有了更为深刻的理解。总而言之,线程池是提高多线程应用性能和可维护性的关键工具,深入理解其原理和使用方法,将在开发高性能的应用程序中带来巨大的帮助。同时,要注意线程安全性、异常处理以及关闭线程池时的等待任务完成,以保证应用的稳定性和可靠性。

最后附上本文所写源代码:理解线程池 ThreadPoolExecutor 参数和任务拒绝策略

  • 标题: 理解线程池 ThreadPoolExecutor 参数和任务拒绝策略
  • 作者: HYF
  • 创建于 : 2023-08-13 23:52:27
  • 更新于 : 2024-07-27 21:21:52
  • 链接: https://youfeng.ink/ThreadPoolTaskExecutor-936849a670fc/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。