<?php
namespace service\entry;

use common\components\BaseServer;
use common\library\Helper;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use Yii;

class RemoteOpenDoor extends BaseServer
{
    const EXPIRE = 5;
    const QUEUE_NAME = "remote_open_door";

    public $devSn, $schoolId;

    private $channel;
    private $connection, $replyQueue, $corrId, $response, $params;
    private $door = [1, 2];

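    /**
     * Connects to RabbitMQ, declares the topic exchange, and starts consuming
     * from a server-named reply queue.
     */
    public function init()
    {
        $amqp = Yii::$app->params['rabbitMQ'];
        $this->connection = new AMQPStreamConnection(
            $amqp['host'],
            $amqp['port'],
            $amqp['user'],
            $amqp['password'],
            $amqp['vhost']
        );
        if (!$this->connection->isConnected()) {
            // Error message: "connection failed"
            $this->setError(10003, '连接失败');
            return false;
        }
        $this->channel = $this->connection->channel();

        // NOTE: this table is built but never passed to a declare call,
        // so the 10-second message TTL is not applied anywhere.
        $arguments = new AMQPTable();
        $arguments->set("x-message-ttl", 10000);

        // Arguments: exchange, type, passive, durable, auto_delete
        $this->channel->exchange_declare(self::QUEUE_NAME, 'topic', false, false, false);

        // Server-named reply queue: not passive, durable, or exclusive; auto_delete is on.
        list($this->replyQueue, ,) = $this->channel->queue_declare("", false, false, false, true, false);

        // Keep only the reply whose correlation id matches the pending request.
        $callback = function (AMQPMessage $rep) {
            if ($rep->get('correlation_id') == $this->corrId) {
                $this->response = $rep->body;
            }
        };
        $this->channel->basic_consume($this->replyQueue, '', false, false, false, false, $callback);
    }
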
    public function __construct()
    {
        $this->init();
    }

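    /**
     * Builds the JSON request body: expiry timestamp, device serial number, and doors.
     */
    protected function buildParams()
    {
        $params = [
            'expire' => intval(time() + self::EXPIRE),
            'devSn' => intval($this->devSn),
            'door' => $this->door,
        ];
        // json_encode() takes a flags bitmask as its second argument, not the
        // boolean that json_decode() accepts, so no second argument is passed.
        $this->params = json_encode($params);
    }
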
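    /**
     * Publishes an open-door request and waits up to EXPIRE seconds for the reply.
     *
     * @return bool true when the reply's failDoor field is not an array
     */
    public function open()
    {
        $this->buildParams();
        $this->corrId = uniqid();
        $this->response = null;
        $properties = ['correlation_id' => $this->corrId, 'reply_to' => $this->replyQueue];
        $message = new AMQPMessage($this->params, $properties);
        // Routing key: school.<schoolId>
        $this->channel->basic_publish($message, self::QUEUE_NAME, sprintf('school.%d', $this->schoolId));

        try {
            $this->channel->wait(null, false, self::EXPIRE);
        } catch (\Exception $exception) {
            // Timeout or connection error. Error message: "failed to open door"
            $this->setError(10003, '开门失败');
            return false;
        }

        // No reply with a matching correlation id arrived: treat as a failure.
        if ($this->response === null) {
            $this->setError(10003, '开门失败');
            return false;
        }

        $rs = json_decode($this->response, true);
        // Read failDoor defensively so a malformed reply does not raise a notice.
        $failDoor = (is_array($rs) && isset($rs['failDoor'])) ? $rs['failDoor'] : null;
        if (!is_array($failDoor) && (!$failDoor || $failDoor != $this->door)) {
            return true;
        }

        // Error message: "failed to open door"
        $this->setError(10003, '开门失败');
        return false;
    }
}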