Spring Batch - Async Item Processor Writer
In this example, we'll show you AsyncItemProcessor
Create Database tables:
CREATE TABLE `test`.`customer` (
`id` MEDIUMINT(8) UNSIGNED NOT NULL,
`firstName` VARCHAR(255) NULL,
`lastName` VARCHAR(255) NULL,
`birthdate` VARCHAR(255) NULL,
PRIMARY KEY (`id`)
);
CREATE TABLE `test`.`new_customer` (
`id` MEDIUMINT(8) UNSIGNED NOT NULL,
`firstName` VARCHAR(255) NULL,
`lastName` VARCHAR(255) NULL,
`birthdate` VARCHAR(255) NULL,
PRIMARY KEY (`id`)
);
JobConfiguration.java
@Configuration
public class JobConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DataSource dataSource;
@Bean
public JdbcPagingItemReader<Customer> customerPagingItemReader(){
// reading database records using JDBC in a paging fashion
JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
reader.setDataSource(this.dataSource);
reader.setFetchSize(1000);
reader.setRowMapper(new CustomerRowMapper());
// Sort Keys
Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("id", Order.ASCENDING);
// MySQL implementation of a PagingQueryProvider using database specific features.
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("id, firstName, lastName, birthdate");
queryProvider.setFromClause("from customer");
queryProvider.setSortKeys(sortKeys);
reader.setQueryProvider(queryProvider);
return reader;
}
@Bean
public ItemProcessor itemProcessor(){
return new ItemProcessor<Customer, Customer>() {
@Override
public Customer process(Customer item) throws Exception {
Thread.sleep(new Random().nextInt(10));
return Customer.builder().id(item.getId()).firstName(item.getFirstName())
.lastName(item.getLastName()).birthdate(item.getBirthdate()).build();
}
};
}
@Bean
public AsyncItemProcessor asyncItemProcessor() throws Exception{
AsyncItemProcessor<Customer, Customer> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(itemProcessor());
asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
asyncItemProcessor.afterPropertiesSet();
return asyncItemProcessor;
}
@Bean
public JdbcBatchItemWriter<Customer> customerItemWriter(){
JdbcBatchItemWriter<Customer> writer = new JdbcBatchItemWriter<>();
writer.setDataSource(dataSource);
writer.setSql("INSERT INTO NEW_CUSTOMER VALUES (:id, :firstName, :lastName, :birthdate)");
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
writer.afterPropertiesSet();
return writer;
}
@Bean
public AsyncItemWriter<Customer> asyncItemWriter() throws Exception{
AsyncItemWriter<Customer> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(customerItemWriter());
asyncItemWriter.afterPropertiesSet();
return asyncItemWriter;
}
@SuppressWarnings("unchecked")
@Bean
public Step step1() throws Exception {
return stepBuilderFactory.get("step1")
.chunk(1000)
.reader(customerPagingItemReader())
.processor(asyncItemProcessor())
.writer(asyncItemWriter())
.build();
}
@Bean
public Job job() throws Exception {
return jobBuilderFactory.get("job")
.start(step1())
.build();
}
}
In this example, we'll show you AsyncItemProcessor
Create Database tables:
CREATE TABLE `test`.`customer` (
`id` MEDIUMINT(8) UNSIGNED NOT NULL,
`firstName` VARCHAR(255) NULL,
`lastName` VARCHAR(255) NULL,
`birthdate` VARCHAR(255) NULL,
PRIMARY KEY (`id`)
);
CREATE TABLE `test`.`new_customer` (
`id` MEDIUMINT(8) UNSIGNED NOT NULL,
`firstName` VARCHAR(255) NULL,
`lastName` VARCHAR(255) NULL,
`birthdate` VARCHAR(255) NULL,
PRIMARY KEY (`id`)
);
JobConfiguration.java
@Configuration
public class JobConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DataSource dataSource;
@Bean
public JdbcPagingItemReader<Customer> customerPagingItemReader(){
// reading database records using JDBC in a paging fashion
JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
reader.setDataSource(this.dataSource);
reader.setFetchSize(1000);
reader.setRowMapper(new CustomerRowMapper());
// Sort Keys
Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("id", Order.ASCENDING);
// MySQL implementation of a PagingQueryProvider using database specific features.
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("id, firstName, lastName, birthdate");
queryProvider.setFromClause("from customer");
queryProvider.setSortKeys(sortKeys);
reader.setQueryProvider(queryProvider);
return reader;
}
@Bean
public ItemProcessor itemProcessor(){
return new ItemProcessor<Customer, Customer>() {
@Override
public Customer process(Customer item) throws Exception {
Thread.sleep(new Random().nextInt(10));
return Customer.builder().id(item.getId()).firstName(item.getFirstName())
.lastName(item.getLastName()).birthdate(item.getBirthdate()).build();
}
};
}
@Bean
public AsyncItemProcessor asyncItemProcessor() throws Exception{
AsyncItemProcessor<Customer, Customer> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(itemProcessor());
asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
asyncItemProcessor.afterPropertiesSet();
return asyncItemProcessor;
}
@Bean
public JdbcBatchItemWriter<Customer> customerItemWriter(){
JdbcBatchItemWriter<Customer> writer = new JdbcBatchItemWriter<>();
writer.setDataSource(dataSource);
writer.setSql("INSERT INTO NEW_CUSTOMER VALUES (:id, :firstName, :lastName, :birthdate)");
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
writer.afterPropertiesSet();
return writer;
}
@Bean
public AsyncItemWriter<Customer> asyncItemWriter() throws Exception{
AsyncItemWriter<Customer> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(customerItemWriter());
asyncItemWriter.afterPropertiesSet();
return asyncItemWriter;
}
@SuppressWarnings("unchecked")
@Bean
public Step step1() throws Exception {
return stepBuilderFactory.get("step1")
.chunk(1000)
.reader(customerPagingItemReader())
.processor(asyncItemProcessor())
.writer(asyncItemWriter())
.build();
}
@Bean
public Job job() throws Exception {
return jobBuilderFactory.get("job")
.start(step1())
.build();
}
}
Customer.java
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
public class Customer {
private Long id;
private String firstName;
private String lastName;
private String birthdate;
}
CustomerRowMapper.java
public class CustomerRowMapper implements RowMapper<Customer> {
@Override
public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
return Customer.builder().id(rs.getLong("id"))
.firstName(rs.getString("firstName"))
.lastName(rs.getString("lastName"))
.birthdate(rs.getString("birthdate")).build();
}
}
application.properties
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/test
spring.datasource.username=root
spring.datasource.password=root
spring.batch.initialize-schema=always
spring.batch.job.enabled=false
AsyncItemProcessorItemWriterApplication.java
@SpringBootApplication
@EnableBatchProcessing
public class AsyncItemProcessorItemWriterApplication implements CommandLineRunner{
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job job;
public static void main(String[] args) {
SpringApplication.run(AsyncItemProcessorItemWriterApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("JobId", String.valueOf(System.currentTimeMillis()))
.addDate("date", new Date())
.addLong("time",System.currentTimeMillis()).toJobParameters();
JobExecution execution = jobLauncher.run(job, jobParameters);
System.out.println("STATUS :: "+execution.getStatus());
}
}
Sample Data:
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('1', 'John', 'Doe', '10-10-1952 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('2', 'Amy', 'Eugene', '05-07-1985 17:10:00');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('3', 'Laverne', 'Mann', '11-12-1988 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('4', 'Janice', 'Preston', '19-02-1960 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('5', 'Pauline', 'Rios', '29-08-1977 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('6', 'Perry', 'Burnside', '10-03-1981 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('7', 'Todd', 'Kinsey', '14-12-1998 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('8', 'Jacqueline', 'Hyde', '20-03-1983 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('9', 'Rico', 'Hale', '10-10-2000 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('10', 'Samuel', 'Lamm', '11-11-1999 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('11', 'Robert', 'Coster', '10-10-1972 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('12', 'Tamara', 'Soler', '02-01-1978 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('13', 'Justin', 'Kramer', '19-11-1951 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('14', 'Andrea', 'Law', '14-10-1959 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('15', 'Laura', 'Porter', '12-12-2010 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('16', 'Michael', 'Cantu', '11-04-1999 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('17', 'Andrew', 'Thomas', '04-05-1967 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('18', 'Jose', 'Hannah', '16-09-1950 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('19', 'Valerie', 'Hilbert', '13-06-1966 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('20', 'Patrick', 'Durham', '12-10-1978 10:10:10');
Comments
Post a Comment