Rabbitmq 实现延时队列

参考文章

https://www.cnblogs.com/mfrank/p/11260355.html 延时队列java
https://blog.csdn.net/why444216978/article/details/105881131 设置ttl过期时间

最主要的几个应用场景

  • 订单在十分钟之内未支付则自动取消。
  • 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  • 账单在一周内未支付,则自动结算。
  • 用户注册成功后,如果三天内没有登陆则进行短信提醒。
  • 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  • 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。

主要就是实现一个延时通知

参考的文章是java的,在对队列进行配置时遇到小问题,官方库又没有给到例子,害

代码如下

product.php

<?php
/**
 * Created by PhpStorm.
 * User: Administrator
 * Date: 2020/7/23
 * Time: 12:00
 */
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

require_once __DIR__ . '/../vendor/autoload.php';
/**
 * 两个延时队列 A B
 * 两个死信队列 A B
 * 两个交换机 一个延时交换机 一个死信交换机
 *
 * 简单点 队列各一个好了
 */

$custom_msg = $_GET['msg'] ?? '默认消息';

$connect = new AMQPStreamConnection('localhost', '5672', 'xxxxx', 'xxxx');

$channel=$connect->channel();
//延时交换机
$delay_exchange_name = 'task_delay_exchange';
//延时队列
$delay_queue_name = 'task_delay_queue';
//key
$delay_routing_key = 'delay_routing';

//死信交换机
$dead_letter_exchange_name = 'task_dead_letter_exchange';
//死信队列
$dead_letter_queue_name = 'task_dead_letter_queue';
$dead_letter_routing_key = 'dead_letter_routing_key';

//可以针对队列做延时也可以对消息做延时 对消息处理可控度高点 这里使用队列延时

//声明延时交换机
$channel->exchange_declare($delay_exchange_name,AMQP_EX_TYPE_DIRECT);
//声明死信交换机
$channel->exchange_declare($dead_letter_exchange_name,AMQP_EX_TYPE_DIRECT);

$arguments =new \PhpAmqpLib\Wire\AMQPTable();
//array(
//    // x-message-ttl  声明队列的TTL
//    'x-message-ttl' => 15000, // 过期时间15秒
//    // x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
//    'x-dead-letter-exchange' => $dead_letter_exchange_name,
//    // x-dead-letter-routing-key  这里声明当前队列的死信路由key
//    'x-dead-letter-routing-key' => $dead_letter_routing_key
//)
$arguments->set('x-message-ttl',15000);
$arguments->set('x-dead-letter-exchange',$dead_letter_exchange_name);
$arguments->set('x-dead-letter-routing-key',$dead_letter_routing_key);

//声明延时队列
$channel->queue_declare($delay_queue_name,false,true,false,true,false,$arguments);
//声明死信队列
$channel->queue_declare($dead_letter_queue_name);
//延时队列与延时交换机绑定
$channel->queue_bind($delay_queue_name, $delay_exchange_name,$delay_routing_key);
//死信队列与死信交换机绑定
$channel->queue_bind($dead_letter_queue_name, $dead_letter_exchange_name, $dead_letter_routing_key);

$msg = new AMQPMessage($custom_msg,array());
//往延时交换机发送消息
$channel->basic_publish($msg, $delay_exchange_name, $delay_routing_key);
echo "发送成功:{$custom_msg}" . PHP_EOL;
$channel->close();
$connect->close();

消费者 receive

<?php
/**
 * Created by PhpStorm.
 * User: Administrator
 * Date: 2020/7/6
 * Time: 10:46
 */
require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', '5672', 'xxxx', 'xxxx');
$channel=$connection->channel();

//死信交换机
$dead_letter_exchange_name = 'task_dead_letter_exchange';
//死信队列
$dead_letter_queue_name = 'task_dead_letter_queue';

$dead_letter_routing_key = 'dead_letter_routing_key';

//声明死信交换机
$channel->exchange_declare($dead_letter_exchange_name, AMQP_EX_TYPE_DIRECT);

//声明死信队列
$channel->queue_declare($dead_letter_queue_name);

$channel->queue_bind($dead_letter_queue_name, $dead_letter_exchange_name,$dead_letter_routing_key);
echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
$callback=function ($msg){
    echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume($dead_letter_queue_name,'',false,false,false,false,$callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();