消息队列-RabbitMq


1、AMQP 简介
AMQP (Advanced Message Queuing Protocol,高级消息队列协议)是一个线路层的协议规范, 而不是 API 规范(例如 JMS)。由于 AMQP 是一个线路层协议规范,因此它天然就是跨平台的, 就像 SMTP、 HTTP 等协议一样,只要开发者按照规范的格式发送数据,任何平台都可以通过 AMQP 进行消息交互。像目前流行的 StormMQ、 RabbitMQ 等都实现了 AMQP。

2、安装RabbitMq

自行百度

3、使用

依赖

1
2
3
4
5
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>1.5.2.RELEASE</version>
</dependency>

配置

1
2
3
4
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

RabbitTopicConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package org.sang.rabbitmq;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitTopicConfig {
public final static String TOPICNAME = "sang-topic";
@Bean
TopicExchange topicExchange() {
return new TopicExchange(TOPICNAME, true, false);
}
@Bean
Queue xiaomi() {
return new Queue("xiaomi");
}
@Bean
Queue huawei() {
return new Queue("huawei");
}
@Bean
Queue phone() {
return new Queue("phone");
}
@Bean
Binding xiaomiBinding() {
return BindingBuilder.bind(xiaomi()).to(topicExchange())
.with("xiaomi.#");
}
@Bean
Binding huaweiBinding() {
return BindingBuilder.bind(huawei()).to(topicExchange())
.with("huawei.#");
}
@Bean
Binding phoneBinding() {
return BindingBuilder.bind(phone()).to(topicExchange())
.with("#.phone.#");
}
}

首先创建 TopicExchange,参数和前面的一致。然后创建三个 Queue,第一个 Queue 用来存储 和“xiaom1”有关的消息,第二个 Queue 用来存储和“huawei”有关的消息,第三个 Queue 用来存储和“phone” 有关的消息。

• 将三个 Queue 分别绑定到 TopicExchange 上,第一个 Binding 中的“xiaomi.#'’表示消息的 routingkey 凡是以“xiaomi”开头的,都将被路由到名称为“xiaomi”的 Queue 上;第二个 Binding 中的“huawei.#”表示消息的 routingkey 凡是以“huawei”开头的,都将被路由到名 称为“huawei”的 Queue 上;第三个 Binding 中的“#.phone.#"则表示消息的 routingkey 中凡 是包含“phone”的,都将被路由到名称为“phone”的 Queue 上。


接下来针对三个 Queue 创建三个消费

TopicReceiver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package org.sang.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TopicReceiver {
@RabbitListener(queues = "phone")
public void handler1(String message) {
System.out.println("PhoneReceiver:"+ message);
}
@RabbitListener(queues = "xiaomi")
public void handler2(String message) {
System.out.println("XiaoMiReceiver:"+message);
}
@RabbitListener(queues = "huawei")
public void handler3(String message) {
System.out.println("HuaWeiReceiver:"+message);
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package org.sang.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.Map;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void headerTest() {
/* Message nameMsg = MessageBuilder
.withBody("hello header! name-queue".getBytes())
.setHeader("name", "sang").build();
Message ageMsg = MessageBuilder
.withBody("hello header! age-queue".getBytes())
.setHeader("age", "99").build();
rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, ageMsg);
rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, nameMsg);*/
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.news","小米新闻..");
}
}

根据 RabbitTopicConfig 中的配置,第一条消息将被路由到名称为“xiaomi”的 Queue 上, 第 二条消息将被路由到名为“huawei”的 Queue 上,第三条消息将被路由到名为“xiaomi”以及名为 “phone”的 Queue 上,第四条消息将被路由到名为“huawei'’以及名为“phone”的 Queue 上,最 后一条消息则将被路由到名为“phone”的 Queue 上。