Google

Aug 6, 2013

Spring JMS with Websphere MQ (aka MQ Series) -- Part 2 (Sender)



This is the continuation of Spring JMS with Websphere MQ -- Part 1.


Step 5: In Part 1, we configured the ConnectionFactory. In this step, configure the JmsTemplate in a class marked with the Spring @Configuration annotation.

package com.myapp.jms;

import java.io.IOException;

import javax.annotation.Resource;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsTemplate;

/**
 * Spring configuration that exposes the {@link JmsTemplate} used to publish
 * messages to the application queue.
 */
@Configuration
public class MyAppJmsTemplateConfig {

 /** Queue name resolved from the {@code my.queue} placeholder. */
 @Value("${my.queue}")
 private String myAppQueueName;

 /** Connection factory bean looked up by the name {@code internalJmsConnectionFactory}. */
 @Resource(name = "internalJmsConnectionFactory")
 private ConnectionFactory connectionFactory;

 /**
  * Creates the template registered under the bean name {@code myAppJmsTemplate}.
  *
  * @return a template built on the injected connection factory whose default
  *         destination name is the configured queue name and which has
  *         explicit QoS enabled
  * @throws JMSException declared by the signature; nothing in this body throws it
  * @throws IOException declared by the signature; nothing in this body throws it
  */
 @Bean(name = "myAppJmsTemplate")
 public JmsTemplate busniessExceptionJmsTemplate() throws JMSException, IOException {
  final JmsTemplate template = new JmsTemplate(connectionFactory);
  // The two setters are independent of each other.
  template.setDefaultDestinationName(myAppQueueName);
  template.setExplicitQosEnabled(true);
  return template;
 }
}

Step 6: Define the Spring context XML file META-INF/spring/myApp-applicationContext.xml.

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Application context for the JMS sender/receiver examples.
  Registers a property placeholder configurer and component-scans com.myapp.jms,
  which is where the @Configuration classes of this tutorial live.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop" 
    xmlns:jee="http://www.springframework.org/schema/jee" 
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:task="http://www.springframework.org/schema/task"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xmlns:cache="http://www.springframework.org/schema/cache"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc" 
    xsi:schemaLocation="
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
            http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-3.1.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
            http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
            http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.1.xsd
            http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
            http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd
            http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd
            http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.1.xsd     
            http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache-3.1.xsd">

 <!-- Resolves ${...} placeholders; an unresolvable placeholder fails context start-up. -->
 <bean id="placeholderConfig" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="ignoreUnresolvablePlaceholders" value="false" />
    <property name="location" value="classpath:calculation/validation.properties" />
 </bean>
 
  <!-- Self-closed: the element has no children and must be terminated for the file to be well-formed. -->
  <context:component-scan base-package="com.myapp.jms" />

</beans>

Step 7: Define the message sender class that publishes the message to the default queue that was configured earlier.

package com.myapp.service.impl;

import com.google.common.base.Stopwatch;
import com.myapp.service.MyAppService;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.annotation.concurrent.ThreadSafe;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

/**
 * Asynchronously submits a string to the JMS queue
 */
@Component
@ThreadSafe
public class MyAppServiceImpl implements MyAppService
{
    
    private static final Logger LOG = LoggerFactory.getLogger(MyAppServiceImpl.class);
    
    @Resource(name = "myAppJmsTemplate")
    private JmsTemplate jmsTemplate;
    
    @Value("${my.publishers.count}")
    int publisherCount;
    

    private ExecutorService pooledSender;
    
    @PostConstruct
    void init()
    {
        pooledSender = Executors.newFixedThreadPool(publisherCount);
    }
    
   
    @Override
    public void send(final String msg)
    {
        pooledSender.execute(new Runnable()
        {
            @Override
            public void run()
            {
                send(msg);
            }
            
            private void send(final String msg, int retryCount)
            {
                
                Stopwatch stopwatch = new Stopwatch().start();
                try
                {
 
                    jmsTemplate.send(new MessageCreator()
                    {
                        @Override
                        public Message createMessage(Session session) throws JMSException
                        {
                            return session.createTextMessage(msg);
                        }
                    });
                }
                catch (Exception e)
                {
                    LOG.warn("Unable to send message to JMS Queue {}", msg);
                   
                }
                LOG.info(" message sent to JMS Queue in {} ms", stopwatch.elapsedMillis());
            }
        });
    }
}


Step 8: Finally, the JUnit test class that loads the Spring context and sends a test message to the queue.

package com.myapp.service.impl;

import com.myapp.service.MyAppExecutionService;

import java.util.HashMap;
import java.util.Map;

import javax.annotation.Resource;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.PropertySource;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Integration test that loads the application context and pushes a test
 * message through the injected service.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:META-INF/spring/myApp-applicationContext.xml")
@PropertySource({ "classpath:jms/internalConnection.properties" })
public class MyAppServiceImplSenderIntegrationTest
{
    
    private static final Logger LOG = LoggerFactory.getLogger(MyAppServiceImplSenderIntegrationTest.class);
    
    /** Number of messages submitted by the test. */
    private static final int MESSAGE_COUNT = 1;
    
    // NOTE(review): the sender class in Step 7 implements MyAppService, while this
    // field is typed MyAppExecutionService - confirm which interface is intended.
    @Resource
    private MyAppExecutionService executionService;
    
    /** Sends {@link #MESSAGE_COUNT} map-formatted payloads through the service. */
    @Test
    public void myAppIntegrationTest()
    {
        int index = 0;
        while (index < MESSAGE_COUNT)
        {
            executionService.send(payloadFor(index));
            index++;
        }
    }
    
    /**
     * Builds the text payload for one message.
     *
     * @param index sequence number appended to each value
     * @return the {@code toString()} form of a two-entry map
     */
    private static String payloadFor(int index)
    {
        final Map<String, String> details = new HashMap<String, String>();
        details.put("KeyA", "ValueA" + index);
        details.put("KeyB", "ValueB" + index);
        return details.toString();
    }
}



Labels: ,

Aug 1, 2013

Spring JMS with Websphere MQ (aka MQ Series) -- Part 3 (Receiver or Subscriber)



This is the continuation of Spring JMS with Websphere MQ -- Part 1 (Configuration), and Spring JMS with Websphere MQ -- Part 2 (Sender).


Step 1: Write a listener class to receive the message by invoking the onMessage(Message message) method.

package com.myapp.service.impl;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * JMS listener that logs and prints the body of every text message it receives.
 *
 * <p>Messages that are not {@link TextMessage} instances are ignored with a
 * warning. Exceptions raised while reading a message are logged with their
 * cause and are not rethrown.
 */
@Component
//@Transactional
public class MyAppListenerListener implements MessageListener
{
    
    private static final Logger LOG = LoggerFactory.getLogger(MyAppListenerListener.class);
    
    /**
     * Handles one incoming message.
     *
     * @param message the message handed over by the listener container
     */
    @Override
    public void onMessage(Message message)
    {
        LOG.info("Entered ...............");
        
        if (!(message instanceof TextMessage))
        {
            // Make dropped messages visible instead of discarding them silently.
            LOG.warn("Ignoring message that is not a TextMessage: {}", message);
            return;
        }
        
        try
        {
            String msg = ((TextMessage) message).getText();
            LOG.debug("Message contents: {} ", msg);
            System.out.println(msg);
        }
        catch (Exception e)
        {
            // Keep the cause; without it the failure cannot be diagnosed.
            LOG.error("Error processing the message", e);
        }
    }
}


Step 2: Configure the Spring container and register the MyAppListenerListener with it. You can set up multiple consumers to run concurrently in multiple threads. This basically involves extending the MyAppJmsTemplateConfig class defined in Part 2 (Sender) with a listener container bean.

package com.myapp.jms;

import java.io.IOException;

import javax.annotation.Resource;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

import com.myapp.service.impl.MyAppListenerListener;

/**
 * Spring configuration for both sides of the queue: the {@link JmsTemplate}
 * used for sending and the listener container that dispatches incoming
 * messages to {@link MyAppListenerListener}.
 */
@Configuration
@ComponentScan(basePackageClasses =
{
    MyAppListenerListener.class
})
@PropertySource(
{
    "classpath:jms/internalConnection.properties"
})
public class MyAppJmsTemplateConfig  {

 /** Queue name resolved from the {@code my.queue} placeholder. */
 @Value("${my.queue}")
 private String myAppQueueName;
 
 /** Connection factory bean looked up by the name {@code internalJmsConnectionFactory}. */
 @Resource (name = "internalJmsConnectionFactory")
 private ConnectionFactory connectionFactory;
 
 /** Listener picked up by the component scan declared on this class. */
 @Resource
 private MyAppListenerListener myAppListener;
 
 private int maxConsumers = 3; //3 concurrent consumers
 
 /**
  * Creates the template registered under the bean name {@code myAppJmsTemplate}.
  *
  * @return a template on the injected connection factory with explicit QoS
  *         enabled and the configured queue as default destination name
  * @throws JMSException declared by the signature; nothing in this body throws it
  * @throws IOException declared by the signature; nothing in this body throws it
  */
 @Bean(name = "myAppJmsTemplate")
 public JmsTemplate myAppJmsTemplate() throws JMSException, IOException {
  
  JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); 
  jmsTemplate.setExplicitQosEnabled(true);
  jmsTemplate.setDefaultDestinationName(myAppQueueName);
  return jmsTemplate;
 } 
 
    /**
     * Sets up the JMS MessageListener container.
     *
     * @return a container on the injected connection factory that listens on
     *         the configured queue, delivers to {@code myAppListener}, allows up
     *         to {@code maxConsumers} concurrent consumers and uses transacted
     *         sessions
     */
    @Bean
    public DefaultMessageListenerContainer myAppListenerContainer()
    {
        DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
        // Use the injected field; "internalJmsConnectionFactory" is only the bean name.
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.setDestinationName(myAppQueueName);
        listenerContainer.setMessageListener(myAppListener);
        listenerContainer.setMaxConcurrentConsumers(maxConsumers);
        listenerContainer.setSessionTransacted(true);
        return listenerContainer;
    }
}


Refer to the myAppListenerContainer() method, which binds the Spring listener container to the listener you defined in Step 1.


Step 3: Finally, the JUnit test class that waits for 5 minutes (300,000 ms). In other words, it keeps the context alive while waiting for the onMessage(...) method to be kicked off asynchronously when a message arrives on the queue.

package com.myapp.service.impl;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Integration test that loads the application context and then blocks, so the
 * context stays up while the test thread sleeps.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:META-INF/spring/myApp-applicationContext.xml")
public class DeCalcRunListenerIntegrationTest
{
    
    /**
     * Sleeps for 300000 ms (5 minutes) and makes no assertions.
     *
     * @throws InterruptedException if the sleeping test thread is interrupted
     */
    @Test
    public void testOnMessage() throws InterruptedException
    {
        final long waitMillis = 300000L;
        Thread.sleep(waitMillis);
    }
}

Labels: ,

Jul 30, 2013

Spring JMS with Websphere MQ (aka MQ Series) -- Part 1 (configuration)

Messaging systems are used in enterprise applications for scalability. Here is the series of JMS tutorials. Stay tuned for more parts to follow.



Step 1: Add the relevant dependency jars to the pom.xml file.

<!-- Spring -->
<dependency>
 <groupId>org.springframework</groupId>
 <artifactId>spring-jms</artifactId>
</dependency>
<!-- JMS/MQ -->
<dependency>
 <groupId>com.ibm.webshere.mq7</groupId>
 <artifactId>commonservices</artifactId>
</dependency>
<dependency>
 <groupId>com.ibm.webshere.mq7</groupId>
 <artifactId>com-ibm-mqjms</artifactId>
</dependency>
<dependency>
 <groupId>com.ibm.webshere.mq7</groupId>
 <artifactId>com-ibm-mq</artifactId>
</dependency>
<dependency>
 <groupId>com.ibm.webshere.mq7</groupId>
 <artifactId>dhbcore</artifactId>
</dependency>
<dependency>
 <groupId>com.ibm.webshere.mq7</groupId>
 <artifactId>com-ibm-mq-jmqi</artifactId>
</dependency>
<!-- JMS API; declared once, a repeated declaration only produces a Maven warning -->
<dependency>
 <groupId>javax.jms</groupId>
 <artifactId>jms</artifactId>
</dependency>


Step 2: Define the JMS properties jms/internalConnection.properties as shown below

#connection factory properties
# transportType 1, i.e. TCP. The comment sits on its own line because a
# .properties file has no inline comments: text after the value becomes part of it.
jms.transportType=1
jms.hostName=your_host
jms.channel=your.channel
jms.port=1414
jms.queueManager=your.que.manager
jms.sslEnabled=false
jms.sslCipherSuite=
jms.ssl.keystore.path=
jms.ssl.password=
 
#destination property
# NOTE(review): the Java configuration reads the placeholder ${my.queue};
# confirm that key is defined elsewhere or rename this one to match.
my.queueName=my.queue


Step 3: A base class (extended by the concrete configuration in Step 4) that configures the ConnectionFactory.

package com.myapp.jms;

import java.util.Properties;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.convert.support.ConfigurableConversionService;
import org.springframework.core.convert.support.DefaultConversionService;

import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.mq.jms.MQQueueConnectionFactory;

/**
 * Base class that builds a WebSphere MQ {@link ConnectionFactory} from a set
 * of {@code jms.*} properties.
 */
public class AbstractMqJmsConnectionConfig {

 private static final Logger LOG = LoggerFactory.getLogger(AbstractMqJmsConnectionConfig.class);

 /** Converts the string property values to the types expected by the MQ setters. */
 private final ConfigurableConversionService conversionService = new DefaultConversionService();

 /**
  * Creates an MQ queue connection factory configured from {@code properties}.
  *
  * @param properties source of the {@code jms.*} connection settings
  * @return the configured connection factory
  * @throws JMSException if one of the MQ setters rejects a value
  * @throws IllegalArgumentException if {@code jms.transportType} or
  *         {@code jms.port} is missing or empty
  */
 protected ConnectionFactory createQueueConnectionFactory(Properties properties) throws JMSException {
  MQQueueConnectionFactory connectionFactory = new MQQueueConnectionFactory();

  setConnectionFactoryProperties(connectionFactory, properties);

  return connectionFactory;
 }

 /** Copies the connection settings, and the SSL settings when enabled, onto the factory. */
 private void setConnectionFactoryProperties(MQConnectionFactory connectionFactory, Properties properties)
   throws JMSException {
    
  connectionFactory.setTransportType(requiredInt(properties, "jms.transportType"));
  connectionFactory.setHostName(properties.getProperty("jms.hostName"));
  connectionFactory.setChannel(properties.getProperty("jms.channel"));
  connectionFactory.setPort(requiredInt(properties, "jms.port"));
  connectionFactory.setQueueManager(properties.getProperty("jms.queueManager"));
  // getProperty returns null when jms.clientid is not defined.
  connectionFactory.setClientID(properties.getProperty("jms.clientid"));
  
  // Boolean.TRUE.equals avoids unboxing a null result for a missing or empty flag.
  Boolean sslEnabled = conversionService.convert(properties.getProperty("jms.sslEnabled"), Boolean.class);
  if (Boolean.TRUE.equals(sslEnabled)) {
   setSSLSystemProperties(properties);
   connectionFactory.setSSLCipherSuite(properties.getProperty("jms.sslCipherSuite"));
  }
 }

 /**
  * Reads a mandatory integer property.
  *
  * @throws IllegalArgumentException naming the key when the value is missing or empty
  */
 private int requiredInt(Properties properties, String key) {
  Integer value = conversionService.convert(properties.getProperty(key), Integer.class);
  if (value == null) {
   throw new IllegalArgumentException("Missing required JMS property: " + key);
  }
  return value;
 }

 /**
  * Publishes the keystore location and password as JVM-wide
  * {@code javax.net.ssl.*} system properties. Properties that are not
  * defined are skipped, because {@link System#setProperty} rejects null.
  */
 private void setSSLSystemProperties(Properties properties) {
  String sslkeystoreFullPath = properties.getProperty("jms.ssl.keystore.path");
  LOG.info("Setting sslkeystoreFullPath : {}", sslkeystoreFullPath);
  if (sslkeystoreFullPath != null) {
   System.setProperty("javax.net.ssl.keyStore", sslkeystoreFullPath);
  }
  String keystorePassword = properties.getProperty("jms.ssl.password");
  if (keystorePassword != null) {
   System.setProperty("javax.net.ssl.keyStorePassword", keystorePassword);
  }
 }

}

Note:  The  ConfigurableConversionService  utility class from spring is handy to convert string property values to relevant data types like Integer, Boolean, etc.

Step 4: Define the concrete class that loads the internalConnection.properties and has the Spring @Configuration annotation for the Spring dependency injection.

package com.myapp.jms;

import java.io.IOException;
import java.util.Properties;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;

import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
public class InternalJmsConnectionFactoryConfig extends AbstractMqJmsConnectionConfig {
 
 
 @Bean (name="internalJmsConnectionFactory")
 protected ConnectionFactory createQueueConnectionFactory() throws JMSException, IOException {
  return createQueueConnectionFactory(internalUMJMSProperties());
 }
 

 private Properties internalJMSProperties() throws IOException {
  PropertiesFactoryBean factory = new PropertiesFactoryBean();
  factory.setLocation(new ClassPathResource("jms/internalConnection.properties"));
  factory.afterPropertiesSet();
  return factory.getObject();
 }
}


More on JMS

Labels: ,