面试官:你分析过线程池源码吗?

作者 : 开心源码 本文共8881个字,预计阅读时间需要23分钟 发布时间: 2022-05-12 共246人阅读

线程池源码也是面试经常被提问到的点,我会将全局源码做一分析,而后告诉你面试考啥,怎样答。

为什么要用线程池?

简洁的答两点就行。

  1. 降低系统资源消耗。

  2. 提高线程可控性。

如何创立使用线程池?

JDK8提供了五种创立线程池的方法:

1.创立一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

public static ExecutorService newFixedThreadPool(int nThreads) {    return new ThreadPoolExecutor(nThreads, nThreads,                                  0L, TimeUnit.MILLISECONDS,                                  new LinkedBlockingQueue<Runnable>());}

2.(JDK8新添加)会根据所需的并发数来动态创立和关闭线程。能够正当的使用CPU进行对任务进行并发操作,所以适合使用在很耗时的任务。

注意返回的是ForkJoinPool对象。

public static ExecutorService newWorkStealingPool(int parallelism) {    return new ForkJoinPool        (parallelism,         ForkJoinPool.defaultForkJoinWorkerThreadFactory,         null, true);}

什么是ForkJoinPool:

public ForkJoinPool(int parallelism,                        ForkJoinWorkerThreadFactory factory,                        UncaughtExceptionHandler handler,                        boolean asyncMode) {        this(checkParallelism(parallelism),             checkFactory(factory),             handler,             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,             "ForkJoinPool-" + nextPoolId() + "-worker-");        checkPermission();    }

使用一个无限队列来保存需要执行的任务,可以传入线程的数量;不传入,则默认使用当前计算机中可用的cpu数量;使用分治法来处理问题,使用fork()和join()来进行调用。

3.创立一个可缓存的线程池,可灵活回收空闲线程,若无可回收,则新建线程。

public static ExecutorService newCachedThreadPool() {    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                  60L, TimeUnit.SECONDS,                                  new SynchronousQueue<Runnable>());}

4.创立一个单线程的线程池。

public static ExecutorService newSingleThreadExecutor() {    return new FinalizableDelegatedExecutorService        (new ThreadPoolExecutor(1, 1,                                0L, TimeUnit.MILLISECONDS,                                new LinkedBlockingQueue<Runnable>()));}

5.创立一个定长线程池,支持定时及周期性任务执行。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {    return new ScheduledThreadPoolExecutor(corePoolSize);}

上层源码结构分析

Executor结构:

Executor

一个运行新任务的简单接口

public interface Executor {    void execute(Runnable command);}

ExecutorService

扩展了Executor接口。增加了少量用来管理执行器生命周期和任务生命周期的方法

AbstractExecutorService

对ExecutorService接口的笼统类实现。不是我们分析的重点。

ThreadPoolExecutor

Java线程池的核心实现。

ThreadPoolExecutor源码分析

属性解释

// AtomicInteger是原子类  ctlOf()返回值为RUNNING;private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 高3位表示线程状态private static final int COUNT_BITS = Integer.SIZE - 3;// 低29位表示workerCount容量private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits// 能接收任务且能解决阻塞队列中的任务private static final int RUNNING    = -1 << COUNT_BITS;// 不能接收新任务,但可以解决队列中的任务。private static final int SHUTDOWN   =  0 << COUNT_BITS;// 不接收新任务,不解决队列任务。private static final int STOP       =  1 << COUNT_BITS;// 所有任务都终止private static final int TIDYING    =  2 << COUNT_BITS;// 什么都不做private static final int TERMINATED =  3 << COUNT_BITS;// 存放任务的阻塞队列private final BlockingQueue<Runnable> workQueue;

值的注意的是状态值越大线程越不活跃。

线程池状态的转换模型:

构造器

public ThreadPoolExecutor(int corePoolSize,//线程池初始启动时线程的数量                          int maximumPoolSize,//最大线程数量                          long keepAliveTime,//空闲线程多久关闭?                          TimeUnit unit,// 计时单位                          BlockingQueue<Runnable> workQueue,//放任务的阻塞队列                          ThreadFactory threadFactory,//线程工厂                          RejectedExecutionHandler handler// 拒绝策略) {    if (corePoolSize < 0 ||        maximumPoolSize <= 0 ||        maximumPoolSize < corePoolSize ||        keepAliveTime < 0)        throw new IllegalArgumentException();    if (workQueue == null || threadFactory == null || handler == null)        throw new NullPointerException();    this.acc = System.getSecurityManager() == null ?            null :            AccessController.getContext();    this.corePoolSize = corePoolSize;    this.maximumPoolSize = maximumPoolSize;    this.workQueue = workQueue;    this.keepAliveTime = unit.toNanos(keepAliveTime);    this.threadFactory = threadFactory;    this.handler = handler;}

在向线程池提交任务时,会通过两个方法:execute和submit。

本文着重讲解execute方法。submit方法放在下次和Future、Callable一起分析。

execute方法:

public void execute(Runnable command) {    if (command == null)        throw new NullPointerException();    // clt记录着runState和workerCount    int c = ctl.get();    //workerCountOf方法取出低29位的值,表示当前活动的线程数    //而后拿线程数和 核心线程数做比较    if (workerCountOf(c) < corePoolSize) {        // 假如活动线程数<核心线程数        // 增加到        //addWorker中的第二个参数表示限制增加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断        if (addWorker(command, true))            // 假如成功则返回            return;        // 假如失败则重新获取 runState和 workerCount        c = ctl.get();    }    // 假如当前线程池是运行状态并且任务增加到队列成功    if (isRunning(c) && workQueue.offer(command)) {        // 重新获取 runState和 workerCount        int recheck = ctl.get();        // 假如不是运行状态并且         if (! isRunning(recheck) && remove(command))            reject(command);        else if (workerCountOf(recheck) == 0)            //第一个参数为null,表示在线程池中创立一个线程,但不去启动            // 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize            addWorker(null, false);    }    //再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize    else if (!addWorker(command, false))        //假如失败则拒绝该任务        reject(command);}

总结一下它的工作流程:

  1. workerCount &lt; corePoolSize,创立线程执行任务。

  2. workerCount &gt;= corePoolSize&&阻塞队列workQueue未满,把新的任务放入阻塞队列。

  3. workQueue已满,并且workerCount &gt;= corePoolSize,并且workerCount &lt; maximumPoolSize,创立线程执行任务。

  4. 当workQueue已满,workerCount &gt;= maximumPoolSize,采取拒绝策略,默认拒绝策略是直接抛异常。

通过上面的execute方法可以看到,最主要的逻辑还是在addWorker方法中实现的,那我们就看下这个方法:

addWorker方法

主要工作是在线程池中创立一个新的线程并执行

参数定义:

  • firstTask the task the new thread should run first (or null if none). (指定新添加线程执行的第一个任务或者者不执行任务)

  • core if true use corePoolSize as bound, else maximumPoolSize.(core假如为true则使用corePoolSize绑定,否则为maximumPoolSize。 (此处使用布尔指示符而不是值,以确保在检查其余状态后读取新值)。)

private boolean addWorker(Runnable firstTask, boolean core) {    retry:    for (;;) {        int c = ctl.get();        //  获取运行状态        int rs = runStateOf(c);        // Check if queue empty only if necessary.        // 假如状态值 >= SHUTDOWN (不接新任务&不解决队列任务)        // 并且 假如 !(rs为SHUTDOWN 且 firsTask为空 且 阻塞队列不为空)        if (rs >= SHUTDOWN &&            ! (rs == SHUTDOWN &&               firstTask == null &&               ! workQueue.isEmpty()))            // 返回false            return false;        for (;;) {            //获取线程数wc            int wc = workerCountOf(c);            // 假如wc大与容量 || core假如为true表示根据corePoolSize来比较,否则为maximumPoolSize            if (wc >= CAPACITY ||                wc >= (core ? corePoolSize : maximumPoolSize))                return false;            // 添加workerCount(原子操作)            if (compareAndIncrementWorkerCount(c))                // 假如添加成功,则跳出                break retry;            // wc添加失败,则再次获取runState            c = ctl.get();  // Re-read ctl            // 假如当前的运行状态不等于rs,说明状态已被改变,返回重新执行            if (runStateOf(c) != rs)                continue retry;            // else CAS failed due to workerCount change; retry inner loop        }    }    boolean workerStarted = false;    boolean workerAdded = false;    Worker w = null;    try {        // 根据firstTask来创立Worker对象        w = new Worker(firstTask);        // 根据worker创立一个线程        final Thread t = w.thread;        if (t != null) {            // new一个锁            final ReentrantLock mainLock = this.mainLock;            // 加锁            mainLock.lock();            try {                // Recheck while holding lock.                // Back out on ThreadFactory failure or if                // shut down before lock acquired.                // 获取runState                int rs = runStateOf(ctl.get());                // 假如rs小于SHUTDOWN(处于运行)或者者(rs=SHUTDOWN && firstTask == null)                // firstTask == null证实只新建线程而不执行任务                if (rs < SHUTDOWN ||                    (rs == SHUTDOWN && firstTask == null)) {                    // 假如t活着就抛异常                    if (t.isAlive()) // precheck that t is startable                        throw new IllegalThreadStateException();                    // 否则加入worker(HashSet)                    //workers包含池中的所有工作线程。仅在持有mainLock时访问。                    workers.add(w);                    // 获取工作线程数量                    int s = workers.size();                    //largestPoolSize记录着线程池中出现过的最大线程数量                    if (s > largestPoolSize)                        // 假如 s比它还要大,则将s赋值给它                        largestPoolSize = s;                    // worker的增加工作状态改为true                        workerAdded = true;                }            } finally {                mainLock.unlock();            }            // 假如worker的增加工作完成            if (workerAdded) {                // 启动线程                t.start();                // 修改线程启动状态                workerStarted = true;            }        }    } finally {        if (! workerStarted)            addWorkerFailed(w);    }    // 返回线启动状态    return workerStarted;
为什么需要持有mainLock?

由于workers是HashSet类型的,不能保证线程安全。

w = new Worker(firstTask);如何了解呢

Worker.java

private final class Worker    extends AbstractQueuedSynchronizer    implements Runnable

可以看到它继承了AQS并发框架还实现了Runnable。证实它还是一个线程任务类。那我们调用t.start()事实上就是调用了该类重写的run方法。

Worker为什么不使用ReentrantLock来实现呢?

tryAcquire方法它是不允许重入的,而ReentrantLock是允许重入的。对于线程来说,假如线程正在执行是不允许其它锁重入进来的。

线程只要要两个状态,一个是独占锁,表明正在执行任务;一个是不加锁,表明是空闲状态。

public void run() {    runWorker(this);}

run方法又调用了runWorker方法:

final void runWorker(Worker w) {    // 拿到当前线程    Thread wt = Thread.currentThread();    // 拿到当前任务    Runnable task = w.firstTask;    // 将Worker.firstTask置空 并且释放锁    w.firstTask = null;    w.unlock(); // allow interrupts    boolean completedAbruptly = true;    try {        // 假如task或者者getTask不为空,则一直循环        while (task != null || (task = getTask()) != null) {            // 加锁            w.lock();            // If pool is stopping, ensure thread is interrupted;            // if not, ensure thread is not interrupted.  This            // requires a recheck in second case to deal with            // shutdownNow race while clearing interrupt            //  return ctl.get() >= stop             // 假如线程池状态>=STOP 或者者 (线程中断且线程池状态>=STOP)且当前线程没有中断            // 其实就是保证两点:            // 1. 线程池没有中止            // 2. 保证线程没有中断            if ((runStateAtLeast(ctl.get(), STOP) ||                 (Thread.interrupted() &&                  runStateAtLeast(ctl.get(), STOP))) &&                !wt.isInterrupted())                // 中断当前线程                wt.interrupt();            try {                // 空方法                beforeExecute(wt, task);                Throwable thrown = null;                try {                    // 执行run方法(Runable对象)                    task.run();                } catch (RuntimeException x) {                    thrown = x; throw x;                } catch (Error x) {                    thrown = x; throw x;                } catch (Throwable x) {                    thrown = x; throw new Error(x);                } finally {                    afterExecute(task, thrown);                }            } finally {                // 执行完后, 将task置空, 完成任务++, 释放锁                task = null;                w.completedTasks++;                w.unlock();            }        }        completedAbruptly = false;    } finally {        // 退出工作        processWorkerExit(w, completedAbruptly);    }

总结一下runWorker方法的执行过程:

  1. while循环中,不断地通过getTask()方法从workerQueue中获取任务

  2. 假如线程池正在中止,则中断线程。否则调用3.

  3. 调用task.run()执行任务;

  4. 假如task为null则跳出循环,执行processWorkerExit()方法,销毁线程workers.remove(w);

这个流程图非常经典:

除此之外,ThreadPoolExector还提供了tryAcquiretryReleaseshutdownshutdownNowtryTerminate、等涉及的一系列线程状态更改的方法有兴趣可以自己研究。大体思路是一样的,这里不做详情。

Worker为什么不使用ReentrantLock来实现呢?

tryAcquire方法它是不允许重入的,而ReentrantLock是允许重入的。对于线程来说,假如线程正在执行是不允许其它锁重入进来的。

线程只要要两个状态,一个是独占锁,表明正在执行任务;一个是不加锁,表明是空闲状态。

在runWorker方法中,为什么要在执行任务的时候对每个工作线程都加锁呢?

shutdown方法与getTask方法存在竞态条件.(这里不做深入,建议自己深入研究,对它比较熟习的面试官一般会问)

高频考点

  1. 创立线程池的五个方法。
  2. 线程池的五个状态
  3. execute执行过程。
  4. runWorker执行过程。(把两个流程图记下,了解后说个大该就行。)
  5. 比较深入的问题就是我在文中插入的问题。
  6. …期望大家能在评论区补充。

读者福利

而针对以上面试技术点,我在这里也做少量技术知识面试专题资料分享,希望能更好的帮助到大家。
资料免费领取方式:点点喜欢,加入合作Java架构交流Qqun:935692859,管理员处免费领取资料。

imageimageimage

对于很多初级Java工程师而言,想要提升技能,往往是自己摸索成长,不成体系的学习效果低效漫长且无助。

整理的这些架构技术希望对Java开发的朋友们有所参考以及少走弯路,本文的重点是你有没有收获与成长,其他的都不重要,希望读者们能谨记这一点。同时我经过多年的收藏目前也算收集到了一套完整的学习资料,希望对想成为架构师的朋友有肯定的参考和帮助。

下面是部分资料截图,诚意满满:特别适合有2-5年开发经验的Java程序员们学习。

领取方式:点点喜欢,加入合作Java架构交流Qqun:935692859,管理员处免费领取资料。
(部分资料如下)

image

说明
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » 面试官:你分析过线程池源码吗?

发表回复