Batch Processing in Java with Spring batch -- part 3
This assumes that you have read the spring batch part 1 & part 2. This is the final part.
Step 1: The annotated Java classes are referenced directly due to following line in the batch-context.xml
<!-- Component-Scan automatically detects annotations in the Java classes. --> <context:component-scan base-package="com.myapp.batch" />
Step 2: You define the batch job as shown below. It can have a number of steps. Some details are omitted to keep it simple.
<batch:job id="availableBalanceJob"> <batch:listeners> <!-- annotated listener using context:component-scan--> <batch:listener ref="appJobExecutionListener" /> <batch:listener ref="itemFailureLoggerListener" /> </batch:listeners> <batch:step id="step1" parent="batchControlStep"> <batch:end on="FAILED" /> <batch:next on="*" to="step2" /> </batch:step> <batch:step id="step2"> <batch:tasklet> <batch:chunk reader="accountReader" processor="availableCashProcessor" writer="availableCashWriter" commit-interval="100" retry-limit="1000000"> <!-- deadlock retry --> <batch:retryable-exception-classes> <batch:include class="org.springframework.dao.DeadlockLoserDataAccessException"/> </batch:retryable-exception-classes> </batch:chunk> <batch:listeners> <batch:listener ref="batchItemListener" /> </batch:listeners> </batch:tasklet> </batch:step> </batch:job>Step 3: Provide the relevant annotated and referenced class implementations. The listener "appJobExecutionListener" is defined as shown below. As you can see the class is annotated with @Component("appJobExecutionListener").
package com.myapp.batch.listener; import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobExecutionListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.myapp.batch.dao.BatchControlDao; import com.myapp.batch.domain.BatchControl; @Component("appJobExecutionListener") public class AppJobExecutionListener implements JobExecutionListener { @Autowired private BatchControlDao batchControlDao; public void beforeJob(JobExecution arg0) { } public void afterJob(JobExecution jobExecution) { Long batchControlId = (Long) jobExecution.getExecutionContext().get(BatchControl.JOB_KEY_BATCH_CONTROL_ID); if (jobExecution.getStatus() == BatchStatus.COMPLETED) { batchControlDao.saveJobComplete(batchControlId, BatchStatus.COMPLETED); System.out.println("Job completed: " + jobExecution.getJobInstance().getJobName()); } else if (jobExecution.getStatus() == BatchStatus.FAILED) { batchControlDao.saveJobComplete(batchControlId, BatchStatus.FAILED); System.out.println("Job failed: " + jobExecution.getJobInstance().getJobName() + " " + jobExecution.getFailureExceptions()); } } }The "itemFailureLoggerListener" can be defined in a similar fashion.
package com.myapp.batch.listener; import org.springframework.batch.core.listener.ItemListenerSupport; import org.springframework.stereotype.Component; @Component("itemFailureLoggerListener") public class ItemFailureLoggerListener extends ItemListenerSupport { public void onReadError(Exception ex) { System.out.println("Encountered error on read", ex); } public void onWriteError(Exception ex, Object item) { System.out.println("Encountered error on write", ex); } }The "accountReader" is the ItemReader that reads the relevant account numbers from the accounts table. It basically gets a list of items (i.e. Accounts) to be processed by the processor.
<bean id="accountReader" class="org.springframework.batch.item.database.JdbcCursorItemReader"> <property name="dataSource" ref="dataSourceMyDs" /> <property name="sql"> <value> <![CDATA[ select a.account_no, a.account_name, a.debit, a.credit from accounts a where a.account_no > ? and a.account_no >= ? and a.account_no <= ? order by a.account_no ]]> </value> </property> <property name="rowMapper" ref="accountRowMapper" /> <property name="preparedStatementSetter" ref="accountReaderStatementSetter" /> </bean>The AccountRowMapper is defined as follows to map the results
package com.myapp.batch.rowmapper; import java.sql.ResultSet; import java.sql.SQLException; import org.springframework.jdbc.core.RowMapper; import org.springframework.stereotype.Component; import com.myapp.batch..domain.Account; @Component("accountRowMapper") public class AccountRowMapper implements RowMapper { public static final String ACC_NO = "account_no"; //... public Account mapRow(ResultSet rs, int rowNum) throws SQLException { Account account = new Account(); account.setAccNo(rs.getInteger(ACC_NO)); //..... return account; } }The where clause arguments can be supplied as shown below by reading from the "jobExecutionContext"
<bean id="accountReaderStatementSetter" scope="step" class="org.springframework.batch.core.resource.ListPreparedStatementSetter"> <property name="parameters"> <list> <value>#{jobExecutionContext['lastProcessedAccNo']}</value> <value>#{jobExecutionContext['accFrom']}</value> <value>#{jobExecutionContext['accTo']}</value> </list> </property> </bean>The availableCashProcessor is the ItemProcessor that calculates the available cash balance. You can delegate this work to a number of processor classes as shown below. It loops through each item (i.e Account)
<bean id="availableCashProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor"> <property name="delegates"> <list> <bean class="com.myapp.batch.processor.CalculateCashValueProcessor" /> </list> </property> </bean>The processor class is shown below
package com.myapp.batch.processor; import java.util.ArrayList; import java.util.List; import org.springframework.batch.item.ItemProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component("availableCashProcessor") public class CalculateValueProcessor implements ItemProcessor<Account, Account> { //the account read by the "accountReader" is passed in public Account process(Account account) { //simple and dirty without using BigDecimal account.setAvailableCash(account.getCredit() - account.getDebit()); return account; } }The "availaavailableCashWriter" is the ItemWriter that writes the results.
package com.myapp.batch.writer; import java.util.List; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.annotation.BeforeStep; import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; //... @Component("availableCashWriter") public class AvailableCashWriter implements ItemWriter<Account> { private StepExecution stepExecution; @Autowired private AccountDao accountDao; //injected via spring confih @Autowired private ModelAccountPortfolioValueDao modelAccountPortValueDao; public void write(List<? extends Account> items) throws Exception { for (Account account : items) { // Save portfolio value for account Account readAcc = accountDao.getAccount(account.getAccNo()); readAcc.setAvalableCash(account.getAvailableCash()); accountDao.updateAccount(readAcc); // Save to step execution context so that it can be promoted to job execution context ExecutionContext stepContext = this.stepExecution.getExecutionContext(); stepContext.put(BatchControl.JOB_KEY_LAST_PROCESSED_ACC_NO, account.getAccNo()); } } @BeforeStep public void setStepExecution(StepExecution stepExecution) { this.stepExecution = stepExecution; } }Finally, the batchItemListener that updates the lastProcessedAccNo.
package com.myapp.batch.listener; import java.util.List; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.annotation.BeforeStep; import org.springframework.batch.core.listener.ItemListenerSupport; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.myapp.batch.dao.BatchControlDao; import com.myapp.batch.domain.BatchControl; @Component("batchItemListener") public class ItemBatchListener extends ItemListenerSupport { private StepExecution stepExecution; @Autowired private BatchControlDao batchControlDao; public void afterWrite(List items) { Long batchId = (Long) this.stepExecution.getJobExecution().getExecutionContext().get( BatchControl.JOB_KEY_BATCH_ID); String lastProcessedAccNo = (String) this.stepExecution.getExecutionContext().get(BatchControl.JOB_KEY_LAST_PROCESSED_ACC_NO); batchControlDao.saveLastProcessedAccNo(lastProcessedAccNo, batchId); } @BeforeStep public void saveStepExecution(StepExecution stepExecution) { this.stepExecution = stepExecution; } }
Labels: spring batch
8 Comments:
hiii ,
I have a question
#{jobExecutionContext['lastProcessedAccNo']}
#{jobExecutionContext['accFrom']}
#{jobExecutionContext['accTo']}
#{jobExecutionContext['accFrom']} is not working for me , what would be the error??...
Thanks in advance
Have they been added to your step context via something like
stepContext.put(BatchControl.JOB_KEY_LAST_PROCESSED_ACC_NO, account.getAccNo());
Thank you very much for replying , please look at my job configuration file .
1
#{jobExecutionContext['settlementId]'}
1
DO YOU THINK I MISSED SOME THING??
in continuation to my previous comment , I have set the settlementId in
package com.ani.pci.settlements;
import java.util.List;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemWriter;
import com.anixter.creditcard.so.settlement.SettlementRequestSO;
public class SettlementsItemWriter implements ItemWriter {
private StepExecution stepExecution;
@Override
public void write(List arg0) throws Exception {
System.out.println(" Hello I am in Writer class++++++++++++ " + arg0);
// TODO Auto-generated method stub
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
int settlementId = 7;
SettlementRequestSO s= new SettlementRequestSO();
stepContext.put("settlementId", settlementId);
System.out.println("step Context : "+ stepContext);
//stepContext.put("settlemntRequestSO",s);
}
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}
yaa , I did it in my program added it step execution context , tried retrieving it from job context in next step writer, I could retrieve it in writer class of step 2 but the problem I couldnt get the jobcontext key in job config file in settleDtlParameterSetter , I have put a debug point at ListPreparedStatementSetter setValues method could see the parameter value as string literal #{jobExecutionContext['settlementId']}, Can you please help me with this problem
#{jobExecutionContext['settlementId']}
1
This comment has been removed by the author.
look at the spring-batch part 2 for the declaration and references of the "promotionListener". Click on the spring-batch keyword cloud to view all spring-batch posts together.
Can you post the code how to delete record of database from itemwriter in spring batch.I am stuck in this I tried many way but it is not working.
Post a Comment
Subscribe to Post Comments [Atom]
<< Home