Merged
25 commits
3485a08
Add Que::Job#bulk_enqueue method for enqueuing jobs in bulk
oeoeaio Feb 22, 2022
aedd27f
Bulk enqueue: Finish mirroring bulk_enqueue specs from enqueue specs
ZimbiX Jul 19, 2022
d2f8a2a
Bulk enqueue: Support omitting args/kwargs in bulk_enqueue
ZimbiX Jul 19, 2022
fde1ee6
Fix failing bulk_enqueue test when a particular test suite ordering i…
ZimbiX Jul 20, 2022
6775950
Bulk enqueue: Disable que_job_notify trigger in bulk_enqueue for perf…
ZimbiX Jul 22, 2022
acb730e
que_job_notify trigger: Switch SQL from 'FUNCTION' back to deprecated…
ZimbiX Jul 22, 2022
24fdade
Add gem: minitest-fail-fast
ZimbiX Jul 23, 2022
b7edc93
Bulk enqueue: Rework interface to use block form instead with normal …
ZimbiX Jul 23, 2022
b5baa88
Bulk enqueue: Namespace thread-local variable
ZimbiX Jul 23, 2022
2dd226b
Bulk enqueue: Add option to provide control over whether notify happens
ZimbiX Jul 23, 2022
3577fe5
Bulk enqueue: Remove unnecessary line
ZimbiX Jul 23, 2022
5fa6d72
Bulk enqueue: Document usage
ZimbiX Jul 23, 2022
556a8b4
Docs: Switch mass job deletion instructions to using `que.skip_notify`
ZimbiX Jul 23, 2022
4f796aa
Bulk enqueue: Allow not enqueueing any jobs within a .bulk_enqueue block
ZimbiX Aug 23, 2022
4e806ed
Bulk enqueue: Skip notify in que_state_notify trigger as well
ZimbiX Aug 23, 2022
8432e80
Changelog: Add bulk enqueue and que_state_notify deprecation to unrel…
ZimbiX Aug 23, 2022
065a6e8
Bulk enqueue: Add test for raising when job_options are passed to .en…
ZimbiX Aug 23, 2022
906b415
Bulk enqueue: Fix executing jobs in synchronous mode with the correct…
ZimbiX Aug 23, 2022
1a01099
Bulk enqueue: Reorganise serialisation & deserialisation to avoid unn…
ZimbiX Aug 23, 2022
13244da
Changelog: Note for 2.0.0 not to upgrade straight to Que 2
ZimbiX Aug 25, 2022
c1a84cf
Changelog: Add database migration instructions to unreleased section
ZimbiX Aug 25, 2022
4766433
Run the ActiveRecord specs as well when running scripts/test
jpaulgs Aug 24, 2022
3034fb8
Be explicit that Que.bulk_enqueue does not support ActiveJob.
jpaulgs Aug 24, 2022
cccd6f2
Bulk enqueue: Remove incorrect suggestion for ActiveJob use
ZimbiX Aug 25, 2022
297c6cc
Bulk enqueue: Note in docs that ActiveJob is not supported
ZimbiX Aug 25, 2022
24 changes: 24 additions & 0 deletions CHANGELOG.md
Expand Up @@ -2,6 +2,7 @@

<!-- MarkdownTOC autolink=true -->

- [Unreleased](#unreleased)
- [2.0.0 \(2022-08-25\)](#200-2022-08-25)
- [1.4.1 \(2022-07-24\)](#141-2022-07-24)
- [2.0.0.beta1 \(2022-03-24\)](#200beta1-2022-03-24)
Expand Down Expand Up @@ -54,8 +55,31 @@

<!-- /MarkdownTOC -->

## Unreleased

- **Added**:
+ Added bulk enqueue interface for performance when enqueuing a large number of jobs at once - [docs](docs#enqueueing-jobs-in-bulk).
- **Deprecated**:
+ Deprecated `que_state_notify` trigger (`que_state` notification channel / `job_change` notification message). See [#372](https://github.com/que-rb/que/issues/372). We plan to remove this in a future release - let us know on the issue if you desire otherwise.

This release contains a database migration. You will need to migrate Que to the latest database schema version (7). For example, on ActiveRecord and Rails 6:

```ruby
class UpdateQueTablesToVersion7 < ActiveRecord::Migration[6.0]
  def up
    Que.migrate!(version: 7)
  end

  def down
    Que.migrate!(version: 6)
  end
end
```

## 2.0.0 (2022-08-25)

**Important: Do not upgrade straight to Que 2.** You will need to first update to the latest 1.x version, apply the Que database schema migration, and deploy, before you can safely begin the process of upgrading to Que 2. See the [2.0.0.beta1 changelog entry](#200beta1-2022-03-24) for details.

See beta 2.0.0.beta1, plus:

- **Fixed**:
Expand Down
1 change: 1 addition & 0 deletions Gemfile
Expand Up @@ -20,6 +20,7 @@ group :test do
gem 'minitest', '~> 5.10.1'
gem 'minitest-profile', '0.0.2'
gem 'minitest-hooks', '1.4.0'
gem 'minitest-fail-fast', '0.1.0'

gem 'm'

Expand Down
28 changes: 26 additions & 2 deletions docs/README.md
Expand Up @@ -53,6 +53,7 @@
- [Defining Middleware For Jobs](#defining-middleware-for-jobs)
- [Defining Middleware For SQL statements](#defining-middleware-for-sql-statements)
- [Vacuuming](#vacuuming)
- [Enqueueing jobs in bulk](#enqueueing-jobs-in-bulk)
- [Expired jobs](#expired-jobs)
- [Finished jobs](#finished-jobs)

Expand Down Expand Up @@ -836,6 +837,30 @@ class ManualVacuumJob < CronJob
end
```

## Enqueueing jobs in bulk

If you need to enqueue a large number of jobs at once, enqueueing each one separately (and running the notify trigger for each) can become a performance bottleneck. To mitigate this, there is a bulk enqueue interface:

```ruby
Que.bulk_enqueue do
  MyJob.enqueue(user_id: 1)
  MyJob.enqueue(user_id: 2)
  # ...
end
```

The jobs are only actually enqueued at the end of the block, at which point they are inserted into the database in one big query.

Limitations:

- ActiveJob is not supported
- All jobs must use the same job class
- All jobs must use the same `job_options` (`job_options` must be provided to `.bulk_enqueue` instead of `.enqueue`)
- The `que_attrs` of a job instance returned from `.enqueue` is empty (`{}`)
- The notify trigger is not run by default, so jobs will only be picked up by a worker upon its next poll

If you still want the notify trigger to run for each job, use `Que.bulk_enqueue(notify: true) { ... }`.
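
To make the block semantics above concrete, here is a toy in-memory model of the documented behaviour: jobs enqueued inside the block are only buffered, and they are written out together when the block ends. `ToyQueue`, `ToyJob` and `BUFFER_KEY` are hypothetical names invented for this sketch. It is not Que's implementation and it does not touch a database.

```ruby
# Toy model only: ToyQueue and ToyJob are hypothetical stand-ins, not Que's code.
module ToyQueue
  class Error < StandardError; end

  BUFFER_KEY = :toy_queue_bulk_buffer
  INSERTED = [] # stands in for the que_jobs table

  def self.inserted
    INSERTED
  end

  # Buffer every enqueue made inside the block, then "insert" them in one batch.
  def self.bulk_enqueue(job_options: {})
    raise Error, "Can't nest .bulk_enqueue" if Thread.current[BUFFER_KEY]

    Thread.current[BUFFER_KEY] = []
    begin
      yield
      buffered = Thread.current[BUFFER_KEY]
      return [] if buffered.empty?

      unless buffered.map { |job| job[:job_class] }.uniq.one?
        raise Error, "All jobs enqueued must be of the same job class"
      end

      # job_options are given once and applied to every buffered job.
      rows = buffered.map { |job| job.merge(job_options) }
      INSERTED.concat(rows) # one batch, standing in for a single INSERT
      rows
    ensure
      Thread.current[BUFFER_KEY] = nil
    end
  end
end

class ToyJob
  def self.enqueue(*args, **kwargs)
    attrs = { job_class: name, args: args, kwargs: kwargs }
    buffer = Thread.current[ToyQueue::BUFFER_KEY]
    if buffer
      buffer << attrs # deferred until the surrounding block ends
    else
      ToyQueue.inserted << attrs # outside a block: inserted immediately
    end
    attrs
  end
end

ToyQueue.bulk_enqueue(job_options: { priority: 10 }) do
  ToyJob.enqueue(user_id: 1)
  ToyJob.enqueue(user_id: 2)
  # Nothing has been inserted at this point; both jobs are still buffered.
end
```

The buffer lives in a thread-local variable so that concurrent threads do not see each other's pending jobs, and the `ensure` clause clears it even when the block raises.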

## Expired jobs

Expired jobs hang around in the `que_jobs` table. If necessary, you can get an expired job to run again by clearing the `error_count` and `expired_at` columns, e.g.:
Expand All @@ -850,8 +875,7 @@ If you prefer to leave finished jobs in the database for a while, to performantl

```sql
  BEGIN;
- ALTER TABLE que_jobs DISABLE TRIGGER que_state_notify;
+ SET LOCAL que.skip_notify TO true;
  DELETE FROM que_jobs WHERE finished_at < (select now() - interval '7 days');
- ALTER TABLE que_jobs ENABLE TRIGGER que_state_notify;
  COMMIT;
```
2 changes: 1 addition & 1 deletion lib/que.rb
Expand Up @@ -69,7 +69,7 @@ class << self

# Copy some commonly-used methods here, for convenience.
def_delegators :pool, :execute, :checkout, :in_transaction?
- def_delegators Job, :enqueue, :run_synchronously, :run_synchronously=
+ def_delegators Job, :enqueue, :bulk_enqueue, :run_synchronously, :run_synchronously=
def_delegators Migrations, :db_version, :migrate!

# Global configuration logic.
Expand Down
134 changes: 122 additions & 12 deletions lib/que/job.rb
Expand Up @@ -27,6 +27,26 @@ class Job
RETURNING *
}

SQL[:bulk_insert_jobs] =
  %{
    WITH args_and_kwargs as (
      SELECT * from json_to_recordset(coalesce($5, '[{"args":{},"kwargs":{}}]')::json) as x(args jsonb, kwargs jsonb)
    )
    INSERT INTO public.que_jobs
    (queue, priority, run_at, job_class, args, kwargs, data, job_schema_version)
    SELECT
      coalesce($1, 'default')::text,
      coalesce($2, 100)::smallint,
      coalesce($3, now())::timestamptz,
      $4::text,
      args_and_kwargs.args,
      args_and_kwargs.kwargs,
      coalesce($6, '{}')::jsonb,
      #{Que.job_schema_version}
    FROM args_and_kwargs
    RETURNING *
  }

attr_reader :que_attrs
attr_accessor :que_error, :que_resolved
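
The `:bulk_insert_jobs` statement above receives the arguments of every job as one JSON array in `$5`, and `json_to_recordset` expands that array into one row per element. The sketch below shows, with only the standard `json` library, what such a parameter could look like. `build_bulk_payload` is a hypothetical helper written for illustration; Que itself goes through `Que.serialize_json`.

```ruby
require 'json'

# Hypothetical helper (not part of Que): build the single JSON parameter that a
# json_to_recordset-based INSERT could expand into one row per job.
def build_bulk_payload(jobs_attrs)
  rows = jobs_attrs.map do |attrs|
    {
      args:   attrs.fetch(:args, []),   # no positional arguments by default
      kwargs: attrs.fetch(:kwargs, {}), # no keyword arguments by default
    }
  end
  JSON.generate(rows)
end

payload = build_bulk_payload([
  { args: [1], kwargs: { notify: true } },
  { kwargs: { user_id: 2 } },
])
puts payload
```

Passing one array parameter keeps the statement's parameter count fixed regardless of how many jobs are enqueued, which is what lets a single prepared query serve any batch size.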

Expand Down Expand Up @@ -78,30 +98,120 @@ def enqueue(*args)
      queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue,
      priority: job_options[:priority] || resolve_que_setting(:priority),
      run_at: job_options[:run_at] || resolve_que_setting(:run_at),
-     args: Que.serialize_json(args),
-     kwargs: Que.serialize_json(kwargs),
-     data: job_options[:tags] ? Que.serialize_json(tags: job_options[:tags]) : "{}",
+     args: args,
+     kwargs: kwargs,
+     data: job_options[:tags] ? { tags: job_options[:tags] } : {},
      job_class: \
        job_options[:job_class] || name ||
          raise(Error, "Can't enqueue an anonymous subclass of Que::Job"),
    }

-   if attrs[:run_at].nil? && resolve_que_setting(:run_synchronously)
-     attrs[:args] = Que.deserialize_json(attrs[:args])
-     attrs[:kwargs] = Que.deserialize_json(attrs[:kwargs])
-     attrs[:data] = Que.deserialize_json(attrs[:data])
+   if Thread.current[:que_jobs_to_bulk_insert]
+     if self.name == 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper'
+       raise Que::Error, "Que.bulk_enqueue does not support ActiveJob."
+     end
+
+     raise Que::Error, "When using .bulk_enqueue, job_options must be passed to that method rather than .enqueue" unless job_options == {}
+
+     Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs] << attrs
+     new({})
+   elsif attrs[:run_at].nil? && resolve_que_setting(:run_synchronously)
+     attrs.merge!(
+       args: Que.deserialize_json(Que.serialize_json(attrs[:args])),
+       kwargs: Que.deserialize_json(Que.serialize_json(attrs[:kwargs])),
+       data: Que.deserialize_json(Que.serialize_json(attrs[:data])),
+     )
      _run_attrs(attrs)
    else
-     values =
-       Que.execute(
-         :insert_job,
-         attrs.values_at(:queue, :priority, :run_at, :job_class, :args, :kwargs, :data),
-       ).first
+     attrs.merge!(
+       args: Que.serialize_json(attrs[:args]),
+       kwargs: Que.serialize_json(attrs[:kwargs]),
+       data: Que.serialize_json(attrs[:data]),
+     )
+     values = Que.execute(
+       :insert_job,
+       attrs.values_at(:queue, :priority, :run_at, :job_class, :args, :kwargs, :data),
+     ).first
      new(values)
    end
  end
ruby2_keywords(:enqueue) if respond_to?(:ruby2_keywords, true)

def bulk_enqueue(job_options: {}, notify: false)
  raise Que::Error, "Can't nest .bulk_enqueue" unless Thread.current[:que_jobs_to_bulk_insert].nil?
  Thread.current[:que_jobs_to_bulk_insert] = { jobs_attrs: [], job_options: job_options }
  yield
  jobs_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs]
  job_options = Thread.current[:que_jobs_to_bulk_insert][:job_options]
  return [] if jobs_attrs.empty?
  raise Que::Error, "When using .bulk_enqueue, all jobs enqueued must be of the same job class" unless jobs_attrs.map { |attrs| attrs[:job_class] }.uniq.one?
  args_and_kwargs_array = jobs_attrs.map { |attrs| attrs.slice(:args, :kwargs) }
  klass = job_options[:job_class] ? Que::Job : Que.constantize(jobs_attrs.first[:job_class])
  klass._bulk_enqueue_insert(args_and_kwargs_array, job_options: job_options, notify: notify)
ensure
  Thread.current[:que_jobs_to_bulk_insert] = nil
end

def _bulk_enqueue_insert(args_and_kwargs_array, job_options: {}, notify:)
  raise 'Unexpected bulk args format' if !args_and_kwargs_array.is_a?(Array) || !args_and_kwargs_array.all? { |a| a.is_a?(Hash) }

  if job_options[:tags]
    if job_options[:tags].length > MAXIMUM_TAGS_COUNT
      raise Que::Error, "Can't enqueue a job with more than #{MAXIMUM_TAGS_COUNT} tags! (passed #{job_options[:tags].length})"
    end

    job_options[:tags].each do |tag|
      if tag.length > MAXIMUM_TAG_LENGTH
        raise Que::Error, "Can't enqueue a job with a tag longer than 100 characters! (\"#{tag}\")"
      end
    end
  end

  args_and_kwargs_array = args_and_kwargs_array.map do |args_and_kwargs|
    args_and_kwargs.merge(
      args: args_and_kwargs.fetch(:args, []),
      kwargs: args_and_kwargs.fetch(:kwargs, {}),
    )
  end

  attrs = {
    queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue,
    priority: job_options[:priority] || resolve_que_setting(:priority),
    run_at: job_options[:run_at] || resolve_que_setting(:run_at),
    args_and_kwargs_array: args_and_kwargs_array,
    data: job_options[:tags] ? { tags: job_options[:tags] } : {},
    job_class: \
      job_options[:job_class] || name ||
        raise(Error, "Can't enqueue an anonymous subclass of Que::Job"),
  }

  if attrs[:run_at].nil? && resolve_que_setting(:run_synchronously)
    args_and_kwargs_array = Que.deserialize_json(Que.serialize_json(attrs.delete(:args_and_kwargs_array)))
    args_and_kwargs_array.map do |args_and_kwargs|
      _run_attrs(
        attrs.merge(
          args: args_and_kwargs.fetch(:args),
          kwargs: args_and_kwargs.fetch(:kwargs),
        ),
      )
    end
  else
    attrs.merge!(
      args_and_kwargs_array: Que.serialize_json(attrs[:args_and_kwargs_array]),
      data: Que.serialize_json(attrs[:data]),
    )
    values_array =
      Que.transaction do
        Que.execute('SET LOCAL que.skip_notify TO true') unless notify
        Que.execute(
          :bulk_insert_jobs,
          attrs.values_at(:queue, :priority, :run_at, :job_class, :args_and_kwargs_array, :data),
        )
      end
    values_array.map(&method(:new))
  end
end

def run(*args)
# Make sure things behave the same as they would have with a round-trip
# to the DB.
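
The synchronous branches in this diff serialise the arguments to JSON and immediately parse them back, so that a job run inline sees the same values it would have seen after a trip through the database. The sketch below reproduces that effect with the standard `json` library. `json_round_trip` is a hypothetical helper, and the use of `symbolize_names: true` is an assumption about how Que's own `deserialize_json` is configured.

```ruby
require 'json'

# Hypothetical helper approximating a store-and-reload through a JSON column.
# Assumption: keys are symbolised on the way back, as Que's helper appears to do.
def json_round_trip(value)
  JSON.parse(JSON.generate(value), symbolize_names: true)
end

original      = { status: :active, ids: [1, 2], "note" => "hi" }
round_tripped = json_round_trip(original)

# Symbol values come back as strings and string keys come back as symbols,
# so the job sees JSON-shaped data rather than the caller's original objects.
p round_tripped
```

Without the round trip, a job could pass in synchronous mode while relying on a Ruby-only type such as a symbol value, and then fail once it runs from the real queue.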
Expand Down
2 changes: 1 addition & 1 deletion lib/que/migrations.rb
Expand Up @@ -4,7 +4,7 @@ module Que
module Migrations
# In order to ship a schema change, add the relevant up and down sql files
# to the migrations directory, and bump the version here.
- CURRENT_VERSION = 6
+ CURRENT_VERSION = 7

class << self
def migrate!(version:)
Expand Down
5 changes: 5 additions & 0 deletions lib/que/migrations/7/down.sql
@@ -0,0 +1,5 @@
DROP TRIGGER que_job_notify ON que_jobs;
CREATE TRIGGER que_job_notify
AFTER INSERT ON que_jobs
FOR EACH ROW
EXECUTE PROCEDURE public.que_job_notify();
13 changes: 13 additions & 0 deletions lib/que/migrations/7/up.sql
@@ -0,0 +1,13 @@
DROP TRIGGER que_job_notify ON que_jobs;
CREATE TRIGGER que_job_notify
AFTER INSERT ON que_jobs
FOR EACH ROW
WHEN (NOT coalesce(current_setting('que.skip_notify', true), '') = 'true')
EXECUTE PROCEDURE public.que_job_notify();

DROP TRIGGER que_state_notify ON que_jobs;
CREATE TRIGGER que_state_notify
AFTER INSERT OR UPDATE OR DELETE ON que_jobs
FOR EACH ROW
WHEN (NOT coalesce(current_setting('que.skip_notify', true), '') = 'true')
EXECUTE PROCEDURE public.que_state_notify();
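
The `WHEN` clause added to both triggers can be read as a small truth table. The Ruby function below is only a model of that predicate, written for illustration; `notify_trigger_fires?` is a hypothetical name and `setting` stands for whatever `current_setting('que.skip_notify', true)` yields, with `nil` playing the role of SQL `NULL` when the setting has not been set.

```ruby
# Model of: NOT coalesce(current_setting('que.skip_notify', true), '') = 'true'
# `setting` is nil when the setting is absent, otherwise its text value.
def notify_trigger_fires?(setting)
  (setting || '') != 'true'
end

[nil, '', 'false', 'true'].each do |setting|
  puts "#{setting.inspect}: fires=#{notify_trigger_fires?(setting)}"
end
```

As the predicate is written, the comparison is against the exact text `'true'`, so the triggers are skipped only for that value and keep firing by default.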
1 change: 1 addition & 0 deletions scripts/test
Expand Up @@ -3,3 +3,4 @@
set -Eeuo pipefail

bundle exec rake spec "$@"
USE_RAILS=true bundle exec rake spec "$@"
25 changes: 25 additions & 0 deletions spec/que/active_job/extensions_spec.rb
Expand Up @@ -16,6 +16,7 @@ def perform(*args, **kwargs)
after do
Object.send :remove_const, :TestJobClass
$args = nil
$kwargs = nil
end

def execute(&perform_later_block)
Expand Down Expand Up @@ -171,5 +172,29 @@ def perform(*args)
assert_equal error, notified_error
end
end

describe 'with bulk_enqueue' do
  describe 'ActiveJobClass.perform_later' do
    it "is not supported" do
      assert_raises_with_message(
        Que::Error,
        /Que\.bulk_enqueue does not support ActiveJob\./
      ) do
        Que.bulk_enqueue { TestJobClass.perform_later(1, 2) }
      end
    end
  end

  describe 'active_job#enqueue' do
    it "is not supported" do
      assert_raises_with_message(
        Que::Error,
        /Que\.bulk_enqueue does not support ActiveJob\./
      ) do
        Que.bulk_enqueue { TestJobClass.new.enqueue }
      end
    end
  end
end
end
end