diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c96a694..6c8d6bf8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ +- [Unreleased](#unreleased) - [2.0.0 \(2022-08-25\)](#200-2022-08-25) - [1.4.1 \(2022-07-24\)](#141-2022-07-24) - [2.0.0.beta1 \(2022-03-24\)](#200beta1-2022-03-24) @@ -54,8 +55,31 @@ +## Unreleased + +- **Added**: + + Added bulk enqueue interface for performance when enqueuing a large number of jobs at once - [docs](docs#enqueueing-jobs-in-bulk). +- **Deprecated**: + + Deprecated `que_state_notify` trigger (`que_state` notification channel / `job_change` notification message). See [#372](https://github.com/que-rb/que/issues/372). We plan to remove this in a future release - let us know on the issue if you desire otherwise. + +This release contains a database migration. You will need to migrate Que to the latest database schema version (7). For example, on ActiveRecord and Rails 6: + +```ruby +class UpdateQueTablesToVersion6 < ActiveRecord::Migration[6.0] + def up + Que.migrate!(version: 7) + end + + def down + Que.migrate!(version: 6) + end +end +``` + ## 2.0.0 (2022-08-25) +**Important: Do not upgrade straight to Que 2.** You will need to first update to the latest 1.x version, apply the Que database schema migration, and deploy, before you can safely begin the process of upgrading to Que 2. See the [2.0.0.beta1 changelog entry](#200beta1-2022-03-24) for details. + See beta 2.0.0.beta1, plus: - **Fixed**: diff --git a/Gemfile b/Gemfile index 894108f1..084460f2 100644 --- a/Gemfile +++ b/Gemfile @@ -20,6 +20,7 @@ group :test do gem 'minitest', '~> 5.10.1' gem 'minitest-profile', '0.0.2' gem 'minitest-hooks', '1.4.0' + gem 'minitest-fail-fast', '0.1.0' gem 'm' diff --git a/docs/README.md b/docs/README.md index 31a72b50..e9361fc1 100644 --- a/docs/README.md +++ b/docs/README.md @@ -53,6 +53,7 @@ - [Defining Middleware For Jobs](#defining-middleware-for-jobs) - [Defining Middleware For SQL statements](#defining-middleware-for-sql-statements) - [Vacuuming](#vacuuming) +- [Enqueueing jobs in bulk](#enqueueing-jobs-in-bulk) - [Expired jobs](#expired-jobs) - [Finished jobs](#finished-jobs) @@ -836,6 +837,30 @@ class ManualVacuumJob < CronJob end ``` +## Enqueueing jobs in bulk + +If you need to enqueue a large number of jobs at once, enqueueing each one separately (and running the notify trigger for each) can become a performance bottleneck. To mitigate this, there is a bulk enqueue interface: + +```ruby +Que.bulk_enqueue do + MyJob.enqueue(user_id: 1) + MyJob.enqueue(user_id: 2) + # ... +end +``` + +The jobs are only actually enqueued at the end of the block, at which point they are inserted into the database in one big query. + +Limitations: + +- ActiveJob is not supported +- All jobs must use the same job class +- All jobs must use the same `job_options` (`job_options` must be provided to `.bulk_enqueue` instead of `.enqueue`) +- The `que_attrs` of a job instance returned from `.enqueue` is empty (`{}`) +- The notify trigger is not run by default, so jobs will only be picked up by a worker upon its next poll + +If you still want the notify trigger to run for each job, use `Que.bulk_enqueue(notify: true) { ... }`. + ## Expired jobs Expired jobs hang around in the `que_jobs` table. If necessary, you can get an expired job to run again by clearing the `error_count` and `expired_at` columns, e.g.: @@ -850,8 +875,7 @@ If you prefer to leave finished jobs in the database for a while, to performantl ```sql BEGIN; -ALTER TABLE que_jobs DISABLE TRIGGER que_state_notify; +SET LOCAL que.skip_notify TO true; DELETE FROM que_jobs WHERE finished_at < (select now() - interval '7 days'); -ALTER TABLE que_jobs ENABLE TRIGGER que_state_notify; COMMIT; ``` diff --git a/lib/que.rb b/lib/que.rb index a29dce7e..f54832c8 100644 --- a/lib/que.rb +++ b/lib/que.rb @@ -69,7 +69,7 @@ class << self # Copy some commonly-used methods here, for convenience. def_delegators :pool, :execute, :checkout, :in_transaction? - def_delegators Job, :enqueue, :run_synchronously, :run_synchronously= + def_delegators Job, :enqueue, :bulk_enqueue, :run_synchronously, :run_synchronously= def_delegators Migrations, :db_version, :migrate! # Global configuration logic. diff --git a/lib/que/job.rb b/lib/que/job.rb index 0b2e4a33..6093b162 100644 --- a/lib/que/job.rb +++ b/lib/que/job.rb @@ -27,6 +27,26 @@ class Job RETURNING * } + SQL[:bulk_insert_jobs] = + %{ + WITH args_and_kwargs as ( + SELECT * from json_to_recordset(coalesce($5, '[{args:{},kwargs:{}}]')::json) as x(args jsonb, kwargs jsonb) + ) + INSERT INTO public.que_jobs + (queue, priority, run_at, job_class, args, kwargs, data, job_schema_version) + SELECT + coalesce($1, 'default')::text, + coalesce($2, 100)::smallint, + coalesce($3, now())::timestamptz, + $4::text, + args_and_kwargs.args, + args_and_kwargs.kwargs, + coalesce($6, '{}')::jsonb, + #{Que.job_schema_version} + FROM args_and_kwargs + RETURNING * + } + attr_reader :que_attrs attr_accessor :que_error, :que_resolved @@ -78,30 +98,120 @@ def enqueue(*args) queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue, priority: job_options[:priority] || resolve_que_setting(:priority), run_at: job_options[:run_at] || resolve_que_setting(:run_at), - args: Que.serialize_json(args), - kwargs: Que.serialize_json(kwargs), - data: job_options[:tags] ? Que.serialize_json(tags: job_options[:tags]) : "{}", + args: args, + kwargs: kwargs, + data: job_options[:tags] ? { tags: job_options[:tags] } : {}, job_class: \ job_options[:job_class] || name || raise(Error, "Can't enqueue an anonymous subclass of Que::Job"), } - if attrs[:run_at].nil? && resolve_que_setting(:run_synchronously) - attrs[:args] = Que.deserialize_json(attrs[:args]) - attrs[:kwargs] = Que.deserialize_json(attrs[:kwargs]) - attrs[:data] = Que.deserialize_json(attrs[:data]) + if Thread.current[:que_jobs_to_bulk_insert] + if self.name == 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper' + raise Que::Error, "Que.bulk_enqueue does not support ActiveJob." + end + + raise Que::Error, "When using .bulk_enqueue, job_options must be passed to that method rather than .enqueue" unless job_options == {} + + Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs] << attrs + new({}) + elsif attrs[:run_at].nil? && resolve_que_setting(:run_synchronously) + attrs.merge!( + args: Que.deserialize_json(Que.serialize_json(attrs[:args])), + kwargs: Que.deserialize_json(Que.serialize_json(attrs[:kwargs])), + data: Que.deserialize_json(Que.serialize_json(attrs[:data])), + ) _run_attrs(attrs) else - values = - Que.execute( - :insert_job, - attrs.values_at(:queue, :priority, :run_at, :job_class, :args, :kwargs, :data), - ).first + attrs.merge!( + args: Que.serialize_json(attrs[:args]), + kwargs: Que.serialize_json(attrs[:kwargs]), + data: Que.serialize_json(attrs[:data]), + ) + values = Que.execute( + :insert_job, + attrs.values_at(:queue, :priority, :run_at, :job_class, :args, :kwargs, :data), + ).first new(values) end end ruby2_keywords(:enqueue) if respond_to?(:ruby2_keywords, true) + def bulk_enqueue(job_options: {}, notify: false) + raise Que::Error, "Can't nest .bulk_enqueue" unless Thread.current[:que_jobs_to_bulk_insert].nil? + Thread.current[:que_jobs_to_bulk_insert] = { jobs_attrs: [], job_options: job_options } + yield + jobs_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs] + job_options = Thread.current[:que_jobs_to_bulk_insert][:job_options] + return [] if jobs_attrs.empty? + raise Que::Error, "When using .bulk_enqueue, all jobs enqueued must be of the same job class" unless jobs_attrs.map { |attrs| attrs[:job_class] }.uniq.one? + args_and_kwargs_array = jobs_attrs.map { |attrs| attrs.slice(:args, :kwargs) } + klass = job_options[:job_class] ? Que::Job : Que.constantize(jobs_attrs.first[:job_class]) + klass._bulk_enqueue_insert(args_and_kwargs_array, job_options: job_options, notify: notify) + ensure + Thread.current[:que_jobs_to_bulk_insert] = nil + end + + def _bulk_enqueue_insert(args_and_kwargs_array, job_options: {}, notify:) + raise 'Unexpected bulk args format' if !args_and_kwargs_array.is_a?(Array) || !args_and_kwargs_array.all? { |a| a.is_a?(Hash) } + + if job_options[:tags] + if job_options[:tags].length > MAXIMUM_TAGS_COUNT + raise Que::Error, "Can't enqueue a job with more than #{MAXIMUM_TAGS_COUNT} tags! (passed #{job_options[:tags].length})" + end + + job_options[:tags].each do |tag| + if tag.length > MAXIMUM_TAG_LENGTH + raise Que::Error, "Can't enqueue a job with a tag longer than 100 characters! (\"#{tag}\")" + end + end + end + + args_and_kwargs_array = args_and_kwargs_array.map do |args_and_kwargs| + args_and_kwargs.merge( + args: args_and_kwargs.fetch(:args, []), + kwargs: args_and_kwargs.fetch(:kwargs, {}), + ) + end + + attrs = { + queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue, + priority: job_options[:priority] || resolve_que_setting(:priority), + run_at: job_options[:run_at] || resolve_que_setting(:run_at), + args_and_kwargs_array: args_and_kwargs_array, + data: job_options[:tags] ? { tags: job_options[:tags] } : {}, + job_class: \ + job_options[:job_class] || name || + raise(Error, "Can't enqueue an anonymous subclass of Que::Job"), + } + + if attrs[:run_at].nil? && resolve_que_setting(:run_synchronously) + args_and_kwargs_array = Que.deserialize_json(Que.serialize_json(attrs.delete(:args_and_kwargs_array))) + args_and_kwargs_array.map do |args_and_kwargs| + _run_attrs( + attrs.merge( + args: args_and_kwargs.fetch(:args), + kwargs: args_and_kwargs.fetch(:kwargs), + ), + ) + end + else + attrs.merge!( + args_and_kwargs_array: Que.serialize_json(attrs[:args_and_kwargs_array]), + data: Que.serialize_json(attrs[:data]), + ) + values_array = + Que.transaction do + Que.execute('SET LOCAL que.skip_notify TO true') unless notify + Que.execute( + :bulk_insert_jobs, + attrs.values_at(:queue, :priority, :run_at, :job_class, :args_and_kwargs_array, :data), + ) + end + values_array.map(&method(:new)) + end + end + def run(*args) # Make sure things behave the same as they would have with a round-trip # to the DB. diff --git a/lib/que/migrations.rb b/lib/que/migrations.rb index 80b775df..884e3396 100644 --- a/lib/que/migrations.rb +++ b/lib/que/migrations.rb @@ -4,7 +4,7 @@ module Que module Migrations # In order to ship a schema change, add the relevant up and down sql files # to the migrations directory, and bump the version here. - CURRENT_VERSION = 6 + CURRENT_VERSION = 7 class << self def migrate!(version:) diff --git a/lib/que/migrations/7/down.sql b/lib/que/migrations/7/down.sql new file mode 100644 index 00000000..be0db2ae --- /dev/null +++ b/lib/que/migrations/7/down.sql @@ -0,0 +1,5 @@ +DROP TRIGGER que_job_notify ON que_jobs; +CREATE TRIGGER que_job_notify + AFTER INSERT ON que_jobs + FOR EACH ROW + EXECUTE PROCEDURE public.que_job_notify(); diff --git a/lib/que/migrations/7/up.sql b/lib/que/migrations/7/up.sql new file mode 100644 index 00000000..a7778a3b --- /dev/null +++ b/lib/que/migrations/7/up.sql @@ -0,0 +1,13 @@ +DROP TRIGGER que_job_notify ON que_jobs; +CREATE TRIGGER que_job_notify + AFTER INSERT ON que_jobs + FOR EACH ROW + WHEN (NOT coalesce(current_setting('que.skip_notify', true), '') = 'true') + EXECUTE PROCEDURE public.que_job_notify(); + +DROP TRIGGER que_state_notify ON que_jobs; +CREATE TRIGGER que_state_notify + AFTER INSERT OR UPDATE OR DELETE ON que_jobs + FOR EACH ROW + WHEN (NOT coalesce(current_setting('que.skip_notify', true), '') = 'true') + EXECUTE PROCEDURE public.que_state_notify(); diff --git a/scripts/test b/scripts/test index acc87155..a3fa8039 100755 --- a/scripts/test +++ b/scripts/test @@ -3,3 +3,4 @@ set -Eeuo pipefail bundle exec rake spec "$@" +USE_RAILS=true bundle exec rake spec "$@" diff --git a/spec/que/active_job/extensions_spec.rb b/spec/que/active_job/extensions_spec.rb index 85717751..325fe203 100644 --- a/spec/que/active_job/extensions_spec.rb +++ b/spec/que/active_job/extensions_spec.rb @@ -16,6 +16,7 @@ def perform(*args, **kwargs) after do Object.send :remove_const, :TestJobClass $args = nil + $kwargs = nil end def execute(&perform_later_block) @@ -171,5 +172,29 @@ def perform(*args) assert_equal error, notified_error end end + + describe 'with bulk_enqueue' do + describe 'ActiveJobClass.perform_later' do + it "is not supported" do + assert_raises_with_message( + Que::Error, + /Que\.bulk_enqueue does not support ActiveJob\./ + ) do + Que.bulk_enqueue { TestJobClass.perform_later(1, 2) } + end + end + end + + describe 'active_job#enqueue' do + it "is not supported" do + assert_raises_with_message( + Que::Error, + /Que\.bulk_enqueue does not support ActiveJob\./ + ) do + Que.bulk_enqueue { TestJobClass.new.enqueue } + end + end + end + end end end diff --git a/spec/que/job.bulk_enqueue_spec.rb b/spec/que/job.bulk_enqueue_spec.rb new file mode 100644 index 00000000..4a1b85b8 --- /dev/null +++ b/spec/que/job.bulk_enqueue_spec.rb @@ -0,0 +1,711 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe Que::Job, '.bulk_enqueue' do + def assert_enqueue( + expected_queue: 'default', + expected_priority: 100, + expected_run_at: Time.now, + expected_job_class: Que::Job, + expected_result_class: nil, + expected_args:, + expected_kwargs:, + expected_tags: nil, + expected_count:, + &enqueue_block + ) + + assert_equal 0, jobs_dataset.count + + results = enqueue_block.call + + assert_equal expected_count, jobs_dataset.count + + results.each_with_index do |result, i| + assert_kind_of Que::Job, result + assert_instance_of (expected_result_class || expected_job_class), result + + assert_equal expected_priority, result.que_attrs[:priority] + assert_equal expected_args[i], result.que_attrs[:args] + assert_equal expected_kwargs[i], result.que_attrs[:kwargs] + + if expected_tags.nil? + assert_equal({}, result.que_attrs[:data]) + else + assert_equal expected_tags, result.que_attrs[:data][:tags] + end + end + + jobs_dataset.order(:id).each_with_index do |job, i| + assert_equal expected_queue, job[:queue] + assert_equal expected_priority, job[:priority] + assert_in_delta job[:run_at], expected_run_at, QueSpec::TIME_SKEW + assert_equal expected_job_class.to_s, job[:job_class] + assert_equal expected_args[i], job[:args] + assert_equal expected_kwargs[i], job[:kwargs] + end + + jobs_dataset.delete + end + + it "should be able to queue zero jobs without error" do + assert_enqueue( + expected_count: 0, + expected_args: [], + expected_kwargs: [], + ) do + Que.bulk_enqueue {} + end + end + + it "should be able to queue multiple jobs" do + assert_enqueue( + expected_count: 3, + expected_args: Array.new(3) { [] }, + expected_kwargs: Array.new(3) { {} }, + ) do + Que.bulk_enqueue do + 3.times { Que.enqueue } + end + end + end + + it "should be able to queue multiple jobs with arguments" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: '3' }, { five: '6' }], + ) do + Que.bulk_enqueue do + Que.enqueue(1, two: '3') + Que.enqueue(4, five: '6') + end + end + end + + it "should be able to queue jobs with complex arguments" do + assert_enqueue( + expected_count: 2, + expected_args: [[1, 'two'], ['3', 4]], + expected_kwargs: [ + { string: 'string', integer: 5, array: [1, 'two', { three: 3 }] }, + { hash: { one: 1, two: 'two', three: [3] } }, + ], + ) do + Que.bulk_enqueue do + Que.enqueue(1, 'two', string: 'string', integer: 5, array: [1, 'two', { three: 3 }]) + Que.enqueue('3', 4, hash: { one: 1, two: 'two', three: [3] }) + end + end + end + + describe "when bulk_enqueue args/kwargs are empty/omitted" do + it "can enqueue jobs with empty args" do + assert_enqueue( + expected_count: 2, + expected_args: [[], []], + expected_kwargs: [{ one: '2' }, {three: '4' }], + ) do + Que.bulk_enqueue do + Que.enqueue(one: '2') + Que.enqueue(three: '4') + end + end + end + + it "can enqueue jobs where args is omitted" do + assert_enqueue( + expected_count: 2, + expected_args: [[], []], + expected_kwargs: [{ one: '2' }, { three: '4' }], + ) do + Que.bulk_enqueue do + Que.enqueue(one: '2') + Que.enqueue(three: '4') + end + end + end + + it "can enqueue jobs where kwargs is omitted" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [2]], + expected_kwargs: [{}, {}], + ) do + Que.bulk_enqueue do + Que.enqueue(1) + Que.enqueue(2) + end + end + end + + it "can enqueue jobs where args and kwargs is omitted" do + assert_enqueue( + expected_count: 2, + expected_args: [[], []], + expected_kwargs: [{}, {}], + ) do + Que.bulk_enqueue do + Que.enqueue + Que.enqueue + end + end + end + + it "can enqueue jobs with empty args" do + assert_enqueue( + expected_count: 2, + expected_args: [[], []], + expected_kwargs: [{ one: '2' }, { three: '4' }], + ) do + Que.bulk_enqueue do + Que.enqueue(one: '2') + Que.enqueue(three: '4') + end + end + end + + it "can enqueue jobs with empty kwargs" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [2]], + expected_kwargs: [{}, {}] + ) do + Que.bulk_enqueue do + Que.enqueue(1) + Que.enqueue(2) + end + end + end + end + + it "should be able to handle a namespaced job class" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: '3' }, { five: '6' }], + expected_job_class: NamespacedJobNamespace::NamespacedJob, + ) do + Que.bulk_enqueue do + NamespacedJobNamespace::NamespacedJob.enqueue(1, two: '3') + NamespacedJobNamespace::NamespacedJob.enqueue(4, five: '6') + end + end + end + + it "should error appropriately on an anonymous job subclass" do + klass = Class.new(Que::Job) + error = assert_raises(Que::Error) { Que.bulk_enqueue { klass.enqueue } } + assert_equal \ + "Can't enqueue an anonymous subclass of Que::Job", + error.message + end + + it "should be able to queue jobs with specific queue names" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: '3' }, { five: 'six' }], + expected_queue: 'special_queue_name', + ) do + Que.bulk_enqueue(job_options: { queue: 'special_queue_name' }) do + Que.enqueue(1, two: '3') + Que.enqueue(4, five: 'six') + end + end + end + + + it "should be able to queue jobs with a specific time to run" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [2]], + expected_kwargs: [{}, {}], + expected_run_at: Time.now + 60, + ) do + Que.bulk_enqueue(job_options: { run_at: Time.now + 60 }) do + Que.enqueue(1) + Que.enqueue(2) + end + end + end + + it "should be able to enqueue jobs with a specific priority" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [2]], + expected_kwargs: [{}, {}], + expected_priority: 4 + ) do + Que.bulk_enqueue(job_options: { priority: 4 }) do + Que.enqueue(1) + Que.enqueue(2) + end + end + end + + it "should be able to queue jobs with options in addition to args and kwargs" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_run_at: Time.now + 60, + expected_priority: 4, + ) do + Que.bulk_enqueue(job_options: { run_at: Time.now + 60, priority: 4 }) do + Que.enqueue(1, two: "3") + Que.enqueue(4, five: "six") + end + end + end + + it "should no longer fall back to using job options specified at the top level if not specified in job_options" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [ + { two: "3", run_at: Time.utc(2050).to_s, priority: 10 }, + { five: "six" } + ], + expected_run_at: Time.now, + expected_priority: 15, + ) do + Que.bulk_enqueue(job_options: { priority: 15 }) do + Que.enqueue(1, two: "3", run_at: Time.utc(2050), priority: 10) + Que.enqueue(4, five: "six") + end + end + end + + it "should raise when job_options are passed to .enqueue rather than .bulk_enqueue" do + assert_raises_with_message(Que::Error, "When using .bulk_enqueue, job_options must be passed to that method rather than .enqueue") do + Que.bulk_enqueue do + Que.enqueue(1, two: "3", job_options: { priority: 15 }) + end + end + end + + describe "when enqueuing jobs with tags" do + it "should be able to specify tags on a case-by-case basis" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_tags: ["tag_1", "tag_2"], + ) do + Que.bulk_enqueue(job_options: { tags: ["tag_1", "tag_2"] }) do + Que.enqueue(1, two: "3") + Que.enqueue(4, five: "six") + end + end + end + + it "should no longer fall back to using tags specified at the top level if not specified in job_options" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [ + { two: "3", tags: ["tag_1", "tag_2"] }, + { five: "six" }, + ], + expected_tags: nil, + ) do + Que.bulk_enqueue do + Que.enqueue(1, two: "3", tags: ["tag_1", "tag_2"]) + Que.enqueue(4, five: "six") + end + end + end + + it "should raise an error if passing too many tags" do + error = + assert_raises(Que::Error) do + Que.bulk_enqueue(job_options: { tags: %w[a b c d e f] }) do + Que::Job.enqueue(1, two: "3") + Que::Job.enqueue(4, five: "six") + end + end + + assert_equal \ + "Can't enqueue a job with more than 5 tags! (passed 6)", + error.message + end + + it "should raise an error if any of the tags are too long" do + error = + assert_raises(Que::Error) do + Que.bulk_enqueue(job_options: { tags: ["a" * 101] }) do + Que::Job.enqueue(1, two: "3") + Que::Job.enqueue(4, five: "six") + end + end + + assert_equal \ + "Can't enqueue a job with a tag longer than 100 characters! (\"#{"a" * 101}\")", + error.message + end + end + + it "should respect a job class defined as a string" do + class MyJobClass < Que::Job; end + + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_job_class: MyJobClass, + expected_result_class: Que::Job, + ) do + Que.bulk_enqueue(job_options: { job_class: 'MyJobClass' }) do + Que.enqueue(1, two: "3") + Que.enqueue(4, five: "six") + end + end + end + + describe "when there's a hierarchy of job classes" do + class PriorityDefaultJob < Que::Job + self.priority = 3 + end + + class PrioritySubclassJob < PriorityDefaultJob + end + + class RunAtDefaultJob < Que::Job + self.run_at = -> { Time.now + 30 } + end + + class RunAtSubclassJob < RunAtDefaultJob + end + + class QueueDefaultJob < Que::Job + self.queue = :queue_1 + end + + class QueueSubclassJob < QueueDefaultJob + end + + describe "priority" do + it "should respect a default priority in a job class" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_priority: 3, + expected_job_class: PriorityDefaultJob, + ) do + Que.bulk_enqueue do + PriorityDefaultJob.enqueue(1, two: "3") + PriorityDefaultJob.enqueue(4, five: "six") + end + end + + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_priority: 4, + expected_job_class: PriorityDefaultJob, + ) do + Que.bulk_enqueue(job_options: { priority: 4 }) do + PriorityDefaultJob.enqueue(1, two: "3") + PriorityDefaultJob.enqueue(4, five: "six") + end + end + end + + it "should respect an inherited priority in a job class" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_priority: 3, + expected_job_class: PrioritySubclassJob + ) do + Que.bulk_enqueue do + PrioritySubclassJob.enqueue(1, two: "3") + PrioritySubclassJob.enqueue(4, five: "six") + end + end + + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_priority: 4, + expected_job_class: PrioritySubclassJob + ) do + Que.bulk_enqueue(job_options: { priority: 4 }) do + PrioritySubclassJob.enqueue(1, two: "3") + PrioritySubclassJob.enqueue(4, five: "six") + end + end + end + + it "should respect an overridden priority in a job class" do + begin + PrioritySubclassJob.priority = 60 + + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_priority: 60, + expected_job_class: PrioritySubclassJob + ) do + Que.bulk_enqueue do + PrioritySubclassJob.enqueue(1, two: "3") + PrioritySubclassJob.enqueue(4, five: "six") + end + end + + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_priority: 4, + expected_job_class: PrioritySubclassJob + ) do + Que.bulk_enqueue(job_options: { priority: 4 }) do + PrioritySubclassJob.enqueue(1, two: "3") + PrioritySubclassJob.enqueue(4, five: "six") + end + end + ensure + PrioritySubclassJob.remove_instance_variable(:@priority) + end + end + end + + describe "run_at" do + it "should respect a default run_at in a job class" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_run_at: Time.now + 30, + expected_job_class: RunAtDefaultJob + ) do + Que.bulk_enqueue do + RunAtDefaultJob.enqueue(1, two: "3") + RunAtDefaultJob.enqueue(4, five: "six") + end + end + + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_run_at: Time.now + 60, + expected_job_class: RunAtDefaultJob + ) do + Que.bulk_enqueue(job_options: { run_at: Time.now + 60 }) do + RunAtDefaultJob.enqueue(1, two: "3") + RunAtDefaultJob.enqueue(4, five: "six") + end + end + end + + it "should respect an inherited run_at in a job class" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_run_at: Time.now + 30, + expected_job_class: RunAtDefaultJob + ) do + Que.bulk_enqueue do + RunAtDefaultJob.enqueue(1, two: "3") + RunAtDefaultJob.enqueue(4, five: "six") + end + end + + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_run_at: Time.now + 60, + expected_job_class: RunAtDefaultJob + ) do + Que.bulk_enqueue(job_options: { run_at: Time.now + 60 }) do + RunAtDefaultJob.enqueue(1, two: "3") + RunAtDefaultJob.enqueue(4, five: "six") + end + end + end + + it "should respect an overridden run_at in a job class" do + begin + RunAtSubclassJob.run_at = -> {Time.now + 90} + + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_run_at: Time.now + 90, + expected_job_class: RunAtSubclassJob + ) do + Que.bulk_enqueue do + RunAtSubclassJob.enqueue(1, two: "3") + RunAtSubclassJob.enqueue(4, five: "six") + end + end + + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_run_at: Time.now + 60, + expected_job_class: RunAtSubclassJob + ) do + Que.bulk_enqueue(job_options: { run_at: Time.now + 60 }) do + RunAtSubclassJob.enqueue(1, two: "3") + RunAtSubclassJob.enqueue(4, five: "six") + end + end + ensure + RunAtSubclassJob.remove_instance_variable(:@run_at) + end + end + end + + describe "queue" do + it "should respect a default queue in a job class" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_queue: 'queue_1', + expected_job_class: QueueDefaultJob + ) do + Que.bulk_enqueue do + QueueDefaultJob.enqueue(1, two: "3") + QueueDefaultJob.enqueue(4, five: "six") + end + end + + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_queue: 'queue_3', + expected_job_class: QueueDefaultJob + ) do + Que.bulk_enqueue(job_options: { queue: 'queue_3' }) do + QueueDefaultJob.enqueue(1, two: "3") + QueueDefaultJob.enqueue(4, five: "six") + end + end + end + + it "should respect an inherited queue in a job class" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_queue: 'queue_1', + expected_job_class: QueueSubclassJob + ) do + Que.bulk_enqueue do + QueueSubclassJob.enqueue(1, two: "3") + QueueSubclassJob.enqueue(4, five: "six") + end + end + + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_queue: 'queue_3', + expected_job_class: QueueSubclassJob + ) do + Que.bulk_enqueue(job_options: { queue: 'queue_3' }) do + QueueSubclassJob.enqueue(1, two: "3") + QueueSubclassJob.enqueue(4, five: "six") + end + end + end + + it "should respect an overridden queue in a job class" do + begin + QueueSubclassJob.queue = :queue_2 + + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_queue: 'queue_2', + expected_job_class: QueueSubclassJob + ) do + Que.bulk_enqueue do + QueueSubclassJob.enqueue(1, two: "3") + QueueSubclassJob.enqueue(4, five: "six") + end + end + + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_queue: 'queue_3', + expected_job_class: QueueSubclassJob + ) do + Que.bulk_enqueue(job_options: { queue: 'queue_3' }) do + QueueSubclassJob.enqueue(1, two: "3") + QueueSubclassJob.enqueue(4, five: "six") + end + end + ensure + QueueSubclassJob.remove_instance_variable(:@queue) + end + end + end + end + + describe "when run in synchronous mode" do + before do + Que::Job.run_synchronously = true + + Object.send(:remove_const, "SynchronousJob") if Object.const_defined?("SynchronousJob") + + class SynchronousJob < Que::Job + @@ran_args_and_kwargs = [] + + def self.ran_args_and_kwargs + @@ran_args_and_kwargs + end + + def run(*args, **kwargs) + @@ran_args_and_kwargs << [args, kwargs] + end + end + end + + after do + Que::Job.remove_instance_variable(:@run_synchronously) + Object.send(:remove_const, "SynchronousJob") + end + + it "runs each job synchronously at the end of the block with the correct args & kwargs" do + Que.bulk_enqueue do + SynchronousJob.enqueue('a1', a2: 'a3') + SynchronousJob.enqueue('b1', b2: 'b3') + SynchronousJob.enqueue('c1') + SynchronousJob.enqueue(d1: 'd2') + SynchronousJob.enqueue + assert_equal [], SynchronousJob.ran_args_and_kwargs + end + assert_equal( + [ + [['a1'], { a2: 'a3' }], + [['b1'], { b2: 'b3' }], + [['c1'], {}], + [[], { d1: 'd2' }], + [[], {}], + ], + SynchronousJob.ran_args_and_kwargs, + ) + end + end +end diff --git a/spec/que/migrations.state_trigger_spec.rb b/spec/que/migrations.state_trigger_spec.rb index 66e54521..d7be9269 100644 --- a/spec/que/migrations.state_trigger_spec.rb +++ b/spec/que/migrations.state_trigger_spec.rb @@ -232,4 +232,18 @@ def assert_notification( end end end + + it "should not notify when using bulk_enqueue" do + Que.bulk_enqueue do + Que.enqueue(job_class: "MyJobClass") + end + assert_nil get_message(timeout: 0.1, expect_nothing: true) + end + + it "should notify when using bulk_enqueue with 'notify: true'" do + Que.bulk_enqueue(notify: true) do + Que.enqueue(job_class: "MyJobClass") + end + assert get_message + end end diff --git a/spec/que/migrations.work_job_trigger_spec.rb b/spec/que/migrations.work_job_trigger_spec.rb index ac89c25b..a16a654b 100644 --- a/spec/que/migrations.work_job_trigger_spec.rb +++ b/spec/que/migrations.work_job_trigger_spec.rb @@ -130,4 +130,29 @@ def listen_connection assert_nil conn.wait_for_notify(0.01) end end + + it "should not notify a locker when using bulk_enqueue" do + listen_connection do |conn| + DB[:que_lockers].insert(locker_attrs) + conn.async_exec "LISTEN que_listener_1" + Que.bulk_enqueue do + Que::Job.enqueue('job1_arg1', job1_kwarg1: 'x') + Que::Job.enqueue('job2_arg1', job2_kwarg1: 'x') + end + assert_nil conn.wait_for_notify(0.01) + end + end + + it "should notify a locker when using bulk_enqueue with 'notify: true'" do + listen_connection do |conn| + DB[:que_lockers].insert(locker_attrs) + conn.async_exec "LISTEN que_listener_1" + Que.bulk_enqueue(notify: true) do + Que::Job.enqueue('job1_arg1', job1_kwarg1: 'x') + Que::Job.enqueue('job2_arg1', job2_kwarg1: 'x') + end + 2.times { conn.wait_for_notify } + assert_nil conn.wait_for_notify(0.01) + end + end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index dc775c82..2f19f794 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -211,6 +211,11 @@ def sleep_until?(timeout: SLEEP_UNTIL_TIMEOUT) end end + def assert_raises_with_message(expected_error_class, expected_message, &block) + e = assert_raises(expected_error_class, &block) + assert_match(expected_message, e.message, "#{mu_pp(expected_error_class)} error was raised but without the expected message") + end + class << self # More easily hammer a certain spec. def hit(*args, &block)