消息队列

简介

RABBITMQ、KAFKA、ACTIVEMQ、ROCKETMQ、PULSAR。消息队列中间件有很多,在此学习RabbitMQ。

RabbitMQ

先学习RabbitMQ,比较经典。

安装RabbitMQ

直接上Docker好了

docker pull rabbitmq:latest
docker pull rabbitmq:4.0-management
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management

不出意外就会启动成功,前者rabbitmq是正宗rabbitmq服务端口后面的management的端口15672是管理端web网页。

带持久化存储的 设置账户密码的运行方式

docker run -d --name rabbitmq \
    -p 5672:5672 -p 15672:15672 \
    -v rabbitmq_data:/var/lib/rabbitmq \
    -v rabbitmq_log:/var/log/rabbitmq \
    -e RABBITMQ_DEFAULT_USER=admin \
    -e RABBITMQ_DEFAULT_PASS=password \
    rabbitmq:4.0-management

Docker命名卷 rabbitmq_log 与 rabbitmq_data

root@kTY-HK3-QL-86139:/home/root/note# ls /var/lib/docker/volumes/
backingFsBlockDev  metadata.db  rabbitmq_data  rabbitmq_log
root@kTY-HK3-QL-86139:/home/root/note# ls /var/lib/docker/volumes/
backingFsBlockDev  metadata.db  rabbitmq_data  rabbitmq_log
root@kTY-HK3-QL-86139:/home/root/note# docker volume ls
DRIVER    VOLUME NAME
local     rabbitmq_data
local     rabbitmq_log
root@kTY-HK3-QL-86139:/home/root/note# docker volume inspect   rabbitmq_data
[
    {
        "CreatedAt": "2025-03-20T06:44:37Z",
        "Driver": "local",
        "Labels": null,
        "Mountpoint": "/var/lib/docker/volumes/rabbitmq_data/_data",
        "Name": "rabbitmq_data",
        "Options": null,
        "Scope": "local"
    }
]
root@kTY-HK3-QL-86139:/home/root/note# docker volume inspect   rabbitmq_log
[
    {
        "CreatedAt": "2025-03-20T06:44:37Z",
        "Driver": "local",
        "Labels": null,
        "Mountpoint": "/var/lib/docker/volumes/rabbitmq_log/_data",
        "Name": "rabbitmq_log",
        "Options": null,
        "Scope": "local"
    }
]

容器内部管理命令

docker exec -it rabbitmq bash
# 进入容器后可以执行以下命令
rabbitmqctl list_queues
rabbitmqctl list_exchanges
rabbitmqctl list_users
rabbitmqctl add_user newuser newpassword
rabbitmqctl set_user_tags newuser administrator
rabbitmqctl set_permissions -p / newuser ".*" ".*" ".*"

常用Docker命令

# 停止RabbitMQ容器
docker stop rabbitmq
# 启动已有的RabbitMQ容器
docker start rabbitmq
# 查看容器日志
docker logs rabbitmq

# 进入容器内部
docker exec -it rabbitmq bash

# 查看本地镜像
docker images
# 查看本地镜像
docker image ls
# 只查看特定仓库的镜像
docker images rabbitmq
# 查看特定标签的镜像
docker images rabbitmq:3-management
# 使用过滤器查询
docker images --filter "reference=rabbitmq*"
# 查看镜像详情
docker image inspect rabbitmq:3-management
# 获取镜像ID
docker images -q rabbitmq:3-management
# 查看镜像构建历史
docker history rabbitmq:3-management
# 删除指定镜像
docker rmi rabbitmq:3-management
# 删除所有未使用的镜像
docker image prune
# 删除所有未使用的镜像(不询问)
docker image prune -a -f

# 查看所有正在运行的容器
docker ps
# 查看所有容器(包括已停止的)
docker ps -a
# 查看最近创建的N个容器
docker ps -n 5
# 只显示容器ID(适用于脚本中)
docker ps -q
# 所有容器的ID(包括停止的)
docker ps -aq
# 自定义输出格式
docker ps --format "{{.Names}}: {{.Status}}"
# 表格格式输出
docker ps --format "table {{.ID}}\t{{.Names}}\t{{.Status}}"
# 按名称过滤
docker ps -a --filter "name=rabbitmq"
# 按状态过滤(running, exited, restarting等)
docker ps --filter "status=running"
# 按镜像过滤
docker ps --filter "ancestor=rabbitmq:3-management"
# 多条件过滤
docker ps --filter "status=exited" --filter "exited=1"
# 查看容器详细配置
docker inspect rabbitmq
# 查询特定信息
docker inspect --format='{{.NetworkSettings.IPAddress}}' rabbitmq
# 查看容器资源使用统计
docker stats
# 查看特定容器资源使用
docker stats rabbitmq
# 查看容器日志
docker logs rabbitmq
# 查看最近的日志
docker logs --tail 100 rabbitmq
# 实时查看日志
docker logs -f rabbitmq
# 查看容器内正在运行的进程
docker top rabbitmq
# 删除容器
docker rm rabbitmq

端口说明

访问管理界面

浏览器打开 htto://localhost:15672

RabbitMQ文档

看官方文档最靠谱

Tutorials:

https://www.rabbitmq.com/tutorials

Docs:

https://www.rabbitmq.com/docs/

开始入门

主要有两部分

使用Node.js

npm install amqplib

Queue tutorials

“hello world!”

The simplest thing that does something

.

在本教程的这一部分中,我们将使用Javascript编写两个小型程序;一个 发送单个消息的生产者,以及接收的消费者 消息并将其打印出来。

// send.js
var amqp = require('amqplib/callback_api');
// 连接到服务器,添加用户名和密码
amqp.connect('amqp://user:password@IP', function (error0, connection) {
    if (error0) {
        throw error0;
    }
    // 创建一个通道
    connection.createChannel(function (error1, channel) {
        if (error1) {
            throw error1;
        }
        // 定义一个队列
        var queue = 'hello';
        var msg = 'Hello World!';
        // 发送消息到队列
        channel.assertQueue(queue, {
            durable: false
        });
        // for (let i = 0; i < 100; i++) {
        channel.sendToQueue(queue, Buffer.from(msg));
        console.log(" [x] Sent %s", msg);
        // }
    });

    setTimeout(function () {
        connection.close();
        process.exit(0)
    }, 500);
});
// receive.js
var amqp = require('amqplib/callback_api');

amqp.connect('amqp://user:password@IP', function (error0, connection) {
    if (error0) {
        throw error0;
    }
    connection.createChannel(function (error1, channel) {
        if (error1) {
            throw error1;
        }
        var queue = 'hello';

        channel.assertQueue(queue, {
            durable: false
        });

        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
        channel.consume(queue, function (msg) {
            console.log(" [x] Received %s", msg.content.toString());
        }, {
            noAck: true
        });
    });
});

Connection、Channel、Queue

  1. Connection 客户端与 RabbitMQ 服务器的物理连接。一个连接可以包含多个通道(channel),以便复用连接资源。
  2. Channel channel 是基于 connection 的虚拟通信通道。通常建议为每个线程或任务创建一个独立的通道,而不是为每个任务创建一个新的连接。
  3. Queue 队列可以配置为持久化(durable)或非持久化(non-durable)。队列的名称是唯一标识,消费者和生产者通过队列名称进行交互。

channel.asserQueue 是RabbitMQ中用于声明或检查队列的一个方法,它的作用是确保队列存在,如果队列不存在,则创建一个新队列。

channel.assertQueue(queue, [options], [callback]);

queue:队列的名称(字符串)。如果传入空字符串,RabbitMQ 会创建一个随机名称的队列。

options:可选参数,用于配置队列属性。

durable:是否持久化队列(默认值为 false)。持久化队列在 RabbitMQ 重启后仍然存在。

exclusive:是否为独占队列(默认值为 false)。独占队列只能由声明它的连接使用,并在连接关闭时自动删除。

autoDelete:是否自动删除队列(默认值为 false)。当队列不再被使用时自动删除。

callback:可选回调函数,返回队列的状态信息。

Work Queues

Distributing tasks among workers (the competing consumers pattern)

.
// send.js
var amqp = require('amqplib/callback_api');
// 连接到服务器,添加用户名和密码
amqp.connect('amqp://user:password@IP', function (error0, connection) {
    if (error0) {
        throw error0;
    }
    // 创建一个通道
    connection.createChannel(function (error1, channel) {
        if (error1) {
            throw error1;
        }
        // 定义一个队列
        var queue = 'hello';
        var msg = 'Hello World!';
        // 发送消息到队列
        channel.assertQueue(queue, {
            durable: false
        });
        channel.sendToQueue(queue, Buffer.from(msg),
            {
                persistent: true//持久化消息 前提是队列也是持久化的
            });
        console.log(" [x] Sent %s", msg);
    });

    setTimeout(function () {
        connection.close();
        process.exit(0)
    }, 500);
});
// receive.js
var amqp = require('amqplib/callback_api');

amqp.connect('amqp://user:password@IP', function (error0, connection) {
    if (error0) {
        throw error0;
    }
    connection.createChannel(function (error1, channel) {
        if (error1) {
            throw error1;
        }
        var queue = 'hello';

        channel.assertQueue(queue, {
            durable: false
        });

        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
        channel.consume(queue, function (msg) {
            console.log(" [x] Received %s", msg.content.toString());
            // 模拟耗时任务
            setTimeout(function () {
                console.log(" [x] Done");
            }, 1000);
        }, {
            noAck: true // 手动消费者确认已关闭
        });
    });
});
  1. 循环调度
node receive.js // 开启两个消费者
node receive.js // 开启两个消费者

默认情况下,RabbitMQ会将每条消息发送给下一个消费者, 依次。平均而言,每个消费者将获得相同数量的 讯息。这种分发消息的方式称为循环。尝试 与三个或更多工人一起出去。

执行任务可能需要几秒钟,您可能想知道如果 消费者开始一项艰巨的任务,并在完成之前终止。 使用我们当前的代码,一旦RabbitMQ向消费者传递消息, 立即将其标记为删除。在这种情况下,如果您终止工人, 您丢失了刚刚处理的消息。已发送的消息 对于这个特定的工人,但尚未处理,也会丢失。直接本来就是TCP通信的。

ack 由 消费者告诉RabbitMQ已收到特定消息, 已处理,并且RabbitMQ可以自由删除它。消费者交付确认时将执行超时(默认情况下为30分钟)。

// 手动确认ack
var amqp = require('amqplib/callback_api');

amqp.connect('amqp://user:password@IP', function (error0, connection) {
    if (error0) {
        throw error0;
    }
    connection.createChannel(function (error1, channel) {
        if (error1) {
            throw error1;
        }
        var queue = 'hello';

        channel.assertQueue(queue, {
            durable: false
        });

        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
        channel.consume(queue, function (msg) {
            console.log(" [x] Received %s", msg.content.toString());
            // 模拟耗时任务
            setTimeout(function () {
                console.log(" [x] Done");
                channel.ack(msg);
            }, 5000);
        }, {
            noAck: false // 启动手动ack
        });
    });
});

比如将消息发给了worker1,worker1在收到msg后,处理5s期间,connection与服务器断开了,也就是不可能ack了,服务器会立即把msg再发给worker2处理,如果没有worker可用则等到有worker可用时msg再发向消费者。可能造成内存积压,忘记手动ack是很严重的事。

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
命令作用
* rabbitmqctl list_queues:列出 RabbitMQ 中所有队列的信息。
* name:显示队列的名称。
* messages_ready:显示队列中准备好被消费者消费的消息数量。
* messages_unacknowledged:显示队列中已被消费者接收但尚未确认的消息数量。
  1. 持久化
// 确保队列在RabbitMQ节点重新启动后仍然有效
channel.assertQueue('hello', {durable: true});
// RabbitMQ不允许您重新定义现有队列,因为前面已经创建了 durable为false的队列, 具有不同的参数 如需要则再创建一个新的队列
channel.assertQueue('task_queue', {durable: true});
// 消息持久化
channel.sendToQueue(queue, Buffer.from(msg), {persistent: true});

将消息标记为持久性并不能完全保证该消息 不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘, RabbitMQ接受消息后,还有一个短时间窗口, 尚未保存。而且,RabbitMQ不会 fsync() 每个消息-它可能只是保存到缓存中,而没有真正写入磁盘。持久性保证并不强,但绰绰有余 为我们的简单任务队列。

  1. prefetch 可以告诉RabbitMQ不要付出超过一次给Worker的消息
.
var amqp = require('amqplib/callback_api');

amqp.connect('amqp://user:password@IP', function (error0, connection) {
    if (error0) {
        throw error0;
    }
    connection.createChannel(function (error1, channel) {
        if (error1) {
            throw error1;
        }
        var queue = 'hello';

        channel.assertQueue(queue, {
            durable: false
        });

        // 告诉服务器消息一个一个来  没确认就不会发下一个
        channel.prefetch(1);

        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
        channel.consume(queue, function (msg) {
            console.log(" [x] Received %s", msg.content.toString());
            // 模拟耗时任务
            setTimeout(function () {
                console.log(" [x] Done");
                channel.ack(msg);
            }, 5000);
        }, {
            noAck: false // 启动手动ack
        });
    });
});

Publish/Subscribe

Sending messages to many consumers at once

.

生产者发送消息的过程

  1. 连接到 RabbitMQ 服务器:
  2. 声明交换机(Exchange):
  3. 发送消息到交换机:

消费者接受消息的过程

  1. 连接到 RabbitMQ 服务器:
  2. 声明交换机(Exchange):
  3. 声明队列(Queue):
  4. 绑定队列到交换机:
  5. 消费消息:

如果交换机没有绑定任何队列,生产者向交换机发送的消息会被丢弃。这是 RabbitMQ 的默认行为。

channel.publish(exchange, routingKey, content, [options]);
exchange(字符串):
    指定消息要发送到的交换机名称。
    如果使用默认交换机,可以传空字符串('')。
routingKey(字符串):
    指定路由键,用于决定消息的路由方式。
    在 fanout 类型的交换机中,路由键会被忽略。
content(Buffer):
    消息的内容,必须是一个 Buffer 对象。
    例如:Buffer.from('Hello World!')。
options(可选对象):
    用于设置消息的属性,例如持久化、优先级等。
    常用选项:
    persistent(布尔值):是否将消息标记为持久化(默认 false)。
    mandatory(布尔值):如果消息无法路由到队列,是否返回给生产者(默认 false)。
    expiration(字符串):消息的过期时间(以毫秒为单位)。
    headers(对象):自定义消息头。

生产者

// send.js
var amqp = require('amqplib/callback_api');
// 连接到服务器
amqp.connect('amqp://username:password@IP', function (error0, connection) {
    if (error0) {
        throw error0;
    }
    // 创建一个通道
    connection.createChannel(function (error1, channel) {
        if (error1) {
            throw error1;
        }
        let exchange = 'logs';
        // 声明交换机
        channel.assertExchange(exchange, 'fanout', {
            durable: false // 非持久化的 重启后交换机将消失
        });

        let msg = 'Hello World!';
        channel.publish(exchange, '', Buffer.from(msg));
        console.log(" [x] Sent %s", msg);
    });

    setTimeout(function () {
        connection.close();
        process.exit(0)
    }, 500);
});

消费者

// receive.js
var amqp = require('amqplib/callback_api');

amqp.connect('amqp://username:password@IP', function (error0, connection) {
    if (error0) {
        throw error0;
    }
    connection.createChannel(function (error1, channel) {
        if (error1) {
            throw error1;
        }

        // 声明交换机
        let exchange = 'logs';
        channel.assertExchange(exchange, 'fanout', {
            durable: false
        });

        // 声明队列 起一个参数为队列名 因为使用临时队列则不用指定
        channel.assertQueue('', {
            exclusive: true // 临时队列,连接断开队列会自动删除
        }, function (error2, q) {
            if (error2) {
                throw error2;
            }
            console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);
            // 绑定交换机和队列
            channel.bindQueue(q.queue, exchange, '');
            channel.consume(q.queue, function (msg) {
                if (msg.content) {
                    console.log(" [x] %s", msg.content.toString());
                }
            }, { noAck: true });
        });
    });
});

启动多个receive,用send发消息,会发现每个receive都会收到相同的消息。

Routing

Receiving messages selectively

.

在上面的Publish/Subscribe模式中,交换机使用了fanout方式,下面将会使用direct交换机方式,当消息的路由key与队列绑定的路由key完全匹配时才发向队列。

// send.js
var amqp = require('amqplib/callback_api');
// 连接到服务器
amqp.connect('amqp://username:password@IP', function (error0, connection) {
    if (error0) {
        throw error0;
    }
    // 创建一个通道
    connection.createChannel(function (error1, channel) {
        if (error1) {
            throw error1;
        }
        let exchange = "direct_logs";
        let args = process.argv.slice(2);
        let msg = args.slice(1).join(' ') || 'Hello World!';
        let severity = (args.length > 0) ? args[0] : 'info'; // 路由键

        // 声明direct交换机
        channel.assertExchange(exchange, 'direct', {
            durable: false // 非持久化的 重启后交换机将消失
        });

        channel.publish(exchange, severity, Buffer.from(msg));

        console.log(" [x] Sent %s: %s", severity, msg);
    });

    setTimeout(function () {
        connection.close();
        process.exit(0)
    }, 500);
});
// receive.js
var amqp = require('amqplib/callback_api');

let args = process.argv.slice(2);
if (args.length == 0) {
    console.log("Usage: receive.js <exchange>");
    process.exit(1);
}

amqp.connect('amqp://username:password@IP', function (error0, connection) {
    if (error0) {
        throw error0;
    }

    connection.createChannel(function (error1, channel) {
        if (error1) {
            throw error1;
        }

        let exchange = 'direct_logs';

        // 声明交换机
        channel.assertExchange(exchange, 'direct', {
            durable: false
        });

        // 声明队列
        channel.assertQueue('', {
            exclusive: true//临时队列,连接断开队列会自动删除
        }, function (error2, q) {
            if (error2) {
                throw error2;
            }
            console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);

            args.forEach(function (severity) {
                // 绑定交换机和队列
                channel.bindQueue(q.queue, exchange, severity);
            });

            channel.consume(q.queue, function (msg) {
                if (msg.content) {
                    console.log(" [x] %s: '%s'", msg.fields.routingKey, msg.content.toString());
                }
            }, { noAck: true });
        });
    });
});

测试方法

# 启动两个 receive
# receive1
node receive.js warning error
# receive2
node receive.js info warning error
# receive1 receive2 都可以都到
node send.js error "scdscsdcs"
# receive1 receive2 都可以都到
node send.js warning "scdscsdcs"
# receive2 都可以都到
node send.js info "scdscsdcs"

Topics

Receiving messages based on a pattern (topics)

.

和Routing类似,只不过可以进行模式匹配

.
  1. quick.orange.rabbit 将会发向Q1 Q2
  2. lazy.orange.elephant 将会发向Q1 Q2
  3. quick.orange.fox 仅发向Q1
  4. lazy.brown.fox 仅发向Q2
  5. lazy.pink.rabbit 虽然满足Q2两次但仍只会发向Q2一次
  6. quick.brown.fox 将被抛弃

消息不匹配任何绑定,将会丢失。

主题交换功能强大,可以像其他交换一样运作。当队列绑定到 #(哈希)绑定键-它将接收 所有消息,无论路由密钥如何-例如 fanout 交换。当特殊字符 *(星)和 #(哈希)不用于绑定, 主题交换的行为就像 direct 一。

// send.js
var amqp = require('amqplib/callback_api');
// 连接到服务器
amqp.connect('amqp://username:password@IP', function (error0, connection) {
    if (error0) {
        throw error0;
    }
    // 创建一个通道
    connection.createChannel(function (error1, channel) {
        if (error1) {
            throw error1;
        }
        let exchange = "topic_logs";
        let args = process.argv.slice(2);
        let key = (args.length > 0) ? args[0] : 'anonymous.info';
        let msg = args.slice(1).join(' ') || 'Hello World!';

        // 声明topic交换机
        channel.assertExchange(exchange, 'topic', {
            durable: false // 非持久化的 重启后交换机将消失
        });

        channel.publish(exchange, key, Buffer.from(msg));

        console.log(" [x] Sent %s: %s", key, msg);
    });

    setTimeout(function () {
        connection.close();
        process.exit(0)
    }, 500);
});
// receive.js
var amqp = require('amqplib/callback_api');

let args = process.argv.slice(2);
if (args.length == 0) {
    console.log("Usage: receive.js <facility>.<severity>");
    process.exit(1);
}

amqp.connect('amqp://username:password@IP', function (error0, connection) {
    if (error0) {
        throw error0;
    }

    connection.createChannel(function (error1, channel) {
        if (error1) {
            throw error1;
        }

        let exchange = 'topic_logs';

        // 声明topic交换机
        channel.assertExchange(exchange, 'topic', {
            durable: false
        });

        // 声明队列
        channel.assertQueue('', {
            exclusive: true//临时队列,连接断开队列会自动删除
        }, function (error2, q) {
            if (error2) {
                throw error2;
            }
            console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);

            args.forEach(function (key) {
                // 绑定交换机和队列
                channel.bindQueue(q.queue, exchange, key);
            });

            channel.consume(q.queue, function (msg) {
                if (msg.content) {
                    console.log(" [x] %s: '%s'", msg.fields.routingKey, msg.content.toString());
                }
            }, { noAck: true });
        });
    });
});

使用样例

node receive.js "#"
node receive.js "kern.*"
node send.js "kern.cds" "cdscs"
node send.js "kern." "cdscs"

RPC

Request/reply pattern example

.

其实背后操作方式就是搞两个队列,一个用于A给B发消息,一个用于B给A发消息。

.
channel.assertQueue('', {
    exclusive: true
});
channel.sendToQueue('rpc_queue', Buffer.from('10'), {
    replyTo: queue_name
});
// rpcserver.js
#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://username:password@IP', function (error0, connection) {
    if (error0) {
        throw error0;
    }
    connection.createChannel(function (error1, channel) {
        if (error1) {
            throw error1;
        }
        var queue = 'rpc_queue';

        channel.assertQueue(queue, {
            durable: false
        });

        channel.prefetch(1); // 消息一个一个接受处理
        console.log(' [x] Awaiting RPC requests');
        channel.consume(queue, function reply(msg) {
            var n = parseInt(msg.content.toString());

            console.log(" [.] fib(%d)", n);

            var r = fibonacci(n);

            // 使用客户端提供的队列名,将结果回给客户端
            channel.sendToQueue(msg.properties.replyTo,
                Buffer.from(r.toString()), {
                correlationId: msg.properties.correlationId // correlationId用客户端提供的
            });

            channel.ack(msg);
        }, {
            noAck: false
        });
    });
});

function fibonacci(n) {
    if (n == 0 || n == 1)
        return n;
    else
        return fibonacci(n - 1) + fibonacci(n - 2);
}
// rpcclient.js
let amqp = require('amqplib/callback_api');

let args = process.argv.slice(2);

if (args.length == 0) {
    console.log("Usage: rpc_client.js num")
    process.exit(1);
}

amqp.connect('amqp://username:password@IP', function (error0, connection) {
    if (error0) {
        throw error0;
    }
    connection.createChannel(function (error1, channel) {
        if (error1) {
            throw error1;
        }
        channel.assertQueue('', {
            exclusive: true
        }, function (error2, q) {
            if (error2) {
                throw error2;
            }
            let correlationId = generateUuid();
            let num = parseInt(args[0]);
            console.log(' [x] Requesting fib(%d)', num);

            // 接收回包
            channel.consume(q.queue, function (msg) {
                if (msg.properties.correlationId == correlationId) {
                    console.log(' [.] Got %s', msg.content.toString());
                    setTimeout(function () {
                        connection.close();
                        process.exit(0)
                    }, 500);
                }
            }, {
                noAck: true
            });

            channel.sendToQueue('rpc_queue',
                Buffer.from(num.toString()), {
                correlationId: correlationId,
                replyTo: q.queue
            });
        });
    });
});

function generateUuid() {
    return Math.random().toString() +
        Math.random().toString() +
        Math.random().toString();
}

Publisher Confirms

Reliable publishing with publisher confirms

RabbitMQ 官方文档中的 Publisher Confirms 样例主要讲解了如何实现可靠的消息发布机制。Publisher Confirms 是一种机制,用于确保消息从生产者成功发布到 RabbitMQ 服务器的队列中。通过这种机制,生产者可以确认消息是否被 RabbitMQ 正确接收并持久化。因为TCP我们不知道发送的内容是否已经到达目的地。

样例内容

  1. Publisher Confirms的作用
  2. 实现方式:
const amqp = require('amqplib');

async function publishMessages() {
    try {
        const connection = await amqp.connect("amqp://username:password@IP");
        const channel = await connection.createConfirmChannel();

        const queue = 'confirm_queue';
        const message = 'Hello, Publisher Confirms!';

        // 声明队列
        await channel.assertQueue(queue, { durable: true });

        // 发送消息并等待确认
        channel.sendToQueue(queue, Buffer.from(message), { persistent: true }, (err, ok) => {
            if (err) {
                console.error('Message nacked!');
            } else {
                console.log('Message acked');
            }
        });

        // 连接关闭
        setTimeout(() => {
            connection.close();
        }, 500);
    } catch (error) {
        console.error('Error: ', error);
    }
}

publishMessages();

Stream tutorials

使用RabbitMQ的Stream服务

docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672 \
-v rabbitmq_data:/var/lib/rabbitmq \
-v rabbitmq_log:/var/log/rabbitmq \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=password \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:4-management

通过设置 advertised_host 为 localhost,客户端可以通过 localhost:5552 连接到 Stream 服务,而不会因为主机名解析问题导致连接失败。

待服务器启动,然后启用流和流管理插件

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management 

RabbitMQ 的 Stream 概念

RabbitMQ 的 Stream 是一种高吞吐量、低延迟的消息流处理机制,专为处理大规模数据流的场景设计。它与传统的 RabbitMQ 队列不同,Stream 提供了类似于 Kafka 的功能,支持消息的顺序消费、分区、持久化以及高效的消息存储和检索。

Stream 的核心特点

  1. 高吞吐量:

  2. 顺序消费:

  3. 分区(Partitioning):

  4. 持久化:

  5. 灵活的消费模式:

  6. 高效存储:

  7. 与 RabbitMQ 的无缝集成:

为什么设计 Stream 的概念?

RabbitMQ 设计 Stream 的概念,主要是为了满足现代分布式系统中对高吞吐量、低延迟消息流处理的需求,同时弥补传统队列模型的不足。

  1. 传统队列的局限性
  1. 满足现代数据流处理需求
  1. 与 Kafka 等流处理系统竞争

Stream 的典型应用场景

  1. 日志处理:
  2. 事件流处理:
  3. 物联网(IoT):
  4. 数据分析:
  5. 消息重放:

Stream “Hello World!”

The simplest thing that does something

.
// send.js
const rabbit = require("rabbitmq-stream-js-client")
const streamName = "hello-nodejs-stream";

async function send() {
    console.log("Connecting...");
    const client = await rabbit.connect({
        vhost: "/",
        port: 5552,
        hostname: "IP",
        username: "username",
        password: "password",
    });

    console.log("Making sure the stream exists...");
    // max-length-bytes: 设置 Stream 的最大存储大小为 5GB。
    const streamSizeRetention = 5 * 1e9
    await client.createStream({ stream: streamName, arguments: { "max-length-bytes": streamSizeRetention } });

    const publisher = await client.declarePublisher({ stream: streamName });

    console.log("Sending a message...");

    for (let i = 0; i < 10; i++) {
        await publisher.send(Buffer.from("Test message1"));
    }
}

send();
// receive.js
const rabbit = require("rabbitmq-stream-js-client")
const streamName = "hello-nodejs-stream";

async function receive() {
    console.log("Connecting...");
    const client = await rabbit.connect({
        vhost: "/",
        port: 5552,
        hostname: "IP",
        username: "username",
        password: "password",
    });

    console.log("Making sure the stream exists...");
    // max-length-bytes: 设置 Stream 的最大存储大小为 5GB。
    const streamSizeRetention = 5 * 1e9
    await client.createStream({ stream: streamName, arguments: { "max-length-bytes": streamSizeRetention } });

    await client.declareConsumer({ stream: streamName, offset: rabbit.Offset.first() }, (message) => {
        console.log(`Received message ${message.content.toString()}`)
    });
}

receive();

关于 offset: rabbit.Offset.first()

Stream消息的保留机制

RabbitMQ Stream 提供了灵活的保留策略,允许用户根据需求配置消息的存储和删除规则。以下是常见的保留策略:

  1. 基于时间的保留(Time-based Retention)

await client.createStream({
    stream: streamName,
    arguments: { "x-max-age": 3600000 } // 消息保留 1 小时
});
  1. 基于存储大小的保留(Size-based Retention)

await client.createStream({
    stream: streamName,
    arguments: { "max-length-bytes": 5 * 1e9 } // 最大存储 5GB
});
  1. 基于分区的保留

Offset Tracking

Offset Tracking即偏移跟踪

Keep track of message processing

.

什么是偏移量?流可以看作是元素是消息的数组。 偏移量是数组中给定消息的索引。

// send.js
const rabbit = require("rabbitmq-stream-js-client")
const streamName = "hello-nodejs-stream";

async function send() {
    console.log("Connecting...");
    const client = await rabbit.connect({
        vhost: "/",
        port: 5552,
        hostname: "IP",
        username: "username",
        password: "password",
    });

    console.log("Making sure the stream exists...");
    const streamSizeRetention = 5 * 1e9
    await client.createStream({ stream: streamName, arguments: { "max-length-bytes": streamSizeRetention } });

    const publisher = await client.declarePublisher({ stream: streamName });

    const messageCount = 100;
    console.log(`Publishing ${messageCount} messages...`);

    for (let i = 0; i < messageCount; i++) {
        const body = i === messageCount - 1 ? "marker" : `hello ${i}`;
        await publisher.send(Buffer.from(body));
    }
}

send();
// receive.js
const rabbit = require("rabbitmq-stream-js-client")
const streamName = "hello-nodejs-stream";

async function receive() {
    console.log("Connecting...");
    const client = await rabbit.connect({
        vhost: "/",
        port: 5552,
        hostname: "IP",
        username: "admin",
        password: "password",
    });

    console.log("Making sure the stream exists...");
    const streamSizeRetention = 5 * 1e9

    await client.createStream({ stream: streamName, arguments: { "max-length-bytes": streamSizeRetention } });

    const startFrom = rabbit.Offset.first();
    let firstOffset = startFrom.value;
    let lastOffset = startFrom.value;
    let messageCount = 0;

    await client.declareConsumer({ stream: streamName, offset: startFrom }, (message) => {
        messageCount++;
        if (messageCount === 1) {
            console.log("First message received");
            firstOffset = message.offset;
        }
        if (message.content.toString() === "marker") {
            console.log("Marker found");
            lastOffset = message.offset;
            console.log(`Done consuming, first offerset was ${firstOffset}, last offset was ${lastOffset}`);
        }
    });
}

receive();

指定偏移量

const startFrom = rabbit.Offset.offset(42n);

服务器端偏移跟踪,consumer可以存储offset,以便下次重新连接后可以query出来。

const rabbit = require("rabbitmq-stream-js-client")
const streamName = "hello-nodejs-stream";

async function receive() {
    console.log("Connecting...");
    const client = await rabbit.connect({
        vhost: "/",
        port: 5552,
        hostname: "IP",
        username: "username",
        password: "password",
    });

    console.log("Making sure the stream exists...");
    const streamSizeRetention = 5 * 1e9

    await client.createStream({ stream: streamName, arguments: { "max-length-bytes": streamSizeRetention } });

    // start consuming at the beginning of the stream
    const consumerRef = "offset-tracking-tutorial"; // the consumer must a have name
    let firstOffset = undefined;
    let offsetSpecification = rabbit.Offset.first();

    // 先从服务器获取这个consumer的offset
    try {
        // take the offset stored on the server if it exists
        const offset = await client.queryOffset({ reference: consumerRef, stream: streamName });
        offsetSpecification = rabbit.Offset.offset(offset + 1n); // start from the message after 'marker'
    } catch (e) {
        if (e) {
            throw e;
        }
    }

    let lastOffset = offsetSpecification.value;
    let messageCount = 0;
    const consumer = await client.declareConsumer(
        { stream: streamName, offset: offsetSpecification, consumerRef },
        async (message) => {
            messageCount++;
            if (!firstOffset && messageCount === 1) {
                firstOffset = message.offset;
                console.log("First message received");
            }
            if (messageCount % 10 === 0) { // 每10个消息存一下offset
                await consumer.storeOffset(message.offset); // store offset every 10 messages
            }

            console.log(`${message.content.toString()}`);
            if (message.content.toString() === "marker") {
                console.log("Marker found");
                lastOffset = message.offset;
                await consumer.storeOffset(message.offset); // store the offset on consumer closing
                await consumer.close(true);
                process.exit(0);
            }
        }
    );

    console.log(`Start consuming...`);
}

receive();