嗨客网搜索
Golang RabbitMQ Routing路由模式

在上一个教程中,我们构建了一个简单的日志系统。我们能够向许多接收者广播日志信息。在本教程中,我们将为其添加一个特性,我们将使其能够只订阅消息的一个子集。例如,我们将关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

Bindings

在前面的示例中,我们已经创建了绑定。您可能会想起这样的代码:

err = ch.QueueBind( q.Name, // queue name "", // routing key "logs", // exchange false, nil, )

绑定是交换机和队列之间的关系。这可以简单地理解为:队列对来自此交换机的消息感兴趣。绑定还可以带一个额外的参数 routing_key,为了避免和 Channel.Publish 参数混淆,我们把这个参数叫做 binding key。我们可以这样创建一个带有键的绑定:

err = ch.QueueBind( q.Name, // queue name "black", // routing key "logs", // exchange false, nil, )

绑定键的含义取决于 exchange 类型。我们以前使用的 fanut 类型的交换机,那么此时的 routing key 参数就没有意义。

直连交换机

上一个教程中的日志系统将所有消息广播给所有使用者。我们希望扩展它,允许根据消息的严重性过滤消息。例如,我们可能希望将日志消息写入磁盘的脚本只接收严重错误,而不在警告或信息日志消息上浪费磁盘空间。我们使用的是扇形交换机,这并没有给我们太多的灵活性,它只能进行广播。

现在,我们将改用只连交换机。直连交换机背后的路由算法很简单,消息被发送到绑定密钥与消息的路由密钥完全匹配的队列。为了说明这一点,请考虑以下设置:

16_Go语言操作RabbitMQ路由模式.png

我们可以看到直连交换机 X,它绑定了两个队列。第一个队列用 orange 键绑定,第二个队列有两个绑定,一个绑定 black,另一个绑定 green。在这种设置中,使用路由键 orange 发布到 exchange 的消息将被路由到队列 Q1。路由密钥为 black 或 green 的消息将转到 Q2。所有其他消息都将被丢弃。

Multiple bindings

17_Go语言操作RabbitMQ路由模式.png

使用相同的 binging key 绑定多个队列是完全合法的。在我们的示例中,我们可以添加一个绑定,在 X 和 Q1 之间,绑定键为 black。在这种情况下,直接交换的行为将像 fanout 一样,并将消息广播到所有匹配的队列。routing key 为 black 的消息将同时发送到 Q1 和 Q2。

代码实现

发送日志

我们将在日志系统中使用直连交换机。我们将日志级别作为 routing key,这样,接收脚本将能够选择它想要接收的日志级别,一如既往,我们需要先创建一个 exchange:

err = ch.ExchangeDeclare( "logs_direct", // name "direct", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments )

我们已经准备好发送信息:

err = ch.ExchangeDeclare( "logs_direct", // name "direct", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) body := bodyFrom(os.Args) err = ch.Publish( "logs_direct", // exchange severityFrom(os.Args), // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), })

为了简化问题,我们假设 “severity” 可以是 “info”、“warning”、“error” 之一。

订阅

接收消息的工作方式与上一个教程相同,只有一个例外,我们将为感兴趣的每个日志级别创建一个新绑定。

q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when unused true, // exclusive false, // no-wait nil, // arguments ) if len(os.Args) < 2 { log.Printf("Usage: %s [info] [warning] [error]", os.Args[0]) os.Exit(0) } for _, s := range os.Args[1:] { log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_direct", s) err = ch.QueueBind( q.Name, // queue name s, // routing key "logs_direct", // exchange false, nil) }

完整代码

18_Go语言操作RabbitMQ路由模式.png

emit_log_direct.go 代码如下:

package main import ( "fmt" "github.com/streadway/amqp" "time" ) const( addr = "amqp://guest:guest@localhost:5672/" Exchange_Name = "logs_direct" ) var Log_Levels = []string{"error", "info", "error", "warning"} func main(){ fmt.Println("haicoder(www.haicoder.net)") var( conn *amqp.Connection channel *amqp.Channel err error ) //连接MQServer if conn, err = amqp.Dial(addr); err != nil{ fmt.Println("Connect RabbitMQ Err =", err, "Addr =", addr) return } //需要关闭 defer conn.Close() //创建一个Channel,所有的连接都是通过Channel管理的 if channel, err = conn.Channel(); err != nil{ fmt.Println("Create Channel Err =", err) return } defer channel.Close() //创建交换机 if err = channel.ExchangeDeclare( Exchange_Name, //name "direct", //type true, false, false, false, nil, ); err != nil{ fmt.Println("ExchangeDeclare Err =", err) return } //直接向交换机发送数据即可 for i := 0; i < 100; i++{ logLevel := Log_Levels[i%4] msg := fmt.Sprintf("Msg Level %s", logLevel) if err = channel.Publish(Exchange_Name, logLevel, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(msg), }); err != nil{ fmt.Println("Publish Err =", err) return } fmt.Println("Send msg ok, msg =", msg) time.Sleep(5*time.Second) } }

receive_logs_direct.go 代码如下:

package main import ( "fmt" "github.com/streadway/amqp" "time" ) const( recvAddr = "amqp://guest:guest@localhost:5672/" Receive_Exchange_Name = "logs_direct" ) var Receive_Log_Levels = []string{"error", "info", "error", "warning"} func main(){ fmt.Println("haicoder(www.haicoder.net)") var( conn *amqp.Connection channel *amqp.Channel queue amqp.Queue msgs <-chan amqp.Delivery err error ) //连接MQServer if conn, err = amqp.Dial(recvAddr); err != nil{ fmt.Println("Connect RabbitMQ Err =", err, "Addr =", recvAddr) return } //需要关闭 defer conn.Close() //创建一个Channel,所有的连接都是通过Channel管理的 if channel, err = conn.Channel(); err != nil{ fmt.Println("Create Channel Err =", err) return } defer channel.Close() //创建交换机 if err = channel.ExchangeDeclare( Receive_Exchange_Name, // name "direct", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ); err != nil{ fmt.Println("ExchangeDeclare Err =", err) return } //创建队列 if queue, err = channel.QueueDeclare( "", //队列名 false, //持久的 false, // delete when unused false, //独占的 false, nil, ); err != nil{ fmt.Println("QueueDeclare Err =", err) return } //交换机绑定队列 if err = channel.QueueBind(queue.Name, Receive_Log_Levels[0], Receive_Exchange_Name, false, nil); err != nil{ fmt.Println("QueueBind Err =", err) return } //读取数据 if msgs, err = channel.Consume( queue.Name, // queue "", // consumer true, // 自动消息确认 false, // exclusive false, // no-local false, // no-wait nil, ); err != nil{ fmt.Println("Consume Err =", err) return } go func(){ for msg := range msgs{ fmt.Printf("Received a message: %s\n", msg.Body) } }() time.Sleep(100*time.Second) }

我们首先运行生产者,运行结果如下:

19_Go语言操作RabbitMQ路由模式.png

接下来,我们运行消费者,运行结果如下:

20_Go语言操作RabbitMQ路由模式.png

我们可以看到,生产者发送了多种日志级别的消息,而我们这里的消费者只接受了 error 级别的日志,这就是 binding key 的作用。

嗨客网顶部