Spring Boot整合rabbitmq实例(超级便捷)direct,topic,fanout模式简单易懂

xiaoxiao2025-04-09  15

1,rabbitmq发送消息的流程图如下

在springboot整合rabbitmq时大致的流程我简要说明一下,建立消息发送者,发送消息到绑定的路由器上,消息接受者通过路由键寻找到自己所要监听的消息队列。

2,交换机几种路由模式

 从图中能看出交换机是通过绑定键与消息队列中的路由键进行关联的,具体的关联规则包含direct(直连),topic(匹配),fanout(广播),headers(头消息匹配)。

在讲解代码之前先引入支持rabbitmq的jar包

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

这是spring-boot为我们提供支持AMQP协议的jar包

2.1direct模式

   发送端

import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @Configuration public class DirectSend { @Value("${rabbitmq.exchangeDirectName}") public String exchangeDirectName; @Value("${rabbitmq.bindingDirectKey}") public String bindingDirectKey; @Autowired private AmqpTemplate rabbitTemplate; @Scheduled(cron = "0/5 * * * * *") public void directSend(){ System.out.println("准备开始发送direct模式的消息=======>"); rabbitTemplate.convertAndSend(exchangeDirectName,bindingDirectKey,"这是一条direct模式发送的消息"); System.out.println("发送direct模式的消息完毕"); } }

接受端

import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; @Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "queues.direct",autoDelete = "true"),exchange = @Exchange(value = "${rabbitmq.exchangeDirectName}",type = ExchangeTypes.DIRECT),key = "${rabbitmq.bindingDirectKey}" )) public class DirectMessageConsumer { @RabbitHandler public void directMessageConsumer(String msg){ System.out.println("direct模式接受的消息为:"+msg); } }

application.properties设置的变量

#设置交换机的名称 rabbitmq.exchangeDirectName=exchange-test-direct #设置交换机的绑定键 rabbitmq.bindingDirectKey=binding.direct

这种direct模式是根据路由键进行全匹配,只有交换机的绑定键(bindingkey)和消息队列的路由键(routingkey)相同时才会进行绑定。需要注意的是@Scheduled是定时任务的注解,需要在启动类中注明@EnableScheduling元注解。否则定时任务不会生效

运行结果

2.2 topic模式

发送端

import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @Component public class TopicSend { @Value("${rabbitmq.exchangeTopicName}") public String exchangeDirectName; @Autowired private AmqpTemplate rabbitTemplate; @Scheduled(cron = "0/5 * * * * *")//5秒发送一次 public void topicSend(){ System.out.println("准备开始发送topic模式的消息=======>"); rabbitTemplate.convertAndSend(exchangeDirectName,"binding.topic.warn","这是一条topic模式发送的警告消息"); rabbitTemplate.convertAndSend(exchangeDirectName,"binding.topic.error","这是一条topic模式发送的错误消息"); rabbitTemplate.convertAndSend(exchangeDirectName,"binding.topic.log","这是一条topic模式发送的日志消息"); System.out.println("发送topic模式的消息完毕"); } }

接收端

import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; @Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "queues.topic",autoDelete = "true"),exchange = @Exchange(value = "${rabbitmq.exchangeTopicName}",type = ExchangeTypes.TOPIC),key = "${rabbitmq.bindingTopicKey}" )) public class TopicMessageConsumer { @RabbitHandler public void TopicMessageConsumer(String msg){ System.out.println("topic模式接受的消息为:"+msg); } }

配置的交换机名称

#设置交换机的名称 rabbitmq.exchangeTopicName=exchange-test-topic #设置交换机的绑定键 rabbitmq.bindingTopicKey=binding.topic.*

运行结果

 

2.3 fanout模式

发送端

import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @Component public class FanoutSend { @Value("${rabbitmq.exchangeFanoutName}") public String exchangeFanoutName; @Autowired private AmqpTemplate rabbitTemplate; @Scheduled(cron = "0/5 * * * * *")//5秒发送一次 public void fanoutSend(){ System.out.println("准备开始发送fanout模式的消息=======>"); rabbitTemplate.convertAndSend(exchangeFanoutName,"bindingkey","这是一条fanout模式发送的消息"); System.out.println("发送fanout模式的消息完毕"); } }

接收端

import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; @Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "queues.fanout",autoDelete = "true"),exchange = @Exchange(value = "${rabbitmq.exchangeFanoutName}",type = ExchangeTypes.FANOUT) )) public class FanoutMessageConsumer { @RabbitHandler public void fanoutMessageConsumer(String msg){ System.out.println("fanout模式fanoutMessageConsumer接受的消息为:"+msg); } } @Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "queues.fanout2",autoDelete = "true"),exchange = @Exchange(value = "${rabbitmq.exchangeFanoutName}",type = ExchangeTypes.FANOUT) )) class Fanout2MessageConsumer { @RabbitHandler public void fanout2MessageConsumer(String msg){ System.out.println("fanout模式fanout2MessageConsumer接受的消息为:"+msg); } }

运行结果

如发现报错

org.springframework.amqp.AmqpIOException: java.io.IOException     at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:71) ~[spring-rabbit-2.0.8.RELEASE.jar:2.0.8.RELEASE]     at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:113) ~[spring-rabbit-2.0.8.RELEASE.jar:2.0.8.RELEASE]     at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1853) ~[spring-rabbit-2.0.8.RELEASE.jar:2.0.8.RELEASE]     at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1790) ~[spring-rabbit-2.0.8.RELEASE.jar:2.0.8.RELEASE]     at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1771) ~[spring-rabbit-2.0.8.RELEASE.jar:2.0.8.RELEASE]     at org.springframework.amqp.rabbit.core.RabbitAdmin.initialize(RabbitAdmin.java:540) ~[spring-rabbit-2.0.8.RELEASE.jar:2.0.8.RELEASE]     at org.springframework.amqp.rabbit.core.RabbitAdmin.lambda$null$9(RabbitAdmin.java:453) ~[spring-rabbit-2.0.8.RELEASE.jar:2.0.8.RELEASE]     at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.2.RELEASE.jar:na]     at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:164) ~[spring-retry-1.2.2.RELEASE.jar:na]     at org.springframework.amqp.rabbit.core.RabbitAdmin.lambda$afterPropertiesSet$10(RabbitAdmin.java:452) ~[spring-rabbit-2.0.8.RELEASE.jar:2.0.8.RELEASE]     at org.springframework.amqp.rabbit.connection.CompositeConnectionListener.onCreate(CompositeConnectionListener.java:36) ~[spring-rabbit-2.0.8.RELEASE.jar:2.0.8.RELEASE]     at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:634) ~[spring-rabbit-2.0.8.RELEASE.jar:2.0.8.RELEASE]     at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:240) ~[spring-rabbit-2.0.8.RELEASE.jar:2.0.8.RELEASE]     at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1816) ~[spring-rabbit-2.0.8.RELEASE.jar:2.0.8.RELEASE]     at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1790) ~[spring-rabbit-2.0.8.RELEASE.jar:2.0.8.RELEASE]     at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1771) ~[spring-rabbit-2.0.8.RELEASE.jar:2.0.8.RELEASE]     at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:345) ~[spring-rabbit-2.0.8.RELEASE.jar:2.0.8.RELEASE]     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1604) ~[spring-rabbit-2.0.8.RELEASE.jar:2.0.8.RELEASE]     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:995) [spring-rabbit-2.0.8.RELEASE.jar:2.0.8.RELEASE]     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_102] Caused by: java.io.IOException: null     at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126) ~[amqp-client-5.4.3.jar:5.4.3]     at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122) ~[amqp-client-5.4.3.jar:5.4.3]     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144) ~[amqp-client-5.4.3.jar:5.4.3]     at com.rabbitmq.client.impl.ChannelN.exchangeDeclare(ChannelN.java:777) ~[amqp-client-5.4.3.jar:5.4.3]     at com.rabbitmq.client.impl.ChannelN.exchangeDeclare(ChannelN.java:52) ~[amqp-client-5.4.3.jar:5.4.3]     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_102]     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_102]     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_102]     at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_102]     at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1032) ~[spring-rabbit-2.0.8.RELEASE.jar:2.0.8.RELEASE]     at com.sun.proxy.$Proxy92.exchangeDeclare(Unknown Source) ~[na:na]     at org.springframework.amqp.rabbit.core.RabbitAdmin.declareExchanges(RabbitAdmin.java:587) ~[spring-rabbit-2.0.8.RELEASE.jar:2.0.8.RELEASE]     at org.springframework.amqp.rabbit.core.RabbitAdmin.lambda$initialize$11(RabbitAdmin.java:541) ~[spring-rabbit-2.0.8.RELEASE.jar:2.0.8.RELEASE]     at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1847) ~[spring-rabbit-2.0.8.RELEASE.jar:2.0.8.RELEASE]     ... 17 common frames omitted Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - cannot redeclare exchange 'exchange-test-direct' in vhost '/' with different type, durable, internal or autodelete value, class-id=40, method-id=10)     at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-5.4.3.jar:5.4.3]     at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[amqp-client-5.4.3.jar:5.4.3]     at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494) ~[amqp-client-5.4.3.jar:5.4.3]     at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288) ~[amqp-client-5.4.3.jar:5.4.3]     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138) ~[amqp-client-5.4.3.jar:5.4.3]     ... 28 common frames omitted Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - cannot redeclare exchange 'exchange-test-direct' in vhost '/' with different type, durable, internal or autodelete value, class-id=40, method-id=10)     at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:516) ~[amqp-client-5.4.3.jar:5.4.3]     at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346) ~[amqp-client-5.4.3.jar:5.4.3]     at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:178) ~[amqp-client-5.4.3.jar:5.4.3]     at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:111) ~[amqp-client-5.4.3.jar:5.4.3]     at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:670) ~[amqp-client-5.4.3.jar:5.4.3]     at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48) ~[amqp-client-5.4.3.jar:5.4.3]     at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:597) ~[amqp-client-5.4.3.jar:5.4.3]     ... 1 common frames omitted

2018-10-30 16:49:28.673  WARN 12588 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue: queues.topic 2018-10-30 16:49:28.673  WARN 12588 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer   : Queue declaration failed; retries left=3

 

请删除rabbitmq客户端的交换机和队列 重新配置

 

 

 

 

转载请注明原文地址: https://www.6miu.com/read-5027872.html

最新回复(0)