Skip to content

Commit

Permalink
Merge branch 'pl-44-auto-retry-jobs'
Browse files Browse the repository at this point in the history
  • Loading branch information
daneren2005 committed Sep 21, 2023
2 parents 8d35ee9 + e2f5012 commit 5a1fe89
Show file tree
Hide file tree
Showing 10 changed files with 497 additions and 6 deletions.
2 changes: 2 additions & 0 deletions bin/resque
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ if(!empty($REDIS_BACKEND)) {
Resque::setBackend($REDIS_BACKEND, $REDIS_BACKEND_DB);
}

Resque_Plugin::initialize();

$logLevel = false;
$LOGGING = getenv('LOGGING');
$VERBOSE = getenv('VERBOSE');
Expand Down
3 changes: 3 additions & 0 deletions lib/Resque.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
<?php
require_once(__DIR__ . '/Resque/Plugin/Retry.php');
require_once(__DIR__ . '/Resque/Plugin/ExponentialRetry.php');

/**
* Base Resque class.
*
Expand Down
47 changes: 47 additions & 0 deletions lib/Resque/Failure/RedisRetrySuppression.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php

class Resque_Failure_Suppression extends Resque_Failure_Redis {
/**
* Initialize a failed job class and save it (where appropriate).
*
* @param object $job Job that failed.
* @param object $exception Instance of the exception that was thrown by the failed job.
* @param object $worker Instance of Resque_Worker that received the job.
* @param string $queue The name of the queue the job was fetched from.
*/
public function __construct($job, $exception, $worker, $queue) {
if (!property_exists($job, 'retrying') or !$job->retrying or $job->retryDelay <= 0) {
return parent::__construct($job, $exception, $worker, $queue);
}

$retryDelay = $job->retryDelay;

$data = $this->getData($job, $exception, $worker, $queue);
$data->retry_delay = $retryDelay;
$data->retried_at = strftime('%a %b %d %H:%M:%S %Z %Y', $job->retryingAt);

Resque::redis()->rpush('failed', json_encode($data));

}

// /**
// * Return the redis key used to log the failure information
// *
// * @param Resque_Job $job
// * @param string
// */
// protected function redisRetryKey($job) {
// return 'failed-retrying:'.$job->retryKey;
// }

// /**
// * Clean up the retry information from Redis
// *
// * @param Resque_Job $job
// */
// protected function clearRetryKey($job) {
// $retryKey = $this->redisRetryKey($job);

// Resque::redis()->del($retryKey);
// }
}
25 changes: 21 additions & 4 deletions lib/Resque/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ public function getInstance()
return $this->instance;
}

public function getClass() {
return $this->payload['class'];
}

/**
* Actually execute a job by calling the perform method on the class
* associated with the job with the supplied arguments.
Expand Down Expand Up @@ -230,10 +234,23 @@ public function perform()
public function fail($exception) {
Resque::redis()->lrem('working:' . $this->queue, 0, $this->payload['id']);
Resque::redis()->del('job:' . $this->payload['id'] . ':worker');
Resque_Event::trigger('onFailure', array(
'exception' => $exception,
'job' => $this,
));
try {
Resque_Event::trigger('onFailure', array(
'exception' => $exception,
'job' => $this,
));
} catch(Resque_Job_IsRetryingException $e) {
if($this->worker) {
$this->worker->logger->log(Psr\Log\LogLevel::DEBUG, 'onFailure is triggering a retry so do not mark status as failed for {job}', array('job' => $this));
}

$this->updateStatus(Resque_Job_Status::STATUS_RETRYING);
return;
} catch(Throwable $e) {
if($this->worker) {
$this->worker->logger->log(Psr\Log\LogLevel::EMERGENCY, 'Failed to report onFailure for {job}: {stack}', array('job' => $this, 'stack' => $e->getMessage()));
}
}

$this->updateStatus(Resque_Job_Status::STATUS_FAILED);
Resque_Failure::create(
Expand Down
5 changes: 5 additions & 0 deletions lib/Resque/Job/IsRetryingException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<?php
class Resque_Job_IsRetryingException extends RuntimeException
{

}
1 change: 1 addition & 0 deletions lib/Resque/Job/Status.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class Resque_Job_Status
const STATUS_FAILED = 3;
const STATUS_COMPLETE = 4;
const STATUS_DELAYED = 5;
const STATUS_RETRYING = 6;

/**
* @var string The ID of the job this status class refers back to.
Expand Down
126 changes: 126 additions & 0 deletions lib/Resque/Plugin.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
<?php

// Copied from https://github.com/JaapRood/php-resque-plugin/blob/master/lib/Resque/Plugin.php
class Resque_Plugin {

/**
* @var array of plugin instances, ordered by job
*/
protected static $_pluginInstances = array();

/**
* @var array of hooks to listen for
*/
protected static $_hooks = array(
'beforePerform',
'afterPerform',
'onFailure'
);

/**
* Start listening to Resque_Event and start using those plugis
*/
public static function initialize() {
$hooks = static::$_hooks;
$class = get_called_class();
$notifyMethod = $class ."::notify_plugins";

foreach ($hooks as $hook) {
Resque_Event::listen($hook, function($job = null, $exception = null) use ($notifyMethod, $hook) {
$payload = func_get_args();
array_unshift($payload, $hook);

call_user_func_array($notifyMethod, $payload);
});
}
}

/**
* @param string $hook which hook to run
* @param mixed $jobOrFailure job for which to run the plugins
*/
public static function notify_plugins($hook, $jobOrFailure, $job = null) {
if ($jobOrFailure instanceof Resque_Job) {
$possibleException = $job;
$job = $jobOrFailure;
if($possibleException instanceof Throwable) {
$exception = $possibleException;
} else {
$exception = null;
}
} elseif ($jobOrFailure instanceof Exception) {
$exception = $jobOrFailure;
} else {
// TODO: review this choice, not sure if it's the right thing to do
return; // fail silently if we don't know how to handle this
}

$plugins = static::plugins($job, $hook);

foreach ($plugins as $plugin) {
$callable = array($plugin, $hook);
if (is_callable($callable)) {
$payload = array($job, $job->getInstance());

if (!is_null($exception)) {
array_unshift($payload, $exception);
}

call_user_func_array($callable, $payload);
}
}
}

/**
* Retrieve the plugin instances for this job, optionally filtered by a hook
*
* @param Resque_Job $job an instance of a job
* @param string $hook optional hook to filter by
* @return array of plugins for the job
*/
public static function plugins(Resque_Job $job, $hook = null) {
$jobName = (string) $job;

if (!array_key_exists($jobName, static::$_pluginInstances)) {
static::$_pluginInstances[$jobName] = static::createInstances($job);
}

$instances = static::$_pluginInstances[$jobName];

if (empty($hook) or empty($instances)) {
return $instances;
}

return array_filter($instances, function($instance) use ($hook) {
return is_callable(array($instance, $hook));
});
}

/**
* Create instances of the plugins for the specified job class
* @param Resque_Job $job
* @return array of plugin instances for this job class
*/
public static function createInstances($job) {
$instances = array();
$jobClass = $job->getClass();

if (property_exists($jobClass, 'resquePlugins')) {
$pluginClasses = $jobClass::$resquePlugins;

foreach ($pluginClasses as $pluginClass) {
if (stripos($pluginClass, '\\') !== 0) {
$pluginClass = '\\'. $pluginClass;
}

if (class_exists($pluginClass)) {
array_push($instances, new $pluginClass);
}
}
}

return $instances;
}


}
53 changes: 53 additions & 0 deletions lib/Resque/Plugin/ExponentialRetry.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<?php

class Resque_ExponentialRetry extends Resque_Retry {

/**
* Get the retry delay from the job, defaults to the amount of steps in the defined backoff
* strategy
*
* @param Resque_Job $job
* @return int retry limit
*/
protected function retryLimit($job) {
return $this->getInstanceProperty($job, 'retryLimit', count($this->backoffStrategy($job)));
}

/**
* Get the retry delay for the job
*
* @param Resque_Job $job
* @return int retry delay in seconds
*/
protected function retryDelay($job) {
$backoffStrategy = $this->backoffStrategy($job);
$strategySteps = count($backoffStrategy);

if ($strategySteps <= 0) {
return 0;
} elseif (($strategySteps - 1) > $job->retryAttempt) {
return $backoffStrategy[$job->retryAttempt];
} else {
return $backoffStrategy[$strategySteps - 1];
}
}

/**
* Get the backoff strategy from the job, defaults to:
* - 1 second
* - 5 seconds
* - 30 seconds
* - 1 minute
* - 10 minutes
* - 1 hour
* - 3 hours
* - 6 hours
*
* @param Resque_Job $job
* @return int retry limit
*/
protected function backoffStrategy($job) {
$defaultStrategy = array(1, 5, 30, 60, 600, 3600, 10800, 21600);
return $this->getInstanceProperty($job, 'backoffStrategy', $defaultStrategy);
}
}
Loading

0 comments on commit 5a1fe89

Please sign in to comment.