diff --git a/CHANGELOG.md b/CHANGELOG.md index 9931a74..e7739db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.2.0] - 2024-04-27 + +### Added + +- Implement #insert_many for batch job insertion. [PR #5](https://github.com/riverqueue/riverqueue-ruby/pull/5). + ## [0.1.0] - 2024-04-25 ### Added diff --git a/Makefile b/Makefile index b6ff2cc..cce9802 100644 --- a/Makefile +++ b/Makefile @@ -20,5 +20,8 @@ standardrb: steep: bundle exec steep check +.PHONY: test +test: spec + .PHONY: type-check type-check: steep diff --git a/docs/README.md b/docs/README.md index eb564c6..1dfab22 100644 --- a/docs/README.md +++ b/docs/README.md @@ -34,6 +34,7 @@ class SortArgs end insert_res = client.insert(SimpleArgs.new(strings: ["whale", "tiger", "bear"])) +insert_res.job # inserted job row ``` Job args should: @@ -69,6 +70,26 @@ insert_res = client.insert(River::JobArgsHash.new("hash_kind", { })) ``` +### Bulk inserting jobs + +Use `#insert_many` to bulk insert jobs as a single operation for improved efficiency: + +```ruby +num_inserted = client.insert_many([ + SimpleArgs.new(job_num: 1), + SimpleArgs.new(job_num: 2) +]) +``` + +Or with `InsertManyParams`, which may include insertion options: + +```ruby +num_inserted = client.insert_many([ + River::InsertManyParams.new(SimpleArgs.new(job_num: 1), insert_opts: InsertOpts.new(max_attempts: 5)), + River::InsertManyParams.new(SimpleArgs.new(job_num: 2), insert_opts: InsertOpts.new(queue: "high_priority")) +]) +``` + ## Drivers ### ActiveRecord diff --git a/docs/development.md b/docs/development.md index c78b0de..a0eedb3 100644 --- a/docs/development.md +++ b/docs/development.md @@ -44,6 +44,8 @@ $ open coverage/index.html ## Publish gems +Update `CHANGELOG.md` to include the new version and open a pull request with the changes. + ```shell git checkout master && git pull --rebase export VERSION=v0.0.x diff --git a/drivers/riverqueue-activerecord/lib/driver.rb b/drivers/riverqueue-activerecord/lib/driver.rb index 4ad7317..6eacf92 100644 --- a/drivers/riverqueue-activerecord/lib/driver.rb +++ b/drivers/riverqueue-activerecord/lib/driver.rb @@ -22,66 +22,72 @@ def self.dangerous_attribute_method?(method_name) return false if method_name == "errors" super end + + # See comment above, but since we force allowed `errors` as an + # attribute name, ActiveRecord would otherwise fail to save a row as + # it checked for its own `errors` hash and finding no values. + def errors = {} end) end end def insert(insert_params) + to_job_row(RiverJob.create(insert_params_to_hash(insert_params))) + end + + def insert_many(insert_params_many) + RiverJob.insert_all(insert_params_many.map { |p| insert_params_to_hash(p) }) + insert_params_many.count + end + + private def insert_params_to_hash(insert_params) # the call to `#compact` is important so that we remove nils and table # default values get picked up instead - to_job_row( - RiverJob.insert( - { - args: insert_params.encoded_args, - kind: insert_params.kind, - max_attempts: insert_params.max_attempts, - priority: insert_params.priority, - queue: insert_params.queue, - state: insert_params.state, - scheduled_at: insert_params.scheduled_at, - tags: insert_params.tags - }.compact, - returning: Arel.sql("*") - ).first - ) + { + args: insert_params.encoded_args, + kind: insert_params.kind, + max_attempts: insert_params.max_attempts, + priority: insert_params.priority, + queue: insert_params.queue, + state: insert_params.state, + scheduled_at: insert_params.scheduled_at, + tags: insert_params.tags + }.compact end # Type type injected to this method is not a `RiverJob`, but rather a raw # hash with stringified keys because we're inserting with the Arel framework # directly rather than generating a record from a model. - private def to_job_row(raw_job) - deserialize = ->(field) do - RiverJob._default_attributes[field].type.deserialize(raw_job[field]) - end - - # Errors is `jsonb[]` so the subtype here will decode `jsonb`. - errors_subtype = RiverJob._default_attributes["errors"].type.subtype + private def to_job_row(river_job) + # needs to be accessed through values because `errors` is shadowed by both + # ActiveRecord and the patch above + errors = river_job.attributes["errors"] River::JobRow.new( - id: deserialize.call("id"), - args: deserialize.call("args").yield_self { |a| a ? JSON.parse(a) : nil }, - attempt: deserialize.call("attempt"), - attempted_at: deserialize.call("attempted_at"), - attempted_by: deserialize.call("attempted_by"), - created_at: deserialize.call("created_at"), - errors: deserialize.call("errors")&.map do |e| - deserialized_error = errors_subtype.deserialize(e) + id: river_job.id, + args: river_job.args ? JSON.parse(river_job.args) : nil, + attempt: river_job.attempt, + attempted_at: river_job.attempted_at, + attempted_by: river_job.attempted_by, + created_at: river_job.created_at, + errors: errors&.map { |e| + deserialized_error = JSON.parse(e, symbolize_names: true) River::AttemptError.new( - at: Time.parse(deserialized_error["at"]), - attempt: deserialized_error["attempt"], - error: deserialized_error["error"], - trace: deserialized_error["trace"] + at: Time.parse(deserialized_error[:at]), + attempt: deserialized_error[:attempt], + error: deserialized_error[:error], + trace: deserialized_error[:trace] ) - end, - finalized_at: deserialize.call("finalized_at"), - kind: deserialize.call("kind"), - max_attempts: deserialize.call("max_attempts"), - priority: deserialize.call("priority"), - queue: deserialize.call("queue"), - scheduled_at: deserialize.call("scheduled_at"), - state: deserialize.call("state"), - tags: deserialize.call("tags") + }, + finalized_at: river_job.finalized_at, + kind: river_job.kind, + max_attempts: river_job.max_attempts, + priority: river_job.priority, + queue: river_job.queue, + scheduled_at: river_job.scheduled_at, + state: river_job.state, + tags: river_job.tags ) end end diff --git a/drivers/riverqueue-activerecord/spec/driver_spec.rb b/drivers/riverqueue-activerecord/spec/driver_spec.rb index 6b7b745..a501788 100644 --- a/drivers/riverqueue-activerecord/spec/driver_spec.rb +++ b/drivers/riverqueue-activerecord/spec/driver_spec.rb @@ -127,22 +127,86 @@ class SimpleArgsWithInsertOpts < SimpleArgs raise ActiveRecord::Rollback end - # Not visible because the job was rolled back. + # Not present because the job was rolled back. river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id) expect(river_job).to be_nil end end + describe "#insert_many" do + it "inserts multiple jobs" do + num_inserted = client.insert_many([ + SimpleArgs.new(job_num: 1), + SimpleArgs.new(job_num: 2) + ]) + expect(num_inserted).to eq(2) + + job1 = driver.send(:to_job_row, River::Driver::ActiveRecord::RiverJob.first) + expect(job1).to have_attributes( + attempt: 0, + args: {"job_num" => 1}, + created_at: be_within(2).of(Time.now.utc), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: be_within(2).of(Time.now.utc), + state: River::JOB_STATE_AVAILABLE, + tags: [] + ) + + job2 = driver.send(:to_job_row, River::Driver::ActiveRecord::RiverJob.offset(1).first) + expect(job2).to have_attributes( + attempt: 0, + args: {"job_num" => 2}, + created_at: be_within(2).of(Time.now.utc), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: be_within(2).of(Time.now.utc), + state: River::JOB_STATE_AVAILABLE, + tags: [] + ) + end + + it "inserts multiple jobs in a transaction" do + job1 = nil + job2 = nil + + ActiveRecord::Base.transaction(requires_new: true) do + num_inserted = client.insert_many([ + SimpleArgs.new(job_num: 1), + SimpleArgs.new(job_num: 2) + ]) + expect(num_inserted).to eq(2) + + job1 = driver.send(:to_job_row, River::Driver::ActiveRecord::RiverJob.first) + job2 = driver.send(:to_job_row, River::Driver::ActiveRecord::RiverJob.offset(1).first) + + raise ActiveRecord::Rollback + end + + # Not present because the jobs were rolled back. + expect do + River::Driver::ActiveRecord::RiverJob.find(job1.id) + end.to raise_error(ActiveRecord::RecordNotFound) + expect do + River::Driver::ActiveRecord::RiverJob.find(job2.id) + end.to raise_error(ActiveRecord::RecordNotFound) + end + end + describe "#to_job_row" do it "converts a database record to `River::JobRow`" do now = Time.now.utc - river_job = { + river_job = River::Driver::ActiveRecord::RiverJob.new( id: 1, attempt: 1, attempted_at: now, attempted_by: ["client1"], created_at: now, - args: JSON.generate(%({"job_num":1})), # encoded twice, like how ActiveRecord returns it + args: %({"job_num":1}), finalized_at: now, kind: "simple", max_attempts: River::MAX_ATTEMPTS_DEFAULT, @@ -151,7 +215,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs scheduled_at: now, state: River::JOB_STATE_COMPLETED, tags: ["tag1"] - }.transform_keys { |k| k.to_s } + ) job_row = driver.send(:to_job_row, river_job) @@ -176,7 +240,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs it "with errors" do now = Time.now.utc - river_job = { + river_job = River::Driver::ActiveRecord::RiverJob.new( errors: [JSON.dump( { at: now, @@ -185,7 +249,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs trace: "error trace" } )] - }.transform_keys { |k| k.to_s } + ) job_row = driver.send(:to_job_row, river_job) diff --git a/drivers/riverqueue-sequel/lib/driver.rb b/drivers/riverqueue-sequel/lib/driver.rb index 0626499..6973f94 100644 --- a/drivers/riverqueue-sequel/lib/driver.rb +++ b/drivers/riverqueue-sequel/lib/driver.rb @@ -22,22 +22,27 @@ def initialize(db) end def insert(insert_params) + to_job_row(RiverJob.create(insert_params_to_hash(insert_params))) + end + + def insert_many(insert_params_many) + RiverJob.multi_insert(insert_params_many.map { |p| insert_params_to_hash(p) }) + insert_params_many.count + end + + private def insert_params_to_hash(insert_params) # the call to `#compact` is important so that we remove nils and table # default values get picked up instead - to_job_row( - RiverJob.create( - { - args: insert_params.encoded_args, - kind: insert_params.kind, - max_attempts: insert_params.max_attempts, - priority: insert_params.priority, - queue: insert_params.queue, - state: insert_params.state, - scheduled_at: insert_params.scheduled_at, - tags: insert_params.tags ? ::Sequel.pg_array(insert_params.tags) : nil - }.compact - ) - ) + { + args: insert_params.encoded_args, + kind: insert_params.kind, + max_attempts: insert_params.max_attempts, + priority: insert_params.priority, + queue: insert_params.queue, + state: insert_params.state, + scheduled_at: insert_params.scheduled_at, + tags: insert_params.tags ? ::Sequel.pg_array(insert_params.tags) : nil + }.compact end private def to_job_row(river_job) diff --git a/drivers/riverqueue-sequel/spec/driver_spec.rb b/drivers/riverqueue-sequel/spec/driver_spec.rb index 9c2fde7..44b3b96 100644 --- a/drivers/riverqueue-sequel/spec/driver_spec.rb +++ b/drivers/riverqueue-sequel/spec/driver_spec.rb @@ -127,12 +127,74 @@ class SimpleArgsWithInsertOpts < SimpleArgs raise Sequel::Rollback end - # Not visible because the job was rolled back. + # Not present because the job was rolled back. river_job = River::Driver::Sequel::RiverJob.first(id: insert_res.job.id) expect(river_job).to be_nil end end + describe "#insert_many" do + it "inserts multiple jobs" do + num_inserted = client.insert_many([ + SimpleArgs.new(job_num: 1), + SimpleArgs.new(job_num: 2) + ]) + expect(num_inserted).to eq(2) + + job1 = driver.send(:to_job_row, River::Driver::Sequel::RiverJob.first) + expect(job1).to have_attributes( + attempt: 0, + args: {"job_num" => 1}, + created_at: be_within(2).of(Time.now.utc), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: be_within(2).of(Time.now.utc), + state: River::JOB_STATE_AVAILABLE, + tags: ::Sequel.pg_array([]) + ) + + job2 = driver.send(:to_job_row, River::Driver::Sequel::RiverJob.limit(nil, 1).first) + expect(job2).to have_attributes( + attempt: 0, + args: {"job_num" => 2}, + created_at: be_within(2).of(Time.now.utc), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: be_within(2).of(Time.now.utc), + state: River::JOB_STATE_AVAILABLE, + tags: ::Sequel.pg_array([]) + ) + end + + it "inserts multiple jobs in a transaction" do + job1 = nil + job2 = nil + + DB.transaction(savepoint: true) do + num_inserted = client.insert_many([ + SimpleArgs.new(job_num: 1), + SimpleArgs.new(job_num: 2) + ]) + expect(num_inserted).to eq(2) + + job1 = driver.send(:to_job_row, River::Driver::Sequel::RiverJob.first) + job2 = driver.send(:to_job_row, River::Driver::Sequel::RiverJob.limit(nil, 1).first) + + raise Sequel::Rollback + end + + # Not present because the jobs were rolled back. + river_job1 = River::Driver::Sequel::RiverJob.first(id: job1.id) + expect(river_job1).to be_nil + river_job2 = River::Driver::Sequel::RiverJob.first(id: job2.id) + expect(river_job2).to be_nil + end + end + describe "#to_job_row" do it "converts a database record to `River::JobRow`" do now = Time.now.utc diff --git a/lib/client.rb b/lib/client.rb index 58d5bbc..c4192a3 100644 --- a/lib/client.rb +++ b/lib/client.rb @@ -20,9 +20,22 @@ def initialize(driver) @driver = driver end + EMPTY_INSERT_OPTS = InsertOpts.new.freeze + private_constant :EMPTY_INSERT_OPTS + # Inserts a new job for work given a job args implementation and insertion # options (which may be omitted). # + # With job args only: + # + # insert_res = client.insert(SimpleArgs.new(job_num: 1)) + # insert_res.job # inserted job row + # + # With insert opts: + # + # insert_res = client.insert(SimpleArgs.new(job_num: 1), insert_opts: InsertOpts.new(queue: "high_priority")) + # insert_res.job # inserted job row + # # Job arg implementations are expected to respond to: # # * `#kind`: A string that uniquely identifies the job in the database. @@ -34,8 +47,84 @@ def initialize(driver) # kind. Insertion options provided as an argument to `#insert` override # those returned by job args. # + # For example: + # + # class SimpleArgs + # attr_accessor :job_num + # + # def initialize(job_num:) + # self.job_num = job_num + # end + # + # def kind = "simple" + # + # def to_json = JSON.dump({job_num: job_num}) + # end + # + # See also JobArgsHash for an easy way to insert a job from a hash. + # # Returns an instance of InsertResult. - def insert(args, insert_opts: InsertOpts.new) + def insert(args, insert_opts: EMPTY_INSERT_OPTS) + job = @driver.insert(make_insert_params(args, insert_opts)) + InsertResult.new(job) + end + + # Inserts many new jobs as part of a single batch operation for improved + # efficiency. + # + # Takes an array of job args or InsertManyParams which encapsulate job args + # and a paired InsertOpts. + # + # With job args: + # + # num_inserted = client.insert_many([ + # SimpleArgs.new(job_num: 1), + # SimpleArgs.new(job_num: 2) + # ]) + # + # With InsertManyParams: + # + # num_inserted = client.insert_many([ + # River::InsertManyParams.new(SimpleArgs.new(job_num: 1), insert_opts: InsertOpts.new(max_attempts: 5)), + # River::InsertManyParams.new(SimpleArgs.new(job_num: 2), insert_opts: InsertOpts.new(queue: "high_priority")) + # ]) + # + # Job arg implementations are expected to respond to: + # + # * `#kind`: A string that uniquely identifies the job in the database. + # * `#to_json`: Encodes the args to JSON for persistence in the database. + # Must match encoding an args struct on the Go side to be workable. + # + # For example: + # + # class SimpleArgs + # attr_accessor :job_num + # + # def initialize(job_num:) + # self.job_num = job_num + # end + # + # def kind = "simple" + # + # def to_json = JSON.dump({job_num: job_num}) + # end + # + # See also JobArgsHash for an easy way to insert a job from a hash. + # + # Returns the number of jobs inserted. + def insert_many(args) + all_params = args.map do |arg| + if arg.is_a?(InsertManyParams) + make_insert_params(arg.args, arg.insert_opts || EMPTY_INSERT_OPTS) + else # jobArgs + make_insert_params(arg, EMPTY_INSERT_OPTS) + end + end + + @driver.insert_many(all_params) + end + + private def make_insert_params(args, insert_opts) raise "args should respond to `#kind`" if !args.respond_to?(:kind) # ~all objects in Ruby respond to `#to_json`, so check non-nil instead. @@ -44,14 +133,14 @@ def insert(args, insert_opts: InsertOpts.new) args_insert_opts = if args.respond_to?(:insert_opts) args_with_insert_opts = args #: _JobArgsWithInsertOpts # rubocop:disable Layout/LeadingCommentSpace - args_with_insert_opts.insert_opts || InsertOpts.new + args_with_insert_opts.insert_opts || EMPTY_INSERT_OPTS else - InsertOpts.new + EMPTY_INSERT_OPTS end scheduled_at = insert_opts.scheduled_at || args_insert_opts.scheduled_at - job = @driver.insert(Driver::JobInsertParams.new( + Driver::JobInsertParams.new( encoded_args: args_json, kind: args.kind, max_attempts: insert_opts.max_attempts || args_insert_opts.max_attempts || MAX_ATTEMPTS_DEFAULT, @@ -60,26 +149,7 @@ def insert(args, insert_opts: InsertOpts.new) scheduled_at: scheduled_at&.utc, # database defaults to now state: scheduled_at ? JOB_STATE_SCHEDULED : JOB_STATE_AVAILABLE, tags: insert_opts.tags || args_insert_opts.tags - )) - - InsertResult.new(job) - end - - # Inserts many new jobs as part of a single batch operation for improved - # efficiency. - # - # Takes an array of job args or InsertManyParams which encapsulate job args - # and a paired InsertOpts. - # - # Job arg implementations are expected to respond to: - # - # * `#kind`: A string that uniquely identifies the job in the database. - # * `#to_json`: Encodes the args to JSON for persistence in the database. - # Must match encoding an args struct on the Go side to be workable. - # - # Returns the number of jobs inserted. - def insert_many(args) - raise "sorry, not implemented yet" + ) end end diff --git a/lib/job.rb b/lib/job.rb index 58e32c9..650fad6 100644 --- a/lib/job.rb +++ b/lib/job.rb @@ -11,6 +11,12 @@ module River # way to insert a job without having to define a class. The first argument is # a "kind" string for identifying the job in the database and the second is a # hash that will be encoded to JSON. + # + # For example: + # + # insert_res = client.insert(River::JobArgsHash.new("job_kind", { + # job_num: 1 + # })) class JobArgsHash def initialize(kind, hash) raise "kind should be non-nil" if !kind diff --git a/lib/riverqueue.rb b/lib/riverqueue.rb index e77b938..fd8309b 100644 --- a/lib/riverqueue.rb +++ b/lib/riverqueue.rb @@ -1,9 +1,10 @@ require "json" -require_relative "client" -require_relative "driver" require_relative "insert_opts" require_relative "job" +require_relative "client" +require_relative "driver" + module River end diff --git a/sig/client.rbs b/sig/client.rbs index b45a73c..76cf8f8 100644 --- a/sig/client.rbs +++ b/sig/client.rbs @@ -6,9 +6,13 @@ module River class Client @driver: _Driver + EMPTY_INSERT_OPTS: InsertOpts + def initialize: (_Driver driver) -> void def insert: (jobArgs, ?insert_opts: InsertOpts) -> InsertResult def insert_many: (Array[jobArgs | InsertManyParams]) -> Integer + + private def make_insert_params: (jobArgs, InsertOpts) -> Driver::JobInsertParams end class InsertManyParams @@ -19,6 +23,7 @@ module River attr_reader insert_opts: InsertOpts? def initialize: (jobArgs job, ?insert_opts: InsertOpts?) -> void + def is_a?: (Class) -> bool end class InsertResult diff --git a/sig/driver.rbs b/sig/driver.rbs index 2d4c9b8..21b5516 100644 --- a/sig/driver.rbs +++ b/sig/driver.rbs @@ -1,6 +1,7 @@ module River interface _Driver def insert: (Driver::JobInsertParams) -> JobRow + def insert_many: (Array[Driver::JobInsertParams]) -> Integer end module Driver diff --git a/sig/insert_opts.rbs b/sig/insert_opts.rbs index 387905a..698f692 100644 --- a/sig/insert_opts.rbs +++ b/sig/insert_opts.rbs @@ -1,15 +1,10 @@ module River class InsertOpts attr_accessor max_attempts: Integer? - attr_accessor priority: Integer? - attr_accessor queue: String? - attr_accessor scheduled_at: Time? - attr_accessor tags: Array[String]? - attr_accessor unique_opts: UniqueOpts? def initialize: (?max_attempts: Integer?, ?priority: Integer?, ?queue: String?, ?scheduled_at: Time?, ?tags: Array[String]?, ?unique_opts: UniqueOpts?) -> void diff --git a/sig/job.rbs b/sig/job.rbs index 78bda83..126f398 100644 --- a/sig/job.rbs +++ b/sig/job.rbs @@ -10,6 +10,7 @@ module River type jobStateAll = "available" | "cancelled" | "completed" | "discarded" | "retryable" | "running" | "scheduled" interface _JobArgs + def is_a?: (Class) -> bool def kind: () -> String def respond_to?: (Symbol) -> bool def to_json: () -> String diff --git a/spec/client_spec.rb b/spec/client_spec.rb index 9def9cf..67915cb 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -3,15 +3,26 @@ # We use a mock here, but each driver has a more comprehensive test suite that # performs full integration level tests. class MockDriver + attr_accessor :inserted_jobs + def initialize - @insert_params = [] + @inserted_jobs = [] @next_id = 0 end def insert(insert_params) - @insert_params << insert_params + insert_params_to_jow_row(insert_params) + end + + def insert_many(insert_params_many) + insert_params_many.each do |insert_params| + insert_params_to_jow_row(insert_params) + end + insert_params_many.count + end - River::JobRow.new( + private def insert_params_to_jow_row(insert_params) + job = River::JobRow.new( id: (@next_id += 1), args: JSON.parse(insert_params.encoded_args), attempt: 0, @@ -27,6 +38,8 @@ def insert(insert_params) state: insert_params.state, tags: insert_params.tags ) + inserted_jobs << job + job end end @@ -164,10 +177,131 @@ def to_json = nil end describe "#insert_many" do - it "inserts many jobs" do - expect do - client.insert_many([]) - end.to raise_error(RuntimeError, "sorry, not implemented yet") + it "inserts jobs from jobArgs with defaults" do + num_inserted = client.insert_many([ + SimpleArgs.new(job_num: 1), + SimpleArgs.new(job_num: 2) + ]) + expect(num_inserted).to eq(2) + + job1 = mock_driver.inserted_jobs[0] + expect(job1).to have_attributes( + id: 1, + args: {"job_num" => 1}, + attempt: 0, + created_at: be_within(2).of(Time.now), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + priority: River::PRIORITY_DEFAULT, + queue: River::QUEUE_DEFAULT, + scheduled_at: be_within(2).of(Time.now), + state: River::JOB_STATE_AVAILABLE, + tags: nil + ) + + job2 = mock_driver.inserted_jobs[1] + expect(job2).to have_attributes( + id: 2, + args: {"job_num" => 2}, + attempt: 0, + created_at: be_within(2).of(Time.now), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + priority: River::PRIORITY_DEFAULT, + queue: River::QUEUE_DEFAULT, + scheduled_at: be_within(2).of(Time.now), + state: River::JOB_STATE_AVAILABLE, + tags: nil + ) + end + + it "inserts jobs from InsertManyParams with defaults" do + num_inserted = client.insert_many([ + River::InsertManyParams.new(SimpleArgs.new(job_num: 1)), + River::InsertManyParams.new(SimpleArgs.new(job_num: 2)) + ]) + expect(num_inserted).to eq(2) + + job1 = mock_driver.inserted_jobs[0] + expect(job1).to have_attributes( + id: 1, + args: {"job_num" => 1}, + attempt: 0, + created_at: be_within(2).of(Time.now), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + priority: River::PRIORITY_DEFAULT, + queue: River::QUEUE_DEFAULT, + scheduled_at: be_within(2).of(Time.now), + state: River::JOB_STATE_AVAILABLE, + tags: nil + ) + + job2 = mock_driver.inserted_jobs[1] + expect(job2).to have_attributes( + id: 2, + args: {"job_num" => 2}, + attempt: 0, + created_at: be_within(2).of(Time.now), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + priority: River::PRIORITY_DEFAULT, + queue: River::QUEUE_DEFAULT, + scheduled_at: be_within(2).of(Time.now), + state: River::JOB_STATE_AVAILABLE, + tags: nil + ) + end + + it "inserts jobs with insert opts" do + # We set job insert opts in this spec too so that we can verify that the + # options passed at insertion time take precedence. + args1 = SimpleArgsWithInsertOpts.new(job_num: 1) + args1.insert_opts = River::InsertOpts.new( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue_1", + tags: ["job_custom_1"] + ) + args2 = SimpleArgsWithInsertOpts.new(job_num: 2) + args2.insert_opts = River::InsertOpts.new( + max_attempts: 24, + priority: 3, + queue: "job_custom_queue_2", + tags: ["job_custom_2"] + ) + + num_inserted = client.insert_many([ + River::InsertManyParams.new(args1, insert_opts: River::InsertOpts.new( + max_attempts: 17, + priority: 3, + queue: "my_queue_1", + tags: ["custom_1"] + )), + River::InsertManyParams.new(args2, insert_opts: River::InsertOpts.new( + max_attempts: 18, + priority: 4, + queue: "my_queue_2", + tags: ["custom_2"] + )) + ]) + expect(num_inserted).to eq(2) + + job1 = mock_driver.inserted_jobs[0] + expect(job1).to have_attributes( + max_attempts: 17, + priority: 3, + queue: "my_queue_1", + tags: ["custom_1"] + ) + + job2 = mock_driver.inserted_jobs[1] + expect(job2).to have_attributes( + max_attempts: 18, + priority: 4, + queue: "my_queue_2", + tags: ["custom_2"] + ) end end end @@ -175,6 +309,14 @@ def to_json = nil RSpec.describe River::InsertManyParams do it "initializes" do args = SimpleArgs.new(job_num: 1) + + params = River::InsertManyParams.new(args) + expect(params.args).to eq(args) + expect(params.insert_opts).to be_nil + end + + it "initializes with insert opts" do + args = SimpleArgs.new(job_num: 1) insert_opts = River::InsertOpts.new(queue: "other") params = River::InsertManyParams.new(args, insert_opts: insert_opts)