MQ学习(二)--- 点对点模式

xiaoxiao2021-02-28  11

上一篇文章,简单介绍了JMS,ActiveMQ,以及ActiveMQ的网页版消息查看页面,

接下来就来实现以下点对点通信模式。

 

项目结构如下:

 

发送消息的代码:

package com.java1234.activemq;

 

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.MessageProducer;

import javax.jms.Session;

import javax.jms.TextMessage;

 

import org.apache.activemq.ActiveMQConnection;

import org.apache.activemq.ActiveMQConnectionFactory;

 

/**

 *消息生产者

 * @author ALWZ

 *

 */

public class JMSProducer {

private static final String USERNAME =ActiveMQConnection.DEFAULT_USER;//使用默认用户名

 

private static final String PASSWORD =ActiveMQConnection.DEFAULT_PASSWORD;//使用默认密码

 

private static final String BROKEURL =ActiveMQConnection.DEFAULT_BROKER_URL;//使用默认本地连接

 

private static final int SENDNUM =10;//发送消息的数量

 

public static void main(String[] args){

ConnectionFactory connectionFactory;//连接工厂

Connection connection=null;//连接

Session session;//会话 接收或者发送消息的线程

Destination destination;//消息的目的地

MessageProducer messageProducer;//消息生产者

//实例化连接工厂

connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME,

JMSProducer.PASSWORD,JMSProducer.BROKEURL);

 

try {

connection=connectionFactory.createConnection();//通过连接工程获取连接

connection.start();//启动连接

//创建session

session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

destination=session.createQueue("FirstQueue1");//创建消息队列

messageProducer=session.createProducer(destination);//创建消息生产者

sendMessage(session, messageProducer);//发送消息

session.commit();

 

} catch (JMSException e) {

e.printStackTrace();

}finally{

if (connection!=null) {

try {

connection.close();

} catch (JMSException e) {

e.printStackTrace();

}

}

}

 

}

/**

 *发送消息

 * @param session

 * @param messageProducer

 * @throws JMSException

 */

public static void sendMessage(Session session,MessageProducer messageProducer) 

      throws JMSException{

for (int i = 0; i < JMSProducer.SENDNUM; i++) {

TextMessage message =session.createTextMessage("ActiveMQ发送的消息"+i);//创建消息

System.out.println("发送消息:"+"ActiveMQ发送的消息"+i);

messageProducer.send(message);//发送消息

}

}                

 

}

运行后,显示生产了20个消息:

 

 

 

接收者(消费者)代码:

package com.java1234.activemq;

 

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.MessageConsumer;

import javax.jms.Session;

import javax.jms.TextMessage;

 

import org.apache.activemq.ActiveMQConnection;

import org.apache.activemq.ActiveMQConnectionFactory;

/**

 *消息消费者

 * @author ALWZ

 *

 */

public class JMSConsumer {

 

private static final String USERNAME =ActiveMQConnection.DEFAULT_USER;//使用默认用户名

 

private static final String PASSWORD =ActiveMQConnection.DEFAULT_PASSWORD;//使用默认密码

 

private static final String BROKEURL =ActiveMQConnection.DEFAULT_BROKER_URL;//使用默认本地连接

 

public static void main(String[] args){

ConnectionFactory connectionFactory;//连接工厂

Connection connection=null;//连接

Session session;//会话 接收或者发送消息的线程

Destination destination;//消息的目的地

MessageConsumer messageConsumer;//消息的消费者

 

//实例化连接工厂

connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME,JMSConsumer.PASSWORD,

JMSConsumer.BROKEURL);

 

try {

connection=connectionFactory.createConnection();//通过连接工厂获取连接

connection.start();//启动连接                

session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

destination=session.createQueue("FirstQueue1");//创建消息队列

messageConsumer=session.createConsumer(destination);//创建消息消费者

while (true) {

//每隔100ms接受一次消息,不断的接收

TextMessage textMessage =(TextMessage)messageConsumer.receive(100000);

if (textMessage!=null) {

System.out.println("收到的消息:"+textMessage.getText());

}else {

break;

}

}

 

} catch (JMSException e) {

e.printStackTrace();

}

}

}

 

运行后显示:

有一个消费者,消费了20个消息

 

以上写的接收事件,是伸手要,每隔100ms就去要,这种方法不好,所以有一个监听器,

自动接收队列的消息。:

Listener类:

package com.java1234.activemq;

 

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageListener;

import javax.jms.TextMessage;

/**

 *消息监听(只要将监听注册到消息消费者里面,就可以监听消息)

 * @author ALWZ

 *

 */

public class Listener implements MessageListener{

 

@Override

public void onMessage(Message message) {

try {

System.out.println("收到的消息:"+((TextMessage)message).getText());

} catch (JMSException e) {

e.printStackTrace();

}

}

 

 

}

 

 

然后在消费者类中,修改:

package com.java1234.activemq;

 

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.MessageConsumer;

import javax.jms.Session;

 

import org.apache.activemq.ActiveMQConnection;

import org.apache.activemq.ActiveMQConnectionFactory;

/**

 *消息消费者

 * @author ALWZ

 *

 */

public class JMSConsumer2 {

 

private static final String USERNAME =ActiveMQConnection.DEFAULT_USER;//使用默认用户名

 

private static final String PASSWORD =ActiveMQConnection.DEFAULT_PASSWORD;//使用默认密码

 

private static final String BROKEURL =ActiveMQConnection.DEFAULT_BROKER_URL;//使用默认本地连接

 

public static void main(String[] args){

ConnectionFactory connectionFactory;//连接工厂

Connection connection=null;//连接

Session session;//会话 接收或者发送消息的线程

Destination destination;//消息的目的地

MessageConsumer messageConsumer;//消息的消费者

 

//实例化连接工厂

connectionFactory=new ActiveMQConnectionFactory(JMSConsumer2.USERNAME,

JMSConsumer2.PASSWORD,JMSConsumer2.BROKEURL);

 

try {

connection=connectionFactory.createConnection();//通过连接工厂获取连接

connection.start();//启动连接                

session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

destination=session.createQueue("FirstQueue1");//创建消息队列

messageConsumer=session.createConsumer(destination);//创建消息消费者

messageConsumer.setMessageListener(new Listener());//注册消息监听

 

} catch (JMSException e) {

e.printStackTrace();

}

}

}

转载请注明原文地址: https://www.6miu.com/read-200429.html

最新回复(0)