iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > Python >Java自定义线程池的实现示例
  • 392
分享到

Java自定义线程池的实现示例

2024-04-02 19:04:59 392人浏览 泡泡鱼

Python 官方文档:入门教程 => 点击学习

摘要

目录一、Java语言本身也是多线程,回顾Java创建线程方式如下:二、jdk线程池工具类.三、业界知名自定义线程池扩展使用.一、Java语言本身也是多线程,回顾Java创建线程方式如

一、Java语言本身也是多线程,回顾Java创建线程方式如下:

1、继承Thread类,(Thread类实现Runnable接口),来个类图加深印象。

2、实现Runnable接口实现无返回值、实现run()方法,啥时候run,黑话了。

3、实现Callable接口重写call()+FutureTask获取.

public class CustomThread {
    public static void main(String[] args) {
        // 自定义线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("Custom Run");
                System.out.println(Thread.currentThread().getName());
            }
        },"custom-thread-1").start();
    }
}

4、基于线程池集中管理创建线程系列周期.【本篇文章重点介绍】

二、JDK线程池工具类.

1、Executors工具类,是JDK中Doug Lea大佬实现供开发者使用。

随着JDK版本迭代逐渐加入了基于工作窃取算法的线程池了,阿里编码规范也推荐开发者自定义线程池,禁止生产直接使用Executos线程池工具类,因此很有可能造成OOM异常。同时在某些类型的线程池里面,使用无界队列还会导致maxinumPoolSize、keepAliveTime、handler等参数失效。因此目前在大厂的开发规范中会强调禁止使用Executors来创建线程池。这里说道阻塞队列。LinkedBlockingQueue。

2、自定义线程池工具类基于ThreadPoolExecutor实现,那个JDK封装的线程池工具类也是基于这个ThreadPoolExecutor实现的。

public class ConstomThreadPool extends ThreadPoolExecutor{
    
    public ConstomThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        setThreadFactory(new CustomThreadFactory(poolName, false));
    }
}

 自定义线程工厂类,这样线程命名有开发者控制实现了,这样参数可以做到可配置化,生产环境可以供不同业务模块使用,如果系统配置值不生效,就给一个默认值,更加满足业务需要.


public class CustomThreadFactory implements ThreadFactory {
    
    private final AtomicInteger atomicInteger = new AtomicInteger(1);
    
    private final String namePrefix;
    
    private final boolean isDaemon;
 
    public CustomThreadFactory(String prefix, boolean daemin) {
        if (StringUtils.isNoneBlank(prefix)) {
            this.namePrefix = prefix;
        } else {
            this.namePrefix = "thread_pool";
        }
        // 是否是守护线程
        isDaemon = daemin;
    }
 
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r, namePrefix + "-" + atomicInteger.getAndIncrement());
        thread.setDaemon(isDaemon);
        // 设置线程优先级
        if (thread.getPriority() != Thread.NORM_PRIORITY) {
            thread.setPriority(Thread.NORM_PRIORITY);
        }
        return thread;
    }
}

 这里spring框架提供的自定义线程池工厂类,当然了一些开源包也会提供这样的轮子,这个比较简单了.

@SuppressWarnings("serial")
public class CustomizableThreadFactory extends CustomizableThreadCreator implements ThreadFactory {
 
	
	public CustomizableThreadFactory() {
		super();
	}
 
	
	public CustomizableThreadFactory(String threadNamePrefix) {
		super(threadNamePrefix);
	}
 
 
	@Override
	public Thread newThread(Runnable runnable) {
		return createThread(runnable);
	}
 
}

 3、SpringBoot框架提供的自定义线程池,基于异步注解@Async名称和一些业务自定义配置项,很好的实现了业务间线程池的隔离。

@Configuration
public class ThreadPoolConfig {
    
    @Bean("serviceTaskA")
    public ThreadPoolTaskExecutor serviceTaskA() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(10);
        executor.seTKEepAliveSeconds(60);
        executor.setThreadNamePrefix("service-a");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
 
    
    @Bean("serviceTaskB")
    public ThreadPoolTaskExecutor serviceTaskB() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(10);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("service-b");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}

整体来看是Spring框架对JDK的线程池做了封装,公开发者使用,毕竟框架嘛,肯定是把方便留给开发者。

4、并发流线程池。

        List<String> list = new ArrayList<>(4);
        list.add("A");
        list.add("B");
        list.add("C");
        list.add("D");
        list.parallelStream().forEach(string -> {
            string = string + "paralleStream";
            System.out.println(Thread.currentThread().getName()+":-> "+string);
        });

运行实例:

说明:并发流默认使用系统公共的线程池ForkJoinWorkerThread,供整个程序使用。

 类图如下,基于分治法,双端窃取算法实现的一种线程池。

 ForkJoin实现的了自己的线程工厂命名。

 也可以自定义并发流线程,然后提交任务,一般并发流适用于短暂耗时业务,避免拖垮整个线程池业务.

5、实现一个基于系统公用线程池工具类,运行这个系统中的异步业务.

public final class CustomExecutors  {
    
    private static final int CORE_POOL_SIZE=5;
    
    private static final int MAX_POOL_SIZE=10;
    
    private static final int KEEP_ALIVE_TIME=60;
    
    private static final LinkedBlockingQueue queue=new LinkedBlockingQueue(100);
    
    private static final String POOL_PREFIX_NAME="Custom-Common-Pool";
 
    private CustomExecutors(){
        //throw new XXXXException("un support create pool!");
    }
 
    private static ConstomThreadPool constomThreadPool;
 
    
    static {
        constomThreadPool=new ConstomThreadPool(CORE_POOL_SIZE,MAX_POOL_SIZE,KEEP_ALIVE_TIME,TimeUnit.SECONDS,queue,POOL_PREFIX_NAME);
    }
 
    
    private static ExecutorService getInstance(){
        return constomThreadPool;
    }
 
    private static Future<?> submit(Runnable task){
       return constomThreadPool.submit(task);
    }
 
    private static <T> Future<T> submit(Runnable task, T result){
        return constomThreadPool.submit(task,result);
    }
 
    private static <T> Future<T> submit(Callable<T> task){
        return constomThreadPool.submit(task);
    }
 
    private static void execute(Runnable task){
        constomThreadPool.execute(task);
    }
}

三、业界知名自定义线程池扩展使用.

1、org.apache.Tomcat.util.threads;【Tomcat线程池】

 2、XXL-JOB分布式任务调度框架的快慢线程池,线程池任务隔离.

public class JobTriggerPoolHelper {
    private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class);
 
 
    // ---------------------- trigger pool ----------------------
 
    // fast/slow thread pool
    private ThreadPoolExecutor fastTriggerPool = null;
    private ThreadPoolExecutor slowTriggerPool = null;
 
    public void start(){
        fastTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
                    }
                });
 
        slowTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
                    }
                });
    }
 
 
    public void stop() {
        //triggerPool.shutdown();
        fastTriggerPool.shutdownNow();
        slowTriggerPool.shutdownNow();
        logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
    }
 
 
    // job timeout count
    private volatile long minTim = System.currentTimeMillis()/60000;     // ms > min
    private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();
 
 
    
    public void addTrigger(final int jobId,
                           final TriggerTypeEnum triggerType,
                           final int failRetryCount,
                           final String executorShardingParam,
                           final String executorParam,
                           final String addressList) {
 
        // choose thread pool
        ThreadPoolExecutor triggerPool_ = fastTriggerPool;
        AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
        if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
            triggerPool_ = slowTriggerPool;
        }
 
        // trigger
        triggerPool_.execute(new Runnable() {
            @Override
            public void run() {
 
                long start = System.currentTimeMillis();
 
                try {
                    // do trigger
                    XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                } finally {
 
                    // check timeout-count-map
                    long minTim_now = System.currentTimeMillis()/60000;
                    if (minTim != minTim_now) {
                        minTim = minTim_now;
                        jobTimeoutCountMap.clear();
                    }
 
                    // incr timeout-count-map
                    long cost = System.currentTimeMillis()-start;
                    if (cost > 500) {       // ob-timeout threshold 500ms
                        AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                        if (timeoutCount != null) {
                            timeoutCount.incrementAndGet();
                        }
                    }
 
                }
 
            }
        });
    }
 
 
 
    // ---------------------- helper ----------------------
 
    private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();
 
    public static void toStart() {
        helper.start();
    }
    public static void toStop() {
        helper.stop();
    }
 
    
    public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
        helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
    }
 
}

①、定义两个线程池,一个是fastTriggerPool,另一个是slowTriggerPool。
②、定义一个容器ConcurrentMap,存放每个任务的执行慢次数,60秒后自动清空该容器。
③、在线程的run()方法中计算每个任务的耗时,如果大于500ms,则任务的慢执行次数+1。

 3、基于线程池动态监控动态线程池 

引用图片,线程池常见问题

 还有比较多啦,例如ES的基于JDK的线程池,dubbo中等。

到此这篇关于Java自定义线程池的实现示例的文章就介绍到这了,更多相关Java自定义线程池内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: Java自定义线程池的实现示例

本文链接: https://www.lsjlt.com/news/139989.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • Java自定义线程池的实现示例
    目录一、Java语言本身也是多线程,回顾Java创建线程方式如下:二、JDK线程池工具类.三、业界知名自定义线程池扩展使用.一、Java语言本身也是多线程,回顾Java创建线程方式如...
    99+
    2022-11-13
  • C#实现自定义线程池实例代码
    在项目中如果是web请求时候,IIS会自动分配一个线程来进行处理,如果很多个应用程序共享公用一个IIS的时候,线程分配可能会出现一个问题(当然也是我的需求造成的) 之前在做项目的时候...
    99+
    2022-11-13
  • Java中线程池自定义实现详解
    目录前言线程为什么不能多次调用start方法线程池到底是如何复用的前言 最初使用线程池的时候,网上的文章告诉我说线程池可以线程复用,提高线程的创建效率。从此我的脑海中便为线程池打上了...
    99+
    2023-03-01
    Java线程池自定义 Java线程池
  • Java中线程池自定义如何实现
    这篇“Java中线程池自定义如何实现”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Java中线程池自定义如何实现”文章吧。线...
    99+
    2023-07-05
  • Java多线程 自定义线程池详情
    主要介绍: 1.任务队列 2.拒绝策略(抛出异常、直接丢弃、阻塞、临时队列) 3.init( min ) 4.active 5.max ...
    99+
    2022-11-12
  • Java中怎么自定义线程池
    本篇文章给大家分享的是有关Java中怎么自定义线程池,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。Java代码ThreadPoolExecutor  &nb...
    99+
    2023-06-17
  • Python探索之自定义实现线程池
    为什么需要线程池呢? 设想一下,如果我们使用有任务就开启一个子线程处理,处理完成后,销毁子线程或等得子线程自然死亡,那么如果我们的任务所需时间比较短,但是任务数量比较多,那么更多的时间是花...
    99+
    2022-06-05
    自定义 线程 Python
  • java怎么自定义并发线程池
    要自定义并发线程池,可以使用`ThreadPoolExecutor`类来实现。`ThreadPoolExecutor`是`Execu...
    99+
    2023-10-25
    java
  • 怎么在Java中自定义线程池
    这篇文章给大家介绍怎么在Java中自定义线程池,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。【1】接口定义public interface IThreadPool<Job exten...
    99+
    2023-06-06
  • Java如何自定义线程池中队列
    目录背景问题分析问题解决总结两个队列的UML关系图SynchronousQueue的定义ArrayBlockingQueue的定义分析jdk源码中关于线程池队列的说明背景 业务交互的...
    99+
    2022-11-13
  • Java怎么自定义线程池中队列
    本篇内容介绍了“Java怎么自定义线程池中队列”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!背景业务交互的过程中涉及到了很多关于SFTP下载...
    99+
    2023-07-02
  • 怎么在java项目中自定义线程池
    怎么在java项目中自定义线程池?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。使用线程池时,可以使用 newCachedThreadPool()或者 newFi...
    99+
    2023-05-31
    java 线程池 ava
  • 线程池之newFixedThreadPool定长线程池的实例
    newFixedThreadPool定长线程池的实例 newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。newFixedThr...
    99+
    2022-11-12
  • Java利用线程工厂监控线程池的实现示例
    ThreadFactory 线程池中的线程从哪里来呢?就是ThreadFoctory public interface ThreadFactory { Thread ne...
    99+
    2022-11-12
  • Java线程池的示例分析
    小编给大家分享一下Java线程池的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!为什么需要线程池我们知道创建线程的常用方式就是 new Thread() ...
    99+
    2023-06-22
  • Android编程实现自定义toast示例
    本文实例讲述了Android编程实现自定义toast。分享给大家供大家参考,具体如下: 效果图: 代码: //自定义布局的toast customViewToast.set...
    99+
    2022-06-06
    toast Android
  • Java实现手写一个线程池的示例代码
    目录概述线程池框架设计代码实现阻塞队列的实现线程池消费端实现获取任务超时设计拒绝策略设计概述 线程池技术想必大家都不陌生把,相信在平时的工作中没有少用,而且这也是面试频率非常高的一个...
    99+
    2022-11-13
    Java手写线程池 Java线程池
  • Java中线程池的示例分析
    小编给大家分享一下Java中线程池的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!Java线程池线程的缺点:线程的创建它会开辟本地方法栈、JVM栈、程序计...
    99+
    2023-06-20
  • springboot为异步任务规划自定义线程池的实现
    目录一、Spring Boot任务线程池二、自定义线程池三、优雅地关闭线程池一、Spring Boot任务线程池 线程池的作用 防止资源占用无限的扩张调用过程省去资源的创建和销毁所占...
    99+
    2022-11-13
  • 关于dubbo 自定义线程池的问题
    目录初识dubbo一、什么是dubbo?二、为什么要用dubbo前言dubbo线程池dubbo线程池说明自定义线程池代码实现步骤初识dubbo 一、什么是dubbo? Dubbo是阿...
    99+
    2022-11-13
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作