iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > Python >Spring Batch轻量级批处理框架实战
  • 874
分享到

Spring Batch轻量级批处理框架实战

2024-04-02 19:04:59 874人浏览 八月长安

Python 官方文档:入门教程 => 点击学习

摘要

目录1 实战前的理论基础1.1 spring Batch是什么1.2 Spring Batch能做什么1.3 基础架构1.4 核心概念和抽象2 各个组件介绍2.1 Job2.2 St

1 实战前的理论基础

1.1 Spring Batch是什么

Spring Batch 是一个轻量级、全面的批处理框架,旨在支持开发对企业系统日常运营至关重要的强大的批处理应用程序。同时使开发人员在必要时可以轻松访问和利用更先进的企业服务。Spring Batch 不是调度框架,它旨在与调度程序一起工作,而不是取代调度程序。

1.2 Spring Batch能做什么

  • 自动化、复杂的大量信息处理,无需用户交互即可最有效地处理。这些操作通常包括基于时间的事件(例如月末计算、通知或通信)。
  • 定期应用在非常大的数据集上重复处理的复杂业务规则(例如,保险福利确定或费率调整)。
  • 将从内部和外部系统接收的信息集成到记录系统中,这些信息通常需要以事务方式进行格式化、验证和处理。批处理用于每天为企业处理数十亿笔交易。

业务场景:

  • 定期提交批处理
  • 并发批处理:作业的并行处理
  • 分阶段的、企业消息驱动的处理
  • 大规模并行批处理
  • 失败后手动或计划重启
  • 依赖步骤的顺序处理(扩展到工作流驱动的批处理)
  • 部分处理:跳过记录(例如,在回滚时)
  • 整批事务,适用于小批量或现有存储过程/脚本的情况

总之Spring batch可以做的:

  • 数据库、文件或队列中读取大量记录。
  • 以某种方式处理数据。
  • 以修改后的形式写回数据。

1.3 基础架构

在这里插入图片描述

1.4 核心概念和抽象

在这里插入图片描述

核心概念:一个 Job 有一对多的Step,每个步骤都正好有一个 ItemReader、一个ItemProcessor和 一个ItemWriter。需要启动作业(使用 JobLauncher),并且需要存储有关当前运行进程的元数据(在 中 JobRepository)。

2 各个组件介绍

2.1 Job

Job是封装了整个批处理过程的实体。与其他 Spring 项目一样,一个Job与 XML 配置文件或基于 Java 的配置连接在一起。这种配置可以被称为“作业配置”。

在这里插入图片描述

可配置项:

  • 作业的简单名称。
  • Step实例的定义和排序
  • 作业是否可重新启动。

2.2 Step

一个Step是一个域对象,它封装了批处理作业的一个独立的、连续的阶段。因此,每个 Job 完全由一个或多个步骤组成。一个Step包含定义和控制实际批处理所需的所有信息。

在这里插入图片描述

一个StepExecution代表一次尝试执行一个StepStepExecution 每次Step运行时都会创建一个新的,类似于JobExecution

2.3 ExecutionContext

一个ExecutionContext表示由框架持久化和控制的键/值对的集合,以允许开发人员有一个地方来存储范围为StepExecution对象或JobExecution对象的持久状态。

2.4 JobRepository

JobRepository是上述所有 Stereotypes 的持久性机制。它提供了CRUD操作JobLauncherJob以及Step实现。当 Job第一次启动,一个JobExecution被从库中获得,并且,执行的过程中,StepExecutionJobExecution实施方式是通过将它们传递到存储库持续。

使用 Java 配置时,@EnableBatchProcessing注解提供了一个 JobRepository作为开箱即用自动配置的组件之一。

2.5 JobLauncher

JobLauncher表示一个简单的接口,用于Job使用给定的 集合 启动JobParameters,如以下示例所示:


public interface JobLauncher {

public JobExecution run(Job job, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException,
                   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

期望实现JobExecution从 中 获得有效JobRepository并执行Job

2.6 Item Reader

ItemReader是一种抽象,表示一次检索Step一个项目的输入。当ItemReader用完它可以提供的项目时,它通过返回来表明这一点null

2.7 Item Writer

ItemWriter是一种抽象,表示一次一个Step、一批或一大块项目的输出。通常, anItemWriter不知道它接下来应该接收的输入,并且只知道在其当前调用中传递的项目。

2.8 Item Processor

ItemProcessor是表示项目的业务处理的抽象。当ItemReader读取一个项目并ItemWriter写入它们时,它 ItemProcessor提供了一个访问点来转换或应用其他业务处理。如果在处理该项目时确定该项目无效,则返回 null表示不应写出该项目。

3 Spring Batch实战

下面就利用我们所学的理论实现一个最简单的Spring Batch批处理项目

3.1 依赖和项目结构以及配置文件

依赖


<!--Spring batch-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- WEB依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- lombok-->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.20</version>
</dependency>
<!--  Mysql-->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>
<!--  mybatis-->
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.2.0</version>
</dependency>

项目结构

在这里插入图片描述

配置文件


server.port=9000
spring.datasource.url=jdbc:mysql://localhost:3306/test
spring.datasource.username=root
spring.datasource.passWord=12345
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

3.2 代码和数据表

数据表


CREATE TABLE `student` (
    `id` int(100) NOT NULL AUTO_INCREMENT,
    `name` varchar(45) DEFAULT NULL,
    `age` int(2) DEFAULT NULL,
    `address` varchar(45) DEFAULT NULL,
    PRIMARY KEY (`id`),
    UNIQUE KEY `id_UNIQUE` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=203579 DEFAULT CHARSET=utf8 ROW_FORMAT=REDUNDANT

Student实体类



@Data
@Accessors(chain = true)
@NoArgsConstructor
@AllArgsConstructor
@ToString
@TableName("student")
public class Student {

    @TableId(value = "id", type = IdType.AUTO)
    private Long sId;

    @TableField("name")
    private String sName;

    @TableField("age")
    private Integer sAge;

    @TableField("address")
    private String sAddress;

}

Mapper层



@Mapper
@Repository
public interface StudentDao extends BaseMapper<Student> {
}

模拟数据库(文件)中读取类



public class StudentVirtualDao {

    
    public List<Student> getStudents() {
        ArrayList<Student> students = new ArrayList<>();
        students.add(new Student(1L, "zs", 23, "Beijing"));
        students.add(new Student(2L, "ls", 23, "Beijing"));
        students.add(new Student(3L, "ww", 23, "Beijing"));
        students.add(new Student(4L, "zl", 23, "Beijing"));
        students.add(new Student(5L, "MQ", 23, "Beijing"));
        students.add(new Student(6L, "gb", 23, "Beijing"));
        students.add(new Student(7L, "lj", 23, "Beijing"));
        students.add(new Student(8L, "ss", 23, "Beijing"));
        students.add(new Student(9L, "zsdd", 23, "Beijing"));
        students.add(new Student(10L, "zss", 23, "Beijing"));
        return students;
    }
}

Service层接口



public interface StudentService {

    List<Student> selectStudentsFromDB();

    void insertStudent(Student student);
}

Service层实现类



@Service
public class StudentServiceImpl implements StudentService {

    @Autowired
    private StudentDao studentDao;

    @Override
    public List<Student> selectStudentsFromDB() {
        return studentDao.selectList(null);
    }

    @Override
    public void insertStudent(Student student) {
        studentDao.insert(student);
    }
}

最核心的配置类BatchConfiguration



@Configuration
@EnableBatchProcessing
@SuppressWarnings("all")
public class BatchConfiguration {

    
    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    
    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    
    @Autowired
    public JobRepository jobRepository;

    
    @Autowired
    private JobLauncher jobLauncher;

    
    @Autowired
    private StudentService studentService;

    
    @Autowired
    private Job studentJob;

    
    @Bean
    public ItemWriter<Student> writer() {
        ItemWriter<Student> writer = new ItemWriter() {
            @Override
            public void write(List list) throws Exception {
                //debug发现是嵌套的List reader的线程List嵌套真正的List
                list.forEach((stu) -> {
                    for (Student student : (ArrayList<Student>) stu) {
                        studentService.insertStudent(student);
                    }
                });
            }
        };
        return writer;
    }

    
    @Bean
    public ItemReader<Student> reader() {
        ItemReader<Student> reader = new ItemReader() {
            @Override
            public Object read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                //模拟数据获取
                StudentVirtualDao virtualDao = new StudentVirtualDao();
                return virtualDao.getStudents();
            }
        };
        return reader;
    }

    
    @Bean
    public ItemProcessor processor() {
        ItemProcessor processor = new ItemProcessor() {
            @Override
            public Object process(Object o) throws Exception {
                //debug发现o就是reader单次单线程读取的数据
                return o;
            }
        };
        return processor;
    }

    
    @Bean
    public Step studentStepOne() {
        return stepBuilderFactory.get("studentStepOne")
            .chunk(1)
            .reader(reader()) //加入reader
            .processor(processor())  //加入processor
            .writer(writer())//加入writer
            .build();
    }

    
    @Bean
    public Job studentJob() {
        return jobBuilderFactory.get("studentJob")
            .flow(studentStepOne())//加入step
            .end()
            .build();
    }


    
    @Scheduled(fixedRate = 5000)
    public void printMessage() {
        try {
            JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
            jobLauncher.run(studentJob, jobParameters);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3.3 测试

在这里插入图片描述

项目启动1s之后

在这里插入图片描述

看数据库,除了我们实体类定义的表以外多出来这么多表,这些表都是spring batch自带的记录日志和错误的表,具体的字段含义的有待研究

在这里插入图片描述

4 实战后的总结

Spring Batch有非常快的写入和读取速度,但是带来的影响就是非常耗费内存和数据库连接池的资源如果使用不好的话还会发生异常,因此我们要进行正确的配置,接下来我们进行简单的源码探究:

4.1 JobBuilderFactory

job的获取使用了简单工厂模式和建造者模式JobBuilderFactory获取JobBuilder在经过配置返回一个job对象的实例,该实例就是Spring Batch中最顶级的组件,包含了n和step


public class JobBuilderFactory {

   private JobRepository jobRepository;

   public JobBuilderFactory(JobRepository jobRepository) {
      this.jobRepository = jobRepository;
   }
   //返回JobBuilder
   public JobBuilder get(String name) {
      JobBuilder builder = new JobBuilder(name).repository(jobRepository);
      return builder;
   }
}

jobBuilder类


public class JobBuilder extends JobBuilderHelper<JobBuilder> {

   
   public JobBuilder(String name) {
      super(name);
   }

   
   public SimpleJobBuilder start(Step step) {
      return new SimpleJobBuilder(this).start(step);
   }

   
   public JobFlowBuilder start(Flow flow) {
      return new FlowJobBuilder(this).start(flow);
   }

   
   public JobFlowBuilder flow(Step step) {
      return new FlowJobBuilder(this).start(step);
   }
}

4.2 StepBuilderFactory

直接看StepBuilder类


public class StepBuilder extends StepBuilderHelper<StepBuilder> {

   public StepBuilder(String name) {
      super(name);
   }

   
   public TaskletStepBuilder tasklet(Tasklet tasklet) {
      return new TaskletStepBuilder(this).tasklet(tasklet);
   }

   
   public <I, O> SimpleStepBuilder<I, O> chunk(int chunkSize) {
      return new SimpleStepBuilder<I, O>(this).chunk(chunkSize);
   }

   public <I, O> SimpleStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) {
      return new SimpleStepBuilder<I, O>(this).chunk(completionPolicy);
   }

   public PartitionStepBuilder partitioner(String stepName, Partitioner partitioner) {
      return new PartitionStepBuilder(this).partitioner(stepName, partitioner);
   }

   public PartitionStepBuilder partitioner(Step step) {
      return new PartitionStepBuilder(this).step(step);
   }

   public JobStepBuilder job(Job job) {
      return new JobStepBuilder(this).job(job);
   }

   
   public FlowStepBuilder flow(Flow flow) {
      return new FlowStepBuilder(this).flow(flow);
   }
}

参考文档:

https://docs.spring.io/spring-batch/docs/4.3.x/reference/html/index.html

Https://www.jdon.com/springbatch.html

到此这篇关于Spring Batch轻量级批处理框架实战的文章就介绍到这了,更多相关Spring Batch批处理内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: Spring Batch轻量级批处理框架实战

本文链接: https://www.lsjlt.com/news/154714.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • Spring Batch轻量级批处理框架实战
    目录1 实战前的理论基础1.1 Spring Batch是什么1.2 Spring Batch能做什么1.3 基础架构1.4 核心概念和抽象2 各个组件介绍2.1 Job2.2 St...
    99+
    2024-04-02
  • Spring batch批处理框架
    spring batch框架的简介批处理任务是大多数IT项目的一个重要组成部分,批处理在业务系统中负责处理海量的数据,无须人工干预就能够自动高效的进行复杂的数据分析和处理。批处理会定期读入批量数据,经过相应的业务处理进行归档的业务操作,批处...
    99+
    2023-05-31
    spring batch 处理框架
  • 详解批处理框架之Spring Batch
    目录一、Spring Batch的概念知识1.1、分层架构1.2、关键概念1.2.1、JobRepository1.2.2、任务启动器JobLauncher1.2.3、任务Job1....
    99+
    2024-04-02
  • 批处理框架Spring Batch有什么用
    这篇文章给大家分享的是有关批处理框架Spring Batch有什么用的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。什么是批处理在现代企业应用当中,面对复杂的业务以及海量的数据,除了通过庞杂的人机交互界面进行各种处...
    99+
    2023-06-05
  • 如何使用Spring Batch批处理框架
    这篇文章主要讲解了“如何使用Spring Batch批处理框架”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“如何使用Spring Batch批处理框架”吧!1 前言Spring Batch是...
    99+
    2023-06-16
  • Spring Batch批处理框架操作指南
    目录简介Spring Batch 架构Spring Batch 核心概念什么是 Job什么是 JobInstance什么是 JobParameters什么是 JobExecution...
    99+
    2024-04-02
  • python Web开发 flask轻量级Web框架实战项目--学生管理系统
     上次发的一篇文章,有很多朋友私信我要后面的部分,那咱们就今天来一起学习一下吧,因为我的数据库这门课选中的课题是学生管理系统,所以今天就以这个课题为例子,从0到1去实现一个管理系统。数据库设计部分我会专门出一个博客的,敬请期待吧~~~ ...
    99+
    2023-09-03
    flask python 后端
  • 轻量级ORM框架Dapper应用之实现DTO
    一、什么是DTO 先来看看百度百科的解释: 数据传输对象(DTO)(Data Transfer Object),是一种设计模式之间传输数据的软件应用系统。数据传输目标往往是数据访问对...
    99+
    2024-04-02
  • 轻量级ORM框架Dapper应用之实现CURD操作
    在上一篇文章中,讲解了如何安装Dapper,这篇文章中将会讲解如何使用Dapper使用CURD操作。 例子中使用到的实体类定义如下: using System; using Syst...
    99+
    2024-04-02
  • 轻量级ORM框架Dapper应用之实现In操作
    IN 操作符允许我们在 WHERE 子句中规定多个值。 本篇文章中,还是使用和上篇文章中同样的实体类和数据库,Dapper使用in操作符的代码如下: using System...
    99+
    2024-04-02
  • 轻量级ORM框架Dapper应用之实现Join操作
    在这篇文章中,讲解如何使用Dapper使用Inner join的操作 1、新创建两张表:Users表和Product表 Users表定义如下: CREATE TABLE [dbo]....
    99+
    2024-04-02
  • 轻量级ORM框架Dapper应用实现In操作的方法
    本篇内容介绍了“轻量级ORM框架Dapper应用实现In操作的方法”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!Dapper使用in操作符的...
    99+
    2023-06-29
  • 如何理解轻量级的接口配置建模框架Midway-ModelProxy
    如何理解轻量级的接口配置建模框架Midway-ModelProxy,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。前言使用Node做前后端分...
    99+
    2024-04-02
  • 实时处理Python数组,Spring框架能否胜任?
    Python语言因其简单易学、功能强大等特点而备受开发者的喜爱。而在Python中,数组是一种非常常见的数据类型,其具有存储多个相同类型数据的能力,是编程中经常使用的一种数据结构。但是,当我们需要实时处理大量的Python数组数据时,我们...
    99+
    2023-07-04
    数组 实时 spring
  • Spring框架中的自然语言处理:从原理到实践。
    Spring框架中的自然语言处理:从原理到实践 自然语言处理(Natural Language Processing,简称NLP)是人工智能领域中的一个重要分支,它涉及到语言学、计算机科学和人工智能等多个领域,主要目的是使计算机能够理解、分...
    99+
    2023-08-01
    linux 自然语言处理 spring
  • 使用spring框架实现数据库事务处理方式
    目录使用spring框架实现数据库事务处理JDBC对数据库事务处理的支持JDBC定义了五种事务隔离级别来解决这些并发导致的问题在spring框架中调用一个数据库事务处理分三步走:sp...
    99+
    2024-04-02
  • Spring 框架中如何利用编程算法处理大量数组数据?
    随着互联网时代的到来,数据成为了一个巨大的挑战。大量的数据需要被处理,分析和存储。在这个过程中,算法成为了一个关键的因素。编程算法是一种用于解决各种复杂问题的技术,包括大量数据的处理。在 Spring 框架中,我们可以利用编程算法来处理大...
    99+
    2023-08-02
    编程算法 spring 数组
  • PHP容器技术能否实现Spring框架的实时数据处理?
    随着大数据时代的到来,实时数据处理成为了越来越多企业和开发者的需求,而Spring框架则是Java语言中最为流行的框架之一,它在实时数据处理方面有着得天独厚的优势。那么,PHP容器技术是否能够实现Spring框架的实时数据处理呢?本文将从...
    99+
    2023-09-12
    容器 实时 spring
  • 响应式自然语言处理:Java Spring框架的最佳实践
    自然语言处理(NLP)是一种计算机科学领域,它研究如何让计算机理解和处理人类语言。响应式自然语言处理则是一种新的NLP技术,它可以在实时情况下处理自然语言,并且能够快速地响应用户的输入。 在本文中,我们将介绍如何使用Java Spring...
    99+
    2023-08-19
    spring 自然语言处理 响应
  • 实时数据处理需求下,PHP容器与Spring框架的对比分析
    随着互联网技术的迅速发展,实时数据处理需求越来越成为企业关注的重点。而在实现实时数据处理时,选择合适的框架和容器至关重要。本文将对PHP容器和Spring框架进行对比分析,为大家提供更好的选择依据。 一、PHP容器 1.1 介绍 PHP容器...
    99+
    2023-09-12
    容器 实时 spring
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作