线程池框架Executor

线程double

Executor 框架是一个根据一组执行策略调用、调度,执行和控制的异步任务框架,目的是提供一种将 “任务提交” 与 “任务如何运行” 分离开来的机制。

为什么需要线程池

Java线程池详解

  • 重用线程池中的线程,减少因对象创建,销毁所带来的性能开销;
  • 能有效的控制线程的最大并发数,提高系统资源利用率,同时避免过多的资源竞争,避免堵塞;
  • 能够多线程进行简单的管理,使线程的使用简单、高效;

线程池的状态

  • RUNNING: 能接受新提交的任务,并且也能处理阻塞队列中的任务
  • SHUTDOWN: 不在接受新提交的任务,但可以处理存量任务
  • STOP: 不在接受新提交的任务,也不处理存量任务
  • TIDYING: 所有的任务都已终止
  • TERMINATED: terminated() 方法执行完后加入该状态
状态转换图

状态转换图.jpg

线程池的生命周期

线程池生命周期.jpg

线程池的大小如何选定

  • CPU密集型:线程数 = 按照核数或者核数 + 1 设定
  • I/O密集型:线程数 = CPU核数 * (1 + 平均等待时间/平均工作时间)

Executor 核心 API 概述:

ThreadPoolExecutor

  • Executor(接口类): 运行任务的简单接口;
  • ExecutorService(接口类): 扩展了 Executor 接口,扩展能力:
    • 支持有返回值的线程;
    • 支持管理线程的生命周期;
  • AbstractExecutorService(抽象类): ExecutorService 接口的默认实现;
  • ThreadPoolExecutor(实现类): AbstractExecutorService 抽象类的实现,是 Executor 框架最核心的类;

ScheduledThreadPoolExecutor

  • ScheduledExecutorService(接口类): 扩展了 ExecutorService 接口,支持定期执行任务。
  • ScheduledThreadPoolExecutor(继承 ThreadPoolExecutor 类,并实现 ScheduledExecutorService 接口): 一个可定时调度任务的线程池。

静态方法

  • Executors: 可以通过调用 Executors 的静态工厂方法来创建线程池并返回一个 ExecutorService 对象。

Executor UML图

Executor: 所有线程池的接口,只有一个方法。

public interface Executor { void execute(Runnable command); }

ExecutorService: 增加 Executor 的行为,是 Executor 实现类的最直接接口。

Executors: 创建不同的线程池满足不同场景的需求,返回的线程池都实现了 ExecutorService 接口。

  1. 1)、newFixedThreadPool(int n Threads):指定工作线程数量的线程池
  2. 2)、newCachedThreadPool(): 处理大量短时间工作任务的线程池
    1. 试图缓存线程并重用,当无缓存线程可用时,就会创建新的工作线程;
    2. 如果线程闲置的时间超过阈值,则会被终止并移除缓存;
    3. 系统长时间闲置的时候,不会消耗什么资源;
  3. 3)、newSingleThreadExecutor():创建唯一的工作者线程来执行任务,如果线程异常结束,会有另外一个线程取代它
  4. 4)、newSingleThreadScheduledExecutor()与newScheduledThreadPool(int corePoolSize):定时或者周期性的工作调度,两者的区别在于单一工作线程还是多个线程。
  5. 5)、newWorkStealingPool():内部会构建 ForkJoinPool,利用 working-stealing 算法(某个线程从其他队列里窃取任务来执行),并行地处理任务,不保证处理顺序。

J.U.C的三个 Executor 接口

  • Executor: 运行新任务的简单接口,将任务提交和任务执行细节解耦。
  • ExecutorService: 具备管理执行器和任务生命周期的方法,提交任务机制更完善。
  • ScheduledExecutorService: 支持 Future 和定期执行任务。

ThreadPoolExecutor: 线程池的具体实现类,一般用的各种线程池都是基于这个类实现的。

构造方法如下:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
  • corePoolSize: 线程池的核心线程数,线程池中运行的线程数永远不会超过 corePoolSize 个, 默认情况下也可以一直存活。可以通过设置 allowCoreThreadTimeOut 为 true, 此时 核心线程数就是 0, 此时 keepAliveTime 控制所有线程的超时时间。
  • maximumPoolSize: 线程池允许的最大线程数;
  • keepAliveTime: 指的是空闲线程结束的超时时间;
  • unit: 是一个枚举, 表示 keepAliveTime 的单位;
  • workQueue: 表示存放任务的 BlockingQueue 队列;
  • BlockingQueue: 阻塞队列(BlockingQueue) 是 java.util.concurrent 下的主要用来控制线程同步的工具, 如果 BlockingQueue 是空的,从 BlockingQueue 取东西的操作将会被阻断进入等待状态,直到 BlockingQueue 进了东西才会被唤醒。同样,如果 BlockingQueue 是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到 BlockingQueue 里有空间才会被唤醒继续操作。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素,具体的实现类有 LinkedBlockingQueueArrayBlockingQueued等。一般其内部的都是通过 Lock 和 Condition(显示锁 Look 及 Condition 的学习与使用)来实现阻塞和唤醒。
线程池的工作过程:
  • 1)、线程池刚创建时,里面没有一个线程。任务队列是作为参数传递进来的,不过,就算队列里面有任务,线程池也不会马上执行他们。
  • 2)、当调用 execute() 方法添加一个任务时,线程池会做如下判断:
    • 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
    • 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
    • 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize, 那么还是要创建非核心线程立刻运行这个任务;
    • 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize, 那么线程池会抛出异常 RejectExecutionException;
  • 3)、当一个线程完成任务时,它会从队列中取下一个任务来执行;
  • 4)、当一个线程无事可做, 超过一定的时间 (KeepAliveTime) 时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

查看线程池工作UML图

线程池的创建和使用

生成线程池采用了工具类 Exceutors 的静态方法,以下是几种常见的线程池。

SingleThreadExector:

单个后台线程(其缓冲队列是无界的)

为什么阿里巴巴要禁用 Executors 创建线程池?

  • FixedThreadPool 和 SingleThreadExecutor => 允许的请求队列长度为 Integer.MAX_VALUE, 可能会堆积大量的请求,从而引起 OOM 异常。
  • CachedThreadPool => 允许创建的线程数为 Integer.MAX_VALUE, 可能会创建大量的线程,从而引起 OOM 异常。

更多详情

参考文章

Java 中的多线程你只要看这一篇就够了
从使用到原理学习Java线程池
从源码的角度解析线程池运行原理
Java线程池实现原理及其在美团业务中的实践
面试官:这道线程池场景题回答一下?