Spring batch - classifier example

We strongly recommend going through the Spring docs, the Batch API (https://docs.spring.io/spring-batch/trunk/apidocs/org/springframework/batch/item/support/ClassifierCompositeItemWriter.html) and https://gist.github.com/benas/bfe2be7386b99ce496425fac9ff35fb8.

Classifier: the interface for a classifier. Its classify method classifies the given object and returns an object of a different type, possibly an enumerated type. At its simplest, a classifier is just a map from objects of one type to objects of another type. Note that implementations can only be serializable if the parameter types are themselves serializable.

Project structure:
























Create the database table

-- Customer table queried by the batch job's reader; ids are generated by the database.
CREATE TABLE `test`.`customer` (
    `id`        MEDIUMINT(8) UNSIGNED NOT NULL AUTO_INCREMENT,
    `firstName` VARCHAR(255)          NULL,
    `lastName`  VARCHAR(255)          NULL,
    `birthdate` VARCHAR(255)          NULL,
    PRIMARY KEY (`id`)
) AUTO_INCREMENT = 1;


Customer.java

/**
 * Customer record with the same four attributes as the {@code customer} table.
 *
 * <p>All boilerplate (accessors, builder, no-argument and all-arguments constructors)
 * is produced by the Lombok annotations on the class.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Customer {

    /** Identifier of the customer. */
    private Long id;

    /** Given name. */
    private String firstName;

    /** Family name. */
    private String lastName;

    /** Birth date, kept as plain text. */
    private String birthdate;
}

application.properties

# JDBC connection to the local MySQL "test" schema.
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/test
# NOTE(review): demo credentials only - do not reuse outside a local setup.
spring.datasource.username=root
spring.datasource.password=root
# Create the Spring Batch metadata tables automatically.
spring.batch.initialize-schema=always
# The application class launches the job itself (with unique parameters), so turn off
# Spring Boot's automatic job launch at startup; otherwise the job is executed twice.
spring.batch.job.enabled=false

CustomerRowMapper.java

/**
 * Converts one row of a customer query result into a {@link Customer}.
 */
public class CustomerRowMapper implements RowMapper<Customer> {

    /**
     * Reads the {@code id}, {@code firstName}, {@code lastName} and {@code birthdate}
     * columns of the current row and builds a customer from them.
     *
     * @param rs     result set positioned on the row to map
     * @param rowNum index of the current row (not used)
     * @return the customer built from the row
     * @throws SQLException if a column cannot be read
     */
    @Override
    public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
        final long id = rs.getLong("id");
        final String firstName = rs.getString("firstName");
        final String lastName = rs.getString("lastName");
        final String birthdate = rs.getString("birthdate");

        return Customer.builder()
                .id(id)
                .firstName(firstName)
                .lastName(lastName)
                .birthdate(birthdate)
                .build();
    }
}

CustomerClassifier.java

/**
 * Picks the writer for a {@link Customer} from the parity of its id: customers with an
 * even id go to one writer, customers with an odd id go to the other.
 */
public class CustomerClassifier implements Classifier<Customer, ItemWriter<? super Customer>> {
    private static final long serialVersionUID = 1L;

    // Delegates are fixed at construction time and never reassigned.
    private final ItemWriter<Customer> evenItemWriter;
    private final ItemWriter<Customer> oddItemWriter;

    /**
     * Creates a classifier over the two delegate writers.
     *
     * @param evenItemWriter writer returned for customers whose id is even; must not be null
     * @param oddItemWriter  writer returned for customers whose id is odd; must not be null
     * @throws NullPointerException if either writer is null
     */
    public CustomerClassifier(ItemWriter<Customer> evenItemWriter, ItemWriter<Customer> oddItemWriter) {
        // Fail fast on mis-wiring instead of handing a null writer to the caller later on.
        this.evenItemWriter = java.util.Objects.requireNonNull(evenItemWriter, "evenItemWriter");
        this.oddItemWriter = java.util.Objects.requireNonNull(oddItemWriter, "oddItemWriter");
    }

    /**
     * Returns the writer matching the parity of the customer's id.
     *
     * @param customer the customer to route; its id is unboxed, so it must not be null
     * @return the even-id writer when the id is divisible by two, otherwise the odd-id writer
     */
    @Override
    public ItemWriter<? super Customer> classify(Customer customer) {
        return customer.getId() % 2 == 0 ? evenItemWriter : oddItemWriter;
    }
}

CustomLineAggregator.java

/**
 * Line aggregator that turns each {@link Customer} into a single string produced by an
 * {@code ObjectMapper}.
 */
public class CustomLineAggregator implements LineAggregator<Customer> {

    // Mapper used to serialize every item handed to aggregate().
    private ObjectMapper jsonMapper = new ObjectMapper();

    /**
     * Serializes the given customer to its string form.
     *
     * @param item the customer to serialize
     * @return the serialized customer
     * @throws RuntimeException wrapping the original failure if serialization fails
     */
    @Override
    public String aggregate(Customer item) {
        final String serialized;
        try {
            serialized = jsonMapper.writeValueAsString(item);
        } catch (Exception cause) {
            throw new RuntimeException("Unable to serialize Customer", cause);
        }
        return serialized;
    }
}

ClassifierWrittingMultipleDestinationsApplication.java

@SpringBootApplication
@EnableBatchProcessing
public class ClassifierWrittingMultipleDestinationsApplication implements CommandLineRunner {
@Autowired
private JobLauncher jobLauncher;

@Autowired
private Job job;

public static void main(String[] args) {
SpringApplication.run(ClassifierWrittingMultipleDestinationsApplication.class, args);
}

@Override
public void run(String... args) throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
                .addString("JobId", String.valueOf(System.currentTimeMillis()))
.addDate("date", new Date())
                .addLong("time",System.currentTimeMillis()).toJobParameters();

JobExecution execution = jobLauncher.run(job, jobParameters);
System.out.println("STATUS :: "+execution.getStatus());
}

}

JobConfiguration.java
/**
 * Batch job configuration: customers are read from the database page by page and each one
 * is written to either an XML file or a JSON-lines file, as decided by
 * {@link CustomerClassifier}.
 */
@Configuration
public class JobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    /**
     * Reader that pages through the {@code customer} table ordered by {@code id}.
     *
     * @return the configured paging reader
     */
    @Bean
    public JdbcPagingItemReader<Customer> customerPagingItemReader() {
        // reading database records using JDBC in a paging fashion
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(1000);
        reader.setRowMapper(new CustomerRowMapper());

        // Sort keys: paging needs a stable ordering of the rows.
        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("id", Order.ASCENDING);

        // MySQL implementation of a PagingQueryProvider using database specific features.
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from customer");
        queryProvider.setSortKeys(sortKeys);
        reader.setQueryProvider(queryProvider);

        return reader;
    }

    /**
     * Writer that emits one line per customer, formatted by {@link CustomLineAggregator},
     * into a fresh temporary file.
     *
     * @return the configured flat-file writer
     * @throws Exception if the temporary file cannot be created or the writer is misconfigured
     */
    @Bean
    public FlatFileItemWriter<Customer> jsonItemWriter() throws Exception {
        FlatFileItemWriter<Customer> writer = new FlatFileItemWriter<>();
        writer.setLineAggregator(new CustomLineAggregator());
        writer.setResource(new FileSystemResource(newOutputPath()));
        writer.afterPropertiesSet();
        return writer;
    }

    /**
     * Writer that marshals customers as {@code <customer>} elements under a
     * {@code <customers>} root into a fresh temporary file.
     *
     * @return the configured StAX writer
     * @throws Exception if the temporary file cannot be created or the writer is misconfigured
     */
    @Bean
    public StaxEventItemWriter<Customer> xmlItemWriter() throws Exception {
        // Typed as Class<?> rather than the raw Class to avoid raw-type warnings.
        Map<String, Class<?>> aliases = new HashMap<>();
        aliases.put("customer", Customer.class);

        XStreamMarshaller marshaller = new XStreamMarshaller();
        marshaller.setAliases(aliases);

        // StAX and Marshaller for serializing object to XML.
        StaxEventItemWriter<Customer> writer = new StaxEventItemWriter<>();
        writer.setRootTagName("customers");
        writer.setMarshaller(marshaller);
        writer.setResource(new FileSystemResource(newOutputPath()));
        writer.afterPropertiesSet();
        return writer;
    }

    /**
     * Composite writer that delegates each customer to the XML writer (even ids) or the
     * JSON writer (odd ids).
     *
     * @return the configured composite writer
     * @throws Exception if one of the delegate writers cannot be created
     */
    @Bean
    public ClassifierCompositeItemWriter<Customer> classifierCustomerCompositeItemWriter() throws Exception {
        ClassifierCompositeItemWriter<Customer> compositeItemWriter = new ClassifierCompositeItemWriter<>();
        // Constructor order is (even, odd): even ids -> XML, odd ids -> JSON.
        compositeItemWriter.setClassifier(new CustomerClassifier(xmlItemWriter(), jsonItemWriter()));
        return compositeItemWriter;
    }

    /**
     * Single chunk-oriented step (chunk size 10) wiring the reader to the composite writer.
     *
     * @return the configured step
     * @throws Exception if one of the writers cannot be created
     */
    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer> chunk(10)
                .reader(customerPagingItemReader())
                .writer(classifierCustomerCompositeItemWriter())
                // The delegates are registered as streams explicitly so the step opens and closes them.
                .stream(xmlItemWriter())
                .stream(jsonItemWriter())
                .build();
    }

    /**
     * The job, consisting of {@link #step1()} only.
     *
     * @return the configured job
     * @throws Exception if the step cannot be created
     */
    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job")
                .start(step1())
                .build();
    }

    /** Creates a new temporary output file, prints its location, and returns its absolute path. */
    private static String newOutputPath() throws java.io.IOException {
        String customerOutputPath = File.createTempFile("customerOutput", ".out").getAbsolutePath();
        System.out.println(">> Output Path = " + customerOutputPath);
        return customerOutputPath;
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Spring Boot parent: supplies managed dependency versions and plugin defaults -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.example</groupId>
    <artifactId>classifierWrittingMultipleDestinations</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>classifierWrittingMultipleDestinations</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Spring Batch and JDBC support -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

        <!-- Spring OXM -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-oxm</artifactId>
        </dependency>

        <!-- Database drivers -->
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

        <!-- XStream, used by the XML marshaller -->
        <!-- NOTE(review): version is pinned explicitly; check it against current security advisories -->
        <dependency>
            <groupId>com.thoughtworks.xstream</groupId>
            <artifactId>xstream</artifactId>
            <version>1.4.7</version>
        </dependency>

        <!-- Lombok: compile-time generation of the Customer boilerplate -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
            <optional>true</optional>
        </dependency>

        <!-- Test support -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>







Comments

Popular posts from this blog

Install Redis on Oracle Linux 7+