Rabbitmq如何保证数据传到消费不丢失
1.保证生产者投递消息成功
2.保证消息被消费者消费(消息持久化)
保证生产者投递消息成功
要保证生产者投递消息成功有两种方法,一种使用事务模式,一种使用confirm模式
使用事务模式必然会降低性能,因为每次投递消息都会启动一个事务
所以多数使用confirm模式 这也是生产环境使用最多的一个模式
启动这个模式也很简单,直接看代码
product.php
<?php
/**
* Created by PhpStorm.
* User: Administrator
* Date: 2020/7/24
* Time: 15:59
*/
require "vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'xxxx', 'xxxxx');
$channel = $connection->channel();
$exchangeName = 'test_confirm_exchange';
$queueName = 'test_confirm_queue';
$routeingKey = 'test_confirm';
$channel->exchange_declare($exchangeName, AMQP_EX_TYPE_DIRECT);
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, $exchangeName, $routeingKey);
$msg = new AMQPMessage(
"Hello World!",
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$channel->confirm_select(); //开启确认模式
$channel->basic_publish($msg, $exchangeName, $routeingKey);
//注册ack回调 投递不成功
$channel->set_nack_handler(function (AMQPMessage $msg) {
var_dump('nack');
});
//投递成功
$channel->set_ack_handler(function (AMQPMessage $msg) {
var_dump("ack");
});
//等待接收ack
$channel->wait_for_pending_acks();
保证消息被消费者消费(消息持久化)
保证消息被消费的前提是消息要持久化 不能丢失
在上个例子product.php 中消息被设置了持久化,array(‘delivery_mode’ => AMQPMessage::DELIVERY_MODE_PERSISTENT)
队列也设置了持久化队列 queue_declare($queueName, false, true, false, false); 第三个参数是设置队列持久化的设置
消息持久化后就是要保证消息被消费者正常的消费,消费完成后告诉rbmq已经消费完了,ack
具体代码展示
consume.php
<?php
/**
* Created by PhpStorm.
* User: Administrator
* Date: 2020/7/24
* Time: 15:59
*/
require "vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'xxxx', 'xxxx');
$channel = $connection->channel();
$exchangeName = 'test_confirm_exchange';
$queueName = 'test_confirm_queue';
$routeingKey = 'test_confirm';
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
//消费
$callback= function ($msg) {
echo " [x] Received ", $msg->body, "\n";
echo " [x] Done", "\n";
//告诉rbmq处理完了
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
//任务调度 多个消费者时每个消费者处理一个消息
$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();