Spring Batch - Multi-threaded steps
In this example, we show a very basic use case of SimpleAsyncTaskExecutor.
-- Source table: the batch job's paging reader selects from `customer`, ordered by `id`.
CREATE TABLE `test`.`customer` (
    `id`        MEDIUMINT(8) UNSIGNED NOT NULL,
    `firstName` VARCHAR(255) NULL,
    `lastName`  VARCHAR(255) NULL,
    `birthdate` VARCHAR(255) NULL,
    PRIMARY KEY (`id`)
);

-- Target table: the batch job's writer inserts every row it reads into `new_customer`.
CREATE TABLE `test`.`new_customer` (
    `id`        MEDIUMINT(8) UNSIGNED NOT NULL,
    `firstName` VARCHAR(255) NULL,
    `lastName`  VARCHAR(255) NULL,
    `birthdate` VARCHAR(255) NULL,
    PRIMARY KEY (`id`)
);
/**
 * One row of the {@code customer} / {@code new_customer} tables.
 *
 * <p>Lombok generates the accessors plus equals/hashCode/toString ({@code @Data}), the builder
 * used by {@code CustomerRowMapper}, a no-args constructor, and the all-args constructor the
 * builder delegates to.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Customer {
    /** Primary key ({@code id} column). */
    private Long id;
    private String firstName;
    private String lastName;
    /** Kept as the raw column text; no date parsing is performed. */
    private String birthdate;
}
CustomerRowMapper.java
/**
 * Converts the current {@link ResultSet} row of the customer query into a {@link Customer}.
 * Columns are looked up by label: {@code id}, {@code firstName}, {@code lastName}, {@code birthdate}.
 */
public class CustomerRowMapper implements RowMapper<Customer> {

    @Override
    public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
        long id = rs.getLong("id");
        String firstName = rs.getString("firstName");
        String lastName = rs.getString("lastName");
        String birthdate = rs.getString("birthdate");

        return Customer.builder()
                .id(id)
                .firstName(firstName)
                .lastName(lastName)
                .birthdate(birthdate)
                .build();
    }
}
application.properties:
# MYSQL DB Details
# Connects to the local MySQL schema `test`, which holds the customer and new_customer tables.
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/test
# NOTE(review): root/root credentials are only acceptable for a local demo; externalize them
# (environment variables, profile-specific config) for any shared environment.
spring.datasource.username=root
spring.datasource.password=root
# Platform name used when Boot resolves schema-${platform}.sql / data-${platform}.sql scripts.
spring.datasource.platform=mysql
# Stop startup if one of those initialization scripts fails.
spring.datasource.continue-on-error=false
# SPRING BATCH (BatchProperties)
# Database schema initialization mode.
# "always" initializes Spring Batch's metadata tables even though MySQL is not an embedded database.
spring.batch.initialize-schema=always
# Do not run Job beans automatically at startup; the application's CommandLineRunner launches the job itself.
spring.batch.job.enabled=false
JobConfiguration.java
/**
 * Batch configuration for a job whose single step copies every row of {@code customer} into
 * {@code new_customer}, processing chunks concurrently on a {@link SimpleAsyncTaskExecutor}.
 */
@Configuration
public class JobConfiguration {

    /** Items per chunk; also the reader's page and fetch size, so one page feeds one chunk. */
    private static final int CHUNK_SIZE = 1000;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    /**
     * Reads {@code customer} rows page by page, ordered by {@code id}.
     *
     * <p>This reader is shared by the worker threads of {@link #step1()}. Spring Batch documents
     * {@code JdbcPagingItemReader} as thread-safe for reading, but its restart position cannot be
     * recorded consistently across threads. State saving is therefore disabled, and a restarted
     * run reads from the first row again.
     */
    @Bean
    public JdbcPagingItemReader<Customer> pagingItemReader() {
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        // Match the page size to the chunk size; the reader's default page size (10) would
        // issue about 100 queries per 1000-item chunk.
        reader.setPageSize(CHUNK_SIZE);
        reader.setFetchSize(CHUNK_SIZE);
        reader.setRowMapper(new CustomerRowMapper());
        // Required for multi-threaded use: the saved execution-context state would be unreliable.
        reader.setSaveState(false);

        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("id", Order.ASCENDING);

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("select id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from customer");
        queryProvider.setSortKeys(sortKeys);
        reader.setQueryProvider(queryProvider);
        return reader;
    }

    /**
     * Inserts each customer into {@code new_customer}. Named parameters are bound from the
     * matching {@link Customer} bean properties.
     */
    @Bean
    public JdbcBatchItemWriter<Customer> customerItemWriter() {
        JdbcBatchItemWriter<Customer> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(this.dataSource);
        // The lower-case table name matches the DDL. MySQL table names are case-sensitive by
        // default on Linux. The explicit column list removes any dependency on column order.
        writer.setSql("INSERT INTO new_customer (id, firstName, lastName, birthdate) "
                + "VALUES (:id, :firstName, :lastName, :birthdate)");
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        writer.afterPropertiesSet();
        return writer;
    }

    /**
     * Chunk-oriented step that reads and writes customers, running chunks on separate threads.
     * {@link SimpleAsyncTaskExecutor} starts a new thread per task; the number of concurrent
     * chunks is bounded by the step's throttle limit, not by the executor.
     */
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(CHUNK_SIZE)
                .reader(pagingItemReader())
                .writer(customerItemWriter())
                .taskExecutor(new SimpleAsyncTaskExecutor())
                .build();
    }

    /** Single-step job launched by the application's command-line runner. */
    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .start(step1())
                .build();
    }
}
MultithreadedStepApplication.java
/**
 * Spring Boot entry point. It launches the batch {@link Job} once from {@link #run(String...)}
 * and prints the resulting batch status to standard output.
 */
@SpringBootApplication
@EnableBatchProcessing
public class MultithreadedStepApplication implements CommandLineRunner {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job;

    public static void main(String[] args) {
        SpringApplication.run(MultithreadedStepApplication.class, args);
    }

    /**
     * Runs the job with a "date" and a "time" parameter taken from the current clock, so each
     * invocation forms a new job instance.
     */
    @Override
    public void run(String... args) throws Exception {
        JobParametersBuilder builder = new JobParametersBuilder();
        builder.addDate("date", new Date());
        builder.addLong("time", System.currentTimeMillis());

        JobExecution execution = jobLauncher.run(job, builder.toJobParameters());
        System.out.println("STATUS :: " + execution.getStatus());
    }
}
In this example, we show a very basic use case of SimpleAsyncTaskExecutor.
We highly recommend reading "7. Scaling and Parallel Processing" at https://docs.spring.io/spring-batch/trunk/reference/html/scalability.html. This example uses SimpleAsyncTaskExecutor with its default thread name prefix.
Create the database tables:
-- Source table: the batch job's paging reader selects from `customer`, ordered by `id`.
-- (The CREATE TABLE header below was missing from this listing, leaving it syntactically invalid.)
CREATE TABLE `test`.`customer` (
    `id`        MEDIUMINT(8) UNSIGNED NOT NULL,
    `firstName` VARCHAR(255) NULL,
    `lastName`  VARCHAR(255) NULL,
    `birthdate` VARCHAR(255) NULL,
    PRIMARY KEY (`id`)
);

-- Target table: the batch job's writer inserts every row it reads into `new_customer`.
CREATE TABLE `test`.`new_customer` (
    `id`        MEDIUMINT(8) UNSIGNED NOT NULL,
    `firstName` VARCHAR(255) NULL,
    `lastName`  VARCHAR(255) NULL,
    `birthdate` VARCHAR(255) NULL,
    PRIMARY KEY (`id`)
);
Customer.java
/**
 * One row of the {@code customer} / {@code new_customer} tables.
 *
 * <p>Lombok generates the accessors plus equals/hashCode/toString ({@code @Data}), the builder
 * used by {@code CustomerRowMapper}, and the constructors below.
 */
@Data
@Builder
@NoArgsConstructor
// Required: once @NoArgsConstructor is present, @Builder no longer creates its own all-args
// constructor. It still calls one, so without this annotation the class fails to compile.
@AllArgsConstructor
public class Customer {
    /** Primary key ({@code id} column). */
    private Long id;
    private String firstName;
    private String lastName;
    /** Kept as the raw column text; no date parsing is performed. */
    private String birthdate;
}
CustomerRowMapper.java
/**
 * Converts the current {@link ResultSet} row of the customer query into a {@link Customer}.
 * Columns are looked up by label: {@code id}, {@code firstName}, {@code lastName}, {@code birthdate}.
 */
public class CustomerRowMapper implements RowMapper<Customer> {

    @Override
    public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
        long id = rs.getLong("id");
        String firstName = rs.getString("firstName");
        String lastName = rs.getString("lastName");
        String birthdate = rs.getString("birthdate");

        return Customer.builder()
                .id(id)
                .firstName(firstName)
                .lastName(lastName)
                .birthdate(birthdate)
                .build();
    }
}
application.properties:
# MYSQL DB Details
# Connects to the local MySQL schema `test`, which holds the customer and new_customer tables.
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/test
# NOTE(review): root/root credentials are only acceptable for a local demo; externalize them
# (environment variables, profile-specific config) for any shared environment.
spring.datasource.username=root
spring.datasource.password=root
# Platform name used when Boot resolves schema-${platform}.sql / data-${platform}.sql scripts.
spring.datasource.platform=mysql
# Stop startup if one of those initialization scripts fails.
spring.datasource.continue-on-error=false
# SPRING BATCH (BatchProperties)
# Database schema initialization mode.
# "always" initializes Spring Batch's metadata tables even though MySQL is not an embedded database.
spring.batch.initialize-schema=always
# Do not run Job beans automatically at startup; the application's CommandLineRunner launches the job itself.
spring.batch.job.enabled=false
JobConfiguration.java
/**
 * Batch configuration for a job whose single step copies every row of {@code customer} into
 * {@code new_customer}, processing chunks concurrently on a {@link SimpleAsyncTaskExecutor}.
 */
@Configuration
public class JobConfiguration {

    /** Items per chunk; also the reader's page and fetch size, so one page feeds one chunk. */
    private static final int CHUNK_SIZE = 1000;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    /**
     * Reads {@code customer} rows page by page, ordered by {@code id}.
     *
     * <p>This reader is shared by the worker threads of {@link #step1()}. Spring Batch documents
     * {@code JdbcPagingItemReader} as thread-safe for reading, but its restart position cannot be
     * recorded consistently across threads. State saving is therefore disabled, and a restarted
     * run reads from the first row again.
     */
    @Bean
    public JdbcPagingItemReader<Customer> pagingItemReader() {
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        // Match the page size to the chunk size; the reader's default page size (10) would
        // issue about 100 queries per 1000-item chunk.
        reader.setPageSize(CHUNK_SIZE);
        reader.setFetchSize(CHUNK_SIZE);
        reader.setRowMapper(new CustomerRowMapper());
        // Required for multi-threaded use: the saved execution-context state would be unreliable.
        reader.setSaveState(false);

        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("id", Order.ASCENDING);

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("select id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from customer");
        queryProvider.setSortKeys(sortKeys);
        reader.setQueryProvider(queryProvider);
        return reader;
    }

    /**
     * Inserts each customer into {@code new_customer}. Named parameters are bound from the
     * matching {@link Customer} bean properties.
     */
    @Bean
    public JdbcBatchItemWriter<Customer> customerItemWriter() {
        JdbcBatchItemWriter<Customer> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(this.dataSource);
        // The lower-case table name matches the DDL. MySQL table names are case-sensitive by
        // default on Linux. The explicit column list removes any dependency on column order.
        writer.setSql("INSERT INTO new_customer (id, firstName, lastName, birthdate) "
                + "VALUES (:id, :firstName, :lastName, :birthdate)");
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        writer.afterPropertiesSet();
        return writer;
    }

    /**
     * Chunk-oriented step that reads and writes customers, running chunks on separate threads.
     * {@link SimpleAsyncTaskExecutor} starts a new thread per task; the number of concurrent
     * chunks is bounded by the step's throttle limit, not by the executor.
     */
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(CHUNK_SIZE)
                .reader(pagingItemReader())
                .writer(customerItemWriter())
                .taskExecutor(new SimpleAsyncTaskExecutor())
                .build();
    }

    /** Single-step job launched by the application's command-line runner. */
    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .start(step1())
                .build();
    }
}
MultithreadedStepApplication.java
/**
 * Spring Boot entry point. It launches the batch {@link Job} once from {@link #run(String...)}
 * and prints the resulting batch status to standard output.
 */
@SpringBootApplication
@EnableBatchProcessing
public class MultithreadedStepApplication implements CommandLineRunner {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job;

    public static void main(String[] args) {
        SpringApplication.run(MultithreadedStepApplication.class, args);
    }

    /**
     * Runs the job with a "date" and a "time" parameter taken from the current clock, so each
     * invocation forms a new job instance.
     */
    @Override
    public void run(String... args) throws Exception {
        JobParametersBuilder builder = new JobParametersBuilder();
        builder.addDate("date", new Date());
        builder.addLong("time", System.currentTimeMillis());

        JobExecution execution = jobLauncher.run(job, builder.toJobParameters());
        System.out.println("STATUS :: " + execution.getStatus());
    }
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.example</groupId>
    <artifactId>multithreadedStep</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>multithreadedStep</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <!-- Already brought in transitively by spring-boot-starter-batch; declared explicitly for clarity. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <!-- NOTE(review): not used while spring.datasource.url points at MySQL; candidate for removal. -->
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <!-- No explicit version: use the Lombok version managed by spring-boot-starter-parent
             instead of overriding it with a hard-coded one. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Please make sure to insert sample data into the customer table before running the job. The batch job copies that data into the new_customer table.
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.1.1.RELEASE)
2018-12-19 22:16:29.507 INFO 4160 --- [ main] c.example.MultithreadedStepApplication : Starting MultithreadedStepApplication on DESKTOP-NQ639DU with PID 4160 (F:\spring-boot-spring-batch-master\Spring-Batch-by-Michael-Minella\multithreadedStep\target\classes started by pc in F:\spring-boot-spring-batch-master\Spring-Batch-by-Michael-Minella\multithreadedStep)
2018-12-19 22:16:29.513 INFO 4160 --- [ main] c.example.MultithreadedStepApplication : No active profile set, falling back to default profiles: default
2018-12-19 22:16:30.610 INFO 4160 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2018-12-19 22:16:30.818 INFO 4160 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2018-12-19 22:16:31.204 INFO 4160 --- [ main] o.s.b.c.r.s.JobRepositoryFactoryBean : No database type set, using meta data indicating: MYSQL
2018-12-19 22:16:31.386 INFO 4160 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.
2018-12-19 22:16:31.591 INFO 4160 --- [ main] c.example.MultithreadedStepApplication : Started MultithreadedStepApplication in 2.498 seconds (JVM running for 3.165)
2018-12-19 22:16:31.931 INFO 4160 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] launched with the following parameters: [{date=1545237991594, time=1545237991595}]
2018-12-19 22:16:32.060 INFO 4160 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
2018-12-19 22:16:32.503 INFO 4160 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] completed with the following parameters: [{date=1545237991594, time=1545237991595}] and the following status: [COMPLETED]
STATUS :: COMPLETED
2018-12-19 22:16:32.518 INFO 4160 --- [ Thread-2] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2018-12-19 22:16:32.530 INFO 4160 --- [ Thread-2] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
mysql> SELECT * FROM test.new_customer;
+----+------------+----------+---------------------+
| id | firstName | lastName | birthdate |
+----+------------+----------+---------------------+
| 1 | John | Doe | 10-10-1952 10:10:10 |
| 2 | Amy | Eugene | 05-07-1985 17:10:00 |
| 3 | Laverne | Mann | 11-12-1988 10:10:10 |
| 4 | Janice | Preston | 19-02-1960 10:10:10 |
| 5 | Pauline | Rios | 29-08-1977 10:10:10 |
| 6 | Perry | Burnside | 10-03-1981 10:10:10 |
| 7 | Todd | Kinsey | 14-12-1998 10:10:10 |
| 8 | Jacqueline | Hyde | 20-03-1983 10:10:10 |
| 9 | Rico | Hale | 10-10-2000 10:10:10 |
| 10 | Samuel | Lamm | 11-11-1999 10:10:10 |
| 11 | Robert | Coster | 10-10-1972 10:10:10 |
| 12 | Tamara | Soler | 02-01-1978 10:10:10 |
| 13 | Justin | Kramer | 19-11-1951 10:10:10 |
| 14 | Andrea | Law | 14-10-1959 10:10:10 |
| 15 | Laura | Porter | 12-12-2010 10:10:10 |
| 16 | Michael | Cantu | 11-04-1999 10:10:10 |
| 17 | Andrew | Thomas | 04-05-1967 10:10:10 |
| 18 | Jose | Hannah | 16-09-1950 10:10:10 |
| 19 | Valerie | Hilbert | 13-06-1966 10:10:10 |
| 20 | Patrick | Durham | 12-10-1978 10:10:10 |
+----+------------+----------+---------------------+
20 rows in set (0.00 sec)
Comments
Post a Comment