Spring Batch advanced tutorial -- writing your own reader
My previous 3-part Spring Batch tutorial covered a high-level overview with examples. This tutorial demonstrates how to write your own reader that wraps Spring's FlatFileItemReader so you can peek at the data and group records the way you want. For example, take a CSV file like the one shown below, where each OPENBAL record starts a new group and the transaction records that follow it belong to that group:
"Portfolio1","29/02/2012","11/03/2012"
"Portfolio1","Account1","OPENBAL",2000.00
"Portfolio1","Account1","PURCHASE",1000.00
"Portfolio1","Account1","EXPENSE",500.00
"Portfolio1","Account1","ADJUSTMENT",200.00
"Portfolio1","Account1","OPENBAL",12000.00
"Portfolio1","Account2","PURCHASE",1000.00
"Portfolio1","Account3","ADJUSTMENT",1000.00
So, we need to write a custom file reader that can peek at the next record before reading it.
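The model classes themselves are not shown in this post; below is a minimal sketch of the shape the rest of the code assumes (field and accessor names are inferred from the reader, mapper, and writer in the later steps; each class goes in its own file under com.myapp.model):

package com.myapp.model;

import java.math.BigDecimal;

// one CSV detail row: either an OPENBAL balance record or a transaction record
public class MyAppDetail {
    private String portfolioCd;
    private String accountCd;
    private String txnCd;          // OPENBAL, PURCHASE, EXPENSE, ADJUSTMENT, ...
    private BigDecimal cashValue;
    // getters and setters omitted for brevity
}

// in its own file, same package
// one group: the OPENBAL balance record plus the transactions that follow it
public class MyAppPortfolioParent {
    private MyAppDetail balanceDetail;
    private java.util.List<MyAppDetail> txnDetails;
    // getters and setters omitted for brevity
}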
Step 1: Snippets of the Spring Batch context configuration file, e.g. applicationContext-myapp.xml.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:task="http://www.springframework.org/schema/task"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:file="http://www.springframework.org/schema/integration/file"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
        http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file-2.0.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.0.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

    <!-- load the properties file -->
    <context:property-placeholder location="classpath:myapp.properties" />

    <!-- annotation driven injection -->
    <tx:annotation-driven />

    <!-- pick up annotated components such as the header callback handler below -->
    <context:component-scan base-package="com.myapp" />

    <!-- define the job that reads from a CSV file and writes to a database -->
    <job id="myAppJob" xmlns="http://www.springframework.org/schema/batch">
        <listeners>
            <listener ref="myAppJobExecutionListener" />
        </listeners>
        <step id="loadMyAppFeedData">
            <tasklet transaction-manager="transactionManager">
                <listeners>
                    <listener ref="stepExecutionListener" />
                </listeners>
                <chunk reader="groupMyAppDetailsReader" writer="myAppFileItemWriter" commit-interval="10" />
            </tasklet>
        </step>
    </job>

    <!-- Spring-supplied FlatFileItemReader that reads the CSV file line by line -->
    <bean id="myAppFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
        <property name="resource" value="#{jobParameters['dataFileName']}" />
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <property name="names" value="portfolioCd,accountCd,txnCd,cashValue" />
                    </bean>
                </property>
                <property name="fieldSetMapper">
                    <bean class="com.myapp.mapper.MyAppFieldSetMapper" />
                </property>
            </bean>
        </property>
        <property name="linesToSkip" value="1" />
        <property name="skippedLinesCallback" ref="myAppFileHeaderLineCallbackHandler" />
    </bean>

    <!-- my custom CSV reader that groups the data; it internally delegates to Spring's FlatFileItemReader -->
    <bean id="groupMyAppDetailsReader" class="com.myapp.item.reader.MyAppFileItemReader">
        <property name="delegate" ref="myAppFileItemReader" />
    </bean>

    <!-- my custom file item writer -->
    <bean id="myAppFileItemWriter" class="com.myapp.item.writer.MyAppItemWriter" />

    <!-- the step execution context listener that can be injected to propagate step values -->
    <bean id="stepExecutionListener" class="com.myapp.util.StepExecutionListenerCtxInjecter" />

</beans>
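With that configuration in place, the job can be launched with Spring Batch's CommandLineJobRunner, passing the dataFileName job parameter that the step-scoped reader expects. For example (the classpath placeholder and file path are illustrative):

java -cp <your-classpath> org.springframework.batch.core.launch.support.CommandLineJobRunner \
    applicationContext-myapp.xml myAppJob dataFileName=file:/data/myapp-feed.csv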
Step 2: The custom reader can be implemented as shown below. The key is peeking at the next record to enable grouping, while using the Spring-provided FlatFileItemReader as a delegate to read each CSV line.
package com.myapp.item.reader;

import java.util.ArrayList;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

import com.myapp.model.MyAppDetail;
import com.myapp.model.MyAppPortfolioParent;

public class MyAppFileItemReader implements ItemStreamReader<MyAppPortfolioParent> {

    private ItemStreamReader<MyAppDetail> delegate;

    // the record that has been peeked at but not yet consumed
    private MyAppDetail curItem = null;

    @Override
    public MyAppPortfolioParent read() throws Exception {
        MyAppPortfolioParent parent = null;
        if (curItem == null) {
            curItem = delegate.read();
        }
        if (curItem != null) {
            // the current record (an OPENBAL) becomes the parent's balance record
            parent = new MyAppPortfolioParent();
            parent.setBalanceDetail(curItem);
            curItem = null;
            // keep consuming child transaction records until the next OPENBAL
            // record (the start of the next group) or the end of the file
            parent.setTxnDetails(new ArrayList<MyAppDetail>());
            MyAppDetail detail = peek();
            while (detail != null && !"OPENBAL".equalsIgnoreCase(detail.getTxnCd())) {
                parent.getTxnDetails().add(curItem);
                curItem = null;
                detail = peek();
            }
        }
        return parent;
    }

    /**
     * Returns the next record without consuming it. The record is cached in
     * curItem so that the next call to read() picks it up.
     */
    public MyAppDetail peek() throws Exception, UnexpectedInputException, ParseException {
        if (curItem == null) {
            curItem = delegate.read();
        }
        return curItem;
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        delegate.open(executionContext);
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        delegate.update(executionContext);
    }

    @Override
    public void close() throws ItemStreamException {
        delegate.close();
    }

    public void setDelegate(ItemStreamReader<MyAppDetail> delegate) {
        this.delegate = delegate;
    }
}
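As an aside, recent versions of Spring Batch ship org.springframework.batch.item.support.SingleItemPeekableItemReader, which encapsulates exactly this peek-ahead pattern. If it is available in your version, the hand-rolled curItem caching above could be replaced by wrapping the delegate. A rough sketch (flatFileItemReader stands for the FlatFileItemReader bean from the config):

import org.springframework.batch.item.support.SingleItemPeekableItemReader;

SingleItemPeekableItemReader<MyAppDetail> peekable = new SingleItemPeekableItemReader<MyAppDetail>();
peekable.setDelegate(flatFileItemReader);
MyAppDetail next = peekable.peek(); // look at the next record without consuming it
MyAppDetail same = peekable.read(); // a subsequent read() returns that same record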
Step 3: The utility class that can be used to inject the step and job execution contexts into your reader, processor, or writer classes.
package com.myapp.util;

import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.ExecutionContext;

public class StepExecutionListenerCtxInjecter {

    private ExecutionContext stepExecutionCtx;
    private ExecutionContext jobExecutionCtx;

    // invoked by Spring Batch before the step starts, because this bean is
    // registered as a listener on the tasklet in the XML config above
    @BeforeStep
    public void beforeStep(StepExecution stepExecution) {
        stepExecutionCtx = stepExecution.getExecutionContext();
        jobExecutionCtx = stepExecution.getJobExecution().getExecutionContext();
    }

    public ExecutionContext getStepExecutionCtx() {
        return stepExecutionCtx;
    }

    public ExecutionContext getJobExecutionCtx() {
        return jobExecutionCtx;
    }
}
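Any bean that has this listener injected can then read values that an earlier component stored. For example, a hypothetical processor (not part of this tutorial's job) could pick up the header meta data that the header callback handler in the next step stores under the feedHeaderData key:

package com.myapp.processor;

import javax.annotation.Resource;

import org.springframework.batch.item.ItemProcessor;

import com.myapp.model.MyAppMeta;
import com.myapp.model.MyAppPortfolioParent;
import com.myapp.util.StepExecutionListenerCtxInjecter;

public class MyAppItemProcessor implements ItemProcessor<MyAppPortfolioParent, MyAppPortfolioParent> {

    @Resource(name = "stepExecutionListener")
    private StepExecutionListenerCtxInjecter stepExecutionListener;

    @Override
    public MyAppPortfolioParent process(MyAppPortfolioParent item) throws Exception {
        // read the header meta data stored in the job execution context
        MyAppMeta meta = (MyAppMeta) stepExecutionListener.getJobExecutionCtx().get("feedHeaderData");
        // ... use meta to enrich or validate the item ...
        return item;
    }
}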
Step 4: As you can see in the Spring config file, we skip the first record (the header record) and define a LineCallbackHandler to handle it. Here is the implementation of this handler.
package com.myapp.handler;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.file.LineCallbackHandler;
import org.springframework.stereotype.Component;

import com.myapp.dao.MyAppDao;
import com.myapp.model.MyAppMeta;
import com.myapp.util.MyAppUtil;
import com.myapp.util.StepExecutionListenerCtxInjecter;

@Component(value = "myAppFileHeaderLineCallbackHandler")
public class MyAppFileHeaderCallbackHandler implements LineCallbackHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyAppFileHeaderCallbackHandler.class);

    public static final String FEED_HEADER_DATA = "feedHeaderData";

    @Resource(name = "myappFeedDao")
    private MyAppDao myappDao;

    @Resource(name = "stepExecutionListener")
    private StepExecutionListenerCtxInjecter stepExecutionListener;

    @Override
    public void handleLine(String headerLine) {
        LOGGER.debug("header line: {}", headerLine);
        // convert the CSV header data into a meta object
        MyAppMeta cfMeta = MyAppUtil.getMyAppMetaFromHeader(headerLine, null);
        // logically delete the current records for this feed
        int noOfRecordsLogicallyDeleted = myappDao.logicallyDelete(cfMeta);
        LOGGER.info("No of records logically deleted: {}", noOfRecordsLogicallyDeleted);
        // save the header data in the job execution context for later use
        stepExecutionListener.getJobExecutionCtx().put(FEED_HEADER_DATA, cfMeta);
    }
}
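MyAppUtil.getMyAppMetaFromHeader is not shown in this post; here is a plausible sketch, assuming the header carries a portfolio code and a from/to date range as in the sample file above. The MyAppMeta setters and the unused second parameter are assumptions:

package com.myapp.util;

import java.text.ParseException;
import java.text.SimpleDateFormat;

import com.myapp.model.MyAppMeta;

public class MyAppUtil {

    // parses a header line such as: "Portfolio1","29/02/2012","11/03/2012"
    public static MyAppMeta getMyAppMetaFromHeader(String headerLine, String feedType) {
        String[] fields = headerLine.split(",");
        MyAppMeta meta = new MyAppMeta();
        try {
            SimpleDateFormat df = new SimpleDateFormat("dd/MM/yyyy");
            meta.setPortfolioCd(unquote(fields[0]));
            meta.setFromDate(df.parse(unquote(fields[1])));
            meta.setToDate(df.parse(unquote(fields[2])));
        } catch (ParseException e) {
            throw new IllegalArgumentException("Bad header line: " + headerLine, e);
        }
        return meta;
    }

    // strips surrounding quotes and whitespace from a CSV field
    private static String unquote(String field) {
        return field.trim().replaceAll("^\"|\"$", "");
    }
}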
Step 5: The FlatFileItemReader has a FieldSetMapper defined to map each row to an object. We need to define this mapper, which is invoked as each CSV line is read, converting the fields into an object as shown below.
package com.myapp.mapper;

import java.math.BigDecimal;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

import com.myapp.model.MyAppDetail;

public class MyAppFieldSetMapper implements FieldSetMapper<MyAppDetail> {

    private final static Logger logger = LoggerFactory.getLogger(MyAppFieldSetMapper.class);

    @Override
    public MyAppDetail mapFieldSet(FieldSet fs) throws BindException {
        if (fs == null) {
            return null;
        }
        MyAppDetail detail = new MyAppDetail();
        // the field names correspond to the tokenizer's "names" property in the XML config
        detail.setPortfolioCd(fs.readString("portfolioCd"));
        detail.setAccountCd(fs.readString("accountCd"));
        detail.setTxnCd(fs.readString("txnCd"));
        BigDecimal cashValue = fs.readBigDecimal("cashValue");
        detail.setCashValue(cashValue != null ? cashValue : BigDecimal.ZERO);
        return detail;
    }
}
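To sanity-check the mapper in isolation, you can build a FieldSet by hand with the same column names the tokenizer uses. A small test sketch, using Spring Batch's DefaultFieldSetFactory:

import org.springframework.batch.item.file.transform.DefaultFieldSetFactory;
import org.springframework.batch.item.file.transform.FieldSet;

FieldSet fs = new DefaultFieldSetFactory().create(
        new String[] { "Portfolio1", "Account1", "PURCHASE", "1000.00" },
        new String[] { "portfolioCd", "accountCd", "txnCd", "cashValue" });
MyAppDetail detail = new MyAppFieldSetMapper().mapFieldSet(fs);
// detail.getTxnCd() is "PURCHASE"; detail.getCashValue() is 1000.00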
Step 6: The writer class that is responsible for writing a group of items (i.e. the parent and its child records) to the database.
package com.myapp.item.writer;

import java.util.List;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemWriter;

import com.myapp.dao.MyAppDao;
import com.myapp.handler.MyAppFileHeaderCallbackHandler;
import com.myapp.model.MyAppDetail;
import com.myapp.model.MyAppMeta;
import com.myapp.model.MyAppPortfolioParent;
import com.myapp.util.StepExecutionListenerCtxInjecter;

public class MyAppItemWriter implements ItemWriter<MyAppPortfolioParent> {

    private final static Logger logger = LoggerFactory.getLogger(MyAppItemWriter.class);

    // to get at the step and job execution contexts
    @Resource(name = "stepExecutionListener")
    private StepExecutionListenerCtxInjecter stepExecutionListener;

    // DAO class for saving records to the database
    @Resource(name = "myappFeedDao")
    private MyAppDao myappDao;

    @Override
    public void write(List<? extends MyAppPortfolioParent> portfolioDetails) {
        // retrieve the header data previously stored in the job context by the header callback handler
        MyAppMeta pfMeta = (MyAppMeta) stepExecutionListener.getJobExecutionCtx().get(
                MyAppFileHeaderCallbackHandler.FEED_HEADER_DATA);
        // retrieve the batch job id previously stored in the job context
        int batchJobId = -1;
        if (stepExecutionListener.getJobExecutionCtx().get("batchJobId") != null) {
            batchJobId = stepExecutionListener.getJobExecutionCtx().getInt("batchJobId");
        }
        pfMeta.setBatchJobId(batchJobId);
        try {
            for (MyAppPortfolioParent cfp : portfolioDetails) {
                MyAppDetail bd = cfp.getBalanceDetail();
                // save the parent balance record
                int noOfRecords = myappDao.saveMyAppBalance(bd, pfMeta);
                logger.info("No of balance records inserted: {}", noOfRecords);
                int syntheticId = myappDao.getMyAppId(bd, pfMeta);
                // save the child transaction records against the parent's key
                List<MyAppDetail> txnDetails = cfp.getTxnDetails();
                for (MyAppDetail txd : txnDetails) {
                    myappDao.saveMyAppDetail(txd, syntheticId);
                }
            }
        } catch (Exception e) {
            logger.error("MyAppItemWriter error", e);
            throw new RuntimeException(e);
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Committing chunks to the database ......");
        }
    }
}
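The DAO itself is not part of this post; its shape, inferred from the calls in the writer and the header callback handler above, would be along these lines (the implementation is up to you, e.g. plain JDBC or Hibernate):

package com.myapp.dao;

import com.myapp.model.MyAppDetail;
import com.myapp.model.MyAppMeta;

public interface MyAppDao {

    // flag the existing rows for this feed as logically deleted
    int logicallyDelete(MyAppMeta meta);

    // insert the OPENBAL parent balance row; returns the number of rows inserted
    int saveMyAppBalance(MyAppDetail balance, MyAppMeta meta);

    // fetch the generated (synthetic) key of the parent row just inserted
    int getMyAppId(MyAppDetail balance, MyAppMeta meta);

    // insert one child transaction row against the parent's key
    void saveMyAppDetail(MyAppDetail txn, int parentId);
}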
Labels: spring batch
5 Comments:
Thanks a lot for posting it, the example is really very useful.
Nicely written example.
Hi,
I am reading data from a database and then writing it in a fixed-length file format (no commas). I would like to include a header and footer in the file.
My header format is prefixed with HD:
HD-RECORD-TYPE-IDENTIFIER(2)
HD-REC-SEQ-NUM(5) its line number
HD-SUBSCRIBER-ID(10)
HD-DATE-TIME-STAMP(10)
HD-REJECT-CODE(10)
FILLER
The body detail format is as below and is prefixed with DT:
HD-RECORD-TYPE-IDENTIFIER(2)
HD-REC-SEQ-NUM(5)
DT-CREDIT-CARD-NUMBER
DT-PAN-SEQ-NUMBER
DT-CUSTOMER-NUMBER
DT-EXPIRY-DATE
DT-SE-NUM
DT-DECISION-INDICATOR
Similarly, the footer format is prefixed with 'FT':
FOOTER FT-RECORD-TYPE-IDENTIFIER(2)
FOOTER FT-REC-SEQ-NUM(5)
FOOTER FT-NO-OF-RECORDS
HD0000100011000220111212022424
DT000020002376000390521009
FT000030034000000052
Refer to the Spring Batch documentation, where the fixed file format is described.
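In short, FlatFileItemWriter lets you register a FlatFileHeaderCallback and a FlatFileFooterCallback; a rough sketch (your HD/FT record layouts would be built inside the callbacks):

import java.io.IOException;
import java.io.Writer;

import org.springframework.batch.item.file.FlatFileFooterCallback;
import org.springframework.batch.item.file.FlatFileHeaderCallback;
import org.springframework.batch.item.file.FlatFileItemWriter;

FlatFileItemWriter<String> writer = new FlatFileItemWriter<String>();
writer.setHeaderCallback(new FlatFileHeaderCallback() {
    public void writeHeader(Writer w) throws IOException {
        w.write("HD...");  // build your HD record here
    }
});
writer.setFooterCallback(new FlatFileFooterCallback() {
    public void writeFooter(Writer w) throws IOException {
        w.write("FT...");  // build your FT record here, e.g. with the record count
    }
});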
Can you post an example that uses ItemReader and HibernateItemWriter in Spring Batch?
Thanks in advance.