Java利用线程工厂监控线程池实现代码示例

作者:袖梨 2022-06-29

本篇文章小编给大家分享一下Java利用线程工厂监控线程池实现代码示例,文章代码介绍的很详细,小编觉得挺不错的,现在分享给大家供大家参考,有需要的小伙伴们可以来看看。

ThreadFactory

线程池中的线程从哪里来呢?就是ThreadFoctory

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

Threadfactory里面有个接口,当线程池中需要创建线程就会调用该方法,也可以自定义线程工厂

public class ThreadfactoryText {
    public static void main(String[] args) {
        Runnable runnable=new Runnable() {
            @Override
            public void run() {
                int num=new Random().nextInt(10);
                System.out.println(Thread.currentThread().getId()+"--"+System.currentTimeMillis()+"--睡眠"+num);
                try {
                    TimeUnit.SECONDS.sleep(num);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //创建线程池 使用自定义线程工厂 采用默认的拒绝策略
        ExecutorService executorService=new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t=new Thread(r);
                t.setDaemon(true);//设置为守护线程,当主线程运行结束,线程池中线程也会被释放
                System.out.println("创建了线程"+t);
                return t;
            }
        });
        //提交五个任务
        for (int i = 0; i < 5; i++) {
            executorService.submit(runnable);
        }
    }
}

当线程提交超过五个任务时,线程池会默认抛出异常

监控线程池

ThreadPoolExcutor提供了一组方法用于监控线程池

int getActiveCount()//获得线程池只当前的获得线程数量
long getCompletedTaskCount()//返回线程池完成任务数量
int getCorePoolSize()//线程池中核心任务数量
int getLargestPoolSize() //返回线程池中曾经达到线程的最大数
int getMaximumPoolSize()//返回线程池的最大容量
int getPoolSize()//返回线程大小
BlockingQueue getQueue()//返回阻塞队列
long getTaskCount()//返回线程池收到任务总数
public class Text {
    public static void main(String[] args) throws InterruptedException {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getId() + "线程开始执行--" + System.currentTimeMillis());
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //创建线程池 使用默认线程工厂 有界队列  采用DiscardPolicy策略
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(2, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy());
        //提交五个任务
        for (int i = 0; i < 30; i++) {
            executorService.submit(runnable);
            System.out.println("当前线程核心线程数"+executorService.getCorePoolSize()+",最大线程数:"+executorService.getMaximumPoolSize()+",当前线程池大小:"+executorService.getPoolSize()+"活动线程数:"+executorService.getActiveCount()+",收到任务:"+executorService.getTaskCount()+"完成任务数:"+executorService.getCompletedTaskCount()+"等待任务数:"+executorService.getQueue().size());
            TimeUnit.MILLISECONDS.sleep(500);
        }
        System.out.println("-------------------");
        while (executorService.getActiveCount()>=0)//继续对线程池进行检测
        {
          System.out.println("当前线程核心线程数"+executorService.getCorePoolSize()+",最大线程数:"+executorService.getMaximumPoolSize()+",当前线程池大小:"+executorService.getPoolSize()+"活动线程数:"+executorService.getActiveCount()+",收到任务:"+executorService.getTaskCount()+"完成任务数:"+executorService.getCompletedTaskCount()+"等待任务数:"+executorService.getQueue().size());
            Thread.sleep(1000);//每1秒检测一次
        }

    }
}

当线程池大小达到了核心线程数,线程会被放在等待队列。当线程池等待队列已满会开启新的线程。当当前线程大小达到最大线程数,等待队列也满了,再提交的话会执行DiscardPolicy策略,直接丢弃这个无法处理的任务,最后30个任务只剩下15个了。

原理如图:

扩展线程池

有时候需要对线程池进行扩展,如在监控每个任务开始和结束时间,或者自定义其他增强功能。

ThreadPoolExecutor线程池提供了两个方法:

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }

线程池执行某个任务前会执行beforeExecute()方法,执行后会调用afterExecute()方法

查看ThreadPoolExecutor源码,在该类中定义了一个内部类Worker,ThreadPoolExecutor线程池的工作线程就是Worker类的实例,Worker实例在执行时会调用beforeExecute与afterExecute方法。

public void run() {
            runWorker(this);
}
final void runWorker(Worker w) {
                try {
                    beforeExecute(wt, task);
                    try {
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
    }      

部分代码已省略,线程执行前会调用beforeExecute,执行后会调用afterExecute方法。

扩展线程池示例

package com;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Text07 {
    public static void main(String[] args) {

        //定义扩展线程池 定义线程池类继承ThreadPoolExecutor,然后重写其他方法
        ExecutorService threadPoolExecutor=
 new ThreadPoolExecutor(5,5,0, TimeUnit.SECONDS,new LinkedBlockingDeque<>()){
     //在内部类重写开始方法
     @Override
     protected void beforeExecute(Thread t, Runnable r) {
         System.out.println(t.getId()+"线程准备执行任务"+((Mytask)r).name);
     }
     //在内部类重写结束方法
     @Override
     protected void afterExecute(Runnable r, Throwable t) {
         System.out.println(((Mytask)r).name+"执行完成");
     }
     //线程池退出
     @Override
     protected void terminated() {
         System.out.println("线程池退出");
     }
 };
        for (int i = 0; i < 5; i++) {
            Mytask mytask=new Mytask("Thread"+i);
            threadPoolExecutor.execute(mytask);
        }
    }
    private  static  class  Mytask implements Runnable
    {
        private  String name;

        public  Mytask(String name)
        {
            this.name=name;
        }
        @Override
        public void run() {
            System.out.println(name+"正在被执行"+Thread.currentThread().getId());
            try {
                Thread.sleep(1000);//模拟任务时长
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

优化线程池大小

线程池大小对系统性能有一定影响,过大或者过小都无法方法发挥系统最佳性能,不需要非常精确,只要避免极大或者极小就可以了,一般来说线程池大小大姚考虑CPU数量

线程池大小=CPU数量 * 目标CPU使用率*(1+等待时间与计算时间的比)

线程池死锁

如果线程池执行中,任务A在执行过程中提交了任务B,任务B添加到线程池中的等待队列,如果A的结束需要B的执行结果,而B线程需要等待A线程执行完毕,就可能会使其他所有工作线程都处于等待状态,待这些任务在阻塞队列中执行。线程池中没有可以对阻塞队列进行处理的线程,就会一直等待下去照成死锁。

适合给线程池提交相互独立的任务,而不是彼此依赖的任务,对于彼此依赖的任务,可以考虑分别提交给不同的线程池来处理。

线程池异常信息捕获

import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Text09 {
    public static void main(String[] args) {
        //创建线程池
        ExecutorService executorService=new ThreadPoolExecutor(5,5,0, TimeUnit.SECONDS,new SynchronousQueue<>());
        //向线程池中添加两个数相处计算的任务
        for (int i = 0; i <5 ; i++) {
            executorService.submit(new Text(10,i));
        }

    }
    private  static class  Text implements  Runnable
    {
        private  int x;
        private  int y;
        public  Text(int x,int y)
        {
            this.x=x;
            this.y=y;
        }
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+"线程x/y结果的为"+x+"/"+y+"="+(x/y));
        }
    }
}

可以看到只有四条结果,实际向线程池提交了五个任务,但是当i==0时,产生了算术异常,线程池把该异常吃掉了,导致我们对该异常一无所知

解决办法:

1.把submit改为execute

2.对线程池进行扩展,对submit进行包装

package com;

import java.util.concurrent.*;

public class Text09 {
    public static void main(String[] args) {
        //创建线程池  使用自定义的线程池
        ExecutorService executorService=new TranceThreadPoorExcuter(5,5,0, TimeUnit.SECONDS,new SynchronousQueue<>());
        //向线程池中添加两个数相处计算的任务
        for (int i = 0; i <5 ; i++) {
            executorService.submit(new Text(10,i));
        }

    }
    public  static class  Text implements  Runnable
    {
        public  int x;
        public  int y;
        public  Text(int x,int y)
        {
            this.x=x;
            this.y=y;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+"线程x/y结果的为"+x+"/"+y+"="+(x/y));
        }
    }
    //自定义线程池类 对TranceThreadPoorExcuter进行扩展
    private  static  class  TranceThreadPoorExcuter extends  ThreadPoolExecutor
    {

        public TranceThreadPoorExcuter(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
        //定义一个方法用于传入两个参数 第一个是要接受的任务 第二个是Exception
        public  Runnable warp(Runnable r,Exception e)
        {
            return new Runnable() {
                @Override
                public void run() {

                    try {
                        r.run();
                    }
                    catch (Exception e1)
                    {
                        e.printStackTrace();
                        throw e1;
                    }
                }
            };
        }
        //重写submit方法
        @Override
        public Future submit(Runnable task) {
            return super.submit(warp(task,new Exception("客户跟踪异常")));
        }
        //还可以重写excute方法
    }
}

此方法使用了自定义的线程池,重写线程池中的submit方法,在submit方法中,把要传入的任务参数带一个捕获异常信息的功能就可以捕获线程池异常。

相关文章

精彩推荐