iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > Python >Java实现多线程大批量同步数据(分页)
  • 463
分享到

Java实现多线程大批量同步数据(分页)

Python 官方文档:入门教程 => 点击学习

摘要

背景 最近遇到个功能,两个月有300w+的数据,之后还在累加,因一开始该数据就全部存储在Mysql表,现需要展示在页面,还需要关联另一张表的数据,而且产品要求页面的查询条件多达20个

背景

最近遇到个功能,两个月有300w+的数据,之后还在累加,因一开始该数据就全部存储在Mysql表,现需要展示在页面,还需要关联另一张表的数据,而且产品要求页面的查询条件多达20个条件,最终,这个功能卡的要死,基本查不出来数据。

最后是打算把这两张表的数据同时存储到mongoDB中去,以提高查询效率。

一开始同步的时候,采用单线程,循环以分页的模式去同步这两张表数据,结果是…一晚上,只同步了30w数据,特慢!!!

最后,改造了一番,2小时,就成功同步了300w+数据。

以下是主要逻辑。

线程的个数请根据你自己的服务器性能酌情设置。

思路

先通过count查出结果集的总条数,设置每个线程分页查询的条数,通过总条数和单次条数得到线程数量,通过改变limit的下标实现分批查询。

代码实现

package com.GitHub.admin.controller.loans;

import com.baomidou.mybatisplus.mapper.EntityWrapper;
import com.github.admin.model.entity.CaseCheckCallRecord;
import com.github.admin.model.entity.duyan.DuyanCallRecordDetail;
import com.github.admin.model.entity.loans.CaseCallRemarkRecord;
import com.github.admin.service.duyan.DuyanCallRecordDetailService;
import com.github.admin.service.loans.CaseCallRemarkRecordService;
import com.github.common.constant.MonGodbConstant;
import com.github.common.util.DingDingMsgSendUtils;
import com.github.common.util.ListUtils;
import com.github.common.util.Response;
import com.github.common.util.concurrent.Executors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.WEB.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;


@Slf4j
@RestController
@RequestMapping("/demo")
public class SynchronizeHistoricalDataController implements DisposableBean {

    private ExecutorService executor = Executors.newFixedThreadPool(10, "SynchronizeHistoricalDataController");  //newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

    @Value("${spring.profiles.active}")
    private String profile;
    @Autowired
    private DuyanCallRecordDetailService duyanCallRecordDetailService;
    @Autowired
    private MongoTemplate mongoTemplate;
    @Autowired
    private CaseCallRemarkRecordService caseCallRemarkRecordService;

    
    @GetMapping("/syncHistoryData")
    public Response syncHistoryData(Map<String, Object> params) throws Exception {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    logicHandler(params);
                } catch (Exception e) {
                    log.warn("多线程同步稽查通话记录历史数据才处理异常,errMsg = {}", e);
                    DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,多线程同步稽查通话记录历史数据才处理异常,errMsg = "+e);
                }
            }
        });
        return Response.success("请求成功");
    }

    
    private void logicHandler(Map<String, Object> params) throws Exception {
        
        List<DuyanCallRecordDetail> result = new ArrayList<>();

        
        int count = this.duyanCallRecordDetailService.selectCount(new EntityWrapper<DuyanCallRecordDetail>()
                .eq("is_delete", 0)
                .eq("platfORM_type", 1));
        DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,本次需要同步" + count + "条历史稽查通话记录数据。");

//        int count = 2620266;
        
        int num = 1000;

        
        int times = count / num;
        if (count % num != 0) {
            times = times + 1;
        }

        
        int offset = 0;

        
        List<Callable<List<DuyanCallRecordDetail>>> tasks = new ArrayList<>();
        for (int i = 0; i < times; i++) {
            Callable<List<DuyanCallRecordDetail>> qfe = new ThredQuery(duyanCallRecordDetailService, params, offset, num);
            tasks.add(qfe);
            offset = offset + num;
        }

        
        List<List<Callable<List<DuyanCallRecordDetail>>>> smallList = ListUtils.partition(tasks, 10);
        for (List<Callable<List<DuyanCallRecordDetail>>> callableList : smallList) {
            if (CollectionUtils.isNotEmpty(callableList)) {
//                executor.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        log.info("任务拆分执行开始:线程{}拆分处理开始...", Thread.currentThread().getName());
//
//                        log.info("任务拆分执行结束:线程{}拆分处理开始...", Thread.currentThread().getName());
//                    }
//                });

                try {
                    List<Future<List<DuyanCallRecordDetail>>> futures = executor.invokeAll(callableList);
                    
                    if (futures != null && futures.size() > 0) {
                        for (Future<List<DuyanCallRecordDetail>> future : futures) {
                            List<DuyanCallRecordDetail> duyanCallRecordDetailList = future.get();
                            if (CollectionUtils.isNotEmpty(duyanCallRecordDetailList)){
                                executor.execute(new Runnable() {
                                    @Override
                                    public void run() {
                                        
                                        log.info("异步存储MongoDB开始:线程{}拆分处理开始...", Thread.currentThread().getName());
                                        saveMongoDB(duyanCallRecordDetailList);
                                        log.info("异步存储MongoDB结束:线程{}拆分处理开始...", Thread.currentThread().getName());
                                    }
                                });
                            }
                            //result.addAll(future.get());
                        }
                    }
                } catch (Exception e) {
                    log.warn("任务拆分执行异常,errMsg = {}", e);
                    DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,任务拆分执行异常,errMsg = "+e);
                }
            }
        }
    }

    
    private void saveMongoDB(List<DuyanCallRecordDetail> duyanCallRecordDetailList) {
        for (DuyanCallRecordDetail duyanCallRecordDetail : duyanCallRecordDetailList) {
            
            org.springframework.data.mongodb.core.query.Query query = new org.springframework.data.mongodb.core.query.Query();
            query.addCriteria(Criteria.where("callUuid").is(duyanCallRecordDetail.getCallUuid()));
            List<CaseCheckCallRecord> caseCheckCallRecordList = mongoTemplate.find(query, CaseCheckCallRecord.class, MongodbConstant.CASE_CHECK_CALL_RECORD);
            if (CollectionUtils.isNotEmpty(caseCheckCallRecordList)) {
                log.warn("call_uuid = {}在MongoDB已经存在数据,后面数据将被舍弃...", duyanCallRecordDetail.getCallUuid());
                continue;
            }

            
            CaseCallRemarkRecord caseCallRemarkRecord = this.caseCallRemarkRecordService.selectOne(new EntityWrapper<CaseCallRemarkRecord>()
                    .eq("is_delete", 0)
                    .eq("call_uuid", duyanCallRecordDetail.getCallUuid()));

            CaseCheckCallRecord caseCheckCallRecord = new CaseCheckCallRecord();
            BeanUtils.copyProperties(duyanCallRecordDetail, caseCheckCallRecord);
            //补充
            caseCheckCallRecord.setCollectorUserId(duyanCallRecordDetail.getUserId());
            
            if (caseCallRemarkRecord != null) {
                //补充
                caseCheckCallRecord.setCalleeName(caseCallRemarkRecord.getContactName());            
            }
            log.info("正在存储数据到MongoDB:{}", caseCheckCallRecord.toString());
            this.mongoTemplate.save(caseCheckCallRecord, MongodbConstant.CASE_CHECK_CALL_RECORD);
        }
    }

    @Override
    public void destroy() throws Exception {
        executor.shutdown();
    }
}


class ThredQuery implements Callable<List<DuyanCallRecordDetail>> {
    
    private DuyanCallRecordDetailService myService;
    
    private Map<String, Object> params;

    
    private int offset;
    
    private int num;

    public ThredQuery(DuyanCallRecordDetailService myService, Map<String, Object> params, int offset, int num) {
        this.myService = myService;
        this.params = params;
        this.offset = offset;
        this.num = num;
    }

    @Override
    public List<DuyanCallRecordDetail> call() throws Exception {
        
        List<DuyanCallRecordDetail> duyanCallRecordDetailList = myService.selectList(new EntityWrapper<DuyanCallRecordDetail>()
                .eq("is_delete", 0)
                .eq("platform_type", 1)
                .last("limit "+offset+", "+num));
        return duyanCallRecordDetailList;
    }
}

ListUtils工具

package com.github.common.util;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;

import java.io.*;
import java.util.ArrayList;
import java.util.List;


@Slf4j
public class ListUtils {
    
    
    public static <T> List<T> deepCopy(List<T> src) {
        try {
            ByteArrayOutputStream byteout = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(byteout);
            out.writeObject(src);
            ByteArrayInputStream bytein = new ByteArrayInputStream(byteout.toByteArray());
            ObjectInputStream in = new ObjectInputStream(bytein);
            @SuppressWarnings("unchecked")
            List<T> dest = (List<T>) in.readObject();
            return dest;
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            return null;
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }
    
    public static <T> T objDeepCopy(T src) {
        try {
            ByteArrayOutputStream byteout = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(byteout);
            out.writeObject(src);
            ByteArrayInputStream bytein = new ByteArrayInputStream(byteout.toByteArray());
            ObjectInputStream in = new ObjectInputStream(bytein);
            @SuppressWarnings("unchecked")
            T dest = (T) in.readObject();
            return dest;
        } catch (ClassNotFoundException e) {
            log.error("errMsg = {}", e);
            return null;
        } catch (IOException e) {
            log.error("errMsg = {}", e);
            return null;
        }
    }

    
    public static <T> List<List<T>> averageAssign(List<T> source, int n) {
        List<List<T>> result = new ArrayList<List<T>>();
        int remaider = source.size() % n;  //(先计算出余数)
        int number = source.size() / n;  //然后是商
        int offset = 0;//偏移量
        for (int i = 0; i < n; i++) {
            List<T> value = null;
            if (remaider > 0) {
                value = source.subList(i * number + offset, (i + 1) * number + offset + 1);
                remaider--;
                offset++;
            } else {
                value = source.subList(i * number + offset, (i + 1) * number + offset);
            }
            result.add(value);
        }
        return result;
    }

    
    public static <T> List<List<T>> partition(List<T> list, int size){
        return  Lists.partition(list, size); // 使用guava
    }

    
    public static void main(String[] args) {
        List<Integer> bigList = new ArrayList<>();
        for (int i = 0; i < 101; i++){
            bigList.add(i);
        }
        log.info("bigList长度为:{}", bigList.size());
        log.info("bigList为:{}", bigList);
        List<List<Integer>> smallists = partition(bigList, 20);
        log.info("smallists长度为:{}", smallists.size());
        for (List<Integer> smallist : smallists) {
            log.info("拆分结果:{},长度为:{}", smallist, smallist.size());
        }
    }

}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持编程网。

--结束END--

本文标题: Java实现多线程大批量同步数据(分页)

本文链接: https://www.lsjlt.com/news/172304.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • Java实现多线程大批量同步数据(分页)
    背景 最近遇到个功能,两个月有300w+的数据,之后还在累加,因一开始该数据就全部存储在mysql表,现需要展示在页面,还需要关联另一张表的数据,而且产品要求页面的查询条件多达20个...
    99+
    2022-11-13
    Java同步数据 Java多线程同步数据 Java多线程大批量同步数据
  • Java多线程实现第三方数据同步
    本文实例为大家分享了Java多线程实现第三方数据同步的具体代码,供大家参考,具体内容如下 一、场景 最近的一项开发任务是同步第三方数据,而第三方数据一般有存量数据和增量数据,存量数据...
    99+
    2022-11-13
    Java多线程数据同步 Java第三方数据同步 Java数据同步
  • 怎么用java多线程处理大批量数据
    在Java中使用多线程处理大批量数据可以提高程序的处理效率。下面是一个简单的示例代码,演示了如何使用多线程处理大批量数据: impo...
    99+
    2024-04-02
  • 怎么实现java多线程批量拆分List导入数据库
    本篇内容介绍了“怎么实现java多线程批量拆分List导入数据库”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!直接把list怼进Mysql使...
    99+
    2023-06-25
  • java多线程批量拆分List导入数据库的实现过程
    目录一、前言二、直接把list怼进Mysql三、分组把list导入Mysql中四、多线程分批导入Mysql五、小结一、前言 前两天做了一个导入的功能,导入开始的时候非常慢,导入2w...
    99+
    2024-04-02
  • Java如何实现多线程、线程同步
    这篇文章主要介绍了Java如何实现多线程、线程同步的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Java如何实现多线程、线程同步文章都会有所收获,下面我们一起来看看吧。1 多线程1.1 进程进程:是正在运行的程...
    99+
    2023-06-30
  • 怎么使用java多线程处理大批量数据
    要使用Java多线程处理大批量数据,可以按照以下步骤进行:1. 创建一个线程池:使用Java中的ThreadPoolExecutor...
    99+
    2023-09-26
    java
  • Java使用多线程做批处理(查询大量数据)
    Java使用多线程做批处理(查询大量数据) Java使用多线程做批处理(查询大量数据) Java使用多线程做批处理(查询大量数据)前言背景Java使用多线程的条件操作流程 前言背...
    99+
    2023-09-01
    java jvm 开发语言
  • Java中怎样实现多线程同步
    本篇文章给大家分享的是有关Java中怎样实现多线程同步,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。不同步时的代码Bank.java  package&nb...
    99+
    2023-06-17
  • Java中怎么实现多线程同步
    今天就跟大家聊聊有关Java中怎么实现多线程同步,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。让我们从JVM的角度来看看锁这个概念:在Java程序运行时环境中,JVM需要对两类线程共...
    99+
    2023-06-17
  • 详解Java利用ExecutorService实现同步执行大量线程
    自从java1.5以后,官网就推出了Executor这样一个类,这个类,可以维护我们的大量线程在操作临界资源时的稳定性。先上一段代码吧:TestRunnable.javapublic class TestRunnable implement...
    99+
    2023-05-31
    java executorservice ava
  • Java中怎么利用多线程锁实现数据同步共享
    Java中怎么利用多线程锁实现数据同步共享,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。大多数应用程序要求线程互相通信来同步它们的动作。在Java程序中最简单实现同步的方法就是...
    99+
    2023-06-17
  • Java多线程实现FTP批量上传文件
    本文实例为大家分享了Java多线程实现FTP批量上传文件的具体代码,供大家参考,具体内容如下 1、构建FTP客户端 package cn.com.pingtech.common.ft...
    99+
    2024-04-02
  • Java中怎么同步多线程静态数据
    本篇文章给大家分享的是有关Java中怎么同步多线程静态数据,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。1:直接使用静态同步方法,Java多线程静态数据代码public&nbs...
    99+
    2023-06-17
  • java 多线程处理大量并发数据
    Java中多线程是一种处理数据的常见方式,它可以同时执行多个线程以提高程序的性能和效率。下面是一个使用多线程处理数据的示例代码: public class DataProcessor { pub...
    99+
    2023-09-07
    java 开发语言
  • C++多线程中的线程同步与互斥量实例分析
    本篇内容介绍了“C++多线程中的线程同步与互斥量实例分析”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!线程同步#include &...
    99+
    2023-06-30
  • qt多线程处理大量数据怎么实现
    在Qt中,可以使用QThread来实现多线程处理大量数据。首先,创建一个继承自QThread的自定义线程类,重写其run()函数,将...
    99+
    2023-10-18
    qt
  • Springboot 手动分页查询分批批量插入数据的实现流程
    目录前言业务场景是什么? 正文前言 业务场景是什么?  就是数据库的一批数据,量不少,需要执行同步插入到别的地方。 简单点肯定是一次性查出来,然后循环一个个插入,...
    99+
    2024-04-02
  • SpringBoot用多线程批量导入数据库实现方法
    目录环境原始的for循环入库批量保存操作在批量插入的基础上使用多线程处理多线程入库的事务问题环境 springboot、mybatisPlus、mysql8 mysql8(部署在1核...
    99+
    2023-02-03
    SpringBoot多线程导入数据库 SpringBoot导入数据库 SpringBoot批量导入数据库
  • Java编程中如何实现同步关键字的多线程同步?
    在Java编程中,多线程编程是一个非常常见的问题。由于多线程的特殊性质,会导致线程之间出现数据竞争和并发访问的问题。因此,在多线程编程中,我们需要使用同步机制来保证线程之间的安全性和正确性。Java中的同步机制主要有两种,一种是同步方法,...
    99+
    2023-11-11
    编程算法 同步 关键字
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作