Fanount 基础 (没有路由Key))


SpringBoot+RabbitMQ的简单实现之Fanout模式
1.在pom中添加springboot对amqp的支持
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
2.在application.properties中添加RabbitMQ的简单配置信息
1 2 3 4 5
| spring.rabbitmq.host=127.0.0.1 #5672是发送消息端口,15672是管理端的端口 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
|
3.配置Queue(消息队列)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| @Configuration public class QueueConfig { @Bean(name = "queue-fanoutA") public Queue queue_fanoutA() { return new Queue("queue-fanoutA"); } @Bean(name = "queue-fanoutB") public Queue queue-fanoutB() { return new Queue("queue-fanoutB"); } @Bean(name = "queue-fanoutC") public Queue queue-fanoutC() { return new Queue("queue-fanoutC"); }
@Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanout_exchange"); }
@Bean Binding bindingExchangeFanoutA(@Qualifier("queue-fanoutA") Queue queue-fanoutA, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue-fanoutA).to(fanoutExchange); }
@Bean Binding bindingExchangeFanoutB(@Qualifier("queue-fanoutB") Queue queue-fanoutB, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue-fanoutB).to(fanoutExchange); } @Bean Binding bindingExchangeFanoutC(@Qualifier("queue-fanoutC") Queue queue-fanoutC, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue-fanoutC).to(fanoutExchange); }
}
|
4.编写消息生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Component public class Sender_Fanout {
@Autowired private RabbitTemplate rabbitTemplate;
public void send(String exchangeName,String routingKey,Message message) { rabbitTemplate.convertAndSend(exchangeName,null,message); } }
|
5.编写消息消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| @Component public class Receive_Fanout {
@RabbitListener(queues="queue-fanoutA") public void processA(Message message) throws UnsupportedEncodingException { MessageProperties messageProperties = message.getMessageProperties(); String contentType = messageProperties.getContentType(); System.out.println("Receive-FanoutA:"+new String(message.getBody(), contentType)); } @RabbitListener(queues="queue-fanoutB") public void processB(Message message) throws UnsupportedEncodingException { MessageProperties messageProperties = message.getMessageProperties(); String contentType = messageProperties.getContentType(); System.out.println("Receive-FanoutB:"+new String(message.getBody(), contentType)); } @RabbitListener(queues="queue-fanoutC") public void processC(Message message) throws UnsupportedEncodingException { MessageProperties messageProperties = message.getMessageProperties(); String contentType = messageProperties.getContentType(); System.out.println("Receive-FanoutC:"+new String(message.getBody(), contentType)); } }
|
Tsst
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @RunWith(SpringRunner.class) @SpringBootTest public class TestRabbitMQ_Fanout { @Autowired private Sender_Fanout sender_Fanout; @Test public void testRabbit_Fanout() {
MessageProperties messageProperties = new MessageProperties(); messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); messageProperties.setContentType("UTF-8"); Message message = new Message("hello,rabbit_topic!".getBytes(), messageProperties); sender_Fanout.send("fanout_exchange","",message); } }
|
1 2 3 4 5 6
| @RabbitListener(bindings = @QueueBinding(value = @Queue("myQueuebingExchange"), exchange = @Exchange("myExchange") )) public void process(String message) { log.info("message={}", message); }
|


Author:
John Doe
Permalink:
http://yoursite.com/2019/08/10/消息队列/RabbitMQ/RabbitMQ整合/Fanount 发布订阅模式/
License:
Copyright (c) 2019 CC-BY-NC-4.0 LICENSE
Slogan:
Do you believe in DESTINY?