RABBITMQ、KAFKA、ACTIVEMQ、ROCKETMQ、PULSAR。消息队列中间件有很多,在此学习RabbitMQ。
先学习RabbitMQ,比较经典。
直接上Docker好了
docker pull rabbitmq:latest
docker pull rabbitmq:4.0-management
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management
不出意外就会启动成功,前者rabbitmq是正宗rabbitmq服务端口后面的management的端口15672是管理端web网页。
带持久化存储的 设置账户密码的运行方式
docker run -d --name rabbitmq \
-p 5672:5672 -p 15672:15672 \
-v rabbitmq_data:/var/lib/rabbitmq \
-v rabbitmq_log:/var/log/rabbitmq \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=password \
rabbitmq:4.0-management
Docker命名卷 rabbitmq_log 与 rabbitmq_data
root@kTY-HK3-QL-86139:/home/root/note# ls /var/lib/docker/volumes/
backingFsBlockDev metadata.db rabbitmq_data rabbitmq_log
root@kTY-HK3-QL-86139:/home/root/note# ls /var/lib/docker/volumes/
backingFsBlockDev metadata.db rabbitmq_data rabbitmq_log
root@kTY-HK3-QL-86139:/home/root/note# docker volume ls
DRIVER VOLUME NAME
local rabbitmq_data
local rabbitmq_log
root@kTY-HK3-QL-86139:/home/root/note# docker volume inspect rabbitmq_data
[
{"CreatedAt": "2025-03-20T06:44:37Z",
"Driver": "local",
"Labels": null,
"Mountpoint": "/var/lib/docker/volumes/rabbitmq_data/_data",
"Name": "rabbitmq_data",
"Options": null,
"Scope": "local"
}
]
root@kTY-HK3-QL-86139:/home/root/note# docker volume inspect rabbitmq_log
[
{"CreatedAt": "2025-03-20T06:44:37Z",
"Driver": "local",
"Labels": null,
"Mountpoint": "/var/lib/docker/volumes/rabbitmq_log/_data",
"Name": "rabbitmq_log",
"Options": null,
"Scope": "local"
}
]
容器内部管理命令
docker exec -it rabbitmq bash
# 进入容器后可以执行以下命令
rabbitmqctl list_queues
rabbitmqctl list_exchanges
rabbitmqctl list_users
rabbitmqctl add_user newuser newpassword
rabbitmqctl set_user_tags newuser administrator
rabbitmqctl set_permissions -p / newuser ".*" ".*" ".*"
常用Docker命令
# 停止RabbitMQ容器
docker stop rabbitmq
# 启动已有的RabbitMQ容器
docker start rabbitmq
# 查看容器日志
docker logs rabbitmq
# 进入容器内部
docker exec -it rabbitmq bash
# 查看本地镜像
docker images
# 查看本地镜像
docker image ls
# 只查看特定仓库的镜像
docker images rabbitmq
# 查看特定标签的镜像
docker images rabbitmq:3-management
# 使用过滤器查询
docker images --filter "reference=rabbitmq*"
# 查看镜像详情
docker image inspect rabbitmq:3-management
# 获取镜像ID
docker images -q rabbitmq:3-management
# 查看镜像构建历史
docker history rabbitmq:3-management
# 删除指定镜像
docker rmi rabbitmq:3-management
# 删除所有未使用的镜像
docker image prune
# 删除所有未使用的镜像(不询问)
docker image prune -a -f
# 查看所有正在运行的容器
docker ps
# 查看所有容器(包括已停止的)
docker ps -a
# 查看最近创建的N个容器
docker ps -n 5
# 只显示容器ID(适用于脚本中)
docker ps -q
# 所有容器的ID(包括停止的)
docker ps -aq
# 自定义输出格式
docker ps --format "{{.Names}}: {{.Status}}"
# 表格格式输出
docker ps --format "table {{.ID}}\t{{.Names}}\t{{.Status}}"
# 按名称过滤
docker ps -a --filter "name=rabbitmq"
# 按状态过滤(running, exited, restarting等)
docker ps --filter "status=running"
# 按镜像过滤
docker ps --filter "ancestor=rabbitmq:3-management"
# 多条件过滤
docker ps --filter "status=exited" --filter "exited=1"
# 查看容器详细配置
docker inspect rabbitmq
# 查询特定信息
docker inspect --format='{{.NetworkSettings.IPAddress}}' rabbitmq
# 查看容器资源使用统计
docker stats
# 查看特定容器资源使用
docker stats rabbitmq
# 查看容器日志
docker logs rabbitmq
# 查看最近的日志
docker logs --tail 100 rabbitmq
# 实时查看日志
docker logs -f rabbitmq
# 查看容器内正在运行的进程
docker top rabbitmq
# 删除容器
docker rm rabbitmq
浏览器打开 htto://localhost:15672
看官方文档最靠谱
Tutorials:
https://www.rabbitmq.com/tutorials
Docs:
https://www.rabbitmq.com/docs/
主要有两部分
npm install amqplib
The simplest thing that does something
在本教程的这一部分中,我们将使用Javascript编写两个小型程序;一个 发送单个消息的生产者,以及接收的消费者 消息并将其打印出来。
// send.js
var amqp = require('amqplib/callback_api');
// 连接到服务器,添加用户名和密码
.connect('amqp://user:password@IP', function (error0, connection) {
amqpif (error0) {
throw error0;
}// 创建一个通道
.createChannel(function (error1, channel) {
connectionif (error1) {
throw error1;
}// 定义一个队列
var queue = 'hello';
var msg = 'Hello World!';
// 发送消息到队列
.assertQueue(queue, {
channeldurable: false
;
})// for (let i = 0; i < 100; i++) {
.sendToQueue(queue, Buffer.from(msg));
channelconsole.log(" [x] Sent %s", msg);
// }
;
})
setTimeout(function () {
.close();
connectionprocess.exit(0)
, 500);
}; })
// receive.js
var amqp = require('amqplib/callback_api');
.connect('amqp://user:password@IP', function (error0, connection) {
amqpif (error0) {
throw error0;
}.createChannel(function (error1, channel) {
connectionif (error1) {
throw error1;
}var queue = 'hello';
.assertQueue(queue, {
channeldurable: false
;
})
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
.consume(queue, function (msg) {
channelconsole.log(" [x] Received %s", msg.content.toString());
, {
}noAck: true
;
});
}); })
Connection、Channel、Queue
Connection
客户端与 RabbitMQ
服务器的物理连接。一个连接可以包含多个通道(channel),以便复用连接资源。Channel
channel 是基于 connection
的虚拟通信通道。通常建议为每个线程或任务创建一个独立的通道,而不是为每个任务创建一个新的连接。Queue
队列可以配置为持久化(durable)或非持久化(non-durable)。队列的名称是唯一标识,消费者和生产者通过队列名称进行交互。channel.asserQueue
是RabbitMQ中用于声明或检查队列的一个方法,它的作用是确保队列存在,如果队列不存在,则创建一个新队列。
.assertQueue(queue, [options], [callback]);
channel
queue:队列的名称(字符串)。如果传入空字符串,RabbitMQ 会创建一个随机名称的队列。
options:可选参数,用于配置队列属性。
durable:是否持久化队列(默认值为 false)。持久化队列在 RabbitMQ 重启后仍然存在。
exclusive:是否为独占队列(默认值为 false)。独占队列只能由声明它的连接使用,并在连接关闭时自动删除。
autoDelete:是否自动删除队列(默认值为 false)。当队列不再被使用时自动删除。
callback:可选回调函数,返回队列的状态信息。
Distributing tasks among workers (the competing consumers pattern)
// send.js
var amqp = require('amqplib/callback_api');
// 连接到服务器,添加用户名和密码
.connect('amqp://user:password@IP', function (error0, connection) {
amqpif (error0) {
throw error0;
}// 创建一个通道
.createChannel(function (error1, channel) {
connectionif (error1) {
throw error1;
}// 定义一个队列
var queue = 'hello';
var msg = 'Hello World!';
// 发送消息到队列
.assertQueue(queue, {
channeldurable: false
;
}).sendToQueue(queue, Buffer.from(msg),
channel
{persistent: true//持久化消息 前提是队列也是持久化的
;
})console.log(" [x] Sent %s", msg);
;
})
setTimeout(function () {
.close();
connectionprocess.exit(0)
, 500);
}; })
// receive.js
var amqp = require('amqplib/callback_api');
.connect('amqp://user:password@IP', function (error0, connection) {
amqpif (error0) {
throw error0;
}.createChannel(function (error1, channel) {
connectionif (error1) {
throw error1;
}var queue = 'hello';
.assertQueue(queue, {
channeldurable: false
;
})
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
.consume(queue, function (msg) {
channelconsole.log(" [x] Received %s", msg.content.toString());
// 模拟耗时任务
setTimeout(function () {
console.log(" [x] Done");
, 1000);
}, {
}noAck: true // 手动消费者确认已关闭
;
});
}); })
node receive.js // 开启两个消费者
node receive.js // 开启两个消费者
默认情况下,RabbitMQ会将每条消息发送给下一个消费者, 依次。平均而言,每个消费者将获得相同数量的 讯息。这种分发消息的方式称为循环。尝试 与三个或更多工人一起出去。
执行任务可能需要几秒钟,您可能想知道如果 消费者开始一项艰巨的任务,并在完成之前终止。 使用我们当前的代码,一旦RabbitMQ向消费者传递消息, 立即将其标记为删除。在这种情况下,如果您终止工人, 您丢失了刚刚处理的消息。已发送的消息 对于这个特定的工人,但尚未处理,也会丢失。直接本来就是TCP通信的。
ack 由 消费者告诉RabbitMQ已收到特定消息, 已处理,并且RabbitMQ可以自由删除它。消费者交付确认时将执行超时(默认情况下为30分钟)。
// 手动确认ack
var amqp = require('amqplib/callback_api');
.connect('amqp://user:password@IP', function (error0, connection) {
amqpif (error0) {
throw error0;
}.createChannel(function (error1, channel) {
connectionif (error1) {
throw error1;
}var queue = 'hello';
.assertQueue(queue, {
channeldurable: false
;
})
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
.consume(queue, function (msg) {
channelconsole.log(" [x] Received %s", msg.content.toString());
// 模拟耗时任务
setTimeout(function () {
console.log(" [x] Done");
.ack(msg);
channel, 5000);
}, {
}noAck: false // 启动手动ack
;
});
}); })
比如将消息发给了worker1,worker1在收到msg后,处理5s期间,connection与服务器断开了,也就是不可能ack了,服务器会立即把msg再发给worker2处理,如果没有worker可用则等到有worker可用时msg再发向消费者。可能造成内存积压,忘记手动ack是很严重的事。
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
命令作用
* rabbitmqctl list_queues:列出 RabbitMQ 中所有队列的信息。
* name:显示队列的名称。
* messages_ready:显示队列中准备好被消费者消费的消息数量。
* messages_unacknowledged:显示队列中已被消费者接收但尚未确认的消息数量。
// 确保队列在RabbitMQ节点重新启动后仍然有效
.assertQueue('hello', {durable: true});
channel// RabbitMQ不允许您重新定义现有队列,因为前面已经创建了 durable为false的队列, 具有不同的参数 如需要则再创建一个新的队列
.assertQueue('task_queue', {durable: true});
channel// 消息持久化
.sendToQueue(queue, Buffer.from(msg), {persistent: true}); channel
将消息标记为持久性并不能完全保证该消息 不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘, RabbitMQ接受消息后,还有一个短时间窗口, 尚未保存。而且,RabbitMQ不会 fsync() 每个消息-它可能只是保存到缓存中,而没有真正写入磁盘。持久性保证并不强,但绰绰有余 为我们的简单任务队列。
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://user:password@IP', function (error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function (error1, channel) {
if (error1) {
throw error1;
}
var queue = 'hello';
channel.assertQueue(queue, {
durable: false
});
// 告诉服务器消息一个一个来 没确认就不会发下一个
channel.prefetch(1);
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
channel.consume(queue, function (msg) {
console.log(" [x] Received %s", msg.content.toString());
// 模拟耗时任务
setTimeout(function () {
console.log(" [x] Done");
channel.ack(msg);
}, 5000);
}, {
noAck: false // 启动手动ack
});
});
});
Sending messages to many consumers at once
生产者发送消息的过程
消费者接受消息的过程
如果交换机没有绑定任何队列,生产者向交换机发送的消息会被丢弃。这是 RabbitMQ 的默认行为。
.publish(exchange, routingKey, content, [options]);
channel
exchange(字符串):
指定消息要发送到的交换机名称。'')。
如果使用默认交换机,可以传空字符串(
routingKey(字符串):
指定路由键,用于决定消息的路由方式。
在 fanout 类型的交换机中,路由键会被忽略。
content(Buffer):Buffer 对象。
消息的内容,必须是一个 .from('Hello World!')。
例如:Buffer
options(可选对象):
用于设置消息的属性,例如持久化、优先级等。
常用选项:
persistent(布尔值):是否将消息标记为持久化(默认 false)。
mandatory(布尔值):如果消息无法路由到队列,是否返回给生产者(默认 false)。
expiration(字符串):消息的过期时间(以毫秒为单位)。 headers(对象):自定义消息头。
生产者
// send.js
var amqp = require('amqplib/callback_api');
// 连接到服务器
.connect('amqp://username:password@IP', function (error0, connection) {
amqpif (error0) {
throw error0;
}// 创建一个通道
.createChannel(function (error1, channel) {
connectionif (error1) {
throw error1;
}let exchange = 'logs';
// 声明交换机
.assertExchange(exchange, 'fanout', {
channeldurable: false // 非持久化的 重启后交换机将消失
;
})
let msg = 'Hello World!';
.publish(exchange, '', Buffer.from(msg));
channelconsole.log(" [x] Sent %s", msg);
;
})
setTimeout(function () {
.close();
connectionprocess.exit(0)
, 500);
}; })
消费者
// receive.js
var amqp = require('amqplib/callback_api');
.connect('amqp://username:password@IP', function (error0, connection) {
amqpif (error0) {
throw error0;
}.createChannel(function (error1, channel) {
connectionif (error1) {
throw error1;
}
// 声明交换机
let exchange = 'logs';
.assertExchange(exchange, 'fanout', {
channeldurable: false
;
})
// 声明队列 起一个参数为队列名 因为使用临时队列则不用指定
.assertQueue('', {
channelexclusive: true // 临时队列,连接断开队列会自动删除
, function (error2, q) {
}if (error2) {
throw error2;
}console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);
// 绑定交换机和队列
.bindQueue(q.queue, exchange, '');
channel.consume(q.queue, function (msg) {
channelif (msg.content) {
console.log(" [x] %s", msg.content.toString());
}, { noAck: true });
};
});
}); })
启动多个receive,用send发消息,会发现每个receive都会收到相同的消息。
Receiving messages selectively
在上面的Publish/Subscribe模式中,交换机使用了fanout方式,下面将会使用direct交换机方式,当消息的路由key与队列绑定的路由key完全匹配时才发向队列。
// send.js
var amqp = require('amqplib/callback_api');
// 连接到服务器
.connect('amqp://username:password@IP', function (error0, connection) {
amqpif (error0) {
throw error0;
}// 创建一个通道
.createChannel(function (error1, channel) {
connectionif (error1) {
throw error1;
}let exchange = "direct_logs";
let args = process.argv.slice(2);
let msg = args.slice(1).join(' ') || 'Hello World!';
let severity = (args.length > 0) ? args[0] : 'info'; // 路由键
// 声明direct交换机
.assertExchange(exchange, 'direct', {
channeldurable: false // 非持久化的 重启后交换机将消失
;
})
.publish(exchange, severity, Buffer.from(msg));
channel
console.log(" [x] Sent %s: %s", severity, msg);
;
})
setTimeout(function () {
.close();
connectionprocess.exit(0)
, 500);
}; })
// receive.js
var amqp = require('amqplib/callback_api');
let args = process.argv.slice(2);
if (args.length == 0) {
console.log("Usage: receive.js <exchange>");
process.exit(1);
}
.connect('amqp://username:password@IP', function (error0, connection) {
amqpif (error0) {
throw error0;
}
.createChannel(function (error1, channel) {
connectionif (error1) {
throw error1;
}
let exchange = 'direct_logs';
// 声明交换机
.assertExchange(exchange, 'direct', {
channeldurable: false
;
})
// 声明队列
.assertQueue('', {
channelexclusive: true//临时队列,连接断开队列会自动删除
, function (error2, q) {
}if (error2) {
throw error2;
}console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);
.forEach(function (severity) {
args// 绑定交换机和队列
.bindQueue(q.queue, exchange, severity);
channel;
})
.consume(q.queue, function (msg) {
channelif (msg.content) {
console.log(" [x] %s: '%s'", msg.fields.routingKey, msg.content.toString());
}, { noAck: true });
};
});
}); })
测试方法
# 启动两个 receive
# receive1
node receive.js warning error
# receive2
node receive.js info warning error
# receive1 receive2 都可以都到
node send.js error "scdscsdcs"
# receive1 receive2 都可以都到
node send.js warning "scdscsdcs"
# receive2 都可以都到
node send.js info "scdscsdcs"
Receiving messages based on a pattern (topics)
和Routing类似,只不过可以进行模式匹配
quick.orange.rabbit
将会发向Q1 Q2lazy.orange.elephant
将会发向Q1 Q2quick.orange.fox
仅发向Q1lazy.brown.fox
仅发向Q2lazy.pink.rabbit
虽然满足Q2两次但仍只会发向Q2一次quick.brown.fox
将被抛弃消息不匹配任何绑定,将会丢失。
主题交换功能强大,可以像其他交换一样运作。当队列绑定到 #(哈希)绑定键-它将接收 所有消息,无论路由密钥如何-例如 fanout 交换。当特殊字符 *(星)和 #(哈希)不用于绑定, 主题交换的行为就像 direct 一。
// send.js
var amqp = require('amqplib/callback_api');
// 连接到服务器
.connect('amqp://username:password@IP', function (error0, connection) {
amqpif (error0) {
throw error0;
}// 创建一个通道
.createChannel(function (error1, channel) {
connectionif (error1) {
throw error1;
}let exchange = "topic_logs";
let args = process.argv.slice(2);
let key = (args.length > 0) ? args[0] : 'anonymous.info';
let msg = args.slice(1).join(' ') || 'Hello World!';
// 声明topic交换机
.assertExchange(exchange, 'topic', {
channeldurable: false // 非持久化的 重启后交换机将消失
;
})
.publish(exchange, key, Buffer.from(msg));
channel
console.log(" [x] Sent %s: %s", key, msg);
;
})
setTimeout(function () {
.close();
connectionprocess.exit(0)
, 500);
}; })
// receive.js
var amqp = require('amqplib/callback_api');
let args = process.argv.slice(2);
if (args.length == 0) {
console.log("Usage: receive.js <facility>.<severity>");
process.exit(1);
}
.connect('amqp://username:password@IP', function (error0, connection) {
amqpif (error0) {
throw error0;
}
.createChannel(function (error1, channel) {
connectionif (error1) {
throw error1;
}
let exchange = 'topic_logs';
// 声明topic交换机
.assertExchange(exchange, 'topic', {
channeldurable: false
;
})
// 声明队列
.assertQueue('', {
channelexclusive: true//临时队列,连接断开队列会自动删除
, function (error2, q) {
}if (error2) {
throw error2;
}console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);
.forEach(function (key) {
args// 绑定交换机和队列
.bindQueue(q.queue, exchange, key);
channel;
})
.consume(q.queue, function (msg) {
channelif (msg.content) {
console.log(" [x] %s: '%s'", msg.fields.routingKey, msg.content.toString());
}, { noAck: true });
};
});
}); })
使用样例
node receive.js "#"
node receive.js "kern.*"
node send.js "kern.cds" "cdscs"
node send.js "kern." "cdscs"
Request/reply pattern example
其实背后操作方式就是搞两个队列,一个用于A给B发消息,一个用于B给A发消息。
.assertQueue('', {
channelexclusive: true
;
}).sendToQueue('rpc_queue', Buffer.from('10'), {
channelreplyTo: queue_name
; })
// rpcserver.js
!/usr/bin/env node
#
var amqp = require('amqplib/callback_api');
.connect('amqp://username:password@IP', function (error0, connection) {
amqpif (error0) {
throw error0;
}.createChannel(function (error1, channel) {
connectionif (error1) {
throw error1;
}var queue = 'rpc_queue';
.assertQueue(queue, {
channeldurable: false
;
})
.prefetch(1); // 消息一个一个接受处理
channelconsole.log(' [x] Awaiting RPC requests');
.consume(queue, function reply(msg) {
channelvar n = parseInt(msg.content.toString());
console.log(" [.] fib(%d)", n);
var r = fibonacci(n);
// 使用客户端提供的队列名,将结果回给客户端
.sendToQueue(msg.properties.replyTo,
channelBuffer.from(r.toString()), {
correlationId: msg.properties.correlationId // correlationId用客户端提供的
;
})
.ack(msg);
channel, {
}noAck: false
;
});
});
})
function fibonacci(n) {
if (n == 0 || n == 1)
return n;
else
return fibonacci(n - 1) + fibonacci(n - 2);
}
// rpcclient.js
let amqp = require('amqplib/callback_api');
let args = process.argv.slice(2);
if (args.length == 0) {
console.log("Usage: rpc_client.js num")
process.exit(1);
}
.connect('amqp://username:password@IP', function (error0, connection) {
amqpif (error0) {
throw error0;
}.createChannel(function (error1, channel) {
connectionif (error1) {
throw error1;
}.assertQueue('', {
channelexclusive: true
, function (error2, q) {
}if (error2) {
throw error2;
}let correlationId = generateUuid();
let num = parseInt(args[0]);
console.log(' [x] Requesting fib(%d)', num);
// 接收回包
.consume(q.queue, function (msg) {
channelif (msg.properties.correlationId == correlationId) {
console.log(' [.] Got %s', msg.content.toString());
setTimeout(function () {
.close();
connectionprocess.exit(0)
, 500);
}
}, {
}noAck: true
;
})
.sendToQueue('rpc_queue',
channelBuffer.from(num.toString()), {
correlationId: correlationId,
replyTo: q.queue
;
});
});
});
})
function generateUuid() {
return Math.random().toString() +
Math.random().toString() +
Math.random().toString();
}
Reliable publishing with publisher confirms
RabbitMQ 官方文档中的 Publisher Confirms 样例主要讲解了如何实现可靠的消息发布机制。Publisher Confirms 是一种机制,用于确保消息从生产者成功发布到 RabbitMQ 服务器的队列中。通过这种机制,生产者可以确认消息是否被 RabbitMQ 正确接收并持久化。因为TCP我们不知道发送的内容是否已经到达目的地。
样例内容
const amqp = require('amqplib');
async function publishMessages() {
try {
const connection = await amqp.connect("amqp://username:password@IP");
const channel = await connection.createConfirmChannel();
const queue = 'confirm_queue';
const message = 'Hello, Publisher Confirms!';
// 声明队列
await channel.assertQueue(queue, { durable: true });
// 发送消息并等待确认
.sendToQueue(queue, Buffer.from(message), { persistent: true }, (err, ok) => {
channelif (err) {
console.error('Message nacked!');
else {
} console.log('Message acked');
};
})
// 连接关闭
setTimeout(() => {
.close();
connection, 500);
}catch (error) {
} console.error('Error: ', error);
}
}
publishMessages();
使用RabbitMQ的Stream服务
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672 \
\
-v rabbitmq_data:/var/lib/rabbitmq \
-v rabbitmq_log:/var/log/rabbitmq \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=password '-rabbitmq_stream advertised_host localhost' \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS= rabbitmq:4-management
通过设置 advertised_host 为 localhost,客户端可以通过
localhost:5552
连接到 Stream
服务,而不会因为主机名解析问题导致连接失败。
待服务器启动,然后启用流和流管理插件
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management
RabbitMQ 的 Stream 概念
RabbitMQ 的 Stream 是一种高吞吐量、低延迟的消息流处理机制,专为处理大规模数据流的场景设计。它与传统的 RabbitMQ 队列不同,Stream 提供了类似于 Kafka 的功能,支持消息的顺序消费、分区、持久化以及高效的消息存储和检索。
Stream 的核心特点
高吞吐量:
顺序消费:
分区(Partitioning):
持久化:
灵活的消费模式:
高效存储:
与 RabbitMQ 的无缝集成:
为什么设计 Stream 的概念?
RabbitMQ 设计 Stream 的概念,主要是为了满足现代分布式系统中对高吞吐量、低延迟消息流处理的需求,同时弥补传统队列模型的不足。
Stream 的典型应用场景
The simplest thing that does something
// send.js
const rabbit = require("rabbitmq-stream-js-client")
const streamName = "hello-nodejs-stream";
async function send() {
console.log("Connecting...");
const client = await rabbit.connect({
vhost: "/",
port: 5552,
hostname: "IP",
username: "username",
password: "password",
;
})
console.log("Making sure the stream exists...");
// max-length-bytes: 设置 Stream 的最大存储大小为 5GB。
const streamSizeRetention = 5 * 1e9
await client.createStream({ stream: streamName, arguments: { "max-length-bytes": streamSizeRetention } });
const publisher = await client.declarePublisher({ stream: streamName });
console.log("Sending a message...");
for (let i = 0; i < 10; i++) {
await publisher.send(Buffer.from("Test message1"));
}
}
send();
// receive.js
const rabbit = require("rabbitmq-stream-js-client")
const streamName = "hello-nodejs-stream";
async function receive() {
console.log("Connecting...");
const client = await rabbit.connect({
vhost: "/",
port: 5552,
hostname: "IP",
username: "username",
password: "password",
;
})
console.log("Making sure the stream exists...");
// max-length-bytes: 设置 Stream 的最大存储大小为 5GB。
const streamSizeRetention = 5 * 1e9
await client.createStream({ stream: streamName, arguments: { "max-length-bytes": streamSizeRetention } });
await client.declareConsumer({ stream: streamName, offset: rabbit.Offset.first() }, (message) => {
console.log(`Received message ${message.content.toString()}`)
;
})
}
receive();
关于 offset: rabbit.Offset.first()
Stream消息的保留机制
RabbitMQ Stream 提供了灵活的保留策略,允许用户根据需求配置消息的存储和删除规则。以下是常见的保留策略:
基于时间的保留(Time-based Retention)
await client.createStream({
stream: streamName,
arguments: { "x-max-age": 3600000 } // 消息保留 1 小时
; })
基于存储大小的保留(Size-based Retention)
await client.createStream({
stream: streamName,
arguments: { "max-length-bytes": 5 * 1e9 } // 最大存储 5GB
; })
基于分区的保留
Offset Tracking即偏移跟踪
Keep track of message processing
什么是偏移量?流可以看作是元素是消息的数组。 偏移量是数组中给定消息的索引。
// send.js
const rabbit = require("rabbitmq-stream-js-client")
const streamName = "hello-nodejs-stream";
async function send() {
console.log("Connecting...");
const client = await rabbit.connect({
vhost: "/",
port: 5552,
hostname: "IP",
username: "username",
password: "password",
;
})
console.log("Making sure the stream exists...");
const streamSizeRetention = 5 * 1e9
await client.createStream({ stream: streamName, arguments: { "max-length-bytes": streamSizeRetention } });
const publisher = await client.declarePublisher({ stream: streamName });
const messageCount = 100;
console.log(`Publishing ${messageCount} messages...`);
for (let i = 0; i < messageCount; i++) {
const body = i === messageCount - 1 ? "marker" : `hello ${i}`;
await publisher.send(Buffer.from(body));
}
}
send();
// receive.js
const rabbit = require("rabbitmq-stream-js-client")
const streamName = "hello-nodejs-stream";
async function receive() {
console.log("Connecting...");
const client = await rabbit.connect({
vhost: "/",
port: 5552,
hostname: "IP",
username: "admin",
password: "password",
;
})
console.log("Making sure the stream exists...");
const streamSizeRetention = 5 * 1e9
await client.createStream({ stream: streamName, arguments: { "max-length-bytes": streamSizeRetention } });
const startFrom = rabbit.Offset.first();
let firstOffset = startFrom.value;
let lastOffset = startFrom.value;
let messageCount = 0;
await client.declareConsumer({ stream: streamName, offset: startFrom }, (message) => {
++;
messageCountif (messageCount === 1) {
console.log("First message received");
= message.offset;
firstOffset
}if (message.content.toString() === "marker") {
console.log("Marker found");
= message.offset;
lastOffset console.log(`Done consuming, first offerset was ${firstOffset}, last offset was ${lastOffset}`);
};
})
}
receive();
指定偏移量
const startFrom = rabbit.Offset.offset(42n);
服务器端偏移跟踪,consumer可以存储offset,以便下次重新连接后可以query出来。
const rabbit = require("rabbitmq-stream-js-client")
const streamName = "hello-nodejs-stream";
async function receive() {
console.log("Connecting...");
const client = await rabbit.connect({
vhost: "/",
port: 5552,
hostname: "IP",
username: "username",
password: "password",
;
})
console.log("Making sure the stream exists...");
const streamSizeRetention = 5 * 1e9
await client.createStream({ stream: streamName, arguments: { "max-length-bytes": streamSizeRetention } });
// start consuming at the beginning of the stream
const consumerRef = "offset-tracking-tutorial"; // the consumer must a have name
let firstOffset = undefined;
let offsetSpecification = rabbit.Offset.first();
// 先从服务器获取这个consumer的offset
try {
// take the offset stored on the server if it exists
const offset = await client.queryOffset({ reference: consumerRef, stream: streamName });
= rabbit.Offset.offset(offset + 1n); // start from the message after 'marker'
offsetSpecification catch (e) {
} if (e) {
throw e;
}
}
let lastOffset = offsetSpecification.value;
let messageCount = 0;
const consumer = await client.declareConsumer(
stream: streamName, offset: offsetSpecification, consumerRef },
{ async (message) => {
++;
messageCountif (!firstOffset && messageCount === 1) {
= message.offset;
firstOffset console.log("First message received");
}if (messageCount % 10 === 0) { // 每10个消息存一下offset
await consumer.storeOffset(message.offset); // store offset every 10 messages
}
console.log(`${message.content.toString()}`);
if (message.content.toString() === "marker") {
console.log("Marker found");
= message.offset;
lastOffset await consumer.storeOffset(message.offset); // store the offset on consumer closing
await consumer.close(true);
process.exit(0);
}
};
)
console.log(`Start consuming...`);
}
receive();