Google

Jun 20, 2012

Batch Processing in Java with Spring batch -- part 3

This assumes that you have read the spring batch part 1 & part 2. This is the final part.

Step 1: The annotated Java classes can be referenced directly by their bean names because of the following line in batch-context.xml:
  
    <!-- Component scanning: detects the annotated Java classes under the
         com.myapp.batch base package so they can be referenced as beans. -->
 <context:component-scan base-package="com.myapp.batch" />

Step 2: You define the batch job as shown below. It can have a number of steps. Some details are omitted to keep it simple.
 

  <batch:job id="availableBalanceJob">
        <!-- Job-level listeners: only job callbacks (beforeJob/afterJob) are delivered here. -->
        <batch:listeners>
            <!-- annotated listener picked up via context:component-scan -->
            <batch:listener ref="appJobExecutionListener" />
        </batch:listeners>
        <!-- step1 inherits its definition from the parent step "batchControlStep".
             The job ends when step1 exits with FAILED; any other exit status moves on to step2. -->
        <batch:step id="step1" parent="batchControlStep">
            <batch:end on="FAILED" />
            <batch:next on="*" to="step2" />
        </batch:step>
        <!-- step2: chunk-oriented read / process / write, committing every 100 items. -->
        <batch:step id="step2">
            <batch:tasklet>
                <batch:chunk
                    reader="accountReader"
                    processor="availableCashProcessor"
                    writer="availableCashWriter"
                    commit-interval="100"
                    retry-limit="1000000">
                    <!-- deadlock retry: only this exception type is retried, up to retry-limit times -->
                    <batch:retryable-exception-classes>
                        <batch:include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
                    </batch:retryable-exception-classes>
                </batch:chunk>
                <!-- Item-level listeners belong on the step: registered at job level their
                     onReadError / onWriteError / afterWrite callbacks are never invoked. -->
                <batch:listeners>
                    <batch:listener ref="batchItemListener" />
                    <batch:listener ref="itemFailureLoggerListener" />
                </batch:listeners>
            </batch:tasklet>
        </batch:step>
    </batch:job>

Step 3: Provide the relevant annotated and referenced class implementations. The listener "appJobExecutionListener" is defined as shown below. As you can see the class is annotated with @Component("appJobExecutionListener").
 

package com.myapp.batch.listener;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.myapp.batch.dao.BatchControlDao;
import com.myapp.batch.domain.BatchControl;

/**
 * Job listener that records the outcome of a finished job through the
 * {@link BatchControlDao} and prints a one-line summary to standard output.
 */
@Component("appJobExecutionListener")
public class AppJobExecutionListener implements JobExecutionListener {

    @Autowired
    private BatchControlDao batchControlDao;

    /** Nothing needs to happen before the job starts. */
    public void beforeJob(JobExecution arg0) {
        // intentionally empty
    }

    /**
     * Saves the final status for the batch control id found in the job execution
     * context. Only COMPLETED and FAILED are handled; any other status is ignored.
     *
     * @param jobExecution the execution that has just finished
     */
    public void afterJob(JobExecution jobExecution) {
        final Long controlId =
                (Long) jobExecution.getExecutionContext().get(BatchControl.JOB_KEY_BATCH_CONTROL_ID);
        final BatchStatus outcome = jobExecution.getStatus();

        if (outcome == BatchStatus.COMPLETED) {
            batchControlDao.saveJobComplete(controlId, BatchStatus.COMPLETED);
            System.out.println("Job completed: " + jobExecution.getJobInstance().getJobName());
            return;
        }

        if (outcome != BatchStatus.FAILED) {
            return;
        }

        batchControlDao.saveJobComplete(controlId, BatchStatus.FAILED);
        System.out.println("Job failed: " + jobExecution.getJobInstance().getJobName() + " "
                + jobExecution.getFailureExceptions());
    }

}

The "itemFailureLoggerListener" can be defined in a similar fashion.
 
package com.myapp.batch.listener;

import org.springframework.batch.core.listener.ItemListenerSupport;
import org.springframework.stereotype.Component;

@Component("itemFailureLoggerListener")
public class ItemFailureLoggerListener extends ItemListenerSupport {

  
    public void onReadError(Exception ex) {
        System.out.println("Encountered error on read", ex);
    }

    public void onWriteError(Exception ex, Object item) {
        System.out.println("Encountered error on write", ex);
    }
}

The "accountReader" is the ItemReader that reads the relevant account numbers from the accounts table. It basically gets a list of items (i.e. Accounts) to be processed by the processor.
 

    <!-- Cursor-based JDBC reader that supplies the accounts to be processed. -->
    <bean id="accountReader" class="org.springframework.batch.item.database.JdbcCursorItemReader">
        <property name="dataSource" ref="dataSourceMyDs" />
        <!-- Selects accounts in ascending account_no order. The three '?' placeholders
             (lower exclusive bound, then an inclusive from/to range) are bound by the
             preparedStatementSetter configured below. -->
        <property name="sql">
            <value>
                <![CDATA[
                select a.account_no, a.account_name, a.debit, a.credit
                from accounts a  
                where a.account_no > ?  and
                      a.account_no >= ? and  
                      a.account_no <= ?         
                order by a.account_no
                ]]>
            </value>
        </property>
        <!-- Converts each result row into an item. -->
        <property name="rowMapper" ref="accountRowMapper" />
        <!-- Supplies the values for the '?' placeholders of the query. -->
        <property name="preparedStatementSetter" ref="accountReaderStatementSetter" />
    </bean>

The AccountRowMapper is defined as follows to map the results
 

package com.myapp.batch.rowmapper;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;

import com.myapp.batch.domain.Account;

@Component("accountRowMapper")
public class AccountRowMapper implements RowMapper {
    
    public static final String ACC_NO = "account_no";
    //...
    
    public Account mapRow(ResultSet rs, int rowNum) throws SQLException {
        Account account = new Account();
        account.setAccNo(rs.getInteger(ACC_NO));
        //.....
        return account;
    }

}

The where clause arguments can be supplied as shown below by reading from the "jobExecutionContext"
 

     <!-- Supplies the values for the '?' placeholders of the accountReader query, in list order.
          Each value is an expression evaluated against the job execution context.
          NOTE(review): scope="step" is presumably what allows the #{jobExecutionContext[...]}
          expressions to be resolved when the step runs; confirm against the Spring Batch docs. -->
     <bean id="accountReaderStatementSetter" scope="step" class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
        <property name="parameters">
            <list>
                <!-- 1st '?': account number already processed (exclusive lower bound) -->
                <value>#{jobExecutionContext['lastProcessedAccNo']}</value>
                <!-- 2nd '?': start of the account range (inclusive) -->
                <value>#{jobExecutionContext['accFrom']}</value>
                <!-- 3rd '?': end of the account range (inclusive) -->
                <value>#{jobExecutionContext['accTo']}</value>
            </list>
        </property>
    </bean>

 
The availableCashProcessor is the ItemProcessor that calculates the available cash balance. You can delegate this work to a number of processor classes as shown below. The processor is invoked once for each item (i.e. each Account) supplied by the reader.
 

    <!-- Composite processor: each item is handed to the delegates in list order. -->
    <bean id="availableCashProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor">
        <property name="delegates">
            <list>
                <!-- must name the processor class defined in com.myapp.batch.processor -->
                <bean class="com.myapp.batch.processor.CalculateValueProcessor" />
            </list>
        </property>
    </bean>
 
The processor class is shown below
 

package com.myapp.batch.processor;

import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component("availableCashProcessor")
public class CalculateValueProcessor implements ItemProcessor<Account, Account> {

 
    //the account read by the "accountReader" is passed in
    public Account process(Account account) {
        //simple and dirty without using BigDecimal
        account.setAvailableCash(account.getCredit() - account.getDebit());
        return account;
    }

}

The "availableCashWriter" is the ItemWriter that writes the results.
 

package com.myapp.batch.writer;

import java.util.List;

import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

//...

@Component("availableCashWriter")
public class AvailableCashWriter implements ItemWriter<Account> {

    private StepExecution stepExecution;

    @Autowired
    private AccountDao accountDao; //injected via spring confih

    @Autowired
    private ModelAccountPortfolioValueDao modelAccountPortValueDao;

    public void write(List<? extends Account> items) throws Exception {
        for (Account account : items) {
            // Save portfolio value for account
            Account readAcc = accountDao.getAccount(account.getAccNo());
            readAcc.setAvalableCash(account.getAvailableCash());
            accountDao.updateAccount(readAcc);

            // Save to step execution context so that it can be promoted to job execution context
            ExecutionContext stepContext = this.stepExecution.getExecutionContext();
            stepContext.put(BatchControl.JOB_KEY_LAST_PROCESSED_ACC_NO, account.getAccNo());
        }
    }


    @BeforeStep
    public void setStepExecution(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }
}

Finally, the batchItemListener that updates the lastProcessedAccNo.
 

package com.myapp.batch.listener;

import java.util.List;

import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.listener.ItemListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.myapp.batch.dao.BatchControlDao;
import com.myapp.batch.domain.BatchControl;

@Component("batchItemListener")
public class ItemBatchListener extends ItemListenerSupport {

    private StepExecution stepExecution;

    @Autowired
    private BatchControlDao batchControlDao;

    public void afterWrite(List items) {
        Long batchId =
                (Long) this.stepExecution.getJobExecution().getExecutionContext().get(
                        BatchControl.JOB_KEY_BATCH_ID);
        String lastProcessedAccNo =
                (String) this.stepExecution.getExecutionContext().get(BatchControl.JOB_KEY_LAST_PROCESSED_ACC_NO);
        batchControlDao.saveLastProcessedAccNo(lastProcessedAccNo, batchId);
    }

    @BeforeStep
    public void saveStepExecution(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }
}

Labels:

8 Comments:

Blogger Unknown said...

hiii ,

I have a question





#{jobExecutionContext['lastProcessedAccNo']}
#{jobExecutionContext['accFrom']}
#{jobExecutionContext['accTo']}




#{jobExecutionContext['accFrom']} is not working for me , what would be the error??...


Thanks in advance

2:01 AM, May 23, 2013  
Blogger Unknown said...

Have they been added to your step context via something like

stepContext.put(BatchControl.JOB_KEY_LAST_PROCESSED_ACC_NO, account.getAccNo());

2:25 AM, May 23, 2013  
Blogger Unknown said...

Thank you very much for replying , please look at my job configuration file .

























































1




























































#{jobExecutionContext['settlementId]'}
1


















DO YOU THINK I MISSED SOME THING??

7:37 AM, May 23, 2013  
Blogger Unknown said...

in continuation to my previous comment , I have set the settlementId in



package com.ani.pci.settlements;


import java.util.List;

import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemWriter;

import com.anixter.creditcard.so.settlement.SettlementRequestSO;

/**
 * Writer that prints the received items and stores a fixed "settlementId"
 * value in the step execution context.
 */
public class SettlementsItemWriter implements ItemWriter {

    /** Current step execution, captured before the step starts. */
    private StepExecution stepExecution;

    /**
     * Prints the items, then puts the settlement id into the step execution context.
     *
     * @param arg0 the items of the current chunk
     * @throws Exception declared by the writer contract
     */
    @Override
    public void write(List arg0) throws Exception {
        System.out.println(" Hello I am in Writer class++++++++++++ " + arg0);
        // TODO Auto-generated method stub

        final ExecutionContext context = this.stepExecution.getExecutionContext();
        final int id = 7;
        // created but not stored; see the disabled put below
        SettlementRequestSO request = new SettlementRequestSO();
        context.put("settlementId", id);
        System.out.println("step Context : " + context);
        // context.put("settlemntRequestSO", request);
    }

    /**
     * Keeps a reference to the step execution so that {@code write} can reach its context.
     *
     * @param stepExecution the execution of the step about to run
     */
    @BeforeStep
    public void saveStepExecution(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }
}

7:42 AM, May 23, 2013  
Blogger Unknown said...

yaa , I did it in my program added it step execution context , tried retrieving it from job context in next step writer, I could retrieve it in writer class of step 2 but the problem I couldnt get the jobcontext key in job config file in settleDtlParameterSetter , I have put a debug point at ListPreparedStatementSetter setValues method could see the parameter value as string literal #{jobExecutionContext['settlementId']}, Can you please help me with this problem


































#{jobExecutionContext['settlementId']}
1


1:41 AM, May 24, 2013  
Blogger Unknown said...

This comment has been removed by the author.

11:32 AM, May 25, 2013  
Blogger Unknown said...

look at the spring-batch part 2 for the declaration and references of the "promotionListener". Click on the spring-batch keyword cloud to view all spring-batch posts together.

11:34 AM, May 25, 2013  
Blogger Unknown said...

Can you post the code how to delete record of database from itemwriter in spring batch.I am stuck in this I tried many way but it is not working.

11:41 PM, December 06, 2013  

Post a Comment

Subscribe to Post Comments [Atom]

<< Home