基础使用

我这里以tp5.1为示例。

我在服务器上直接composer一个tp5.1项目。
tp手册:https://www.kancloud.cn/manual/thinkphp5_1/353948

我们去composer搜下,amqp的依赖包。
这样我们调用amqp将会优雅美观许多,而且网上有这个包,我们没必要再自己撸一套吧?

看这个下载数这么高,就它了。
安装这个依赖

[root@localhost mqtest]# composer require php-amqplib/php-amqplib

依赖的加载啥的,可以去了解下tp5.1源码的自动加载。。这里就不再叙述。

安装完,在项目根目录有个这个:

ok,我们来测试下生产数据

<?php
namespace app\index\controller;

class Index
{

        public static function sendMsg() {
        $routing_key = 'queue';
        $queue = 'queue';
        $connection = new \PhpAmqpLib\Connection\AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin', '/');
        $channel = $connection->channel();
        $channel->queue_declare($queue, false, false, false, false);
        $data = [
            '1' => 111,
            'x' => 123
        ];
        $msg = new \PhpAmqpLib\Message\AMQPMessage(json_encode($data));
        $channel->basic_publish($msg, '', $routing_key);
        $channel->close();
        $connection->close();
        echo " ok";
    }
}

我们向mq里面发送几次数据。。我这边刷新了8下
登陆mq后台,我们可以看到,queue这个队列,写进了8条数据

写一个消费端,并用命令行去消费

我们写在同一个目录下

        public function receiveMsg() {
        $connection = new \PhpAmqpLib\Connection\AMQPStreamConnection('127.0.0.1', 5672, 'admin', 'admin', '/');
        $channel = $connection->channel();
        $channel->queue_declare('queue', false, false, false, false);
        echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
        $callback = function($msg) {
            echo " [x] Received ", "\n";
            dump($msg->body);
            echo "\n";
        };
        $channel->basic_consume('queue', '', false, true, false, false, $callback);
        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }

我们用ssh工具,进入到tp目录里的public
[root@localhost public]# php index.php index/index/receiveMsg

执行消费端,看看输出什么结果:

文档更新时间: 2020-04-17 14:15   作者:young