Google

Aug 6, 2013

Spring JMS with Websphere MQ (aka MQ Series) -- Part 2 (Sender)



This is the continuation of Spring JMS with Websphere MQ -- Part 1.


Step 5: In Part 1, we configured the Connectionfactory, and in this step  configure the JMSTemplate with the @Configurable Spring annotation

package com.myapp.jms;

import java.io.IOException;

import javax.annotation.Resource;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class MyAppJmsTemplateConfig {

 @Value("${my.queue}")
    private String myAppQueueName;
 
 @Resource (name = "internalJmsConnectionFactory")
 private ConnectionFactory connectionFactory;
 
 @Bean(name = "myAppJmsTemplate")
 public JmsTemplate busniessExceptionJmsTemplate() throws JMSException, IOException {
  
  JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); 
  jmsTemplate.setExplicitQosEnabled(true);
  jmsTemplate.setDefaultDestinationName(myAppQueueName);
  return jmsTemplate;
 }
}

Step 6: Define the Spring context xml file META-INF/spring/myApp-applicationContext.xml file.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop" 
    xmlns:jee="http://www.springframework.org/schema/jee" 
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:task="http://www.springframework.org/schema/task"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:jms="http://www.springframework.org/schema/integration/jms"
    xmlns:cache="http://www.springframework.org/schema/cache"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc" 
    xsi:schemaLocation="
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
            http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-3.1.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
            http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
            http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.1.xsd
            http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
            http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd
            http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd
            http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.1.xsd     
            http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache-3.1.xsd">

 <bean id="placeholderConfig" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="ignoreUnresolvablePlaceholders" value="false" />
    <property name="location" value="classpath:calculation/validation.properties" />
 </bean>
 
  <context:component-scan base-package="com.myapp.jms">

</beans>

Step 7: Define the message sender class that publishes the message to the default queue that was configured earlier.

package com.myapp.service.impl;

import com.google.common.base.Stopwatch;
import com.myapp.service.MyAppService;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.annotation.concurrent.ThreadSafe;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

/**
 * Asynchronously submits a string to the JMS queue
 */
@Component
@ThreadSafe
public class MyAppServiceImpl implements MyAppService
{
    
    private static final Logger LOG = LoggerFactory.getLogger(MyAppServiceImpl.class);
    
    @Resource(name = "myAppJmsTemplate")
    private JmsTemplate jmsTemplate;
    
    @Value("${my.publishers.count}")
    int publisherCount;
    

    private ExecutorService pooledSender;
    
    @PostConstruct
    void init()
    {
        pooledSender = Executors.newFixedThreadPool(publisherCount);
    }
    
   
    @Override
    public void send(final String msg)
    {
        pooledSender.execute(new Runnable()
        {
            @Override
            public void run()
            {
                send(msg);
            }
            
            private void send(final String msg, int retryCount)
            {
                
                Stopwatch stopwatch = new Stopwatch().start();
                try
                {
 
                    jmsTemplate.send(new MessageCreator()
                    {
                        @Override
                        public Message createMessage(Session session) throws JMSException
                        {
                            return session.createTextMessage(msg);
                        }
                    });
                }
                catch (Exception e)
                {
                    LOG.warn("Unable to send message to JMS Queue {}", msg);
                   
                }
                LOG.info(" message sent to JMS Queue in {} ms", stopwatch.elapsedMillis());
            }
        });
    }
}


Step 8: Finally, the JUnit test class that

package com.myapp.service.impl;

import com.myapp.service.MyAppExecutionService;

import java.util.HashMap;
import java.util.Map;

import javax.annotation.Resource;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.PropertySource;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:META-INF/spring/myApp-applicationContext.xml")
@PropertySource(
{
    "classpath:jms/internalConnection.properties"
   
})
public class MyAppServiceImplSenderIntegrationTest
{
    
    private static final Logger LOG = LoggerFactory.getLogger(MyAppServiceImplSenderIntegrationTest.class);
    
    @Resource
    private MyAppExecutionService executionService;
    
    
    @Test
    public void myAppIntegrationTest()
    {
        
        for (int i = 0; i < 1; i++)
        {
            final Map<String, String> detailMap = new HashMap<String, String>();
            detailMap.put("KeyA", "ValueA" + i);
            detailMap.put("KeyB", "ValueB" + i);
            executionService.send(detailMap.toString());
        }
    }
}



Labels: ,

0 Comments:

Post a Comment

Subscribe to Post Comments [Atom]

<< Home