-
Notifications
You must be signed in to change notification settings - Fork 0
Home
AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进程间传递异步消息的网络协议。
RabbitMQ 是 amqp 协议的 Erlang 的实现。
AMQP 的模型架构的主要角色包括:生产者、消费者、交换器、队列。
生产者 (Producter) 即 消息投递方
消费者 (Consumer) 即 消息接收方
服务节点 (Broker) 消息的服务节点
生产者和消费者都属于客户端;服务节点属于服务端;
交换器、队列、绑定
绑定
Rabbitmq 中需要路由键和绑定键联合使用才能使生产者的消息成功投递到队列中去。
RoutingKey: 生产者发送给交换器绑定的 Key
BindingKey: 交换器和队列绑定的 Key
生产者将消息投递到交换器,通过交换器绑定的队列,最终投递到对应的队列中去。
交换器
Rabbitmq 共有 4 种交换器
fanout 把消息投递到所有与此交换器绑定的队列中
direct 把消息投递到 BindingKey 和 RoutingKey 完全匹配的队列中
topic 规则匹配,BindingKey 中存在两种特殊字符
- * 匹配零个或多个单词
- #匹配一个单词
header 不依赖于 RoutingKey 而是通过消息体中的 headers 属性来进行匹配绑定,通过 headers 中的 key 和 BindingKey 完全匹配,由于性能较差一般用的比较少。
在 Golang 中创建 rabbitmq 生产者基本步骤是:
连接 Connection
创建 Channel
创建或连接一个交换器
创建或连接一个队列
交换器绑定队列
投递消息
关闭 Channel
关闭 Connection
// connection
connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
panic(err)
}
// channel
channel, err := connection.Channel()
if err != nil {
panic(err)
}
// 创建一个交换器
if err = channel.ExchangeDeclare("e1", "direct", true, false, false, true, nil); err != nil {
panic(err)
}
ExchangeDeclare参数解析:
-
name 交换机名称
-
kind 交换机类型
-
durable 持久化
-
autoDelete 是否自动删除
-
internal 是否是内置交换机
-
noWait 是否等待服务器确认
-
args 其它配置
ExchangeDeclare参数说明要点:
autoDelete:
自动删除功能必须要在交换器曾经绑定过队列或者交换器的情况下,处于不再使用的时候才会自动删除,如果是刚刚创建的尚未绑定队列或者交换器的交换器或者早已创建只是未进行队列或者交换器绑定的交换器是不会自动删除的。
internal:
内置交换器是一种特殊的交换器,这种交换器不能直接接收生产者发送的消息,只能作为类似于队列的方式绑定到另一个交换器,来接收这个交换器中路由的消息,内置交换器同样可以绑定队列和路由消息,只是其接收消息的来源与普通交换器不同。
noWait
当 noWait 为 true 时,声明时无需等待服务器的确认。
创建交换器还有一个差不多的方法 (ExchangeDeclarePassive),他主要是假定交换已存在,并尝试连接到
不存在的交换将导致 RabbitMQ 引发异常,可用于检测交换的存在。
if _, err := channel.QueueDeclare("q1", true, false, false, true, nil); err != nil {
panic(err)
}
QueueDeclare参数解析:
-
name 队列名称
-
durable 持久化
-
autoDelete 自动删除
-
exclusive 排他
-
noWait 是否等待服务器确认
-
args Table参数
QueueDeclare参数说明要点:
exclusive 排他
排他队列只对首次创建它的连接可见,排他队列是基于连接 (Connection) 可见的,并且该连接内的所有信道 (Channel) 都可以访问这个排他队列,在这个连接断开之后,该队列自动删除,由此可见这个队列可以说是绑到连接上的,对同一服务器的其他连接不可见。
同一连接中不允许建立同名的排他队列的,这种排他优先于持久化,即使设置了队列持久化,在连接断开后,该队列也会自动删除。
非排他队列不依附于连接而存在,同一服务器上的多个连接都可以访问这个队列。
autoDelete 设置是否自动删除,为 true 则设置队列为自动删除。
自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
不能把这个参数错误地理解为:"当连接到此队列的所有客户端断开时,这个队列自动删除",因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。
创建队列还有一个差不多的方法 (QueueDeclarePassive),他主要是假定队列已存在,并尝试连接到
不存在的队列将导致 RabbitMQ 引发异常,可用于检测队列的存在。
if err = channel.QueueBind("q1", "q1Key", "e1", true, nil); err != nil {
panic(err)
}
参数解析:
-
name 队列名称
-
key BindingKey 根据交换机类型来设定
-
exchange 交换机名称
-
noWait 是否等待服务器确认
-
args 其他Table参数
if err = channel.ExchangeBind("dest", "q1Key", "src", false, nil); err != nil {
panic(err)
}
参数解析:
-
destination 目的交换器
-
key RoutingKey 路由键
-
source 源交换器
-
noWait 是否等待服务器确认
-
args Table 其它参数
生产者发送消息至交换器 source 中,交换器 source 根据路由键找到与其匹配的另一个交换器 destination,井把消息转发到 destination 中,进而存储在destination 绑定的队列 queue 中,某种程度上来说 destination 交换器可以看作一个队列。
if err = channel.Publish("e1", "q1Key", true, false, amqp.Publishing{
Timestamp: time.Now(),
ContentType: "text/plain",
Body: []byte("Hello Golang and AMQP(Rabbitmq)!"),
}); err != nil {
panic(err)
}
参数解析:
-
exchange 交换器名称
-
key RouterKey路由键
-
mandatory 是否为无法路由的消息进行返回处理
-
immediate 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃
-
msg 消息体
参数说明要点:
mandatory
消息发布的时候设置消息的 mandatory 属性用于设置消息在发送到交换器之后无法路由到队列的情况对消息的处理方式,设置为 true 表示将消息返回到生产者,否则直接丢弃消息。
immediate
参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。imrnediate 参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递:如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。
RabbitMQ 3.0 版本开始去掉了对 imrnediate 参数的支持
Rabbitmq 消费方式共有 2 种,分别是推模式和拉模式:
推模式是通过持续订阅的方式来消费信息,Consume方法返回一个golang的通道来供消费者接收消息,直到Connection/Channel被关闭或取消;在接收期间,RabbitMQ 会不断地推送消息给消费者。
推送消息的个数会受到 channel.Qos 的限制
deliveries, err := channel.Consume("queue1", "any", false, false, false, true, nil)
if err != nil {
panic(err)
}
如果 ack 设置为 false 则表示需要手动进行 ack 消费
v, ok := <-deliveries
if ok {
// 手动ack确认
// 注意: 这里只要调用了ack就是手动确认模式,
// multiple 表示的是在此channel中先前所有未确认的deliveries都将被确认
// 并不是表示设置为false就不进行当前ack确认
if err := v.Ack(true); err != nil {
fmt.Println(err.Error())
}
} else {
fmt.Println("Channel close")
}
参数解析:
-
queue 队列名称
-
consumer 消息者名称
-
autoAck 是否确认消费
-
exclusive 排他
-
noLocal RabbitMQ不支持,仅为占位参数满足协议
-
noWait 是否等待服务器确认
-
args 其他Table参数
参数说明要点:
noLocal
设置为 true 则表示不能将同一个 Connection 中生产者发送的消息传送给这个 Connection 中的消费者;RabbitMQ不支持该参数
相对来说比较简单,是由消费者主动拉取信息来消费,同样也需要进行 ack 确认消费
channel.Get(queue string, autoAck bool)
func Connection() (*amqp.Connection) {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
panic(err)
}
return conn
}
func Sample() {
var wg sync.WaitGroup
wg.Add(1)
go SampleConsumption(&wg)
// 创建连接
connection := Connection()
defer connection.Close()
// 开启 channel
channel, err := connection.Channel()
if err != nil {
panic(err)
}
defer channel.Close()
if err = channel.ExchangeDeclare("e1", "direct", true, false, false, true, nil); err != nil {
panic(err)
}
if _, err := channel.QueueDeclare("q1", true, false, false, true, nil); err != nil {
panic(err)
}
if err = channel.QueueBind("q1", "q1Key", "e1", true, nil); err != nil {
panic(err)
}
// mandatory true 未找到队列返回给消费者
returnChan := make(chan amqp.Return,0)
channel.NotifyReturn(returnChan)
// Publish
if err = channel.Publish("e1", "q1Key", true, false, amqp.Publishing{
Timestamp: time.Now(),
ContentType: "text/plain",
Body: []byte("Hello Golang and AMQP(Rabbitmq)!"),
}); err != nil {
panic(err)
}
//for v := range returnChan{
// fmt.Printf("Return %#v\n",v)
//}
wg.Wait()
}
func SampleConsumption(wg *sync.WaitGroup) {
connection := Connection()
defer connection.Close()
channel, err := connection.Channel()
if err != nil {
panic(err)
}
defer channel.Close()
deliveries, err := channel.Consume("q1", "any", false, false, false, true, nil)
if err != nil {
panic(err)
}
// 这里只取一条,因为product只发一条
v, ok := <-deliveries
if ok {
if err := v.Ack(true); err != nil {
fmt.Println(err.Error())
}
} else {
fmt.Println("Channel close")
}
wg.Done()
}
/*
Qos controls how many messages or how many bytes the server will try to keep on
the network for consumers before receiving delivery acks. The intent of Qos is
to make sure the network buffers stay full between the server and client.
With a prefetch count greater than zero, the server will deliver that many
messages to consumers before acknowledgments are received. The server ignores
this option when consumers are started with noAck because no acknowledgments
are expected or sent.
With a prefetch size greater than zero, the server will try to keep at least
that many bytes of deliveries flushed to the network before receiving
acknowledgments from the consumers. This option is ignored when consumers are
started with noAck.
When global is true, these Qos settings apply to all existing and future
consumers on all channels on the same connection. When false, the Channel.Qos
settings will apply to all existing and future consumers on this channel.
Please see the RabbitMQ Consumer Prefetch documentation for an explanation of
how the global flag is implemented in RabbitMQ, as it differs from the
AMQP 0.9.1 specification in that global Qos settings are limited in scope to
channels, not connections (https://www.rabbitmq.com/consumer-prefetch.html).
To get round-robin behavior between consumers consuming from the same queue on
different connections, set the prefetch count to 1, and the next available
message on the server will be delivered to the next available consumer.
If your consumer work time is reasonably consistent and not much greater
than two times your network round trip time, you will see significant
throughput improvements starting with a prefetch count of 2 or slightly
greater as described by benchmarks on RabbitMQ.
http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/
*/
func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error {}
/*
NotifyClose registers a listener for when the server sends a channel or
connection exception in the form of a Connection.Close or Channel.Close method.
Connection exceptions will be broadcast to all open channels and all channels
will be closed, where channel exceptions will only be broadcast to listeners to
this channel.
The chan provided will be closed when the Channel is closed and on a
graceful close, no error will be sent.
*/
func (ch *Channel) NotifyClose(c chan *Error) chan *Error {}
/*
NotifyFlow registers a listener for basic.flow methods sent by the server.
When `false` is sent on one of the listener channels, all publishers should
pause until a `true` is sent.
The server may ask the producer to pause or restart the flow of Publishings
sent by on a channel. This is a simple flow-control mechanism that a server can
use to avoid overflowing its queues or otherwise finding itself receiving more
messages than it can process. Note that this method is not intended for window
control. It does not affect contents returned by basic.get-ok methods.
When a new channel is opened, it is active (flow is active). Some
applications assume that channels are inactive until started. To emulate
this behavior a client MAY open the channel, then pause it.
Publishers should respond to a flow messages as rapidly as possible and the
server may disconnect over producing channels that do not respect these
messages.
basic.flow-ok methods will always be returned to the server regardless of
the number of listeners there are.
To control the flow of deliveries from the server, use the Channel.Flow()
method instead.
Note: RabbitMQ will rather use TCP pushback on the network connection instead
of sending basic.flow. This means that if a single channel is producing too
much on the same connection, all channels using that connection will suffer,
including acknowledgments from deliveries. Use different Connections if you
desire to interleave consumers and producers in the same process to avoid your
basic.ack messages from getting rate limited with your basic.publish messages.
*/
func (ch *Channel) NotifyFlow(c chan bool) chan bool {}
/*
NotifyReturn registers a listener for basic.return methods. These can be sent
from the server when a publish is undeliverable either from the mandatory or
immediate flags.
A return struct has a copy of the Publishing along with some error
information about why the publishing failed.
*/
func (ch *Channel) NotifyReturn(c chan Return) chan Return {}
/*
NotifyCancel registers a listener for basic.cancel methods. These can be sent
from the server when a queue is deleted or when consuming from a mirrored queue
where the master has just failed (and was moved to another node).
The subscription tag is returned to the listener.
*/
func (ch *Channel) NotifyCancel(c chan string) chan string {}
/*
NotifyConfirm calls NotifyPublish and starts a goroutine sending
ordered Ack and Nack DeliveryTag to the respective channels.
For strict ordering, use NotifyPublish instead.
*/
func (ch *Channel) NotifyConfirm(ack, nack chan uint64) (chan uint64, chan uint64) {}
/*
NotifyPublish registers a listener for reliable publishing. Receives from this
chan for every publish after Channel.Confirm will be in order starting with
DeliveryTag 1.
There will be one and only one Confirmation Publishing starting with the
delivery tag of 1 and progressing sequentially until the total number of
Publishings have been seen by the server.
Acknowledgments will be received in the order of delivery from the
NotifyPublish channels even if the server acknowledges them out of order.
The listener chan will be closed when the Channel is closed.
The capacity of the chan Confirmation must be at least as large as the
number of outstanding publishings. Not having enough buffered chans will
create a deadlock if you attempt to perform other operations on the Connection
or Channel while confirms are in-flight.
It's advisable to wait for all Confirmations to arrive before calling
Channel.Close() or Connection.Close().
*/
func (ch *Channel) NotifyPublish(confirm chan Confirmation) chan Confirmation {}