Callable、Future与线程池

在创建新线程的三种方式中,继承Thread实现Runnable接口两种方式都都没有返回值,因此当我们想要获取子线程计算结果时只能设置共享数据,同时还需要考虑同步的问题,比较麻烦。而Callable接口就是解决这个问题的存在。

Callable

Callable和Runnable类似,都是只有一个方法的标志性接口V call()只不过Callable是有返回值的,并且声明了异常Expection,即当计算正常进行则返回v,计算出错则抛出异常。
单独一个Callable并没有什么可说的,该接口一般都是配合Future接口进行使用。

Future接口

Future是对Callable任务进行处理的接口,里面有对任务操作的方法:

//获取结果,若无结果会阻塞至异步计算完成
V get()
//获取结果,超时返回null
V get(long timeOut, TimeUnit unit)
//执行结束(完成/取消/异常)返回true
boolean isDone()
//任务完成前被取消返回true
boolean isCancelled()
//取消任务,未开始或已完成返回false,参数表示是否中断执行中的线程
boolean cancel(boolean mayInterruptRunning)
其中,对于boolean cancel(boolean mayInterruptRunning)方法的参数:
简单来说,传入false参数只能取消还没有开始的任务,若任务已经开始了,就任由其运行下去。
当创建了Future实例,任务可能有以下三种状态:
等待状态。此时调用cancel()方法不管传入true还是false都会标记为取消,任务依然保存在任务队列中,但当轮到此任务运行时会直接跳过。
完成状态。此时cancel()不会起任何作用,因为任务已经完成了。
运行中。此时传入true会中断正在执行的任务,传入false则不会中断。

Future的实现子类为FutureTask<V>,该类即实现了Future接口,也实现了Runnable接口,因此Callable可配合FutureTask使用:

Callable<Integer> c = ()->{
    Thread.sleep(3000);//模拟计算
    return 100;//返回计算结果
};
//实例化FutureTask,注意这里不能使用Future的多态形式,因为只有FutureTask实现了Runnable接口
FutureTask<Integer> ft = new FutureTask<>(c);
//启动线程
new Thread(ft).start();
//获取计算结果,注意这里会阻塞
System.out.println(ft.get());

FutureTask提供了两种构造方法:

//上例使用的就是这个,参数为Callable
public FutureTask(Callable<V> callable) {
}
//当Runnable执行成功时返回result(这有个毛用啊。。。)
public FutureTask(Runnable runnable, V result) {
}

FutureTask可以很方便的启动线程。

线程池

除了使用FutureTask,还可以使用Callable + Future + 线程池的方式执行Callable:

Callable<Integer> c = ()->{
    Thread.sleep(3000);
    return 100;
};
//构建定长线程池
ExecutorService service = Executors.newFixedThreadPool(10);
//在线程池中提交Callable时会返回Future对象
Future<Integer> future = service.submit(c);
System.out.println(future.get());

上例的线程池创建方式只是为了方便,在阿里开发手册中要求:

【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors 返回的线程池对象的弊端如下:
1)FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

对于Executors提供的几种线程池分别为:

newSingleThreadExecutor
创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。
此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

new ThreadPoolExecutor(1, 1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())

newFixedThreadPool
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

newCachedThreadPool
创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。
此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。

new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());

newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。除了延迟执行之外和newFixedThreadPool基本相同,可以用来执行定时任务

上面四种方式只是比较方便,阿里开发手册要求不能使用这些方式,那么来看看阿里要求的线程池方式:

public ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler
) 

参数分别为:

  • corePoolSize - 线程池核心池的大小。
  • maximumPoolSize - 线程池的最大线程数。
  • keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
  • unit - keepAliveTime 的时间单位。
  • workQueue - 用来储存等待执行任务的队列。
  • threadFactory - 线程工厂。
  • handler - 拒绝策略
关注点1 线程池大小
线程池有两个线程数的设置,一个为核心池线程数,一个为最大线程数。
在创建了线程池后,默认情况下,线程池中并没有任何线程,等到有任务来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法
当创建的线程数等于 corePoolSize 时,会加入设置的阻塞队列。当队列满时,会创建线程执行任务直到线程池中的数量等于maximumPoolSize。
关注点2 适当的阻塞队列
java.lang.IllegalStateException: Queue full
方法 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek()
ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
DelayQueue: 一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue: 一个不存储元素的阻塞队列。
LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。
关注点3 明确拒绝策略
ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出
RejectedExecutionException异常。 (默认)
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

参考

标签: Java, 多线程

添加新评论