Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.2.0] - 2024-04-27

### Added

- Implement #insert_many for batch job insertion. [PR #5](https://github.com/riverqueue/riverqueue-ruby/pull/5).

## [0.1.0] - 2024-04-25

### Added
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,8 @@ standardrb:
steep:
bundle exec steep check

.PHONY: test
test: spec

.PHONY: type-check
type-check: steep
21 changes: 21 additions & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class SortArgs
end

insert_res = client.insert(SimpleArgs.new(strings: ["whale", "tiger", "bear"]))
insert_res.job # inserted job row
```

Job args should:
Expand Down Expand Up @@ -69,6 +70,26 @@ insert_res = client.insert(River::JobArgsHash.new("hash_kind", {
}))
```

### Bulk inserting jobs

Use `#insert_many` to bulk insert jobs as a single operation for improved efficiency:

```ruby
num_inserted = client.insert_many([
SimpleArgs.new(job_num: 1),
SimpleArgs.new(job_num: 2)
])
```

Or with `InsertManyParams`, which may include insertion options:

```ruby
num_inserted = client.insert_many([
River::InsertManyParams.new(SimpleArgs.new(job_num: 1), insert_opts: InsertOpts.new(max_attempts: 5)),
River::InsertManyParams.new(SimpleArgs.new(job_num: 2), insert_opts: InsertOpts.new(queue: "high_priority"))
])
```

## Drivers

### ActiveRecord
Expand Down
2 changes: 2 additions & 0 deletions docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ $ open coverage/index.html

## Publish gems

Update `CHANGELOG.md` to include the new version and open a pull request with the changes.

```shell
git checkout master && git pull --rebase
export VERSION=v0.0.x
Expand Down
92 changes: 49 additions & 43 deletions drivers/riverqueue-activerecord/lib/driver.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,66 +22,72 @@ def self.dangerous_attribute_method?(method_name)
return false if method_name == "errors"
super
end

# See comment above, but since we force allowed `errors` as an
# attribute name, ActiveRecord would otherwise fail to save a row as
# it checked for its own `errors` hash and finding no values.
def errors = {}
end)
end
end

def insert(insert_params)
to_job_row(RiverJob.create(insert_params_to_hash(insert_params)))
end

def insert_many(insert_params_many)
RiverJob.insert_all(insert_params_many.map { |p| insert_params_to_hash(p) })
insert_params_many.count
end

private def insert_params_to_hash(insert_params)
# the call to `#compact` is important so that we remove nils and table
# default values get picked up instead
to_job_row(
RiverJob.insert(
{
args: insert_params.encoded_args,
kind: insert_params.kind,
max_attempts: insert_params.max_attempts,
priority: insert_params.priority,
queue: insert_params.queue,
state: insert_params.state,
scheduled_at: insert_params.scheduled_at,
tags: insert_params.tags
}.compact,
returning: Arel.sql("*")
).first
)
{
args: insert_params.encoded_args,
kind: insert_params.kind,
max_attempts: insert_params.max_attempts,
priority: insert_params.priority,
queue: insert_params.queue,
state: insert_params.state,
scheduled_at: insert_params.scheduled_at,
tags: insert_params.tags
}.compact
end

# Type type injected to this method is not a `RiverJob`, but rather a raw
# hash with stringified keys because we're inserting with the Arel framework
# directly rather than generating a record from a model.
private def to_job_row(raw_job)
deserialize = ->(field) do
RiverJob._default_attributes[field].type.deserialize(raw_job[field])
end

# Errors is `jsonb[]` so the subtype here will decode `jsonb`.
errors_subtype = RiverJob._default_attributes["errors"].type.subtype
private def to_job_row(river_job)
# needs to be accessed through values because `errors` is shadowed by both
# ActiveRecord and the patch above
errors = river_job.attributes["errors"]

River::JobRow.new(
id: deserialize.call("id"),
args: deserialize.call("args").yield_self { |a| a ? JSON.parse(a) : nil },
attempt: deserialize.call("attempt"),
attempted_at: deserialize.call("attempted_at"),
attempted_by: deserialize.call("attempted_by"),
created_at: deserialize.call("created_at"),
errors: deserialize.call("errors")&.map do |e|
deserialized_error = errors_subtype.deserialize(e)
id: river_job.id,
args: river_job.args ? JSON.parse(river_job.args) : nil,
attempt: river_job.attempt,
attempted_at: river_job.attempted_at,
attempted_by: river_job.attempted_by,
created_at: river_job.created_at,
errors: errors&.map { |e|
deserialized_error = JSON.parse(e, symbolize_names: true)

River::AttemptError.new(
at: Time.parse(deserialized_error["at"]),
attempt: deserialized_error["attempt"],
error: deserialized_error["error"],
trace: deserialized_error["trace"]
at: Time.parse(deserialized_error[:at]),
attempt: deserialized_error[:attempt],
error: deserialized_error[:error],
trace: deserialized_error[:trace]
)
end,
finalized_at: deserialize.call("finalized_at"),
kind: deserialize.call("kind"),
max_attempts: deserialize.call("max_attempts"),
priority: deserialize.call("priority"),
queue: deserialize.call("queue"),
scheduled_at: deserialize.call("scheduled_at"),
state: deserialize.call("state"),
tags: deserialize.call("tags")
},
finalized_at: river_job.finalized_at,
kind: river_job.kind,
max_attempts: river_job.max_attempts,
priority: river_job.priority,
queue: river_job.queue,
scheduled_at: river_job.scheduled_at,
state: river_job.state,
tags: river_job.tags
)
end
end
Expand Down
76 changes: 70 additions & 6 deletions drivers/riverqueue-activerecord/spec/driver_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,22 +127,86 @@ class SimpleArgsWithInsertOpts < SimpleArgs
raise ActiveRecord::Rollback
end

# Not visible because the job was rolled back.
# Not present because the job was rolled back.
river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id)
expect(river_job).to be_nil
end
end

describe "#insert_many" do
it "inserts multiple jobs" do
num_inserted = client.insert_many([
SimpleArgs.new(job_num: 1),
SimpleArgs.new(job_num: 2)
])
expect(num_inserted).to eq(2)

job1 = driver.send(:to_job_row, River::Driver::ActiveRecord::RiverJob.first)
expect(job1).to have_attributes(
attempt: 0,
args: {"job_num" => 1},
created_at: be_within(2).of(Time.now.utc),
kind: "simple",
max_attempts: River::MAX_ATTEMPTS_DEFAULT,
queue: River::QUEUE_DEFAULT,
priority: River::PRIORITY_DEFAULT,
scheduled_at: be_within(2).of(Time.now.utc),
state: River::JOB_STATE_AVAILABLE,
tags: []
)

job2 = driver.send(:to_job_row, River::Driver::ActiveRecord::RiverJob.offset(1).first)
expect(job2).to have_attributes(
attempt: 0,
args: {"job_num" => 2},
created_at: be_within(2).of(Time.now.utc),
kind: "simple",
max_attempts: River::MAX_ATTEMPTS_DEFAULT,
queue: River::QUEUE_DEFAULT,
priority: River::PRIORITY_DEFAULT,
scheduled_at: be_within(2).of(Time.now.utc),
state: River::JOB_STATE_AVAILABLE,
tags: []
)
end

it "inserts multiple jobs in a transaction" do
job1 = nil
job2 = nil

ActiveRecord::Base.transaction(requires_new: true) do
num_inserted = client.insert_many([
SimpleArgs.new(job_num: 1),
SimpleArgs.new(job_num: 2)
])
expect(num_inserted).to eq(2)

job1 = driver.send(:to_job_row, River::Driver::ActiveRecord::RiverJob.first)
job2 = driver.send(:to_job_row, River::Driver::ActiveRecord::RiverJob.offset(1).first)

raise ActiveRecord::Rollback
end

# Not present because the jobs were rolled back.
expect do
River::Driver::ActiveRecord::RiverJob.find(job1.id)
end.to raise_error(ActiveRecord::RecordNotFound)
expect do
River::Driver::ActiveRecord::RiverJob.find(job2.id)
end.to raise_error(ActiveRecord::RecordNotFound)
end
end

describe "#to_job_row" do
it "converts a database record to `River::JobRow`" do
now = Time.now.utc
river_job = {
river_job = River::Driver::ActiveRecord::RiverJob.new(
id: 1,
attempt: 1,
attempted_at: now,
attempted_by: ["client1"],
created_at: now,
args: JSON.generate(%({"job_num":1})), # encoded twice, like how ActiveRecord returns it
args: %({"job_num":1}),
finalized_at: now,
kind: "simple",
max_attempts: River::MAX_ATTEMPTS_DEFAULT,
Expand All @@ -151,7 +215,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs
scheduled_at: now,
state: River::JOB_STATE_COMPLETED,
tags: ["tag1"]
}.transform_keys { |k| k.to_s }
)

job_row = driver.send(:to_job_row, river_job)

Expand All @@ -176,7 +240,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs

it "with errors" do
now = Time.now.utc
river_job = {
river_job = River::Driver::ActiveRecord::RiverJob.new(
errors: [JSON.dump(
{
at: now,
Expand All @@ -185,7 +249,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs
trace: "error trace"
}
)]
}.transform_keys { |k| k.to_s }
)

job_row = driver.send(:to_job_row, river_job)

Expand Down
33 changes: 19 additions & 14 deletions drivers/riverqueue-sequel/lib/driver.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,27 @@ def initialize(db)
end

def insert(insert_params)
to_job_row(RiverJob.create(insert_params_to_hash(insert_params)))
end

def insert_many(insert_params_many)
RiverJob.multi_insert(insert_params_many.map { |p| insert_params_to_hash(p) })
insert_params_many.count
end

private def insert_params_to_hash(insert_params)
# the call to `#compact` is important so that we remove nils and table
# default values get picked up instead
to_job_row(
RiverJob.create(
{
args: insert_params.encoded_args,
kind: insert_params.kind,
max_attempts: insert_params.max_attempts,
priority: insert_params.priority,
queue: insert_params.queue,
state: insert_params.state,
scheduled_at: insert_params.scheduled_at,
tags: insert_params.tags ? ::Sequel.pg_array(insert_params.tags) : nil
}.compact
)
)
{
args: insert_params.encoded_args,
kind: insert_params.kind,
max_attempts: insert_params.max_attempts,
priority: insert_params.priority,
queue: insert_params.queue,
state: insert_params.state,
scheduled_at: insert_params.scheduled_at,
tags: insert_params.tags ? ::Sequel.pg_array(insert_params.tags) : nil
}.compact
end

private def to_job_row(river_job)
Expand Down
Loading