RabbitMQ系列-持久化

xiaoxiao2021-07-05  244

消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。  为了保证RabbitMQ在退出或者crash等异常情况下数据没有丢失,需要将queue,exchange和Message都持久化。


queue的持久化

如过将queue的持久化标识durable设置为true,则代表是一个持久的队列,那么在服务重启之后,也会存在,因为服务会把持久化的queue存放在硬盘上,当服务重启的时候,会重新恢复之前被持久化的queue。

创建queue的时候会设置4个属性,其中durable代表的是否持久化

/** * Declare a queue * @see com.rabbitmq.client.AMQP.Queue.Declare * @see com.rabbitmq.client.AMQP.Queue.DeclareOk * @param queue the name of the queue * @param durable true if we are declaring a durable queue (the queue will survive a server restart) * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * @param arguments other properties (construction arguments) for the queue * @return a declaration-confirm method to indicate the queue was successfully declared * @throws java.io.IOException if an error is encountered */ public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) { Assert.notNull(name, "\'name\' cannot be null"); this.name = name; this.durable = durable; this.exclusive = exclusive; this.autoDelete = autoDelete; this.arguments = arguments; }

 参数说明:

durable:是否持久化exclusive排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。这里需要注意三点:1. 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列;2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。autoDelete自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。

消息的持久化

队列的持久化通过上面的属性设置,但是里面的消息是否为持久化那还要看消息的持久化设置。也就是说,重启之前那个queue里面还没有发出去的消息的话,重启之后那队列里面是不是还存在原来的消息,这个就要取决于发生着在发送消息时对消息的设置了。 

Message持久化,也就是发送时消息持久化。(Message包含body,body为我们需要发送的消息具体内容,一般以json字符串发送,消费端再解析;MessageProperties为Message的一些额外的属性,做一些扩展作用)

//org.springframework.amqp.core.*包里面的RabbitTemplate消息的时候在封装Message对象时消息默认是持久化的 public void convertAndSend(String exchange, String routingKey, Object object, CorrelationData correlationData) throws AmqpException { this.send(exchange, routingKey, this.convertMessageIfNecessary(object), correlationData); } protected Message convertMessageIfNecessary(Object object) { return object instanceof Message?(Message)object:this.getRequiredMessageConverter().toMessage(object, new MessageProperties()); } public MessageProperties() { this.deliveryMode = DEFAULT_DELIVERY_MODE; this.priority = DEFAULT_PRIORITY; } static { DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT; DEFAULT_PRIORITY = Integer.valueOf(0); }

从上代码不难看出在发送消息的时候消息默认是持久化的.


交换机的持久化

public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) { super(name, durable, autoDelete, arguments); } public FanoutExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) { super(name, durable, autoDelete, arguments); } public TopicExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) { super(name, durable, autoDelete, arguments); } public HeadersExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) { super(name, durable, autoDelete, arguments); }

设置每种交换机的durable=true就好.


参考文章:https://www.tuicool.com/articles/6nIBBjN

转载请注明原文地址: https://www.6miu.com/read-4821323.html

最新回复(0)