分布式消息队列(RabbitMQ)学习记录
本文使用的操作系统为windows,使用JDK17
分布式消息队列的使用场景:
- 异步处理
- 应用解耦
- 流量削峰
- 流量削峰
基本概念
- AMQP协议(Advanced Message Queue Protocol):高级消息队列协议
- 生产者:生产消息
- 消费者:消费消息
- 交换机:转发消息到对应的队列
- 路由:转发消息的路径
- 队列:储存消息
下载安装RabbitMQ
Installing RabbitMQ | RabbitMQ
下载RabbitMQ4.1.4
使用Erlang26.2
RabbitMQ是使用Erlang开发的消息队列,安装RabbitMQ需要先安装Erlang,Erlang是一门高性能的编程语言
RabbitMQ版本与Erlang版本的对应关系
先安装Erlang,然后安装RabbitMQ
安装成功后可以在windows的服务管理面板看到RabbitMQ服务
安装RabbitMQ的监控面板
进入RabbitMQ安装目录下的"sbin"目录,使用CMD运行下面的脚本启用Rabiit的管理面板
rabbitmq-plugins.bat enable rabbitmq_management
输出以下信息表示安装成功
重启RabbitMQ访问"http://localhost:15672"
默认的账号密码都是guest,登陆后的界面
这个guest用户只能本地访问,远程访问需要新建一个用户
快速入门(Hello World)
一对一的场景,一个消息生产者往一个队列发送消息,一个消息消费者从一个队列中消费消息
在项目中加入操作RabbitMQ的客户端依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.26.0</version>
</dependency>
生产者代码
-
创建连接工厂
//1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory();
-
配置
//2.配置 //设置RabbitMQ服务端的主机地址 factory.setHost("localhost"); //实际生产中要配置用户名和密码等等 // factory.setUsername(""); // factory.setPassword("");
-
创建连接,并拿到channel
//3.创建连接,并拿到channel //我们通过channel来操作RabbitMQ Connection connection = factory.newConnection(); Channel channel = connection.createChannel())
-
给channel配置队列的消息,当对应名称的信息队列不存在时会自动创建
//4.给channel配置队列的消息 /** * 第一个参数(queue)是队列的名称 * 第二参数是(durable)这是是否持久化队列 * 第三个参数(exclusive)表示是否为独占队列,独占队列只允许创建该队列的连接进行操作,其他连接无法操作该队列(服务重启后可能谁也无法操作这个队列) * 第四个参数(autoDelete)表示是否为自动删除队列,该队列没有连接使用时,自动删除队列 * 第五个参数(arguments)表示其它额外的参数 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null);
方法原型
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
-
生产发布消息
//5.生产消息 /** * 第一个参数(exchange)指定一个交换机 * 第二个参数(routingKey)表示队列的名称 * 第三个参数(props)表示携带的额外属性 * 第四个参数(body)是要生产的消息本身 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
方法原型
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
package space.anyi.rabbitMQ_learn.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
//生产者
public class Send {
//队列的名称
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.配置
//设置RabbitMQ服务端的主机地址
factory.setHost("localhost");
//实际生产中要配置用户名和密码等等
// factory.setUsername("");
// factory.setPassword("");
//3.创建连接,并拿到channel
//我们通过channel来操作RabbitMQ
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
//4.给channel配置队列的消息
/**
* 第一个参数(queue)是队列的名称
* 第二参数是(durable)这是是否持久化队列
* 第三个参数(exclusive)表示是否为独占队列,独占队列只允许创建该队列的连接进行操作,其他连接无法操作该队列(服务重启后可能谁也无法操作这个队列)
* 第四个参数(autoDelete)表示是否为自动删除队列,该队列没有连接使用时,自动删除队列
* 第五个参数(arguments)表示其它额外的参数
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
//5.生产消息
/**
* 第一个参数(exchange)指定一个交换机
* 第二个参数(routingKey)表示队列的名称
* 第三个参数(props)表示携带的额外属性
* 第四个参数(body)是要生产的消息本身
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
消费者代码
-
创建连接工厂
//1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory();
-
配置
//2.配置 factory.setHost("localhost");
-
获取连接和对应的channel
//3.获取连接和对应的channel Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
-
配置队列(这一步不是必须的,当对应名称的信息队列已经存在时,可以省略,一般为了代码的健壮性都会加上)
//4.配置队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-
消费消息
/** * 第一个参数(queue)是队列的名称 * 第二个参数(autoAck)表示获取消息后是否自动响应消息被消费成功 * 第三个参数(deliverCallback)是消息的处理回调方法 * 第四个参数(cancelCallback)是消息被取消的回调方法 */ //5.消费消息,会持续阻塞,等待消息队列有可以消费的消息 channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
方法原型
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
消费信息的回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); };
package space.anyi.rabbitMQ_learn.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
//消费者
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.配置
factory.setHost("localhost");
//3.获取连接和对应的channel
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//4.配置队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
/**
* 第一个参数(queue)是队列的名称
* 第二个参数(autoAck)表示获取消息后是否自动响应消息被消费成功
* 第三个参数(deliverCallback)是消息的处理回调方法
* 第四个参数(cancelCallback)是消息被取消的回调方法
*/
//5.消费消息,会持续阻塞,等待消息队列有可以消费的消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
工作队列(Work Queue)
一个消息生产者对应一个队列,一个队列对应多个消费者
场景:单个消费者处理消息能力不够强,需要加机器进行并行处理
生产者代码
-
队列持久化
//队列持久化,将durable设置为true channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
方法原型
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
-
消息持久化
//使用MessageProperties.PERSISTENT_TEXT_PLAIN表示发布的这个消息是纯文本的,并且表示这个消息是以持久化的模式发送的 /** * MessageProperties.PERSISTENT_TEXT_PLAIN中有两个属性: * contentType:text/plain 表示纯文本 * deliveryMode:2 表示消息持久化 */ channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
方法原型
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
package space.anyi.rabbitMQ_learn.workQueue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.util.Scanner;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
private static final Scanner input = new Scanner(System.in);
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
//队列持久化,将durable设置为true
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
while (input.hasNext()) {
//接收控制台的消息,用于发送多条消息
String message = input.nextLine();
//使用MessageProperties.PERSISTENT_TEXT_PLAIN表示发布的这个消息是纯文本的,并且表示这个消息是以持久化的模式发送的
/**
* contentType:text/plain 表示纯文本
* deliveryMode:2 表示消息持久化
*/
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}
消费者代码
-
取消消息自动确认
//设置消息不再自动确认,需要手动确认 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
-
设置一次取多少条消息
//Qos一次处理多少条消息,即从消息队列一次取多少条消息 channel.basicQos(1);
方法原型
void basicQos(int prefetchCount) throws IOException;
-
消息确认
/** * 消息确认 * 第一个参数(deliveryTag)是消息的Tag * 第二个参数(multiple)是是否批量确认消息,即确认这一次从消息队列取回的所以消息(有些消息可能还没处理完),一般不使用批量确认,处理完一条就确认一条消息 */ channel.basicAck(deliveryTag, false);
方法原型
void basicAck(long deliveryTag, boolean multiple) throws IOException;
-
消息拒绝
- basicNackh和basicReject都可以拒绝消息,basicNack可以批量拒绝,basicReject只能拒绝一条
/** * 消息拒绝 * 第一个参数(deliveryTag)是消息的Tag * 第二个参数(multiple)是是否批量确认消息 * 第三个参数(requeue)表示是否将消息重新放回消息队列 */ channel.basicNack(deliveryTag,false,true);
/** * 消息拒绝 * 第一个参数(deliveryTag)是消息的Tag * 第二个参数(requeue)表示是否将消息重新放回消息队列 */ channel.basicReject(deliveryTag,true);
方法原型
void basicNack(long deliveryTag, boolean multiple, boolean requeue)throws IOException; void basicReject(long deliveryTag, boolean requeue) throws IOException;
RabbitMQ的消息确认机制
stateDiagram-v2
[*] --> Ready : 消息入队
Ready --> Unacked : 消费者取走
Unacked --> Ready : 消费者断开连接
Unacked --> Acked : 消费者确认
Acked --> [*] : 从队列中删除
Unacked --> Ready : 消费者拒绝(并重新入队)
Unacked --> [*] : 消费者拒绝(并丢弃)
note right of Ready
消息在队列中,
等待被消费
end note
note right of Unacked
消息已被消费者持有,
但未得到最终确认。
此状态消息不会被删除。
end note
- ready:消息未被消费者取走
- Unacked:消息被消费者取走,但还未确认
- acked:消息被消费者取走,且已经确认
- nack:信息消费失败
- reject:消息被消费者拒绝
- 还未被消费者取走的消息才能被消费者取走
- 消费者取走消息但未确认,消息不会从消息队列中删除
- 只有消费者确认的消息才会从消息队列中删除
- 被拒绝的消息可以重新放回消息队列,也可能被丢弃
package space.anyi.rabbitMQ_learn.workQueue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
//设置消息不再自动确认,需要手动确认
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//Qos一次处理多少条消息,即从消息队列一次取多少条消息
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
//业务逻辑
doWork(message);
} finally {
System.out.println(" [x] Done");
//通过delivery.getEnvelope().getDeliveryTag()获取消息的Tag,可以认为是消息的ID
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
// 消息处理完成,确认消息成功接收
/**
* 消息确认
* 第一个参数(deliveryTag)是消息的Tag
* 第二个参数(multiple)是是否批量确认消息,即确认这一次从消息队列取回的所以消息(有些消息可能还没处理完),一般不使用批量确认,处理完一条就确认一条消息
*/
channel.basicAck(deliveryTag, false);
/**
* 消息拒绝
* 第一个参数(deliveryTag)是消息的Tag
* 第二个参数(multiple)是是否批量确认消息
* 第三个参数(requeue)表示是否将消息重新放回消息队列
*/
//channel.basicNack(deliveryTag,false,true);
/**
* 消息拒绝
* 第一个参数(deliveryTag)是消息的Tag
* 第二个参数(requeue)表示是否将消息重新放回消息队列
*/
//channel.basicReject(deliveryTag,true);
}
};
//连接到服务端,开始消费消息,取消自动确认消息,改为手动确认
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
/**
* 消息的处理
* @param task 消息任务
*/
private static void doWork(String task) {
try {
//模拟消费消息消耗的时间
Thread.sleep(1000*5);
System.out.println("处理消息:%s".formatted(task));
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
发布订阅(Publish/Subcribe)信息队列
一个生产者对应一个交换机,一个交换机对应多个队列
场景:一个系统产生的信息被多个外部系统使用,交换机将生产的信息转发到外部系统使用的信息队列中
引入交换机的概念,交换机负责信息的转发,将信息转发到对应满足条件的信息队列
交换机有四种类型:
- direct
- topic
- headers
- fanout
这里的案例以fanout类型为例,fanout直译为扇出,可以理解为广播
生产者代码
-
先声明交换机,指定名称和类型
/** * 第一个参数(exchange)是交换机的名称 * 第二个参数(type)是交换机的类型,支持的类型有:direct, topic, headers and fanout * 这里使用fanout类型,即广播类型,会将消息发送给所有绑定到该交换机的队列 */ channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
方法原型
Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
-
然后声明消息队列
-
再将消息队列绑定到交换机上
/** * 一个fanout交换机绑定多个信息队列(表示不同外部系统使用的消息队列) * 第一个参数(queue)是队列的名称 * 第二个参数(exchange)是交换机的名称 * 第三个参数(routingKey)是路由键,这里使用空字符串表示不需要路由键 */ channel.queueBind(queue1, EXCHANGE_NAME, ""); channel.queueBind(queue2, EXCHANGE_NAME, "");
方法原型
Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
-
最后发布信息
//4.发布信息,向交换机发布信息 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
package space.anyi.rabbitMQ_learn.publish_subscribe;
import java.util.Scanner;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @ProjectName: RabbitMQ-learn
* @FileName: EmitLog
* @Author: 杨逸
* @Data:2025/10/7 21:47
* @Description: 信息生产者
*/
public class EmitLog {
//交换机的名称
private static final String EXCHANGE_NAME = "logs";
public static final Scanner input = new Scanner(System.in);
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
//1.声明一个交换机
/**
* 第一个参数(exchange)是交换机的名称
* 第二个参数(type)是交换机的类型,支持的类型有:direct, topic, headers and fanout
* 这里使用fanout类型,即广播类型,会将消息发送给所有绑定到该交换机的队列
*/
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queue1 = "queue1";
String queue2 = "queue2";
//2.声明队列
channel.queueDeclare(queue1, false, false, false, null);
channel.queueDeclare(queue2, false, false, false, null);
//3.绑定队列到交换机上(注意需要先声明队列才能绑定队列)
/**
* 一个fanout交换机绑定多个信息队列(表示不同外部系统使用的消息队列)
* 第一个参数(queue)是队列的名称
* 第二个参数(exchange)是交换机的名称
* 第三个参数(routingKey)是路由键,这里使用空字符串表示不需要路由键
*/
channel.queueBind(queue1, EXCHANGE_NAME, "");
channel.queueBind(queue2, EXCHANGE_NAME, "");
while (input.hasNext()) {
String message = input.nextLine();
//4.发布信息,向交换机发布信息
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}
消费者代码
使用两个不同的channel表示不同的外部系统消费不同的信息队列
- 拿到channel
- 消费对应的信息队列
package space.anyi.rabbitMQ_learn.publish_subscribe;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* @ProjectName: RabbitMQ-learn
* @FileName: ReceiveLogs
* @Author: 杨逸
* @Data:2025/10/7 22:00
* @Description: 信息消费者
*/
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
//1.获取两个channel模拟绑定到fanout交换机的多个队列同时进行消费
Channel channel1 = connection.createChannel();
Channel channel2 = connection.createChannel();
//消费者不需要绑定到交换机
//channel1.exchangeDeclare(EXCHANGE_NAME, "fanout");
//channel2.exchangeDeclare(EXCHANGE_NAME, "fanout");
//消费者也不需要绑定到信息队列
String queueName1 = "queue1";
String queueName2 = "queue2";
//channel1.queueBind(queueName1, EXCHANGE_NAME, "");
//channel2.queueBind(queueName2, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//两个消费信息的回调
DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [消费者1] Received '" + message + "'");
};
DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [消费者2] Received '" + message + "'");
};
//2.消费对应队列的信息
channel1.basicConsume(queueName1, true, deliverCallback1, consumerTag -> { });
channel2.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
}
}
路由(Routing)信息队列
一个信息生产者对应一个交换机,一个交换机对应多个信息队列,交换机转发信息根据routingKey进行转发到对应的信息队列,direct交换机的路由键routingKey的匹配规则是字符串的完全匹配
场景:一个系统生产的信息有等级层次划分,有些信息只希望授权过的外部系统才能访问,根据路由键这些信息只发往特定的信息队列
生产者代码
- 交换机使用direct类型,direct交换机的工作方式是根据路由键(routingKey)进行信息转发,路由键的匹配方式是字符串完全匹配
- 绑定队列指定交换机和路由键
- 发布信息指定交换机的路由键
package space.anyi.rabbitMQ_learn.routing;
import java.util.Scanner;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @ProjectName: RabbitMQ-learn
* @FileName: EmitLogDirect
* @Author: 杨逸
* @Data:2025/10/9 8:33
* @Description:
*/
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static final String[] queueNames = new String[]{"error","waring","info"};
public static final Scanner input = new Scanner(System.in);
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
//声明一个交换机,使用direct类型的交换机
/**
* direct类型的交换机转发信息需要根据路由键
*/
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//声明队列,三个表示不同类型日志的信息队列
for (int i = 0; i < queueNames.length; i++) {
String queueName = queueNames[i];
String routingKey = queueName;
//信息队列声明
channel.queueDeclare(queueName, false, false, false, null);
//交换机绑定队列
/**
* 交换机绑定信息队列,指定路由键(routingKey)
*/
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
}
while (input.hasNext()) {
System.out.println("请输入路由键:");
String routingKey = input.nextLine();
System.out.println("请输入信息:");
String message = input.nextLine();
//发布信息时,指定交换机exchange和路由键routingKey
/**
* 根据路由键转发到对应的信息队列
*/
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
}
}
}
消费者代码
- 消费不同的信息队列
package space.anyi.rabbitMQ_learn.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* @ProjectName: RabbitMQ-learn
* @FileName: ReceiveLogsDirect
* @Author: 杨逸
* @Data:2025/10/9 8:48
* @Description:
*/
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static final String[] queueNames = new String[]{"error","waring","info"};
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
for (int i = 0; i < queueNames.length; i++) {
String queueName = queueNames[i];
String routingKey = queueName;
//信息队列声明
channel.queueDeclare(queueName, false, false, false, null);
//交换机绑定队列
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
//获取信息的routingKey
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
};
//模拟多个客户端消费信息
for (int i = 0; i < queueNames.length; i++) {
String queueName = queueNames[i];
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
}
主题(Topic)信息队列
topic交换机与direct交换机的主要区别是,topic交换机的路由键routingKey支持通配符的格式
场景:特定的一类信息给特定的一类程序处理
topic类型交换机的路由键是支持通配符(类Ant格式)的,一个单词有多个字符,单词与单词之间以点(.)分割
- "*":匹配一个单词
- "#":匹配零个或多个单词
- "*.orange.*" :匹配"a.orange.b",也匹配"abc.orange.abc",但不匹配"aorangeb"
- "a.#" :匹配"a.b.c",也匹配"a.b",也匹配"a",但不匹配"ab"
生产者代码
package space.anyi.rabbitMQ_learn.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Scanner;
/**
* @ProjectName: RabbitMQ-learn
* @FileName: EmitLogTopic
* @Author: 杨逸
* @Data:2025/10/9 9:45
* @Description:
*/
public class EmitLogTopic {
public static final Scanner input = new Scanner(System.in);
private static final String EXCHANGE_NAME = "topic_logs";
public static final String[] queueNames = new String[]{"q1","q2"};
public static final String[] routingKeys = new String[]{"*.orange.*","a.#"};
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
//创建一个topic类型的交换机
/**
* topic类型交换机的路由键是支持通配符(类Ant格式)的,一个单词有多个字符,单词与单词之间以点(.)分割
* * :匹配一个单词
* # :匹配零个或多个单词
*
* e.g:
* *.orange.* :匹配a.orange.b,也匹配abc.orange.abc,但不匹配aorangeb
* a.# :匹配a.b.c,也匹配a.b,也匹配a,但不匹配ab
*/
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
for (int i = 0; i < queueNames.length; i++) {
String queueName = queueNames[i];
String routingKey = routingKeys[i];
//创建信息队列
channel.queueDeclare(queueName, false, false, false, null);
//将队列绑定到交换机
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
}
System.out.println("请输入路由键:");
while (input.hasNext()) {
String routingKey = input.nextLine();
System.out.println("请输入消息:");
String message = input.nextLine();
//生产信息
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
System.out.println("请输入路由键:");
}
}
}
}
消费者代码
消费者通过信息队列的名称进行信息消费即可
package space.anyi.rabbitMQ_learn.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* @ProjectName: RabbitMQ-learn
* @FileName: ReceiveLogsTopic
* @Author: 杨逸
* @Data:2025/10/9 10:05
* @Description:
*/
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static final String[] queueNames = new String[]{"q1","q2"};
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//获取信息队列的名称
//String queueName = channel.queueDeclare().getQueue();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
//消费信息
for (String queueName : queueNames) {
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
}
核心特性
信息确认机制
信息过期机制
场景:有些信息的时效比较高,一段时间不处理后可能就不需要处理了,
-
给信息队列指定过期时间(信息队列里的每一条信息都有固定的过期时间)
-
在创建信息队列时指定过期时间
//信息队列的额外配置参数 Map<String, Object> arguments = new HashMap<String, Object>(); //设置队列中的消息的过期时间为五秒钟 arguments.put("x-message-ttl",5*1000); //创建信息队列时,通过arguments设置队列的过期时间 channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
信息在过期时间过后仍没有被消费,信息就会被丢弃
信息被读取,但未确认,在信息过期时间到后不会被丢弃
- 在信息发布时给信息指定过期时间
//创建一个信息的额外配置,设置消息的过期时间为十秒钟
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(10*1000+"").build();
//在发布信息时给特定信息指定过期时间
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
给信息队列设置的过期时间对信息队列中的每一条信息都生效
给信息设置的过期时间只对这条信息生效
package space.anyi.rabbitMQ_learn.ttl;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
/**
* @ProjectName: RabbitMQ-learn
* @FileName: Producer
* @Author: 杨逸
* @Data:2025/10/9 16:40
* @Description:
*/
public class Producer {
private final static String QUEUE_NAME = "ttl_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
//信息队列的额外配置参数
Map<String, Object> arguments = new HashMap<String, Object>();
//设置队列中的消息的过期时间为五秒钟
arguments.put("x-message-ttl",5*1000);
//创建信息队列时,通过arguments设置队列的过期时间
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
String message = "Hello World!";
//创建一个信息的额外配置,设置消息的过期时间为十秒钟
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration(10*1000+"")
.build();
//在发布信息时给特定信息指定过期时间
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
/**
* 总结:
* 给信息队列设置的过期时间对信息队列中的每一条信息都生效
* 给信息设置的过期时间只对这条信息生效
*/
}
}
}
死信队列
死信:因为各种原因导致无法被处理的信息被称为死信,比如信息过期,信息消费失败又没有重新放到信息队列,信息队列满了信息放不进去
死信队列:保存死信的队列,就是个普通的信息队列,只是保存的是死信
死信交换机:将死信转发到死信队列的交换机,也是一个普通的交换机
死信队列是为了给系统提供的一种容错机制,让无法被处理的信息,也有机会可以被处理,防止信息的丢失
- 死信的处理方法
- 给信息队列绑定一个出现死信使用的交换机exchange
- 给信息队列绑定一个出现死信使用的路由键routingKey
//创建工作交换机
channel.exchangeDeclare(WORK_EXCHANGE, "direct");
//1.创建死信交换机
channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, "direct");
//2.创建死信队列
String dlx_name1 = "dead_letter_queue1";
channel.queueDeclare(dlx_name1, false, false, false, null);
//3.死信队列绑定死信交换机
String routingKey1 = "key1";
channel.queueBind(dlx_name1, DEAD_LETTER_EXCHANGE, routingKey1);
//死信交换机配置
Map<String,Object> arguments1 = new HashMap<>();
/**
* 通过x-dead-letter-exchange参数指定死信交换机(exchange)的名称
* 通过x-dead-letter-routing-key参数指定死信转发的路由键(routingKey)
*/
arguments1.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
arguments1.put("x-dead-letter-routing-key", routingKey1);
//4.创建工作信息队列,通过arguments参数配置死信交换机
channel.queueDeclare(WORK_QUEUE1, false, false, false, arguments1);
//工作队列绑定工作交换机
channel.queueBind(WORK_QUEUE1, WORK_EXCHANGE, routingKey1);
- 生产者代码
package space.anyi.rabbitMQ_learn.dead_letter;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.Scanner;
/**
* @ProjectName: RabbitMQ-learn
* @FileName: Producer
* @Author: 杨逸
* @Data:2025/10/9 18:39
* @Description: 死信队列机制
* 工作交换机将通过路由键转发到工作信息队列
* 当工作队列产生死信时,死信交换机通过路由键转发到死信队列
*/
public class Producer {
public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
public static final String WORK_EXCHANGE = "work_exchange";
public static final String WORK_QUEUE1 = "work_queue1";
public static final String WORK_QUEUE2 = "work_queue2";
public static final Scanner input = new Scanner(System.in);
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();) {
//创建工作交换机
channel.exchangeDeclare(WORK_EXCHANGE, "direct");
//创建死信交换机
channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, "direct");
//创建死信队列
String dlx_name1 = "dead_letter_queue1";
String dlx_name2 = "dead_letter_queue2";
channel.queueDeclare(dlx_name1, false, false, false, null);
channel.queueDeclare(dlx_name2, false, false, false, null);
//死信队列绑定死信交换机
String routingKey1 = "key1";
String routingKey2 = "key2";
channel.queueBind(dlx_name1, DEAD_LETTER_EXCHANGE, routingKey1);
channel.queueBind(dlx_name2, DEAD_LETTER_EXCHANGE, routingKey2);
//死信交换机配置
Map<String,Object> arguments1 = new HashMap<>();
Map<String,Object> arguments2 = new HashMap<>();
/**
* 通过x-dead-letter-exchange参数指定死信交换机(exchange)的名称
* 通过x-dead-letter-routing-key参数指定死信转发的路由键(routingKey)
*/
arguments1.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
arguments1.put("x-dead-letter-routing-key", routingKey1);
arguments2.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
arguments2.put("x-dead-letter-routing-key", routingKey2);
//创建工作信息队列,通过arguments参数配置死信交换机
channel.queueDeclare(WORK_QUEUE1, false, false, false, arguments1);
channel.queueDeclare(WORK_QUEUE2, false, false, false, arguments2);
//工作队列绑定工作交换机
channel.queueBind(WORK_QUEUE1, WORK_EXCHANGE, routingKey1);
channel.queueBind(WORK_QUEUE2, WORK_EXCHANGE, routingKey2);
System.out.println("请输入路由键:");
while (input.hasNext()) {
String routingKey = input.nextLine();
System.out.println("请输入信息:");
String message = input.nextLine();
//发送信息
channel.basicPublish(WORK_EXCHANGE,routingKey,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
System.out.println("请输入路由键:");
}
}
}
}
- 消费者代码
package space.anyi.rabbitMQ_learn.dead_letter;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* @ProjectName: RabbitMQ-learn
* @FileName: Consumer
* @Author: 杨逸
* @Data:2025/10/9 18:59
* @Description:
* 消费者拒绝工作队列的信息模拟死信的产生
* 模拟其他程序消费死信队列的信息
*/
public class Consumer {
public static final String WORK_QUEUE1 = "work_queue1";
public static final String WORK_QUEUE2 = "work_queue2";
public static final String DLX_QUEUE_NAME1 = "dead_letter_queue1";
public static final String DLX_QUEUE_NAME2 = "dead_letter_queue2";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//工作信息的处理
DeliverCallback deliverCallback = (consumerTag,delivery)->{
long deliveryTag = delivery.getEnvelope().getDeliveryTag();
String routingKey = delivery.getEnvelope().getRoutingKey();
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
//拒绝信息,模拟死信的产生
channel.basicNack(deliveryTag,false,false);
System.out.println(" [worker] Received '" + routingKey + "':'" + message + "已拒绝'");
};
//死信队列的处理
DeliverCallback deliverCallback1 = (consumerTag,delivery)->{
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [dlx1] Received '" + routingKey + "':'" + message + "'");
};
DeliverCallback deliverCallback2 = (consumerTag,delivery)->{
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [dlx2] Received '" + routingKey + "':'" + message + "'");
};
//消费工作信息
channel.basicConsume(WORK_QUEUE1,false,deliverCallback, consumerTag->{});
channel.basicConsume(WORK_QUEUE2,false,deliverCallback, consumerTag->{});
//消费死信队列的信息
channel.basicConsume(DLX_QUEUE_NAME1,true,deliverCallback1,consumerTag->{});
channel.basicConsume(DLX_QUEUE_NAME2,true,deliverCallback2,consumerTag->{});
}
}
RabbitMQ常用API
- 常规操作
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.配置
//设置RabbitMQ服务端的主机地址
factory.setHost("localhost");
//实际生产中要配置用户名和密码等等
factory.setUsername("");
factory.setPassword("");
//3.创建连接,并拿到channel
//我们通过channel来操作RabbitMQ
Connection connection = factory.newConnection();
Channel channel = connection.createChannel())
- 创建交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//方法原型
Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
- 创建信息队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//方法原型
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
- 信息队列绑定交换机
channel.queueBind(queue1, EXCHANGE_NAME, "");
//方法原型
Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
- 发布信息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
//方法原型
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
- 消费信息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
//方法原型
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
- 给信息队列设置额外的参数
//信息队列的额外配置参数
Map<String, Object> arguments = new HashMap<String, Object>();
//设置队列中的消息的过期时间为五秒钟
arguments.put("x-message-ttl",5*1000);
//创建信息队列时,通过arguments设置队列的过期时间
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
- 给信息设置额外的参数
//创建一个信息的额外配置,设置消息的过期时间为十秒钟
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(10*1000+"").build();
//在发布信息时给特定信息指定过期时间
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
- 信息消费的处理回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
RabbitMQ集成到项目中
通过springboot starter集成到项目中
导入依赖
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.7.16</version>
</dependency>
配置RabbitMQ
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
自定义信息生产者
package space.anyi.BI.mq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @ProjectName: serve
* @FileName: MessageProducer
* @Author: 杨逸
* @Data:2025/10/9 20:17
* @Description: 信息生产者
*/
@Component
public class MessageProducer {
//通过RabbitTemplate操作RabbitMQ
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 发送信息
* @param exchange 交换机
* @param routingKey 路由键
* @param message 信息
* @description:
* @author: 杨逸
* @data:2025/10/09 20:19:06
* @since 1.0.0
*/
public void sendMessage(String exchange,String routingKey,String message) {
//发送信息
rabbitTemplate.convertAndSend(exchange,routingKey,message);
}
}
自定义信息消费者
package space.anyi.BI.mq;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
* @ProjectName: serve
* @FileName: MessageConsumer
* @Author: 杨逸
* @Data:2025/10/9 20:22
* @Description: 信息消费者
*/
@Component
public class MessageConsumer {
/**
* @param message 信息
* @param channel 通道
* @param deliverTag 消息标签
* @description:
* @author: 杨逸
* @data:2025/10/09 20:29:40
* @since 1.0.0
*/
//接收信息
@RabbitListener(queues = {"queue_name"},ackMode = "MANUAL")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliverTag) {
//消费信息的代码
}
}
RabbitMQ的重要特性
面试考点
- 消息队列的概念,模型,应用场景
- 交换机的类别,路由的绑定关系
- 信息可靠性
- 信息确认机制(ack,nack,reject)
- 信息持久化(durable)
- 信息过期机制
- 死信队列
- 延迟队列(类似死信队列)
- 顺序消费,消费幂等性
- 可扩展性
- 集群
- 故障恢复机制
- 镜像
- 运维监控告警