Rabbitmq如何保证数据传到消费不丢失

1.保证生产者投递消息成功

2.保证消息被消费者消费(消息持久化)

保证生产者投递消息成功

要保证生产者投递消息成功有两种方法,一种使用事务模式,一种使用confirm模式

使用事务模式必然会降低性能,因为每次投递消息都会启动一个事务

所以多数使用confirm模式 这也是生产环境使用最多的一个模式

启动这个模式也很简单,直接看代码

product.php

<?php
/**
 * Created by PhpStorm.
 * User: Administrator
 * Date: 2020/7/24
 * Time: 15:59
 */

require "vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'xxxx', 'xxxxx');
$channel = $connection->channel();

$exchangeName = 'test_confirm_exchange';
$queueName = 'test_confirm_queue';
$routeingKey = 'test_confirm';
$channel->exchange_declare($exchangeName, AMQP_EX_TYPE_DIRECT);
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, $exchangeName, $routeingKey);

$msg = new AMQPMessage(
    "Hello World!",
    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

$channel->confirm_select(); //开启确认模式
$channel->basic_publish($msg, $exchangeName, $routeingKey);

//注册ack回调 投递不成功
$channel->set_nack_handler(function (AMQPMessage $msg) {
    var_dump('nack');
});
//投递成功
$channel->set_ack_handler(function (AMQPMessage $msg) {
    var_dump("ack");
});
//等待接收ack
$channel->wait_for_pending_acks();

保证消息被消费者消费(消息持久化)

保证消息被消费的前提是消息要持久化 不能丢失

在上个例子product.php 中消息被设置了持久化,array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)

队列也设置了持久化队列 queue_declare($queueName, false, true, false, false); 第三个参数是设置队列持久化的设置

消息持久化后就是要保证消息被消费者正常的消费,消费完成后告诉rbmq已经消费完了,ack

具体代码展示

consume.php

<?php
/**
 * Created by PhpStorm.
 * User: Administrator
 * Date: 2020/7/24
 * Time: 15:59
 */

require "vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'xxxx', 'xxxx');
$channel = $connection->channel();

$exchangeName = 'test_confirm_exchange';
$queueName = 'test_confirm_queue';
$routeingKey = 'test_confirm';

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
//消费
$callback= function ($msg) {
    echo " [x] Received ", $msg->body, "\n";
    echo " [x] Done", "\n";
    //告诉rbmq处理完了
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
//任务调度 多个消费者时每个消费者处理一个消息
$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
    $channel->wait();
}
$channel->close();
$connection->close();
就这样基本完成了一个消息的稳定性了,适用了大多数的系统了,如果还要更高可用可用加集群