资讯详情

php amqp 消息队列 RabbitMQ 交换器类型 直连 (三)_PHP教程

1、AMQP_EX_TYPE_DIRECT:直连型

直连型还包括: 1对1 和1对N(N对1、 N对N)

674a7d41165c5273d4412aabbf5f4bcd.png

接收端receive.php代码如下

connect();

$channel = new AMQPChannel($connect);

$exchange = new AMQPExchange($channel);

$exchange->setName('exchange');

$exchange->setType(AMQP_EX_TYPE_DIRECT);

$exchange->declare();

$queue = new AMQPQueue($channel);

$queue->setName('logs');

$queue->declare();

$queue->bind('exchange', 'logs');

while (true) {

$queue->consume('callback');

}

$connection->close();

function callback($envelope, $queue) {

var_dump($envelope->getBody());

$queue->nack($envelope->getDeliveryTag());

}

发送端send.php代码如下

connect();

$channel = new AMQPChannel($connect);

$exchange = new AMQPExchange($channel);

$exchange->setName('exchange');

$exchange->setType(AMQP_EX_TYPE_DIRECT);

$exchange->declare();

$exchange->publish('direct type test','logs');

var_dump("Send Message OK");

$connect->disconnect();

如图所示

创建receive_one.php和receive_two.php 并把send.php将代码改为以下代码方便我们观看

receive_one.php 和 receive_two.php 代码相同 或者用dos运行多个接收端

connect();

$channel = new AMQPChannel($connect);

$exchange = new AMQPExchange($channel);

$exchange->setName('exchange');

$exchange->setType(AMQP_EX_TYPE_DIRECT);

$exchange->declare();

$queue = new AMQPQueue($channel);

$queue->setName('logs');

@$queue->declare();

$queue->bind('exchange', 'logs');

while (true) {

$queue->consume('callback');

}

$connection->close();

function callback($envelope, $queue) {

var_dump($envelope->getBody());

$queue->nack($envelope->getDeliveryTag());

}

send.php

connect();

$channel = new AMQPChannel($connect);

$exchange = new AMQPExchange($channel);

$exchange->setName('exchange');

$exchange->setType(AMQP_EX_TYPE_DIRECT);

$exchange->declare();

for ($index = 1; $index < 5; $index ) {

$exchange->publish($index,'logs');

var_dump("Send:$index");

}

$exchange->delete();

$connect->disconnect();

运行结果如下

这里的消息分配给每个接收端似乎很完美,但如果你想更好地处理不同的任务,你需要 公平调度

例如,当1、3处理简单的人时 2.4是复杂的任务 若任务过多 receive_one.php是空闲的而receive_two.php任务繁重

我们进行了以下测试

send.php改成5改成50

for ($index = 1; $index < 50; $index ) {

$exchange->publish($index,'logs');

var_dump("Send:$index");

}

receive_two.php 加上 sleep(3)

function callback($envelope, $queue) {

var_dump($envelope->getBody());

sleep(3);

$queue->nack($envelope->getDeliveryTag());

}

我们的操作程序结果如下

receive_one一切都完成了receive_two才运行一个 之后receive_one一直空闲

我们可以通过 设置在接收端

$channel->setPrefetchCount(1);

在任务完成之前,没有人接收新消息,并将消息发送给其他接收端

如下receive_one.php 和 receive_two.php

$channel = new AMQPChannel($connect);

改成如下

$channel = new AMQPChannel($connect);

$channel->setPrefetchCount(1);

http://www.bkjia.com/PHPjc/735888.htmlwww.bkjia.comtruehttp://www.bkjia.com/PHPjc/735888.htmlTechArticle1、AMQP_EX_TYPE_DIRECT:直连型 直连型还包括: 1对1 和1对N(N对1、 N对N) 接收端receive.php代码如下 connect();$channel = new AMQPChannel($connect);$exchange...

标签: fci连接器10075025

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台