轻松上手 PHP RabbitMQ 消息发布与订阅 PHP RabbitMQ 发布于2019-09-07 / 更新于2021-01-02 07:04
场景 之前开发一个电竞比分网系统,有许多模块依赖实时比赛状态(待开始、进行中、已结束、异常),比赛状态 进行中->已结束
由图像识别处理,识别到比赛结束后向消息队列发送某场比赛的状态信息,其他模块只需订阅队列消息获取比赛状态更新并进行对于逻辑处理
RabbitMQ 概念 RabbitMQ 安装运行 $ docker pull rabbitmq:3.8.3-management
$ docker run --name rabbitmq -d -p 5672 :5672 -p 15672 :15672 -v /data:/var/lib/rabbitmq rabbitmq:3.8.3-management
添加管理员 命令行$ docker exec -it 89e8e968aebc bash
root@89e8e968aebc:/# rabbitmqctl add_user ar414 ar414
root@89e8e968aebc:/# rabbitmqctl set_user_tags ar414 administrator
Web管理端
添加vhost
PHP 简单使用 安装 $ composer require php-amqplib/php-amqplib
发布者 <?php
require_once __DIR__ . '/../vendor/autoload.php' ;
use PhpAmqpLib\ Connection\ AMQPStreamConnection ;
use PhpAmqpLib\ Message\ AMQPMessage ;
$exchange = 'Gaming' ;
$connection = new AMQPStreamConnection ( '127.0.0.1' , 5672 , 'ar414' , 'ar414' , 'test' ) ;
$channel = $connection - > channel ( ) ;
$channel - > exchange_declare ( $exchange , 'direct' , false , false , false ) ;
for ( $i = 0 ; $i < 100 ; $i ++ ) {
$routes = [ 'dota' , 'csgo' , 'lol' ] ;
$key = array_rand ( $routes ) ;
$arr = [
'match_id' = > $i ,
'status' = > rand ( 0 , 3 )
] ;
$data = json_encode ( $arr ) ;
$msg = new AMQPMessage ( $data ) ;
$channel - > basic_publish ( $msg , $exchange , $routes [ $key ] ) ;
echo '发送 ' . $routes [ $key ] . ' 消息: ' . $data . PHP_EOL ;
}
$channel - > close ( ) ;
$connection - > close ( ) ;
订阅者 <?php
require_once __DIR__ . '/../vendor/autoload.php' ;
use PhpAmqpLib\ Connection\ AMQPStreamConnection ;
$exchange = 'Gaming' ;
$routerKey = 'lol' ; //只订阅LOL消息
$connection = new AMQPStreamConnection ( '127.0.0.1' , 5672 , 'ar414' , 'ar414' , 'test' ) ;
$channel = $connection - > channel ( ) ; $channel - > exchange_declare ( $exchange , 'direct' , false , false , false ) ;
list ( $queueName , , ) = $channel - > queue_declare ( "" , false , false , true , false ) ;
$channel - > queue_bind ( $queueName , $exchange , $routerKey ) ;
echo " 等待消息中..." . PHP_EOL ;
$callback = function ( $msg ) {
echo '接收到消息:' , $msg - > delivery_info [ 'routing_key' ] , ':' , $msg - > body , PHP_EOL ;
sleep ( 1 ) ; //模拟耗时执行
} ;
$channel - > basic_consume ( $queueName , '' , false , true , false , false , $callback ) ;
while ( $channel - > is_consuming ( ) ) {
$channel - > wait ( ) ;
}
$channel - > close ( ) ;
$connection - > close ( ) ;
运行 1. 运行某一个订阅者程序监听LOL消息队列(LolSub.php) 2. 运行发送者程序(Send.php)
发送者
$ php Send.php
LOL订阅者
$ php LolSub.php
讨论