Spring Batch CSV to Database

By Arvind Rai, August 01, 2019
This page will walk through a Spring Batch CSV to database example. Spring Batch creates a Job using one or more Steps, and each Step is configured with a reader, a processor and a writer. To read the CSV file we will create a reader using FlatFileItemReader, and to write data to the database we will create a writer using JdbcBatchItemWriter. To create a processor we implement the ItemProcessor interface. The Java configuration class needs to be annotated with @EnableBatchProcessing.
On this page we will create a batch application that reads data from a CSV file and writes it into a MySQL database. To create the datasource we will use HikariCP. We will create a Spring Batch listener that gives notifications before the job starts and after it completes. We will also create a scheduler to run the batch job periodically. Now find the complete example in detail, step-by-step.

1. Technologies Used

Find the technologies being used in our example.
1. Java 11
2. Spring 5.1.8.RELEASE
3. Spring Batch 4.1.2.RELEASE
4. Spring Boot 2.1.6.RELEASE
5. Maven 3.5.2
6. MySQL 5.5
7. Eclipse 2018-09

2. Project Structure

In our example we are using Eclipse IDE. Find the screenshot of the project structure of our application in Eclipse.
(Screenshot: project structure in Eclipse)

3. Maven Dependencies

Find the Maven dependencies of our project.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.concretepage</groupId>
	<artifactId>spring-demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>
	<name>Spring</name>
	<description>Spring Demo Project</description>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.6.RELEASE</version>
		<relativePath />
	</parent>
	<properties>
		<context.path>spring-app</context.path>
		<java.version>11</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-batch</artifactId>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>6.0.5</version>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project> 
The spring-boot-starter-batch provides all the dependencies required for Spring Batch. HikariCP, which we use to create the datasource, is also resolved transitively.

4. CSV File

Our CSV file has four columns that contain student details and their subject-wise marks. The first row contains the column names. In our demo application we read the CSV file from the project classpath. Find the screenshot of our CSV file.
student-marks.csv
(Screenshot: contents of student-marks.csv)
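The rows in the original screenshot are not reproduced here. A hypothetical file with the same four columns (matching the field names configured on the reader below) could look like this; the data rows are illustrative only:

```
rollNum,stdName,subjectAMark,subjectBMark
1,Mahesh,70,80
2,Krishna,65,75
```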

5. MySQL Table

In our Spring Batch demo application, we are reading data from the CSV file and writing it into a MySQL database. From the CSV file we read student details and their subject-wise marks, and then write the student details and total marks to the database. Find the MySQL table used in our demo application.
MySQL Table: marksheet
CREATE TABLE `marksheet` (
	`rollNum` INT(11) NULL DEFAULT NULL,
	`studentName` VARCHAR(50) NULL DEFAULT NULL,
	`totalMarks` INT(5) NULL DEFAULT NULL
)
COLLATE='latin1_swedish_ci'
ENGINE=InnoDB; 

6. Batch Configuration

In a Spring Batch application we configure a batch job using Job. The batch job configures its steps using Step. A step is configured with a reader, a processor and a writer. The reader reads the input file, such as a CSV file, using FlatFileItemReader. Before writing the input data we can process it and perform operations on it. To create a processor we implement the Spring ItemProcessor interface. For the writer we can use JdbcBatchItemWriter to write data to a database such as MySQL.
BatchConfiguration.java
package com.concretepage.configuration;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import com.concretepage.listener.JobCompletionListener;
import com.concretepage.model.Marksheet;
import com.concretepage.model.Student;
import com.concretepage.processor.StudentItemProcessor;
import com.zaxxer.hikari.HikariDataSource;

@Configuration
@EnableBatchProcessing
@EnableScheduling
public class BatchConfiguration extends DefaultBatchConfigurer {
	@Autowired
	public JobBuilderFactory jobBuilderFactory;

	@Autowired
	public StepBuilderFactory stepBuilderFactory;

	@Bean
	public LineMapper<Student> lineMapper() {
		DefaultLineMapper<Student> lineMapper = new DefaultLineMapper<Student>();
		lineMapper.setLineTokenizer(new DelimitedLineTokenizer() {
			{
				setNames(new String[] { "rollNum", "stdName", "subjectAMark", "subjectBMark" });
			}
		});
		lineMapper.setFieldSetMapper(new BeanWrapperFieldSetMapper<Student>() {
			{
				setTargetType(Student.class);
			}
		});
		return lineMapper;
	}

	@Bean
	public FlatFileItemReader<Student> reader() {
	    return new FlatFileItemReaderBuilder<Student>()
		  .name("studentItemReader")		
		  .resource(new ClassPathResource("student-marks.csv"))
		  .lineMapper(lineMapper())
		  .linesToSkip(1)
		  .build();
	}

	@Bean
	public JdbcBatchItemWriter<Marksheet> writer(DataSource dataSource) {
	    return new JdbcBatchItemWriterBuilder<Marksheet>()
		  .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Marksheet>())
		  .sql("INSERT INTO marksheet (rollNum, studentName, totalMarks) VALUES (:rollNum, :stdName,:totalMarks)")
		  .dataSource(dataSource)
		  .build();
	}

	@Bean
	public ItemProcessor<Student, Marksheet> processor() {
	    return new StudentItemProcessor();
	}

	@Bean
	public Job createMarkSheetJob(JobCompletionListener listener, Step step1) {
	    return jobBuilderFactory
		  .get("createMarkSheetJob")
		  .incrementer(new RunIdIncrementer())
		  .listener(listener)
		  .flow(step1)
		  .end()
		  .build();
	}

	@Bean
	public Step step1(ItemReader<Student> reader, ItemWriter<Marksheet> writer,
			ItemProcessor<Student, Marksheet> processor) {
	    return stepBuilderFactory
		  .get("step1")
		  .<Student, Marksheet>chunk(5)
		  .reader(reader)
		  .processor(processor)
		  .writer(writer)
		  .build();
	}

	@Bean
	public DataSource getDataSource() {
		HikariDataSource dataSource = new HikariDataSource();
		dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/cpdb");
		dataSource.setUsername("root");
		dataSource.setPassword("cp");
		return dataSource;
	}

	@Bean
	public JdbcTemplate jdbcTemplate(DataSource dataSource) {
	    return new JdbcTemplate(dataSource);
	}

	// Intentionally empty: no DataSource is set for Spring Batch metadata,
	// so the job repository falls back to an in-memory implementation.
	@Override
	public void setDataSource(DataSource dataSource) {
	}
} 
@EnableBatchProcessing: Enables Spring Batch features and provides the base configuration for batch jobs. It is equivalent to using the <batch:*> XML namespace. We annotate our Java configuration class with this annotation to enable batch processing.

@EnableScheduling: Enables Spring's scheduled task execution capability. In our example we schedule the batch job, so we annotate our Java configuration class with this annotation as well.

In our example, student data is read from a CSV file and written to the MySQL marksheet table. Find the model classes used in our demo application.
Student.java
package com.concretepage.model;
public class Student {
	private long rollNum;
	private String stdName;
	private int subjectAMark;
	private int subjectBMark;
        
        // setters and getters
} 
Marksheet.java
package com.concretepage.model;
public class Marksheet {
	private long rollNum;	
	private String stdName;
	private int totalMarks;
	public Marksheet(long rollNum, String stdName, int totalMarks){
		this.rollNum = rollNum;
		this.stdName = stdName;
		this.totalMarks = totalMarks;
	}
        // setters and getters
} 
Now let us discuss the beans created in our batch configuration file step-by-step.

6.1 Create Reader using FlatFileItemReader

Spring FlatFileItemReader is a restartable ItemReader that reads lines from an input resource. To create the reader, we declare a FlatFileItemReader bean in the configuration class.
@Bean
public FlatFileItemReader<Student> reader() {
	   return new FlatFileItemReaderBuilder<Student>()
	   .name("studentItemReader")		
	   .resource(new ClassPathResource("student-marks.csv"))
	   .lineMapper(lineMapper())
	   .linesToSkip(1)
	   .build();
} 
We configure the resource location, line mapper, number of lines to skip, etc. Here we read the CSV file from the classpath. Since the first row of our CSV file holds the column names, we skip it with linesToSkip(1).
Spring Batch provides the LineMapper interface that maps each line read from the resource to a domain object. In our example we read rows from a CSV file with 4 columns. They are mapped to a Java object using the following line mapper.
@Bean
public LineMapper<Student> lineMapper() {
	DefaultLineMapper<Student> lineMapper = new DefaultLineMapper<Student>();
	lineMapper.setLineTokenizer(new DelimitedLineTokenizer() {
		{
			setNames(new String[] { "rollNum", "stdName", "subjectAMark", "subjectBMark" });
		}
	});
	lineMapper.setFieldSetMapper(new BeanWrapperFieldSetMapper<Student>() {
		{
			setTargetType(Student.class);
		}
	});
	return lineMapper;
} 
DelimitedLineTokenizer is a line tokenizer that splits the input string on a configurable delimiter. Each row of the CSV file maps to the Student class: the first column maps to rollNum, the second to stdName, the third to subjectAMark and the fourth to subjectBMark.
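Outside of Spring, the tokenizer's job can be sketched in plain Java: split each delimited line and pair the values with the configured field names. TokenizeDemo and its sample row are hypothetical, for illustration only, not Spring Batch's actual implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TokenizeDemo {
	// Rough sketch of what DelimitedLineTokenizer does: split a line on the
	// delimiter and associate each token with a configured field name.
	public static Map<String, String> tokenize(String line, String... names) {
		String[] values = line.split(",");
		Map<String, String> fieldSet = new LinkedHashMap<>();
		for (int i = 0; i < names.length && i < values.length; i++) {
			fieldSet.put(names[i], values[i].trim());
		}
		return fieldSet;
	}

	public static void main(String[] args) {
		Map<String, String> fields = tokenize("1,Mahesh,70,80",
				"rollNum", "stdName", "subjectAMark", "subjectBMark");
		System.out.println(fields); // {rollNum=1, stdName=Mahesh, subjectAMark=70, subjectBMark=80}
	}
}
```

The BeanWrapperFieldSetMapper then takes such a named field set and copies each value onto the matching property of the target bean.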

We can also configure the DelimitedLineTokenizer directly on the reader to map rows of the input CSV file to the domain object.
@Bean
public FlatFileItemReader<Student> reader() {
	return new FlatFileItemReaderBuilder<Student>()
	  .name("studentItemReader")		
	  .resource(new ClassPathResource("student-marks.csv"))
	  .delimited()
	  .names(new String[]{ "rollNum", "stdName", "subjectAMark", "subjectBMark" })
	  .fieldSetMapper(new BeanWrapperFieldSetMapper<Student>() {{
		   setTargetType(Student.class);
	  }})
	  .linesToSkip(1)
	  .build();
} 
The method delimited() returns a DelimitedBuilder that builds the DelimitedLineTokenizer.

6.2 Create Writer using JdbcBatchItemWriter

To create the writer we use JdbcBatchItemWriter, an implementation of ItemWriter that uses NamedParameterJdbcTemplate to execute a batch of statements for all provided items. In the writer bean we configure the SQL insert statement and the datasource used to connect to the database.
@Bean
public JdbcBatchItemWriter<Marksheet> writer(DataSource dataSource) {
	return new JdbcBatchItemWriterBuilder<Marksheet>()
	   .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Marksheet>())
	   .sql("INSERT INTO marksheet (rollNum, studentName, totalMarks) VALUES (:rollNum, :stdName,:totalMarks)")
	   .dataSource(dataSource)
	   .build();
} 

6.3 Create Step

Step holds the configuration of one step in the batch job. We create a Step bean that wires the reader, processor and writer together to perform the batch work.
@Bean
public Step step1(ItemReader<Student> reader, ItemWriter<Marksheet> writer,
		ItemProcessor<Student, Marksheet> processor) {
	return stepBuilderFactory
	  .get("step1")
	  .<Student, Marksheet>chunk(5)
	  .reader(reader)
	  .processor(processor)
	  .writer(writer)
	  .build();
} 
We configure the step using StepBuilderFactory. Its get() method returns a StepBuilder for the given step name, step1 in our example. StepBuilder provides the chunk() method, which builds a step that processes items in chunks of the given size. chunk() is a generic method whose type parameters declare the input and output types: here Student is the input type and Marksheet is the output type.
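The chunk-oriented model the step uses can be sketched without Spring: items are read one at a time, each is processed, and the writer is invoked once per chunk. ChunkDemo below is a hypothetical illustration of this flow, not Spring Batch's internal code:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class ChunkDemo {
	// Reads items one by one, processes each, and writes them in groups
	// of chunkSize -- mirroring reader/processor/writer in a chunked step.
	public static <I, O> List<List<O>> runStep(List<I> items,
			Function<I, O> processor, int chunkSize) {
		List<List<O>> writerCalls = new ArrayList<>();
		Iterator<I> reader = items.iterator();
		while (reader.hasNext()) {
			List<O> chunk = new ArrayList<>();
			while (chunk.size() < chunkSize && reader.hasNext()) {
				chunk.add(processor.apply(reader.next()));
			}
			writerCalls.add(chunk); // one writer invocation per chunk
		}
		return writerCalls;
	}

	public static void main(String[] args) {
		// 7 items with chunk size 5 -> the writer is called twice (5 items, then 2)
		List<List<Integer>> calls = runStep(List.of(1, 2, 3, 4, 5, 6, 7), i -> i * 10, 5);
		System.out.println(calls.size()); // 2
	}
}
```

With chunk(5) in our step, the JdbcBatchItemWriter therefore receives up to 5 Marksheet items per call and inserts them in one JDBC batch.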

6.4 Create Job

A Spring Job represents a batch job and configures its incrementer, listener, steps etc. The restart policy applies to the job as a whole and not to an individual step. To configure a job we use JobBuilderFactory, whose get() method returns a JobBuilder that builds the Job.
@Bean
public Job createMarkSheetJob(JobCompletionListener listener, Step step1) {
	return jobBuilderFactory
	  .get("createMarkSheetJob")
	  .incrementer(new RunIdIncrementer())
	  .listener(listener)
	  .flow(step1)
	  .end()
	  .build();
} 
RunIdIncrementer supplies the next job parameter in a sequence so that each run is unique. The listener is notified of job events such as before-job and after-job; we can use it to perform operations around the job, such as clearing database tables before it runs or counting rows afterwards. With JobBuilder.flow() we configure the Step that the job executes over the batch input.

6.5 Create DataSource

Here we configure the datasource using HikariCP. This datasource is injected into the writer bean in the batch configuration class.
@Configuration
@EnableBatchProcessing
public class BatchConfiguration extends DefaultBatchConfigurer {
        ------
	@Bean
	public DataSource getDataSource() {
		HikariDataSource dataSource = new HikariDataSource();
		dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/cpdb");
		dataSource.setUsername("root");
		dataSource.setPassword("cp");
		return dataSource;
	}
	@Bean
	public JdbcTemplate jdbcTemplate(DataSource dataSource) {
		return new JdbcTemplate(dataSource);
	}
        // Intentionally empty: keeps batch metadata out of the MySQL datasource
	@Override
	public void setDataSource(DataSource dataSource) {
	}
} 
We override the setDataSource method from DefaultBatchConfigurer with an empty body so that no datasource is set for Spring Batch's metadata. With no datasource set, DefaultBatchConfigurer falls back to an in-memory, map-based job repository, so the application does not require the default Spring Batch metadata tables in MySQL.

7. Item Processor

In our batch process, the reader reads data as the Student type and the writer writes data as the Marksheet type. The processor operates on the input data read by the reader and transforms it into the type required by the writer. The processor is configured when creating the batch Step. To create a processor we implement the ItemProcessor interface and register the class as a bean, here via the processor() method in the configuration.
StudentItemProcessor.java
package com.concretepage.processor;
import org.springframework.batch.item.ItemProcessor;
import com.concretepage.model.Marksheet;
import com.concretepage.model.Student;
public class StudentItemProcessor implements ItemProcessor<Student, Marksheet> {
	@Override
	public Marksheet process(final Student student) throws Exception {
		int totalMarks = student.getSubjectAMark() + student.getSubjectBMark();
		System.out.println("Student name:" + student.getStdName() + " and Total marks:" + totalMarks);
		Marksheet marksheet = new Marksheet(student.getRollNum(), student.getStdName(), totalMarks);
		return marksheet;
	}
} 
In our example the reader reads each row of the CSV file as a Student object containing the student's details and subject-wise marks; the processor totals the marks and prepares a Marksheet object as required by the writer.

8. Batch Job Execution Listener

Spring Batch provides JobExecutionListener to listen to batch job events. It declares beforeJob and afterJob methods. To create a listener we create a Spring component implementing JobExecutionListener and override its methods: beforeJob executes before the job starts and afterJob executes after the job completes.
JobCompletionListener.java
package com.concretepage.listener;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;
import com.concretepage.model.Marksheet;

@Component
public class JobCompletionListener implements JobExecutionListener {
	@Autowired
	public JdbcTemplate jdbcTemplate;

	@Override
	public void beforeJob(JobExecution jobExecution) {
		System.out.println("Executing job id " + jobExecution.getId());
	}

	@Override
	public void afterJob(JobExecution jobExecution) {
		if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
	        List<Marksheet> result = jdbcTemplate.query("SELECT rollNum, studentName, totalMarks FROM marksheet", 
	        		new RowMapper<Marksheet>() {
	            @Override
	            public Marksheet mapRow(ResultSet rs, int row) throws SQLException {
	                return new Marksheet(rs.getLong(1), rs.getString(2), rs.getInt(3));
	            }
	        });
	        System.out.println("Number of Records:"+result.size());
		}
	}
} 
beforeJob can be used to log some information or clear database tables. afterJob can be used to validate data written in database after job completion.

9. Batch Job Scheduler

We will create a scheduler to run the batch job at a fixed interval. To enable scheduling, the Java configuration class must be annotated with @EnableScheduling. Now find our scheduler.
BatchJobScheduler.java
package com.concretepage.scheduler;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class BatchJobScheduler {
	@Autowired
	private Job job;
	
	@Autowired
	private JobLauncher jobLauncher;

	@Scheduled(fixedDelay = 6000)
	public void runBatchJob() {
		JobParameters params = new JobParametersBuilder()
				.addLong("jobId", System.currentTimeMillis())
				.toJobParameters();
		try {
			jobLauncher.run(job, params);
		} catch (JobExecutionAlreadyRunningException | JobRestartException
				| JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
			e.printStackTrace();
		}
	}
} 
runBatchJob runs every 6 seconds (fixedDelay = 6000 milliseconds). To make the job parameters unique for each scheduled run we use System.currentTimeMillis(), which produces a parameter such as jobId=1564587528542. On each run the job reads the CSV file, processes it, writes the data into the database, and the listener reports the notifications.

10. Run Application

To run the application, find the Spring Boot Main class.
Main.java
package com.concretepage;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Main {
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
} 
We can run the Main class directly as a Java application, or use the mvn spring-boot:run command from a command prompt.
To run the application as a JAR, follow these steps.
1. Go to the root directory of the project in a command prompt and run the mvn clean package command.
2. A JAR will be created in the target directory.
3. Run the application using the JAR with the command:
java -jar target/spring-demo-0.0.1-SNAPSHOT.jar 
Before running the command, make sure your MySQL database is up. Then check the MySQL table to see the inserted data, as in the screenshot.
(Screenshot: rows inserted into the marksheet table)

11. References

Creating a Batch Service
Spring Batch Introduction
