上篇我们已经讲完了ActiveMQ的基本使用,这篇文章用来讲讲ActiveMQ常用的几种机制 ActiveMQ的基本使用
一、发送消息的数据类型
上面的代码演示,全部都是发送字符串,但是ActiveMQ支持哪些数据呢?
大家可以看一下 javax.jms.Message 这个接口,只要是这个接口的数据,都可以被发送。
或者这样看起来有点麻烦,那么看到上面的代码,创建消息,是通过session这个对象来创建的,那我们来看一下这里有哪些可以被创建的呢?
//纯字符串的数据 session.createTextMessage(); //序列化的对象 session.createObjectMessage(); //流,可以用来传递文件等 session.createStreamMessage(); //用来传递字节 session.createBytesMessage(); //这个方法创建出来的就是一个map,可以把它当作map来用,当你看了它的一些方法,你就懂了 session.createMapMessage(); //这个方法,拿到的是javax.jms.Message,是所有message的接口 session.createMessage();
二、传递javabean对象
传递一个java对象,可能是最多的使用方式了,而且这种数据接收与使用都方便,那么,下面的代码就来演示下如何发送一个java对象
当然了,这个对象必须序列化,也就是实现Serializable接口
//通过这个方法,可以把一个对象发送出去,当然,这个对象需要序列化,因为一切在网络在传输的,都是字节 ObjectMessage obj = session.createObjectMessage(); for(int i = 0 ; i < 100 ; i ++){ Person p = new Person(i,"名字"); obj.setObject(p); producer.send(obj); } 那么在接收端要怎么接收这个对象呢?
//实现一个消息的监听器 //实现这个监听器后,以后只要有消息,就会通过这个监听器接收到 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { //同样的,强转为ObjectMessage,然后拿到对象,强转为Person Person p = (Person) ((ObjectMessage)message).getObject(); System.out.println(p); } catch (JMSException e) { e.printStackTrace(); } } });
三、保证消息的成功处理
消息发送成功后,接收端接收到了消息。然后进行处理,但是可能由于某种原因,高并发也好,IO阻塞也好,反正这条消息在接收端处理失败了。而点对点的特性是一条消息,只会被一个接收端给接收,只要接收端A接收成功了,接收端B,就不可能接收到这条消息,如果是一些普通的消息还好,但是如果是一些很重要的消息,比如说用户的支付订单,用户的退款,这些与金钱相关的,是必须保证成功的,那么这个时候要怎么处理呢?
我们可以使用 CLIENT_ACKNOWLEDGE 模式
之前其实就有提到当创建一个session的时候,需要指定其事务,及消息的处理模式,当时使用的是
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); AUTO_ACKNOWLEDGE
这一个代码的是,当消息发送给接收端之后,就自动确认成功了,而不管接收端有没有处理成功,而一旦确认成功后,就会把队列里面的消息给清除掉,避免下一个接收端接收到同样的消息。
那么,它还有另外一个模式,那就是 CLIENT_ACKNOWLEDGE
这行要写在接收端里面,不是写在发送端的
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
这行代码以后,如果接收端不确认消息,那么activemq将会把这条消息一直保留,直到有一个接收端确定了消息。
那么要怎么确认消息呢?
在接收端接收到消息的时候,调用javax.jms.Message的acknowledge方法
@Override public void onMessage(Message message) { try { //获取到接收的数据 String text = ((TextMessage)message).getText(); System.out.println(text); //确认接收,并成功处理了消息 message.acknowledge(); } catch (JMSException e) { e.printStackTrace(); } }
这样,当消息处理成功之后,确认消息,如果不确定,activemq将会发给下一个接收端处理
注意:只在点对点中有效,订阅模式,即使不确认,也不会保存消息
四、避免消息队列的并发 JMQ设计出来的原因,就是用来避免并发的,和沟通两个系统之间的交互。先看一下之前的代码:
//实现一个消息的监听器 //实现这个监听器后,以后只要有消息,就会通过这个监听器接收到 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { //获取到接收的数据 String text = ((TextMessage)message).getText(); System.out.println(text); //确认接收,并成功处理了消息 message.acknowledge(); } catch (JMSException e) { e.printStackTrace(); } } });
之前的代码里面,实现了一个监听器,监听消息的传递,这样只要每有一个消息,都会即时的传递到程序中。
但是,这样的处理,在高并发的时候,因为它是被动接收,并没有考虑到程序的处理能力,可能会压跨系统,那要怎么办呢?
答案就是把被动变为主动,当程序有着处理消息的能力时,主动去接收一条消息进行处理
实现的代码如下:
if(当程序有能力处理){//当程序有能力处理时接收 Message receive = consumer.receive(); //这个可以设置超时时间,超过则不等待消息 recieve.receive(10000); //其实receive是一个阻塞式方法,一定会拿到值的 if(null != receive){ String text = ((TextMessage)receive).getText(); receive.acknowledge(); System.out.println(text); }else{ //没有值嘛 // } } 通过上面的代码,就可以让程序自已判断,自己是否有能力接收这条消息,如果不能接收,那就给别的接收端接收,或者等自己有能力处理的时候接收
五、消息有效期的管理
这样的场景也是有的,一条消息的有效时间,当发送一条消息的时候,可能希望这条消息在指定的时间被处理,如果超过了指定的时间,那么这条消息就失效了,就不需要进行处理了,那么我们可以使用ActiveMQ的设置有效期来实现
代码如下:
TextMessage msg = session.createTextMessage("哈哈"); for(int i = 0 ; i < 100 ; i ++){ //设置该消息的超时时间 producer.setTimeToLive(i * 1000); producer.send(msg); }
这里每一条消息的有效期都是不同的,打开ip:8161/admin/就可以查看到,里面的消息越来越少了。
过期的消息是不会被接收到的。
过期的消息会从队列中清除,并存储到ActiveMQ.DLQ这个队列里面,这个稍后会解释。
五、 过期消息,处理失败的消息如何处理过期的、处理失败的消息,将会被ActiveMQ置入“ActiveMQ.DLQ”这个队列中。
这个队列是ActiveMQ自动创建的。
如果需要查看这些未被处理的消息,可以进入这个队列中查看
//指定一个目的地,也就是一个队列的位置 destination = session.createQueue("ActiveMQ.DLQ");这样就可以进入队列中,然后实现接口,或者通过receive()方法,就可以拿到未被处理的消息,从而保证正确的处理