1、AMQP_EX_TYPE_DIRECT:直连型
直连型还包括: 1对1 和1对N(N对1、 N对N)
接收端receive.php代码如下
connect();
$channel = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setName('logs');
$queue->declare();
$queue->bind('exchange', 'logs');
while (true) {
$queue->consume('callback');
}
$connection->close();
function callback($envelope, $queue) {
var_dump($envelope->getBody());
$queue->nack($envelope->getDeliveryTag());
}
发送端send.php代码如下
connect();
$channel = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
$exchange->publish('direct type test','logs');
var_dump("Send Message OK");
$connect->disconnect();
如图所示
创建receive_one.php和receive_two.php 并把send.php将代码改为以下代码方便我们观看
receive_one.php 和 receive_two.php 代码相同 或者用dos运行多个接收端
connect();
$channel = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setName('logs');
@$queue->declare();
$queue->bind('exchange', 'logs');
while (true) {
$queue->consume('callback');
}
$connection->close();
function callback($envelope, $queue) {
var_dump($envelope->getBody());
$queue->nack($envelope->getDeliveryTag());
}
send.php
connect();
$channel = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);
$exchange->setName('exchange');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();
for ($index = 1; $index < 5; $index ) {
$exchange->publish($index,'logs');
var_dump("Send:$index");
}
$exchange->delete();
$connect->disconnect();
运行结果如下
这里的消息分配给每个接收端似乎很完美,但如果你想更好地处理不同的任务,你需要 公平调度
例如,当1、3处理简单的人时 2.4是复杂的任务 若任务过多 receive_one.php是空闲的而receive_two.php任务繁重
我们进行了以下测试
send.php改成5改成50
for ($index = 1; $index < 50; $index ) {
$exchange->publish($index,'logs');
var_dump("Send:$index");
}
receive_two.php 加上 sleep(3)
function callback($envelope, $queue) {
var_dump($envelope->getBody());
sleep(3);
$queue->nack($envelope->getDeliveryTag());
}
我们的操作程序结果如下
receive_one一切都完成了receive_two才运行一个 之后receive_one一直空闲
我们可以通过 设置在接收端
$channel->setPrefetchCount(1);
在任务完成之前,没有人接收新消息,并将消息发送给其他接收端
如下receive_one.php 和 receive_two.php
$channel = new AMQPChannel($connect);
改成如下
$channel = new AMQPChannel($connect);
$channel->setPrefetchCount(1);
http://www.bkjia.com/PHPjc/735888.htmlwww.bkjia.comtruehttp://www.bkjia.com/PHPjc/735888.htmlTechArticle1、AMQP_EX_TYPE_DIRECT:直连型 直连型还包括: 1对1 和1对N(N对1、 N对N) 接收端receive.php代码如下 connect();$channel = new AMQPChannel($connect);$exchange...