Spring Batch - Local Partitioning

Before starting this tutorial, we recommend that you read the Spring Batch scalability documentation at
https://docs.spring.io/spring-batch/trunk/reference/html/scalability.html and the example job configuration at https://github.com/hosnibenslama/Spring-batch-local-Partitioning/blob/master/src/main/java/io/spring/batch/configuration/JobConfiguration.java

Partitions Overview:

















Partition Steps:















Create the database tables:

-- Source table: the partitioned job reads customers from here, split by id range.
CREATE TABLE test.customer (
  id        MEDIUMINT(8) UNSIGNED NOT NULL PRIMARY KEY,
  firstName VARCHAR(255) NULL,
  lastName  VARCHAR(255) NULL,
  birthdate VARCHAR(255) NULL
);

-- Target table: same shape as test.customer; the job copies every row into it.
CREATE TABLE test.new_customer (
  id        MEDIUMINT(8) UNSIGNED NOT NULL PRIMARY KEY,
  firstName VARCHAR(255) NULL,
  lastName  VARCHAR(255) NULL,
  birthdate VARCHAR(255) NULL
);


Customer.java

/**
 * One customer row, shared by the {@code customer} source table and the {@code new_customer}
 * target table.
 *
 * <p>All accessors, the builder and both constructors are generated by Lombok. Keep the field
 * declaration order stable: it defines the parameter order of the generated all-args constructor.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Customer {

    /** Primary key; the job configuration also partitions the input on this column. */
    private Long id;

    /** Given name ({@code firstName} column). */
    private String firstName;

    /** Family name ({@code lastName} column). */
    private String lastName;

    /** Birth date kept as plain text, mirroring the VARCHAR {@code birthdate} column. */
    private String birthdate;
}

CustomerRowMapper.java

/**
 * Turns one row of a {@code customer}-shaped result set into a {@link Customer}.
 *
 * <p>The result set must expose the columns {@code id}, {@code firstName}, {@code lastName} and
 * {@code birthdate}.
 */
public class CustomerRowMapper implements RowMapper<Customer> {

    /**
     * Builds a {@link Customer} from the current row of {@code rs}.
     *
     * @param rs     result set positioned on the row to map
     * @param rowNum zero-based row number (unused)
     * @return a new customer populated from the row
     * @throws SQLException if a column cannot be read
     */
    @Override
    public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
        Customer mapped = new Customer();
        mapped.setId(rs.getLong("id"));
        mapped.setFirstName(rs.getString("firstName"));
        mapped.setLastName(rs.getString("lastName"));
        mapped.setBirthdate(rs.getString("birthdate"));
        return mapped;
    }
}

application.properties:
# Database connection: local MySQL, schema "test".
# NOTE(review): credentials are hard-coded for the tutorial; externalize them outside local demos.
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/test
spring.datasource.username=root
spring.datasource.password=root

# Initialize the Spring Batch metadata schema even though MySQL is not an embedded database.
spring.batch.initialize-schema=always

# Do not auto-launch jobs at startup; LocalPartitioningApplication launches the job itself.
spring.batch.job.enabled=false

ColumnRangePartitioner.java

/**
 * Splits the value range of a numeric column into contiguous, inclusive ranges, one per partition.
 *
 * <p>Each partition's {@link ExecutionContext} contains {@code minValue} and {@code maxValue}
 * (both inclusive, stored as ints), keyed {@code partition0}, {@code partition1}, ... The table
 * and column names are concatenated into SQL, so they must come from trusted configuration,
 * never from user input.
 */
public class ColumnRangePartitioner implements Partitioner {
    private JdbcOperations jdbcTemplate;
    private String table;
    private String column;

    /** Sets the table whose column range is partitioned (trusted identifier). */
    public void setTable(String table) {
        this.table = table;
    }

    /** Sets the numeric column whose MIN..MAX range is partitioned (trusted identifier). */
    public void setColumn(String column) {
        this.column = column;
    }

    /** Sets the data source used to query the column's MIN and MAX. */
    public void setDataSource(DataSource dataSource) {
        jdbcTemplate = new JdbcTemplate(dataSource);
    }

    /**
     * Creates up to {@code gridSize} partitions covering {@code [MIN(column), MAX(column)]}.
     *
     * @param gridSize requested number of partitions; must be at least 1
     * @return partition name to execution context; empty when the table has no rows
     * @throws IllegalArgumentException if {@code gridSize} is less than 1
     */
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        if (gridSize < 1) {
            throw new IllegalArgumentException("gridSize must be at least 1 but was " + gridSize);
        }

        Map<String, ExecutionContext> result = new HashMap<>();

        Integer minValue = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") FROM " + table, Integer.class);
        Integer maxValue = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") FROM " + table, Integer.class);
        if (minValue == null || maxValue == null) {
            // MIN/MAX return NULL on an empty table: nothing to partition (previously an NPE on unboxing).
            return result;
        }

        // Long arithmetic so neither (max - min) nor start += targetSize can overflow near
        // Integer.MAX_VALUE (an int overflow here would make the loop run forever).
        long min = minValue;
        long max = maxValue;
        long targetSize = (max - min) / gridSize + 1;

        int number = 0;
        for (long start = min; start <= max; start += targetSize) {
            // The last range is clamped to max; every bound stays within [min, max], so the casts are safe.
            long end = Math.min(start + targetSize - 1, max);

            ExecutionContext value = new ExecutionContext();
            value.putInt("minValue", (int) start);
            value.putInt("maxValue", (int) end);
            result.put("partition" + number, value);
            number++;
        }

        return result;
    }
}

JobConfiguration.java

/**
 * Spring Batch configuration for a locally partitioned copy of {@code customer} into
 * {@code new_customer}.
 *
 * <p>{@code step1} (the master) splits the {@code customer.id} range with
 * {@link ColumnRangePartitioner} into {@code gridSize(4)} partitions. It runs one
 * {@code slaveStep} execution per partition on a {@link SimpleAsyncTaskExecutor}. Each worker
 * reads its id range with a step-scoped paging reader and batch-inserts the rows.
 */
@Configuration
public class JobConfiguration {

    /** Rows per chunk; also used as the reader's page and fetch size. */
    private static final int CHUNK_SIZE = 1000;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    /** Partitioner that splits {@code customer} by its {@code id} column. */
    @Bean
    public ColumnRangePartitioner partitioner() {
        ColumnRangePartitioner columnRangePartitioner = new ColumnRangePartitioner();
        columnRangePartitioner.setColumn("id");
        columnRangePartitioner.setDataSource(dataSource);
        columnRangePartitioner.setTable("customer");
        return columnRangePartitioner;
    }

    /**
     * Step-scoped reader for one partition's inclusive id range.
     *
     * @param minValue first id of the partition, from the step execution context
     * @param maxValue last id of the partition (inclusive), from the step execution context
     * @return a paging reader over {@code customer} rows with {@code minValue <= id <= maxValue}
     */
    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> pagingItemReader(
            @Value("#{stepExecutionContext['minValue']}") Long minValue,
            @Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
        System.out.println("reading " + minValue + " to " + maxValue);

        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("id", Order.ASCENDING);

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from customer");
        // Both bounds are inclusive: the partitioner stores the LAST id of each range in maxValue,
        // so "id < maxValue" would silently skip one row per partition.
        queryProvider.setWhereClause("where id >= :minValue and id <= :maxValue");
        queryProvider.setSortKeys(sortKeys);

        // Bind the bounds as named parameters instead of concatenating them into the SQL.
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("minValue", minValue);
        parameterValues.put("maxValue", maxValue);

        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(CHUNK_SIZE);
        // Align the page size with the chunk size so each chunk is served by one page query.
        reader.setPageSize(CHUNK_SIZE);
        reader.setRowMapper(new CustomerRowMapper());
        reader.setQueryProvider(queryProvider);
        reader.setParameterValues(parameterValues);

        return reader;
    }

    /** Step-scoped writer that batch-inserts customers into {@code new_customer}. */
    @Bean
    @StepScope
    public JdbcBatchItemWriter<Customer> customerItemWriter() {
        JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();
        itemWriter.setDataSource(dataSource);
        // Use the exact table name from the DDL (MySQL table names may be case-sensitive on
        // some platforms) and an explicit column list instead of relying on column order.
        itemWriter.setSql("INSERT INTO new_customer (id, firstName, lastName, birthdate) "
                + "VALUES (:id, :firstName, :lastName, :birthdate)");
        itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        itemWriter.afterPropertiesSet();

        return itemWriter;
    }

    /** Master step: partitions the id range and runs {@code slaveStep} once per partition. */
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .partitioner(slaveStep().getName(), partitioner())
                .step(slaveStep())
                .gridSize(4)
                .taskExecutor(new SimpleAsyncTaskExecutor())
                .build();
    }

    /** Worker (slave) step: copies one partition's rows in chunks of {@link #CHUNK_SIZE}. */
    @Bean
    public Step slaveStep() {
        return stepBuilderFactory.get("slaveStep")
                .<Customer, Customer>chunk(CHUNK_SIZE)
                // The nulls are placeholders: the @StepScope proxy resolves the real bounds
                // from the step execution context when each partition runs.
                .reader(pagingItemReader(null, null))
                .writer(customerItemWriter())
                .build();
    }

    /** The job: a single partitioned master step. */
    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .start(step1())
                .build();
    }
}

LocalPartitioningApplication.java

@SpringBootApplication
@EnableBatchProcessing
public class LocalPartitioningApplication implements CommandLineRunner{
@Autowired
private JobLauncher jobLauncher;

@Autowired
private Job job;
public static void main(String[] args) {
SpringApplication.run(LocalPartitioningApplication.class, args);
}

@Override
public void run(String... args) throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
                .addString("JobId", String.valueOf(System.currentTimeMillis()))
.addDate("date", new Date())
                .addLong("time",System.currentTimeMillis()).toJobParameters();
JobExecution execution = jobLauncher.run(job, jobParameters);
System.out.println("STATUS :: "+execution.getStatus());
}
}

Sample Data











pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>localPartitioning</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>localPartitioning</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

        <!-- NOTE(review): the application's datasource points at MySQL, so H2 looks unused by the
             main code; presumably kept for tests - confirm before removing. -->
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <!-- Lombok version is managed by spring-boot-starter-parent; pinning an older version here
             overrides the one Boot manages and can drift out of compatibility. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>


MySQL DB Output:

mysql> SELECT * FROM test.new_customer;
+----+------------+----------+---------------------+
| id | firstName  | lastName | birthdate           |
+----+------------+----------+---------------------+
|  1 | John       | Doe      | 10-10-1952 10:10:10 |
|  2 | Amy        | Eugene   | 05-07-1985 17:10:00 |
|  3 | Laverne    | Mann     | 11-12-1988 10:10:10 |
|  4 | Janice     | Preston  | 19-02-1960 10:10:10 |
|  5 | Pauline    | Rios     | 29-08-1977 10:10:10 |
|  6 | Perry      | Burnside | 10-03-1981 10:10:10 |
|  7 | Todd       | Kinsey   | 14-12-1998 10:10:10 |
|  8 | Jacqueline | Hyde     | 20-03-1983 10:10:10 |
|  9 | Rico       | Hale     | 10-10-2000 10:10:10 |
| 10 | Samuel     | Lamm     | 11-11-1999 10:10:10 |
| 11 | Robert     | Coster   | 10-10-1972 10:10:10 |
| 12 | Tamara     | Soler    | 02-01-1978 10:10:10 |
| 13 | Justin     | Kramer   | 19-11-1951 10:10:10 |
| 14 | Andrea     | Law      | 14-10-1959 10:10:10 |
| 15 | Laura      | Porter   | 12-12-2010 10:10:10 |
| 16 | Michael    | Cantu    | 11-04-1999 10:10:10 |
| 17 | Andrew     | Thomas   | 04-05-1967 10:10:10 |
| 18 | Jose       | Hannah   | 16-09-1950 10:10:10 |
| 19 | Valerie    | Hilbert  | 13-06-1966 10:10:10 |
| 20 | Patrick    | Durham   | 12-10-1978 10:10:10 |
+----+------------+----------+---------------------+
20 rows in set (0.00 sec)

Comments

Popular posts from this blog

Install Redis on Oracle Linux 7+