diff --git a/README.md b/README.md index 045423075..bac603178 100644 --- a/README.md +++ b/README.md @@ -722,7 +722,7 @@ Batches track a set of jobs, and enqueue an optional callback job when all of th - [`GoodJob::Batch`](app/models/good_job/batch.rb) has a number of assignable attributes and methods: ```ruby -batch = GoodJob::Batch.new +batch = GoodJob::Batch.new # or .new(paused: true) to pause all jobs added to the batch batch.description = "My batch" batch.on_finish = "MyBatchCallbackJob" # Callback job when all jobs have finished batch.on_success = "MyBatchCallbackJob" # Callback job when/if all jobs have succeeded @@ -734,12 +734,14 @@ batch.add do MyJob.perform_later end batch.enqueue +batch.unpause # Unpauses all jobs within the batch, allowing them to be executed batch.discarded? # => Boolean batch.discarded_at # => batch.finished? # => Boolean batch.finished_at # => batch.succeeded? # => Boolean +batch.paused? # => Boolean // TODO: expand on what this method does batch.active_jobs # => Array of ActiveJob::Base-inherited jobs that are part of the batch batch = GoodJob::Batch.find(batch.id) @@ -831,6 +833,10 @@ end GoodJob::Batch.enqueue(on_finish: BatchJob) ``` +#### Pausing batches + +// TODO: document how to pause/unpause a batch (potentially create as an entirely separate section about pausing things?) + #### Other batch details - Whether to enqueue a callback job is evaluated once the batch is in an `enqueued?`-state by using `GoodJob::Batch.enqueue` or `batch.enqueue`. diff --git a/app/models/good_job/batch.rb b/app/models/good_job/batch.rb index 17a635c45..ee2e32270 100644 --- a/app/models/good_job/batch.rb +++ b/app/models/good_job/batch.rb @@ -119,7 +119,7 @@ def enqueue(active_jobs = [], **properties, &block) def add(active_jobs = nil, &block) record.save if record.new_record? 
- buffer = Bulk::Buffer.new + buffer = Bulk::Buffer.new(pause_jobs: properties[:paused] || false) buffer.add(active_jobs) buffer.capture(&block) if block @@ -130,6 +130,44 @@ def add(active_jobs = nil, &block) buffer.active_jobs end + # TODO: document + def paused? + # TODO: consider querying to see if any jobs within the batch are paused, and if/how that should be represented if that result does not match properties[:paused] + # I think there are probably going to need to be separate methods for "is the batch set to pause all jobs within it" and "does the batch contain any paused jobs" as those cases aren't always lined up + properties[:paused] || false + end + + # TODO + # def pause; end + + # TODO: document + def unpause + # TODO: consider raising an exception if the batch isn't paused in the first place + + # TODO: consider setting this at the end of the method, or doing something similar to help handle situations where an exception is raised during unpausing + assign_properties(paused: false) + + # TODO: this could be implemented with COALESCE and `jsonb_path_query(serialized_params, '$.scheduled_at.datetime()')` to extract the previously scheduled time within a single UPDATE, but that method is not available in PG12 (still supported at the time of writing) + unpaused_count = 0 + + loop do + jobs = record.jobs.where(scheduled_at: nil).limit(1_000) + break if jobs.empty? 
+ + jobs.each do |job| + job.update!(scheduled_at: job.serialized_params['scheduled_at'] || Time.current) + end + + jobs.collect(&:queue_name).tally.each do |q, c| + GoodJob::Notifier.notify({ queue_name: q, count: c }) + end + + unpaused_count += jobs.size + end + + unpaused_count + end + def active_jobs record.jobs.map(&:active_job) end diff --git a/app/models/good_job/execution.rb b/app/models/good_job/execution.rb index 2b4c4850b..357550b7d 100644 --- a/app/models/good_job/execution.rb +++ b/app/models/good_job/execution.rb @@ -91,11 +91,11 @@ def self.queue_parser(string) scope :unfinished, -> { where(finished_at: nil) } # Get executions that are not scheduled for a later time than now (i.e. jobs that - # are not scheduled or scheduled for earlier than the current time). + # are scheduled for earlier than the current time). Paused jobs are excluded. # @!method only_scheduled # @!scope class # @return [ActiveRecord::Relation] - scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(bind_value('scheduled_at', DateTime.current, ActiveRecord::Type::DateTime))).or(where(scheduled_at: nil)) } + scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(bind_value('scheduled_at', DateTime.current, ActiveRecord::Type::DateTime))) } # Order executions by priority (highest priority first). # @!method priority_ordered @@ -331,6 +331,7 @@ def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false else execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at }) execution.make_discrete if discrete_support? 
+ execution.scheduled_at ||= Time.current # set after make_discrete so it can manage assigning both created_at and scheduled_at simultaneously end if create_with_advisory_lock @@ -342,6 +343,7 @@ def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false end instrument_payload[:execution] = execution + execution.scheduled_at = nil if active_job.respond_to?(:good_job_paused) && active_job.good_job_paused execution.save! if retried diff --git a/demo/app/jobs/other_job.rb b/demo/app/jobs/other_job.rb index 24f511a75..2fabd5aa0 100644 --- a/demo/app/jobs/other_job.rb +++ b/demo/app/jobs/other_job.rb @@ -2,5 +2,6 @@ class OtherJob < ApplicationJob JobError = Class.new(StandardError) def perform(*) + # raise 'nope' end end diff --git a/lib/good_job.rb b/lib/good_job.rb index 8208f5ad7..de37ad2fa 100644 --- a/lib/good_job.rb +++ b/lib/good_job.rb @@ -14,6 +14,7 @@ require_relative "good_job/active_job_extensions/interrupt_errors" require_relative "good_job/active_job_extensions/labels" require_relative "good_job/active_job_extensions/notify_options" +require_relative "good_job/active_job_extensions/pauseable" require_relative "good_job/overridable_connection" require_relative "good_job/bulk" diff --git a/lib/good_job/active_job_extensions/pauseable.rb b/lib/good_job/active_job_extensions/pauseable.rb new file mode 100644 index 000000000..86d98c34b --- /dev/null +++ b/lib/good_job/active_job_extensions/pauseable.rb @@ -0,0 +1,52 @@ +# frozen_string_literal: true + +module GoodJob + module ActiveJobExtensions + # Allows configuring whether the job should start 'paused' when a job is enqueued. + # Configuration will apply either globally to the Job Class, or individually to jobs + # on initial enqueue and subsequent retries. 
+ # + # @example + # class MyJob < ApplicationJob + # self.good_job_paused = true + # end + # + # # Or, configure jobs individually to start paused: + # MyJob.set(good_job_paused: true).perform_later + # + # See also - GoodJob::Batch#new's `paused` option + + module Pauseable + extend ActiveSupport::Concern + + module Prepends + def enqueue(options = {}) + self.good_job_paused = options[:good_job_paused] if options.key?(:good_job_paused) + super + end + + # good_job_paused is intentionally excluded from the serialized params so we fully rely on the scheduled_at value once the job is enqueued + # TODO: remove before merge + # def serialize + # super.tap do |job_data| + # # job_data["good_job_paused"] = good_job_paused unless good_job_paused.nil? + # end + # end + + # def deserialize(job_data) + # super + # self.good_job_paused = job_data["good_job_paused"] + # end + end + + included do + prepend Prepends + class_attribute :good_job_paused, instance_accessor: false, instance_predicate: false, default: nil + attr_accessor :good_job_paused + end + end + end +end + +# Jobs can be paused through batches which rely on good_job_paused being available, so this must be included globally +ActiveSupport.on_load(:active_job) { include GoodJob::ActiveJobExtensions::Pauseable } diff --git a/lib/good_job/adapter.rb b/lib/good_job/adapter.rb index 9029ee299..b774e84ba 100644 --- a/lib/good_job/adapter.rb +++ b/lib/good_job/adapter.rb @@ -62,6 +62,7 @@ def enqueue_all(active_jobs) if GoodJob::Execution.discrete_support? execution.make_discrete execution.scheduled_at = current_time if execution.scheduled_at == execution.created_at + execution.scheduled_at = nil if active_job.respond_to?(:good_job_paused) && active_job.good_job_paused end execution.created_at = current_time @@ -92,7 +93,7 @@ def enqueue_all(active_jobs) executions = executions.select(&:persisted?) # prune unpersisted executions if execute_inline? 
- inline_executions = executions.select { |execution| (execution.scheduled_at.nil? || execution.scheduled_at <= Time.current) } + inline_executions = executions.select { |execution| execution.scheduled_at.present? && execution.scheduled_at <= Time.current } inline_executions.each(&:advisory_lock!) end end @@ -146,14 +147,14 @@ def enqueue_all(active_jobs) # @param timestamp [Integer, nil] the epoch time to perform the job # @return [GoodJob::Execution] def enqueue_at(active_job, timestamp) - scheduled_at = timestamp ? Time.zone.at(timestamp) : nil + scheduled_at = timestamp ? Time.zone.at(timestamp) : Time.current # If there is a currently open Bulk in the current thread, direct the # job there to be enqueued using enqueue_all return if GoodJob::Bulk.capture(active_job, queue_adapter: self) Rails.application.executor.wrap do - will_execute_inline = execute_inline? && (scheduled_at.nil? || scheduled_at <= Time.current) + will_execute_inline = execute_inline? && (scheduled_at.present? && scheduled_at <= Time.current) will_retry_inline = will_execute_inline && CurrentThread.execution&.active_job_id == active_job.job_id && !CurrentThread.retry_now if will_retry_inline @@ -171,7 +172,7 @@ def enqueue_at(active_job, timestamp) result = execution.perform retried_execution = result.retried - while retried_execution && (retried_execution.scheduled_at.nil? || retried_execution.scheduled_at <= Time.current) + while retried_execution && (retried_execution.scheduled_at.present? 
&& retried_execution.scheduled_at <= Time.current) execution = retried_execution result = execution.perform retried_execution = result.retried diff --git a/lib/good_job/bulk.rb b/lib/good_job/bulk.rb index 9511ef30c..85b77b4e1 100644 --- a/lib/good_job/bulk.rb +++ b/lib/good_job/bulk.rb @@ -60,8 +60,9 @@ def self.unbuffer end class Buffer - def initialize + def initialize(pause_jobs: false) @values = [] + @pause_jobs = pause_jobs end def capture @@ -79,6 +80,9 @@ def add(active_jobs, queue_adapter: nil) adapter = queue_adapter || active_job.class.queue_adapter raise Error, "Jobs must have a Queue Adapter" unless adapter + # TODO: should explicitly setting `good_job_paused = false` on a job override this? + active_job.good_job_paused = true if @pause_jobs && active_job.respond_to?(:good_job_paused) + [active_job, adapter] end @values.append(*new_pairs) diff --git a/spec/app/filters/good_job/jobs_filter_spec.rb b/spec/app/filters/good_job/jobs_filter_spec.rb index 83daa3c8f..976a59d62 100644 --- a/spec/app/filters/good_job/jobs_filter_spec.rb +++ b/spec/app/filters/good_job/jobs_filter_spec.rb @@ -12,6 +12,7 @@ ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :external) ExampleJob.set(queue: 'cron').perform_later + GoodJob::Job.order(created_at: :asc).last.update!(cron_key: "frequent_cron") ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :inline) @@ -20,6 +21,7 @@ Timecop.travel 1.hour.ago ExampleJob.set(queue: 'elephants').perform_later(ExampleJob::DEAD_TYPE) + 5.times do Timecop.travel 5.minutes GoodJob.perform_inline diff --git a/spec/app/models/good_job/execution_spec.rb b/spec/app/models/good_job/execution_spec.rb index 369683120..92d7519b8 100644 --- a/spec/app/models/good_job/execution_spec.rb +++ b/spec/app/models/good_job/execution_spec.rb @@ -63,7 +63,7 @@ def perform(result_value = nil, raise_error: false) context 'when NOT discrete' do before { allow(described_class).to receive(:discrete_support?).and_return(false) 
} - it 'does not assign id, scheduled_at' do + it 'does not assign id, does assign scheduled_at' do expect { described_class.enqueue(active_job) }.to change(described_class, :count).by(1) execution = described_class.last @@ -71,7 +71,7 @@ def perform(result_value = nil, raise_error: false) expect(execution).to have_attributes( is_discrete: nil, active_job_id: active_job.job_id, - scheduled_at: nil + scheduled_at: within(1).of(Time.current) ) end end @@ -88,7 +88,7 @@ def perform(result_value = nil, raise_error: false) serialized_params: a_kind_of(Hash), queue_name: 'test', priority: 50, - scheduled_at: nil + scheduled_at: within(1).of(Time.current) ) end @@ -182,9 +182,10 @@ def perform(result_value = nil, raise_error: false) context 'with multiple jobs' do def job_params - { active_job_id: SecureRandom.uuid, queue_name: "default", priority: 0, serialized_params: { job_class: "TestJob" } } + { active_job_id: SecureRandom.uuid, queue_name: "default", priority: 0, serialized_params: { job_class: "TestJob" }, scheduled_at: sched } end + let!(:sched) { Time.current } let!(:older_job) { described_class.create!(job_params.merge(created_at: 10.minutes.ago)) } let!(:newer_job) { described_class.create!(job_params.merge(created_at: 5.minutes.ago)) } let!(:low_priority_job) { described_class.create!(job_params.merge(priority: 20)) } @@ -205,9 +206,10 @@ def job_params context "with multiple jobs and ordered queues" do def job_params - { active_job_id: SecureRandom.uuid, queue_name: "default", priority: 0, serialized_params: { job_class: "TestJob" } } + { active_job_id: SecureRandom.uuid, queue_name: "default", priority: 0, serialized_params: { job_class: "TestJob" }, scheduled_at: sched } end + let!(:sched) { Time.current } let(:parsed_queues) { { include: %w{one two}, ordered_queues: true } } let!(:queue_two_job) { described_class.create!(job_params.merge(queue_name: "two", created_at: 10.minutes.ago, priority: 100)) } let!(:queue_one_job) { 
described_class.create!(job_params.merge(queue_name: "one", created_at: 1.minute.ago, priority: 1)) } @@ -698,7 +700,7 @@ def job_params job_class: good_job.job_class, queue_name: good_job.queue_name, created_at: within(0.001).of(good_job.performed_at), - scheduled_at: within(0.001).of(good_job.created_at), + scheduled_at: within(0.1).of(good_job.created_at), finished_at: within(1.second).of(Time.current), error: nil, serialized_params: good_job.serialized_params diff --git a/spec/lib/good_job/adapter_spec.rb b/spec/lib/good_job/adapter_spec.rb index 54611d214..0c3d1d6ef 100644 --- a/spec/lib/good_job/adapter_spec.rb +++ b/spec/lib/good_job/adapter_spec.rb @@ -56,7 +56,7 @@ expect(GoodJob::Execution).to have_received(:enqueue).with( active_job, - scheduled_at: nil + scheduled_at: be_within(1).of(Time.current) ) end diff --git a/todo.txt b/todo.txt new file mode 100644 index 000000000..1a28936e0 --- /dev/null +++ b/todo.txt @@ -0,0 +1,2 @@ +* Indicate paused status in the jobs dashboard +