ThreadPool---ThreadPoolExecutor

例子一:

简历里写了写过一些多线程的程序,或者是看过一些并发领域的书,这种情况下通常我会问多线程中的一些更具体的知识点:线程池实现的细节、降低高并发程序锁竞争的方法等;

1 线程池的状态

ThreadPoolExecutor的实现中线程池有5种状态,用一个int的前3bit来表示。ThreadPoolExecutor生成后,线程池的状态是RUNNING。

No 名字 含义
1 RUNNING 接受新任务;执行已进入任务队列中的任务。
2 SHUTDOWN 不再接受新任务;但执行已进入任务队列中的任务。
3 STOP 不再接受新任务;也不执行已进入任务队列中的任务,同时还会去中断当前正在执行的任务
4 TIDYING 所有的任务都已经结束,并且workCount的数量为0。状态为TIDYING的线程会执行terminated() 钩子方法。
5 TERMINATED terminated() 已执行完成。

1.1 线程池的状态转换

  • RUNNING -> SHUTDOWN
    1 On invocation of shutdown(), perhaps implicitly in finalize()
    
  • (RUNNING or SHUTDOWN) -> STOP
    1 On invocation of shutdownNow()
    
  • SHUTDOWN -> TIDYING
    • When both queue and pool are empty
  • STOP -> TIDYING
    • When pool is empty
  • TIDYING -> TERMINATED
    • When the terminated() hook method has completed

1.2 线程池的阻塞队列有什么用?

阻塞队列有两个作用:第一是为了控制线程存活,通过workQueue的take和pull实现;第二是为了存放Runnable对象,以便线程池里空闲的线程处理。

2 线程池的核心参数

名字 含义
corePoolSize 设置一个线程池中的核心线程数。 如果设置allowCoreThreadTimeOut为false的情况下: 即使当线程池中的线程处于空闲状态,这些线程也不会被线程池中移除。 如果设置了allowCoreThreadTimeOut为true, 那么当核心线程在空闲了一段时间后依旧没有用于工作,那么将会从线程池中移除。 注意:(allowCoreThreadTimeOut默认为false,通常情况下也无需做修改)
maximumPoolSize 线程池中所允许创建最大线程数量
keepAliveTime 当线程池中的线程数量大于核心线程数,如果这些多出的线程在经过了keepAliveTime时间后,依然处于空闲状态,那么这些多出的空闲线程将会被结束其生命周期。
workQueue 用于存放任务的阻塞队列,当线程池中的核心线程都处在执行任务时,提交的任务将被存储在workQueue进行缓冲。
threadFactory 线程池中用于创建线程的工厂
handler 当线程池中的线程数量达到最大并且阻塞队列也已经满了无法再添加任务时,线程池所采取的处理策略。

3 线程池中线程的生命周期

  1. 线程创建并加入线程池
    1. prestartCoreThread()调用addWoker()
    2. execute()调用addWoker()
  2. addWoker(): 根据给定的任务来创建线程,创建的过程中会根据实际的线程数量以及状态来判断是否去创建。
  3. 如果addWoker()创建了线程T,并把T加入线程池中,就会调用Worker.run() --->ThreadPoolExecutor.runWorker(Wroker)方法开始运行线程T。
  4. runWorker()调用getTask()获取要执行的任务:如果有任务返回,就执行。
  5. getTask()能从队列中获取任务,就返回获取的任务,否则可能返回null(活跃线程数大于corePoolSize,并且获取任务时超时),也可能在获取任务时被阻塞(活跃线程数<=corePoolSize,并且allowCoreThreadTimeOut=false)。
  6. runWorker()循环调用getTask()获取要执行的任务,当返回的task为null时,退出循环,执行processWorkerExit()
  7. processWorkerExit():从线程池中删除T,并调用tryTerminate()方法。
final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

4 线程池的生命周期

4.1 awaitTermination的作用?

shutdown() just ensures that no new tasks are accepted. The submitted tasks and currently running tasks are allowed to continue.

Sometimes this is not what you'd want. You'd probably want to wait for sometime before you issue another shutdownNow(), in which case an attempt would be made to interrupt the currently executing tasks.

4.2 ThreadPoolExecutor的用法

@Override
    public void run() {
        String name = Thread.currentThread().getName();
        System.out.println("Thread " + name + " started at "
                + System.currentTimeMillis());
        Task[] tasks = new Task[20];
        ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 15, 2000L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());

        for (int i = 0; i < 20; i++) {
            pool.execute(new Task());
        }
        System.out.println("Thread " + name + " is done creating"
                + " tasks at " + System.currentTimeMillis());

        pool.shutdown();
        System.out.println("Thread " + name + " invoked shutdown at "
                + System.currentTimeMillis());
        try {
            pool.awaitTermination(5000L, TimeUnit.MILLISECONDS);
            System.out.println("Thread " + name + " invoked "
                    + "shutdownNow at "
                    + System.currentTimeMillis());

            pool.shutdownNow();
        } catch (InterruptedException ex) {
            System.out.println("Thread " + name + " was INTERRUPTED AT "
                    + System.currentTimeMillis());
            System.out.println("Thread " + name + " invoked shutdownNow at "
                    + System.currentTimeMillis());
            pool.shutdownNow();
        }

        System.out.println("Thread " + name + " ended at "
                + System.currentTimeMillis());
    }

5 负载过高我该怎么办?

当线程池实际的数据量到达最大值时,如果再次提交新的执行,则会拒绝执行。那么ThreadPoolExecutor是如何拒绝执行的呢?

ThreadPool中默认的拒绝策略使用的是中断策略,即当无法接哦受新的任务时,直接抛出RejectedExecutionException异常。

如果一个ThreadPoolExecutor池中的线程进入waiting状态,它不能执行其他任务。

References

  1. ExecutorService - 10 tips and tricks
  2. 我是一个线程
  3. 锁的优化和注意事项

results matching ""

    No results matching ""