基础使用
我这里以tp5.1为示例。
我在服务器上直接composer一个tp5.1项目。
tp手册:https://www.kancloud.cn/manual/thinkphp5_1/353948
我们去composer搜下,amqp的依赖包。
这样我们调用amqp将会优雅美观许多,而且网上有这个包,我们没必要再自己撸一套吧?

看这个下载数这么高,就它了。
安装这个依赖
[root@localhost mqtest]# composer require php-amqplib/php-amqplib
依赖的加载啥的,可以去了解下tp5.1源码的自动加载。。这里就不再叙述。
安装完,在项目根目录有个这个:
ok,我们来测试下生产数据
<?php
namespace app\index\controller;
class Index
{
public static function sendMsg() {
$routing_key = 'queue';
$queue = 'queue';
$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin', '/');
$channel = $connection->channel();
$channel->queue_declare($queue, false, false, false, false);
$data = [
'1' => 111,
'x' => 123
];
$msg = new \PhpAmqpLib\Message\AMQPMessage(json_encode($data));
$channel->basic_publish($msg, '', $routing_key);
$channel->close();
$connection->close();
echo " ok";
}
}

我们向mq里面发送几次数据。。我这边刷新了8下
登陆mq后台,我们可以看到,queue这个队列,写进了8条数据
写一个消费端,并用命令行去消费
我们写在同一个目录下
public function receiveMsg() {
$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin', '/');
$channel = $connection->channel();
$channel->queue_declare('queue', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg) {
echo " [x] Received ", "\n";
dump($msg->body);
echo "\n";
};
$channel->basic_consume('queue', '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
}
我们用ssh工具,进入到tp目录里的public[root@localhost public]# php index.php index/index/receiveMsg
执行消费端,看看输出什么结果:
文档更新时间: 2020-04-17 14:15 作者:young