博客 RabbitMQ 从原理到实战—golang版本(下)

RabbitMQ 从原理到实战—golang版本(下)

   数栈君   发表于 2024-11-13 15:57  251  0

4.RabbitMQ 四种工作模型实战

4.1 Simple模式

单发单收,消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/e469e6888d7b7e033bf7e9836d3c9d3c..jpg

生产者端

package main

import (
"github.com/streadway/amqp"
"log"
)

func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}

func main() {
// 连接到 RabbitMQ 服务器
conn, err := amqp.Dial("amqp://xz:xz123456@139.199.162.166:5672/xz")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

// 创建一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

// 声明一个队列
q, err := ch.QueueDeclare(
"test_queue", // 队列名称
false, // 是否持久化
false, // 是否自动删除
false, // 是否为排他性队列
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare a queue")

// 要发送的消息
body := "Hello RabbitMQ!"
err = ch.Publish(
"", // 交换机名称,空表示默认的交换机
q.Name, // 路由键,这里使用队列名称
false, // 是否等待响应
false, // 是否强制消息路由到队列
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")

log.Printf(" [x] Sent %s", body)
}

消费者端

package main

import (
"github.com/streadway/amqp"
"log"
)

func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}

func main() {
// 连接到 RabbitMQ 服务器
conn, err := amqp.Dial("amqp://xz:xz123456@139.199.162.166:5672/xz")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

// 创建一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

// 声明一个队列,确保消费者也使用相同的队列
q, err := ch.QueueDeclare(
"test_queue", // 队列名称
false, // 是否持久化
false, // 是否自动删除
false, // 是否为排他性队列
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare a queue")

// 注册一个消费者来接收消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者标识符
true, // 自动确认
false, // 是否为排他性消费者
false, // 如果服务器没有消息会阻塞消费者
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to register a consumer")

// 使用 goroutine 来处理消息
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()

// 阻塞主 goroutine,直到程序终止
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
select {}
}

运行:

可以先启动消费者端,在启动生产者端,每运行一次生产者端,消费者端就会消费一次

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/162967cf77415e4bbefe9868bd1f0b60..jpg

4.2 工作队列 Work Queue

让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/4a7e92e7d063ef2e254fafa4ea59468b..jpg

生产者端

package main

import (
"github.com/streadway/amqp"
"log"
"os"
)

func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}

func main() {
// 连接到 RabbitMQ 服务器
conn, err := amqp.Dial("amqp://user:pwd@IP地址:5672/虚拟机")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

// 创建一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

// 声明一个持久化队列
q, err := ch.QueueDeclare(
"work_queue", // 队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否为排他性队列
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare a queue")

// 获取要发送的消息内容
body := bodyFrom(os.Args)

// 发布消息到队列
err = ch.Publish(
"", // 默认交换机
q.Name, // 路由键为队列名
false, // 是否等待响应
false, // 是否强制消息路由到队列
amqp.Publishing{
DeliveryMode: amqp.Persistent, // 消息持久化
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")

log.Printf(" [x] Sent %s", body)
}

// 从命令行参数获取消息内容
func bodyFrom(args []string) string {
var s string
if len(args) < 2 || os.Args[1] == "" {
s = "Hello RabbitMQ!"
} else {
s = os.Args[1]
}
return s
}

消费者端

package main

import (
"github.com/streadway/amqp"
"log"
"time"
)

func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}

func main() {
// 连接到 RabbitMQ 服务器
conn, err := amqp.Dial("amqp://user:pwd@IP地址:5672/虚拟机")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

// 创建一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

// 声明一个持久化队列
q, err := ch.QueueDeclare(
"work_queue", // 队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否为排他性队列
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare a queue")

// 设置每次只分发一个任务给消费者
err = ch.Qos(
1, // 每次预取一个任务
0, // 不限制消息总数
false, // 应用在当前通道
)
failOnError(err, "Failed to set QoS")

// 注册消费者,接收消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"work queue consumer", // 消费者标识符
false, // 手动确认消息
false, // 是否为排他性消费者
false, // 如果服务器没有消息会阻塞消费者
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to register a consumer")

// 使用 goroutine 处理消息
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
doWork(d.Body)
log.Printf("Done")
d.Ack(false) // 手动确认消息处理完毕
}
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
select {}
}

// 模拟任务处理,根据消息中的点号数量来延迟处理时间
func doWork(body []byte) {
for _, char := range body {
if char == '.' {
time.Sleep(1 * time.Second)
}
}
}

运行,可以同时启动多个消费者端,不断运行生产者端

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/a04f3486e7107bc2452662512e1fabea..jpg

4.3 发布订阅Pub/Sub模式

在这种模型中,多了一个 Exchange 角色,而且过程略有变化:

P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X (交换机)。

C:消费者,消息的接收者,会一直等待消息到来。

Queue:消息队列,接收消息、缓存消息。

Exchange:交换机(X) ,一方面,接收生产者发送的消息。另一方面,如何处理消息,递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/00f2e52802eaf3324931440ed9dfdd7d..jpg

Exchange类型:

·  Fanout:广播,将消息交给所有绑定到交换机的队列。
·  Direct:全值匹配,把消息交给符合指定routing key的队列。
·  Topic:通配符,与Direct类型类似,但Direct类型要求routing key完全相等,而Topic类型是对routing key进行模糊匹配,比Direct灵活。
·  Headers:根据Message的一些头部信息来分发过滤Message,用的比较少。

4.3.1Fanout

生产者端

package main

import (
"github.com/streadway/amqp"
"log"
"os"
)

func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}

func main() {
exchangeName := "fanout exchange"
// 连接到 RabbitMQ 服务器
conn, err := amqp.Dial("amqp://xz:xz123456@139.199.162.166:5672/xz")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

// 创建一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

// 声明一个交换机,类型为fanout
err = ch.ExchangeDeclare(
exchangeName, // 交换机名称
"fanout", // 交换机类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否为内置交换机
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare an exchange")

// 获取要发送的消息内容
body := bodyFrom(os.Args)

// 发布消息到交换机
err = ch.Publish(
exchangeName, // 交换机名称
"", // 路由键为空,fanout模式忽略
false, // 是否等待响应
false, // 是否强制消息路由到队列
amqp.Publishing{
DeliveryMode: amqp.Persistent, // 消息持久化
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")

log.Printf(" [x] Sent %s", body)
}

// 从命令行参数获取消息内容
func bodyFrom(args []string) string {
var s string
if len(args) < 2 || os.Args[1] == "" {
s = "info: Hello RabbitMQ!"
} else {
s = os.Args[1]
}
return s
}

消费者端

package main

import (
"github.com/streadway/amqp"
"log"
)

func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}

func main() {
exchangeName := "fanout exchange"
// 连接到 RabbitMQ 服务器
conn, err := amqp.Dial("amqp://xz:xz123456@139.199.162.166:5672/xz")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

// 创建一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

// 声明交换机,类型为fanout,确保消费者能找到交换机
err = ch.ExchangeDeclare(
exchangeName, // 交换机名称
"fanout", // 交换机类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否为内置交换机
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare an exchange")

// 声明一个随机名称的临时队列,用于接收交换机的消息
q, err := ch.QueueDeclare(
"", // 队列名称留空,RabbitMQ 会生成一个随机名称
false, // 是否持久化
false, // 是否自动删除
true, // 是否为排他性队列
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare a queue")

// 绑定队列到交换机
err = ch.QueueBind(
q.Name, // 队列名称
"", // 路由键为空,fanout模式忽略
exchangeName, // 交换机名称
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to bind a queue")

// 注册消费者,接收消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者标识符
true, // 自动确认消息
false, // 是否为排他性消费者
false, // 如果服务器没有消息会阻塞消费者
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to register a consumer")

// 使用 goroutine 处理消息
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
select {}
}

4.3.2 Direct

生产者端

package main

import (
"github.com/streadway/amqp"
"log"
"os"
)

func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}

func main() {
exchangeName := "direct exchange"
// 连接到 RabbitMQ 服务器
conn, err := amqp.Dial("amqp://xz:xz123456@139.199.162.166:5672/xz")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

// 创建一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

// 声明一个交换机,类型为direct
err = ch.ExchangeDeclare(
exchangeName, // 交换机名称
"direct", // 交换机类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否为内置交换机
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare an exchange")

// 获取要发送的消息内容和路由键
body := bodyFrom(os.Args)
severity := severityFrom(os.Args)

// 发布消息到交换机
err = ch.Publish(
exchangeName, // 交换机名称
severity, // 路由键,用于消息路由
false, // 是否等待响应
false, // 是否强制消息路由到队列
amqp.Publishing{
DeliveryMode: amqp.Persistent, // 消息持久化
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")

log.Printf(" [x] Sent %s: %s", severity, body)
}

// 从命令行参数获取消息内容
func bodyFrom(args []string) string {
if len(args) < 3 || os.Args[2] == "" {
return "Hello RabbitMQ!"
}
return os.Args[2]
}

// 从命令行参数获取消息的路由键
func severityFrom(args []string) string {
if len(args) < 2 || os.Args[1] == "" {
return "info"
}
return os.Args[1]
}

消费者端

package main

import (
"github.com/streadway/amqp"
"log"
"os"
)

func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}

func main() {
exchangeName := "direct exchange"
// 连接到 RabbitMQ 服务器
conn, err := amqp.Dial("amqp://xz:xz123456@139.199.162.166:5672/xz")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

// 创建一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

// 声明交换机,类型为direct,确保消费者能找到交换机
err = ch.ExchangeDeclare(
exchangeName, // 交换机名称
"direct", // 交换机类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否为内置交换机
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare an exchange")

// 声明一个随机名称的临时队列,用于接收交换机的消息
q, err := ch.QueueDeclare(
"", // 队列名称留空,RabbitMQ 会生成一个随机名称
false, // 是否持久化
false, // 是否自动删除
true, // 是否为排他性队列
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare a queue")

// 从命令行参数获取要绑定的路由键(日志级别)
severity := severityFrom(os.Args)

// 绑定队列到交换机,使用指定的路由键
err = ch.QueueBind(
q.Name, // 队列名称
severity, // 路由键(日志级别)
exchangeName, // 交换机名称
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to bind a queue")

// 注册消费者,接收消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者标识符
true, // 自动确认消息
false, // 是否为排他性消费者
false, // 如果服务器没有消息会阻塞消费者
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to register a consumer")

// 使用 goroutine 处理消息
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
select {}
}

// 从命令行参数获取路由键
func severityFrom(args []string) string {
if len(args) < 2 || os.Args[1] == "" {
return "info"
}
return os.Args[1]
}

4.3.4 Topic

生产者端

ish a message")

log.Printf(" [x] Sent %s: %s", routingKey, body)
}

// 从命令行参数获取消息内容
func bodyFrom(args []string) string {
if len(args) < 3 || os.Args[2] == "" {
return "Hello RabbitMQ!"
}
return os.Args[2]
}

// 从命令行参数获取消息的路由键
func routingKeyFrom(args []string) string {
if len(args) < 2 || os.Args[1] == "" {
return "anonymous.info"
}
return os.Args[1]
}

消费者端

package main

import (
"github.com/streadway/amqp"
"log"
"os"
)

func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}

func main() {
exchangeName := "topic exchange"
// 连接到 RabbitMQ 服务器
conn, err := amqp.Dial("amqp://xz:xz123456@139.199.162.166:5672/xz")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

// 创建一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

// 声明交换机,类型为topic,确保消费者能找到交换机
err = ch.ExchangeDeclare(
exchangeName, // 交换机名称
"topic", // 交换机类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否为内置交换机
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare an exchange")

// 声明一个随机名称的临时队列,用于接收交换机的消息
q, err := ch.QueueDeclare(
"", // 队列名称留空,RabbitMQ 会生成一个随机名称
false, // 是否持久化
false, // 是否自动删除
true, // 是否为排他性队列
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare a queue")

// 从命令行参数获取要绑定的路由键(日志类别)
bindingKey := bindingKeyFrom(os.Args)

// 绑定队列到交换机,使用指定的路由键
err = ch.QueueBind(
q.Name, // 队列名称
bindingKey, // 路由键(日志类别)
exchangeName, // 交换机名称
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to bind a queue")

// 注册消费者,接收消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者标识符
true, // 自动确认消息
false, // 是否为排他性消费者
false, // 如果服务器没有消息会阻塞消费者
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to register a consumer")

// 使用 goroutine 处理消息
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
select {}
}

// 从命令行参数获取路由键
func bindingKeyFrom(args []string) string {
if len(args) < 2 || os.Args[1] == "" {
return "#"
}
return os.Args[1]
}

运行

启动多个消费者端,观察控制台的打印

go run consumer.go "kern.*" # 只接收与内核相关的日志
go run consumer.go "*.critical" # 只接收严重级别的日志
go run consumer.go "#" # 接收所有日志

发送消息

go run producer.go "kern.critical" "Kernel panic - critical error"
go run producer.go "app.info" "App started successfully"
go run producer.go "app.error" "App failed to start"

5.保证消息可靠性机制详解

需要保证消息不能丢失,消息从生产者生产出来一直到消费者消费成功,这条链路是这样的:

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/5e04e514af5d207389b5df3949dd2b84..jpg

将从以下几个方面来保证消息的可靠性。

5.1 消息持久化
队列持久化:在声明队列时,将 durable 参数设置为 true,使得队列在 RabbitMQ 重启后仍然存在。
消息持久化:在发送消息时,将 delivery_mode 设置为 2,这样消息会被写入磁盘,即使 RabbitMQ 崩溃或重启,消息也不会丢失。

5.2 确认机制
消息的可靠投递分为了两大内容:发送端的确认和消费端的确认。

5.2.1 发送端
发送端的消息可靠性投递:confirm 确认模式和return 退回模式。

**confirm 确认模式:**消息从 producer 到达 exchange 则会给 producer 发送一个应答,我们需要开启confirm模式,才能接收到这条应答。开启方式是将Channel.Confirm(noWait bool)参数设置为false,表示同意发送者将当前channel信道设置为confirm模式。

**return 退回模式:**消息从 exchange–>queue 投递失败,会将消息退回给producer。

5.2.2 消费端
消息从Queue发送到消费端之后,消费端会发送一个确认消息:Consumer Ack,有两种确认方式:自动确认和手动确认。

**自动确认:**当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。

**手动确认:**在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用ch.Ack(false),手动确认,如果出现异常,则调用d.Reject(true)让其自动重新发送消息。

5.3 事务机制
RabbitMQ 支持 AMQP 事务机制,允许生产者将消息发送操作包裹在一个事务中。如果事务提交成功,则消息被投递;如果事务回滚,则消息不会被投递。但是这种方式比较消耗性能,实际场景中使用比较少。

5.4 消息重发机制
消息重发:如果生产者未收到确认,可以重发消息。

死信队列(Dead-Letter Exchange, DLX):消息在多次重发失败后,可以被投递到一个死信交换器(DLX),由专门的消费者进行处理。


5.5 优先级队列
当有多个消息等待被投递时,优先级队列可以确保高优先级的消息先被处理,从而提高重要消息的可靠性。

5.6 集群模式/镜像队列
通过 RabbitMQ 的镜像队列(集群模式),消息可以在多个节点间复制,确保在节点故障时,消息仍然可用。

6.RabbitMQ 高级特性

6.1 延时队列&死信队列

通过设置消息的过期时间到TTL这种方式来讲解

当消息没有配置消费者,消息就一直停留在队列中,停留时间超过存活时间后,消息就会被自动移动到死信交换机

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/36ac65d1bd1a7ada4e3214c7769bd1e0..jpg

6.1.1 rabbitmq_delayed_message_exchange 插件安装

插件下载地址:rabbitmq-delayed-message-exchange


将下载好的插件上传到linux系统中,拷贝到容器内部

$ dockercp rabbitmq_delayed_message_exchange-3.13.0.ez rabbitmq:/plugins

执行

$ rabbitmq-plugins enable rabbitmq_delayed_message_exchange

刷新界面 在add exchange就能看到x-delayed-message这个选项
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/13e9d5cd7c830f6f850285a9139870ad..jpg

6.1.2 插件使用

生产者的实现很简单,只需要在消息的header中加入"x-delay"字段并用其值表示消息的 TTL,最后将其发送到延迟队列即可。

package main

import (
"github.com/streadway/amqp"
"log"
"time"
)

func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}

func main() {
conn, err := amqp.Dial("amqp://xz:xz123456@139.199.162.166:5672/xz")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

var (
exchange = "x-delayed-message"
queue = "delay_queue"
routingKey = "log_delay"
body string
)
// 申请交换机
err = ch.ExchangeDeclare(exchange, exchange, true, false, false, false, amqp.Table{
"x-delayed-type": "direct",
})
failOnError(err, "交换机申请失败!")

err = ch.QueueBind(queue, routingKey, exchange, false, nil)
failOnError(err, "绑定交换机失败!")

body = "==========10000=================" + time.Now().Local().Format("2006-01-02 15:04:05")
// 将消息发送到延时队列上
err = ch.Publish(
exchange,
routingKey,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
Headers: map[string]interface{}{
"x-delay": "10000", // 消息从交换机过期时间,毫秒(x-dead-message插件提供)
},
})

failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)

body = "==========20000=================" + time.Now().Local().Format("2006-01-02 15:04:05")
// 将消息发送到延时队列上
err = ch.Publish(
exchange,
routingKey,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
Headers: map[string]interface{}{
"x-delay": "20000", // 消息从交换机过期时间,毫秒(x-dead-message插件提供)
},
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)

body = "==========5000=================" + time.Now().Local().Format("2006-01-02 15:04:05")
// 将消息发送到延时队列上
err = ch.Publish(
exchange,
routingKey,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
Headers: map[string]interface{}{
"x-delay": "5000", // 消息从交换机过期时间,毫秒(x-dead-message插件提供)
},
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}

消费者

package main

import (
"github.com/streadway/amqp"
"log"
)

func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}

func main() {
// 建立链接
conn, err := amqp.Dial("amqp://xz:xz123456@139.199.162.166:5672/xz")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

var (
exchange = "x-delayed-message"
queue = "delay_queue"
routingKey = "log_delay"
)

// 申请交换机
err = ch.ExchangeDeclare(
exchange,
exchange,
true,
false,
false,
false,
amqp.Table{
"x-delayed-type": "direct",
})

failOnError(err, "交换机申请失败!")

// 声明一个常规的队列, 其实这个也没必要声明,因为 exchange 会默认绑定一个队列
q, err := ch.QueueDeclare(
queue,
true,
true,
false,
false,
nil,
)
failOnError(err, "Failed to declare a queue")

err = ch.QueueBind(
q.Name,
routingKey,
exchange,
false,
nil)
failOnError(err, "Failed to bind a queue")

// 这里监听的是 test_logs
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to register a consumer")

forever := make(chan bool)

go func() {
for d := range msgs {
log.Printf("接收数据 [x] %s", d.Body)
}
}()

log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}

运行

生产者

2024/08/22 09:06:39 [x] Sent ==========10000=================2024-08-22 09:06:39
2024/08/22 09:06:39 [x] Sent ==========20000=================2024-08-22 09:06:39
2024/08/22 09:06:39 [x] Sent ==========5000=================2024-08-22 09:06:39

消费者

2024/08/22 09:06:44 接收数据 [x] ==========5000=================2024-08-22 09:06:39
2024/08/22 09:06:49 接收数据 [x] ==========10000=================2024-08-22 09:06:39
2024/08/22 09:06:59 接收数据 [x] ==========20000=================2024-08-22 09:06:39

6.2 优先级队列
在实现 RabbitMQ 优先级队列时,你需要为队列设置 x-max-priority 参数,指定一个优先级范围,通常建议在 0 到 10 之间,这个值表示队列中消息的最高优先级。

当生产者发送消息时,需要设置 priority 属性,建议不要超过你设置的最大优先级值。如果超过了这个范围,设置的优先级将不再生效。在指定范围内,数字越大,优先级越高。

优先级队列的处理场景主要适用于生产者的消息产生速度快于消费者的处理速度。当队列中有消息堆积时,优先级策略才会发挥作用,通过优先调度高优先级的消息,提高处理效率。如果消费者消费速度更快或等于生产速度,则优先级队列的作用不明显。
生产者

ackage main

import (
"fmt"
"github.com/streadway/amqp"
"log"
"strconv"
)

func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s:%s", msg, err)
panic(fmt.Sprintf("%s:%s", msg, err))
}
}

func main() {
conn, err := amqp.Dial("amqp://xz:xz123456@139.199.162.166:5672/xz")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

var args amqp.Table
args = amqp.Table{"x-max-priority": int32(10)}
q, err := ch.QueueDeclare(
"priqueue",
true,
false,
false,
false,
args,
)
failOnError(err, "Failed to declare q queue")

// 生产者一次性创建6个消息,其中奇数优先级为2,偶数优先级为1,并阻塞到RabbitMQ上面,先不启动消费者 或则一下子就消费了 体现不出来优先级
for i := 0; i < 6; i++ {
body := "hello rabbitmq"
body += strconv.Itoa(i)
pri := i%2 + 1
err = ch.Publish(
"",
q.Name,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Priority: uint8(pri),
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}

return
}

消费者

package main

import (
"fmt"
"github.com/streadway/amqp"
"log"
)

func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s:%s", msg, err)
panic(fmt.Sprintf("%s:%s", msg, err))
}
}

func main() {
conn, err := amqp.Dial("amqp://xz:xz123456@139.199.162.166:5672/xz")
failOnError(err, "Failed to connect to server")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to connect to channel")
defer ch.Close()
var args amqp.Table
args = amqp.Table{"x-max-priority": int32(10)}
q, err := ch.QueueDeclare(
"priqueue",
true,
false,
false,
false,
args,
)
failOnError(err, "Failed to declare a queue")

err = ch.Qos(
1,
0,
false,
)
failOnError(err, "Failed to set QoS")

msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to register a consumer")

forever := make(chan bool)

go func() {
for d := range msgs {
log.Printf("接收数据 [x] %s", d.Body)
}
}()

log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}

运行 先运行生产者阻塞住 在运行消费者

优先级队列会优先处理优先级为 2 的消息,之后再处理优先级为 1 的消息。对于具有相同优先级的消息,队列则按照先进先出(FIFO)的顺序进行消费。这种机制确保了高优先级的消息能被优先处理,而同优先级的消息则保持原有的发送顺序。

2024/08/22 10:05:44 接收数据 [x] hello rabbitmq1
2024/08/22 10:05:44 接收数据 [x] hello rabbitmq3
2024/08/22 10:05:44 接收数据 [x] hello rabbitmq5
2024/08/22 10:05:44 接收数据 [x] hello rabbitmq0
2024/08/22 10:05:44 接收数据 [x] hello rabbitmq2
2024/08/22 10:05:44 接收数据 [x] hello rabbitmq4

6.3 监控和告警

使用 Prometheus 方式

6.3.1 RabbitMq 启动 prometheus监控插件

rabbitmq-plugins enable rabbitmq_prometheus

启动之后可以看到开放了15692端口 

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/26e072378a27a98604a4a142d69faa46..jpg

验证

http://yourIP:15692/metrics

小坑  

我这边遇到访问不通的原因有两个:
1.用的腾讯云 必须在防火墙开发15692端口
2.在用docker启动的时候必须做端口映射 -p15692:15692

6.3.2 prometheus 安装
6.3.2.1 下载安装

下载地址

上传到linux系统中,解压

tar xvfz prometheus-2.54.0.linux-amd64.tar.gz
# 改名 名字太长了(可选)
mv prometheus-2.54.0.linux-amd64 prometheus
6.3.2.2 修改配置
# 进入目录
cd prometheus/
# 修改配置
vim prometheus.yml

修改配置

scrape_configs:
# The job name is added as a label `job=` to any timeseries scraped from this config.
# 监控的表示名
- job_name: "rabbitmq"

# metrics_path defaults to '/metrics'
# scheme defaults to 'http'.

static_configs:
- targets: ["localhost:15692"] # rabbitmq部署路径

6.3.2.3 启动
./prometheus

也可以设置成一个服务 可以直接后台启动

# 创建数据目录
$ mkdir -p /data/prometheus/prometheus/data
# 创建用户并授权
$ useradd prometheus
$ chown -R prometheus:prometheus /usr/local/prometheus /data/prometheus
# 添加服务
$ vim /usr/lib/systemd/system/prometheus.service

服务文件内容

[Unit]
Description= Prometheus
After=network.target

[Service]
Type=simple
User=prometheus
ExecStart=/usr/local/prometheus/prometheus/prometheus --config.file=/usr/local/prometheus/prometheus/prometheus.yml --storage.tsdb.path=/data/prometheus/prometheus/data
ExecReload=/bin/kill -HUP $MAINPID
Restart=on-failure

[Install]
WantedBy=multi-user.target

启动

# 启动
$ systemctl start prometheus.service
# 查看状态
$ systemctl status prometheus.service

验证

访问http://ip:9090/targets?search= 是否有rabbitmq

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/ecf996f6087a452d7ccbcae192a4f708..jpg

6.3.3 Grafana 安装

下载地址

安装

$ yum install-y https://dl.grafana.com/enterprise/release/grafana-enterprise-11.1.4-1.x86_64.rpm

启动服务

$ systemctl start grafana-server
$ systemctl status grafana-server

访问

http://ip:3000 账号密码admin/admin 设置新密码

添加 data source

修改 prometheus 地址即可 然后保存

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/f5fc9eb04a0d9ad360569dfdb4c9a30e..jpg

下载grafana 模版

地址 选择一个自己喜欢的样式下载就行

导入模版

Dashboards-New-import将刚才下载的json上传

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/616e6d1a8f8275b75d0c4bbf000d9be1..jpg

展示

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/90f7b3b5da0f05f6329195fc75d2b694..jpg

6.3.4 配置邮箱告警通知

邮箱开启stmp 获取授权码

修改grafana配置

vim /etc/grafana/grafana.ini

修改配置,保存重启

[smtp]
enabled = true
host = smtp.163.com:465
# 你的邮箱地址
user =xxxxx@163.com
# 获得的授权码
password =xxxxx
# 你的邮箱地址
from_address = xxxx@163.com
from_name = Grafana

新增联络点

点击测试回发送一份测试邮箱到你填写的邮件

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/901384fc3164449aad90ee12a2e3b6c6..jpg

http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/527d1b2833a17704907fb89a31449eda..jpg

新增告警配置


看具体需要监测cpu、内存还是什么指标添加告警规则即可。


这份《RabbitMQ从入门到实战》学习笔记到这里就告一段落了。作为一名Go后端工程师,我在项目中经常使用RabbitMQ,这些笔记是我在自学过程中整理出来的。由于我并非一名运维工程师,因此最后章节关于告警的内容可能讲得不够详尽,如果你对此感兴趣,可以查阅相关的深度文章。另外,如果笔记中有任何错误或不足之处,欢迎指正,非常感谢大家的理解与包容。

————————————————

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

原文链接:https://blog.csdn.net/luozong2689/article/details/141439507


《数据资产管理白皮书》下载地址:

《行业指标体系白皮书》下载地址:

《数据治理行业实践白皮书》下载地址:

《数栈V6.0产品白皮书》下载地址:


想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:


同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群