Rabbitmq 实现延时队列

参考文章

https://www.cnblogs.com/mfrank/p/11260355.html 延时队列java
https://blog.csdn.net/why444216978/article/details/105881131 设置ttl过期时间

最主要的几个应用场景

  • 订单在十分钟之内未支付则自动取消。
  • 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  • 账单在一周内未支付,则自动结算。
  • 用户注册成功后,如果三天内没有登陆则进行短信提醒。
  • 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  • 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。

主要就是实现一个延时通知

参考的文章是java的,在对队列进行配置时遇到小问题,官方库又没有给到例子,害

原理大概是这样的

往延时交换机发送消息,因为没有消费者处理消息,然后消息超时,就会被发送到声明延时队列的时候设置好的超时后要发送到的死信交换机上
所以实现延时队列就是消费那里只需要读取死信的队列就可以了

代码如下

product.php

<?php
/**
 * Created by PhpStorm.
 * User: Administrator
 * Date: 2020/7/23
 * Time: 12:00
 */
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

require_once __DIR__ . '/../vendor/autoload.php';
/**
 * 两个延时队列 A B
 * 两个死信队列 A B
 * 两个交换机 一个延时交换机 一个死信交换机
 *
 * 简单点 队列各一个好了
 */

$custom_msg = $_GET['msg'] ?? '默认消息';

$connect = new AMQPStreamConnection('localhost', '5672', 'xxxxx', 'xxxx');

$channel=$connect->channel();
//延时交换机
$delay_exchange_name = 'task_delay_exchange';
//延时队列
$delay_queue_name = 'task_delay_queue';
//key
$delay_routing_key = 'delay_routing';

//死信交换机
$dead_letter_exchange_name = 'task_dead_letter_exchange';
//死信队列
$dead_letter_queue_name = 'task_dead_letter_queue';
$dead_letter_routing_key = 'dead_letter_routing_key';

//可以针对队列做延时也可以对消息做延时 对消息处理可控度高点 这里使用队列延时

//声明延时交换机
$channel->exchange_declare($delay_exchange_name,AMQP_EX_TYPE_DIRECT);
//声明死信交换机
$channel->exchange_declare($dead_letter_exchange_name,AMQP_EX_TYPE_DIRECT);

$arguments =new \PhpAmqpLib\Wire\AMQPTable();
//array(
//    // x-message-ttl  声明队列的TTL
//    'x-message-ttl' => 15000, // 过期时间15秒
//    // x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
//    'x-dead-letter-exchange' => $dead_letter_exchange_name,
//    // x-dead-letter-routing-key  这里声明当前队列的死信路由key
//    'x-dead-letter-routing-key' => $dead_letter_routing_key
//)
$arguments->set('x-message-ttl',15000);
$arguments->set('x-dead-letter-exchange',$dead_letter_exchange_name);
$arguments->set('x-dead-letter-routing-key',$dead_letter_routing_key);

//声明延时队列
$channel->queue_declare($delay_queue_name,false,true,false,true,false,$arguments);
//声明死信队列
$channel->queue_declare($dead_letter_queue_name);
//延时队列与延时交换机绑定
$channel->queue_bind($delay_queue_name, $delay_exchange_name,$delay_routing_key);
//死信队列与死信交换机绑定
$channel->queue_bind($dead_letter_queue_name, $dead_letter_exchange_name, $dead_letter_routing_key);

$msg = new AMQPMessage($custom_msg,array());
//往延时交换机发送消息
$channel->basic_publish($msg, $delay_exchange_name, $delay_routing_key);
echo "发送成功:{$custom_msg}" . PHP_EOL;
$channel->close();
$connect->close();

消费者 receive

<?php
/**
 * Created by PhpStorm.
 * User: Administrator
 * Date: 2020/7/6
 * Time: 10:46
 */
require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', '5672', 'xxxx', 'xxxx');
$channel=$connection->channel();

//死信交换机
$dead_letter_exchange_name = 'task_dead_letter_exchange';
//死信队列
$dead_letter_queue_name = 'task_dead_letter_queue';

$dead_letter_routing_key = 'dead_letter_routing_key';

//声明死信交换机
$channel->exchange_declare($dead_letter_exchange_name, AMQP_EX_TYPE_DIRECT);

//声明死信队列
$channel->queue_declare($dead_letter_queue_name);

$channel->queue_bind($dead_letter_queue_name, $dead_letter_exchange_name,$dead_letter_routing_key);
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
$callback=function ($msg){
    echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume($dead_letter_queue_name,'',false,false,false,false,$callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

如果要针对信息粒度进行延时的

//设置消息过期时间 单位豪秒
        $head = array_merge(array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT), ['expiration' => $ttl*1000]);
        $msg = new AMQPMessage($message, $head);

By cc

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注