Apache Camel for asynchronous processing
Apache Camel is a very powerful integration framework that can be used with Java to solve a variety of problems. In this example, I will discuss how it can be used to generate feeds asynchronously, without you having to write any multi-threading code yourself. Sending email is handled by the Camel mail component, with the message body built using an expression language such as Simple or MVEL. Following is a simple example where some feed files are processed asynchronously by retrieving the job request from an in-memory queue. In the event of an error, an email notification is sent. In a Camel route, "from" defines the consumer endpoint (where messages come from), and "to" defines the destination endpoint (where messages are sent).
Step 1: The required dependency jars are defined in the Maven pom file.
<!-- Camel artifacts used by this example. Every artifact takes its version from the single camel.version property (2.10.3) so they stay in sync; camel-test-spring is test-scoped. Note that a dependencyManagement section only pins versions: each module must still list the artifacts it uses in its own dependencies section. -->
<properties> <camel.version>2.10.3</camel.version> </properties> <dependencyManagement> <dependencies> <!-- CAMEL --> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> <version>${camel.version}</version> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-spring</artifactId> <version>${camel.version}</version> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-test-spring</artifactId> <version>${camel.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-spring-javaconfig</artifactId> <version>${camel.version}</version> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-mvel</artifactId> <version>${camel.version}</version> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-bean-validator</artifactId> <version>${camel.version}</version> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-beanio</artifactId> <version>${camel.version}</version> </dependency> </dependencies> </dependencyManagement>
Step 2: The next step is to bootstrap Camel via Spring Java-based configuration. Making the config class implement CamelContextAware causes the Camel context to be injected via the setCamelContext setter method.
package com.myapp.config; import com.myapp.camel.JobHandlingRouteBuilder; import com.myapp.service.MyAppService; import javax.annotation.Resource; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MyAppConfig implements CamelContextAware { private CamelContext context; @Resource(name = "myAppService") private MyAppService myAppService; //constructor injection, where template is the bean id. @Bean public ProducerTemplate template() { if (context != null) { return context.createProducerTemplate(); } } @Bean public RouteBuilder jobHandlingRouteBuilder() { return new JobHandlingRouteBuilder(myAppForecastService); } @Override public void setCamelContext(CamelContext camelContext) { context = camelContext; } @Override public CamelContext getCamelContext() { return context; } }Step 3: Define the ProducerTemplate within your Service class. The ProducerTemplate interface allows you to send message exchanges to endpoints in a variety of different ways to make it easy to work with Camel Endpoint instances from Java code. This service class could be invoked via a RESTful Webservice.
//............ @Service(value = "myAppService") @Transactional(propagation = Propagation.SUPPORTS) public class CashForecastServiceImpl implements CashForecastService { private ProducerTemplate template; //since autowired, injected via MyAppConfig template() method with beanId being template. @Autowired public void setTemplate(ProducerTemplate template) { this.template = template; } @Override public boolean handleGroupLevelFeedGenerationRequest() { //a pojo Java class with fields like account code, etc and getter/setter methods FeedGenerationRequest request = new FeedGenerationRequest(); request.setAccountCode("12345"); //add headers and body. header will be used to determine processing logic by the RouteBuilder Map<string, object> headers = new HashMap<string,object>(); headers.put(JobHandlingRouteBuilder.JOB_TYPE_HEADER, JobType.FEED_GENERATION); //send it to an in memory BockingQueue define in the JobHandlingRouteBuilder class template.sendBodyAndHeaders(JobHandlingRouteBuilder.JOB_QUEUE, request, headers); return true; } public boolean generateFeed1(FeedGenerationRequest request){ //logic to generate feed goes here } public boolean generateFeed2(FeedGenerationRequest request){ //logic to generate feed goes here } //........................... }
Step 4: Finally, define the camel route to queue and asynchronously generate the required feed files.
package com.myapp.camel; import com.myapp.JobType; import com.myapp.MyAppService; import org.apache.camel.builder.RouteBuilder; public class JobHandlingRouteBuilder extends RouteBuilder { public static final String JOB_QUEUE = "vm:jobQueue?size=50&timeout=1000000&concurrentConsumers=1"; public static final String FEED_GENERATION_JOB_QUEUE = "vm:feedGenerationJobQueue?size=50&timeout=1000000&concurrentConsumers=1"; public static final String DIRECT_FEED1 = "direct:feed1"; public static final String DIRECT_ERROR = "direct:error"; public static final String DIRECT_FEED2 = "direct:feed2"; public static final String JOB_TYPE_HEADER = "jobTypeHeader"; public static final String CONTROLLABLE_JOB_TYPE_HEADER = "controllableJobTypeHeader"; private MyAppService myAppService; public JobHandlingRouteBuilder(MyAppService myAppService) { super(); this.myAppService = myAppService; } @Override public void configure() throws Exception { //build routes configureJobHandlingRoute(); configureFeedGenerationJobHandlingRoute(); } /** * Main route to handling all jobs */ private void configureJobHandlingRoute() { //from the in memory job queue move it to the in memory feed generation queue from(JOB_QUEUE) .routeId(JOB_QUEUE) .choice() .when((header(JOB_TYPE_HEADER).isEqualTo(JobType.FEED_GENERATION))) .log(INFO, "Handling Feed Generation Job") .to(FEED_GENERATION_JOB_QUEUE); } /** * Route to handle Feed Generation jobs */ public void configureFeedGenerationJobHandlingRoute() { // @formatter:off from(FEED_GENERATION_JOB_QUEUE) .routeId(FEED_GENERATION_JOB_QUEUE) .multicast() //multiple destinations .parallelProcessing() //multiple threads .to(DIRECT_FEED2, DIRECT_FEED1) .end(); //TODO handle exception and mark the status as FAILED from(DIRECT_FEED2) .routeId(DIRECT_FEED2) .setHeader(CONTROLLABLE_JOB_TYPE_HEADER, simple(ControllableJobType.POS_FEED.toString())) .doTry() .bean(myAppService, "generateFeed2") //invokes generateFeed2 method on myAppService bean .doCatch(Exception.class) 
.to(DIRECT_ERROR) .end(); from(DIRECT_FEED1) .routeId(DIRECT_FEED1) .setHeader(CONTROLLABLE_JOB_TYPE_HEADER, simple(ControllableJobType.CASH_FEED.toString())) .doTry() .bean(myAppService, "generateFeed1") //invokes generateFeed2 method on myAppService bean .doCatch(Exception.class) .to(DIRECT_ERROR) .end(); //Handle failtures from(DIRECT_ERROR) .routeId(DIRECT_ERROR) .log(INFO, " ${headers.controllableJobTypeHeader} Job failed, reason: ${exception.stacktrace}") .bean(cashForecastService, "markControllableJobFailed") .end(); } }
If you want to send an email notification on error, the routes can be enhanced as shown below.
Firstly, add the Camel mail component.
<!-- camel-mail: the Camel mail component, needed for the smtp: endpoint used by the email notification route below. -->
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-mail</artifactId> <version>${camel.version}</version> </dependency>
public static final String DIRECT_EMAIL_NOTIFICATION = "direct:emailNotification"; public static final String NOTIFICATION_FLAG_HEADER = "notificationFlagHeader"; public static final String CREATION_FAILURE_EVENT_SUBJECT = "Failed Feed Generation"; private static final String LOG_URI = "log:" + AbstractCommonRouteBuilder.class.getPackage().getName() + "?level=ERROR"; //.... @Override public void doConfigure() { addPropertiesLocation("classpath:cash/cashforecast.properties"); } //.... from(DIRECT_FEED1) .routeId(DIRECT_FEED1) .setHeader(CONTROLLABLE_JOB_TYPE_HEADER, simple(ControllableJobType.CASH_FEED.toString())) .doTry() .bean(myAppService, "produceGroupLevelCashFeed") .doCatch(Exception.class) .to(DIRECT_ERROR) .end(); //Handle failtures from(DIRECT_ERROR) .routeId(DIRECT_ERROR) .log(INFO, " ${headers.controllableJobTypeHeader} Job failed, reason: ${exception.stacktrace}") .bean(myApp, "handleControllableJobFailed") .to(DIRECT_EMAIL_NOTIFICATION) .end(); from(DIRECT_EMAIL_NOTIFICATION) .routeId(DIRECT_EMAIL_NOTIFICATION) .setHeader(NOTIFICATION_FLAG_HEADER, simple("{{myapp.feed.enable.email.notification}}")) .choice() .when(header(NOTIFICATION_FLAG_HEADER).isEqualTo(true)) .log(INFO, "Sending email notification on Failture") .setBody(simple("${headers.controllableJobTypeHeader} : ${exception.message}")) .to("smtp:{{myapp.mail.host}}?contentType=text/html&to={{myapp.feed.notification.recipient}}&from={{myapp.feed.notification.sender}}" + "&subject=" + CREATION_FAILURE_EVENT_SUBJECT + "&mail.smtp.auth=false&mail.smtp.starttls.enable=false&delete=true&mapMailMessage=false"); //... /* * TODO: The better approach would be to use * BridgePropertyPlaceholderConfigurer so that it picks up @PropertySource * style configuration */ protected void addPropertiesLocation(String... 
newLocations) { PropertiesComponent properties = getPropertiesComponent(); String[] locationsArr = properties.getLocations(); List<string> locations = new ArrayList<string>(); if (locationsArr != null) { for (String location : locationsArr) { locations.add(location); } } for (String location : newLocations) { locations.add(location); } locationsArr = new String[locations.size()]; locationsArr = locations.toArray(locationsArr); properties.setLocations(locationsArr); }
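The routes are defined with Camel's Java DSL, and the headers and email body are built with Camel's Simple expression language (the simple(...) calls above). MVEL, which is a powerful expression language for Java-based applications, can be used in the same places via the camel-mvel component. You can also appreciate how easy it is to build your routes using various protocols.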
Labels: Apache Camel
0 Comments:
Post a Comment
Subscribe to Post Comments [Atom]
<< Home