分布式消息队列(RabbitMQ)学习记录

本文使用的操作系统为windows,使用JDK17

RabbitMQ官网

分布式消息队列技术选型参考

分布式消息队列的使用场景:

  • 异步处理
  • 应用解耦
  • 流量削峰
  • 流量削峰

基本概念

  • AMQP协议(Advanced Message Queue Protocol):高级消息队列协议
  • 生产者:生产消息
  • 消费者:消费消息
  • 交换机:转发消息到对应的队列
  • 路由:转发消息的路径
  • 队列:储存消息

下载安装RabbitMQ

Installing RabbitMQ | RabbitMQ

RabbitMQ4.1.4安装包链接

Erlang26.2安装包链接

下载RabbitMQ4.1.4

使用Erlang26.2

RabbitMQ是使用Erlang开发的消息队列,安装RabbitMQ需要先安装Erlang,Erlang是一门高性能的编程语言

RabbitMQ版本与Erlang版本的对应关系

先安装Erlang,然后安装RabbitMQ

安装成功后可以在windows的服务管理面板看到RabbitMQ服务

安装RabbitMQ的监控面板

进入RabbitMQ安装目录下的"sbin"目录,使用CMD运行下面的脚本启用Rabiit的管理面板

rabbitmq-plugins.bat enable rabbitmq_management

输出以下信息表示安装成功

重启RabbitMQ访问"http://localhost:15672"

默认的账号密码都是guest,登陆后的界面

这个guest用户只能本地访问,远程访问需要新建一个用户

快速入门(Hello World)

官方文档案例

一对一的场景,一个消息生产者往一个队列发送消息,一个消息消费者从一个队列中消费消息

在项目中加入操作RabbitMQ的客户端依赖

        <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.26.0</version>
        </dependency>

生产者代码

  1. 创建连接工厂

    //1.创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    
  2. 配置

    //2.配置
    //设置RabbitMQ服务端的主机地址
    factory.setHost("localhost");
    //实际生产中要配置用户名和密码等等
    //        factory.setUsername("");
    //        factory.setPassword("");
    
  3. 创建连接,并拿到channel

    //3.创建连接,并拿到channel
    //我们通过channel来操作RabbitMQ
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel()) 
    
  4. 给channel配置队列的消息,当对应名称的信息队列不存在时会自动创建

    //4.给channel配置队列的消息
    /**
    * 第一个参数(queue)是队列的名称
    * 第二参数是(durable)这是是否持久化队列
    * 第三个参数(exclusive)表示是否为独占队列,独占队列只允许创建该队列的连接进行操作,其他连接无法操作该队列(服务重启后可能谁也无法操作这个队列)
    * 第四个参数(autoDelete)表示是否为自动删除队列,该队列没有连接使用时,自动删除队列
    * 第五个参数(arguments)表示其它额外的参数
    */
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    

    方法原型

    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
    
  5. 生产发布消息

    //5.生产消息
    /**
    * 第一个参数(exchange)指定一个交换机
    * 第二个参数(routingKey)表示队列的名称
    * 第三个参数(props)表示携带的额外属性
    * 第四个参数(body)是要生产的消息本身
    */
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    

    方法原型

    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
    
package space.anyi.rabbitMQ_learn.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
//生产者
public class Send {
    //队列的名称
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.配置
        //设置RabbitMQ服务端的主机地址
        factory.setHost("localhost");
        //实际生产中要配置用户名和密码等等
//        factory.setUsername("");
//        factory.setPassword("");
        //3.创建连接,并拿到channel
        //我们通过channel来操作RabbitMQ
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //4.给channel配置队列的消息
            /**
             * 第一个参数(queue)是队列的名称
             * 第二参数是(durable)这是是否持久化队列
             * 第三个参数(exclusive)表示是否为独占队列,独占队列只允许创建该队列的连接进行操作,其他连接无法操作该队列(服务重启后可能谁也无法操作这个队列)
             * 第四个参数(autoDelete)表示是否为自动删除队列,该队列没有连接使用时,自动删除队列
             * 第五个参数(arguments)表示其它额外的参数
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            //5.生产消息
            /**
             * 第一个参数(exchange)指定一个交换机
             * 第二个参数(routingKey)表示队列的名称
             * 第三个参数(props)表示携带的额外属性
             * 第四个参数(body)是要生产的消息本身
             */
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

消费者代码

  1. 创建连接工厂

    //1.创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    
  2. 配置

    //2.配置
    factory.setHost("localhost");
    
  3. 获取连接和对应的channel

    //3.获取连接和对应的channel
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    
  4. 配置队列(这一步不是必须的,当对应名称的信息队列已经存在时,可以省略,一般为了代码的健壮性都会加上)

    //4.配置队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
  5. 消费消息

    /**
    * 第一个参数(queue)是队列的名称
    * 第二个参数(autoAck)表示获取消息后是否自动响应消息被消费成功
    * 第三个参数(deliverCallback)是消息的处理回调方法
    * 第四个参数(cancelCallback)是消息被取消的回调方法
    */
    //5.消费消息,会持续阻塞,等待消息队列有可以消费的消息
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    

    方法原型

    String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
    

    消费信息的回调

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
    
package space.anyi.rabbitMQ_learn.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

//消费者
public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.配置
        factory.setHost("localhost");
        //3.获取连接和对应的channel
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //4.配置队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        /**
         * 第一个参数(queue)是队列的名称
         * 第二个参数(autoAck)表示获取消息后是否自动响应消息被消费成功
         * 第三个参数(deliverCallback)是消息的处理回调方法
         * 第四个参数(cancelCallback)是消息被取消的回调方法
         */
        //5.消费消息,会持续阻塞,等待消息队列有可以消费的消息
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

工作队列(Work Queue)

一个消息生产者对应一个队列,一个队列对应多个消费者

场景:单个消费者处理消息能力不够强,需要加机器进行并行处理

官方文档案例

生产者代码

  • 队列持久化

    //队列持久化,将durable设置为true
    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    

    方法原型

    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
    
  • 消息持久化

    //使用MessageProperties.PERSISTENT_TEXT_PLAIN表示发布的这个消息是纯文本的,并且表示这个消息是以持久化的模式发送的
    /**
    * MessageProperties.PERSISTENT_TEXT_PLAIN中有两个属性:
    * contentType:text/plain 表示纯文本
    * deliveryMode:2 表示消息持久化
    */
    channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
    

    方法原型

    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
    
package space.anyi.rabbitMQ_learn.workQueue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.util.Scanner;

public class NewTask {

    private static final String TASK_QUEUE_NAME = "task_queue";
    private static final Scanner input = new Scanner(System.in);

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //队列持久化,将durable设置为true
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

            while (input.hasNext()) {
                //接收控制台的消息,用于发送多条消息
                String message = input.nextLine();

                //使用MessageProperties.PERSISTENT_TEXT_PLAIN表示发布的这个消息是纯文本的,并且表示这个消息是以持久化的模式发送的
                /**
                 * contentType:text/plain 表示纯文本
                 * deliveryMode:2 表示消息持久化
                 */
                channel.basicPublish("", TASK_QUEUE_NAME,
                        MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }

}

消费者代码

  • 取消消息自动确认

    //设置消息不再自动确认,需要手动确认
    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    
  • 设置一次取多少条消息

    //Qos一次处理多少条消息,即从消息队列一次取多少条消息
    channel.basicQos(1);
    

    方法原型

    void basicQos(int prefetchCount) throws IOException;
    
  • 消息确认

    /**
    * 消息确认
    * 第一个参数(deliveryTag)是消息的Tag
    * 第二个参数(multiple)是是否批量确认消息,即确认这一次从消息队列取回的所以消息(有些消息可能还没处理完),一般不使用批量确认,处理完一条就确认一条消息
    */
    channel.basicAck(deliveryTag, false);
    

    方法原型

    void basicAck(long deliveryTag, boolean multiple) throws IOException;
    
  • 消息拒绝

    • basicNackh和basicReject都可以拒绝消息,basicNack可以批量拒绝,basicReject只能拒绝一条
    /**
    * 消息拒绝
    * 第一个参数(deliveryTag)是消息的Tag
    * 第二个参数(multiple)是是否批量确认消息
    * 第三个参数(requeue)表示是否将消息重新放回消息队列
    */
    channel.basicNack(deliveryTag,false,true);
    
    /**
    * 消息拒绝
    * 第一个参数(deliveryTag)是消息的Tag
    * 第二个参数(requeue)表示是否将消息重新放回消息队列
    */
    channel.basicReject(deliveryTag,true);
    

    方法原型

    void basicNack(long deliveryTag, boolean multiple, boolean requeue)throws IOException;
    void basicReject(long deliveryTag, boolean requeue) throws IOException;
    

RabbitMQ的消息确认机制

官方文档

stateDiagram-v2
    [*] --> Ready : 消息入队
    Ready --> Unacked : 消费者取走
    Unacked --> Ready : 消费者断开连接
    Unacked --> Acked : 消费者确认
    Acked --> [*] : 从队列中删除
    Unacked --> Ready : 消费者拒绝(并重新入队)
    Unacked --> [*] : 消费者拒绝(并丢弃)
    
    note right of Ready
        消息在队列中,
        等待被消费
    end note

    note right of Unacked
        消息已被消费者持有,
        但未得到最终确认。
        此状态消息不会被删除。
    end note
  • ready:消息未被消费者取走
  • Unacked:消息被消费者取走,但还未确认
  • acked:消息被消费者取走,且已经确认
  • nack:信息消费失败
  • reject:消息被消费者拒绝
  1. 还未被消费者取走的消息才能被消费者取走
  2. 消费者取走消息但未确认,消息不会从消息队列中删除
  3. 只有消费者确认的消息才会从消息队列中删除
  4. 被拒绝的消息可以重新放回消息队列,也可能被丢弃
package space.anyi.rabbitMQ_learn.workQueue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Worker {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        //设置消息不再自动确认,需要手动确认
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //Qos一次处理多少条消息,即从消息队列一次取多少条消息
        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                //业务逻辑
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                //通过delivery.getEnvelope().getDeliveryTag()获取消息的Tag,可以认为是消息的ID
                long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                // 消息处理完成,确认消息成功接收
                /**
                 * 消息确认
                 * 第一个参数(deliveryTag)是消息的Tag
                 * 第二个参数(multiple)是是否批量确认消息,即确认这一次从消息队列取回的所以消息(有些消息可能还没处理完),一般不使用批量确认,处理完一条就确认一条消息
                 */
                channel.basicAck(deliveryTag, false);
                /**
                 * 消息拒绝
                 * 第一个参数(deliveryTag)是消息的Tag
                 * 第二个参数(multiple)是是否批量确认消息
                 * 第三个参数(requeue)表示是否将消息重新放回消息队列
                 */
                //channel.basicNack(deliveryTag,false,true);
                /**
                 * 消息拒绝
                 * 第一个参数(deliveryTag)是消息的Tag
                 * 第二个参数(requeue)表示是否将消息重新放回消息队列
                 */
                //channel.basicReject(deliveryTag,true);
            }
        };
        //连接到服务端,开始消费消息,取消自动确认消息,改为手动确认
        channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }

    /**
     * 消息的处理
     * @param task 消息任务
     */
    private static void doWork(String task) {
        try {
            //模拟消费消息消耗的时间
            Thread.sleep(1000*5);
            System.out.println("处理消息:%s".formatted(task));
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }
}

发布订阅(Publish/Subcribe)信息队列

官方文档案例

一个生产者对应一个交换机,一个交换机对应多个队列

场景:一个系统产生的信息被多个外部系统使用,交换机将生产的信息转发到外部系统使用的信息队列中

引入交换机的概念,交换机负责信息的转发,将信息转发到对应满足条件的信息队列

交换机有四种类型:

  • direct
  • topic
  • headers
  • fanout

这里的案例以fanout类型为例,fanout直译为扇出,可以理解为广播

生产者代码

  1. 先声明交换机,指定名称和类型

    /**
    * 第一个参数(exchange)是交换机的名称
    * 第二个参数(type)是交换机的类型,支持的类型有:direct, topic, headers and fanout
    * 这里使用fanout类型,即广播类型,会将消息发送给所有绑定到该交换机的队列
    */
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    

    方法原型

    Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
    
  2. 然后声明消息队列

  3. 再将消息队列绑定到交换机上

    /**
    * 一个fanout交换机绑定多个信息队列(表示不同外部系统使用的消息队列)
    * 第一个参数(queue)是队列的名称
    * 第二个参数(exchange)是交换机的名称
    * 第三个参数(routingKey)是路由键,这里使用空字符串表示不需要路由键
    */
    channel.queueBind(queue1, EXCHANGE_NAME, "");
    channel.queueBind(queue2, EXCHANGE_NAME, "");
    

    方法原型

    Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
    
  4. 最后发布信息

    //4.发布信息,向交换机发布信息
    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
    
package space.anyi.rabbitMQ_learn.publish_subscribe;

import java.util.Scanner;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @ProjectName: RabbitMQ-learn
 * @FileName: EmitLog
 * @Author: 杨逸
 * @Data:2025/10/7 21:47
 * @Description: 信息生产者
 */
public class EmitLog {
    //交换机的名称
    private static final String EXCHANGE_NAME = "logs";
    public static final Scanner input = new Scanner(System.in);

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //1.声明一个交换机
            /**
             * 第一个参数(exchange)是交换机的名称
             * 第二个参数(type)是交换机的类型,支持的类型有:direct, topic, headers and fanout
             * 这里使用fanout类型,即广播类型,会将消息发送给所有绑定到该交换机的队列
             */
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            String queue1 = "queue1";
            String queue2 = "queue2";
            //2.声明队列
            channel.queueDeclare(queue1, false, false, false, null);
            channel.queueDeclare(queue2, false, false, false, null);

            //3.绑定队列到交换机上(注意需要先声明队列才能绑定队列)
            /**
             * 一个fanout交换机绑定多个信息队列(表示不同外部系统使用的消息队列)
             * 第一个参数(queue)是队列的名称
             * 第二个参数(exchange)是交换机的名称
             * 第三个参数(routingKey)是路由键,这里使用空字符串表示不需要路由键
             */
            channel.queueBind(queue1, EXCHANGE_NAME, "");
            channel.queueBind(queue2, EXCHANGE_NAME, "");

            while (input.hasNext()) {
                String message = input.nextLine();
                //4.发布信息,向交换机发布信息
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
}

消费者代码

使用两个不同的channel表示不同的外部系统消费不同的信息队列

  1. 拿到channel
  2. 消费对应的信息队列
package space.anyi.rabbitMQ_learn.publish_subscribe;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * @ProjectName: RabbitMQ-learn
 * @FileName: ReceiveLogs
 * @Author: 杨逸
 * @Data:2025/10/7 22:00
 * @Description: 信息消费者
 */

public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        //1.获取两个channel模拟绑定到fanout交换机的多个队列同时进行消费
        Channel channel1 = connection.createChannel();
        Channel channel2 = connection.createChannel();
        //消费者不需要绑定到交换机
        //channel1.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //channel2.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //消费者也不需要绑定到信息队列
        String queueName1 = "queue1";
        String queueName2 = "queue2";
        //channel1.queueBind(queueName1, EXCHANGE_NAME, "");
        //channel2.queueBind(queueName2, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //两个消费信息的回调
        DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [消费者1] Received '" + message + "'");
        };
        DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [消费者2] Received '" + message + "'");
        };
        //2.消费对应队列的信息
        channel1.basicConsume(queueName1, true, deliverCallback1, consumerTag -> { });
        channel2.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
    }
}

路由(Routing)信息队列

官方文档案例

一个信息生产者对应一个交换机,一个交换机对应多个信息队列,交换机转发信息根据routingKey进行转发到对应的信息队列,direct交换机的路由键routingKey的匹配规则是字符串的完全匹配

场景:一个系统生产的信息有等级层次划分,有些信息只希望授权过的外部系统才能访问,根据路由键这些信息只发往特定的信息队列

生产者代码

  • 交换机使用direct类型,direct交换机的工作方式是根据路由键(routingKey)进行信息转发,路由键的匹配方式是字符串完全匹配
  • 绑定队列指定交换机和路由键
  • 发布信息指定交换机的路由键
package space.anyi.rabbitMQ_learn.routing;

import java.util.Scanner;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * @ProjectName: RabbitMQ-learn
 * @FileName: EmitLogDirect
 * @Author: 杨逸
 * @Data:2025/10/9 8:33
 * @Description:
 */

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";
    public static final String[]  queueNames = new String[]{"error","waring","info"};
    public static final Scanner input = new Scanner(System.in);

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //声明一个交换机,使用direct类型的交换机
            /**
             * direct类型的交换机转发信息需要根据路由键
             */
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            //声明队列,三个表示不同类型日志的信息队列
            for (int i = 0; i < queueNames.length; i++) {
                String queueName = queueNames[i];
                String routingKey = queueName;
                //信息队列声明
                channel.queueDeclare(queueName, false, false, false, null);
                //交换机绑定队列
                /**
                 * 交换机绑定信息队列,指定路由键(routingKey)
                 */
                channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
            }

            while (input.hasNext()) {
                System.out.println("请输入路由键:");
                String routingKey = input.nextLine();
                System.out.println("请输入信息:");
                String message = input.nextLine();
                //发布信息时,指定交换机exchange和路由键routingKey
                /**
                 * 根据路由键转发到对应的信息队列
                 */
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
            }

        }
    }
}

消费者代码

  • 消费不同的信息队列
package space.anyi.rabbitMQ_learn.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * @ProjectName: RabbitMQ-learn
 * @FileName: ReceiveLogsDirect
 * @Author: 杨逸
 * @Data:2025/10/9 8:48
 * @Description:
 */

public class ReceiveLogsDirect {

    private static final String EXCHANGE_NAME = "direct_logs";
    public static final String[]  queueNames = new String[]{"error","waring","info"};

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        for (int i = 0; i < queueNames.length; i++) {
            String queueName = queueNames[i];
            String routingKey = queueName;
            //信息队列声明
            channel.queueDeclare(queueName, false, false, false, null);
            //交换机绑定队列
            channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            //获取信息的routingKey
            String routingKey = delivery.getEnvelope().getRoutingKey();
            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        };
        //模拟多个客户端消费信息
        for (int i = 0; i < queueNames.length; i++) {
            String queueName = queueNames[i];
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        }
    }
}

主题(Topic)信息队列

官方文档案例

topic交换机与direct交换机的主要区别是,topic交换机的路由键routingKey支持通配符的格式

场景:特定的一类信息给特定的一类程序处理

topic类型交换机的路由键是支持通配符(类Ant格式)的,一个单词有多个字符,单词与单词之间以点(.)分割

  • "*":匹配一个单词
  • "#":匹配零个或多个单词
  • "*.orange.*" :匹配"a.orange.b",也匹配"abc.orange.abc",但不匹配"aorangeb"
  • "a.#" :匹配"a.b.c",也匹配"a.b",也匹配"a",但不匹配"ab"

生产者代码

package space.anyi.rabbitMQ_learn.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Scanner;
/**
 * @ProjectName: RabbitMQ-learn
 * @FileName: EmitLogTopic
 * @Author: 杨逸
 * @Data:2025/10/9 9:45
 * @Description:
 */

public class EmitLogTopic {
    public static final Scanner input = new Scanner(System.in);
    private static final String EXCHANGE_NAME = "topic_logs";
    public static final String[]  queueNames = new String[]{"q1","q2"};
    public static final String[]  routingKeys = new String[]{"*.orange.*","a.#"};


    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            //创建一个topic类型的交换机
            /**
             * topic类型交换机的路由键是支持通配符(类Ant格式)的,一个单词有多个字符,单词与单词之间以点(.)分割
             *  * :匹配一个单词
             *  # :匹配零个或多个单词
             *
             *  e.g:
             *  *.orange.* :匹配a.orange.b,也匹配abc.orange.abc,但不匹配aorangeb
             *  a.# :匹配a.b.c,也匹配a.b,也匹配a,但不匹配ab
             */
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            for (int i = 0; i < queueNames.length; i++) {
                String queueName = queueNames[i];
                String routingKey = routingKeys[i];
                //创建信息队列
                channel.queueDeclare(queueName, false, false, false, null);
                //将队列绑定到交换机
                channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
            }

            System.out.println("请输入路由键:");
            while (input.hasNext()) {
                String routingKey = input.nextLine();
                System.out.println("请输入消息:");
                String message = input.nextLine();
                //生产信息
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
                System.out.println("请输入路由键:");
            }

        }
    }
}

消费者代码

消费者通过信息队列的名称进行信息消费即可

package space.anyi.rabbitMQ_learn.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * @ProjectName: RabbitMQ-learn
 * @FileName: ReceiveLogsTopic
 * @Author: 杨逸
 * @Data:2025/10/9 10:05
 * @Description:
 */

public class ReceiveLogsTopic {

    private static final String EXCHANGE_NAME = "topic_logs";
    public static final String[]  queueNames = new String[]{"q1","q2"};

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //获取信息队列的名称
        //String queueName = channel.queueDeclare().getQueue();

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        //消费信息
        for (String queueName : queueNames) {
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        }
    }
}

核心特性

信息确认机制

信息过期机制

官方文档

场景:有些信息的时效比较高,一段时间不处理后可能就不需要处理了,

  • 给信息队列指定过期时间(信息队列里的每一条信息都有固定的过期时间)

  • 在创建信息队列时指定过期时间

    //信息队列的额外配置参数
    Map<String, Object> arguments = new HashMap<String, Object>();
    //设置队列中的消息的过期时间为五秒钟
    arguments.put("x-message-ttl",5*1000);
    //创建信息队列时,通过arguments设置队列的过期时间
    channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
    

信息在过期时间过后仍没有被消费,信息就会被丢弃

信息被读取,但未确认,在信息过期时间到后不会被丢弃

  • 在信息发布时给信息指定过期时间
//创建一个信息的额外配置,设置消息的过期时间为十秒钟
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(10*1000+"").build();
//在发布信息时给特定信息指定过期时间
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());

给信息队列设置的过期时间对信息队列中的每一条信息都生效

给信息设置的过期时间只对这条信息生效

package space.anyi.rabbitMQ_learn.ttl;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @ProjectName: RabbitMQ-learn
 * @FileName: Producer
 * @Author: 杨逸
 * @Data:2025/10/9 16:40
 * @Description:
 */
public class Producer {
    private final static String QUEUE_NAME = "ttl_queue";
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //信息队列的额外配置参数
            Map<String, Object> arguments = new HashMap<String, Object>();
            //设置队列中的消息的过期时间为五秒钟
            arguments.put("x-message-ttl",5*1000);
            //创建信息队列时,通过arguments设置队列的过期时间
            channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
            String message = "Hello World!";
            //创建一个信息的额外配置,设置消息的过期时间为十秒钟
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .expiration(10*1000+"")
                    .build();
            //在发布信息时给特定信息指定过期时间
            channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            /**
             * 总结:
             * 给信息队列设置的过期时间对信息队列中的每一条信息都生效
             * 给信息设置的过期时间只对这条信息生效
             */
        }
    }
}

死信队列

官方文档

死信:因为各种原因导致无法被处理的信息被称为死信,比如信息过期,信息消费失败又没有重新放到信息队列,信息队列满了信息放不进去

死信队列:保存死信的队列,就是个普通的信息队列,只是保存的是死信

死信交换机:将死信转发到死信队列的交换机,也是一个普通的交换机

死信队列是为了给系统提供的一种容错机制,让无法被处理的信息,也有机会可以被处理,防止信息的丢失

  • 死信的处理方法
    1. 给信息队列绑定一个出现死信使用的交换机exchange
    2. 给信息队列绑定一个出现死信使用的路由键routingKey
//创建工作交换机
channel.exchangeDeclare(WORK_EXCHANGE, "direct");
//1.创建死信交换机
channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, "direct");

//2.创建死信队列
String dlx_name1 = "dead_letter_queue1";
channel.queueDeclare(dlx_name1, false, false, false, null);
//3.死信队列绑定死信交换机
String routingKey1 = "key1";
channel.queueBind(dlx_name1, DEAD_LETTER_EXCHANGE, routingKey1);

//死信交换机配置
Map<String,Object> arguments1 = new HashMap<>();
/**
* 通过x-dead-letter-exchange参数指定死信交换机(exchange)的名称
* 通过x-dead-letter-routing-key参数指定死信转发的路由键(routingKey)
*/
arguments1.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
arguments1.put("x-dead-letter-routing-key", routingKey1);
//4.创建工作信息队列,通过arguments参数配置死信交换机
channel.queueDeclare(WORK_QUEUE1, false, false, false, arguments1);
//工作队列绑定工作交换机
channel.queueBind(WORK_QUEUE1, WORK_EXCHANGE, routingKey1);
  • 生产者代码
package space.anyi.rabbitMQ_learn.dead_letter;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.Scanner;
/**
 * @ProjectName: RabbitMQ-learn
 * @FileName: Producer
 * @Author: 杨逸
 * @Data:2025/10/9 18:39
 * @Description: 死信队列机制
 * 工作交换机将通过路由键转发到工作信息队列
 * 当工作队列产生死信时,死信交换机通过路由键转发到死信队列
 */
public class Producer {
    public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
    public static final String WORK_EXCHANGE = "work_exchange";
    public static final String WORK_QUEUE1 = "work_queue1";
    public static final String WORK_QUEUE2 = "work_queue2";
    public static final Scanner input = new Scanner(System.in);
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel();) {
            //创建工作交换机
            channel.exchangeDeclare(WORK_EXCHANGE, "direct");
            //创建死信交换机
            channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, "direct");

            //创建死信队列
            String dlx_name1 = "dead_letter_queue1";
            String dlx_name2 = "dead_letter_queue2";
            channel.queueDeclare(dlx_name1, false, false, false, null);
            channel.queueDeclare(dlx_name2, false, false, false, null);
            //死信队列绑定死信交换机
            String routingKey1 = "key1";
            String routingKey2 = "key2";
            channel.queueBind(dlx_name1, DEAD_LETTER_EXCHANGE, routingKey1);
            channel.queueBind(dlx_name2, DEAD_LETTER_EXCHANGE, routingKey2);

            //死信交换机配置
            Map<String,Object> arguments1 = new HashMap<>();
            Map<String,Object> arguments2 = new HashMap<>();
            /**
             * 通过x-dead-letter-exchange参数指定死信交换机(exchange)的名称
             * 通过x-dead-letter-routing-key参数指定死信转发的路由键(routingKey)
             */
            arguments1.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
            arguments1.put("x-dead-letter-routing-key", routingKey1);
            arguments2.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
            arguments2.put("x-dead-letter-routing-key", routingKey2);
            //创建工作信息队列,通过arguments参数配置死信交换机
            channel.queueDeclare(WORK_QUEUE1, false, false, false, arguments1);
            channel.queueDeclare(WORK_QUEUE2, false, false, false, arguments2);
            //工作队列绑定工作交换机
            channel.queueBind(WORK_QUEUE1, WORK_EXCHANGE, routingKey1);
            channel.queueBind(WORK_QUEUE2, WORK_EXCHANGE, routingKey2);

            System.out.println("请输入路由键:");
            while (input.hasNext()) {
                String routingKey = input.nextLine();
                System.out.println("请输入信息:");
                String message = input.nextLine();
                //发送信息
                channel.basicPublish(WORK_EXCHANGE,routingKey,null,message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
                System.out.println("请输入路由键:");
            }
        }
    }
}

  • 消费者代码
package space.anyi.rabbitMQ_learn.dead_letter;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * @ProjectName: RabbitMQ-learn
 * @FileName: Consumer
 * @Author: 杨逸
 * @Data:2025/10/9 18:59
 * @Description:
 * 消费者拒绝工作队列的信息模拟死信的产生
 * 模拟其他程序消费死信队列的信息
 */
public class Consumer {
    public static final String WORK_QUEUE1 = "work_queue1";
    public static final String WORK_QUEUE2 = "work_queue2";
    public static final String DLX_QUEUE_NAME1 = "dead_letter_queue1";
    public static final String DLX_QUEUE_NAME2 = "dead_letter_queue2";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        //工作信息的处理
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            String routingKey = delivery.getEnvelope().getRoutingKey();
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            //拒绝信息,模拟死信的产生
            channel.basicNack(deliveryTag,false,false);
            System.out.println(" [worker] Received '" + routingKey + "':'" + message + "已拒绝'");
        };
        //死信队列的处理
        DeliverCallback deliverCallback1 = (consumerTag,delivery)->{
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            String routingKey = delivery.getEnvelope().getRoutingKey();
            System.out.println(" [dlx1] Received '" + routingKey + "':'" + message + "'");
        };
        DeliverCallback deliverCallback2 = (consumerTag,delivery)->{
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            String routingKey = delivery.getEnvelope().getRoutingKey();
            System.out.println(" [dlx2] Received '" + routingKey + "':'" + message + "'");
        };
        //消费工作信息
        channel.basicConsume(WORK_QUEUE1,false,deliverCallback, consumerTag->{});
        channel.basicConsume(WORK_QUEUE2,false,deliverCallback, consumerTag->{});
        //消费死信队列的信息
        channel.basicConsume(DLX_QUEUE_NAME1,true,deliverCallback1,consumerTag->{});
        channel.basicConsume(DLX_QUEUE_NAME2,true,deliverCallback2,consumerTag->{});
    }
}

RabbitMQ常用API

  • 常规操作
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.配置
//设置RabbitMQ服务端的主机地址
factory.setHost("localhost");
//实际生产中要配置用户名和密码等等
factory.setUsername("");
factory.setPassword("");
//3.创建连接,并拿到channel
//我们通过channel来操作RabbitMQ	
Connection connection = factory.newConnection();
Channel channel = connection.createChannel())
  • 创建交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//方法原型
Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
  • 创建信息队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//方法原型
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
  • 信息队列绑定交换机
channel.queueBind(queue1, EXCHANGE_NAME, "");
//方法原型
Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
  • 发布信息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
//方法原型
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
  • 消费信息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
//方法原型
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
  • 给信息队列设置额外的参数
//信息队列的额外配置参数
Map<String, Object> arguments = new HashMap<String, Object>();
//设置队列中的消息的过期时间为五秒钟
arguments.put("x-message-ttl",5*1000);
//创建信息队列时,通过arguments设置队列的过期时间
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
  • 给信息设置额外的参数
//创建一个信息的额外配置,设置消息的过期时间为十秒钟
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(10*1000+"").build();
//在发布信息时给特定信息指定过期时间
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
  • 信息消费的处理回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };

RabbitMQ集成到项目中

通过springboot starter集成到项目中

导入依赖

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.7.16</version>
</dependency>

配置RabbitMQ

spring:
	rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest

自定义信息生产者

package space.anyi.BI.mq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * @ProjectName: serve
 * @FileName: MessageProducer
 * @Author: 杨逸
 * @Data:2025/10/9 20:17
 * @Description: 信息生产者
 */
@Component
public class MessageProducer {
    //通过RabbitTemplate操作RabbitMQ
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送信息
     * @param exchange 交换机
     * @param routingKey 路由键
     * @param message 信息
     * @description: 
     * @author: 杨逸
     * @data:2025/10/09 20:19:06
     * @since 1.0.0
     */
    public void sendMessage(String exchange,String routingKey,String message) {
        //发送信息
        rabbitTemplate.convertAndSend(exchange,routingKey,message);
    }
}

自定义信息消费者

package space.anyi.BI.mq;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * @ProjectName: serve
 * @FileName: MessageConsumer
 * @Author: 杨逸
 * @Data:2025/10/9 20:22
 * @Description: 信息消费者
 */
@Component
public class MessageConsumer {
    /**
     * @param message 信息
     * @param channel 通道
     * @param deliverTag 消息标签
     * @description: 
     * @author: 杨逸
     * @data:2025/10/09 20:29:40
     * @since 1.0.0
     */
    //接收信息
    @RabbitListener(queues = {"queue_name"},ackMode = "MANUAL")
    public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliverTag) {
        //消费信息的代码
    }
}

RabbitMQ的重要特性

面试考点

  1. 消息队列的概念,模型,应用场景
  2. 交换机的类别,路由的绑定关系
  3. 信息可靠性
    1. 信息确认机制(ack,nack,reject)
    2. 信息持久化(durable)
    3. 信息过期机制
    4. 死信队列
  4. 延迟队列(类似死信队列)
  5. 顺序消费,消费幂等性
  6. 可扩展性
    1. 集群
    2. 故障恢复机制
    3. 镜像
  7. 运维监控告警