-
Notifications
You must be signed in to change notification settings - Fork 0
Home
AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进程间传递异步消息的网络协议。
RabbitMQ 是 amqp 协议的 Erlang 的实现。
AMQP 的模型架构的主要角色包括:生产者、消费者、交换器、队列。
生产者 (Producter) 即 消息投递方
消费者 (Consumer) 即 消息接收方
服务节点 (Broker) 消息的服务节点
生产者和消费者都属于客户端;服务节点属于服务端;
交换器、队列、绑定
绑定
Rabbitmq 中需要路由键和绑定键联合使用才能使生产者的消息成功投递到队列中去。
RoutingKey: 生产者发送给交换器绑定的 Key
BindingKey: 交换器和队列绑定的 Key
生产者将消息投递到交换器,通过交换器绑定的队列,最终投递到对应的队列中去。
交换器
Rabbitmq 共有 4 种交换器
fanout 把消息投递到所有与此交换器绑定的队列中
direct 把消息投递到 BindingKey 和 RoutingKey 完全匹配的队列中
topic 规则匹配,BindingKey 中存在两种特殊字符
- * 匹配零个或多个单词
- #匹配一个单词
header 不依赖于 RoutingKey 而是通过消息体中的 headers 属性来进行匹配绑定,通过 headers 中的 key 和 BindingKey 完全匹配,由于性能较差一般用的比较少。
在 Golang 中创建 rabbitmq 生产者基本步骤是:
连接 Connection
创建 Channel
创建或连接一个交换器
创建或连接一个队列
交换器绑定队列
投递消息
关闭 Channel
关闭 Connection
// connection
connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
panic(err)
}
// channel
channel, err := connection.Channel()
if err != nil {
panic(err)
}
// 创建一个交换器
if err = channel.ExchangeDeclare("e1", "direct", true, false, false, true, nil); err != nil {
panic(err)
}
ExchangeDeclare参数解析:
-
name 交换机名称
-
kind 交换机类型
-
durable 持久化
-
autoDelete 是否自动删除
-
internal 是否是内置交换机
-
noWait 是否等待服务器确认
-
args 其它配置
ExchangeDeclare参数说明要点:
autoDelete:
自动删除功能必须要在交换器曾经绑定过队列或者交换器的情况下,处于不再使用的时候才会自动删除,如果是刚刚创建的尚未绑定队列或者交换器的交换器或者早已创建只是未进行队列或者交换器绑定的交换器是不会自动删除的。
internal:
内置交换器是一种特殊的交换器,这种交换器不能直接接收生产者发送的消息,只能作为类似于队列的方式绑定到另一个交换器,来接收这个交换器中路由的消息,内置交换器同样可以绑定队列和路由消息,只是其接收消息的来源与普通交换器不同。
noWait
当 noWait 为 true 时,声明时无需等待服务器的确认。
创建交换器还有一个差不多的方法 (ExchangeDeclarePassive),他主要是假定交换已存在,并尝试连接到
不存在的交换将导致 RabbitMQ 引发异常,可用于检测交换的存在。
if _, err := channel.QueueDeclare("q1", true, false, false, true, nil); err != nil {
panic(err)
}
QueueDeclare参数解析:
-
name 队列名称
-
durable 持久化
-
autoDelete 自动删除
-
exclusive 排他
-
noWait 是否等待服务器确认
-
args Table参数
QueueDeclare参数说明要点:
exclusive 排他
排他队列只对首次创建它的连接可见,排他队列是基于连接 (Connection) 可见的,并且该连接内的所有信道 (Channel) 都可以访问这个排他队列,在这个连接断开之后,该队列自动删除,由此可见这个队列可以说是绑到连接上的,对同一服务器的其他连接不可见。
同一连接中不允许建立同名的排他队列的,这种排他优先于持久化,即使设置了队列持久化,在连接断开后,该队列也会自动删除。
非排他队列不依附于连接而存在,同一服务器上的多个连接都可以访问这个队列。
autoDelete 设置是否自动删除,为 true 则设置队列为自动删除。
自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
不能把这个参数错误地理解为:"当连接到此队列的所有客户端断开时,这个队列自动删除",因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。
创建队列还有一个差不多的方法 (QueueDeclarePassive),他主要是假定队列已存在,并尝试连接到
不存在的队列将导致 RabbitMQ 引发异常,可用于检测队列的存在。
if err = channel.QueueBind("q1", "q1Key", "e1", true, nil); err != nil {
panic(err)
}
参数解析:
-
name 队列名称
-
key BindingKey 根据交换机类型来设定
-
exchange 交换机名称
-
noWait 是否等待服务器确认
-
args 其他Table参数
if err = channel.ExchangeBind("dest", "q1Key", "src", false, nil); err != nil {
panic(err)
}
参数解析:
-
destination 目的交换器
-
key RoutingKey 路由键
-
source 源交换器
-
noWait 是否等待服务器确认
-
args Table 其它参数
生产者发送消息至交换器 source 中,交换器 source 根据路由键找到与其匹配的另一个交换器 destination,井把消息转发到 destination 中,进而存储在destination 绑定的队列 queue 中,某种程度上来说 destination 交换器可以看作一个队列。
if err = channel.Publish("e1", "q1Key", true, false, amqp.Publishing{
Timestamp: time.Now(),
ContentType: "text/plain",
Body: []byte("Hello Golang and AMQP(Rabbitmq)!"),
}); err != nil {
panic(err)
}
参数解析:
-
exchange 交换器名称
-
key RouterKey路由键
-
mandatory 是否为无法路由的消息进行返回处理
-
immediate 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃
-
msg 消息体
参数说明要点:
mandatory
消息发布的时候设置消息的 mandatory 属性用于设置消息在发送到交换器之后无法路由到队列的情况对消息的处理方式,设置为 true 表示将消息返回到生产者,否则直接丢弃消息。
immediate
参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。imrnediate 参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递:如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。
RabbitMQ 3.0 版本开始去掉了对 imrnediate 参数的支持
Rabbitmq 消费方式共有 2 种,分别是推模式和拉模式:
推模式是通过持续订阅的方式来消费信息,Consume方法返回一个golang的通道来供消费者接收消息,直到Connection/Channel被关闭或取消;在接收期间,RabbitMQ 会不断地推送消息给消费者。
推送消息的个数会受到 channel.Qos 的限制
deliveries, err := channel.Consume("queue1", "any", false, false, false, true, nil)
if err != nil {
panic(err)
}
如果 ack 设置为 false 则表示需要手动进行 ack 消费
v, ok := <-deliveries
if ok {
// 手动ack确认
// 注意: 这里只要调用了ack就是手动确认模式,
// multiple 表示的是在此channel中先前所有未确认的deliveries都将被确认
// 并不是表示设置为false就不进行当前ack确认
if err := v.Ack(true); err != nil {
fmt.Println(err.Error())
}
} else {
fmt.Println("Channel close")
}
参数解析:
-
queue 队列名称
-
consumer 消息者名称
-
autoAck 是否确认消费
-
exclusive 排他
-
noLocal RabbitMQ不支持,仅为占位参数满足协议
-
noWait 是否等待服务器确认
-
args 其他Table参数
参数说明要点:
noLocal
设置为 true 则表示不能将同一个 Connection 中生产者发送的消息传送给这个 Connection 中的消费者;RabbitMQ不支持该参数
相对来说比较简单,是由消费者主动拉取信息来消费,同样也需要进行 ack 确认消费
channel.Get(queue string, autoAck bool)
func Connection() (*amqp.Connection) {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
panic(err)
}
return conn
}
func Sample() {
var wg sync.WaitGroup
wg.Add(1)
go SampleConsumption(&wg)
// 创建连接
connection := Connection()
defer connection.Close()
// 开启 channel
channel, err := connection.Channel()
if err != nil {
panic(err)
}
defer channel.Close()
if err = channel.ExchangeDeclare("e1", "direct", true, false, false, true, nil); err != nil {
panic(err)
}
if _, err := channel.QueueDeclare("q1", true, false, false, true, nil); err != nil {
panic(err)
}
if err = channel.QueueBind("q1", "q1Key", "e1", true, nil); err != nil {
panic(err)
}
// mandatory true 未找到队列返回给消费者
returnChan := make(chan amqp.Return,0)
channel.NotifyReturn(returnChan)
// Publish
if err = channel.Publish("e1", "q1Key", true, false, amqp.Publishing{
Timestamp: time.Now(),
ContentType: "text/plain",
Body: []byte("Hello Golang and AMQP(Rabbitmq)!"),
}); err != nil {
panic(err)
}
//for v := range returnChan{
// fmt.Printf("Return %#v\n",v)
//}
wg.Wait()
}
func SampleConsumption(wg *sync.WaitGroup) {
connection := Connection()
defer connection.Close()
channel, err := connection.Channel()
if err != nil {
panic(err)
}
defer channel.Close()
deliveries, err := channel.Consume("q1", "any", false, false, false, true, nil)
if err != nil {
panic(err)
}
// 这里只取一条,因为product只发一条
v, ok := <-deliveries
if ok {
if err := v.Ack(true); err != nil {
fmt.Println(err.Error())
}
} else {
fmt.Println("Channel close")
}
wg.Done()
}
/*
当开启手动Ack模式时才起作用
Qos 限制客户端在确认消息之前可以接收到的消息数量(预读数量),默认不限制
如果队列中有大量消息且消费端处理消息较慢,应设置Qos加以限制,
否则客户端和RabbitMQ Server端都将耗费大量内存来保存预读的消息
prefetchCount 预读消息个数
prefetchSize 预读字节数
global true: Connection级别; false: Channel级别
*/
func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error {}
Qos设置最佳实践:
-
如果有一个或多个消费者且可以快速处理消息,消息预读数可以稍微大一些,尽量让消费者保持忙碌;
-
如果消息处理时间较为确定,且网络状态稳定,则只需将本次的总处理时间除以客户端上每个消息的处理时间((两次网络传输时间+处理时间)/处理时间),即可得出合适的预取值;
-
在有许多消费者且消息处理时间短的情况下,建议使用较低的预取值。太低的值会使消费者空闲,因为他们需要等待消息到达;过高的值则可能会使一个消费者忙碌,而其他消费者则处于空闲状态;
-
如果有很多消费者和/或较长的消息处理时间,建议将预取计数设置为1,以使消息平均分配给所有消费者;
-
注意,如果开启了自动Ack,预取值将无效;
-
一个典型的错误是:进行无限的预取,其中一个客户端接收所有消息,用完内存并崩溃,从而导致所有消息都被重新传递
refer: https://www.cloudamqp.com/blog/part1-rabbitmq-best-practice.html
注意:以下notify方法注册的chan在连接被关闭时会被close,重连时需要重新注册!
/*
NotifyClose 向server注册一个chan用于接收Connection/Channel关闭事件,
当关闭时,该chan也会被close
*/
func (ch *Channel) NotifyClose(c chan *Error) chan *Error {}
/*
NotifyFlow registers a listener for basic.flow methods sent by the server.
When `false` is sent on one of the listener channels, all publishers should
pause until a `true` is sent.
The server may ask the producer to pause or restart the flow of Publishings
sent by on a channel. This is a simple flow-control mechanism that a server can
use to avoid overflowing its queues or otherwise finding itself receiving more
messages than it can process. Note that this method is not intended for window
control. It does not affect contents returned by basic.get-ok methods.
When a new channel is opened, it is active (flow is active). Some
applications assume that channels are inactive until started. To emulate
this behavior a client MAY open the channel, then pause it.
Publishers should respond to a flow messages as rapidly as possible and the
server may disconnect over producing channels that do not respect these
messages.
basic.flow-ok methods will always be returned to the server regardless of
the number of listeners there are.
To control the flow of deliveries from the server, use the Channel.Flow()
method instead.
Note: RabbitMQ will rather use TCP pushback on the network connection instead
of sending basic.flow. This means that if a single channel is producing too
much on the same connection, all channels using that connection will suffer,
including acknowledgments from deliveries. Use different Connections if you
desire to interleave consumers and producers in the same process to avoid your
basic.ack messages from getting rate limited with your basic.publish messages.
*/
func (ch *Channel) NotifyFlow(c chan bool) chan bool {}
/*
NotifyReturn 向server注册一个chan用于接收 消息不可达事件,
chan中存放的是消息体的拷贝及一些附加的错误信息/消息不可达的原因
*/
func (ch *Channel) NotifyReturn(c chan Return) chan Return {}
/*
NotifyCancel 向server注册一个chan用于接收订阅被取消的事件,
包括queue被删除的事件和镜像模式下主节点下线的事件,
han中存放的是 subscription tag
*/
func (ch *Channel) NotifyCancel(c chan string) chan string {}
/*
NotifyConfirm calls NotifyPublish and starts a goroutine sending
ordered Ack and Nack DeliveryTag to the respective channels.
For strict ordering, use NotifyPublish instead.
*/
func (ch *Channel) NotifyConfirm(ack, nack chan uint64) (chan uint64, chan uint64) {}
/*
NotifyPublish registers a listener for reliable publishing. Receives from this
chan for every publish after Channel.Confirm will be in order starting with
DeliveryTag 1.
There will be one and only one Confirmation Publishing starting with the
delivery tag of 1 and progressing sequentially until the total number of
Publishings have been seen by the server.
Acknowledgments will be received in the order of delivery from the
NotifyPublish channels even if the server acknowledges them out of order.
The listener chan will be closed when the Channel is closed.
The capacity of the chan Confirmation must be at least as large as the
number of outstanding publishings. Not having enough buffered chans will
create a deadlock if you attempt to perform other operations on the Connection
or Channel while confirms are in-flight.
It's advisable to wait for all Confirmations to arrive before calling
Channel.Close() or Connection.Close().
*/
func (ch *Channel) NotifyPublish(confirm chan Confirmation) chan Confirmation {}