嗨客网搜索
RabbitMQ延迟队列

什么是延迟队列

延迟队列存储的对象肯定是对应的延时消息,所谓 “延时消息” 是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

场景一:在订单系统中,一个用户下单之后通常有 30 分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延时队列将订单信息发送到延时队列。

场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延时队列,当指令设定的时间到了再将指令推送到智能设备。

TTL加DLX实现延迟队列

AMQP 协议和 RabbitMQ 队列本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能。但是我们可以通过 RabbitMQ 的两个特性来曲线实现延迟队列:

Time To Live(TTL)

RabbitMQ 可以针对 Queue 设置 x-expires 或者针对 Message 设置 x-message-ttl,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为 dead letter(死信)。

RabbitMQ 针对队列中的消息过期时间有两种方法可以设置:

  1. 通过队列属性设置,队列中所有消息都有相同的过期时间。
  2. 对消息进行单独设置,每条消息 TTL 可以不同。

如果同时使用,则消息的过期时间以两者之间 TTL 较小的那个数值为准。消息在队列的生存时间一旦超过设置的 TTL 值,就成为 dead letter。

Dead Letter Exchanges(DLX)

RabbitMQ 的 Queue 可以配置 x-dead-letter-exchange 和 x-dead-letter-routing-key(可选)两个参数,如果队列内出现了 dead letter,则按照这两个参数重新路由转发到指定的队列。

  • x-dead-letter-exchange:出现 dead letter 之后将 dead letter 重新发送到指定 exchange。
  • x-dead-letter-routing-key:出现 dead letter 之后将 dead letter 重新按照指定的 routing-key 发送。

队列出现 dead letter 的情况有:

  • 消息或者队列的 TTL 过期。
  • 队列达到最大长度。
  • 消息被消费端拒绝(basic.reject or basic.nack)并且 requeue=false。

综合上述两个特性,设置了 TTL 规则之后当消息在一个队列中变成死信时,利用 DLX 特性它能被重新转发到另一个 Exchange 或者 Routing Key,这时候消息就可以重新被消费了。

设置方法

第一步:设置 TTL 产生死信,有两种方式 Per-Message TTL 和 Queue TTL,第一种可以针对每一条消息设置一个过期时间使用于大多数场景,第二种针对队列设置过期时间、适用于一次性延时任务的场景。

还有其他产生死信的方式比如消费者拒绝消费 basic.reject 或者 basic.nack ( 前提要设置消费者的属性 requeue=false)。

Per-Message TTL (对每一条消息设置一个过期时间),java client 发送一条只能驻留 60 秒的消息到队列:

byte[] messageBodyBytes = "Hello, world!".getBytes(); AMQP.BasicProperties properties = new AMQP.BasicProperties(); properties.setExpiration("60000");//设置消息的过期时间为60秒 channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes); //这条消息发送到相应的队列之后,如果60秒内没有被消费,则变为死信

Queue TTL (对整个队列设置一个过期时间),创建一个队列,队列的消息过期时间为 30 分钟(这个队列30分钟内没有消费者消费消息则删除,删除后队列内的消息变为死信):

Map<String, Object> args = new HashMap<String, Object>(); args.put("x-expires", 1800000); channel.queueDeclare("myqueue", false, false, false, args); rabbitmqctl命令方式(.* 为所有队列, 可以替换为指定队列): rabbitmqctl set_policy expiry ".*" '{"expires":1800000}' --apply-to queues rabbitmqctl (Windows): rabbitmqctl set_policy expiry ".*" "{""expires"":1800000}" --apply-to queues

第二步:设置死信的转发规则(如果没有任何规则,则直接丢弃死信)

Dead Letter Exchanges 设置方法:

//声明一个直连模式的exchange channel.exchangeDeclare("some.exchange.name", "direct"); //声明一个队列,当myqueue队列中有死信产生时,会转发到交换器some.exchange.name Map<String, Object> args = new HashMap<String, Object>(); args.put("x-dead-letter-exchange", "some.exchange.name"); //如果设置死信会以路由键some-routing-key转发到some.exchange.name,如果没设默认为消息发送到本队列时用的routing key //args.put("x-dead-letter-routing-key", "some-routing-key"); channel.queueDeclare("myqueue", false, false, false, args);

插件实现延迟队列

在 rabbitmq 3.5.7 及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时插件依赖 Erlang/OPT 18.0 及以上。

插件源码地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

插件下载地址:

https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange

安装

进入插件安装目录{rabbitmq-server}/plugins/(可以查看一下当前已存在的插件)。这里,我们首先下载插件:

wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez

启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange (关闭插件) rabbitmq-plugins disable rabbitmq_delayed_message_exchange

插件使用

通过声明一个 x-delayed-message 类型的 exchange 来使用 delayed-messaging 特性,x-delayed-message 是插件提供的类型,并不是 rabbitmq 本身的:

// ... elided code ... Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "direct"); channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args); // ... more code ...

发送消息的时候通过在 header 添加 “x-delay” 参数来控制消息的延时时间:

// ... elided code ... byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8"); Map<String, Object> headers = new HashMap<String, Object>(); headers.put("x-delay", 5000); AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers); channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes); // ... more code ...

案例

消息发送端:

import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Send { // 队列名称 private final static String EXCHANGE_NAME="delay_exchange"; private final static String ROUTING_KEY="key_delay"; @SuppressWarnings("deprecation") public static void main(String[] argv) throws Exception { /** * 创建连接连接到MabbitMQ */ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.12.190"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); // 声明x-delayed-type类型的exchange Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "direct"); channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true, false, args); Map<String, Object> headers = new HashMap<String, Object>(); //设置在2016/11/04,16:45:12向消费端推送本条消息 Date now = new Date(); Date timeToPublish = new Date("2016/11/04,16:45:12"); String readyToPushContent = "publish at " + sf.format(now) + " \t deliver at " + sf.format(timeToPublish); headers.put("x-delay", timeToPublish.getTime() - now.getTime()); AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder() .headers(headers); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, props.build(), readyToPushContent.getBytes()); // 关闭频道和连接 channel.close(); connection.close(); } }

消息接收端:

import java.text.SimpleDateFormat; import java.util.Date; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class Recv { // 队列名称 private final static String QUEUE_NAME = "delay_queue"; private final static String EXCHANGE_NAME="delay_exchange"; public static void main(String[] argv) throws Exception, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.12.190"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.queueDeclare(QUEUE_NAME, true,false,false,null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); channel.basicConsume(QUEUE_NAME, true, queueingConsumer); SimpleDateFormat sf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); try { System.out.println("****************WAIT***************"); while(true){ QueueingConsumer.Delivery delivery = queueingConsumer .nextDelivery(); // String message = (new String(delivery.getBody())); System.out.println("message:"+message); System.out.println("now:\t"+sf.format(new Date())); } } catch (Exception exception) { exception.printStackTrace(); } } }
嗨客网顶部