rabbitmq延迟队列之php实现
延迟任务应用场景
场景一:物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时。
场景二:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。
实现方案
定时任务轮询数据库,看是否有产生新任务,如果产生则消费任务
pcntl_alarm为进程设置一个闹钟信号
swoole的异步高精度定时器:swoole_time_tick(类似javascript的setInterval)和swoole_time_after(相当于javascript的setTimeout)
rabbitmq延迟任务
以上四种方案,如果生产环境有使用到swoole建议使用第三种方案。此篇文章重点讲述第四种方案实现
Rabbitmq延迟队列实现
RabbitMQ没有直接去实现延迟队列这个功能。而是需要通过消息的TTL和死信Exchange这两者的组合来实现。
消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。
可以通过设置消息的expiration字段或者队列x-message-ttl属性来设置时间,两者是一样的效果。下面例子是通过队列的ttl实现死信
$queue=newAMQPQueue($channel);$queue->setName($params['queueName']?:'');$queue->setFlags(AMQP_DURABLE);$queue->setArguments(array('x-dead-letter-exchange'=>'delay_exchange','x-dead-letter-routing-key'=>'delay_route','x-message-ttl'=>60000,));$queue->declareQueue();
当上面的消息扔到该队列中后,过了60秒,如果没有被消费,它就死了。不会被消费者消费到。这个消息后面的,没有“死掉”的消息对顶上来,被消费者消费。死信在队列中并不会被删除和释放,它会被统计到队列的消息数中去。单靠死信还不能实现延迟任务,还要靠Dead Letter Exchange。
Exchage的概念在这里就不在赘述,可以从这里进行了解。一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。
1. 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
2. 上面的消息的TTL到了,消息过期了。
3. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
示例
生产者:
<?phpheader('Content-Type:text/html;charset=utf8;');$params=array('exchangeName'=>'test_cache_exchange','queueName'=>'test_cache_queue','routeKey'=>'test_cache_route',);$connectConfig=array('host'=>'localhost','port'=>5672,'login'=>'rabbitmq','password'=>'rabbitmq','vhost'=>'/');//var_dump(extension_loaded('amqp'));判断是否加载amqp扩展//exit();try{$conn=newAMQPConnection($connectConfig);$conn->connect();if(!$conn->isConnected()){//die('Conexiuneesuata');//TODO记录日志echo'rabbit-mq连接错误:',json_encode($connectConfig);exit();}$channel=newAMQPChannel($conn);if(!$channel->isConnected()){//die('Connectionthroughchannelfailed');//TODO记录日志echo'rabbit-mqConnectionthroughchannelfailed:',json_encode($connectConfig);exit();}$exchange=newAMQPExchange($channel);$exchange->setFlags(AMQP_DURABLE);//持久化$exchange->setName($params['exchangeName']?:'');$exchange->setType(AMQP_EX_TYPE_DIRECT);//direct类型$exchange->declareExchange();//$channel->startTransaction();$queue=newAMQPQueue($channel);$queue->setName($params['queueName']?:'');$queue->setFlags(AMQP_DURABLE);$queue->setArguments(array('x-dead-letter-exchange'=>'delay_exchange','x-dead-letter-routing-key'=>'delay_route','x-message-ttl'=>60000,));$queue->declareQueue();//绑定$queue->bind($params['exchangeName'],$params['routeKey']);}catch(Exception$e){}//$num=mt_rand(100,500);$num=1;//生成消息$exchange->publish("thisistestmessage..",$params['routeKey'],AMQP_MANDATORY,array('delivery_mode'=>2));
消费者:
<?phpheader('Content-Type:text/html;charset=utf8;');$params=array('exchangeName'=>'delay_exchange','queueName'=>'delay_queue','routeKey'=>'delay_route',);$connectConfig=array('host'=>'localhost','port'=>5672,'login'=>'rabbitmq','password'=>'rabbitmq','vhost'=>'/');//var_dump(extension_loaded('amqp'));//exit();try{$conn=newAMQPConnection($connectConfig);$conn->connect();if(!$conn->isConnected()){//die('Conexiuneesuata');//TODO记录日志echo'rabbit-mq连接错误:',json_encode($connectConfig);exit();}$channel=newAMQPChannel($conn);if(!$channel->isConnected()){//die('Connectionthroughchannelfailed');//TODO记录日志echo'rabbit-mqConnectionthroughchannelfailed:',json_encode($connectConfig);exit();}$exchange=newAMQPExchange($channel);$exchange->setFlags(AMQP_DURABLE);//声明一个已存在的交换器的,如果不存在将抛出异常,这个一般用在consume端$exchange->setName($params['exchangeName']?:'');$exchange->setType(AMQP_EX_TYPE_DIRECT);//direct类型$exchange->declareExchange();//$channel->startTransaction();$queue=newAMQPQueue($channel);$queue->setName($params['queueName']?:'');$queue->setFlags(AMQP_DURABLE);$queue->declareQueue();//绑定$queue->bind($params['exchangeName'],$params['routeKey']);}catch(Exception$e){echo$e->getMessage();exit();}functioncallback(AMQPEnvelope$message){global$queue;if($message){$body=$message->getBody();echo$body.PHP_EOL;$queue->ack($message->getDeliveryTag());}else{echo'nomessage'.PHP_EOL;}}//$queue->consume('callback');第一种消费方式,但是会阻塞,程序一直会卡在此处//第二种消费方式,非阻塞$start=time();while(true){$message=$queue->get();if(!empty($message)){echo$message->getBody();$queue->ack($message->getDeliveryTag());//应答,代表该消息已经消费$end=time();echo'<br>'.($end-$start);exit();}else{//echo'messagenotfound'.PHP_EOL;}}
这个示例注意要跟上一篇博文示例作对比rabbitmq以及php amqp扩展使用,最关键的点就是在生产者那里
$queue->setArguments(array('x-dead-letter-exchange'=>'delay_exchange','x-dead-letter-routing-key'=>'delay_route','x-message-ttl'=>60000,));
详细过程:
首先由正常队列(test_cache_queue)和正常exchange(test_cache_exchange),两者相绑定。
该正常队列设置了死信路由(delay_exchange)和死信路由key以及TTL,生产者生产消息到正常队列和正常路由上.
当正常队列设置TTL时间一到,那延迟消息就会自动发布到死信路由
消费者通过死信路由(delay_exchange)和死信队列(delay_queue)来消费
参考文章:
https://www.cnblogs.com/haoxinyue/p/6613706.html
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。