rabbitmq php 消息队列安装教程

作者:袖梨 2022-11-14

RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

1,安装rabbitmq,php扩展

# yum install php-pecl-amqp rabbitmq-server epel-release

2,启动

# /etc/init.d/rabbitmq-server start

如果要改什么配置的话,在/etc/rabbitmq 目录下新增文件:rabbitmq-env.conf,注意:文件名是固定的
RABBITMQ_NODE_IP_ADDRESS=192.168.10.208
RABBITMQ_NODE_PORT=5672
RABBITMQ_NODENAME=rabbit

3,启用监控模块


# cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.1.5/sbin
# ./rabbitmq-plugins enable rabbitmq_management //启用监控模块
The following plugins have been enabled:
mochiweb
webmachine
rabbitmq_web_dispatch
amqp_client
rabbitmq_management_agent
rabbitmq_management

# /etc/init.d/rabbitmq-server restart //重启


4,php测试代码


class RabbitMq
{
public $configs = array('host'=>'192.168.10.208','port'=>5672,'username'=>'guest','password'=>'guest','vhost'=>'/');
//交换机名称
public $exchange_name = 'test-e';
//队列名称
public $queue_name = 'test-q';
//路由名称
public $route_key = 'test-r';
/*
* 持久化,默认true
*/
public $durable = true;
/*
* 自动删除
* exchange is deleted when all queues have finished using it
* queue is deleted when last consumer unsubscribes
*
*/
public $autodelete = true;
/*
* 镜像
* 镜像队列,打开后消息会在节点之间复制,有master和slave的概念
*/
public $mirror = false;

private $_conn = null;
private $_exchange = null;
private $_channel = null;
private $_queue = null;

/**
* 创建队列需要用到的虚拟主机、交换机
*/
public function __construct($configs = array(), $exchange_name = '', $queue_name = '', $route_key = '') {
if(!emptyempty($configs)){
$this->setConfigs($configs);
}
if(!emptyempty($exchange_name)){
$this->exchange_name = $exchange_name;
}
if(!emptyempty($queue_name)){
$this->queue_name = $queue_name;
}
if(!emptyempty($route_key)){
$this->route_key = $route_key;
}

$this->_conn = new AMQPConnection($this->configs);
$this->_conn->connect();

//创建channel
$this->_channel = new AMQPChannel($this->_conn);
//创建exchange交换机
$this->_exchange = new AMQPExchange($this->_channel);
$this->_exchange->setName($this->exchange_name);//创建名字
$this->_exchange->setType(AMQP_EX_TYPE_DIRECT);
$this->_exchange->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
$this->_exchange->declare();

//创建队列
$this->_queue = new AMQPQueue($this->_channel);
//设置队列名字 如果不存在则添加
$this->_queue->setName($this->queue_name);
$this->_queue->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
echo "queue status: ".$this->_queue->declare();

echo "
";
echo 'queue bind: '.$this->_queue->bind($this->exchange_name,$this->route_key);//将你的队列绑定到routingKey
//echo 'queue bind: '.$this->_queue->bind($this->exchange_name,'route.'.$timestr);//将你的队列绑定到routingKey
echo "
";

}

/**
*
* 向队列里面添加信息
*/
public function setRabbitMqInfo()
{
//发布消息
$message = json_encode(array('Hello World!','php','c++'));
$this->_channel->startTransaction();
echo "send: ".$this->_exchange->publish($message, $this->route_key); //将你的消息通过制定routingKey发送
//echo "send: ".$ex->publish($message, 'route.'.$timestr); //将你的消息通过制定routingKey发送
$this->_channel->commitTransaction();
//$this->_conn->disconnect();
}

/**
*
* 获取队列里的信息
*/
public function getRabbitMqInfo ()
{

// $this->_queue->consume(array($this,'processMessage')); //需手动应答
while($this->_queue->declare()){
$messages = $this->_queue->get(AMQP_AUTOACK) ;
if ($messages){
echo "

";  
print_r(json_decode($messages->getBody(),true))."n";
}
}

// disconnect
$this->_conn->disconnect();
}
}


set_time_limit(0);
include_once('rabbitmq.inc.php');

$ra = new RabbitMq();
$ra->setRabbitMqInfo();
$ra->getRabbitMqInfo();

//结果如下
[root@vpn208 ~]# php test.php
queue status: 0
queue bind: 1
send: 1

Array  
(
[0] => Hello World!
[1] => php
[2] => c++
)
命令行下调用,还是直接URL访问都是可以的.

相关文章

精彩推荐