防止消息丢失机制

上章节,我们了解mq的基础使用。但是那个代码,也是有不完整的地方。

我们来了解下消息丢失的处理机制。

生产者 confirm消息确认

AMQP消息协议提供了事务支持,不过事务机制会导致性能急剧下降,所以rabbitmq特别引入了confirm机制。
事务不能与confirm一起使用

我们先看下事务怎么写

$channel->tx_select();//开始事务
$channel->basic_publish($msg, '', 'task_queue');//发消息
$channel->tx_commit();//提交事务  当然还有回滚事务 $channel->tx_rollback();

当然,事务会降低性能,我们来看看消息确认怎么弄。

//异步回调消息确认 消息接受成功
$channel->set_ack_handler(
        function (AMQPMessage $message) {
            echo "Message acked with content " . $message->body . PHP_EOL;
        }
    );
//消息接收失败
$channel->set_nack_handler(
        function (AMQPMessage $message) {
            echo "Message nacked with content " . $message->body . PHP_EOL;
        }
    );

$channel->confirm_select();//开启消息确认
$channel->basic_publish($msg, '', 'task_queue');//发消息
$channel->wait_for_pending_acks();//阻塞等待消息确认

消息持久化

如果你没有特意告诉RabbitMQ,那么在它退出或者崩溃的时候,将会丢失所有队列和消息。为了确保信息不会丢失,有两个事情是需要注意的:我们必须把“队列”和“消息”设为持久化。

queue_declare 的第三个参数改为 true 就是开启队列的持久化了,,然而,我们必须在生产和消费端都改掉。
$channel->queue_declare('new_queue', false, true, false, false);
值得注意的是,我们刚才已经创建了名为:queue的队列是非持久化的,现在创建一个持久化的,不能用相同名字。
所以我用了new_queue

队列持久化了,还得开启消息持久化。
我们在消息生产端(发送端加入这个参数)

//我这里没use 命名空间,看着很长,建议大家use命名空间
$msg = new \PhpAmqpLib\Message\AMQPMessage('Message content sent', ['delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT]);
也就是把 delivery_mode 模式设置为2,持久化

消费端 消费确认

当我们消费端,处理一些比较耗时,比较奇葩的任务时。我们担心是否挂掉而引起数据丢失。
这时候我们就要 把basic_consume 的第四个参数关了,第四个参数是 no_ack 。no_ack 为 true时,不用消息确认,为false时要确认。
开启确认完后,我们处理完这条消息,要给mq服务通知一下 给mq服务发送一个ack。

        $callback = function($msg) {
            echo " [x] Received ", "\n";
            dump($msg->body);
            echo "\n";
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);//发送ack确认
        };
        //第四个参数 开启ack确认
        $channel->basic_consume('queue', '', false, false, false, false, $callback);

消息争抢、公平调度

当有个场景,我们生产者生产消息时,奇数数据大,偶数数据小,那么此时我们有两个消费端。消费奇数的很繁忙,累的狗吃屎,处理偶数的很轻松,啪叽就搞定。
此时我们mq还是照常一直发任务,啪叽啪叽,给我干哦,任务平均分配给你们了,干不完扣你们kpi了!!
当然,这样并不能为我们提供高性能处理效率。因为处理偶数的消费进程总是处于空闲期(贤者期)

此时,我们开启增强模式,mq也不再疯狂给安排活了,谁先干完活就给谁安排下一个任务。在消费端加个参数
$channel->basic_qos(null, 1, null);

这样的话就保障各个进程都处于饱和状态。

参考代码:
消息生产端:https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/php/new_task.php
消息消费端:https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/php/worker.php

文档更新时间: 2020-04-17 14:15   作者:young