A sample (example) java class with source code to connect ActiveMQ queue and read incoming meassages to the Queue. It listen to the queue and whenever any message comes to the queueue.
Suppose you need to wrtie a ActiveMQ client which will consume
messages from ActiveMQ queue. Then following code will help you.
You can copy the following code and use it in your project.
Using
this code you will be able to read all the messages en-queued in a
queue of Active MQ
. .
package easycodeforall.zpagecode;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ReadFromActiveMQ implements ExceptionListener {
private String iD = "";
private String QueueOrTopicNameForInput = "";
private String URLOfInputQueueOrTopicServer = "";
ActiveMQConnectionFactory connectionFactory = null;
// Create a Connection
Connection connection;
// Create a Session
Session session = null;
Destination destination = null;
// Create a MessageConsumer from the Session to the Topic or Queue
MessageConsumer consumer = null;
// ServiceBean serviceBean = null;
public static void main(String args[]) throws JMSException {
new ReadFromActiveMQ().readMessage();
}
public ReadFromActiveMQ() throws JMSException {
System.out.println("Constructor ReadFromActiveMQ.");
}
public String readMessage() {
String text = "";
try {
System.out.println("Started Reading from Q3");
System.out.println("seting conn to reading " + QueueOrTopicNameForInput);
connectionFactory = new ActiveMQConnectionFactory(URLOfInputQueueOrTopicServer);
// Create a Connection
connection = connectionFactory.createConnection();
connection.setExceptionListener(this);
connection.start();
// Create a Session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
System.out.println("Started Reading from Q4");
// Create the destination (Topic or Queue)
destination = session.createQueue(QueueOrTopicNameForInput);
System.out.println("Started Reading from Q5");
// Create a MessageConsumer from the Session to the Topic or Queue
consumer = session.createConsumer(destination);
System.out.println("Waiting for the message:");
System.out.println("Receivedmsg^: " + QueueOrTopicNameForInput);
Message message = consumer.receive(1000);
System.out.println("Receivedmsg^: " + message);
System.out.println("read message= " + message);
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
text = textMessage.getText();
System.out.println("read Received= " + text);
;
} else {
System.out.println("Received: " + message);
System.out.println("NonTxtmessageread Received= " + text);
}
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
} finally {
System.out.println("closing cnnection= ");
closeAll();
}
return text;
}
public void closeAll() {
try {
if (consumer != null)
consumer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
public synchronized void onException(JMSException ex) {
System.out.println("JMS Exception occured. Shutting down client.");
}
}