Spring Batch – TaskScheduler

Task Scheduler

This tutorial will show you an example of Spring Batch with a task scheduler. It shows how to schedule a task repeatedly that reads data from a CSV file and, after some modification, writes the data to a MySQL database. You can read the Spring Batch tutorial to learn what Spring Batch is and what it is used for.

I'll build a service that imports data from a CSV file, transforms it with custom code, and stores the final results in a MySQL table, and I'll schedule the same task to run repeatedly using Spring's TaskScheduler.

Prerequisites

Java 1.8/11/19, Maven 3.8.5, MySQL 8.0.26/8.0.31, Spring Boot 2.6.7/3.1.2

Project Setup

Create a maven based project in your favorite IDE or tool, and the required project structure will be created for you.

For spring boot version 3.x you can use the following pom.xml file:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.roytuts</groupId>
	<artifactId>spring-batch-task-scheduler</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>19</maven.compiler.source>
		<maven.compiler.target>19</maven.compiler.target>
	</properties>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>3.1.2</version>
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-batch</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-oxm</artifactId>
		</dependency>

		<dependency>
			<groupId>jakarta.xml.bind</groupId>
			<artifactId>jakarta.xml.bind-api</artifactId>
		</dependency>

		<dependency>
			<groupId>org.glassfish.jaxb</groupId>
			<artifactId>jaxb-runtime</artifactId>
		</dependency>

		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>8.0.31</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

For spring boot version 2.x you can use the following pom.xml file:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.roytuts</groupId>
	<artifactId>spring-batch-task-scheduler</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>11</maven.compiler.source>
		<maven.compiler.target>11</maven.compiler.target>
	</properties>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.6.7</version>
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-batch</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-oxm</artifactId>
		</dependency>

		<dependency>
			<groupId>jakarta.xml.bind</groupId>
			<artifactId>jakarta.xml.bind-api</artifactId>
		</dependency>

		<dependency>
			<groupId>org.glassfish.jaxb</groupId>
			<artifactId>jaxb-runtime</artifactId>
		</dependency>

		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

In the above build script I have added spring-oxm to take advantage of JAXB for generating an XML file from a Java POJO class.
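The JDBC-based writer configured later in this tutorial does not use the JAXB annotations, but if you also want the output as an XML file, spring-oxm lets you plug a Jaxb2Marshaller into Spring Batch's StaxEventItemWriter. A minimal sketch (the output file name persons.xml is just an example, not part of the job configured below):

@Bean
public StaxEventItemWriter<Person> xmlItemWriter() {
	// JAXB marshaller bound to the JAXB-annotated Person class
	Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
	marshaller.setClassesToBeBound(Person.class);

	// Writes each Person as a <person> element under a <persons> root tag
	StaxEventItemWriter<Person> writer = new StaxEventItemWriter<>();
	writer.setName("personXmlWriter");
	writer.setRootTagName("persons");
	writer.setMarshaller(marshaller);
	writer.setResource(new FileSystemResource("persons.xml"));
	return writer;
}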

MySQL Table

The following table is needed in the MySQL server.

CREATE TABLE `person` (
  `id` int unsigned NOT NULL AUTO_INCREMENT,
  `firstName` varchar(50) COLLATE utf8mb4_unicode_ci NOT NULL,
  `lastName` varchar(50) COLLATE utf8mb4_unicode_ci NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

VO Class

Create a model class Person.java which will represent a row of data for inputs and outputs. I have made the below class JAXB-annotated so that a Java object can be converted to an XML file directly. The import statements differ according to the spring boot version (3.x or 2.x), as shown after the class.

@XmlRootElement(name = "person")
public class Person {
	private int id;
	private String firstName;
	private String lastName;
	@XmlAttribute(name = "id")
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
	@XmlElement(name = "firstName")
	public String getFirstName() {
		return firstName;
	}
	public void setFirstName(String firstName) {
		this.firstName = firstName;
	}
	@XmlElement(name = "lastName")
	public String getLastName() {
		return lastName;
	}
	public void setLastName(String lastName) {
		this.lastName = lastName;
	}
	@Override
	public String toString() {
		return "Person [id=" + id + ", firstName=" + firstName + ", lastName=" + lastName + "]";
	}
}
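For reference, the JAXB annotations used above come from different packages depending on the spring boot version:

// Spring Boot 3.x (Jakarta EE)
import jakarta.xml.bind.annotation.XmlAttribute;
import jakarta.xml.bind.annotation.XmlElement;
import jakarta.xml.bind.annotation.XmlRootElement;

// Spring Boot 2.x (javax)
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;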

FieldSetMapper Class

Create the below mapper class, which maps a row of the CSV file to a Java object.

public class PersonFieldSetMapper implements FieldSetMapper<Person> {
	@Override
	public Person mapFieldSet(FieldSet fieldSet) {
		Person person = new Person();
		person.setId(fieldSet.readInt(0));
		person.setFirstName(fieldSet.readString(1));
		person.setLastName(fieldSet.readString(2));
		return person;
	}
}
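Note that the configuration class below wires a BeanWrapperFieldSetMapper into the reader, so this custom mapper is optional. If you prefer the explicit mapping, the reader bean could use PersonFieldSetMapper instead; a minimal sketch of that variant:

@Bean
public FlatFileItemReader<Person> fileItemReader() {
	FlatFileItemReader<Person> fileItemReader = new FlatFileItemReader<>();
	fileItemReader.setResource(new ClassPathResource("person.csv"));

	DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
	delimitedLineTokenizer.setNames("id", "firstName", "lastName");

	DefaultLineMapper<Person> defaultLineMapper = new DefaultLineMapper<>();
	defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
	// Use the custom field set mapper instead of the BeanWrapperFieldSetMapper
	defaultLineMapper.setFieldSetMapper(new PersonFieldSetMapper());

	fileItemReader.setLineMapper(defaultLineMapper);
	return fileItemReader;
}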

ItemProcessor Class

Create an intermediate processor. A common paradigm in batch processing is to ingest data, transform it, and then pipe it out somewhere else. Here I write a simple transformer that converts the initial characters of the names to uppercase.

public class PersonItemProcessor implements ItemProcessor<Person, Person> {
	@Override
	public Person process(Person person) throws Exception {
		System.out.println("Processing: " + person);
		final String initCapFirstName = person.getFirstName().substring(0, 1).toUpperCase()
				+ person.getFirstName().substring(1);
		final String initCapLastName = person.getLastName().substring(0, 1).toUpperCase()
				+ person.getLastName().substring(1);
		Person transformedPerson = new Person();
		transformedPerson.setId(person.getId());
		transformedPerson.setFirstName(initCapFirstName);
		transformedPerson.setLastName(initCapLastName);
		return transformedPerson;
	}
}

Input CSV File

Create the below CSV file called person.csv under the src/main/resources directory.

1000,soumitra,roy
1001,souvik,sanyal
1002,arup,chatterjee
1003,suman,mukherjee
1004,debina,guha
1005,liton,sarkar
1006,debabrata,poddar

Configuration Class

I have created this Spring configuration class to define the beans required for Spring Batch processing.

It defines beans such as ItemProcessor, PlatformTransactionManager, JobRepository, DataSource, JobLauncher, Step and Job.

Note that in spring boot 3.x, JobBuilderFactory has been replaced by JobBuilder and StepBuilderFactory has been replaced by StepBuilder. You also no longer need the @EnableBatchProcessing annotation in the batch configuration.

For spring boot 3.x use the following configuration class:

@Configuration
public class SpringBatchConfig {

	@Bean
	@Scope(value = BeanDefinition.SCOPE_PROTOTYPE)
	public Person person() {
		return new Person();
	}

	@Bean
	@Scope(value = BeanDefinition.SCOPE_PROTOTYPE)
	public ItemProcessor<Person, Person> itemProcessor() {
		return new PersonItemProcessor();
	}

	@Bean
	public DataSource dataSource() {
		DriverManagerDataSource dataSource = new DriverManagerDataSource();
		dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
		dataSource.setUrl("jdbc:mysql://localhost:3306/roytuts");
		dataSource.setUsername("root");
		dataSource.setPassword("root");

		ResourceDatabasePopulator databasePopulator = new ResourceDatabasePopulator();
		databasePopulator.addScript(new ClassPathResource("org/springframework/batch/core/schema-drop-mysql.sql"));
		databasePopulator.addScript(new ClassPathResource("org/springframework/batch/core/schema-mysql.sql"));

		DatabasePopulatorUtils.execute(databasePopulator, dataSource);
		return dataSource;
	}

	@Bean
	public PlatformTransactionManager transactionManager() {
		return new ResourcelessTransactionManager();
	}

	@Bean
	public JobRepository jbRepository(DataSource dataSource, PlatformTransactionManager transactionManager)
			throws Exception {
		JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
		factory.setDatabaseType(DatabaseType.MYSQL.getProductName());
		factory.setDataSource(dataSource);
		factory.setTransactionManager(transactionManager);
		factory.setIncrementerFactory(new DefaultDataFieldMaxValueIncrementerFactory(dataSource));
		factory.afterPropertiesSet();
		return factory.getObject();
	}

	@Bean
	public JobLauncher jbLauncher(JobRepository jobRepository) {
		TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
		jobLauncher.setJobRepository(jobRepository);
		return jobLauncher;
	}

	@Bean
	public BeanWrapperFieldSetMapper<Person> beanWrapperFieldSetMapper() {
		BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
		fieldSetMapper.setPrototypeBeanName("person");
		return fieldSetMapper;
	}

	@Bean
	public FlatFileItemReader<Person> fileItemReader(BeanWrapperFieldSetMapper<Person> beanWrapperFieldSetMapper) {
		FlatFileItemReader<Person> fileItemReader = new FlatFileItemReader<>();
		fileItemReader.setResource(new ClassPathResource("person.csv"));

		DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
		delimitedLineTokenizer.setNames("id", "firstName", "lastName");

		DefaultLineMapper<Person> defaultLineMapper = new DefaultLineMapper<>();
		defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
		defaultLineMapper.setFieldSetMapper(beanWrapperFieldSetMapper);

		fileItemReader.setLineMapper(defaultLineMapper);

		return fileItemReader;
	}

	@Bean
	public JdbcBatchItemWriter<Person> jdbcBatchItemWriter(DataSource dataSource,
			BeanPropertyItemSqlParameterSourceProvider<Person> sqlParameterSourceProvider) {
		JdbcBatchItemWriter<Person> jdbcBatchItemWriter = new JdbcBatchItemWriter<>();
		jdbcBatchItemWriter.setDataSource(dataSource);
		jdbcBatchItemWriter.setItemSqlParameterSourceProvider(sqlParameterSourceProvider);
		jdbcBatchItemWriter.setSql("insert into person(id,firstName,lastName) values (:id, :firstName, :lastName)");

		return jdbcBatchItemWriter;
	}

	@Bean
	public BeanPropertyItemSqlParameterSourceProvider<Person> beanPropertyItemSqlParameterSourceProvider() {
		return new BeanPropertyItemSqlParameterSourceProvider<>();
	}

	@Bean
	public Job jobCsvMysql(JobRepository jobRepository, Step step) {
		return new JobBuilder("jobCsvMysql", jobRepository).flow(step).end().build();
		// return new JobBuilder("jobCsvMysql", jobRepository).start(step).build();
	}

	@Bean
	public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager,
			ItemReader<Person> reader, ItemWriter<Person> writer, ItemProcessor<Person, Person> processor) {
		return new StepBuilder("step1", jobRepository).<Person, Person>chunk(2, transactionManager).reader(reader)
				.processor(processor).writer(writer).build();
	}

}

For spring boot 2.x version use the following configuration class:

@Configuration
@EnableBatchProcessing
public class SpringBatchConfig {
	@Bean
	@Scope(value = BeanDefinition.SCOPE_PROTOTYPE)
	public Person person() {
		return new Person();
	}
	@Bean
	@Scope(value = BeanDefinition.SCOPE_PROTOTYPE)
	public ItemProcessor<Person, Person> itemProcessor() {
		return new PersonItemProcessor();
	}
	@Bean
	public DataSource dataSource() {
		DriverManagerDataSource dataSource = new DriverManagerDataSource();
		dataSource.setDriverClassName("com.mysql.jdbc.Driver");
		dataSource.setUrl("jdbc:mysql://localhost:3306/roytuts");
		dataSource.setUsername("root");
		dataSource.setPassword("");
		ResourceDatabasePopulator databasePopulator = new ResourceDatabasePopulator();
		databasePopulator.addScript(new ClassPathResource("org/springframework/batch/core/schema-drop-mysql.sql"));
		databasePopulator.addScript(new ClassPathResource("org/springframework/batch/core/schema-mysql.sql"));
		DatabasePopulatorUtils.execute(databasePopulator, dataSource);
		return dataSource;
	}
	@Bean
	public ResourcelessTransactionManager txManager() {
		return new ResourcelessTransactionManager();
	}
	@Bean
	public JobRepository jbRepository(DataSource dataSource, ResourcelessTransactionManager transactionManager)
			throws Exception {
		JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
		factory.setDatabaseType(DatabaseType.MYSQL.getProductName());
		factory.setDataSource(dataSource);
		factory.setTransactionManager(transactionManager);
		factory.afterPropertiesSet();
		return factory.getObject();
	}
	@Bean
	public JobLauncher jbLauncher(JobRepository jobRepository) {
		SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
		jobLauncher.setJobRepository(jobRepository);
		return jobLauncher;
	}
	@Bean
	public BeanWrapperFieldSetMapper<Person> beanWrapperFieldSetMapper() {
		BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
		fieldSetMapper.setPrototypeBeanName("person");
		return fieldSetMapper;
	}
	@Bean
	public FlatFileItemReader<Person> fileItemReader(BeanWrapperFieldSetMapper<Person> beanWrapperFieldSetMapper) {
		FlatFileItemReader<Person> fileItemReader = new FlatFileItemReader<>();
		fileItemReader.setResource(new ClassPathResource("person.csv"));
		DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
		delimitedLineTokenizer.setNames("id", "firstName", "lastName");
		DefaultLineMapper<Person> defaultLineMapper = new DefaultLineMapper<>();
		defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
		defaultLineMapper.setFieldSetMapper(beanWrapperFieldSetMapper);
		fileItemReader.setLineMapper(defaultLineMapper);
		return fileItemReader;
	}
	@Bean
	public JdbcBatchItemWriter<Person> jdbcBatchItemWriter(DataSource dataSource,
			BeanPropertyItemSqlParameterSourceProvider<Person> sqlParameterSourceProvider) {
		JdbcBatchItemWriter<Person> jdbcBatchItemWriter = new JdbcBatchItemWriter<>();
		jdbcBatchItemWriter.setDataSource(dataSource);
		jdbcBatchItemWriter.setItemSqlParameterSourceProvider(sqlParameterSourceProvider);
		jdbcBatchItemWriter.setSql("insert into person(id,firstName,lastName) values (:id, :firstName, :lastName)");
		return jdbcBatchItemWriter;
	}
	@Bean
	public BeanPropertyItemSqlParameterSourceProvider<Person> beanPropertyItemSqlParameterSourceProvider() {
		return new BeanPropertyItemSqlParameterSourceProvider<>();
	}
	@Bean
	public Job jobCsvMysql(JobBuilderFactory jobBuilderFactory, Step step) {
		return jobBuilderFactory.get("jobCsvMysql").incrementer(new RunIdIncrementer()).flow(step).end().build();
	}
	@Bean
	public Step step1(StepBuilderFactory stepBuilderFactory, ResourcelessTransactionManager transactionManager,
			ItemReader<Person> reader, ItemWriter<Person> writer, ItemProcessor<Person, Person> processor) {
		return stepBuilderFactory.get("step1").transactionManager(transactionManager).<Person, Person>chunk(2)
				.reader(reader).processor(processor).writer(writer).build();
	}
}
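Both configurations above use a ResourcelessTransactionManager, which performs no real transaction management. Since the job repository and the item writer are JDBC-based here, you may prefer a JDBC-backed transaction manager instead; a minimal alternative sketch:

@Bean
public PlatformTransactionManager transactionManager(DataSource dataSource) {
	// Manages real database transactions for the chunk-oriented step and the job repository
	return new DataSourceTransactionManager(dataSource);
}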

A default simple implementation of the Job interface is provided by Spring Batch in the form of the SimpleJob class, which adds some standard functionality on top of Job; however, the job builders abstract away the need to instantiate it directly.

Step is a domain object that encapsulates an independent, sequential phase of a batch job. Therefore, every Job is composed entirely of one or more steps. A Step contains all of the information necessary to define and control the actual batch processing.

ItemReader is an abstraction that represents the retrieval of input for a Step, one item at a time.

ItemWriter is an abstraction that represents the output of a Step, one batch or chunk of items at a time. Generally, an item writer has no knowledge of the input it will receive next, only the item that was passed in its current invocation.

ItemProcessor is an abstraction that represents the business processing of an item. While the ItemReader reads one item, and the ItemWriter writes them, the ItemProcessor provides access to transform or apply other business processing. If, while processing the item, it is determined that the item is not valid, returning null indicates that the item should not be written out.
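For example, an item processor can filter records by returning null; a minimal sketch (the validation rule here is just an illustration and is not part of the job configured above):

public class ValidPersonItemProcessor implements ItemProcessor<Person, Person> {
	@Override
	public Person process(Person person) throws Exception {
		// Returning null filters the item out: it is not passed on to the writer
		if (person.getFirstName() == null || person.getFirstName().trim().isEmpty()) {
			return null;
		}
		return person;
	}
}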

TransactionManager – Spring’s PlatformTransactionManager that will be used to begin and commit transactions during processing.

Chunk – The number of items that will be processed before the transaction is committed.

JobRepository is the persistence mechanism. It provides CRUD operations for JobLauncher, Job, and Step implementations. When a Job is first launched, a JobExecution is obtained from the repository, and during the course of execution StepExecution and JobExecution implementations are persisted by passing them to the repository.

JobLauncher represents a simple interface for launching a Job with a given set of JobParameters.

Spring Task Scheduler

I need to set up Spring task scheduling to run the batch job repeatedly.

I schedule the task using a cron expression. Spring’s cron format has six fields (second, minute, hour, day of month, month, day of week), so the expression */10 * * * * * in the class below executes the job every 10 seconds.

@Component
@EnableScheduling
public class SpringBatchTaskScheduler {
	@Autowired
	private Job job;
	@Autowired
	private JobLauncher jobLauncher;
	@Scheduled(cron = "*/10 * * * * *")
	public void run() {
		try {
			JobExecution execution = jobLauncher.run(job,
					new JobParametersBuilder().addLong("timestamp", System.currentTimeMillis()).toJobParameters());
			System.out.println("Job Status : " + execution.getStatus());
		} catch (Exception ex) {
			ex.printStackTrace();
		}
		System.out.println("Done");
	}
}
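The @Scheduled annotation above is backed by Spring’s TaskScheduler abstraction. If you prefer to schedule the job programmatically, you can use a ThreadPoolTaskScheduler with a CronTrigger directly; a minimal sketch under the same assumptions (the job and jobLauncher beans defined earlier):

@Configuration
public class TaskSchedulerConfig {

	@Bean
	public ThreadPoolTaskScheduler taskScheduler() {
		ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
		taskScheduler.setPoolSize(1);
		return taskScheduler;
	}

	@Bean
	public ApplicationRunner scheduleJob(ThreadPoolTaskScheduler taskScheduler, JobLauncher jobLauncher, Job job) {
		return args -> taskScheduler.schedule(() -> {
			try {
				// A unique timestamp parameter so every run creates a new JobInstance
				jobLauncher.run(job,
						new JobParametersBuilder().addLong("timestamp", System.currentTimeMillis()).toJobParameters());
			} catch (Exception ex) {
				ex.printStackTrace();
			}
		}, new CronTrigger("*/10 * * * * *"));
	}
}

Also note that with spring-boot-starter-batch, Spring Boot runs every Job bean once at application startup; if you want the job to be triggered only by the scheduler, set spring.batch.job.enabled=false in application.properties.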

Spring Boot Main Class

Create the below class to launch the spring boot application. A class with a main method and the @SpringBootApplication annotation is enough to run the spring boot application.

@SpringBootApplication
public class App {
	public static void main(String[] args) {
		SpringApplication.run(App.class, args);
	}
}

Testing Spring Batch Task Scheduler

Run the above class and you will see output similar to the below.

16.206  INFO 9412 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=jobCsvMysql]] launched with the following parameters: [{run.id=1}]
16.563  INFO 9412 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
Processing: Person [id=1000, firstName=soumitra, lastName=roy]
Processing: Person [id=1001, firstName=souvik, lastName=sanyal]
Processing: Person [id=1002, firstName=arup, lastName=chatterjee]
Processing: Person [id=1003, firstName=suman, lastName=mukherjee]
Processing: Person [id=1004, firstName=debina, lastName=guha]
Processing: Person [id=1005, firstName=liton, lastName=sarkar]
Processing: Person [id=1006, firstName=debabrata, lastName=poddar]
17.283  INFO 9412 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step1] executed in 719ms
17.405  INFO 9412 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=jobCsvMysql]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED]

In the above output you can see the job name, the step name, and which row of the CSV file is being processed.

You can also see that step1 is executed repeatedly until you stop the application.

The job runs every 10 seconds, as defined by the cron expression.

If you use a persistent database, such as MySQL or Oracle, rather than an in-memory database, you can also see that the Spring Batch schema scripts have been executed and the metadata tables have been created in the MySQL database with the job details.

(Figure: Spring Batch metadata tables created in the MySQL database)

You will also see that the batch_job_execution table has been populated with the execution status and timestamps.
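You can verify this by querying the metadata table directly (column names as in the standard Spring Batch schema):

SELECT job_execution_id, job_instance_id, status, start_time, end_time
FROM batch_job_execution
ORDER BY job_execution_id DESC;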

Source Code

Download
