Spring Batch CSV to Database
August 01, 2019
This page walks through a Spring Batch example that loads a CSV file into a database. Spring Batch creates a Job using Step, and each step is configured using a reader, a processor and a writer. To read the CSV file we will create a reader using FlatFileItemReader, and to write data to the database we will create a writer using JdbcBatchItemWriter. To create a processor we need to create a class implementing ItemProcessor. The Java configuration class needs to be annotated with @EnableBatchProcessing.
On this page we will create a batch application that reads data from a CSV file and writes it into a MySQL database. To create the datasource, we will use HikariCP. We will create a Spring Batch listener that gives notifications before the start of the job and after its completion. We will also create a scheduler to schedule the batch job. Now find the complete example step by step.
1. Technologies Used
Find the technologies being used in our example.
1. Java 11
2. Spring 5.1.8.RELEASE
3. Spring Batch 4.1.2.RELEASE
4. Spring Boot 2.1.6.RELEASE
5. Maven 3.5.2
6. MySQL 5.5
7. Eclipse 2018-09
2. Project Structure
In our example we are using Eclipse IDE. Find the project structure print screen of our application using Eclipse.
3. Maven Dependencies
Find the Maven dependencies of our project.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.concretepage</groupId>
    <artifactId>spring-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>Spring</name>
    <description>Spring Demo Project</description>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath />
    </parent>
    <properties>
        <context.path>spring-app</context.path>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>6.0.5</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
4. CSV File
Our CSV file has four columns that contain student details and their subject-wise marks. The first row contains the column names. In our demo application we will read the CSV file from the project classpath. Find the print screen of our CSV file.
student-marks.csv
5. MySQL Table
In our Spring Batch demo application, we are reading data from a CSV file and writing it into a MySQL database. From the CSV file we will read student details and their subject-wise marks, and then we will write the student details and total marks to the database. Find the MySQL database table used in our demo application.
MySQL Table: marksheet
CREATE TABLE `marksheet` (
    `rollNum` INT(11) NULL DEFAULT NULL,
    `studentName` VARCHAR(50) NULL DEFAULT NULL,
    `totalMarks` INT(5) NULL DEFAULT NULL
)
COLLATE='latin1_swedish_ci'
ENGINE=InnoDB;
6. Batch Configuration
In a Spring Batch operation we need to configure a batch job using Job. The batch job configures its steps using Step. A step is configured with a reader, a processor and a writer. A reader reads an input file, such as a CSV file, using FlatFileItemReader. Before writing the input data, we can process it and perform some operation on it. To create a processor we need to create a class implementing the Spring ItemProcessor interface. To create a writer we can use JdbcBatchItemWriter to write data to a database such as MySQL.
BatchConfiguration.java
package com.concretepage.configuration;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;

import com.concretepage.listener.JobCompletionListener;
import com.concretepage.model.Marksheet;
import com.concretepage.model.Student;
import com.concretepage.processor.StudentItemProcessor;
import com.zaxxer.hikari.HikariDataSource;

@Configuration
@EnableBatchProcessing
@EnableScheduling
public class BatchConfiguration extends DefaultBatchConfigurer {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public LineMapper<Student> lineMapper() {
        DefaultLineMapper<Student> lineMapper = new DefaultLineMapper<Student>();
        lineMapper.setLineTokenizer(new DelimitedLineTokenizer() {
            {
                setNames(new String[] { "rollNum", "stdName", "subjectAMark", "subjectBMark" });
            }
        });
        lineMapper.setFieldSetMapper(new BeanWrapperFieldSetMapper<Student>() {
            {
                setTargetType(Student.class);
            }
        });
        return lineMapper;
    }

    @Bean
    public FlatFileItemReader<Student> reader() {
        return new FlatFileItemReaderBuilder<Student>()
                .name("studentItemReader")
                .resource(new ClassPathResource("student-marks.csv"))
                .lineMapper(lineMapper())
                .linesToSkip(1)
                .build();
    }

    @Bean
    public JdbcBatchItemWriter<Marksheet> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Marksheet>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Marksheet>())
                .sql("INSERT INTO marksheet (rollNum, studentName, totalMarks) VALUES (:rollNum, :stdName,:totalMarks)")
                .dataSource(dataSource)
                .build();
    }

    @Bean
    public ItemProcessor<Student, Marksheet> processor() {
        return new StudentItemProcessor();
    }

    @Bean
    public Job createMarkSheetJob(JobCompletionListener listener, Step step1) {
        return jobBuilderFactory
                .get("createMarkSheetJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(step1)
                .end()
                .build();
    }

    @Bean
    public Step step1(ItemReader<Student> reader, ItemWriter<Marksheet> writer,
            ItemProcessor<Student, Marksheet> processor) {
        return stepBuilderFactory
                .get("step1")
                .<Student, Marksheet>chunk(5)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    @Bean
    public DataSource getDataSource() {
        HikariDataSource dataSource = new HikariDataSource();
        dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/cpdb");
        dataSource.setUsername("root");
        dataSource.setPassword("cp");
        return dataSource;
    }

    @Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    @Override
    public void setDataSource(DataSource dataSource) {
    }
}
@EnableBatchProcessing: Enables Spring Batch features and provides a base configuration for setting up batch jobs, roughly equivalent to using the <batch:*> namespace in XML configuration. We need to annotate our JavaConfig with the @EnableBatchProcessing annotation to enable batch processing.
@EnableScheduling: Enables scheduled task execution capability in Spring. In our example we schedule the batch job, so we need to annotate our JavaConfig with the @EnableScheduling annotation to enable it.
In our example, student data is read from a CSV file and written to the marksheet table of a MySQL database. Find the models used in our demo application.
Student.java
package com.concretepage.model;

public class Student {
    private long rollNum;
    private String stdName;
    private int subjectAMark;
    private int subjectBMark;
    // setters and getters
}
Marksheet.java
package com.concretepage.model;

public class Marksheet {
    private long rollNum;
    private String stdName;
    private int totalMarks;

    public Marksheet(long rollNum, String stdName, int totalMarks) {
        this.rollNum = rollNum;
        this.stdName = stdName;
        this.totalMarks = totalMarks;
    }
    // setters and getters
}
6.1 Create Reader using FlatFileItemReader
Spring FlatFileItemReader is a restartable ItemReader that reads lines from an input resource. To create a reader we need to create a bean of FlatFileItemReader in the configuration file.
@Bean
public FlatFileItemReader<Student> reader() {
    return new FlatFileItemReaderBuilder<Student>()
            .name("studentItemReader")
            .resource(new ClassPathResource("student-marks.csv"))
            .lineMapper(lineMapper())
            .linesToSkip(1)
            .build();
}
Spring Batch provides the LineMapper interface, which maps each line read from the resource to a domain object. In our example we read rows from a CSV file that has 4 columns. They are mapped to a Java object using the following line mapper.
@Bean
public LineMapper<Student> lineMapper() {
    DefaultLineMapper<Student> lineMapper = new DefaultLineMapper<Student>();
    lineMapper.setLineTokenizer(new DelimitedLineTokenizer() {
        {
            setNames(new String[] { "rollNum", "stdName", "subjectAMark", "subjectBMark" });
        }
    });
    lineMapper.setFieldSetMapper(new BeanWrapperFieldSetMapper<Student>() {
        {
            setTargetType(Student.class);
        }
    });
    return lineMapper;
}
DelimitedLineTokenizer is a line tokenizer that splits the input string on a configurable delimiter. Each row of the CSV file maps to the Student class. The first column of the CSV file maps to rollNum, the second column maps to stdName, the third column maps to subjectAMark and the fourth column maps to subjectBMark fields of the Student class.
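The column-to-field mapping described above can be sketched in plain Java. This is only an illustration of the idea (split on the delimiter, name each token, then bind the named values to an object), not the code Spring Batch runs internally. The class CsvLineSketch, its nested Student stand-in and the sample row are hypothetical, and unlike DelimitedLineTokenizer the sketch does not handle quoted values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch (not Spring Batch code): tokenize one CSV line and
// bind the tokens to named fields of a target object.
class CsvLineSketch {

    // Same column names the example passes to setNames(...)
    static final String[] NAMES = { "rollNum", "stdName", "subjectAMark", "subjectBMark" };

    // Hypothetical stand-in for the Student model of the example
    static class Student {
        long rollNum;
        String stdName;
        int subjectAMark;
        int subjectBMark;
    }

    // Step 1: split the line on the delimiter and pair each token with its column name
    static Map<String, String> tokenize(String line) {
        String[] tokens = line.split(",", -1);
        if (tokens.length != NAMES.length) {
            throw new IllegalArgumentException(
                    "Expected " + NAMES.length + " columns but found " + tokens.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < NAMES.length; i++) {
            fields.put(NAMES[i], tokens[i].trim());
        }
        return fields;
    }

    // Step 2: copy the named values into the target object, converting types
    static Student mapToStudent(Map<String, String> fields) {
        Student student = new Student();
        student.rollNum = Long.parseLong(fields.get("rollNum"));
        student.stdName = fields.get("stdName");
        student.subjectAMark = Integer.parseInt(fields.get("subjectAMark"));
        student.subjectBMark = Integer.parseInt(fields.get("subjectBMark"));
        return student;
    }

    public static void main(String[] args) {
        // Made-up sample row, used only to exercise the sketch
        Student student = mapToStudent(tokenize("101,Mohan,75,80"));
        System.out.println(student.rollNum + " " + student.stdName);
    }
}
```

In the real application the header row never reaches this mapping because the reader is configured with linesToSkip(1).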
We can also configure DelimitedLineTokenizer directly on our reader to map rows of the input CSV file to the domain object.
@Bean
public FlatFileItemReader<Student> reader() {
    return new FlatFileItemReaderBuilder<Student>()
            .name("studentItemReader")
            .resource(new ClassPathResource("student-marks.csv"))
            .delimited()
            .names(new String[] { "rollNum", "stdName", "subjectAMark", "subjectBMark" })
            .fieldSetMapper(new BeanWrapperFieldSetMapper<Student>() {{
                setTargetType(Student.class);
            }})
            .linesToSkip(1)
            .build();
}
delimited() returns a DelimitedBuilder that builds the DelimitedLineTokenizer.
6.2 Create Writer using JdbcBatchItemWriter
To create a writer we are using JdbcBatchItemWriter. This is the implementation of ItemWriter which uses NamedParameterJdbcTemplate to execute a batch of statements for all items provided. In the writer bean we need to configure the SQL query that inserts the data and the datasource that connects to the database.
@Bean
public JdbcBatchItemWriter<Marksheet> writer(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<Marksheet>()
            .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Marksheet>())
            .sql("INSERT INTO marksheet (rollNum, studentName, totalMarks) VALUES (:rollNum, :stdName,:totalMarks)")
            .dataSource(dataSource)
            .build();
}
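Note that the named parameters in the SQL (:rollNum, :stdName, :totalMarks) follow the property names of Marksheet, not the column names of the table, which is why the column studentName is filled from :stdName. The following plain-Java sketch shows the general idea of resolving such placeholders from bean getters. It is an illustration under simplifying assumptions, not Spring's implementation: NamedParamSketch and its nested Marksheet are hypothetical, and the regular expression would also match a colon inside a string literal.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Plain-Java sketch: resolve :name placeholders of an SQL string from bean getters.
class NamedParamSketch {

    // Hypothetical stand-in for the Marksheet model of the example
    public static class Marksheet {
        private final long rollNum;
        private final String stdName;
        private final int totalMarks;

        public Marksheet(long rollNum, String stdName, int totalMarks) {
            this.rollNum = rollNum;
            this.stdName = stdName;
            this.totalMarks = totalMarks;
        }

        public long getRollNum() { return rollNum; }
        public String getStdName() { return stdName; }
        public int getTotalMarks() { return totalMarks; }
    }

    private static final Pattern PLACEHOLDER = Pattern.compile(":(\\w+)");

    // Collects the placeholder names in order of appearance
    static List<String> parameterNames(String sql) {
        List<String> names = new ArrayList<>();
        Matcher matcher = PLACEHOLDER.matcher(sql);
        while (matcher.find()) {
            names.add(matcher.group(1));
        }
        return names;
    }

    // Looks up each placeholder by calling the getter with the matching property name
    static List<Object> resolve(String sql, Object bean) {
        List<Object> values = new ArrayList<>();
        for (String name : parameterNames(sql)) {
            String getter = "get" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
            try {
                Method method = bean.getClass().getMethod(getter);
                values.add(method.invoke(bean));
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("No readable property for :" + name, e);
            }
        }
        return values;
    }

    public static void main(String[] args) {
        String sql = "INSERT INTO marksheet (rollNum, studentName, totalMarks) "
                + "VALUES (:rollNum, :stdName,:totalMarks)";
        // Made-up values, used only to exercise the sketch
        System.out.println(resolve(sql, new Marksheet(101L, "Mohan", 155)));
    }
}
```

A placeholder such as :studentName would fail in this sketch because Marksheet has no getStudentName() getter, which mirrors why the example uses :stdName.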
6.3 Create Step
Step provides the configuration of a step in the batch job. We need to create a bean of Step that configures the reader, processor and writer used to perform the batch job.
@Bean
public Step step1(ItemReader<Student> reader, ItemWriter<Marksheet> writer,
        ItemProcessor<Student, Marksheet> processor) {
    return stepBuilderFactory
            .get("step1")
            .<Student, Marksheet>chunk(5)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .build();
}
To create a Step we use StepBuilderFactory. The method get() provides a StepBuilder for the given step name, i.e. step1 in our example. StepBuilder provides the chunk() method that builds a step which processes items in chunks of the given size. chunk() is a generic method that needs the input and output types, i.e. Student is the input type and Marksheet is the output type in our batch example.
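To make the chunk size concrete, here is a minimal plain-Java sketch of chunk-oriented processing: items are read until the chunk is full, each item of the chunk is processed, and the whole chunk is handed to the writer in one call. It is a simplified model, not Spring Batch's implementation. ChunkSketch is a hypothetical name, an Iterator stands in for the reader, a Function for the processor, and the returned list of lists for what the writer receives.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Plain-Java sketch of chunk-oriented processing; not Spring Batch's implementation.
class ChunkSketch {

    // Returns the chunks in the order they would be passed to the writer
    static <I, O> List<List<O>> run(Iterator<I> reader, Function<I, O> processor, int chunkSize) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be at least 1");
        }
        List<List<O>> writtenChunks = new ArrayList<>();
        while (reader.hasNext()) {
            // 1. Read up to chunkSize items
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < chunkSize && reader.hasNext()) {
                inputs.add(reader.next());
            }
            // 2. Process each item of the chunk
            List<O> outputs = new ArrayList<>();
            for (I input : inputs) {
                outputs.add(processor.apply(input));
            }
            // 3. Write the whole chunk in one call
            writtenChunks.add(outputs);
        }
        return writtenChunks;
    }

    public static void main(String[] args) {
        // Twelve made-up input items and the chunk size 5 used in step1
        Iterator<Integer> reader = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12).iterator();
        List<List<Integer>> chunks = run(reader, mark -> mark * 2, 5);
        System.out.println(chunks.size() + " chunks written");
    }
}
```

With chunk(5), twelve input rows would therefore reach the writer as three lists of 5, 5 and 2 items, which is what lets JdbcBatchItemWriter send each list as one JDBC batch.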
6.4 Create Job
Spring Job represents a batch job that configures the incrementer, listener, steps etc. The restart policy applies to the job as a whole and not to an individual step. To configure a job, we use JobBuilderFactory whose get() method returns a JobBuilder that builds a Job.
@Bean
public Job createMarkSheetJob(JobCompletionListener listener, Step step1) {
    return jobBuilderFactory
            .get("createMarkSheetJob")
            .incrementer(new RunIdIncrementer())
            .listener(listener)
            .flow(step1)
            .end()
            .build();
}
RunIdIncrementer gives the next job parameter in sequence. The listener is configured to listen to job events such as before job and after job. We can use a listener if we want to perform some operations before and after the job, such as clearing database tables before the job and counting rows after the job. Using JobBuilder.flow() we configure the Step that the job executes; the step in turn processes the input rows chunk by chunk.
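To illustrate what "the next job parameter in sequence" means, here is a minimal plain-Java sketch. It assumes the incrementer keeps a numeric counter under the parameter key run.id and adds 1 to it on each call, starting from 1 when no previous value exists. RunIdSketch and the use of a Map in place of JobParameters are assumptions made for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of a run-id style incrementer; a Map stands in for JobParameters.
class RunIdSketch {

    // Assumed parameter key under which the counter is stored
    static final String KEY = "run.id";

    // Returns a copy of the previous parameters with the counter increased by one
    static Map<String, Long> next(Map<String, Long> previous) {
        Map<String, Long> params = new HashMap<String, Long>();
        if (previous != null) {
            params.putAll(previous);
        }
        long id = params.getOrDefault(KEY, 0L) + 1;
        params.put(KEY, id);
        return params;
    }

    public static void main(String[] args) {
        Map<String, Long> first = next(null);
        Map<String, Long> second = next(first);
        System.out.println(first + " then " + second);
    }
}
```

Because the returned parameters differ from the previous ones, each call describes a new run of the same job rather than a repeat of an earlier one.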
6.5 Create DataSource
Here we are configuring the datasource using HikariCP. The datasource is passed to the writer bean in the batch configuration file.
@Configuration
@EnableBatchProcessing
public class BatchConfiguration extends DefaultBatchConfigurer {
    ------
    @Bean
    public DataSource getDataSource() {
        HikariDataSource dataSource = new HikariDataSource();
        dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/cpdb");
        dataSource.setUsername("root");
        dataSource.setPassword("cp");
        return dataSource;
    }

    @Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    // Left empty so that Spring Batch does not use this datasource for its own metadata
    @Override
    public void setDataSource(DataSource dataSource) {
    }
}
We override the setDataSource method of DefaultBatchConfigurer with an empty body. In Spring Batch 4.x this leaves the configurer without a datasource, so it falls back to an in-memory, Map-based JobRepository and the default Spring Batch metadata tables do not have to be created in MySQL. Our marksheet table is still written through the HikariCP datasource.
7. Item Processor
In our batch process, the reader reads data as Student type and the writer writes data as Marksheet type. A processor performs some operation on the input data read by the reader and converts it into the data required by the writer. The processor is configured while creating the batch Step. To create a processor we need to create a class implementing ItemProcessor and register it as a Spring bean.
StudentItemProcessor.java
package com.concretepage.processor;

import org.springframework.batch.item.ItemProcessor;

import com.concretepage.model.Marksheet;
import com.concretepage.model.Student;

public class StudentItemProcessor implements ItemProcessor<Student, Marksheet> {

    @Override
    public Marksheet process(final Student student) throws Exception {
        int totalMarks = student.getSubjectAMark() + student.getSubjectBMark();
        System.out.println("Student name:" + student.getStdName() + " and Total marks:" + totalMarks);
        Marksheet marksheet = new Marksheet(student.getRollNum(), student.getStdName(), totalMarks);
        return marksheet;
    }
}
The processor receives an object of Student, which contains the details of a student and the subject-wise marks, and prepares the marksheet as an object of the Marksheet class required by the writer.
8. Batch Job Execution Listener
Spring Batch provides JobExecutionListener, which listens to the batch job. It provides the beforeJob and afterJob methods. To create a listener we need to create a Spring component by implementing JobExecutionListener and overriding its methods. beforeJob executes before the job starts and afterJob executes after the job completes.
JobCompletionListener.java
package com.concretepage.listener;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;

import com.concretepage.model.Marksheet;

@Component
public class JobCompletionListener implements JobExecutionListener {

    @Autowired
    public JdbcTemplate jdbcTemplate;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        System.out.println("Executing job id " + jobExecution.getId());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            List<Marksheet> result = jdbcTemplate.query(
                    "SELECT rollNum, studentName, totalMarks FROM marksheet",
                    new RowMapper<Marksheet>() {
                        @Override
                        public Marksheet mapRow(ResultSet rs, int row) throws SQLException {
                            return new Marksheet(rs.getLong(1), rs.getString(2), rs.getInt(3));
                        }
                    });
            System.out.println("Number of Records:" + result.size());
        }
    }
}
beforeJob can be used to log some information or clear database tables. afterJob can be used to validate the data written to the database after job completion.
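The listener above checks the job status inside afterJob because the after-job callback is invoked for failed executions as well as successful ones. The callback order can be sketched in plain Java as follows. ListenerSketch, its Listener interface and its Status enum are hypothetical stand-ins for JobExecutionListener and BatchStatus, and a Runnable stands in for the job.

```java
// Plain-Java sketch of the before/after callback order around a job run.
class ListenerSketch {

    enum Status { COMPLETED, FAILED }

    // Hypothetical stand-in for JobExecutionListener
    interface Listener {
        void beforeJob(String jobName);
        void afterJob(String jobName, Status status);
    }

    // Calls beforeJob, runs the work, then calls afterJob whatever the outcome
    static Status runJob(String jobName, Runnable work, Listener listener) {
        listener.beforeJob(jobName);
        Status status;
        try {
            work.run();
            status = Status.COMPLETED;
        } catch (RuntimeException e) {
            status = Status.FAILED;
        }
        listener.afterJob(jobName, status);
        return status;
    }

    public static void main(String[] args) {
        Listener printing = new Listener() {
            @Override
            public void beforeJob(String jobName) {
                System.out.println("Executing job " + jobName);
            }

            @Override
            public void afterJob(String jobName, Status status) {
                // Only report results for a successful run, as the example listener does
                if (status == Status.COMPLETED) {
                    System.out.println("Job " + jobName + " completed");
                }
            }
        };
        runJob("createMarkSheetJob", () -> { }, printing);
    }
}
```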
9. Batch Job Scheduler
We will create a scheduler to schedule the batch job. The scheduler launches the batch job repeatedly at a fixed interval. To enable the scheduler, we need to annotate the Java configuration class with the @EnableScheduling annotation. Now find our scheduler.
BatchJobScheduler.java
package com.concretepage.scheduler;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class BatchJobScheduler {

    @Autowired
    private Job job;

    @Autowired
    private JobLauncher jobLauncher;

    @Scheduled(fixedDelay = 6000)
    public void runBatchJob() {
        JobParameters params = new JobParametersBuilder()
                .addLong("jobId", System.currentTimeMillis())
                .toJobParameters();
        try {
            jobLauncher.run(job, params);
        } catch (JobExecutionAlreadyRunningException e) {
            e.printStackTrace();
        } catch (JobRestartException e) {
            e.printStackTrace();
        } catch (JobInstanceAlreadyCompleteException e) {
            e.printStackTrace();
        } catch (JobParametersInvalidException e) {
            e.printStackTrace();
        }
    }
}
runBatchJob runs repeatedly with a fixed delay of 6 seconds (fixedDelay = 6000 milliseconds) between the end of one run and the start of the next. To change the job parameter for every scheduled run, we are using System.currentTimeMillis(). The job parameter will be created as, for example, jobId=1564587528542. On every run the job reads the CSV file, processes it and writes the data into the database, and we get notifications from the listener.
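The reason for changing the job parameter on every run is that Spring Batch identifies a job instance by the job name together with its identifying parameters, and launching an instance that has already completed is rejected (this is the JobInstanceAlreadyCompleteException caught in the scheduler). A minimal plain-Java sketch of that rule is given below. JobInstanceSketch and its launch method are hypothetical, and a Set of strings stands in for the job repository.

```java
import java.util.HashSet;
import java.util.Set;

// Plain-Java sketch: a job instance is identified by job name plus parameter value.
class JobInstanceSketch {

    // Keys of the instances that have already completed
    private final Set<String> completedInstances = new HashSet<>();

    // Returns true if the instance was launched, false if the same instance already completed
    boolean launch(String jobName, String paramName, long paramValue) {
        String instanceKey = jobName + "|" + paramName + "=" + paramValue;
        // Set.add returns false when the key is already present
        return completedInstances.add(instanceKey);
    }

    public static void main(String[] args) {
        JobInstanceSketch repository = new JobInstanceSketch();
        long timestamp = System.currentTimeMillis();
        System.out.println(repository.launch("createMarkSheetJob", "jobId", timestamp));
        // Same name and same parameter value: treated as the already completed instance
        System.out.println(repository.launch("createMarkSheetJob", "jobId", timestamp));
    }
}
```

Using the current time in milliseconds as the jobId value makes each scheduled launch a distinct instance, as long as two launches do not fall into the same millisecond.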
10. Run Application
To run the application, find the Spring Boot Main class.
Main.java
package com.concretepage;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Main {
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
}
To run the application as JAR, find the steps.
1. Go to the root directory of the project using command prompt and run the mvn clean package command.
2. In target directory a JAR will be created.
3. To run the application using JAR, run the command.
java -jar target/spring-demo-0.0.1-SNAPSHOT.jar