Apache Camel for asynchronous processing

Apache Camel is a very powerful and can be used with Java to solve various problems. In this example, I will discuss how it can be used to generate feeds asynchronously. You don't have to write any multi-threading code. Sending email can be done with the MVEL expression language.

Following is a simple example where some feed files are processed asynchronously by retrieving the job request from an in-memory queue. In the event of error, email notification is sent. "from" is a consumer, and "to" is the destination.


Step 1:  The dependency jars required defined via Maven pom file.

 <properties>
     <camel.version>2.10.3</camel.version>
 </properties>
   
 <dependencyManagement>
  <dependencies> 
   <!-- CAMEL -->
   <dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-core</artifactId>
    <version>${camel.version}</version>
   </dependency>
   <dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-spring</artifactId>
    <version>${camel.version}</version>
   </dependency>

   <dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-test-spring</artifactId>
    <version>${camel.version}</version>
    <scope>test</scope>
   </dependency>

   <dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-spring-javaconfig</artifactId>
    <version>${camel.version}</version>
   </dependency>

   <dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-mvel</artifactId>
    <version>${camel.version}</version>
   </dependency>

   <dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-bean-validator</artifactId>
    <version>${camel.version}</version>
   </dependency>

   <dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-beanio</artifactId>
    <version>${camel.version}</version>
   </dependency>
  </dependencies>
 </dependencyManagement>



Step 2: The next step is to bootstrap Camel via Spring Java based configuration. Making the config class CamelContextAware will inject the camel context via the setter method.


package com.myapp.config;

import com.myapp.camel.JobHandlingRouteBuilder;
import com.myapp.service.MyAppService;

import javax.annotation.Resource;

import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class MyAppConfig implements CamelContextAware
{
    
    private CamelContext context;
    
    @Resource(name = "myAppService")
    private MyAppService myAppService;
    
    //constructor injection, where template is the bean id.
    @Bean
    public ProducerTemplate template()
    {
        if (context != null)
        {
            return context.createProducerTemplate();
        }
    }
    
    @Bean
    public RouteBuilder jobHandlingRouteBuilder()
    {
        return new JobHandlingRouteBuilder(myAppForecastService);
    }
    
    @Override
    public void setCamelContext(CamelContext camelContext)
    {
        context = camelContext;
    }
    
    @Override
    public CamelContext getCamelContext()
    {
        return context;
    }
    
}


Step 3: Define the ProducerTemplate within your Service class.  The ProducerTemplate interface allows you to send message exchanges to endpoints in a variety of different ways to make it easy to work with Camel Endpoint instances from Java code. This service class could be invoked via a RESTful Webservice. 

//............

@Service(value = "myAppService")
@Transactional(propagation = Propagation.SUPPORTS)
public class CashForecastServiceImpl implements CashForecastService
{

   private ProducerTemplate template;

   
   //since autowired, injected via MyAppConfig template() method with beanId being template.
   @Autowired
   public void setTemplate(ProducerTemplate template)
   {
       this.template = template;
   }
    
   @Override
    public boolean handleGroupLevelFeedGenerationRequest()
    {
       
    //a pojo Java class with fields like account code, etc and getter/setter methods
    FeedGenerationRequest request = new FeedGenerationRequest();
    request.setAccountCode("12345");
    
       //add headers and body. header will be used to determine processing logic by the RouteBuilder 
        Map<string, object> headers = new HashMap<string,object>();
        headers.put(JobHandlingRouteBuilder.JOB_TYPE_HEADER, JobType.FEED_GENERATION);
        
  //send it to an in memory BockingQueue define in the JobHandlingRouteBuilder class
        template.sendBodyAndHeaders(JobHandlingRouteBuilder.JOB_QUEUE, request, headers);
        
        return true;
    }

    public boolean generateFeed1(FeedGenerationRequest request){
      //logic to generate feed goes here
    }
 
    public boolean generateFeed2(FeedGenerationRequest request){
       //logic to generate feed goes here
    }
  

 //...........................
   
}


Step 4: Finally, define the camel route to queue and asynchronously generate the required feed files.




package com.myapp.camel;


import com.myapp.JobType;
import com.myapp.MyAppService;

import org.apache.camel.builder.RouteBuilder;


public class JobHandlingRouteBuilder extends RouteBuilder
{
    public static final String JOB_QUEUE = "vm:jobQueue?size=50&timeout=1000000&concurrentConsumers=1";
    public static final String FEED_GENERATION_JOB_QUEUE = "vm:feedGenerationJobQueue?size=50&timeout=1000000&concurrentConsumers=1";
    public static final String DIRECT_FEED1 = "direct:feed1";
    public static final String DIRECT_ERROR = "direct:error";
    public static final String DIRECT_FEED2 = "direct:feed2";
    public static final String JOB_TYPE_HEADER = "jobTypeHeader";
    public static final String CONTROLLABLE_JOB_TYPE_HEADER = "controllableJobTypeHeader";
    
    private MyAppService myAppService;
    
    public JobHandlingRouteBuilder(MyAppService myAppService)
    {
        super();
        this.myAppService = myAppService;
    }
    
    @Override
    public void configure() throws Exception
    {
     //build routes
        configureJobHandlingRoute();
        configureFeedGenerationJobHandlingRoute();
    }
    
    /**
     * Main route to handling all jobs
     */
    private void configureJobHandlingRoute()
    {
        
  //from the in memory job queue move it to the in memory feed generation queue
        from(JOB_QUEUE)
            .routeId(JOB_QUEUE)
            .choice()
                .when((header(JOB_TYPE_HEADER).isEqualTo(JobType.FEED_GENERATION)))
                    .log(INFO, "Handling Feed Generation Job")    
                    .to(FEED_GENERATION_JOB_QUEUE);
    }
    
    /**
     * Route to handle Feed Generation jobs
     */
    public void configureFeedGenerationJobHandlingRoute()
    {
        // @formatter:off
          from(FEED_GENERATION_JOB_QUEUE)
              .routeId(FEED_GENERATION_JOB_QUEUE)
              .multicast()                           //multiple destinations  
                  .parallelProcessing()              //multiple threads
                  .to(DIRECT_FEED2, DIRECT_FEED1)
              .end();
          
          //TODO handle exception and mark the status as FAILED
          from(DIRECT_FEED2)
              .routeId(DIRECT_FEED2)
              .setHeader(CONTROLLABLE_JOB_TYPE_HEADER, simple(ControllableJobType.POS_FEED.toString()))
              .doTry()
                  .bean(myAppService, "generateFeed2") //invokes generateFeed2 method on myAppService bean
              .doCatch(Exception.class)
                  .to(DIRECT_ERROR)
               .end();

          from(DIRECT_FEED1)
              .routeId(DIRECT_FEED1)
              .setHeader(CONTROLLABLE_JOB_TYPE_HEADER, simple(ControllableJobType.CASH_FEED.toString()))
              .doTry()
                  .bean(myAppService, "generateFeed1")  //invokes generateFeed2 method on myAppService bean
              .doCatch(Exception.class)
                  .to(DIRECT_ERROR)
               .end();
          
          //Handle failtures
          from(DIRECT_ERROR)
              .routeId(DIRECT_ERROR)
              .log(INFO, " ${headers.controllableJobTypeHeader} Job failed, reason:  ${exception.stacktrace}")
              .bean(cashForecastService, "markControllableJobFailed")
              .end();
    }
}




If you want to send email notification on error, the routes can be enhanced as shown below.

Firstly, add camel mail component.

    
 <dependency>
     <groupId>org.apache.camel</groupId>
  <artifactId>camel-mail</artifactId>
  <version>${camel.version}</version>
 </dependency>


        public static final String DIRECT_EMAIL_NOTIFICATION = "direct:emailNotification"; 
  public static final String NOTIFICATION_FLAG_HEADER = "notificationFlagHeader";
  public static final String CREATION_FAILURE_EVENT_SUBJECT = "Failed Feed Generation";
  private static final String LOG_URI = "log:" + AbstractCommonRouteBuilder.class.getPackage().getName()
            + "?level=ERROR";
  
  //....

  @Override
        public void doConfigure()
       {
           addPropertiesLocation("classpath:cash/cashforecast.properties");     
       }
  
  //....
  
        from(DIRECT_FEED1)
              .routeId(DIRECT_FEED1)
              .setHeader(CONTROLLABLE_JOB_TYPE_HEADER, simple(ControllableJobType.CASH_FEED.toString()))
              .doTry()
                  .bean(myAppService, "produceGroupLevelCashFeed")
              .doCatch(Exception.class)
                  .to(DIRECT_ERROR)
               .end();
          
          //Handle failtures
          from(DIRECT_ERROR)
              .routeId(DIRECT_ERROR)
              .log(INFO, " ${headers.controllableJobTypeHeader} Job failed, reason:  ${exception.stacktrace}")
              .bean(myApp, "handleControllableJobFailed")
              .to(DIRECT_EMAIL_NOTIFICATION)
              .end();
          
          from(DIRECT_EMAIL_NOTIFICATION)
              .routeId(DIRECT_EMAIL_NOTIFICATION)
              .setHeader(NOTIFICATION_FLAG_HEADER, simple("{{myapp.feed.enable.email.notification}}"))
              .choice()
                  .when(header(NOTIFICATION_FLAG_HEADER).isEqualTo(true))
                      .log(INFO, "Sending email notification on Failture")
                      .setBody(simple("${headers.controllableJobTypeHeader} : ${exception.message}"))
                      .to("smtp:{{myapp.mail.host}}?contentType=text/html&to={{myapp.feed.notification.recipient}}&from={{myapp.feed.notification.sender}}"
                          + "&subject="
                          + CREATION_FAILURE_EVENT_SUBJECT
                          + "&mail.smtp.auth=false&mail.smtp.starttls.enable=false&delete=true&mapMailMessage=false");
 

    //...
 
        
 /*
     * TODO: The better approach would be to use
     * BridgePropertyPlaceholderConfigurer so that it picks up @PropertySource
     * style configuration
     */
    protected void addPropertiesLocation(String... newLocations)
    {
        PropertiesComponent properties = getPropertiesComponent();
        String[] locationsArr = properties.getLocations();
        List<string> locations = new ArrayList<string>();
        if (locationsArr != null)
        {
            for (String location : locationsArr)
            {
                locations.add(location);
            }
        }
        for (String location : newLocations)
        {
            locations.add(location);
        }
        locationsArr = new String[locations.size()];
        locationsArr = locations.toArray(locationsArr);
        properties.setLocations(locationsArr);    
    }
 


The route is defined using MVEL, which is a powerful expression language for Java-based applications. You can also appreciate, how easy it is to build your routes using various protocols.


No comments:

Post a Comment

5 recent Java developer posts