介绍
ThreadPoolExecutor是Executors工具类的一部分功能,另一部分是ScheduledThreadPoolExecutor的实现。ScheduledThreadPoolExecutor继承了ThreadPoolExecutor并实现了ScheduledExecutorService接口。线程池队列是DelayedWorkQueue,其和DelayedQueue类似,是一个延迟队列。
类图介绍
ScheduleFutureTask是具有返回值的任务,继承自FutureTask。FutureTask的内部有一个变量state用来表示任务的状态,一开始状态为NEW,所有状态为:
1 | private static final int NEW = 0; // 初始状态 |
可能的任务状态路径为:
- NEW->COMPLETING->NORMAL 初始状态->执行中->正常结束
- NEW->COMPLETING->EXCEPTIONAL 初始状态->执行中->执行异常
- NEW->CANCELLED 初始状态->任务取消
- NEW->INTERRUPTING->INTERRUPTED 初始状态->被中断中->被中断
ScheduleFutureTask内部还有一个变量period用来表示任务的类型,任务类型如下:
- period=0,说明当前任务是一次性的,执行完就退出了
- period为负数,说明当前任务为fixed-delay任务,是固定延迟的定时可重复执行任务
- period为正数,说明当前任务为fixed-rate任务,是固定频率的定时可重复执行任务
源码分析
ScheduledThreadPoolExecutor的构造函数:
1 | public ScheduledThreadPoolExecutor(int corePoolSize) { |
由构造函数可知,线程池队列是DelayedWorkQueue。
1. schedule
该方法的作用是提交一个延迟执行的任务,任务从提交时间算起延迟单位为unit的delay时间后开始执行。提交的任务不是周期性任务,任务只会执行一次。
1 | public ScheduledFuture<?> schedule(Runnable command, |
代码(2)装饰任务,把提交的command任务转换为ScheduledFutureTask。ScheduledFutureTask是具体放入延迟队列里面的东西。由于是延迟任务,所以ScheduledFutureTask实现了long getDelay(TimeUnit unit)和int compareTo(Delayed other)方法。triggerTime方法将延迟时间转换为绝对时间,也就是把当前时间的纳秒数加上延迟的纳秒数后的long型值。ScheduledFutureTask的构造函数:
1 | ScheduledFutureTask(Runnable r, V result, long ns) { |
父类FutureTask的构造函数:
1 | public FutureTask(Runnable runnable, V result) { |
long getDelay(TimeUnit unit)方法如下:用来计算当前任务还有多少时间就过期了。
1 | public long getDelay(TimeUnit unit) { |
int compareTo(Delayed other)方法如下:
1 | public int compareTo(Delayed other) { |
compareTo的作用是加入元素到延迟队列后,在内部建立或者调整堆时会使用该元素的compareTo方法与队列里面其他元素进行比较,让最快要过期的元素放到对首。所以无论什么时候向队列里面添加元素,队首的元素都是最快要过期的元素。
将任务添加到延迟队列,delayedExecute方法如下:
1 | private void delayedExecute(RunnableScheduledFuture<?> task) { |
如果代码(6)判断结果为false,则会执行代码(7)确保至少有一个线程在处理任务,即使核心线程数corePoolSize被设置为0。ensurePrestart代码如下:
1 | void ensurePrestart() { |
下面看线程池里面的线程如何获取并执行执行任务。ScheduledFutureTask的run方法如下:
1 | public void run() { |
由于periodic的值为false,所以执行代码(10)调用父类FutureTask的run方法:
1 | public void run() { |
set方法如下:
1 | protected void set(V v) { |
setException方法如下:
1 | protected void setException(Throwable t) { |
2. scheduleWithFixedDelay
该方法的作用是当任务执行完毕后,让其延迟固定时间后再次运行(fixed-delay任务)。
1 | public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, |
这里会执行到代码(11),runAndReset方法如下:
1 | protected boolean runAndReset() { |
任务正常执行完毕后不会设置任务的状态,这样做是为了让任务成为可重复执行的任务。代码(19)判断如果当前任务正常执行完毕并且任务状态为NEW则返回true,否则返回false。如果返回了true则执行代码(11.1)的setNextRunTime方法设置该任务下一次的执行时间:
1 | private void setNextRunTime() { |
总结:fixed-delay类型的任务的执行原理为:当添加一个任务到延迟队列后,等待initalDelay时间,任务就会过期,过期的任务就会被从队列移除,并执行。执行完毕后,会重新设置任务的延迟时间,然后再把任务放入到延迟队列,循环往复。如果一个任务在执行中抛出了异常,那么这个任务就结束了,不会影响其他任务的执行。
3. scheduleAtFixedRate
该方法相对起始时间以固定频率调用指定的任务(fixed-rate任务)。
1 | public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, |
在讲fixed-rate类型的任务command转换为ScheduledFutureTask时设置period=period,不再是-period。所以当前任务执行完毕后,调用setNextRunTime设置任务下次执行的时间时执行的是time+=p而不再是time=triggerTime(-p)。
总结:fixed-rate方式执行规则为,时间为initdelday+n*period时启动任务,但是如果当前任务还没有完,下一次要执行的任务的时间到了,则不会并发执行,下次要执行的任务会延迟执行,要等到当前任务执行完毕后再执行。
总结
ScheduledThreadPoolExecutor其内部使用DelayQueue来存放具体任务。任务分三种使用period的值来区分:
- 一次性执行任务执行完毕就结束了;
- fixed-delay任务保证同一个任务在多次执行之间间隔固定时间;
- fixed-rate任务保证按照固定的频率执行。