iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > Python >java项目中的多线程实践记录
  • 155
分享到

java项目中的多线程实践记录

2024-04-02 19:04:59 155人浏览 独家记忆

Python 官方文档:入门教程 => 点击学习

摘要

项目开发中对于一些数据的处理需要用到多线程,比如文件的批量上传,数据库的分批写入,大文件的分段下载等。 通常会使用spring自带的线程池处理,做到对线程的定制化处理和更好的可控,建

项目开发中对于一些数据的处理需要用到多线程,比如文件的批量上传,数据库的分批写入,大文件的分段下载等。 通常会使用spring自带的线程池处理,做到对线程的定制化处理和更好的可控,建议使用自定义的线程池。 主要涉及到的几个点:

1. 自定义线程工厂(ThreadFactoryBuilder),主要用于线程的命名,方便追踪

2. 自定义的线程池(ThreadPoolExecutorUtils),可以按功能优化配置参数

3. 一个抽象的多线程任务处理接口(OperationThreadService)和通用实现(OperationThread)

4. 统一的调度实现(MultiThreadOperationUtils)

核心思想:分治归并,每个线程计算出自己的结果,最后统一汇总。


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;


public class ThreadFactoryBuilder {
    private static Logger logger = LoggerFactory.getLogger(ThreadFactoryBuilder.class);

    private String nameFORMat = null;
    private boolean daemon = false;
    private int priority = Thread.NORM_PRIORITY;

    public ThreadFactoryBuilder setNameFormat(String nameFormat) {
        if (nameFormat == null) {
            throw new NullPointerException();
        }
        this.nameFormat = nameFormat;
        return this;
    }

    public ThreadFactoryBuilder setDaemon(boolean daemon) {
        this.daemon = daemon;
        return this;
    }

    public ThreadFactoryBuilder setPriority(int priority) {
        if (priority < Thread.MIN_PRIORITY) {
            throw new IllegalArgumentException(String.format(
                    "Thread priority (%s) must be >= %s", priority, Thread.MIN_PRIORITY));
        }

        if (priority > Thread.MAX_PRIORITY) {
            throw new IllegalArgumentException(String.format(
                    "Thread priority (%s) must be <= %s", priority, Thread.MAX_PRIORITY));
        }

        this.priority = priority;
        return this;
    }

    public ThreadFactory build() {
        return build(this);
    }

    private static ThreadFactory build(ThreadFactoryBuilder builder) {
        final String nameFormat = builder.nameFormat;
        final Boolean daemon = builder.daemon;
        final Integer priority = builder.priority;
        final AtomicLong count = new AtomicLong(0);

        return (Runnable runnable) -> {
            Thread thread = new Thread(runnable);
            if (nameFormat != null) {
                thread.setName(String.format(nameFormat, count.getAndIncrement()));
            }
            if (daemon != null) {
                thread.setDaemon(daemon);
            }
            thread.setPriority(priority);
            thread.setUncaughtExceptionHandler((t, e) -> {
                String threadName = t.getName();
                logger.error("error occurred! threadName: {}, error msg: {}", threadName, e.getMessage(), e);
            });
            return thread;
        };
    }
}

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.*;


public class ThreadPoolExecutorUtils {
    private static Logger logger = LoggerFactory.getLogger(ThreadFactoryBuilder.class);

    public static int defaultCoreSize = Runtime.getRuntime().availableProcessors();
    private static int pollWaitingTime = 60;
    private static int defaultQueueSize = 10 * 1000;
    private static int defaultMaxSize = 4 * defaultCoreSize;
    private static String threadName = "custom-pool";

    
    public static ThreadPoolExecutor getExecutorPool(int waitingTime, int coreSize, int maxPoolSize, int queueSize) {
        pollWaitingTime = waitingTime;
        defaultCoreSize = coreSize;
        defaultMaxSize = maxPoolSize;
        defaultQueueSize = queueSize;
        return getExecutorPool();
    }

    
    public static ThreadPoolExecutor getExecutorPool(int waitingTime, int queueSize, int maxPoolSize) {
        pollWaitingTime = waitingTime;
        defaultQueueSize = queueSize;
        defaultMaxSize = maxPoolSize;
        return getExecutorPool();
    }

    
    public static ThreadPoolExecutor getExecutorPool(int waitingTime, int queueSize) {
        pollWaitingTime = waitingTime;
        defaultQueueSize = queueSize;
        return getExecutorPool();
    }

    
    public static ThreadPoolExecutor getExecutorPool(int waitingTime) {
        pollWaitingTime = waitingTime;
        return getExecutorPool();
    }

    
    public static ThreadPoolExecutor getExecutorPool() {
        return getExecutorPool(threadName);
    }

    
    public static ThreadPoolExecutor getExecutorPool(String threadName) {
        ThreadFactory factory = new ThreadFactoryBuilder()
                .setNameFormat(threadName + "-%d")
                .build();
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(defaultQueueSize);
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(defaultCoreSize,
                defaultMaxSize, 60, TimeUnit.SECONDS, queue, factory,
                (r, executor) -> {
                    
                    if (!executor.isshutdown()) {
                        logger.warn("ThreadPoolExecutor is over working, please check the thread tasks! ");
                    }
                }) {

            
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                if (t == null && r instanceof Future<?>) {
                    try {
                        Future<?> future = (Future<?>) r;
                        future.get();
                    } catch (CancellationException ce) {
                        t = ce;
                    } catch (ExecutionException ee) {
                        t = ee.getCause();
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                }
                if (t != null) {
                    logger.error("customThreadPool error msg: {}", t.getMessage(), t);
                }
            }
        };
        
        poolExecutor.prestartAllCoreThreads();
        return poolExecutor;
    }

    
    public static void closeAfterComplete(ThreadPoolExecutor pool) {
        
        pool.shutdown();
        try {
            
            if (!pool.awaitTermination(pollWaitingTime, TimeUnit.SECONDS)) {
                
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            logger.error("ThreadPool overtime: {}", e.getMessage());
            //(重新)丢弃所有尚未被处理的任务,同时会设置线程池中每个线程的中断标志位
            pool.shutdownNow();
            // 保持中断状态
            Thread.currentThread().interrupt();
        }
    }
}

import java.util.Arrays;


public class PartitionElements {
    
    private long index;
    
    private long batchCounts;
    
    private long partitions;
    
    private long totalCounts;
    private Object[] args;
    private Object data;

    public PartitionElements() {

    }

    public PartitionElements(long batchCounts, long totalCounts, Object[] args) {
        this.batchCounts = batchCounts;
        this.totalCounts = totalCounts;
        this.partitions = aquirePartitions(totalCounts, batchCounts);
        this.args = args;
    }

    public PartitionElements(long index, PartitionElements elements) {
        this.index = index;
        this.batchCounts = elements.getBatchCounts();
        this.partitions = elements.getPartitions();
        this.totalCounts = elements.getTotalCounts();
        this.args = elements.getArgs();
    }

    
    public long aquirePartitions(long totalCounts, long batchCounts) {
        long partitions = totalCounts / batchCounts;
        if (totalCounts % batchCounts != 0) {
            partitions = partitions + 1;
        }
        //  兼容任务总数total = 1 的情况
        if (partitions == 0) {
            partitions = 1;
        }
        return partitions;
    }

    public long getIndex() {
        return index;
    }

    public void setIndex(long index) {
        this.index = index;
    }

    public long getBatchCounts() {
        return batchCounts;
    }

    public void setBatchCounts(long batchCounts) {
        this.batchCounts = batchCounts;
    }

    public long getPartitions() {
        return partitions;
    }

    public void setPartitions(long partitions) {
        this.partitions = partitions;
    }

    public long getTotalCounts() {
        return totalCounts;
    }

    public void setTotalCounts(long totalCounts) {
        this.totalCounts = totalCounts;
    }

    public Object[] getArgs() {
        return args;
    }

    public void setArgs(Object[] args) {
        this.args = args;
    }

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }

    @Override
    public String toString() {
        return "PartitionElements{" +
                "index=" + index +
                ", batchCounts=" + batchCounts +
                ", partitions=" + partitions +
                ", totalCounts=" + totalCounts +
                ", args=" + Arrays.toString(args) +
                '}';
    }
}

import cn.henry.study.common.bo.PartitionElements;


public interface OperationThreadService {

    
    long count(Object[] args) throws Exception;

    
    Object prepare(Object[] args) throws Exception;

    
    Object invoke(PartitionElements elements) throws Exception;

    
    void post(PartitionElements elements, Object object) throws Exception;

    
    Object finished(Object object) throws Exception;

}

import cn.henry.study.common.bo.PartitionElements;
import cn.henry.study.common.service.OperationThreadService;
import cn.henry.study.common.thread.OperationThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;


public class MultiThreadOperationUtils {
    private static Logger logger = LoggerFactory.getLogger(MultiThreadOperationUtils.class);

    
    public static Object batchExecute(OperationThreadService service, Object[] args) throws Exception {
        long totalCounts = service.count(args);
        long batchCounts = totalCounts / ThreadPoolExecutorUtils.defaultCoreSize;
        // 兼容任务少于核心线程数的情况
        if (batchCounts == 0) {
            batchCounts = 1L;
        }
        PartitionElements elements = new PartitionElements(batchCounts, totalCounts, args);
        return batchExecute(service, elements);
    }

    
    public static Object batchExecute(OperationThreadService service, long batchCounts, Object[] args) throws Exception {
        long totalCounts = service.count(args);
        PartitionElements elements = new PartitionElements(batchCounts, totalCounts, args);
        return batchExecute(service, elements);
    }

    
    private static Object batchExecute(OperationThreadService service, PartitionElements elements) throws Exception {
        ThreadPoolExecutor executor = ThreadPoolExecutorUtils.getExecutorPool();
        // 在多线程分治任务之前的预处理方法,返回业务数据
        final Object obj = service.prepare(elements.getArgs());
        // 预防list和map的resize,初始化给定容量,可提高性能
        ArrayList<Future<PartitionElements>> futures = new ArrayList<>((int) elements.getPartitions());
        OperationThread opThread = null;
        Future<PartitionElements> future = null;
        // 添加线程任务
        for (int i = 0; i < elements.getPartitions(); i++) {
            // 划定任务分布
            opThread = new OperationThread(new PartitionElements(i + 1, elements), service);
            future = executor.submit(opThread);
            futures.add(future);
        }
        // 关闭线程池
        executor.shutdown();
        // 阻塞线程,同步处理数据
        futures.forEach(f -> {
            try {
                // 线程单个任务结束后的归并方法
                service.post(f.get(), obj);
            } catch (Exception e) {
                logger.error("post routine fail", e);
            }
        });
        return service.finished(obj);
    }

}

import cn.henry.study.common.bo.PartitionElements;
import cn.henry.study.common.service.OperationThreadService;
import cn.henry.study.common.utils.MultiThreadOperationUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;


public class MultiThreadServiceTest implements OperationThreadService {
    private static Logger logger = LoggerFactory.getLogger(MultiThreadServiceTest.class);

    @Override
    public long count(Object[] args) throws Exception {
        return 100L;
    }

    @Override
    public Object prepare(Object[] args) throws Exception {
        return "success";
    }

    @Override
    public Object invoke(PartitionElements elements) throws Exception {
        List<Object> list = new ArrayList<>((int) elements.getBatchCounts());
        for (int i = 0; i < elements.getIndex(); i++) {
            list.add("test_" + i);
        }
        return list;
    }

    @Override
    public void post(PartitionElements elements, Object object) throws Exception {
        String insertsql = "insert into test (id) values ";
        StringBuilder sb = new StringBuilder();
        List<Object> datas = (List<Object>) elements.getData();
        for (int i = 0; i < datas.size(); i++) {
            if ((i + 1) % 5 == 0 || (i + 1) == datas.size()) {
                sb.append("('" + datas.get(i) + "')");
                logger.info("{}: 测试insert sql: {}", elements, insertSql + sb.toString());
                sb = new StringBuilder();
            } else {
                sb.append("('" + datas.get(i) + "'),");
            }
        }
    }

    @Override
    public Object finished(Object object) throws Exception {
        return object;
    }

    @Test
    public void testBatchExecute() {
        try {
            Object object = MultiThreadOperationUtils.batchExecute(new MultiThreadServiceTest(), 10, new Object[]{"test"});
            logger.info("测试完成: {}", object.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

总结:这是一个抽象之后的多线程业务流程处理方式,已在生产环境使用,多线程的重点在业务分割和思想上,有清晰的责任划分。

到此这篇关于java项目中的多线程实践的文章就介绍到这了,更多相关java多线程实践内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: java项目中的多线程实践记录

本文链接: https://www.lsjlt.com/news/157209.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • java项目中的多线程实践记录
    项目开发中对于一些数据的处理需要用到多线程,比如文件的批量上传,数据库的分批写入,大文件的分段下载等。 通常会使用spring自带的线程池处理,做到对线程的定制化处理和更好的可控,建...
    99+
    2022-11-12
  • java项目中的多线程实践分析
    本篇内容主要讲解“java项目中的多线程实践分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“java项目中的多线程实践分析”吧!项目开发中对于一些数据的处理需要用到多线程,比如文件的批量上传,...
    99+
    2023-06-25
  • Java项目中的日志记录:使用NPM包和Numy的最佳实践。
    Java项目中的日志记录:使用NPM包和Numy的最佳实践 在现代的软件开发中,日志记录是至关重要的一步。它不仅可以帮助开发人员在开发过程中调试代码,还可以在应用程序出现问题时帮助运维人员快速定位问题。在Java项目中,使用NPM包和Num...
    99+
    2023-10-07
    日志 npm numy
  • Java协程编程之Loom项目实战记录
    目录前提Loom项目简单介绍Virtual Thread使用小结前提 之前很长一段时间关注JDK协程库的开发进度,但是前一段时间比较忙很少去查看OpenJDK官网的内容。Java协程...
    99+
    2022-11-12
  • Java实现Token登录验证的项目实践
    目录一、JWT是什么?二、使用步骤1.项目结构2.相关依赖3.数据库4.相关代码三、测试结果一、JWT是什么? 在介绍JWT之前,我们先来回顾一下利用token进行用户身份验证的流程...
    99+
    2023-03-19
    Java Token登录验证 Java 登录验证
  • 记一次Maven项目改造成SpringBoot项目的过程实践
    目录背景概述过程加依赖修改main方法所在类引入数据库依赖启动类上加入mapper扫描添加application.ymlsqlSessionFactory空指针异常分析改造Mybat...
    99+
    2022-11-13
  • 深入浅析Java项目中的多线程
    这期内容当中小编将会给大家带来有关深入浅析Java项目中的多线程,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。Java多线程实例 3种实现方法Java中的多线程有三种实现方式:1.继承Thread类,重写...
    99+
    2023-05-31
    java 多线程 ava
  • react+ts实现简单jira项目的最佳实践记录
    练手的一套项目 react+ts 虽然内容较少,但是干货挺多,尤其是对hooks的封装,ts的泛型的理解,使用更上一层楼 项目代码:https://gitee.com/fine509...
    99+
    2022-11-12
  • 在java项目中如何实现同步多线程
    本篇文章给大家分享的是有关在java项目中如何实现同步多线程,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。 java多线程的同步方法实例代码先看一个段有关银行存钱的代...
    99+
    2023-05-31
    java 多线程 同步
  • Java项目中多线程的作用有哪些
    这期内容当中小编将会给大家带来有关Java项目中多线程的作用有哪些,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。1.创建线程在Java中创建线程有两种方法:使用Thread类和使用Runnable接口。在...
    99+
    2023-05-31
    java 多线程
  • SpringBoot项目使用 axis 调用webservice接口的实践记录
    目录序WebService定义个人理解实践webservice 常识一个webservice 接口发布地址往往类似:qq在线接口验证接口为例maven 使用 axis应用依赖(不可缺...
    99+
    2022-11-13
  • 在Java项目中实现多线程并发编程的方法
    在Java项目中实现多线程并发编程的方法?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。Java 中的锁通常分为两种:通过关键字 synchronized 获取的...
    99+
    2023-05-31
    并发 java并发 多线程
  • java多线程在项目中怎么应用
    Java多线程在项目中的应用主要有以下几个方面:1. 提高程序的并发性:多线程可以同时处理多个任务,提高程序的并发性,使得程序的执行...
    99+
    2023-09-29
    java
  • 在Java项目中实现多线程的方法有哪些
    在Java项目中实现多线程的方法有哪些?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。Java中,有两种方式可以创建多线程:1 通过继承Thread类,重写Thread的run(...
    99+
    2023-05-31
    java 多线程 ava
  • 多线程并发编程如何在Java项目中实现
    本篇文章为大家展示了多线程并发编程如何在Java项目中实现 ,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。一、多线程操作系统有两个容易混淆的概念,进程和线程。进程:一个计算机程序的运行实例,包含了需...
    99+
    2023-05-31
    java 多线程 并发编程
  • vuex项目中登录状态管理的实践过程
    目录工具: 登录场景: 实践: 场景1思考与实践 场景2思考与实践 总结工具: chorme浏览器安装Vue.js devtools方便调试 登录场景: 页面的导航处或其他地方有...
    99+
    2022-11-12
  • 在Java项目中怎么样实现调度多线程
    在Java项目中怎么样实现调度多线程?针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。        方法一:设置线程优先级。...
    99+
    2023-05-31
    java 多线程 ava
  • Java项目中的多线程有哪些关键字
    Java项目中的多线程有哪些关键字?针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。一、同步(synchronized)和异步(asynchronized)同步(synchro...
    99+
    2023-05-31
    java 多线程 关键字
  • Java项目中的多线程通信如何利用Socket实现
    这期内容当中小编将会给大家带来有关Java项目中的多线程通信如何利用Socket实现,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。Java Socket实现多线程通信的方法,代码如下:package co...
    99+
    2023-05-31
    java socket 多线程通信
  • java多线程实现同步锁卖票实战项目
    目录同步概念与方法:窗口类:测试类:同步概念与方法: 窗口类: public class Ticket implements Runnable{ int ticket...
    99+
    2023-01-28
    java 同步锁卖票 java 同步锁
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作