一,什么是消息队列

介绍

消息队列,简写mq 全称:message queue;
消息队列是典型的,生产、消费模型。
生产者(消息发送者)负责向队列中生产消息,消费者负责从队列中获取消息。

消息队列就是一种先进先出的数据结构。
例如,我们向MQ生产(发送)几条消息
push:msg1
push:msg2
push:msg3

那么消费端,取数据时的顺序也是
pull: msg1
pull: msg2
pull: msg3

mq解决什么问题

应用解耦

场景:我们系统注册完,要给用户发一封欢迎的邮件。
这样我们的代码是注册模块注册成功后,给调用邮件发送模块。

register(){
    //逻辑
    $uid = 逻辑;
    self::sendMail($uid);
}

这时候产品经理说注册成功还得给用户发个优惠券 我们在调用发送优惠券

register(){
    //逻辑
    $uid = 逻辑;
    self::sendMail($uid);
    self::sendCoupon($uid);
}

上面假设发邮件和发优惠券都是通过用户uid去发送。
ok,突然产品脑洞一开,又要加东西。

register(){
    //逻辑
    $uid = 逻辑;
    self::sendMail($uid);
    self::sendCoupon($uid);
    self::sendXXX($uid);//奇葩需求XXX
    self::sendAAA($uid);//奇葩需求AAA
}

突然某一天,发送邮件的系统出问题了(有可能用qq邮箱发的,限制条数了).
那么到这里有可能这个用户就注册不了。或者注册成功了,但是邮件,优惠券以及后面的奇葩需求执行不下去。

这样耦合相对大。我们用mq怎么解耦?

register(){
    //逻辑
    $uid = 逻辑;
    self::pushMq($uid);//把业务uid 发到消息队列去 生产
}

sendMail(){
    $uid = self::pullMq();//取到uid 发送邮件 消费
}

sendCoupon(){
    $uid = self::pullMq();//取到uid 发送优惠券 消费
}
....以及其他各种奇葩需求调用

这样,我们注册逻辑成功后只管往队列push生产消息进去。
而其他逻辑则只管从队列里pull消息出来使用消费

削峰

例如我之前在 某药网买药。
下单成功不是跳支付,而是发邮件和短信,让你打开url支付。
那么模块就是 A 下单,B通知。
1个模块吞吐为X
2个模块就是2X。
那么当并发来时,A吞吐不够,很容易就跑不动了
此时,如果有用mq,我们只需要跑模块A,执行下单。异步跑通知。
这样我们可能短信和邮件发的慢一点,但是确保了我们有发短信,不至于挂掉。

消息分发

假设A生产数据。B和C对A的数据感兴趣。那么A生产完数据,要调用下B和C。
过两天需求来了↓↓↓↓
D也要调用下,
E也调用下谢谢。
B不调用了,帮我注释下。

这样每当有要加数据,减数据,A的维护者都要在那边啪叽啪叽改来改去,虽然小事,但是烦不烦人?
这样A把消息丢到mq里面,谁对A的数据感兴趣谁自己去监听。

异步

A模块要做一个处理,需要处理A1,A2,A3,A4这么多事情。
A1处理要10ms,A2要30ms,A3需要600ms,A4需要2000ms
总共需要2640ms。
那我们可以让他处理 A1,剩下的丢队列里。或者A1,A2这种短耗时的的直接执行。
A3与A4长耗时我们可以丢队列,异步去处理他。
这样接口返回时间,就能从2640ms变成最快的10ms(A1)或者40ms(A1+A2)
当然A3,和A4消耗的时间我们没优化,实际上另外的2600ms也是需要的,只不过客户端感觉到的是40ms

带来的坑

系统可用性降低

本来只需考虑本身自己的可用性,引入mq后,mq挂了怎么办?

系统复杂度提高。

加入mq,消息丢失怎么办?重复消费怎么办?消费顺序怎么保证?

一致性问题

a处理完返回成功了,调用者以为请求成功了。可是BC成功,D失败了怎么办,数据不一致了。

填坑

消息丢失怎么办?

消息丢失可能出现3个地方
1.生产者把消息发给MQ时丢了。
处理方式一:生产者发送数据前开启mq事务,如果mq没收到消息则报错。此时回滚事务。重发。。 事务会影响吞吐量
处理方式二:confirm模式,生产者开启confirm模式,每次写的时候,分配唯一id。然后写入mq中,mq会回传ack消息,告诉你消息OK了。如果没能成功则mq会回掉nack接口,告诉你失败了。
一般都是使用 confirm机制来避免诗句丢失。

2.mq弄丢数据
这时候,我们得开启mq的数据持久化,这样哪怕mq挂了,也能像redis一样,开起来就读数据。
设置持久化两步骤
第一步,创建queue时,将其设置为持久化,这样可以保证mq持久化队列元数据。但他不会持久化queue里面的数据。
第二部,发送消息时,设置deliveryMode为2 。此时mq消息也会持久化到磁盘。
必须要开启这两个持久化才行。mq挂了,哪怕再次重启,也没问题。
持久化+confirm机制,基本也就没啥大事了。

3.消费端丢失。
RabbitMQ如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ认为你都消费了,这数据就丢了。

这个时候得用RabbitMQ提供的ack机制,简单来说,就是你关闭RabbitMQ自动ack,可以通过一个api来调用就行,然后每次你自己代码里确保处理完的时候,再程序里ack一把。这样的话,如果你还没处理完,不就没有ack?那RabbitMQ就认为你还没处理完,这个时候RabbitMQ会把这个消费分配给别的consumer去处理,消息是不会丢的。

消费顺序

按道理来说,mq上没的搞顺序依赖,简单的办法是单消费关系。
如果多消费,,那就根据这博客
https://www.cnblogs.com/dwxblogs/p/10951393.html

重复消费

分两大类,消费者重复发送,2mq向消费者投递时,重复投递
终极解决,幂等性。
1.乐观锁,唯一索引等可以处理。
2.缓存锁。

参考:
https://www.jianshu.com/p/1ae35123329e
https://www.jianshu.com/p/f40cd4d6737e
https://www.cnblogs.com/dwxblogs/

文档更新时间: 2020-04-17 14:15   作者:young