Google

Aug 1, 2013

Spring JMS with Websphere MQ (aka MQ Series) -- Part 3 (Receiver or Subscriber)



This is the continuation of Spring JMS with Websphere MQ -- Part 1 (Configuration), and Spring JMS with Websphere MQ -- Part 2 (Sender).


Step 1: Write a listener class that implements MessageListener. Its onMessage(Message message) method is invoked whenever a message is received.

package com.myapp.service.impl;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * JMS {@link MessageListener} that logs (and prints) the body of each
 * {@link TextMessage} it receives.
 *
 * <p>Messages of any other type are not processed; they are reported at WARN
 * level so that they do not disappear unnoticed.
 */
@Component
//@Transactional
public class MyAppListenerListener implements MessageListener
{

    private static final Logger LOG = LoggerFactory.getLogger(MyAppListenerListener.class);

    /**
     * Handles one incoming JMS message.
     *
     * @param message the delivered message; only {@link TextMessage} instances are processed
     */
    @Override
    public void onMessage(Message message)
    {
        LOG.info("Entered ...............");

        if (!(message instanceof TextMessage))
        {
            // Previously such messages were dropped without any trace.
            LOG.warn("Ignoring unsupported message type: {}",
                    message == null ? null : message.getClass().getName());
            return;
        }

        try
        {
            String msg = ((TextMessage) message).getText();
            LOG.debug("Message contents: {} ", msg);
            System.out.println(msg);
        }
        catch (Exception e)
        {
            // Pass the exception as the last argument so the cause and stack trace are logged.
            // NOTE(review): the failure is deliberately not rethrown here, so the caller
            // never sees it; rethrow a RuntimeException if a failed message should be
            // rolled back / redelivered by the listener container.
            LOG.error("Error processing the message", e);
        }
    }
}


Step 2: Configure the Spring container and register MyAppListenerListener with it. You can set up multiple consumers to run concurrently in multiple threads. This basically involves extending the class MyAppJmsTemplateConfig defined in Part 2 (Sender).

package com.myapp.jms;

import java.io.IOException;

import javax.annotation.Resource;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

import com.myapp.service.impl.MyAppListenerListener;

/**
 * Spring Java configuration for the application's JMS queue: exposes a
 * {@link JmsTemplate} for sending and a {@link DefaultMessageListenerContainer}
 * that delivers incoming messages to {@link MyAppListenerListener}.
 *
 * <p>The queue name is read from the {@code my.queue} property and the
 * connection factory is the bean named {@code internalJmsConnectionFactory}.
 */
@Configuration
@ComponentScan(basePackageClasses =
{
    MyAppListenerListener.class
})
@PropertySource(
{
    "classpath:jms/internalConnection.properties"
})
public class MyAppJmsTemplateConfig
{

    /** Upper bound on the number of concurrent consumers of the listener container. */
    private static final int MAX_CONCURRENT_CONSUMERS = 3;

    @Value("${my.queue}")
    private String myAppQueueName;

    @Resource(name = "internalJmsConnectionFactory")
    private ConnectionFactory connectionFactory;

    @Resource
    private MyAppListenerListener myAppListener;

    /**
     * Creates the template used to send messages to the application queue.
     *
     * @return a {@link JmsTemplate} whose default destination is the configured queue
     * @throws JMSException declared for compatibility with existing callers
     * @throws IOException declared for compatibility with existing callers
     */
    @Bean(name = "myAppJmsTemplate")
    public JmsTemplate myAppJmsTemplate() throws JMSException, IOException
    {
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
        jmsTemplate.setExplicitQosEnabled(true);
        jmsTemplate.setDefaultDestinationName(myAppQueueName);
        return jmsTemplate;
    }

    /**
     * Sets up the listener container that binds {@link MyAppListenerListener}
     * to the application queue.
     *
     * @return the configured listener container
     */
    @Bean
    public DefaultMessageListenerContainer myAppListenerContainer()
    {
        DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
        // Use the injected field; "internalJmsConnectionFactory" is only the bean name,
        // not an identifier that exists in this class.
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.setDestinationName(myAppQueueName);
        listenerContainer.setMessageListener(myAppListener);
        listenerContainer.setMaxConcurrentConsumers(MAX_CONCURRENT_CONSUMERS);
        listenerContainer.setSessionTransacted(true);
        return listenerContainer;
    }
}


Refer to the myAppListenerContainer() method, which binds the Spring listener container to the listener you defined in Step 1.


Step 3: Finally, the JUnit test class, which waits for 5 minutes (Thread.sleep(300000)). In other words, it waits for the onMessage(...) method to be kicked off asynchronously when a message arrives on the queue.

package com.myapp.service.impl;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Integration test that loads the Spring context from
 * {@code META-INF/spring/myApp-applicationContext.xml} and then parks the test
 * thread, giving the asynchronous JMS listener time to receive messages.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:META-INF/spring/myApp-applicationContext.xml")
public class DeCalcRunListenerIntegrationTest
{

    /** How long the test thread sleeps, in milliseconds (300000 ms = 5 minutes). */
    private static final long LISTEN_WINDOW_MILLIS = 300000L;

    /**
     * Sleeps for the listen window; it makes no assertions of its own.
     *
     * @throws InterruptedException if the sleeping test thread is interrupted
     */
    @Test
    public void testOnMessage() throws InterruptedException
    {
        Thread.sleep(LISTEN_WINDOW_MILLIS);
    }
}

Labels: ,

0 Comments:

Post a Comment

Subscribe to Post Comments [Atom]

<< Home