Skip to content

Commit

Permalink
Add status tracking to delayed jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
daneren2005 committed Aug 28, 2023
1 parent 6fdb2ab commit 8d35ee9
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 14 deletions.
6 changes: 4 additions & 2 deletions lib/Resque.php
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,11 @@ public static function sizeWorking($queue) {
*
* @return string|boolean Job ID when the job was created, false if creation was cancelled due to beforeEnqueue
*/
public static function enqueue($queue, $class, $args = null, $trackStatus = false)
public static function enqueue($queue, $class, $args = null, $trackStatus = false, $id = null)
{
$id = Resque::generateJobId();
if(!$id) {
$id = Resque::generateJobId();
}
$hookParams = array(
'class' => $class,
'args' => $args,
Expand Down
6 changes: 4 additions & 2 deletions lib/Resque/Job/Status.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class Resque_Job_Status
const STATUS_RUNNING = 2;
const STATUS_FAILED = 3;
const STATUS_COMPLETE = 4;
const STATUS_DELAYED = 5;

/**
* @var string The ID of the job this status class refers back to.
Expand Down Expand Up @@ -47,11 +48,12 @@ public function __construct($id)
* all necessary keys in Redis to monitor the status of a job.
*
* @param string $id The ID of the job to monitor the status of.
* @param int $startStatus The start status of the job - defaults to STATUS_WAITING
*/
public static function create($id)
public static function create($id, $startStatus = Resque_Job_Status::STATUS_WAITING)
{
$statusPacket = array(
'status' => self::STATUS_WAITING,
'status' => $startStatus,
'updated' => time(),
self::statusToString(self::STATUS_WAITING) => time(),
'started' => time(),
Expand Down
31 changes: 22 additions & 9 deletions lib/ResqueScheduler.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ class ResqueScheduler
* @param string $queue The name of the queue to place the job in.
* @param string $class The name of the class that contains the code to execute the job.
* @param array $args Any optional arguments that should be passed when the job is executed.
* @param boolean $trackStatus Set to true to be able to monitor the status of a job.
*/
public static function enqueueIn($in, $queue, $class, array $args = array())
public static function enqueueIn($in, $queue, $class, array $args = array(), $trackStatus = false)
{
self::enqueueAt(time() + $in, $queue, $class, $args);
return self::enqueueAt(time() + $in, $queue, $class, $args, $trackStatus);
}

/**
Expand All @@ -38,20 +39,29 @@ public static function enqueueIn($in, $queue, $class, array $args = array())
* @param string $queue The name of the queue to place the job in.
* @param string $class The name of the class that contains the code to execute the job.
* @param array $args Any optional arguments that should be passed when the job is executed.
* @param boolean $trackStatus Set to true to be able to monitor the status of a job.
*/
public static function enqueueAt($at, $queue, $class, $args = array())
public static function enqueueAt($at, $queue, $class, $args = array(), $trackStatus = false)
{
self::validateJob($class, $queue);

$job = self::jobToHash($queue, $class, $args);
$id = Resque::generateJobId();
$job = self::jobToHash($queue, $class, $args, $id, $trackStatus);
self::delayedPush($at, $job);

if($trackStatus) {
Resque_Job_Status::create($id, Resque_Job_Status::STATUS_DELAYED);
}

Resque_Event::trigger('afterSchedule', array(
'at' => $at,
'queue' => $queue,
'class' => $class,
'args' => $args,
'id' => $id
));

return $id;
}

/**
Expand Down Expand Up @@ -91,6 +101,7 @@ public static function getDelayedTimestampSize($timestamp)
return Resque::redis()->llen('delayed:' . $timestamp, $timestamp);
}

// TODO: Re-implement removeDelayed after adding status tracking to job hash
/**
* Remove a delayed job from the queue
*
Expand All @@ -106,7 +117,7 @@ public static function getDelayedTimestampSize($timestamp)
* @param $args
* @return int number of jobs that were removed
*/
public static function removeDelayed($queue, $class, $args)
/*public static function removeDelayed($queue, $class, $args)
{
$destroyed=0;
$item=json_encode(self::jobToHash($queue, $class, $args));
Expand All @@ -119,7 +130,7 @@ public static function removeDelayed($queue, $class, $args)
}
return $destroyed;
}
}*/

/**
* removed a delayed job queued for a specific timestamp
Expand All @@ -134,7 +145,7 @@ public static function removeDelayed($queue, $class, $args)
* @param $args
* @return mixed
*/
public static function removeDelayedJobFromTimestamp($timestamp, $queue, $class, $args)
/*public static function removeDelayedJobFromTimestamp($timestamp, $queue, $class, $args)
{
$key = 'delayed:' . self::getTimestamp($timestamp);
$item = json_encode(self::jobToHash($queue, $class, $args));
Expand All @@ -143,7 +154,7 @@ public static function removeDelayedJobFromTimestamp($timestamp, $queue, $class,
self::cleanupTimestamp($key, $timestamp);
return $count;
}
}*/

/**
* Generate hash of all job properties to be saved in the scheduled queue.
Expand All @@ -153,12 +164,14 @@ public static function removeDelayedJobFromTimestamp($timestamp, $queue, $class,
* @param array $args Array of job arguments.
*/

private static function jobToHash($queue, $class, $args)
private static function jobToHash($queue, $class, $args, $id, $trackStatus)
{
return array(
'class' => $class,
'args' => array($args),
'queue' => $queue,
'id' => $id,
'trackStatus' => $trackStatus
);
}

Expand Down
2 changes: 1 addition & 1 deletion lib/ResqueScheduler/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public function enqueueDelayedItemsForTimestamp($timestamp)
'args' => $item['args'],
));

$payload = array_merge(array($item['queue'], $item['class']), $item['args']);
$payload = array_merge(array($item['queue'], $item['class']), $item['args'], array($item['trackStatus'], $item['id']));
call_user_func_array('Resque::enqueue', $payload);
}
}
Expand Down

0 comments on commit 8d35ee9

Please sign in to comment.