一起来写线程池(二)——实现拥有定时任务、周期任务线程池

正文索引 [隐藏]

写在前面:
本篇博客是仿照了JDK1.8线程池的实现,如对JDK1.8线程池很熟悉。
那么,,,,,就可以离开本页面啦(✪ω✪)
阅读该篇博客需要阅读:一起来写线程池(一)——实现基础线程池
点击下载本篇博客源码

ScheduledExecutor线程池在基础线程池上实现了定时任务、周期任务以及定时周期任务,其关键是在普通的任务队列上扩展实现延迟任务队列。

延迟任务队列

延迟任务队列本质还是一个阻塞队列,只不过要遵循以下几个制约:

  1. 进入延迟队列的任务时按其定义好的发生时间排序,发生时间前的排列在前面。这条制约则规定了进入该队列的任务时必须实现比较接口的;
  2. 到任务发生的时间之前,延迟队列会一直阻塞;
  3. 后进入的任务,依旧按时间先后排序

对于第一条与第三条制约,我们应当想到可以利用J.U.C包下的PriorityBlockingQueue优先阻塞队列来实现,而对于第二条,我们可以在任务发生的时间之前一直检测任务是否可以发生,如果不发生,则一直死循环,发生则返回该任务执行,其实这就是定时任务的原理,而周期任务的原理其实是获取到该任务后,再次计算时间,将该任务加入延迟队列再次等待发生,延迟队列源码如下:

/**
 * Created with IntelliJ IDEA.
 * @author: Suyeq
 * @date: 2019-03-18
 * @time: 9:03
 */
public class DelayWorkQueue implements BlockingQueue<Runnable> {

    private volatile int size=0;

    private BlockingQueue<ScheduledFutureTask> taskqueue=new PriorityBlockingQueue<ScheduledFutureTask>();

    private Lock lock=new ReentrantLock();

    private Condition notice=lock.newCondition();


    @Override
    public boolean add(Runnable e) {
        return offer(e);
    }

    @Override
    public boolean offer(Runnable e) {
        if (e==null){
            throw new NullPointerException();
        }
        ScheduledFutureTask task= (ScheduledFutureTask) e;
        taskqueue.offer(task);
        lock.lock();
        size++;
        lock.unlock();
        return true;
    }

    /**
     * 锁粒度必须尽可能的小
     * 否则会影响到定时的精度
     * @return
     * @throws InterruptedException
     */
    @Override
    public Runnable take() throws InterruptedException {
        try{
            while(true){
                lock.lock();
                Runnable task=taskqueue.peek();
                if (task==null){
                    System.out.println("取得任务失败,优先队列为null");
                    return null;
                }
                long delay=((ScheduledFutureTask) task).getDelay(TimeUnit.NANOSECONDS);
                if (delay<=0){
                    return finishTake((ScheduledFutureTask)task);
                }
                lock.unlock();
            }
        }finally {
            lock.unlock();
        }
    }

    /**
     * 判断是否需要周期执行
     * 是,则将定时重置,取出任务再次加入优先队列重新排序
     * 否则直接返回
     * @param task
     * @return
     * @throws InterruptedException
     */
    public Runnable finishTake(ScheduledFutureTask task) throws InterruptedException {
        if (task.isPeriodic()){
            task.calculateNextDelay();
            Runnable newTask=taskqueue.take();
            System.out.println(((ScheduledFutureTask) newTask).getDelay(TimeUnit.SECONDS));
            taskqueue.offer((ScheduledFutureTask)newTask);
            return newTask;
        }
        size--;
        return taskqueue.take();
    }
    //......
}

定时任务、周期任务

在定时任务与周期任务这一块,普通的实现Runnable接口的任务已经不能满足需求了,从上述代码中可以看出,在这里自己实现了ScheduledFutureTask类来作为新的提交任务,对新的任务类提出新的几点需求:

  • 必须实现Comparable或者Comparator接口来按时间先后实现比较排序;
  • 必须拥有一个时间字段,让第一点需求得以实现;
  • 为了保证简用性,必须实现Runnable接口;
  • 可以在任务执行完成后返回某些数据;

对于1、2、3条需求,如果阅读过源码,其实只需要实现RunnableScheduledFuture接口便可以了,因为它继承了满足以上三点需求的接口,而对于第四点需求,我们在上篇博客一起来写线程池(一)——实现基础线程池利用FutureTask类来封装run方法来实现,但是jdk的源码中并没有为具有延迟属性的任务实现新的FutureTask类,虽然定时任务,也可以通过封装其run方法来实现,但是周期任务却不能(只能获取到一次返回值,不能再次获取)。

查阅源码后得知,其实jdk1.8中实现了新的FutureTask类,只不过是内部类,没有公开,其继承了FutureTask类,利用了FutureTask内部的runSet方法来实行状态重置,以此来达到可以周期的执行任务并返回某些数据的,如果对FutureTask类如何执行不清楚的同学可以看看这篇博客:Java并发——FutureTask类源码解析。具体代码如下:

/**
 * Created with IntelliJ IDEA.
 *
 * @author: Suyeq
 * @date: 2019-03-17
 * @time: 16:26
 */
public class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {

    private long delayTimes;

    private final long periodic;

    private final long cycleTimes;

    private final TimeUnit unit;

    //private final int sortIndex;

    public ScheduledFutureTask(Runnable task,long ns,TimeUnit unit) {
        super(task,null);
        this.periodic=0;
        this.cycleTimes=0;
        this.unit=unit;
        this.delayTimes=TimeUnit.NANOSECONDS.convert(ns,unit)+now();
        //this.sortIndex=sortIndex;
    }

    public ScheduledFutureTask(Runnable task,long ns,long cycleTimes,TimeUnit unit) {
        super(task,null);
        this.periodic=1;
        this.cycleTimes=cycleTimes;
        this.unit=unit;
        this.delayTimes=TimeUnit.NANOSECONDS.convert(cycleTimes,unit)+now();
        //this.sortIndex=sortIndex;
    }

    public ScheduledFutureTask(Callable task,long ns,TimeUnit unit) {
        super(task);
        this.periodic=0;
        this.cycleTimes=0;
        this.unit=unit;
        this.delayTimes=TimeUnit.NANOSECONDS.convert(ns,unit)+now();
        //this.sortIndex=sortIndex;
    }

    @Override
    public void run(){
        boolean isPeriodic=isPeriodic();
        if (!isPeriodic){
            //执行定时任务
            super.run();
        }else if (isPeriodic){
            //执行周期性任务
            super.runAndReset();
        }
    }

    /**
     * 判断有无周期性
     * @return
     */
    @Override
    public boolean isPeriodic() {
        return this.periodic!=0;
    }

    /**
     * 计算周期任务下次执行时间
     */
    public void calculateNextDelay(){
       this.delayTimes=now()+TimeUnit.NANOSECONDS.convert(cycleTimes,this.unit);
    }

    /**
     * 按纳秒的时间距离任务执行还剩余的时间
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(delayTimes-now()+1000*1000,TimeUnit.NANOSECONDS);
    }

    /**
     * 返回0表示延迟相等
     * -1表示该延迟大于当前Future延迟
     * 1表示延迟小于
     * -2表示该延迟是自身或者
     * 传入的不是ScheduledFutureTask对象
     * @param o
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
        if (o==this){
            return 0;
        }
        if (o instanceof ScheduledFuture){
            ScheduledFuture scheduledFuture=(ScheduledFuture)o;
            long timeSub=scheduledFuture.getDelay(TimeUnit.NANOSECONDS)-this.getDelay(TimeUnit.NANOSECONDS);
            if (timeSub>0){
                return -1;
            }else if (timeSub==0){
                return 0;
            }else {
                return 1;
            }
        }
        return -2;
    }

    /**
     * 以纳秒的方式返回当前时间
     * @return
     */
    public long now(){
        return System.nanoTime();
    }
}

提交任务

我们只需要让自己实现的ScheduledExecutor线程池继承基础线程池就行了,因为大部分的功能基础线程池便已实现了,只需要注意的是提交任务的过程与基础线程池有所不同:

  • 先插入任务队列,在创建新的工作者线程;
  • 工作者线程小于核心线程数时创建,否则不创建,因为超过了核心线程数会影响到定时任务以及周期任务的精度;

具体实现如下:

 /**
 * Created with IntelliJ IDEA.
 *
 * @author: Suyeq
 * @date: 2019-03-18
 * @time: 9:02
 */
public class SuyeScheduleThreadPool extends SuyeThreadPool implements ScheduledExecutorService {

    public SuyeScheduleThreadPool(int core){
        super(core,new DelayWorkQueue());
    }

    /**
     * 定时任务提交
     * @param task
     * @param time
     * @param unit
     */
    @Override
    public void schedule(Runnable task, long time, TimeUnit unit) {
        super.getBlockQueue().offer(new ScheduledFutureTask<Void>(task,time,unit));
        executeDelay();
    }

    /**
     * 定时执行
     * @return
     */
    private boolean executeDelay(){
        int poolState=suyeThreadPoolState.getPoolState();
        int workThreadSize=suyeThreadPoolState.getWorkThreadSize();
        if (poolState<suyeThreadPoolState.StopState() && workThreadSize<bestPoolThreadSize){
            return super.addWorkThread(null);
        }
        return false;
    }

    /**
     * 定时结果任务提交
     * @param task
     * @param time
     * @param unit
     * @param result
     * @param <T>
     * @return
     */
    @Override
    public <T> Future<T> schedule(Runnable task, long time, TimeUnit unit, final T result) {
        Callable<T> callable= Executors.callable(task,result);
        Future<T> future=new ScheduledFutureTask<T>(callable,time,unit);
        super.getBlockQueue().offer(future);
        executeDelay();
        return future;
    }

    /**
     * 周期任务
     * @param task
     * @param time
     * @param unit
     */
    @Override
    public void scheduleAtFixedRate(Runnable task, long time, TimeUnit unit) {
        super.getBlockQueue().offer(new ScheduledFutureTask<Void>(task,0,time,unit));
        executeDelay();
    }
}

整个实现已经完成,需要完整代码的同学可以去点击下载本篇博客源码