相关文章推荐
刚毅的火柴  ·  搜索结果·  2 月前    · 
绅士的匕首  ·  欧美乐队| 魔力红 酷玩 梦龙 林肯公园 ...·  10 月前    · 
飘逸的萝卜  ·  卢红同志一行赴浙江学习调研统战工作·  11 月前    · 
求醉的胡萝卜  ·  山东东明石化集团总裁李治一行来院招聘毕业生·  1 年前    · 
耍酷的咖啡豆  ·  从零开始的末世生活漫画免费 - ...·  2 年前    · 
Code  ›  SpringBatch读单个文件(FlatFileItemReader)和写单个文件(FlatFileItemWriter)(一)_境里婆娑的博客
exists
https://blog.csdn.net/TreeShu321/article/details/100600915
慷慨大方的薯片
2 年前
    • 一、初始化springbatch表
    • 二、抽取公共读文件reader
    • 三、抽取公共处理的process
    • 四、公共写文件 writer
    • 五、一个简单的job
      • 1、创建config类
      • 2、创建process
      • 3、公共constants
    • 六、运行job

    前言:springbatch是一个轻量级的批处理框架,在企业级应用中,我们常见一些批处理业务场景,借助Spring
    Batch我们可以很方便的开发出健壮、易用的批处理应用
    。

    目前我做的项目的应用场景是数据迁移,顾名思义就是把老系统的数据迁移到新系统。

    代码已上传GitHub上面地址:https://github.com/FadeHub/spring-boot-learn/tree/master/spring-boot-springbatch

    SpringBatch其它文章直通车:

    SpringBatch读单个文件(FlatFileItemReader)和写单个文件(FlatFileItemWriter)(一)
    SpringBatch顺序读取多文件(MultiResourceItemReader)和顺序写文件(MultiResourceItemWriter)(二)
    SpringBatch读数据库(MyBatisPagingItemReader)(三)
    SpringBatch读文件(FlatFileItemReader)写据库(MyBatisBatchItemWriter)(四)
    SpringBatch 监听器之Job监听器(JobExecutionListener)和Step监听器(StepExecutionListener)(五)
    SpringBatch 监听器之Chunk监听器(ChunkListener)和Skip监听器(SkipListener)(六)
    现在我们做一个简单的例子读文件和输出文件,看看Springbatch的魅力所在。

    一、初始化springbatch表

    -- Autogenerated: do not edit this file
    DROP TABLE IF EXISTS BATCH_STEP_EXECUTION_CONTEXT ;
    DROP TABLE IF EXISTS BATCH_JOB_EXECUTION_CONTEXT ;
    DROP TABLE IF EXISTS BATCH_STEP_EXECUTION ;
    DROP TABLE IF EXISTS BATCH_JOB_EXECUTION_PARAMS ;
    DROP TABLE IF EXISTS BATCH_JOB_EXECUTION ;
    DROP TABLE IF EXISTS BATCH_JOB_INSTANCE ;
    DROP TABLE IF EXISTS BATCH_STEP_EXECUTION_SEQ ;
    DROP TABLE IF EXISTS BATCH_JOB_EXECUTION_SEQ ;
    DROP TABLE IF EXISTS BATCH_JOB_SEQ ;
    -- Autogenerated: do not edit this file
    CREATE TABLE BATCH_JOB_INSTANCE  (
    	JOB_INSTANCE_ID BIGINT  NOT NULL PRIMARY KEY ,
    	VERSION BIGINT ,
    	JOB_NAME VARCHAR(100) NOT NULL,
    	JOB_KEY VARCHAR(32) NOT NULL,
    	constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
    ) ENGINE=InnoDB;
    CREATE TABLE BATCH_JOB_EXECUTION  (
    	JOB_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
    	VERSION BIGINT  ,
    	JOB_INSTANCE_ID BIGINT NOT NULL,
    	CREATE_TIME DATETIME NOT NULL,
    	START_TIME DATETIME DEFAULT NULL ,
    	END_TIME DATETIME DEFAULT NULL ,
    	STATUS VARCHAR(10) ,
    	EXIT_CODE VARCHAR(2500) ,
    	EXIT_MESSAGE VARCHAR(2500) ,
    	LAST_UPDATED DATETIME,
    	JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
    	constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
    	references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
    ) ENGINE=InnoDB;
    CREATE TABLE BATCH_JOB_EXECUTION_PARAMS  (
    	JOB_EXECUTION_ID BIGINT NOT NULL ,
    	TYPE_CD VARCHAR(6) NOT NULL ,
    	KEY_NAME VARCHAR(100) NOT NULL ,
    	STRING_VAL VARCHAR(250) ,
    	DATE_VAL DATETIME DEFAULT NULL ,
    	LONG_VAL BIGINT ,
    	DOUBLE_VAL DOUBLE PRECISION ,
    	IDENTIFYING CHAR(1) NOT NULL ,
    	constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
    	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
    ) ENGINE=InnoDB;
    CREATE TABLE BATCH_STEP_EXECUTION  (
    	STEP_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY ,
    	VERSION BIGINT NOT NULL,
    	STEP_NAME VARCHAR(100) NOT NULL,
    	JOB_EXECUTION_ID BIGINT NOT NULL,
    	START_TIME DATETIME NOT NULL ,
    	END_TIME DATETIME DEFAULT NULL ,
    	STATUS VARCHAR(10) ,
    	COMMIT_COUNT BIGINT ,
    	READ_COUNT BIGINT ,
    	FILTER_COUNT BIGINT ,
    	WRITE_COUNT BIGINT ,
    	READ_SKIP_COUNT BIGINT ,
    	WRITE_SKIP_COUNT BIGINT ,
    	PROCESS_SKIP_COUNT BIGINT ,
    	ROLLBACK_COUNT BIGINT ,
    	EXIT_CODE VARCHAR(2500) ,
    	EXIT_MESSAGE VARCHAR(2500) ,
    	LAST_UPDATED DATETIME,
    	constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
    	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
    ) ENGINE=InnoDB;
    CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT  (
    	STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    	SHORT_CONTEXT VARCHAR(2500) NOT NULL,
    	SERIALIZED_CONTEXT TEXT ,
    	constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
    	references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
    ) ENGINE=InnoDB;
    CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT  (
    	JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    	SHORT_CONTEXT VARCHAR(2500) NOT NULL,
    	SERIALIZED_CONTEXT TEXT ,
    	constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
    	references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
    ) ENGINE=InnoDB;
    CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
    	ID BIGINT NOT NULL,
    	UNIQUE_KEY CHAR(1) NOT NULL,
    	constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
    ) ENGINE=InnoDB;
    INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_STEP_EXECUTION_SEQ);
    CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
    	ID BIGINT NOT NULL,
    	UNIQUE_KEY CHAR(1) NOT NULL,
    	constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
    ) ENGINE=InnoDB;
    INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_EXECUTION_SEQ);
    CREATE TABLE BATCH_JOB_SEQ (
    	ID BIGINT NOT NULL,
    	UNIQUE_KEY CHAR(1) NOT NULL,
    	constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
    ) ENGINE=InnoDB;
    INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_SEQ);
    

    二、抽取公共读文件reader

    公共reader继承FlatFileItemReader读文件类。

    package com.sl.common;
    import com.sl.common.CommonConstants;
    import org.springframework.batch.item.file.FlatFileItemReader;
    import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
    import org.springframework.batch.item.file.mapping.DefaultLineMapper;
    import org.springframework.batch.item.file.transform.DefaultFieldSetFactory;
    import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
    import org.springframework.core.io.FileSystemResource;
    import java.lang.reflect.Field;
    import java.lang.reflect.Modifier;
    import java.util.ArrayList;
    import java.util.List;
     * 公共读方法
     * @author shuliangzhao
     * @Title: CommonItemReader
     * @ProjectName spring-boot-learn
     * @Description: TODO
     * @date 2019/9/7 16:30
    public class CommonFileItemReader<T> extends FlatFileItemReader<T> {
         private FileSystemResource fileSystemResource;
         public CommonFileItemReader(Class clz) {
             setEncoding(CommonConstants.ENCODING_READ);
             fileSystemResource = new FileSystemResource("D:\\aplus\\shuqian\\target\\"+clz.getSimpleName()+".csv");
             setResource(fileSystemResource);
             DefaultLineMapper defaultLineMapper = new DefaultLineMapper();
             DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
             delimitedLineTokenizer.setFieldSetFactory(new DefaultFieldSetFactory());
             Field[] fields = clz.getDeclaredFields();
             List<String> list = new ArrayList<>();
             for (Field field : fields) {
                 if (!Modifier.isStatic(field.getModifiers())) {
                      list.add(field.getName());
             String[] names = new String[list.size()];
             delimitedLineTokenizer.setNames(list.toArray(names));
             delimitedLineTokenizer.setDelimiter(",");
             defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
             BeanWrapperFieldSetMapper fieldSetMapper = new BeanWrapperFieldSetMapper();
             fieldSetMapper.setTargetType(clz);
             defaultLineMapper.setFieldSetMapper(fieldSetMapper);
             setLineMapper(defaultLineMapper);
    

    三、抽取公共处理的process

    公共的process继承ItemProcessor。

    package com.sl.common;
    import org.springframework.batch.item.ItemProcessor;
    import javax.annotation.PostConstruct;
    import java.lang.reflect.ParameterizedType;
     * 公共处理
     * @author shuliangzhao
     * @Title: CommonProcessor
     * @ProjectName spring-boot-learn
     * @Description: TODO
     * @date 2019/9/7 16:39
    public abstract class CommonProcessor<I,O> implements ItemProcessor<I,O> {
        private Class<I> input;
        private Class<O> output;
        @PostConstruct
        public void init() {
            input = (Class<I>)((ParameterizedType)getClass().getGenericSuperclass()).getActualTypeArguments()[0];
            output = (Class<O>)((ParameterizedType)getClass().getGenericSuperclass()).getActualTypeArguments()[1];
        @Override
        public O process(I i) throws Exception {
            O o = output.newInstance();
            processor(o,i);
            return o;
        public abstract void processor(O o, I i);
    

    四、公共写文件 writer

    公共的写继承ItemWriter。

    package com.sl.common;
    import com.sl.common.CommonConstants;
    import org.springframework.batch.item.file.FlatFileItemWriter;
    import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
    import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
    import org.springframework.core.io.FileSystemResource;
    import java.lang.reflect.Field;
    import java.lang.reflect.Modifier;
    import java.util.ArrayList;
    import java.util.List;
     * 公共写
     * @author shuliangzhao
     * @Title: CommonFileItemWriter
     * @ProjectName spring-boot-learn
     * @Description: TODO
     * @date 2019/9/7 16:48
    public class CommonFileItemWriter<T> extends FlatFileItemWriter<T> {
        private FileSystemResource fileSystemResource;
        public CommonFileItemWriter(Class clz) {
            BeanWrapperFieldExtractor beanWrapperFieldExtractor = new BeanWrapperFieldExtractor();
            Field[] fields = clz.getDeclaredFields();
            List<String> list = new ArrayList<>();
            for (java.lang.reflect.Field field : fields) {
                if (!Modifier.isStatic(field.getModifiers())) {
                    list.add(field.getName());
            String[] names = new String[list.size()];
            beanWrapperFieldExtractor.setNames(list.toArray(names));
            beanWrapperFieldExtractor.afterPropertiesSet();
            DelimitedLineAggregator lineAggregator = new DelimitedLineAggregator();
            lineAggregator.setDelimiter(",");
            lineAggregator.setFieldExtractor(beanWrapperFieldExtractor);
            setName(clz.getSimpleName());
            setEncoding(CommonConstants.ENCODING_READ);
            fileSystemResource = new FileSystemResource("D:\\aplus\\shuqian\\source\\"+ clz.getSimpleName() + ".csv");
            setResource(fileSystemResource);
            setLineAggregator(lineAggregator);
    

    五、一个简单的job

    1、创建config类

    package com.sl.config;
    import com.sl.common.CommonFileItemWriter;
    import com.sl.entity.People;
    import com.sl.entity.Student;
    import com.sl.common.CommonFileItemReader;
    import com.sl.processor.StudentProcessor;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepScope;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
     * @author shuliangzhao
     * @Title: UserConfiguration
     * @ProjectName spring-boot-learn
     * @Description: TODO
     * @date 2019/9/7 17:06
    @Configuration
    @EnableBatchProcessing
    public class StudentConfiguration {
        @Autowired
        private JobBuilderFactory jobBuilderFactory;
        @Autowired
        private StepBuilderFactory stepBuilderFactory;
        @Autowired
        private StudentProcessor studentProcessor;
        @Bean
        public Job studentJob() {
             return jobBuilderFactory.get("studentJob")
                     .start(studentStep())
                     .build();
        @Bean
        public Step studentStep() {
            return stepBuilderFactory.get("studentStep")
                    .<People, Student>chunk(10)
                    .reader(peopleCommonFileItemReader())
                    .processor(studentProcessor)
                    .writer(studentCommonFileItemWriter())
                    .build();
        @Bean
        @StepScope
        public CommonFileItemReader<People> peopleCommonFileItemReader() {
            return new CommonFileItemReader<>(People.class);
        public CommonFileItemWriter<Student> studentCommonFileItemWriter() {
            return new CommonFileItemWriter<>(Student.class);
    

    2、创建process

    package com.sl.processor;
    import com.sl.common.CommonProcessor;
    import com.sl.entity.People;
    import com.sl.entity.Student;
    import org.springframework.batch.core.configuration.annotation.StepScope;
    import org.springframework.stereotype.Component;
     * @author shuliangzhao
     * @Title: PeopleProcessor
     * @ProjectName spring-boot-learn
     * @Description: TODO
     * @date 2019/9/7 17:12
    @Component
    @StepScope
    public class StudentProcessor extends CommonProcessor<People, Student> {
        @Override
        public void processor(Student o, People people) {
           o.setName(people.getName());
           o.setAddress(people.getAdress());
           o.setAge(people.getAge());
           o.setIdCar(people.getIdCard());
    

    3、公共constants

    package com.sl.common;
     * @author shuliangzhao
     * @Title: CommonConstants
     * @ProjectName spring-boot-learn
     * @Description: TODO
     * @date 2019/9/7 16:34
    public class CommonConstants {
        public static final String ENCODING_READ = "GBK";
    

    六、运行job

    package com.sl;
    import org.springframework.batch.core.*;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
    import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
    import org.springframework.batch.core.repository.JobRestartException;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ConfigurableApplicationContext;
    import java.util.Date;
    @SpringBootApplication
    public class Application {
        public static void main(String[] args) throws Exception {
            ApplicationContext run = SpringApplication.run(Application.class, args);
            run(run);
        private static void run(ApplicationContext ctx) throws Exception{
            JobLauncher jobLauncher = ctx.getBean(JobLauncher.class);
            JobParameters jobParameters = new JobParametersBuilder().addDate("test", new Date()).toJobParameters();
            JobExecution studentJob = jobLauncher.run(ctx.getBean("studentJob", Job.class), jobParameters);
    

    运行完job就可以看到在指定目录已经把文件写入。以上代码比较多,基础理论可以通过阅读springbatch官方文档。。

    后面章节会介绍springbatch的其他功能,比如读表入库、skip机制、异常重跑等功能点。

    文章目录一、初始化springbatch表二、抽取公共读文件reader三、抽取公共处理的process四、公共写文件 writer五、一个简单的job1、创建config类2、创建process3、公共constants六、运行job前言:springbatch是一个轻量级的批处理框架,在企业级应用中,我们常见一些批处理业务场景,借助SpringBatch我们可以很方便的开发出健壮、易用的...
    文章目录一、抽取顺序读取数据库公共Writer二、mapper配置文件三、写数据库的job四、继承公共CommonProcesor五、执行job 前言:我们在日常开发中可能会遇到写数据库,SpringBatch封装的也有写数据库的Writer,但是我们这次做实验用的是Mybatis封装的MyBatisBatchItemWriter 代码已上传GitHub上面地址:https://github...
    对于read读取数据时是一个item为单位的循环读取,而对于writer写入数据则是以chunk为单位,一块一块的进行写入 先写一个Job 和 ItermReader作为例子 @Configuration public class DbOutputDemoJobConfiguration { @Autowired public JobBuilderFac...
    FlatFileItemReader 的主要作用就是读入一个文件,返回一组bean,他要配置的最主要的组件: 【1】 resource 文件在什么地方 【2】 lineMapper 即 这个组件将文件的每一行组装成一个bean。 其中lineMapper 又是由 lineTokenizer 和 fieldSetMapper 【1】 lineTokenizer 作用是将文件的每一行分...
    FlatFileItemReader实现ItemReader接口,核心作用将Flat文件中的记录转换为Java对象。 一、FlatFileItemReader中关键属性 bufferedReaderFactory BufferedReaderFactory 根据给定的resource创建BufferReader实例,默认使用DefaultBufferedReaderFactory创建文本类型的BufferReader实例。 comments
    使用SpringBatch-FlatFileItemReader读取文件时,跳过文件中的空行的实现 创建一个JYKSimpleRecordSeparatorPolicy类,继承SimpleRecordSeparatorPolicy 这个类的作用是增强SimpleRecordSeparatorPolicy的作用,因为原本默认Spring框架是使用的SimpleRecordSeparatorPoli...
    本文通过一个完整的实例,运用Spring Batch对FlatFile进行读写操作。此实例的流程是:读取一个含有4个字段的FlatFile(ID,Name,Age,Score),对读取的字段做简单的处理,然后输出到另外一个FlatFile中。 工程结构如下图: JobLaunch类用来启动Job,CSVItemProcessor类用来对Reader取得的数据进行处理,Student类是一个
    前序文章陆续介绍了批处理的基本概念,Job使用、Step控制、Item的结构以及扁平文件的读写。本文将接着前面的内容说明数据库如何进行批处理读写。 数据读取 数据库是绝大部分系统要用到的数据存储工具,因此针对数据库执行批量数据处理任务也是很常见的需求。数据的批量处理与常规业务开发不同,如果一次性读取百万条,对于任何系统而言肯定都是不可取的。为了解决这个问题Spring Batch提供了2套数据读取方案: 基于游标读取数据 基于分页读取数据 游标读取数据 对于有经验大数据工程师而言数据库游标的操作应
    前面关于Spring Batch的文章,讲述了SpringBatch对Flat、XML等文件的读写操作,本文将和大家一起讨论Spring Batch对DB的读写操作。Spring Batch对DB数据的读取操作提供两种形式,一种是以游标为基础,一条条的读取数据;另外一种是分页的方式读取DB。 通过前面文章的讲解,大家应该对SpringBatch的框架和基本配置有了一定的了...
    如果您在for循环中获得了文件路径,那么您可以使用Python内置的`open`函数来读取单个文件。您可以将文件路径作为参数传递给`open`函数,然后指定文件的访问模式。例如,如果您想要以只读模式打开文件,可以这样写: with open(file_path, 'r') as f: # 在这里执行读取文件的操作 在`with`语句块中,您可以执行读取文件的操作。例如,您可以使用`read`方法读取整个文件的内容,也可以使用`readline`方法读取文件的一行,等等。完成文件读取后,Python会自动关闭文件句柄,您无需手动关闭。 如果您需要对多个文件进行操作,可以将上述代码放入for循环中,遍历所有的文件路径,逐个读取文件。
    NavicatPremium连接MySQL出现异常Authentication plugin ‘caching_sha2_password‘ cannot be loaded的解决方案 yyylll_bbn: 这个代码输完密码改成什么啦,现在密码都不对啦表情包表情包表情包表情包 详解mybatis的配置setMapperLocations多个路径两种方法 您好请问下如果我的mybatis是注解开发的也就是没有mapper的xml文件,那在mapperlocation这一步要怎么写呢,不写的话貌似会报错 全网最详细SpringBatch读(Reader)混合文件讲解 nijianghu: 是什么组件呢 Java中如何将List拆分为多个小list集合 m0_67534743: 传入的数字不能被整除怎么搞 MyBatis中的@Mapper注解使用 Yummyyyyh: 我今天试的时候,不加@Mapper确实可以,没报错
 
推荐文章
刚毅的火柴  ·  搜索结果
2 月前
绅士的匕首  ·  欧美乐队| 魔力红 酷玩 梦龙 林肯公园 - 歌单 - 网易云音乐
10 月前
飘逸的萝卜  ·  卢红同志一行赴浙江学习调研统战工作
11 月前
求醉的胡萝卜  ·  山东东明石化集团总裁李治一行来院招聘毕业生
1 年前
耍酷的咖啡豆  ·  从零开始的末世生活漫画免费 - 从零开始的末世生活漫画 - 漫画在线全集免费阅读 - 腾讯动漫
2 年前
今天看啥   ·   Py中国   ·   codingpro   ·   小百科   ·   link之家   ·   卧龙AI搜索
删除内容请联系邮箱 2879853325@qq.com
Code - 代码工具平台
© 2024 ~ 沪ICP备11025650号