Spring Batch - Multi-threaded steps

In this example, we will show a very basic use case of SimpleAsyncTaskExecutor in a multi-threaded step.

We highly recommend reading "7. Scaling and Parallel Processing" at https://docs.spring.io/spring-batch/trunk/reference/html/scalability.html. I've used SimpleAsyncTaskExecutor with the default thread name prefix.

Create Database tables:

-- Source table: the batch job reads its rows (id, firstName, lastName, birthdate).
CREATE TABLE `test`.`customer` (
  `id`        MEDIUMINT(8) UNSIGNED NOT NULL PRIMARY KEY,
  `firstName` VARCHAR(255) NULL,
  `lastName`  VARCHAR(255) NULL,
  `birthdate` VARCHAR(255) NULL
);

-- Target table: same column layout as `customer`; the batch job inserts into it.
CREATE TABLE `test`.`new_customer` (
  `id`        MEDIUMINT(8) UNSIGNED NOT NULL PRIMARY KEY,
  `firstName` VARCHAR(255) NULL,
  `lastName`  VARCHAR(255) NULL,
  `birthdate` VARCHAR(255) NULL
);





































Customer.java

/**
 * Data holder for a single customer record.
 *
 * <p>All boilerplate is generated by Lombok: accessors, {@code equals},
 * {@code hashCode} and {@code toString} via {@code @Data}, a fluent builder via
 * {@code @Builder}, plus a no-argument and an all-argument constructor.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Customer {

    /** Customer identifier. */
    private Long id;

    /** Given name. */
    private String firstName;

    /** Family name. */
    private String lastName;

    /** Birth date kept as text, exactly as it was read. */
    private String birthdate;
}

CustomerRowMapper.java

public class CustomerRowMapper implements RowMapper<Customer> {

@Override
public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
//@// @formatter:off
return Customer.builder()
.id(rs.getLong("id"))
.firstName(rs.getString("firstName"))
.lastName(rs.getString("lastName"))
.birthdate(rs.getString("birthdate"))
.build(); 
// @formatter:on
}

}

application.properties:

# MYSQL DB Details
# Connection settings for the local MySQL schema "test" that holds the
# customer / new_customer tables.
# NOTE(review): root/root are demo credentials - replace them outside a local sandbox.
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/test
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.platform=mysql
spring.datasource.continue-on-error=false

# SPRING BATCH (BatchProperties)
# Database schema initialization mode.
spring.batch.initialize-schema=always

# Do not run jobs automatically at startup; the application class launches
# the job itself through JobLauncher in its CommandLineRunner.run method.
spring.batch.job.enabled=false

JobConfiguration.java

/**
 * Batch configuration for a job that copies every row of the {@code customer}
 * table into the {@code new_customer} table with a single multi-threaded,
 * chunk-oriented step.
 */
@Configuration
public class JobConfiguration {

    /**
     * Number of items per chunk (commit interval). The reader's page size and
     * JDBC fetch size use the same value so one page query can feed one chunk.
     */
    private static final int CHUNK_SIZE = 1000;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    /**
     * Builds the paging reader that selects all customers ordered by {@code id}.
     *
     * @return a reader configured for use from several threads at once
     */
    @Bean
    public JdbcPagingItemReader<Customer> pagingItemReader() {
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(CHUNK_SIZE);
        // The default page size is 10; align it with the chunk size so a chunk
        // does not need many separate page queries.
        reader.setPageSize(CHUNK_SIZE);
        // The reader is shared by the worker threads of step1. Its restart state
        // cannot be tracked reliably in that setup, so state saving is disabled
        // (the step is therefore not restartable from the last read position).
        reader.setSaveState(false);
        reader.setRowMapper(new CustomerRowMapper());

        // Paging requires a unique, stable sort key; the primary key is used.
        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("id", Order.ASCENDING);

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("select id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from customer");
        queryProvider.setSortKeys(sortKeys);

        reader.setQueryProvider(queryProvider);

        return reader;
    }

    /**
     * Builds the writer that batch-inserts customers into {@code new_customer}.
     * Named parameters are resolved from the {@link Customer} bean properties.
     *
     * @return the configured JDBC batch writer
     */
    @Bean
    public JdbcBatchItemWriter<Customer> customerItemWriter() {
        JdbcBatchItemWriter<Customer> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(this.dataSource);
        // Table name is written exactly as created (lower case): MySQL table
        // names are case-sensitive on case-sensitive file systems. Columns are
        // listed explicitly so the insert does not depend on column order.
        writer.setSql("INSERT INTO new_customer (id, firstName, lastName, birthdate) "
                + "VALUES (:id, :firstName, :lastName, :birthdate)");
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        writer.afterPropertiesSet();

        return writer;
    }

    /**
     * Defines the chunk-oriented step; chunks are processed on threads supplied
     * by a {@link SimpleAsyncTaskExecutor}.
     *
     * @return the multi-threaded copy step
     */
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(CHUNK_SIZE)
                .reader(pagingItemReader())
                .writer(customerItemWriter())
                .taskExecutor(new SimpleAsyncTaskExecutor())
                .build();
    }

    /**
     * Defines the job, which consists of {@link #step1()} only.
     *
     * @return the copy job
     */
    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .start(step1())
                .build();
    }
}


MultithreadedStepApplication.java

@SpringBootApplication
@EnableBatchProcessing
public class MultithreadedStepApplication implements CommandLineRunner{
@Autowired
private JobLauncher jobLauncher;

@Autowired
private Job job;

public static void main(String[] args) {
SpringApplication.run(MultithreadedStepApplication.class, args);
}

@Override
public void run(String... args) throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
                .addDate("date", new Date())
                .addLong("time",System.currentTimeMillis()).toJobParameters();

JobExecution execution = jobLauncher.run(job, jobParameters);
System.out.println("STATUS :: "+execution.getStatus());
}
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>multithreadedStep</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>multithreadedStep</name>
<description>Demo project for Spring Boot</description>

<properties>
<java.version>1.8</java.version>
</properties>

<dependencies>
<!-- Spring Batch with Spring Boot auto-configuration. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- JDBC support used by the item reader and writer. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>

<!-- NOTE(review): application.properties points the datasource at MySQL, so H2 looks unused here - confirm before removing. -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<!-- JDBC driver for the MySQL datasource configured in application.properties. -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Lombok generates the Customer boilerplate (@Data, @Builder, constructors). -->
<!-- NOTE(review): the version is pinned explicitly; the Spring Boot parent presumably manages a Lombok version too - confirm. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<optional>true</optional>
</dependency>
<!-- Test-only dependencies. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

Please make sure to insert sample data into the customer table before running the application. This batch job will copy that data into the new_customer table.

















  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.1.RELEASE)

2018-12-19 22:16:29.507  INFO 4160 --- [           main] c.example.MultithreadedStepApplication   : Starting MultithreadedStepApplication on DESKTOP-NQ639DU with PID 4160 (F:\spring-boot-spring-batch-master\Spring-Batch-by-Michael-Minella\multithreadedStep\target\classes started by pc in F:\spring-boot-spring-batch-master\Spring-Batch-by-Michael-Minella\multithreadedStep)
2018-12-19 22:16:29.513  INFO 4160 --- [           main] c.example.MultithreadedStepApplication   : No active profile set, falling back to default profiles: default
2018-12-19 22:16:30.610  INFO 4160 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2018-12-19 22:16:30.818  INFO 4160 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2018-12-19 22:16:31.204  INFO 4160 --- [           main] o.s.b.c.r.s.JobRepositoryFactoryBean     : No database type set, using meta data indicating: MYSQL
2018-12-19 22:16:31.386  INFO 4160 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : No TaskExecutor has been set, defaulting to synchronous executor.
2018-12-19 22:16:31.591  INFO 4160 --- [           main] c.example.MultithreadedStepApplication   : Started MultithreadedStepApplication in 2.498 seconds (JVM running for 3.165)
2018-12-19 22:16:31.931  INFO 4160 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job]] launched with the following parameters: [{date=1545237991594, time=1545237991595}]
2018-12-19 22:16:32.060  INFO 4160 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
2018-12-19 22:16:32.503  INFO 4160 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job]] completed with the following parameters: [{date=1545237991594, time=1545237991595}] and the following status: [COMPLETED]
STATUS :: COMPLETED
2018-12-19 22:16:32.518  INFO 4160 --- [       Thread-2] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2018-12-19 22:16:32.530  INFO 4160 --- [       Thread-2] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.



mysql> SELECT * FROM test.new_customer;
+----+------------+----------+---------------------+
| id | firstName  | lastName | birthdate           |
+----+------------+----------+---------------------+
|  1 | John       | Doe      | 10-10-1952 10:10:10 |
|  2 | Amy        | Eugene   | 05-07-1985 17:10:00 |
|  3 | Laverne    | Mann     | 11-12-1988 10:10:10 |
|  4 | Janice     | Preston  | 19-02-1960 10:10:10 |
|  5 | Pauline    | Rios     | 29-08-1977 10:10:10 |
|  6 | Perry      | Burnside | 10-03-1981 10:10:10 |
|  7 | Todd       | Kinsey   | 14-12-1998 10:10:10 |
|  8 | Jacqueline | Hyde     | 20-03-1983 10:10:10 |
|  9 | Rico       | Hale     | 10-10-2000 10:10:10 |
| 10 | Samuel     | Lamm     | 11-11-1999 10:10:10 |
| 11 | Robert     | Coster   | 10-10-1972 10:10:10 |
| 12 | Tamara     | Soler    | 02-01-1978 10:10:10 |
| 13 | Justin     | Kramer   | 19-11-1951 10:10:10 |
| 14 | Andrea     | Law      | 14-10-1959 10:10:10 |
| 15 | Laura      | Porter   | 12-12-2010 10:10:10 |
| 16 | Michael    | Cantu    | 11-04-1999 10:10:10 |
| 17 | Andrew     | Thomas   | 04-05-1967 10:10:10 |
| 18 | Jose       | Hannah   | 16-09-1950 10:10:10 |
| 19 | Valerie    | Hilbert  | 13-06-1966 10:10:10 |
| 20 | Patrick    | Durham   | 12-10-1978 10:10:10 |
+----+------------+----------+---------------------+
20 rows in set (0.00 sec)















Comments

Popular posts from this blog

Install Redis on Oracle Linux 7+