Batch processing in Java with Spring Batch - part 2
This post assumes that you have read the part-1 Spring Batch overview questions and answers. The data access objects, datasource configuration, and a few other classes are left out for brevity, as these were covered in other Spring topics.
Step 1: Define batch-context.xml with the JobLauncher and the JobRepository. A JobLauncher is responsible for starting a Job with a given set of job parameters. The Spring-provided implementation, SimpleJobLauncher, relies on a TaskExecutor to launch jobs. If no TaskExecutor is set, it defaults to a SyncTaskExecutor, which runs the job synchronously in the calling thread and is convenient for testing. A JobRepository implementation requires a set of execution DAOs to store its information. MapJobRepositoryFactoryBean is a FactoryBean that automates the creation of a SimpleJobRepository using non-persistent, in-memory DAO implementations. This repository is intended only for testing and rapid prototyping. The JobRegistryBeanPostProcessor registers Job beans with a JobRegistry.
<!-- define the job repository -->
<bean id="jobRepository"
      class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
    <property name="transactionManager" ref="transactionManager" />
</bean>

<!-- define the launcher and pass the jobRepository as setter injection -->
<bean id="jobLauncher"
      class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
</bean>

<bean id="jobRegistry"
      class="org.springframework.batch.core.configuration.support.MapJobRegistry" />

<bean id="jobRegistryBeanPostProcessor"
      class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
    <property name="jobRegistry" ref="jobRegistry" />
</bean>

Step 2: Define the steps necessary for the batch job. Firstly, define the batch control step that will update the batch_control table accordingly.
<!-- Component-Scan automatically detects annotations in the Java classes. -->
<context:component-scan base-package="com.myapp.batch" />

<batch:step id="batchControlStep">
    <batch:tasklet>
        <batch:chunk reader="batchControlReader" writer="jobStartBatchControlWriter"
                     commit-interval="1" />
        <batch:listeners>
            <batch:listener ref="promotionListener" />
            <batch:listener ref="batchReaderListener" />
        </batch:listeners>
    </batch:tasklet>
</batch:step>

<bean id="batchControlReader"
      class="org.springframework.batch.item.database.JdbcCursorItemReader">
    <property name="dataSource" ref="dataSourceMyDs" />
    <property name="sql">
        <util:constant
            static-field="com.myapp.batch.dao.impl.BatchControlDaoImpl.SELECT_BY_JOB_NAME_SQL" />
    </property>
    <property name="rowMapper" ref="batchControlRowMapper" />
    <property name="preparedStatementSetter" ref="batchControlReaderStatementSetter" />
</bean>

<bean id="batchControlReaderStatementSetter" scope="step"
      class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
    <!-- The parameter that is passed via command line: my_job_run.sh accountValueUpdateJob1 accountValueUpdateJob1.log -->
    <!-- $JAVA_HOME/bin/java -classpath ${CLASSPATH} ${JOB_CLASS} batch-context.xml availableBalanceJob jobName=accountValueUpdateJob1 -->
    <property name="parameters">
        <list>
            <value>#{jobParameters['jobName']}</value>
        </list>
    </property>
</bean>

<!-- promotes the values stored under the keys below (defined as static constants)
     from the step ExecutionContext to the job ExecutionContext, so that later steps can read them -->
<bean id="promotionListener"
      class="org.springframework.batch.core.listener.ExecutionContextPromotionListener">
    <property name="keys">
        <list>
            <util:constant static-field="com.myapp.batch.domain.BatchControl.JOB_KEY_LAST_PROCESSED_ACC_NO" />
            <util:constant static-field="com.myapp.batch.domain.BatchControl.JOB_KEY_BATCH_ID" />
            <util:constant static-field="com.myapp.batch.domain.BatchControl.ACC_FROM" />
            <util:constant static-field="com.myapp.batch.domain.BatchControl.ACC_TO" />
        </list>
    </property>
</bean>

<!-- The listener class whose afterStep() method prints a message when the step fails -->
<bean id="batchReaderListener" class="com.myapp.batch.listener.BatchReaderListener" />

Step 3: Define the relevant Java classes that are referenced in the above configuration file. Capture the batch control metadata as shown below.
package com.myapp.batch.domain;

import org.joda.time.DateTime;

public class BatchControl {

    // Keys used to pass values between steps via the execution context
    public static final String JOB_KEY_LAST_PROCESSED_ACC_NO = "lastProcessedAccNo";
    public static final String JOB_KEY_BATCH_ID = "batchId";
    public static final String ACC_FROM = "accFrom";
    public static final String ACC_TO = "accTo";
    public static final String BATCH_USER = "batch";

    private Long jobId;
    private String jobName;
    private DateTime startDateTime;
    private DateTime endDateTime;
    private String status;
    private Integer accFrom;
    private Integer accTo;
    private Integer lastProcessedAccNo;

    // getters and setters are omitted
}
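The four key constants above are the same keys listed in the promotionListener bean. As a rough illustration of what promotion means, the sketch below models the step-level and job-level execution contexts as plain maps and copies only the listed keys from one to the other. The names PromotionSketch and promote are hypothetical, the sample values are made up, and this is a simplified stand-in rather than Spring Batch's ExecutionContextPromotionListener, which does the equivalent work when a step finishes.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in that mimics key promotion with plain maps; not a Spring Batch class.
public class PromotionSketch {

    // Same key names as the BatchControl constants in this post.
    public static final List<String> KEYS =
            Arrays.asList("lastProcessedAccNo", "batchId", "accFrom", "accTo");

    /** Copies only the listed keys from the step-level map into the job-level map. */
    public static void promote(Map<String, Object> stepContext,
                               Map<String, Object> jobContext,
                               List<String> keys) {
        for (String key : keys) {
            if (stepContext.containsKey(key)) {
                jobContext.put(key, stepContext.get(key));
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> stepContext = new HashMap<String, Object>();
        stepContext.put("batchId", 42L);          // made-up sample values
        stepContext.put("accFrom", 1000);
        stepContext.put("accTo", 1999);
        stepContext.put("scratch", "step-only");  // not in KEYS, so it stays behind

        Map<String, Object> jobContext = new HashMap<String, Object>();
        promote(stepContext, jobContext, KEYS);

        System.out.println(jobContext.containsKey("batchId")); // true
        System.out.println(jobContext.containsKey("scratch")); // false
    }
}
```

Anything a step stores under a key that is not listed stays local to that step, which is why every value needed by a later step has to appear in the keys list.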
The listener class that prints a message after the step execution when the exit status is "FAILED", that is, when no batch control record could be read for the given job name.
package com.myapp.batch.listener;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.step.NoWorkFoundStepExecutionListener;

public class BatchReaderListener extends NoWorkFoundStepExecutionListener {

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // The parent returns FAILED when the step read no items
        ExitStatus exitStatus = super.afterStep(stepExecution);
        if (exitStatus != null && exitStatus.equals(ExitStatus.FAILED)) {
            System.out.println("Could not load batch control record with job name "
                    + stepExecution.getJobExecution().getJobInstance().getJobParameters().getString("jobName"));
        }
        return exitStatus;
    }
}
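The listener above relies on its parent class to decide the exit status: NoWorkFoundStepExecutionListener reports FAILED when the step read no items and otherwise returns null, leaving the status alone. The plain-Java sketch below mirrors that rule so it can be tried without Spring on the classpath. NoWorkRuleSketch, exitStatusFor, messageFor and the string statuses are hypothetical stand-ins, not Spring Batch types.

```java
// Hypothetical model of the "no work found" rule; not a Spring Batch class.
public class NoWorkRuleSketch {

    /**
     * Returns "FAILED" when nothing was read, otherwise null, meaning
     * "leave the step's own exit status untouched".
     */
    public static String exitStatusFor(int readCount) {
        return readCount == 0 ? "FAILED" : null;
    }

    /** Builds the message the listener would print, or null when there is nothing to report. */
    public static String messageFor(int readCount, String jobName) {
        if ("FAILED".equals(exitStatusFor(readCount))) {
            return "Could not load batch control record with job name " + jobName;
        }
        return null;
    }

    public static void main(String[] args) {
        // Zero rows read: the batch_control lookup found nothing for this job name.
        System.out.println(messageFor(0, "accountValueUpdateJob1"));
        // One row read: no override, so the step keeps its normal exit status.
        System.out.println(exitStatusFor(1));
    }
}
```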
The DAO class that reads and updates the batch_control table.
package com.myapp.batch.dao.impl;

import org.joda.time.DateTime;
import org.springframework.batch.core.BatchStatus;
import org.springframework.jdbc.core.simple.ParameterizedBeanPropertyRowMapper;
import org.springframework.jdbc.core.simple.SimpleJdbcTemplate;
import org.springframework.orm.hibernate3.HibernateTemplate;
import org.springframework.orm.hibernate3.SessionFactoryUtils;
import org.springframework.orm.hibernate3.support.HibernateDaoSupport;

import com.myapp.batch.dao.BatchControlDao;
import com.myapp.batch.domain.BatchControl;

public class BatchControlDaoImpl extends HibernateDaoSupport implements BatchControlDao {

    public static final String SELECT_BY_JOB_NAME_SQL =
            "select job_id, job_name, start_timestamp, end_timestamp, status,"
            + " account_no_from, account_no_to, last_account_no from batch_control where job_name = ?";

    private static final String JOB_START_UPDATE_SQL =
            "update batch_control set start_timestamp = ?, end_timestamp = ?, status = ?, last_account_no=?"
            + " where batch_id = ?";

    private static final String JOB_COMPLETE_UPDATE_SQL =
            "update batch_control set end_timestamp = ?, status = ? where batch_id = ?";

    private static final String LAST_PROCESSED_ACC_UPDATE_SQL =
            "update batch_control set last_account_no = ? where batch_id = ?";

    private SimpleJdbcTemplate simpleJdbcTemplate;

    public BatchControlDaoImpl(HibernateTemplate hibernateTemplate) {
        setHibernateTemplate(hibernateTemplate);
        // Reuse the DataSource behind the Hibernate SessionFactory for plain JDBC updates
        this.simpleJdbcTemplate = new SimpleJdbcTemplate(
                SessionFactoryUtils.getDataSource(hibernateTemplate.getSessionFactory()));
    }

    // Marks the job as started and clears the end timestamp
    public void init(Long batchId, Integer lastAccountNo) {
        simpleJdbcTemplate.update(JOB_START_UPDATE_SQL, new DateTime().toDate(), null,
                BatchStatus.STARTED.name(), lastAccountNo, batchId);
    }

    // Records the end timestamp and the final status
    public void saveJobComplete(Long batchId, BatchStatus status) {
        simpleJdbcTemplate.update(JOB_COMPLETE_UPDATE_SQL, new DateTime().toDate(), status.name(), batchId);
    }

    // Records the last processed account number
    public void saveLastProcessedId(String lastAccountNo, Long batchId) {
        simpleJdbcTemplate.update(LAST_PROCESSED_ACC_UPDATE_SQL, lastAccountNo, batchId);
    }

    public BatchControl getBatchControl(String jobName) {
        BatchControl batchControl = simpleJdbcTemplate.queryForObject(SELECT_BY_JOB_NAME_SQL,
                ParameterizedBeanPropertyRowMapper.newInstance(BatchControl.class), jobName);
        return batchControl;
    }
}

This will be concluded in part-3.