Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 - The `BatchCompleter` that marks jobs as completed can now batch database updates for _all_ states of jobs that have finished execution. Prior to this change, only `completed` jobs were batched into a single `UPDATE` call, while jobs moving to any other state used a single `UPDATE` per job. This change should significantly reduce database and pool contention on high-volume systems when jobs get retried, snoozed, cancelled, or discarded following execution. [PR #617](https://github.com/riverqueue/river/pull/617).
 
+### Fixed
+
+- Unique job changes from v0.12.0 / [PR #590](https://github.com/riverqueue/river/pull/590) introduced a bug with scheduled or retryable unique jobs where they could be considered in conflict with themselves and moved to `discarded` by mistake. The job scheduler could also break if duplicate `retryable` unique jobs were due to be scheduled at the same time. The job scheduling query was corrected to address these issues, and missing test coverage was added. [PR #619](https://github.com/riverqueue/river/pull/619).
+
 ## [0.12.0] - 2024-09-23
 
 ⚠️ Version 0.12.0 contains a new database migration, version 6. See [documentation on running River migrations](https://riverqueue.com/docs/migrations). If migrating with the CLI, make sure to update it to its latest version:
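
Editor's note: as a rough illustration of the `BatchCompleter` entry above, the idea is to group finished jobs by the state they're moving to so that each state needs only one batched `UPDATE` rather than one `UPDATE` per job. The sketch below is illustrative only, assuming a pgx transaction; the `finishedJob` type, `flushBatch` helper, and SQL are hypothetical stand-ins, not River's actual implementation, which sets more columns (e.g. `finalized_at`, errors) and handles retryable scheduling separately.

package main

import (
	"context"

	"github.com/jackc/pgx/v5"
)

// finishedJob is a hypothetical record of a job that finished execution and
// the state it should move to ("completed", "cancelled", "discarded", ...).
type finishedJob struct {
	ID    int64
	State string
}

// flushBatch groups job IDs by target state so each state becomes a single
// UPDATE: one round trip per distinct state instead of one per job.
func flushBatch(ctx context.Context, tx pgx.Tx, jobs []finishedJob) error {
	idsByState := make(map[string][]int64)
	for _, job := range jobs {
		idsByState[job.State] = append(idsByState[job.State], job.ID)
	}

	for state, ids := range idsByState {
		// Illustrative SQL only; the real completer's query is more involved.
		if _, err := tx.Exec(ctx,
			`UPDATE river_job SET state = $1 WHERE id = any($2)`,
			state, ids,
		); err != nil {
			return err
		}
	}
	return nil
}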
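Editor's note: the fixed bug hinges on which states count toward uniqueness. Since v0.12.0, each unique job stores its unique key alongside a bitmask of states in which that key must be unique; the tests below build that bitmask with the internal `dbunique.UniqueStatesToBitmask` helper. Here is a hedged sketch of the idea; the bit positions are an assumption for illustration, not River's actual encoding.

package main

import (
	"fmt"

	"github.com/riverqueue/river/rivertype"
)

// assumedBitPositions is an assumed ordering for this example only; the real
// mapping lives in River's internal dbunique package and may differ.
var assumedBitPositions = map[rivertype.JobState]uint{
	rivertype.JobStateAvailable: 0,
	rivertype.JobStateCancelled: 1,
	rivertype.JobStateCompleted: 2,
	rivertype.JobStateDiscarded: 3,
	rivertype.JobStatePending:   4,
	rivertype.JobStateRetryable: 5,
	rivertype.JobStateRunning:   6,
	rivertype.JobStateScheduled: 7,
}

// statesToBitmask packs a unique-state list into one byte, one bit per state.
func statesToBitmask(states []rivertype.JobState) byte {
	var mask byte
	for _, state := range states {
		mask |= 1 << assumedBitPositions[state]
	}
	return mask
}

func main() {
	// A unique-state list that omits retryable: two jobs with the same unique
	// key may coexist as long as one of them is retryable. The v0.12.0 bug was
	// that the scheduler's query could treat a scheduled or retryable job's
	// own row as a conflicting duplicate and discard it by mistake.
	nonRetryable := []rivertype.JobState{
		rivertype.JobStateAvailable,
		rivertype.JobStatePending,
		rivertype.JobStateRunning,
		rivertype.JobStateScheduled,
	}
	fmt.Printf("unique states bitmask: %08b\n", statesToBitmask(nonRetryable))
}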
213 changes: 176 additions & 37 deletions internal/riverinternaltest/riverdrivertest/riverdrivertest.go
@@ -10,9 +10,11 @@ import (
 	"time"
 
 	"github.com/stretchr/testify/require"
+	"github.com/tidwall/gjson"
 	"golang.org/x/text/cases"
 	"golang.org/x/text/language"
 
+	"github.com/riverqueue/river/internal/dbunique"
 	"github.com/riverqueue/river/internal/notifier"
 	"github.com/riverqueue/river/internal/rivercommon"
 	"github.com/riverqueue/river/riverdriver"
@@ -1503,54 +1505,191 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
 	t.Run("JobSchedule", func(t *testing.T) {
 		t.Parallel()
 
-		exec, _ := setup(ctx, t)
+		t.Run("BasicScheduling", func(t *testing.T) {
+			exec, _ := setup(ctx, t)
 
-		var (
-			horizon       = time.Now()
-			beforeHorizon = horizon.Add(-1 * time.Minute)
-			afterHorizon  = horizon.Add(1 * time.Minute)
-		)
+			var (
+				horizon       = time.Now()
+				beforeHorizon = horizon.Add(-1 * time.Minute)
+				afterHorizon  = horizon.Add(1 * time.Minute)
+			)
 
-		job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRetryable)})
-		job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)})
-		job3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)})
+			job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRetryable)})
+			job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)})
+			job3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)})
 
-		// States that aren't scheduled.
-		_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateAvailable)})
-		_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{FinalizedAt: &beforeHorizon, ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateCompleted)})
-		_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{FinalizedAt: &beforeHorizon, ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateDiscarded)})
+			// States that aren't scheduled.
+			_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateAvailable)})
+			_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{FinalizedAt: &beforeHorizon, ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateCompleted)})
+			_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{FinalizedAt: &beforeHorizon, ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateDiscarded)})
 
-		// Right state, but after horizon.
-		_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateRetryable)})
-		_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)})
+			// Right state, but after horizon.
+			_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateRetryable)})
+			_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)})
 
-		// First two scheduled because of limit.
-		result, err := exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{
-			Max: 2,
-			Now: horizon,
-		})
-		require.NoError(t, err)
-		require.Len(t, result, 2)
+			// First two scheduled because of limit.
+			result, err := exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{
+				Max: 2,
+				Now: horizon,
+			})
+			require.NoError(t, err)
+			require.Len(t, result, 2)
 
-		// And then job3 scheduled.
-		result, err = exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{
-			Max: 2,
-			Now: horizon,
-		})
-		require.NoError(t, err)
-		require.Len(t, result, 1)
+			// And then job3 scheduled.
+			result, err = exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{
+				Max: 2,
+				Now: horizon,
+			})
+			require.NoError(t, err)
+			require.Len(t, result, 1)
 
-		updatedJob1, err := exec.JobGetByID(ctx, job1.ID)
-		require.NoError(t, err)
-		require.Equal(t, rivertype.JobStateAvailable, updatedJob1.State)
+			updatedJob1, err := exec.JobGetByID(ctx, job1.ID)
+			require.NoError(t, err)
+			require.Equal(t, rivertype.JobStateAvailable, updatedJob1.State)
 
-		updatedJob2, err := exec.JobGetByID(ctx, job2.ID)
-		require.NoError(t, err)
-		require.Equal(t, rivertype.JobStateAvailable, updatedJob2.State)
+			updatedJob2, err := exec.JobGetByID(ctx, job2.ID)
+			require.NoError(t, err)
+			require.Equal(t, rivertype.JobStateAvailable, updatedJob2.State)
 
-		updatedJob3, err := exec.JobGetByID(ctx, job3.ID)
-		require.NoError(t, err)
-		require.Equal(t, rivertype.JobStateAvailable, updatedJob3.State)
+			updatedJob3, err := exec.JobGetByID(ctx, job3.ID)
+			require.NoError(t, err)
+			require.Equal(t, rivertype.JobStateAvailable, updatedJob3.State)
+		})
+
+		t.Run("HandlesUniqueConflicts", func(t *testing.T) {
+			t.Parallel()
+
+			exec, _ := setup(ctx, t)
+
+			var (
+				horizon       = time.Now()
+				beforeHorizon = horizon.Add(-1 * time.Minute)
+			)
+
+			defaultUniqueStates := []rivertype.JobState{
+				rivertype.JobStateAvailable,
+				rivertype.JobStatePending,
+				rivertype.JobStateRetryable,
+				rivertype.JobStateRunning,
+				rivertype.JobStateScheduled,
+			}
+			// The default unique state list, minus retryable to allow for these conflicts:
+			nonRetryableUniqueStates := []rivertype.JobState{
+				rivertype.JobStateAvailable,
+				rivertype.JobStatePending,
+				rivertype.JobStateRunning,
+				rivertype.JobStateScheduled,
+			}
+
+			job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
+				ScheduledAt:  &beforeHorizon,
+				State:        ptrutil.Ptr(rivertype.JobStateRetryable),
+				UniqueKey:    []byte("unique-key-1"),
+				UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates),
+			})
+			job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
+				ScheduledAt:  &beforeHorizon,
+				State:        ptrutil.Ptr(rivertype.JobStateRetryable),
+				UniqueKey:    []byte("unique-key-2"),
+				UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates),
+			})
+			// job3 has no conflict (it's the only one with this key), so it should be
+			// scheduled.
+			job3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
+				ScheduledAt:  &beforeHorizon,
+				State:        ptrutil.Ptr(rivertype.JobStateRetryable),
+				UniqueKey:    []byte("unique-key-3"),
+				UniqueStates: dbunique.UniqueStatesToBitmask(defaultUniqueStates),
+			})
+
+			// This one is a conflict with job1 because it's already running and has
+			// the same unique properties:
+			_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
+				ScheduledAt:  &beforeHorizon,
+				State:        ptrutil.Ptr(rivertype.JobStateRunning),
+				UniqueKey:    []byte("unique-key-1"),
+				UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates),
+			})
+			// This one is *not* a conflict with job2 because it's completed, which
+			// isn't in the unique states:
+			_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
+				ScheduledAt:  &beforeHorizon,
+				State:        ptrutil.Ptr(rivertype.JobStateCompleted),
+				UniqueKey:    []byte("unique-key-2"),
+				UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates),
+			})
+
+			result, err := exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{
+				Max: 100,
+				Now: horizon,
+			})
+			require.NoError(t, err)
+			require.Len(t, result, 3)
+
+			updatedJob1, err := exec.JobGetByID(ctx, job1.ID)
+			require.NoError(t, err)
+			require.Equal(t, rivertype.JobStateDiscarded, updatedJob1.State)
+			require.Equal(t, "scheduler_discarded", gjson.GetBytes(updatedJob1.Metadata, "unique_key_conflict").String())
+
+			updatedJob2, err := exec.JobGetByID(ctx, job2.ID)
+			require.NoError(t, err)
+			require.Equal(t, rivertype.JobStateAvailable, updatedJob2.State)
+			require.False(t, gjson.GetBytes(updatedJob2.Metadata, "unique_key_conflict").Exists())
+
+			updatedJob3, err := exec.JobGetByID(ctx, job3.ID)
+			require.NoError(t, err)
+			require.Equal(t, rivertype.JobStateAvailable, updatedJob3.State)
+			require.False(t, gjson.GetBytes(updatedJob3.Metadata, "unique_key_conflict").Exists())
+		})
+
+		t.Run("SchedulingTwoRetryableJobsThatWillConflictWithEachOther", func(t *testing.T) {
+			t.Parallel()
+
+			exec, _ := setup(ctx, t)
+
+			var (
+				horizon       = time.Now()
+				beforeHorizon = horizon.Add(-1 * time.Minute)
+			)
+
+			// The default unique state list, minus retryable to allow for these conflicts:
+			nonRetryableUniqueStates := []rivertype.JobState{
+				rivertype.JobStateAvailable,
+				rivertype.JobStatePending,
+				rivertype.JobStateRunning,
+				rivertype.JobStateScheduled,
+			}
+
+			job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
+				ScheduledAt:  &beforeHorizon,
+				State:        ptrutil.Ptr(rivertype.JobStateRetryable),
+				UniqueKey:    []byte("unique-key-1"),
+				UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates),
+			})
+			job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
+				ScheduledAt:  &beforeHorizon,
+				State:        ptrutil.Ptr(rivertype.JobStateRetryable),
+				UniqueKey:    []byte("unique-key-1"),
+				UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates),
+			})
+
+			result, err := exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{
+				Max: 100,
+				Now: horizon,
+			})
+			require.NoError(t, err)
+			require.Len(t, result, 2)
+
+			updatedJob1, err := exec.JobGetByID(ctx, job1.ID)
+			require.NoError(t, err)
+			require.Equal(t, rivertype.JobStateAvailable, updatedJob1.State)
+			require.False(t, gjson.GetBytes(updatedJob1.Metadata, "unique_key_conflict").Exists())
+
+			updatedJob2, err := exec.JobGetByID(ctx, job2.ID)
+			require.NoError(t, err)
+			require.Equal(t, rivertype.JobStateDiscarded, updatedJob2.State)
+			require.Equal(t, "scheduler_discarded", gjson.GetBytes(updatedJob2.Metadata, "unique_key_conflict").String())
+		})
 	})
 
 	t.Run("JobSetCompleteIfRunningMany", func(t *testing.T) {
76 changes: 54 additions & 22 deletions riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go

Some generated files are not rendered by default.
