From b6ffb0a9d159fe698560709a6d410eae4734e216 Mon Sep 17 00:00:00 2001 From: Scott Jackson Date: Tue, 29 Aug 2023 14:51:57 -0700 Subject: [PATCH 1/2] Start working on adding auto retrying failed jobs --- bin/resque | 2 + lib/Resque.php | 3 + lib/Resque/Failure/RedisRetrySuppression.php | 47 ++++ lib/Resque/Job.php | 19 +- lib/Resque/Plugin.php | 126 ++++++++++ lib/Resque/Plugin/ExponentialRetry.php | 53 +++++ lib/Resque/Plugin/Retry.php | 233 +++++++++++++++++++ lib/ResqueScheduler.php | 6 +- 8 files changed, 483 insertions(+), 6 deletions(-) create mode 100644 lib/Resque/Failure/RedisRetrySuppression.php create mode 100644 lib/Resque/Plugin.php create mode 100644 lib/Resque/Plugin/ExponentialRetry.php create mode 100644 lib/Resque/Plugin/Retry.php diff --git a/bin/resque b/bin/resque index d06479fa..17b6af96 100755 --- a/bin/resque +++ b/bin/resque @@ -47,6 +47,8 @@ if(!empty($REDIS_BACKEND)) { Resque::setBackend($REDIS_BACKEND, $REDIS_BACKEND_DB); } +Resque_Plugin::initialize(); + $logLevel = false; $LOGGING = getenv('LOGGING'); $VERBOSE = getenv('VERBOSE'); diff --git a/lib/Resque.php b/lib/Resque.php index ddc4a02a..77cebe37 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -1,4 +1,7 @@ retrying or $job->retryDelay <= 0) { + return parent::__construct($job, $exception, $worker, $queue); + } + + $retryDelay = $job->retryDelay; + + $data = $this->getData($job, $exception, $worker, $queue); + $data->retry_delay = $retryDelay; + $data->retried_at = strftime('%a %b %d %H:%M:%S %Z %Y', $job->retryingAt); + + Resque::redis()->rpush('failed', json_encode($data)); + + } + + // /** + // * Return the redis key used to log the failure information + // * + // * @param Resque_Job $job + // * @param string + // */ + // protected function redisRetryKey($job) { + // return 'failed-retrying:'.$job->retryKey; + // } + + // /** + // * Clean up the retry information from Redis + // * + // * @param Resque_Job $job + // */ + // protected function clearRetryKey($job) { + // $retryKey = $this->redisRetryKey($job); + + // Resque::redis()->del($retryKey); + // } +} \ No newline at end of file diff --git a/lib/Resque/Job.php b/lib/Resque/Job.php index 4c3d4e7a..85551126 100755 --- a/lib/Resque/Job.php +++ b/lib/Resque/Job.php @@ -182,6 +182,10 @@ public function getInstance() return $this->instance; } + public function getClass() { + return $this->payload['class']; + } + /** * Actually execute a job by calling the perform method on the class * associated with the job with the supplied arguments. @@ -230,11 +234,18 @@ public function perform() public function fail($exception) { Resque::redis()->lrem('working:' . $this->queue, 0, $this->payload['id']); Resque::redis()->del('job:' . $this->payload['id'] . ':worker'); - Resque_Event::trigger('onFailure', array( - 'exception' => $exception, - 'job' => $this, - )); + try { + Resque_Event::trigger('onFailure', array( + 'exception' => $exception, + 'job' => $this, + )); + } catch(Throwable $e) { + if($this->worker) { + $this->worker->logger->log(Psr\Log\LogLevel::EMERGENCY, 'Failed to report onFailure for {job}: {stack}', array('job' => $this, 'stack' => $e->getMessage())); + } + } + // TODO: Track if we are retrying this instead of marking it as failed after putting in a delayed queue $this->updateStatus(Resque_Job_Status::STATUS_FAILED); Resque_Failure::create( $this->payload, diff --git a/lib/Resque/Plugin.php b/lib/Resque/Plugin.php new file mode 100644 index 00000000..9eff664c --- /dev/null +++ b/lib/Resque/Plugin.php @@ -0,0 +1,126 @@ +getInstance()); + + if (!is_null($exception)) { + array_unshift($payload, $exception); + } + + call_user_func_array($callable, $payload); + } + } + } + + /** + * Retrieve the plugin instances for this job, optionally filtered by a hook + * + * @param Resque_Job $job an instance of a job + * @param string $hook optional hook to filter by + * @return array of plugins for the job + */ + public static function plugins(Resque_Job $job, $hook = null) { + $jobName = (string) $job; + + if (!array_key_exists($jobName, static::$_pluginInstances)) { + static::$_pluginInstances[$jobName] = static::createInstances($job); + } + + $instances = static::$_pluginInstances[$jobName]; + + if (empty($hook) or empty($instances)) { + return $instances; + } + + return array_filter($instances, function($instance) use ($hook) { + return is_callable(array($instance, $hook)); + }); + } + + /** + * Create instances of the plugins for the specified job class + * @param Resque_Job $job + * @return array of plugin instances for this job class + */ + public static function createInstances($job) { + $instances = array(); + $jobClass = $job->getClass(); + + if (property_exists($jobClass, 'resquePlugins')) { + $pluginClasses = $jobClass::$resquePlugins; + + foreach ($pluginClasses as $pluginClass) { + if (stripos($pluginClass, '\\') !== 0) { + $pluginClass = '\\'. $pluginClass; + } + + if (class_exists($pluginClass)) { + array_push($instances, new $pluginClass); + } + } + } + + return $instances; + } + + +} \ No newline at end of file diff --git a/lib/Resque/Plugin/ExponentialRetry.php b/lib/Resque/Plugin/ExponentialRetry.php new file mode 100644 index 00000000..1fdd0235 --- /dev/null +++ b/lib/Resque/Plugin/ExponentialRetry.php @@ -0,0 +1,53 @@ +getInstanceProperty($job, 'retryLimit', count($this->backoffStrategy($job))); + } + + /** + * Get the retry delay for the job + * + * @param Resque_Job $job + * @return int retry delay in seconds + */ + protected function retryDelay($job) { + $backoffStrategy = $this->backoffStrategy($job); + $strategySteps = count($backoffStrategy); + + if ($strategySteps <= 0) { + return 0; + } elseif (($strategySteps - 1) > $job->retryAttempt) { + return $backoffStrategy[$job->retryAttempt]; + } else { + return $backoffStrategy[$strategySteps - 1]; + } + } + + /** + * Get the backoff strategy from the job, defaults to: + * - 1 second + * - 5 seconds + * - 30 seconds + * - 1 minute + * - 10 minutes + * - 1 hour + * - 3 hours + * - 6 hours + * + * @param Resque_Job $job + * @return int retry limit + */ + protected function backoffStrategy($job) { + $defaultStrategy = array(1, 5, 30, 60, 600, 3600, 10800, 21600); + return $this->getInstanceProperty($job, 'backoffStrategy', $defaultStrategy); + } +} \ No newline at end of file diff --git a/lib/Resque/Plugin/Retry.php b/lib/Resque/Plugin/Retry.php new file mode 100644 index 00000000..eb354da6 --- /dev/null +++ b/lib/Resque/Plugin/Retry.php @@ -0,0 +1,233 @@ +retryCriteriaValid($exception, $job)) { + $this->tryAgain($exception, $job); + $job->retryKey = $this->redisRetryKey($job); + } else { + $this->cleanRetryKey($job); + } + + } + + /** + * Hook into before the job is performed + * + * Sets up the tracking of the of the amount of attempts trying to perform this job + * + * @param Resque_Job $job + */ + public function beforePerform($job) { + // Keep track of the number of retry attempts + $retryKey = $this->redisRetryKey($job); + + Resque::redis()->setnx($retryKey, -1); // set to -1 if key doesn't exist + $job->retryAttempt = Resque::redis()->incr($retryKey); + + // Set basic info on the job + $job->retryKey = $this->redisRetryKey($job); + $job->failureBackend = 'Resque_Failure_Suppression'; + } + + /** + * Hook into the job having been performed + * + * Cleans up any data we've tracked for retrying now that the job has been successfully + * performed. + * + * @param Resque_Job $job + */ + public function afterPerform($job) { + $this->cleanRetryKey($job); + } + + + /** + * Retry the job + * + * @param Exception $exception the exception that caused the job to fail + * @param Resque_Job $job the job that failed and should be retried + */ + protected function tryAgain($exception, $job) { + $retryDelay = $this->retryDelay($job); + + $queue = $job->queue; + $class = $job->getClass(); + $arguments = $job->getArguments(); + + $retryingAt = time() + $retryDelay; + + if ($retryDelay <= 0) { + if($job->worker) { + $job->worker->logger->log(Psr\Log\LogLevel::NOTICE, 'Re-running failed job immediately: {job}', array('job' => $job)); + } + $job->recreate(); + } else { + if($job->worker) { + $job->worker->logger->log(Psr\Log\LogLevel::NOTICE, 'Re-running failed job in {delay} seconds: {job}', array('job' => $job, 'delay' => $retryDelay)); + } + $monitor = false; + $status = new Resque_Job_Status($job->payload['id']); + if($status->isTracking()) { + $monitor = true; + } + ResqueScheduler::enqueueAt($retryingAt, $queue, $class, $arguments, $monitor, $job->payload['id']); + } + + $job->retrying = true; + $job->retryDelay = $retryDelay; + $job->retryingAt = $retryingAt; + } + + /** + * Clean up the retry attempts information from Redis + * + * @param Resque_Job $job + */ + protected function cleanRetryKey($job) { + $retryKey = $this->redisRetryKey($job); + + Resque::redis()->del($retryKey); + } + + /** + * Return the redis key used to track retries + * + * @param Resque_Job $job + * @param string + */ + protected function redisRetryKey($job) { + $name = array( + 'Job{' . $job->queue .'}' + ); + $name[] = $job->payload['class']; + $name[] = $job->payload['id']; + + return 'retry:' . '(' . implode(' | ', $name) . ')'; + } + + /** + * Test whether the retry criteria are valid + * + * @param Exception $exception + * @param Resque_Job $job + * @return boolean + */ + protected function retryCriteriaValid($exception, $job) { + if ($this->retryLimitReached($job)) return false; + + $shouldRetry = $this->retryException($exception, $job); + + return $shouldRetry; // retry everything for now + } + + /** + * Check whether this exception should be retried. Will retry all exceptions + * when no specific exceptions are defined. + * + * @param Exception $e exception thrown in job + * @param Resque_Job $job + * @return boolean + */ + protected function retryException($exception, $job) { + $exceptions = $this->retryExceptions($job); + + if (is_null($exceptions) or empty($exceptions)) return true; + + foreach ($exceptions as $e) { + if (stripos($e, '\\') !== 0) { + $e = '\\'. $e; + } + + if (is_a($exception, $e)) return true; + } + + // if we reached this point, the exception is not one we want to retry + return false; + } + + /** + * Get the exceptions defined on the job instance for which this job shoud be + * retried when it fails. + * + * @param Resque_Job $job + * @return array,null classnames of exceptions or null + */ + + protected function retryExceptions($job) { + return $this->getInstanceProperty($job, 'retryExceptions', null); + } + + /** + * Check whether the retry limit has been reached + * + * @param Resque_Job $job + * @return boolean + */ + protected function retryLimitReached($job) { + $retryLimit = $this->retryLimit($job); + + if ($retryLimit === 0) { + return true; + } elseif ($retryLimit > 0) { + if(!isset($job->retryAttempt) || !$job->retryAttempt) { + $retryKey = $this->redisRetryKey($job); + $job->retryAttempt = intval(Resque::redis()->get($retryKey) ?? '0'); + } + + return ($job->retryAttempt >= $retryLimit); + } else { + return false; + } + } + + /** + * Get the retry delay from the job, defaults to 0 + * + * @param Resque_Job $job + * @return int retry delay in seconds + */ + protected function retryLimit($job) { + return $this->getInstanceProperty($job, 'retryLimit', 1); + } + + /** + * Get the retry delay from the job, defaults to 0 + * + * @param Resque_Job $job + * @return int retry delay in seconds + */ + protected function retryDelay($job) { + return $this->getInstanceProperty($job, 'retryDelay', 0); + } + + /** + * Get a property of the job instance if it exists, otherwise + * the default value for this property. Return null for a property + * that has no default set + */ + protected function getInstanceProperty($job, $property, $default = null) { + $instance = $job->getInstance(); + + if (method_exists($instance, $property)) { + return call_user_func_array(array($instance, $property), $job); + } + + if (property_exists($instance, $property)) { + return $instance->{$property}; + } + + return $default; + } + +} \ No newline at end of file diff --git a/lib/ResqueScheduler.php b/lib/ResqueScheduler.php index b103bfb0..ca346096 100644 --- a/lib/ResqueScheduler.php +++ b/lib/ResqueScheduler.php @@ -41,11 +41,13 @@ public static function enqueueIn($in, $queue, $class, array $args = array(), $tr * @param array $args Any optional arguments that should be passed when the job is executed. * @param boolean $trackStatus Set to true to be able to monitor the status of a job. */ - public static function enqueueAt($at, $queue, $class, $args = array(), $trackStatus = false) + public static function enqueueAt($at, $queue, $class, $args = array(), $trackStatus = false, $id = null) { self::validateJob($class, $queue); - $id = Resque::generateJobId(); + if(!$id) { + $id = Resque::generateJobId(); + } $job = self::jobToHash($queue, $class, $args, $id, $trackStatus); self::delayedPush($at, $job); From e2f50129c360fc592e33391ef8a681d7dfab3874 Mon Sep 17 00:00:00 2001 From: Scott Jackson Date: Thu, 21 Sep 2023 14:21:34 -0700 Subject: [PATCH 2/2] Mark retrying jobs as retried instead of as failed until they start again --- lib/Resque/Job.php | 8 +++++++- lib/Resque/Job/IsRetryingException.php | 5 +++++ lib/Resque/Job/Status.php | 1 + lib/Resque/Plugin/Retry.php | 2 ++ 4 files changed, 15 insertions(+), 1 deletion(-) create mode 100644 lib/Resque/Job/IsRetryingException.php diff --git a/lib/Resque/Job.php b/lib/Resque/Job.php index 85551126..885a6f84 100755 --- a/lib/Resque/Job.php +++ b/lib/Resque/Job.php @@ -239,13 +239,19 @@ public function fail($exception) { 'exception' => $exception, 'job' => $this, )); + } catch(Resque_Job_IsRetryingException $e) { + if($this->worker) { + $this->worker->logger->log(Psr\Log\LogLevel::DEBUG, 'onFailure is triggering a retry so do not mark status as failed for {job}', array('job' => $this)); + } + + $this->updateStatus(Resque_Job_Status::STATUS_RETRYING); + return; } catch(Throwable $e) { if($this->worker) { $this->worker->logger->log(Psr\Log\LogLevel::EMERGENCY, 'Failed to report onFailure for {job}: {stack}', array('job' => $this, 'stack' => $e->getMessage())); } } - // TODO: Track if we are retrying this instead of marking it as failed after putting in a delayed queue $this->updateStatus(Resque_Job_Status::STATUS_FAILED); Resque_Failure::create( $this->payload, diff --git a/lib/Resque/Job/IsRetryingException.php b/lib/Resque/Job/IsRetryingException.php new file mode 100644 index 00000000..97c46b38 --- /dev/null +++ b/lib/Resque/Job/IsRetryingException.php @@ -0,0 +1,5 @@ +retryCriteriaValid($exception, $job)) { $this->tryAgain($exception, $job); $job->retryKey = $this->redisRetryKey($job); + + throw new Resque_Job_IsRetryingException(); } else { $this->cleanRetryKey($job); }