上一篇文章,简单介绍了JMS,ActiveMQ,以及ActiveMQ的网页版消息查看页面,
接下来就来实现以下点对点通信模式。
项目结构如下:
发送消息的代码:
package com.java1234.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
*消息生产者
* @author ALWZ
*
*/
public class JMSProducer {
private static final String USERNAME =ActiveMQConnection.DEFAULT_USER;//使用默认用户名
private static final String PASSWORD =ActiveMQConnection.DEFAULT_PASSWORD;//使用默认密码
private static final String BROKEURL =ActiveMQConnection.DEFAULT_BROKER_URL;//使用默认本地连接
private static final int SENDNUM =10;//发送消息的数量
public static void main(String[] args){
ConnectionFactory connectionFactory;//连接工厂
Connection connection=null;//连接
Session session;//会话 接收或者发送消息的线程
Destination destination;//消息的目的地
MessageProducer messageProducer;//消息生产者
//实例化连接工厂
connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME,
JMSProducer.PASSWORD,JMSProducer.BROKEURL);
try {
connection=connectionFactory.createConnection();//通过连接工程获取连接
connection.start();//启动连接
//创建session
session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
destination=session.createQueue("FirstQueue1");//创建消息队列
messageProducer=session.createProducer(destination);//创建消息生产者
sendMessage(session, messageProducer);//发送消息
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}finally{
if (connection!=null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
*发送消息
* @param session
* @param messageProducer
* @throws JMSException
*/
public static void sendMessage(Session session,MessageProducer messageProducer)
throws JMSException{
for (int i = 0; i < JMSProducer.SENDNUM; i++) {
TextMessage message =session.createTextMessage("ActiveMQ发送的消息"+i);//创建消息
System.out.println("发送消息:"+"ActiveMQ发送的消息"+i);
messageProducer.send(message);//发送消息
}
}
}
运行后,显示生产了20个消息:
接收者(消费者)代码:
package com.java1234.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
*消息消费者
* @author ALWZ
*
*/
public class JMSConsumer {
private static final String USERNAME =ActiveMQConnection.DEFAULT_USER;//使用默认用户名
private static final String PASSWORD =ActiveMQConnection.DEFAULT_PASSWORD;//使用默认密码
private static final String BROKEURL =ActiveMQConnection.DEFAULT_BROKER_URL;//使用默认本地连接
public static void main(String[] args){
ConnectionFactory connectionFactory;//连接工厂
Connection connection=null;//连接
Session session;//会话 接收或者发送消息的线程
Destination destination;//消息的目的地
MessageConsumer messageConsumer;//消息的消费者
//实例化连接工厂
connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME,JMSConsumer.PASSWORD,
JMSConsumer.BROKEURL);
try {
connection=connectionFactory.createConnection();//通过连接工厂获取连接
connection.start();//启动连接
session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination=session.createQueue("FirstQueue1");//创建消息队列
messageConsumer=session.createConsumer(destination);//创建消息消费者
while (true) {
//每隔100ms接受一次消息,不断的接收
TextMessage textMessage =(TextMessage)messageConsumer.receive(100000);
if (textMessage!=null) {
System.out.println("收到的消息:"+textMessage.getText());
}else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
运行后显示:
有一个消费者,消费了20个消息
以上写的接收事件,是伸手要,每隔100ms就去要,这种方法不好,所以有一个监听器,
自动接收队列的消息。:
Listener类:
package com.java1234.activemq;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
*消息监听(只要将监听注册到消息消费者里面,就可以监听消息)
* @author ALWZ
*
*/
public class Listener implements MessageListener{
@Override
public void onMessage(Message message) {
try {
System.out.println("收到的消息:"+((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
然后在消费者类中,修改:
package com.java1234.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
*消息消费者
* @author ALWZ
*
*/
public class JMSConsumer2 {
private static final String USERNAME =ActiveMQConnection.DEFAULT_USER;//使用默认用户名
private static final String PASSWORD =ActiveMQConnection.DEFAULT_PASSWORD;//使用默认密码
private static final String BROKEURL =ActiveMQConnection.DEFAULT_BROKER_URL;//使用默认本地连接
public static void main(String[] args){
ConnectionFactory connectionFactory;//连接工厂
Connection connection=null;//连接
Session session;//会话 接收或者发送消息的线程
Destination destination;//消息的目的地
MessageConsumer messageConsumer;//消息的消费者
//实例化连接工厂
connectionFactory=new ActiveMQConnectionFactory(JMSConsumer2.USERNAME,
JMSConsumer2.PASSWORD,JMSConsumer2.BROKEURL);
try {
connection=connectionFactory.createConnection();//通过连接工厂获取连接
connection.start();//启动连接
session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination=session.createQueue("FirstQueue1");//创建消息队列
messageConsumer=session.createConsumer(destination);//创建消息消费者
messageConsumer.setMessageListener(new Listener());//注册消息监听
} catch (JMSException e) {
e.printStackTrace();
}
}
}