Spring Batch - Local Partition
Before beginning this tutorial, we recommend getting a solid understanding of the following material:
https://docs.spring.io/spring-batch/trunk/reference/html/scalability.html and https://github.com/hosnibenslama/Spring-batch-local-Partitioning/blob/master/src/main/java/io/spring/batch/configuration/JobConfiguration.java
Partitioning Overview:
Partition Steps:
Create Database tables:
-- Source table for the partitioned job: ColumnRangePartitioner splits on MIN/MAX
-- of `id`, and the paging reader selects id, firstName, lastName, birthdate from it.
-- `birthdate` is stored as text (VARCHAR), not as a DATE/DATETIME column.
CREATE TABLE `test`.`customer` (
`id` MEDIUMINT(8) UNSIGNED NOT NULL,
`firstName` VARCHAR(255) NULL,
`lastName` VARCHAR(255) NULL,
`birthdate` VARCHAR(255) NULL,
PRIMARY KEY (`id`)
);
-- Target table written by the job's JdbcBatchItemWriter; same columns as `customer`.
-- Because `id` is the primary key, re-running the job against a populated table
-- will fail on duplicate keys unless this table is emptied first.
CREATE TABLE `test`.`new_customer` (
`id` MEDIUMINT(8) UNSIGNED NOT NULL,
`firstName` VARCHAR(255) NULL,
`lastName` VARCHAR(255) NULL,
`birthdate` VARCHAR(255) NULL,
PRIMARY KEY (`id`)
);
Customer.java
/**
 * One row of the {@code customer} / {@code new_customer} tables.
 *
 * <p>Lombok supplies the boilerplate: accessors plus equals/hashCode/toString
 * ({@code @Data}), a fluent builder ({@code @Builder}), and no-arg and all-args
 * constructors. The all-args constructor follows the field declaration order below.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Customer {

    /** Primary key, the {@code id} column. */
    private Long id;

    /** The {@code firstName} column. */
    private String firstName;

    /** The {@code lastName} column. */
    private String lastName;

    /** The {@code birthdate} column, kept as the raw text stored in the table. */
    private String birthdate;
}
CustomerRowMapper.java
/**
 * Converts the current row of a {@link ResultSet} into a {@link Customer}.
 *
 * <p>Columns are looked up by name: {@code id}, {@code firstName},
 * {@code lastName} and {@code birthdate}.
 */
public class CustomerRowMapper implements RowMapper<Customer> {

    @Override
    public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
        // Read every column first (same order as the builder below), then assemble.
        final long customerId = rs.getLong("id");
        final String first = rs.getString("firstName");
        final String last = rs.getString("lastName");
        final String born = rs.getString("birthdate");

        return Customer.builder()
                .id(customerId)
                .firstName(first)
                .lastName(last)
                .birthdate(born)
                .build();
    }
}
application.properties:
# MySQL connection to the `test` schema holding the customer / new_customer tables.
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/test
# Credentials can be supplied via DB_USERNAME / DB_PASSWORD (environment variables or
# system properties) instead of being committed; defaults keep the original values.
spring.datasource.username=${DB_USERNAME:root}
spring.datasource.password=${DB_PASSWORD:root}
# Always create the Spring Batch metadata tables on startup.
spring.batch.initialize-schema=always
# Do not auto-run jobs at startup; the application launches the job itself.
spring.batch.job.enabled=false
ColumnRangePartitioner.java
/**
 * Splits the value range of a numeric column into contiguous, inclusive
 * {@code [minValue, maxValue]} ranges, one {@link ExecutionContext} per partition.
 *
 * <p>The overall range is taken from {@code MIN(column)} / {@code MAX(column)}, so gaps
 * in the data can leave some partitions with few or no rows. Each context stores its
 * bounds as ints under the keys {@code minValue} and {@code maxValue}.
 *
 * <p>{@code table} and {@code column} are concatenated into SQL and therefore must be
 * trusted identifiers, never user input.
 */
public class ColumnRangePartitioner implements Partitioner {

    private JdbcOperations jdbcTemplate;
    private String table;
    private String column;

    /** Sets the table to partition (trusted identifier). */
    public void setTable(String table) {
        this.table = table;
    }

    /** Sets the numeric column whose value range is split (trusted identifier). */
    public void setColumn(String column) {
        this.column = column;
    }

    /** Sets the data source used to query MIN/MAX of the column. */
    public void setDataSource(DataSource dataSource) {
        jdbcTemplate = new JdbcTemplate(dataSource);
    }

    /**
     * Computes the partitions.
     *
     * @param gridSize requested number of partitions; must be positive
     * @return contexts keyed {@code partition0}, {@code partition1}, ...; empty when the
     *     column has no non-null values (e.g. the table is empty)
     * @throws IllegalArgumentException if {@code gridSize} is not positive
     */
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        if (gridSize <= 0) {
            throw new IllegalArgumentException("gridSize must be positive but was " + gridSize);
        }
        Integer minValue = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") FROM " + table, Integer.class);
        Integer maxValue = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") FROM " + table, Integer.class);

        Map<String, ExecutionContext> result = new HashMap<>();
        // MIN/MAX yield NULL on an empty table; unboxing that into int would throw an NPE.
        if (minValue == null || maxValue == null) {
            return result;
        }

        // Use long arithmetic: (max - min) and start + targetSize can exceed the int range.
        long min = minValue;
        long max = maxValue;
        long targetSize = (max - min) / gridSize + 1;

        int number = 0;
        long start = min;
        long end = start + targetSize - 1;
        while (start <= max) {
            ExecutionContext value = new ExecutionContext();
            result.put("partition" + number, value);
            if (end >= max) {
                end = max;
            }
            // Both bounds lie within [min, max], so narrowing back to int is lossless.
            value.putInt("minValue", (int) start);
            value.putInt("maxValue", (int) end);
            start += targetSize;
            end += targetSize;
            number++;
        }
        return result;
    }
}
JobConfiguration.java
/**
 * Batch configuration for a locally partitioned copy of {@code customer} into
 * {@code new_customer}.
 *
 * <p>The manager step ({@code step1}) splits the {@code id} range with
 * {@link ColumnRangePartitioner} and runs one worker step ({@code slaveStep}) per
 * partition on a {@link SimpleAsyncTaskExecutor}.
 */
@Configuration
public class JobConfiguration {

    /** Items per chunk; also used as the reader's page size and JDBC fetch size. */
    private static final int CHUNK_SIZE = 1000;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    /** Partitioner producing inclusive {@code [minValue, maxValue]} ranges of {@code customer.id}. */
    @Bean
    public ColumnRangePartitioner partitioner() {
        ColumnRangePartitioner columnRangePartitioner = new ColumnRangePartitioner();
        columnRangePartitioner.setColumn("id");
        columnRangePartitioner.setDataSource(dataSource);
        columnRangePartitioner.setTable("customer");
        return columnRangePartitioner;
    }

    /**
     * Step-scoped reader for a single partition.
     *
     * @param minValue lower bound (inclusive), taken from the partition's step execution context
     * @param maxValue upper bound (inclusive), taken from the partition's step execution context
     * @return a paging reader over {@code customer} rows with {@code minValue <= id <= maxValue}
     */
    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> pagingItemReader(
            @Value("#{stepExecutionContext['minValue']}") Long minValue,
            @Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
        System.out.println("reading " + minValue + " to " + maxValue);

        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("id", Order.ASCENDING);

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from customer");
        // The partitioner's maxValue is inclusive, so "<=" is required here;
        // "<" would silently skip the last id of every partition.
        queryProvider.setWhereClause("where id >= :minValue and id <= :maxValue");
        queryProvider.setSortKeys(sortKeys);

        // Bind the bounds as named parameters instead of concatenating them into the SQL.
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("minValue", minValue);
        parameterValues.put("maxValue", maxValue);

        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(CHUNK_SIZE);
        // The default page size is small; match the chunk size so one query fills a chunk.
        reader.setPageSize(CHUNK_SIZE);
        reader.setRowMapper(new CustomerRowMapper());
        reader.setQueryProvider(queryProvider);
        reader.setParameterValues(parameterValues);
        return reader;
    }

    /** Writer inserting each {@link Customer} into {@code new_customer} via bean-property parameters. */
    @Bean
    @StepScope
    public JdbcBatchItemWriter<Customer> customerItemWriter() {
        JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();
        itemWriter.setDataSource(dataSource);
        // Use the exact table name from the DDL: MySQL table names are case-sensitive on
        // most Unix installs, so NEW_CUSTOMER would not resolve to `new_customer`.
        // The explicit column list keeps the insert independent of the table's column order.
        itemWriter.setSql("INSERT INTO new_customer (id, firstName, lastName, birthdate) "
                + "VALUES (:id, :firstName, :lastName, :birthdate)");
        itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        itemWriter.afterPropertiesSet();
        return itemWriter;
    }

    /** Manager step: partitions the id range (grid size 4) and runs {@code slaveStep} per partition. */
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .partitioner(slaveStep().getName(), partitioner())
                .step(slaveStep())
                .gridSize(4)
                .taskExecutor(new SimpleAsyncTaskExecutor())
                .build();
    }

    /** Worker step: reads one partition in chunks and writes it to {@code new_customer}. */
    @Bean
    public Step slaveStep() {
        return stepBuilderFactory.get("slaveStep")
                .<Customer, Customer>chunk(CHUNK_SIZE)
                // The null arguments are placeholders: the reader is step-scoped, and its
                // bounds are resolved from each partition's step execution context.
                .reader(pagingItemReader(null, null))
                .writer(customerItemWriter())
                .build();
    }

    /** The job: a single partitioned step. */
    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .start(step1())
                .build();
    }
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>localPartitioning</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>localPartitioning</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <!-- No explicit version: use the one managed by spring-boot-starter-parent
                 instead of pinning an older release that overrides it. -->
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Before beginning this tutorial, we recommend getting a solid understanding of the following material:
https://docs.spring.io/spring-batch/trunk/reference/html/scalability.html and https://github.com/hosnibenslama/Spring-batch-local-Partitioning/blob/master/src/main/java/io/spring/batch/configuration/JobConfiguration.java
Partitioning Overview:
Partition Steps:
Create Database tables:
-- Source table for the partitioned job: ColumnRangePartitioner splits on MIN/MAX
-- of `id`, and the paging reader selects id, firstName, lastName, birthdate from it.
-- `birthdate` is stored as text (VARCHAR), not as a DATE/DATETIME column.
CREATE TABLE `test`.`customer` (
`id` MEDIUMINT(8) UNSIGNED NOT NULL,
`firstName` VARCHAR(255) NULL,
`lastName` VARCHAR(255) NULL,
`birthdate` VARCHAR(255) NULL,
PRIMARY KEY (`id`)
);
-- Target table written by the job's JdbcBatchItemWriter; same columns as `customer`.
-- Because `id` is the primary key, re-running the job against a populated table
-- will fail on duplicate keys unless this table is emptied first.
CREATE TABLE `test`.`new_customer` (
`id` MEDIUMINT(8) UNSIGNED NOT NULL,
`firstName` VARCHAR(255) NULL,
`lastName` VARCHAR(255) NULL,
`birthdate` VARCHAR(255) NULL,
PRIMARY KEY (`id`)
);
Customer.java
/**
 * One row of the {@code customer} / {@code new_customer} tables.
 *
 * <p>Lombok supplies the boilerplate: accessors plus equals/hashCode/toString
 * ({@code @Data}), a fluent builder ({@code @Builder}), and no-arg and all-args
 * constructors. The all-args constructor follows the field declaration order below.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Customer {

    /** Primary key, the {@code id} column. */
    private Long id;

    /** The {@code firstName} column. */
    private String firstName;

    /** The {@code lastName} column. */
    private String lastName;

    /** The {@code birthdate} column, kept as the raw text stored in the table. */
    private String birthdate;
}
CustomerRowMapper.java
/**
 * Converts the current row of a {@link ResultSet} into a {@link Customer}.
 *
 * <p>Columns are looked up by name: {@code id}, {@code firstName},
 * {@code lastName} and {@code birthdate}.
 */
public class CustomerRowMapper implements RowMapper<Customer> {

    @Override
    public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
        // Read every column first (same order as the builder below), then assemble.
        final long customerId = rs.getLong("id");
        final String first = rs.getString("firstName");
        final String last = rs.getString("lastName");
        final String born = rs.getString("birthdate");

        return Customer.builder()
                .id(customerId)
                .firstName(first)
                .lastName(last)
                .birthdate(born)
                .build();
    }
}
application.properties:
# MySQL connection to the `test` schema holding the customer / new_customer tables.
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/test
# Credentials can be supplied via DB_USERNAME / DB_PASSWORD (environment variables or
# system properties) instead of being committed; defaults keep the original values.
spring.datasource.username=${DB_USERNAME:root}
spring.datasource.password=${DB_PASSWORD:root}
# Always create the Spring Batch metadata tables on startup.
spring.batch.initialize-schema=always
# Do not auto-run jobs at startup; the application launches the job itself.
spring.batch.job.enabled=false
ColumnRangePartitioner.java
/**
 * Splits the value range of a numeric column into contiguous, inclusive
 * {@code [minValue, maxValue]} ranges, one {@link ExecutionContext} per partition.
 *
 * <p>The overall range is taken from {@code MIN(column)} / {@code MAX(column)}, so gaps
 * in the data can leave some partitions with few or no rows. Each context stores its
 * bounds as ints under the keys {@code minValue} and {@code maxValue}.
 *
 * <p>{@code table} and {@code column} are concatenated into SQL and therefore must be
 * trusted identifiers, never user input.
 */
public class ColumnRangePartitioner implements Partitioner {

    private JdbcOperations jdbcTemplate;
    private String table;
    private String column;

    /** Sets the table to partition (trusted identifier). */
    public void setTable(String table) {
        this.table = table;
    }

    /** Sets the numeric column whose value range is split (trusted identifier). */
    public void setColumn(String column) {
        this.column = column;
    }

    /** Sets the data source used to query MIN/MAX of the column. */
    public void setDataSource(DataSource dataSource) {
        jdbcTemplate = new JdbcTemplate(dataSource);
    }

    /**
     * Computes the partitions.
     *
     * @param gridSize requested number of partitions; must be positive
     * @return contexts keyed {@code partition0}, {@code partition1}, ...; empty when the
     *     column has no non-null values (e.g. the table is empty)
     * @throws IllegalArgumentException if {@code gridSize} is not positive
     */
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        if (gridSize <= 0) {
            throw new IllegalArgumentException("gridSize must be positive but was " + gridSize);
        }
        Integer minValue = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") FROM " + table, Integer.class);
        Integer maxValue = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") FROM " + table, Integer.class);

        Map<String, ExecutionContext> result = new HashMap<>();
        // MIN/MAX yield NULL on an empty table; unboxing that into int would throw an NPE.
        if (minValue == null || maxValue == null) {
            return result;
        }

        // Use long arithmetic: (max - min) and start + targetSize can exceed the int range.
        long min = minValue;
        long max = maxValue;
        long targetSize = (max - min) / gridSize + 1;

        int number = 0;
        long start = min;
        long end = start + targetSize - 1;
        while (start <= max) {
            ExecutionContext value = new ExecutionContext();
            result.put("partition" + number, value);
            if (end >= max) {
                end = max;
            }
            // Both bounds lie within [min, max], so narrowing back to int is lossless.
            value.putInt("minValue", (int) start);
            value.putInt("maxValue", (int) end);
            start += targetSize;
            end += targetSize;
            number++;
        }
        return result;
    }
}
JobConfiguration.java
/**
 * Batch configuration for a locally partitioned copy of {@code customer} into
 * {@code new_customer}.
 *
 * <p>The manager step ({@code step1}) splits the {@code id} range with
 * {@link ColumnRangePartitioner} and runs one worker step ({@code slaveStep}) per
 * partition on a {@link SimpleAsyncTaskExecutor}.
 */
@Configuration
public class JobConfiguration {

    /** Items per chunk; also used as the reader's page size and JDBC fetch size. */
    private static final int CHUNK_SIZE = 1000;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    /** Partitioner producing inclusive {@code [minValue, maxValue]} ranges of {@code customer.id}. */
    @Bean
    public ColumnRangePartitioner partitioner() {
        ColumnRangePartitioner columnRangePartitioner = new ColumnRangePartitioner();
        columnRangePartitioner.setColumn("id");
        columnRangePartitioner.setDataSource(dataSource);
        columnRangePartitioner.setTable("customer");
        return columnRangePartitioner;
    }

    /**
     * Step-scoped reader for a single partition.
     *
     * @param minValue lower bound (inclusive), taken from the partition's step execution context
     * @param maxValue upper bound (inclusive), taken from the partition's step execution context
     * @return a paging reader over {@code customer} rows with {@code minValue <= id <= maxValue}
     */
    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> pagingItemReader(
            @Value("#{stepExecutionContext['minValue']}") Long minValue,
            @Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
        System.out.println("reading " + minValue + " to " + maxValue);

        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("id", Order.ASCENDING);

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from customer");
        // The partitioner's maxValue is inclusive, so "<=" is required here;
        // "<" would silently skip the last id of every partition.
        queryProvider.setWhereClause("where id >= :minValue and id <= :maxValue");
        queryProvider.setSortKeys(sortKeys);

        // Bind the bounds as named parameters instead of concatenating them into the SQL.
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("minValue", minValue);
        parameterValues.put("maxValue", maxValue);

        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(CHUNK_SIZE);
        // The default page size is small; match the chunk size so one query fills a chunk.
        reader.setPageSize(CHUNK_SIZE);
        reader.setRowMapper(new CustomerRowMapper());
        reader.setQueryProvider(queryProvider);
        reader.setParameterValues(parameterValues);
        return reader;
    }

    /** Writer inserting each {@link Customer} into {@code new_customer} via bean-property parameters. */
    @Bean
    @StepScope
    public JdbcBatchItemWriter<Customer> customerItemWriter() {
        JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();
        itemWriter.setDataSource(dataSource);
        // Use the exact table name from the DDL: MySQL table names are case-sensitive on
        // most Unix installs, so NEW_CUSTOMER would not resolve to `new_customer`.
        // The explicit column list keeps the insert independent of the table's column order.
        itemWriter.setSql("INSERT INTO new_customer (id, firstName, lastName, birthdate) "
                + "VALUES (:id, :firstName, :lastName, :birthdate)");
        itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        itemWriter.afterPropertiesSet();
        return itemWriter;
    }

    /** Manager step: partitions the id range (grid size 4) and runs {@code slaveStep} per partition. */
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .partitioner(slaveStep().getName(), partitioner())
                .step(slaveStep())
                .gridSize(4)
                .taskExecutor(new SimpleAsyncTaskExecutor())
                .build();
    }

    /** Worker step: reads one partition in chunks and writes it to {@code new_customer}. */
    @Bean
    public Step slaveStep() {
        return stepBuilderFactory.get("slaveStep")
                .<Customer, Customer>chunk(CHUNK_SIZE)
                // The null arguments are placeholders: the reader is step-scoped, and its
                // bounds are resolved from each partition's step execution context.
                .reader(pagingItemReader(null, null))
                .writer(customerItemWriter())
                .build();
    }

    /** The job: a single partitioned step. */
    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .start(step1())
                .build();
    }
}
LocalPartitioningApplication.java
/**
 * Spring Boot entry point with batch processing enabled.
 *
 * <p>Implements {@link CommandLineRunner} so that, once the context is up, the
 * configured {@link Job} is launched explicitly from {@link #run(String...)}.
 */
@SpringBootApplication
@EnableBatchProcessing
public class LocalPartitioningApplication implements CommandLineRunner {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job;

    public static void main(String[] args) {
        SpringApplication.run(LocalPartitioningApplication.class, args);
    }

    /**
     * Launches {@code job} once and prints its final status.
     *
     * <p>The parameters are all derived from the current time, presumably so that each
     * launch gets a distinct set of identifying parameters.
     */
    @Override
    public void run(String... args) throws Exception {
        JobParametersBuilder paramsBuilder = new JobParametersBuilder();
        paramsBuilder.addString("JobId", String.valueOf(System.currentTimeMillis()));
        paramsBuilder.addDate("date", new Date());
        paramsBuilder.addLong("time", System.currentTimeMillis());
        JobParameters params = paramsBuilder.toJobParameters();

        JobExecution outcome = jobLauncher.run(job, params);
        System.out.println("STATUS :: " + outcome.getStatus());
    }
}
Sample Data
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>localPartitioning</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>localPartitioning</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <!-- No explicit version: use the one managed by spring-boot-starter-parent
                 instead of pinning an older release that overrides it. -->
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
MySQL DB Output:
mysql> SELECT * FROM test.new_customer;
+----+------------+----------+---------------------+
| id | firstName | lastName | birthdate |
+----+------------+----------+---------------------+
| 1 | John | Doe | 10-10-1952 10:10:10 |
| 2 | Amy | Eugene | 05-07-1985 17:10:00 |
| 3 | Laverne | Mann | 11-12-1988 10:10:10 |
| 4 | Janice | Preston | 19-02-1960 10:10:10 |
| 5 | Pauline | Rios | 29-08-1977 10:10:10 |
| 6 | Perry | Burnside | 10-03-1981 10:10:10 |
| 7 | Todd | Kinsey | 14-12-1998 10:10:10 |
| 8 | Jacqueline | Hyde | 20-03-1983 10:10:10 |
| 9 | Rico | Hale | 10-10-2000 10:10:10 |
| 10 | Samuel | Lamm | 11-11-1999 10:10:10 |
| 11 | Robert | Coster | 10-10-1972 10:10:10 |
| 12 | Tamara | Soler | 02-01-1978 10:10:10 |
| 13 | Justin | Kramer | 19-11-1951 10:10:10 |
| 14 | Andrea | Law | 14-10-1959 10:10:10 |
| 15 | Laura | Porter | 12-12-2010 10:10:10 |
| 16 | Michael | Cantu | 11-04-1999 10:10:10 |
| 17 | Andrew | Thomas | 04-05-1967 10:10:10 |
| 18 | Jose | Hannah | 16-09-1950 10:10:10 |
| 19 | Valerie | Hilbert | 13-06-1966 10:10:10 |
| 20 | Patrick | Durham | 12-10-1978 10:10:10 |
+----+------------+----------+---------------------+
20 rows in set (0.00 sec)
Comments
Post a Comment