Wednesday, 24 December 2014

Partitioning In Spring Batch

Introduction

Because a batch job processes a huge number of records, it runs into performance bottlenecks sooner rather than later. Typical problems during batch job execution are long execution times, OutOfMemoryError, and bad records that abort the job immediately. So while writing a real-world batch job, we have to be very careful and take these factors into consideration. That will help us scale the batch job properly.

Though there are various ways to scale a batch job, here I shall show you how to scale a Spring Batch job using local partitioning. Partitioning is a mechanism that splits the work of a step into several partitions, each of which is executed as its own step execution, typically in parallel on a separate thread. The original step is called the Master, while the steps that execute on those threads are called Slaves, so some programmers refer to this as the Master-Slave approach. Partitioning can be of two types, local and remote. Remote partitioning is beyond the scope of today's discussion.
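To make the idea concrete before we bring in Spring Batch, here is a minimal plain-JDK sketch of a master that hands each partition to its own worker thread and then collects the results. It is only an illustration: the class name PartitionSketch and the method processInParallel are hypothetical, and no Spring Batch API is used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-JDK illustration of the master/slave idea; this is NOT Spring Batch code.
class PartitionSketch {

    // "Master": runs one worker per partition on its own thread,
    // waits for all of them and returns the total number of items processed.
    static int processInParallel(List<List<Integer>> partitions) {
        if (partitions.isEmpty()) {
            return 0;
        }
        ExecutorService pool = Executors.newFixedThreadPool(partitions.size());
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (List<Integer> partition : partitions) {
                // "Slave": sees only the items of its own partition.
                Callable<Integer> slave = () -> {
                    int processed = 0;
                    for (Integer item : partition) {
                        processed++; // stand-in for real per-record work
                    }
                    return processed;
                };
                results.add(pool.submit(slave));
            }
            int total = 0;
            for (Future<Integer> result : results) {
                total += result.get(); // blocks until that slave has finished
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("A partition failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1, 2, 3), Arrays.asList(4, 5), Arrays.asList(6));
        System.out.println("Records processed: " + processInParallel(partitions));
    }
}
```

Spring Batch does the same kind of bookkeeping for us, and in addition records every partition as a step execution in the job repository.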




Example

This post is a continuation of my previous post, Springing Into Batch Job, so I will take up the same example that was discussed there.

In the meantime, the apparel company ABC Ltd. has opened two offices, in New Delhi and Kolkata, to expand its business. The employee strength of each branch office is the same as that of the head office in Mumbai. At the end of the financial year, the branch offices send the payroll info of their employees in a flat (.CSV) file to the head office for tax computation. So the Finance Department of the Mumbai office now has to scale up its existing batch job 'taxCalculator'. Let us see how it can be done.

Software / Tools

1. Spring 4.1.0
2. Spring Batch 3.0.1
3. Hibernate 4.3.6
4. MySQL 5.5
5. JDK 8
6. Eclipse Luna 4.4.1
7. Maven 3.0

Steps

  • Create a Maven project in Eclipse by selecting File -> New -> Maven Project. Name the project 'PartitionedBatch'. The project structure of PartitionedBatch will look like the figure given below.
Maven Project Structure of the PartitionedBatch Application
  • Modify the POM file generated as follows.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com</groupId>
  <artifactId>PartitionedBatch</artifactId>
  <version>0.0.1-SNAPSHOT</version>
 
  <properties>
         <spring-framework.version>4.1.0.RELEASE</spring-framework.version>
        <spring.batch.version>3.0.1.RELEASE</spring.batch.version>
        <hibernate.version>4.3.6.Final</hibernate.version>
        <mysql.driver.version>5.1.25</mysql.driver.version>
    </properties>
   
    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring-framework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-orm</artifactId>
            <version>${spring-framework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-core</artifactId>
            <version>${spring.batch.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.driver.version}</version>
        </dependency>
        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-entitymanager</artifactId>
            <version>${hibernate.version}</version>
        </dependency>
    </dependencies>   
</project>
  • Given below is the most important configuration file, job.xml, which is located in the '/main/resources/config' folder.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd">
   
    <import resource="context.xml" />
       
    <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
    <batch:job id="taxCalculator">
        <batch:step id="master">
            <batch:partition step="slave" partitioner="batchPartitioner">
                <batch:handler grid-size="3" task-executor="taskExecutor" />
            </batch:partition>
        </batch:step>       
    </batch:job>
    <batch:step id="slave">
        <batch:tasklet>
            <batch:chunk reader="fileReader" writer="dbWriter"
                              processor="itemProcessor" commit-interval="5">
            </batch:chunk>
        </batch:tasklet>
      </batch:step>

    <bean id="batchPartitioner" class="com.programnplay.batch.BatchPartitioner"/>
    <bean id="fileReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
        <property name="resource" value="classpath:input/#{stepExecutionContext[file]}.csv" />
         <property name="lineMapper" ref="lineMapper"/>
     </bean>
    
     <bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
             <property name="lineTokenizer">
                <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                    <property name="names" value="id,name,salary" />
                </bean>
            </property>
            <property name="fieldSetMapper">
                <bean class="com.programnplay.batch.EmployeeMapper" />
            </property>
    </bean>
   
    <bean id="itemProcessor" class="com.programnplay.batch.TaxProcessor" />
    <bean id="dbWriter" class="org.springframework.batch.item.database.HibernateItemWriter">
        <property name="sessionFactory" ref="sessionFactory"/>
    </bean>
</beans>


As shown above, the job 'taxCalculator' now consists of a 'master' step which is divided into several 'slave' steps by the partitioner 'batchPartitioner'. This partitioner is a bean whose class implements the Spring Batch interface org.springframework.batch.core.partition.support.Partitioner. Each slave step then reads a file, processes the records and writes them to the database using chunk-based processing, where the chunk size (5 here) is specified by the commit-interval attribute.
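The effect of commit-interval can be pictured with a small plain-JDK sketch of a chunk loop. This is a simplification written for illustration, not Spring Batch's actual implementation: the class ChunkLoopSketch and the method runInChunks are hypothetical names, trimming a string stands in for the processor, and adding a chunk to a list stands in for the writer and the commit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Simplified picture of chunk-oriented processing; NOT Spring Batch code.
class ChunkLoopSketch {

    // Reads items one at a time, buffers them, and "writes" each full buffer
    // as one chunk. Returns the chunks in the order they were written.
    static List<List<String>> runInChunks(Iterator<String> reader, int commitInterval) {
        if (commitInterval <= 0) {
            throw new IllegalArgumentException("commitInterval must be positive");
        }
        List<List<String>> written = new ArrayList<>();
        List<String> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            String item = reader.next();
            chunk.add(item.trim());        // stand-in for the item processor
            if (chunk.size() == commitInterval) {
                written.add(chunk);        // stand-in for writer + commit
                chunk = new ArrayList<>();
            }
        }
        if (!chunk.isEmpty()) {
            written.add(chunk);            // last chunk may be smaller
        }
        return written;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a", "b", "c", "d", "e", "f", "g",
                "h", "i", "j", "k", "l");
        for (List<String> chunk : runInChunks(lines.iterator(), 5)) {
            System.out.println("Writing chunk of " + chunk.size() + " items: " + chunk);
        }
    }
}
```

With 12 input items and an interval of 5, the sketch writes three chunks of 5, 5 and 2 items. A larger interval means fewer commits but more items held in memory at a time.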

Each partition is handled by a separate thread of execution. The grid-size attribute of the handler tag is passed to the partitioner's partition() method, and our partitioner creates exactly that many partitions; the SimpleAsyncTaskExecutor then starts a new thread for each of them. We have taken the grid size as 3 because we have 3 flat (.CSV) files to read, process and write, and we want each of them to be handled by a different thread. Each flat file contains the salary info of all the employees of a particular location in the following format. The files are named salary1.csv, salary2.csv and salary3.csv and are placed in the '/main/resources/input' folder.

001,Rajiba,200000
002,Rahul,340000
.............................
.............................
999,Mark,100000
1000,Nancy,100000

Readers may note that we have put the bean 'fileReader' into 'step' scope. The reason is that each thread uses its own reader, and the resource to be read has to be injected dynamically for each partition. This is done with the Spring Expression Language (SpEL) expression #{stepExecutionContext['key']}, where 'key' is the attribute to be retrieved. Here the expression #{stepExecutionContext[file]} looks up the 'file' value stored in the execution context of a partition.

The content of context.xml which is imported by job.xml is given below.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
                        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd">

    <bean id="jobRepository"
        class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
        <property name="dataSource" ref="dataSource" />
        <property name="transactionManager" ref="transactionManager" />
        <property name="databaseType" value="mysql" />
    </bean>

    <bean id="jobLauncher"
        class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>
   
    <bean id="transactionManager"
        class="org.springframework.orm.hibernate4.HibernateTransactionManager" >
        <property name="sessionFactory" ref="sessionFactory"/>
    </bean>
   
    <bean id="dataSource"
        class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <property name="driverClassName" value="com.mysql.jdbc.Driver" />
        <property name="url" value="jdbc:mysql://localhost:3306/test" />
        <property name="username" value="abc" />
        <property name="password" value="abc" />
    </bean>
   
    <bean id="sessionFactory" class="org.springframework.orm.hibernate4.LocalSessionFactoryBean">
        <property name="dataSource" ref="dataSource" />
        <property name="annotatedClasses">
            <list>
                <value>com.programnplay.batch.Employee</value>
            </list>
        </property>
    </bean>

    <jdbc:initialize-database data-source="dataSource">
        <jdbc:script location="org/springframework/batch/core/schema-drop-mysql.sql" />
        <jdbc:script location="org/springframework/batch/core/schema-mysql.sql" />
    </jdbc:initialize-database>

</beans>

  • Following is the partitioner class BatchPartitioner that we have referred to in the job.xml file.

package com.programnplay.batch;

import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

public class BatchPartitioner implements Partitioner {

    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
        for (int i = 1; i <= gridSize; i++) {   
            ExecutionContext exContext = new ExecutionContext();
            System.out.println("Starting : Thread" + i);
            exContext.put("name", "Thread"+i);
            exContext.put("file", "salary"+i);
            result.put("partition" + i, exContext);
        }
        return result;
    }

}


This class implements the org.springframework.batch.core.partition.support.Partitioner interface by implementing its partition() method. In this method, we create an execution context for each partition and store the values specific to that partition in it. We then put these execution contexts into a map keyed by unique partition names and return the map. If you observe this class carefully, you will find that we have stored a 'file' attribute in each execution context. We retrieve this attribute with the Spring Expression Language expression #{stepExecutionContext[file]} and inject it into 'fileReader' in job.xml.
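To see what the partitioner produces for a grid size of 3 without starting a Spring context, the same loop can be reproduced with plain maps. This is only a stand-in: the class PartitionContextSketch and the helper resourceFor are hypothetical, an ordinary Map takes the place of ExecutionContext, and resourceFor simply imitates by string concatenation the value that the 'resource' property of 'fileReader' ends up with.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for BatchPartitioner using plain maps; NOT Spring Batch code.
class PartitionContextSketch {

    // Same loop as BatchPartitioner.partition(), with a Map playing the
    // role of ExecutionContext.
    static Map<String, Map<String, Object>> partition(int gridSize) {
        Map<String, Map<String, Object>> result = new LinkedHashMap<>();
        for (int i = 1; i <= gridSize; i++) {
            Map<String, Object> context = new LinkedHashMap<>();
            context.put("name", "Thread" + i);
            context.put("file", "salary" + i);
            result.put("partition" + i, context);
        }
        return result;
    }

    // Imitates classpath:input/#{stepExecutionContext[file]}.csv from job.xml.
    static String resourceFor(Map<String, Object> context) {
        return "classpath:input/" + context.get("file") + ".csv";
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Map<String, Object>> entry : partition(3).entrySet()) {
            System.out.println(entry.getKey() + " -> " + resourceFor(entry.getValue()));
        }
    }
}
```

For a grid size of 3 this prints partition1, partition2 and partition3, pointing at salary1.csv, salary2.csv and salary3.csv respectively, which is why the input files must carry exactly these names.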
  • Next, we will write the Employee class.
package com.programnplay.batch;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;

@Entity
public class Employee {
  
    @Id
    @Column(name="id")
    private int eid;
    private String name;
    private int salary;
    private int tax;
  
    public int getEid() {
        return eid;
    }
    public void setEid(int eid) {
        this.eid = eid;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getSalary() {
        return salary;
    }
    public void setSalary(int salary) {
        this.salary = salary;
    }
    public int getTax() {
        return tax;
    }
    public void setTax(int tax) {
        this.tax = tax;
    }
}

  • The mapper class, EmployeeMapper, which maps each line of the flat file to an Employee entity, is given below.
package com.programnplay.batch;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

public class EmployeeMapper implements FieldSetMapper<Employee> {

    public Employee mapFieldSet(FieldSet fieldSet) throws BindException {
        Employee emp = new Employee();
        emp.setEid(fieldSet.readInt(0));
        emp.setName(fieldSet.readString(1));
        emp.setSalary(fieldSet.readInt(2));
        return emp;
    }

}
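As a rough picture of what the tokenizer and the mapper do together for one line of the file, here is a plain-JDK sketch that splits a line on commas and converts the fields. The class LineMappingSketch and its nested ParsedEmployee type are hypothetical and written only for this illustration; the real work in our job is done by DelimitedLineTokenizer and EmployeeMapper.

```java
// Plain-JDK picture of "tokenize, then map"; NOT Spring Batch code.
class LineMappingSketch {

    // Minimal value holder standing in for the Employee entity.
    static final class ParsedEmployee {
        final int id;
        final String name;
        final int salary;

        ParsedEmployee(int id, String name, int salary) {
            this.id = id;
            this.name = name;
            this.salary = salary;
        }
    }

    // Splits "id,name,salary" and converts the numeric fields to int.
    static ParsedEmployee parse(String line) {
        String[] fields = line.split(",");
        if (fields.length != 3) {
            throw new IllegalArgumentException("Expected 3 fields but got "
                    + fields.length + ": " + line);
        }
        return new ParsedEmployee(
                Integer.parseInt(fields[0].trim()),
                fields[1].trim(),
                Integer.parseInt(fields[2].trim()));
    }

    public static void main(String[] args) {
        ParsedEmployee emp = parse("001,Rajiba,200000");
        System.out.println(emp.id + " / " + emp.name + " / " + emp.salary);
    }
}
```

Note that in this sketch the leading zeros of "001" disappear, because the id is parsed as an integer. A line with a missing or non-numeric field fails immediately, which is the kind of bad record mentioned in the introduction.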

  • Let us now write the class TaxProcessor. This class processes the info of an employee and calculates his/her tax.
package com.programnplay.batch;

import org.springframework.batch.item.ItemProcessor;

public class TaxProcessor implements ItemProcessor<Employee, Employee> {

    public Employee process(Employee emp) throws Exception {
        int sal, tax;
        System.out.println("Calculating tax of  " + emp.getName());
        sal = emp.getSalary();
        System.out.println("Salary="+sal);
        if(sal > 0 && sal <= 300000)
            tax = (int)(0.1 * sal);
        else if(sal > 300000 && sal <= 500000)
            tax = (int)(0.2 * sal);
        else
            tax = (int)(0.3 * sal);       
        System.out.println("Setting tax of  "+emp.getName()+" to "+tax);
        emp.setTax(tax);   
        return emp;
    }
}
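The slab logic above can be checked in isolation with a few worked values. The sketch below copies the three slabs into a hypothetical class TaxSlabSketch with a static method computeTax; as an assumption of this sketch, it uses integer percentages instead of the floating-point multiplication in TaxProcessor.

```java
// Stand-alone copy of the tax slabs for quick checks; the class and method
// names are hypothetical and integer arithmetic is an assumption of this sketch.
class TaxSlabSketch {

    static int computeTax(int salary) {
        long ratePercent;
        if (salary > 0 && salary <= 300000) {
            ratePercent = 10;   // first slab
        } else if (salary > 300000 && salary <= 500000) {
            ratePercent = 20;   // second slab
        } else {
            ratePercent = 30;   // everything else, as in TaxProcessor
        }
        // long arithmetic avoids int overflow for very large salaries
        return (int) (salary * ratePercent / 100);
    }

    public static void main(String[] args) {
        int[] samples = {200000, 340000, 100000, 600000};
        for (int salary : samples) {
            System.out.println(salary + " -> " + computeTax(salary));
        }
    }
}
```

For the sample rows shown earlier this gives 20000 for a salary of 200000, 68000 for 340000 and 10000 for 100000. One thing the sketch makes visible is that a salary of zero or less also falls into the final else branch, so you may want to validate such records before they reach the processor.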


For more details on the last three classes, Employee, EmployeeMapper and TaxProcessor, readers are requested to go through my previous post, Springing Into Batch Job.
  • Now let us get the database ready by creating an Employee table with the following structure in our MySQL database.
Field    Type          Null   Key
------   -----------   ----   ---
Id       int(3)        NO     PRI
Name     varchar(30)   YES
Salary   int(8)        YES
Tax      int(5)        YES

With this, we have partitioned our batch job 'taxCalculator' so that it also calculates the taxes of the employees of the branch offices.
  • Following is the client class to run the above batch job.
package com.programnplay;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class MainApp {

    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("config/job.xml");
        JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
        Job job = (Job) context.getBean("taxCalculator");
       
        try {
            JobExecution execution = jobLauncher.run(job, new JobParameters());
            System.out.println("Exit Status : " + execution.getStatus());
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Done");
    }


}

Now run the client MainApp and ensure that it finishes with the Exit Status COMPLETED. If you check the database, you will find the Employee table populated with the employee records from all the .CSV files in the '/main/resources/input' folder. All this happens in a few seconds with a few lines of code, thanks to Spring Batch partitioning.


Sample Output Showing Partitions Created

Hope you liked this post. Wish you a merry Xmas and Happy New Year.

2 comments:

  1. Hi Rajiba Lochan Dash,

    I have come across your Spring Batch posts and found them useful. I am looking for an example of reading data from a flat file and loading it into parent-child relationship tables of a database. I am facing challenges in extracting the auto-generated primary key value of the Parent table and passing it on to the Child table to maintain the foreign key constraint.

    Please share any examples you have that meet the above needs.

    thanks.

    1. Hi Venkki
      Can you provide the sample of flat file data and your table structure?
