写在前面:本文采用rabbitmq环境是docker单节点。 项目地址:https://github.com/Blankwhiter/AMQP
在centos窗口中,执行如下命令拉取镜像,以及创建容器:
docker pull rabbitmq:3.7-management docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq-single rabbitmq:3.7-management注:5672 -> 客户端与rabbitmq的通信端口 15672 -> 图形化管理界面端口
创建容器后,在浏览器输入http://192.168.10.170:15672
注:192.168.10.170是虚拟机ip ,默认的username/password为guest/guest
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>template</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>template</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.6.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!--引入序列化 由于本次测试没有引入spring-boot-starter-web模块,故引入jackson-databind <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.6</version> <scope>compile</scope> </dependency> <!--引入amqp--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>application.yml
spring: rabbitmq: host: 192.168.9.219 port: 5672 username: guest password: guestAmqpConfig.java
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class AmqpConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }测试类,读者请按照步骤依次执行
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.Arrays; import java.util.HashMap; @RunWith(SpringRunner.class) @SpringBootTest public class TemplateApplicationTests { /** * rabbitmq 给rabbitmq 收发消息 */ @Autowired RabbitTemplate rabbitTemplate; /** * amqpTemplate 给amqp中间件 收发消息, 推荐使用 */ @Autowired AmqpTemplate amqpTemplate; /** * 交换器、队列管理 * amqp系统管理功能组件 */ @Autowired AmqpAdmin amqpAdmin; /** * 第一步: * 使用amqpAdmin初始化。创建交换器,队列,以及交换器绑定队列 * 执行完成。访问 http://192.168.9.219:15672/#/exchanges/%2F/exchange.direct 可以查看具体信息 */ @Test public void init() { /** * exchange 类型主要分DirectExchange FanoutExchange TopicExchange HeadersExchange * 具体区别更多详情请查看 https://blog.csdn.net/belonghuang157405/article/details/83184388 关于amqp相关内容 */ //创建交换器 点对点模式 amqpAdmin.declareExchange(new DirectExchange("exchange.direct")); //创建队列 queue第二个参数:是否持久化 amqpAdmin.declareQueue(new Queue("direct-queue",true)); //交换器绑定队列 amqpAdmin.declareBinding(new Binding("direct-queue", Binding.DestinationType.QUEUE,"exchange.direct","fruit.apple",null)); } /** * 第二步: * 发送消息 */ @Test public void sendMessage(){ //第一种.使用send 需要自己构造一个Message,定义消息内容以及消息头 //rabbitTemplate.send("exchange.direct","fruit.apple",new Message("apple-message-made".getBytes(),null)); //第二种.使用convertAndSend 第三个参数object默认当初消息体,自动序列化发送给rabbitmq HashMap<Object, Object> map = new HashMap<>(); map.put("type","red apple"); map.put("data", Arrays.asList(1,2,3)); // convertAndSend 使用的转换器是SimpleMessageConverter 会采用jdk序列方式, // 但往往大多数会想使用json方式,故多编写一个配置类:AmqpConfig // 这时候再去http://192.168.9.219:15672/#/queues/%2F/direct-queue界面上Get messages时候查看消息就是json格式 rabbitTemplate.convertAndSend("exchange.direct","fruit.apple",map); } /** * 第三步: * 接收消息 */ @Test public void receiveMessage(){ Object message = rabbitTemplate.receiveAndConvert("direct-queue"); System.out.println(message.getClass()); System.out.println(message.toString()); } }pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>annotation</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>annotation</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.6.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!--引入amqp--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>application.yml
spring: rabbitmq: host: 192.168.9.219 port: 5672 username: guest password: guestAmqpConfig.java
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class AmqpConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }测试类,读者请按照步骤依次执行
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.Arrays; import java.util.HashMap; @RunWith(SpringRunner.class) @SpringBootTest public class AnnotationApplicationTests { /** * rabbitmq 给rabbitmq 收发消息 */ @Autowired RabbitTemplate rabbitTemplate; /** * amqpTemplate 给amqp中间件 收发消息, 推荐使用 */ @Autowired AmqpTemplate amqpTemplate; /** * 交换器、队列管理 * amqp系统管理功能组件 */ @Autowired AmqpAdmin amqpAdmin; /** * 第一步: * 使用amqpAdmin初始化。创建交换器,队列,以及交换器绑定队列 * 执行完成。访问 http://192.168.9.219:15672/#/exchanges/%2F/exchange.fanout 可以查看具体信息 */ @Test public void init() { /** * exchange 类型主要分DirectExchange FanoutExchange TopicExchange HeadersExchange * 具体区别更多详情请查看 https://blog.csdn.net/belonghuang157405/article/details/83184388 关于amqp相关内容 */ //创建交换器 广播模式 amqpAdmin.declareExchange(new FanoutExchange("exchange.fanout")); //创建队列 queue第二个参数:是否持久化 amqpAdmin.declareQueue(new Queue("fanout-queue1",true)); amqpAdmin.declareQueue(new Queue("fanout-queue2",true)); amqpAdmin.declareQueue(new Queue("fanout-queue3",true)); //交换器绑定队列 amqpAdmin.declareBinding(new Binding("fanout-queue1", Binding.DestinationType.QUEUE,"exchange.fanout","fruit.orange-1",null)); amqpAdmin.declareBinding(new Binding("fanout-queue2", Binding.DestinationType.QUEUE,"exchange.fanout","fruit.orange-2",null)); amqpAdmin.declareBinding(new Binding("fanout-queue3", Binding.DestinationType.QUEUE,"exchange.fanout","fruit.orange-3",null)); } /** * 第二步: * 发送消息 */ @Test public void sendMessage(){ //第一种.使用send 需要自己构造一个Message,定义消息内容以及消息头 //rabbitTemplate.send("exchange.fanout","fruit.apple",new Message("apple-message-made".getBytes(),null)); //第二种.使用convertAndSend 第三个参数object默认当初消息体,自动序列化发送给rabbitmq HashMap<Object, Object> map = new HashMap<>(); map.put("type","red orange"); map.put("data", Arrays.asList(1,2,3)); // convertAndSend 使用的转换器是SimpleMessageConverter 会采用jdk序列方式, // 但往往大多数会想使用json方式,故多编写一个配置类:AmqpConfig // 这时候再去http://192.168.9.219:15672/#/queues/%2F/fanout-queueX界面上Get messages时候查看消息就是json格式 rabbitTemplate.convertAndSend("exchange.fanout","",map); } }按照步骤运行完上述代码,编写MessageService类,并启动项目。 MessageService.java
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import java.util.HashMap; @Service public class MessageService { @RabbitListener(queues = "fanout-queue1") public void receiveQueue1Message(HashMap<String,Object> map){ System.out.println("fanout-queue1 begin"); Object message = map.getOrDefault("data", "no message"); System.out.println(message.toString()); System.out.println("fanout-queue1 end"); } @RabbitListener(queues = "fanout-queue2") public void receiveQueue2Message(HashMap<String,Object> map){ System.out.println("fanout-queue2 begin"); Object message = map.getOrDefault("data", "no message"); System.out.println(message.toString()); System.out.println("fanout-queue2 end"); } @RabbitListener(queues = "fanout-queue3") public void receiveQueue3Message(HashMap<String,Object> map){ System.out.println("fanout-queue3 begin"); Object message = map.getOrDefault("data", "no message"); System.out.println(message.toString()); System.out.println("fanout-queue3 end"); } }启动完成后 控制台打印如下信息:
fanout-queue3 end fanout-queue1 begin [1, 2, 3] fanout-queue1 end fanout-queue2 begin [1, 2, 3] fanout-queue2 end fanout-queue3 begin [1, 2, 3] fanout-queue3 end