Monday, January 14, 2013

Spring 3 & JMS

Here is a quick way to develop a simple application using JMS. You will need the following components:

  • Apache ActiveMQ
  • Spring
  • Java
  • Maven
 
Install ActiveMQ on Ubuntu. The latest download bundle is available on the ActiveMQ download page.

Example (Running ActiveMQ 5.7.0 on Ubuntu):

1. Download apache-activemq-5.7.0-bin.tar.gz.

2. Untar the bundle:

tar zxvf apache-activemq-5.7.0-bin.tar.gz

3. Configure and start ActiveMQ:

cd apache-activemq-5.7.0
bin/activemq setup newConfig
bin/activemq start

4. Verify ActiveMQ is running:

netstat -an | grep 61616

or go to the admin console at http://localhost:8161/admin/

5. Create a basic JMS queue for testing:

Use the admin console.
Select Queues
Create a new queue called TestQ.
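
The port check in step 4 can also be done from Java, for example at the start of a test that should fail fast when no broker is running. This is a minimal sketch, assuming the broker listens on port 61616 as in the netstat check above; the class and method names are made up for illustration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class BrokerPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    public static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing is accepting connections there.
            return false;
        }
    }

    public static void main(String[] args) {
        // 61616 is the broker port checked with netstat in step 4.
        boolean up = isPortOpen("localhost", 61616, 2000);
        System.out.println(up ? "Port 61616 is reachable" : "Nothing is listening on port 61616");
    }
}
```

An open port only shows that something accepts TCP connections there; the admin console remains the better check that the broker itself is healthy.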

Once you have created a JMS queue, you need to write classes that produce and consume the messages sent through it.

We will use Spring to send messages over JMS queues. Because queue delivery is point-to-point, meaning each message is handed to exactly one consumer, you need a message producer at one end and a message consumer at the other.
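
To make the point-to-point idea concrete, here is a small in-memory sketch that uses a plain java.util.concurrent queue instead of a broker. It is only an analogy, not JMS code: the class name is hypothetical, and the strict alternation between the two consumers is an assumption made for the illustration, since a real broker decides how to distribute messages among competing consumers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PointToPointSketch {

    // Puts all messages on one queue, then lets two consumers take turns draining it.
    // Returns what each consumer received: index 0 is consumer A, index 1 is consumer B.
    public static List<List<String>> deliver(List<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.addAll(messages); // producer side

        List<String> consumerA = new ArrayList<>();
        List<String> consumerB = new ArrayList<>();
        boolean turnA = true;
        String next;
        // poll() removes the message, so it can never be seen by the other consumer.
        while ((next = queue.poll()) != null) {
            (turnA ? consumerA : consumerB).add(next);
            turnA = !turnA;
        }
        return Arrays.asList(consumerA, consumerB);
    }

    public static void main(String[] args) {
        List<List<String>> received = deliver(Arrays.asList("m1", "m2", "m3"));
        System.out.println("consumer A: " + received.get(0)); // prints "consumer A: [m1, m3]"
        System.out.println("consumer B: " + received.get(1)); // prints "consumer B: [m2]"
    }
}
```

Every message ends up with exactly one consumer, which is the property that distinguishes a queue from a publish/subscribe topic.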

JMS Message Producer

1. Create a context file called ~/workspace/jms/src/main/resources/application-context.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:amq="http://activemq.apache.org/schema/core" 
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
                        http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
                        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <context:component-scan base-package="com.noushin.spring.jms" />
    <context:annotation-config />

    <!-- ActiveMQ destinations to use -->
    <amq:queue id="destination" physicalName="TestQ" />
    
    <!-- ActiveMQ broker URL -->
    <amq:connectionFactory id="jmsFactory" brokerURL="tcp://localhost:61616" />

    <!-- Spring JMS ConnectionFactory -->
    <bean id="singleConnectionFactory" 
          class="org.springframework.jms.connection.SingleConnectionFactory"
          p:targetConnectionFactory-ref="jmsFactory"/>
    
    <!-- Spring JMS Producer Configuration -->
    <bean id="jmsProducerTemplate" class="org.springframework.jms.core.JmsTemplate"
        p:connectionFactory-ref="singleConnectionFactory"
        p:defaultDestination-ref="destination"/>
        
</beans>


2. Create a class that produces messages and sends them over JMS. Let's call it MessageProducer.
package com.noushin.spring.jms.producer;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

@Component
public class MessageProducer {

   final static Logger logger = Logger.getLogger(MessageProducer.class);

   @Autowired
   private JmsTemplate jmsTemplate;

   // Sends one text message to the template's default destination (TestQ).
   public void produce() {
      MessageCreator mc = new MessageCreator() {
         public Message createMessage(Session session) throws JMSException {
            // Let a JMSException propagate to JmsTemplate instead of returning a null message.
            TextMessage message = session.createTextMessage("This is a message.");
            return message;
         }
      };
      jmsTemplate.send(mc);
      logger.info("Sent message to the default destination.");
   }
}

3. Here is the pom.xml needed to build and run this example:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.noushin.spring</groupId>
    <artifactId>jms</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>jms</name>
    <url>http://maven.apache.org</url>

    <properties>
        <activemq.version>5.2.0</activemq.version>
        <junit.version>4.10</junit.version>
        <log4j.version>1.2.17</log4j.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring.version>3.2.0.RELEASE</spring.version>
    </properties>

    <repositories>
        <repository>
            <id>springsource-repo</id>
            <name>SpringSource Repository</name>
            <url>http://repo.springsource.org/release</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>${activemq.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-optional</artifactId>
            <version>${activemq.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>3.7</version>
        </dependency>
    </dependencies>
</project>

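4. MessageProducer has no main method of its own, so call MessageProducer.produce() from a small launcher class or JUnit test that loads application-context.xml. After it runs, you should see a message added to the TestQ queue.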

5. Use the ActiveMQ admin console to verify the above steps: http://localhost:8161/admin/queues.jsp
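
The producer class above is a Spring bean without an entry point, so something has to load the context and call produce(). Below is a minimal sketch of such a launcher. The class name ProducerApp and its package are assumptions, not part of the original project; it relies only on the application-context.xml and MessageProducer shown earlier.

```java
package com.noushin.spring.jms;

import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.noushin.spring.jms.producer.MessageProducer;

// Hypothetical launcher class; the name ProducerApp is not part of the original example.
public class ProducerApp {

    public static void main(String[] args) throws Exception {
        // Load the producer's context file from the classpath (src/main/resources).
        ClassPathXmlApplicationContext ctx =
                new ClassPathXmlApplicationContext("application-context.xml");
        try {
            // MessageProducer is picked up by component scanning because of @Component.
            MessageProducer producer = ctx.getBean(MessageProducer.class);
            producer.produce(); // sends one text message to TestQ
        } finally {
            // Closing the context also releases the shared JMS connection.
            ctx.close();
        }
    }
}
```

Each run should add one pending message to TestQ, which you can confirm on the queues page of the admin console.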

JMS Message Consumer

Now we need to write a class that consumes the messages waiting in the queue. In this example, I will create a second project.

1. Create a context file called ~/workspace/jmsc/src/main/resources/application-context.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:amq="http://activemq.apache.org/schema/core" 
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jms="http://www.springframework.org/schema/jms" 
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
                        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
                        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                        http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">

    <context:component-scan base-package="com.noushin.spring.jms" />
    <context:annotation-config />

    <!-- ActiveMQ destinations to use -->
    <amq:queue id="destination" physicalName="TestQ" />

    <!-- ActiveMQ broker -->
    <amq:connectionFactory id="jmsFactory" brokerURL="tcp://localhost:61616" />

    <!-- JMS Consumer Configuration -->
    <bean id="jmsConsumerConnectionFactory" 
          class="org.springframework.jms.connection.SingleConnectionFactory"
          p:targetConnectionFactory-ref="jmsFactory" />
        
    <jms:listener-container container-type="default" 
                            connection-factory="jmsConsumerConnectionFactory"
                            acknowledge="auto">
        <jms:listener destination="TestQ" ref="messageConsumer" />
    </jms:listener-container>

</beans>

2. Create a class that consumes messages as they arrive on the queue. Let's call it MessageConsumer.
package com.noushin.spring.jms.consumer;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;

@Component
public class MessageConsumer implements MessageListener {

   final static Logger logger = Logger.getLogger(MessageConsumer.class);

   private int numOfMessages = 0;

   public void onMessage(Message message) {
      try {
         numOfMessages++;
         if (message instanceof TextMessage) {
            TextMessage tm = (TextMessage) message;
            String msg = tm.getText();
            logger.info(">>>>>Processed message: " + msg + " - numOfMessages : " + numOfMessages);
         }
      } catch (JMSException e) {
         logger.error(e.getMessage(), e);
      }
   }
}

3. You can use the same pom file you used for MessageProducer. Make sure to change your project name in the pom file.

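4. To test your app, write a JUnit test. The listener runs on a background thread, so the test keeps the context open for a few seconds before closing it:
package com.noushin.spring.jmsc;

import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class AppTest {

   @Test
   public void testApp() throws InterruptedException {
      // Loading the context starts the listener container, which begins consuming from TestQ.
      ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("application-context.xml");
      try {
         // Give the listener time to receive the pending messages before the test ends.
         Thread.sleep(5000);
      } finally {
         ctx.close();
      }
   }
}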

5. As soon as the application context is initialized, go to the ActiveMQ admin console and notice that the messages you produced in the first project have been removed from the queue. You should also see log output from the onMessage method of the MessageConsumer class:
2013-01-10 11:50:24,365 [org.springframework.jms.listener.DefaultMessageListenerContainer#0-1] INFO  com.noushin.spring.jms.consumer.MessageConsumer - Processed message: this is a test. - numOfMessages : 2
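
The thread name in the log line above shows that onMessage runs on a DefaultMessageListenerContainer thread, not on the thread that created the context. A test that wants to check consumption therefore has to wait for that background thread. One way to wait for an expected number of messages, rather than for a fixed amount of time, is a CountDownLatch. The sketch below uses a plain thread as a stand-in for the listener container, and all names in it are hypothetical; in the real consumer the countDown() call would go inside MessageConsumer.onMessage.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchWaitSketch {

    // Delivers `delivered` messages on a background thread and waits up to
    // timeoutMillis for `expected` of them to be handled. Returns true if they all arrived.
    public static boolean awaitMessages(int expected, int delivered, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(expected);

        // Stand-in for the listener container thread calling onMessage.
        Thread container = new Thread(() -> {
            for (int i = 0; i < delivered; i++) {
                latch.countDown(); // what onMessage would do after handling a message
            }
        });
        container.start();

        try {
            // Blocks until the count reaches zero or the timeout expires.
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitMessages(2, 2, 1000)); // prints "true": both messages were handled
        System.out.println(awaitMessages(2, 1, 200));  // prints "false": one message never arrived
    }
}
```

A test built this way finishes as soon as the expected messages have been consumed and fails with a clear timeout when they have not.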

6. That's all folks. Have fun with JMS :)

4 comments:

  1. hi; i am trying to query oozie with activemq,
    how can i monitoring oozie coordinators using activemq

    can you help me out.

  2. I try to run MessageProducer.main, but, there are no main in MessageProducer class. Do you have source code I could reference?

  3. Hi.

    I'm new to jms, so googling I've found your example.

    The producer for me is ok, I can see the number of pending messages increment on ActiveMq web console. But the consumer do not consume :)

    consumer logs says:

    INFO org.springframework.jms.connection.SingleConnectionFactory - Established shared JMS Connection: ActiveMQConnection {id=ID:xxxxx1-1:1,clientId=null,started=false}.

    While logs of activemq server says: WARN | Transport Connection to: tcp://192.168.168.139:49801 failed: java.io.EOFException | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ Transport: tcp:///192.168.168.139:49801@61616

    Have you got some tips for me? :)

    Thank you!!
    Marco

  4. Thanks Noushin Bashir!
    I've got my webapp connected to ActiveMQ Server straight away using your tutorial. thumbs-up!
