因为PHP-Resque 的重试部分需要自己写,网上又没啥轮子,而且resque也已经很久不更新了,所以自己研究下resque的源码,然后也借鉴了Laravel的队列重试机制,实现了PHP-Resque的重试机制。

Resque地址

https://github.com/chrisboulton/php-resque

设计思路

1.这里需要阅读resque源码,resque的worker把失败的队列的数据都放在了"resque:failed"列表,数据如下。

2.我的项目是在yii2下统一处理resque的,打算开一个守护进程不断重试这些失败的队列。
3.我直接resque源码在新增了retry方法,在每一个failJob的payload增加了attempts尝试次数,
一开始创建的时候attempts为0,重试之后attempts每次加1然后进行retry,直到到达尝试数次才停止重试。

核心代码
调用重试的代码

/** * Retry Job */ public function actionRetry() { $redis = new Redis('redis'); while (true) { $json = $redis->rPop('resque:failed'); $array = json_decode($json, true); if ($array) { $jobArray = $array['payload']; if ($jobArray['attempts'] < $this->retryTimes) { //retry \Resque::retry($jobArray, $array['queue']); echo "Queued job " . $jobArray['id'] . ' has retry!' . "\n"; } else { //stop retry $redis->lPush('resque:failed', [$json]); } } //take a sleep echo "*** Sleeping for ".$this->sleep. "\n"; sleep($this->sleep); } return true; }

这里可以弄成守护进程一直处理失败的队列任务。

/php-resque/lib/Resque.php

/** * retry job and save it to the specified queue. * * @param array $jobArray The attempts of the the job . * array( * 'id'=> The id of the job * 'class' => The name of the class that contains the code to execute the job. * 'args' => Any optional arguments that should be passed when the job is executed.'' * 'attempts'=> The retry attempts of the job * ) * @param string $queue The name of the queue to place the job in. * * @return boolean */ public static function retry($jobArray,$queue) { require_once dirname(__FILE__) . '/Resque/Job.php'; $result = Resque_Job::retry($jobArray,$queue); if ($result) { Resque_Event::trigger('afterEnqueue', array( 'class' => $jobArray['class'], 'args' => $jobArray['args'], 'queue' => $queue, )); } return true; }

php-resque/lib/Resque/Job.php

/** * retry job and save it to the specified queue. * * * @param array $jobArray The data of the job. * array( * 'id'=> The id of the job * 'class' => The name of the class that contains the code to execute the job. * 'args' => Any optional arguments that should be passed when the job is executed.'' * 'attempts'=> The retry attempts of the job * ) * @param string $queue The name of the queue to place the job in. * * @return string */ public static function retry($jobArray,$queue) { $args = $jobArray['args']; if($args !== null && !is_array($args)) { throw new InvalidArgumentException( 'Supplied $args must be an array.' ); } $jobArray['attempts']++; Resque::push($queue, array( 'class' => $jobArray['class'], 'args' => $args, 'id' => $jobArray['id'], 'attempts'=>$jobArray['attempts'] )); return true; }结果:

我这里我设置了重试次数为3次,每隔5秒处理一个队列。

注意

1. 只有任务JOB实现类出现异常才会被重新扔回到"resque:failed"队列里面。 2. 对于正常的业务错误不需要重试,目前只考虑到对curl会出现的异常进行重试。