diff --git a/CHANGELOG.md b/CHANGELOG.md index c042b6e4..b305eace 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,14 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] -### Changed - ⚠️ Version 0.19.0 has minor breaking changes for the `Worker.Middleware`, introduced fairly recently in 0.17.0. We tried not to make this change, but found the existing middleware interface insufficient to provide the necessary range of functionality we wanted, and this is a secondary middleware facility that won't be in use for many users, so it seemed worthwhile. ### Changed - The `river.RecordOutput` function now returns an error if the output is too large. The output is limited to 32MB in size. [PR #782](https://github.com/riverqueue/river/pull/782). - **Breaking change:** The `Worker` interface's `Middleware` function now takes a `JobRow` parameter instead of a generic `Job[T]`. This was necessary to expand the potential of what middleware can do: by letting the executor extract a middleware stack from a worker before a job is fully unmarshaled, the middleware can also participate in the unmarshaling process. [PR #783](https://github.com/riverqueue/river/pull/783). +- `JobList` has been reimplemented to use sqlc. [PR #795](https://github.com/riverqueue/river/pull/795). ## [0.18.0] - 2025-02-20 diff --git a/Makefile b/Makefile index 5756aab8..226c90e5 100644 --- a/Makefile +++ b/Makefile @@ -64,7 +64,6 @@ define test-race-target endef $(foreach mod,$(submodules),$(eval $(call test-race-target,$(mod)))) - .PHONY: tidy tidy:: ## Run `go mod tidy` for all submodules define tidy-target diff --git a/internal/dblist/db_list.go b/internal/dblist/db_list.go index a3529ca6..e6bd6bab 100644 --- a/internal/dblist/db_list.go +++ b/internal/dblist/db_list.go @@ -3,7 +3,6 @@ package dblist import ( "context" "errors" - "fmt" "strings" "github.com/riverqueue/river/riverdriver" @@ -11,17 +10,6 @@ import ( "github.com/riverqueue/river/rivertype" ) -const jobList = `-- name: JobList :many -SELECT - %s -FROM - river_job -%s -ORDER BY - %s -LIMIT @count::integer -` - type SortOrder int const ( @@ -47,7 +35,7 @@ type JobListParams struct { } func JobList(ctx context.Context, exec riverdriver.Executor, params *JobListParams) ([]*rivertype.JobRow, error) { - var conditionsBuilder strings.Builder + var whereBuilder strings.Builder orderBy := make([]JobListOrderBy, len(params.OrderBy)) for i, o := range params.OrderBy { @@ -62,41 +50,46 @@ func JobList(ctx context.Context, exec riverdriver.Executor, params *JobListPara namedArgs = make(map[string]any) } - writeWhereOrAnd := func() { - if conditionsBuilder.Len() == 0 { - conditionsBuilder.WriteString("WHERE\n ") - } else { - conditionsBuilder.WriteString("\n AND ") + // Writes an `AND` to connect SQL predicates as long as this isn't the first + // predicate. + writeAndAfterFirst := func() { + if whereBuilder.Len() != 0 { + whereBuilder.WriteString("\n AND ") } } if len(params.Kinds) > 0 { - writeWhereOrAnd() - conditionsBuilder.WriteString("kind = any(@kinds::text[])") + writeAndAfterFirst() + whereBuilder.WriteString("kind = any(@kinds::text[])") namedArgs["kinds"] = params.Kinds } if len(params.Queues) > 0 { - writeWhereOrAnd() - conditionsBuilder.WriteString("queue = any(@queues::text[])") + writeAndAfterFirst() + whereBuilder.WriteString("queue = any(@queues::text[])") namedArgs["queues"] = params.Queues } if len(params.States) > 0 { - writeWhereOrAnd() - conditionsBuilder.WriteString("state = any(@states::river_job_state[])") + writeAndAfterFirst() + whereBuilder.WriteString("state = any(@states::river_job_state[])") namedArgs["states"] = sliceutil.Map(params.States, func(s rivertype.JobState) string { return string(s) }) } if params.Conditions != "" { - writeWhereOrAnd() - conditionsBuilder.WriteString(params.Conditions) + writeAndAfterFirst() + whereBuilder.WriteString(params.Conditions) + } + + // A condition of some kind is needed, so given no others write one that'll + // always return true. + if whereBuilder.Len() < 1 { + whereBuilder.WriteString("1") } if params.LimitCount < 1 { return nil, errors.New("required parameter 'Count' in JobList must be greater than zero") } - namedArgs["count"] = params.LimitCount if len(params.OrderBy) == 0 { return nil, errors.New("sort order is required") @@ -116,7 +109,10 @@ func JobList(ctx context.Context, exec riverdriver.Executor, params *JobListPara } } - sql := fmt.Sprintf(jobList, exec.JobListFields(), conditionsBuilder.String(), orderByBuilder.String()) - - return exec.JobList(ctx, sql, namedArgs) + return exec.JobList(ctx, &riverdriver.JobListParams{ + Max: params.LimitCount, + NamedArgs: namedArgs, + OrderByClause: orderByBuilder.String(), + WhereClause: whereBuilder.String(), + }) } diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index fbb77183..86fbfd7d 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -1281,11 +1281,15 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, UniqueStates: 0xFF, }) - fetchedJobs, err := exec.JobList( - ctx, - fmt.Sprintf("SELECT %s FROM river_job WHERE id = @job_id_123", exec.JobListFields()), - map[string]any{"job_id_123": job.ID}, - ) + // Does not match predicate (makes sure where clause is working). + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{}) + + fetchedJobs, err := exec.JobList(ctx, &riverdriver.JobListParams{ + Max: 100, + NamedArgs: map[string]any{"job_id_123": job.ID}, + OrderByClause: "id", + WhereClause: "id = @job_id_123", + }) require.NoError(t, err) require.Len(t, fetchedJobs, 1) @@ -1316,36 +1320,29 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Kind: ptrutil.Ptr("test_kind2")}) { - fetchedJobs, err := exec.JobList( - ctx, - fmt.Sprintf("SELECT %s FROM river_job WHERE kind = @kind", exec.JobListFields()), - map[string]any{"kind": job1.Kind}, - ) + fetchedJobs, err := exec.JobList(ctx, &riverdriver.JobListParams{ + Max: 100, + NamedArgs: map[string]any{"kind": job1.Kind}, + OrderByClause: "id", + WhereClause: "kind = @kind", + }) require.NoError(t, err) require.Len(t, fetchedJobs, 1) } { - fetchedJobs, err := exec.JobList( - ctx, - fmt.Sprintf("SELECT %s FROM river_job WHERE kind = any(@kind::text[])", exec.JobListFields()), - map[string]any{"kind": []string{job1.Kind, job2.Kind}}, - ) + fetchedJobs, err := exec.JobList(ctx, &riverdriver.JobListParams{ + Max: 100, + NamedArgs: map[string]any{"kind": []string{job1.Kind, job2.Kind}}, + OrderByClause: "id", + WhereClause: "kind = any(@kind::text[])", + }) require.NoError(t, err) require.Len(t, fetchedJobs, 2) } }) }) - t.Run("JobListFields", func(t *testing.T) { - t.Parallel() - - exec, _ := setup(ctx, t) - - require.Equal(t, "id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states", - exec.JobListFields()) - }) - t.Run("JobRescueMany", func(t *testing.T) { t.Parallel() diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index f6d8ff5a..50884cae 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -118,8 +118,7 @@ type Executor interface { JobInsertFastMany(ctx context.Context, params []*JobInsertFastParams) ([]*JobInsertFastResult, error) JobInsertFastManyNoReturning(ctx context.Context, params []*JobInsertFastParams) (int, error) JobInsertFull(ctx context.Context, params *JobInsertFullParams) (*rivertype.JobRow, error) - JobList(ctx context.Context, query string, namedArgs map[string]any) ([]*rivertype.JobRow, error) - JobListFields() string + JobList(ctx context.Context, params *JobListParams) ([]*rivertype.JobRow, error) JobRescueMany(ctx context.Context, params *JobRescueManyParams) (*struct{}, error) JobRetry(ctx context.Context, id int64) (*rivertype.JobRow, error) JobSchedule(ctx context.Context, params *JobScheduleParams) ([]*JobScheduleResult, error) @@ -290,6 +289,13 @@ type JobInsertFullParams struct { UniqueStates byte } +type JobListParams struct { + Max int32 + NamedArgs map[string]any + OrderByClause string + WhereClause string +} + type JobRescueManyParams struct { ID []int64 Error [][]byte diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index 429ff2f8..dc430083 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -825,6 +825,56 @@ func (q *Queries) JobInsertFull(ctx context.Context, db DBTX, arg *JobInsertFull return &i, err } +const jobList = `-- name: JobList :many +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +FROM river_job +WHERE /* TEMPLATE_BEGIN: where_clause */ 1 /* TEMPLATE_END */ +ORDER BY /* TEMPLATE_BEGIN: order_by_clause */ id /* TEMPLATE_END */ +LIMIT $1::int +` + +func (q *Queries) JobList(ctx context.Context, db DBTX, max int32) ([]*RiverJob, error) { + rows, err := db.QueryContext(ctx, jobList, max) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*RiverJob + for rows.Next() { + var i RiverJob + if err := rows.Scan( + &i.ID, + &i.Args, + &i.Attempt, + &i.AttemptedAt, + pq.Array(&i.AttemptedBy), + &i.CreatedAt, + pq.Array(&i.Errors), + &i.FinalizedAt, + &i.Kind, + &i.MaxAttempts, + &i.Metadata, + &i.Priority, + &i.Queue, + &i.State, + &i.ScheduledAt, + pq.Array(&i.Tags), + &i.UniqueKey, + &i.UniqueStates, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const jobRescueMany = `-- name: JobRescueMany :exec UPDATE river_job SET diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index 195609ec..53727047 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -19,7 +19,6 @@ import ( "time" "github.com/jackc/pgx/v5/pgtype" - "github.com/lib/pq" "github.com/riverqueue/river/internal/dbunique" "github.com/riverqueue/river/riverdriver" @@ -364,50 +363,22 @@ func (e *Executor) JobInsertFull(ctx context.Context, params *riverdriver.JobIns return jobRowFromInternal(job) } -func (e *Executor) JobList(ctx context.Context, query string, namedArgs map[string]any) ([]*rivertype.JobRow, error) { - query, err := replaceNamed(query, namedArgs) +func (e *Executor) JobList(ctx context.Context, params *riverdriver.JobListParams) ([]*rivertype.JobRow, error) { + whereClause, err := replaceNamed(params.WhereClause, params.NamedArgs) if err != nil { return nil, err } - rows, err := e.dbtx.QueryContext(ctx, query) + ctx = sqlctemplate.WithReplacements(ctx, map[string]sqlctemplate.Replacement{ + "order_by_clause": {Value: params.OrderByClause}, + "where_clause": {Value: whereClause}, + }, nil) // named params not passed because they've already been replaced above + + jobs, err := dbsqlc.New().JobList(ctx, e.dbtx, params.Max) if err != nil { - return nil, err - } - defer rows.Close() - - var items []*dbsqlc.RiverJob - for rows.Next() { - var i dbsqlc.RiverJob - if err := rows.Scan( - &i.ID, - &i.Args, - &i.Attempt, - &i.AttemptedAt, - pq.Array(&i.AttemptedBy), - &i.CreatedAt, - pq.Array(&i.Errors), - &i.FinalizedAt, - &i.Kind, - &i.MaxAttempts, - &i.Metadata, - &i.Priority, - &i.Queue, - &i.State, - &i.ScheduledAt, - pq.Array(&i.Tags), - &i.UniqueKey, - &i.UniqueStates, - ); err != nil { - return nil, err - } - items = append(items, &i) - } - if err := rows.Err(); err != nil { return nil, interpretError(err) } - - return mapSliceError(items, jobRowFromInternal) + return mapSliceError(jobs, jobRowFromInternal) } func escapeSinglePostgresValue(value any) string { @@ -502,10 +473,6 @@ func replaceNamed(query string, namedArgs map[string]any) (string, error) { return query, nil } -func (e *Executor) JobListFields() string { - return "id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states" -} - func (e *Executor) JobRescueMany(ctx context.Context, params *riverdriver.JobRescueManyParams) (*struct{}, error) { err := dbsqlc.New().JobRescueMany(ctx, e.dbtx, &dbsqlc.JobRescueManyParams{ ID: params.ID, diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index a6be7556..bf4ac802 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -315,6 +315,12 @@ INSERT INTO river_job( @unique_states ) RETURNING *; +-- name: JobList :many +SELECT * +FROM river_job +WHERE /* TEMPLATE_BEGIN: where_clause */ 1 /* TEMPLATE_END */ +ORDER BY /* TEMPLATE_BEGIN: order_by_clause */ id /* TEMPLATE_END */ +LIMIT @max::int; -- Run by the rescuer to queue for retry or discard depending on job state. -- name: JobRescueMany :exec diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index 4cd0810c..a4c8673e 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -809,6 +809,53 @@ func (q *Queries) JobInsertFull(ctx context.Context, db DBTX, arg *JobInsertFull return &i, err } +const jobList = `-- name: JobList :many +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states +FROM river_job +WHERE /* TEMPLATE_BEGIN: where_clause */ 1 /* TEMPLATE_END */ +ORDER BY /* TEMPLATE_BEGIN: order_by_clause */ id /* TEMPLATE_END */ +LIMIT $1::int +` + +func (q *Queries) JobList(ctx context.Context, db DBTX, max int32) ([]*RiverJob, error) { + rows, err := db.Query(ctx, jobList, max) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*RiverJob + for rows.Next() { + var i RiverJob + if err := rows.Scan( + &i.ID, + &i.Args, + &i.Attempt, + &i.AttemptedAt, + &i.AttemptedBy, + &i.CreatedAt, + &i.Errors, + &i.FinalizedAt, + &i.Kind, + &i.MaxAttempts, + &i.Metadata, + &i.Priority, + &i.Queue, + &i.State, + &i.ScheduledAt, + &i.Tags, + &i.UniqueKey, + &i.UniqueStates, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const jobRescueMany = `-- name: JobRescueMany :exec UPDATE river_job SET diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 3db9d3af..ed2811f7 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -350,49 +350,17 @@ func (e *Executor) JobInsertFull(ctx context.Context, params *riverdriver.JobIns return jobRowFromInternal(job) } -func (e *Executor) JobList(ctx context.Context, query string, namedArgs map[string]any) ([]*rivertype.JobRow, error) { - rows, err := e.dbtx.Query(ctx, query, pgx.NamedArgs(namedArgs)) +func (e *Executor) JobList(ctx context.Context, params *riverdriver.JobListParams) ([]*rivertype.JobRow, error) { + ctx = sqlctemplate.WithReplacements(ctx, map[string]sqlctemplate.Replacement{ + "order_by_clause": {Value: params.OrderByClause}, + "where_clause": {Value: params.WhereClause}, + }, params.NamedArgs) + + jobs, err := dbsqlc.New().JobList(ctx, e.dbtx, params.Max) if err != nil { - return nil, err - } - defer rows.Close() - - var items []*dbsqlc.RiverJob - for rows.Next() { - var i dbsqlc.RiverJob - if err := rows.Scan( - &i.ID, - &i.Args, - &i.Attempt, - &i.AttemptedAt, - &i.AttemptedBy, - &i.CreatedAt, - &i.Errors, - &i.FinalizedAt, - &i.Kind, - &i.MaxAttempts, - &i.Metadata, - &i.Priority, - &i.Queue, - &i.State, - &i.ScheduledAt, - &i.Tags, - &i.UniqueKey, - &i.UniqueStates, - ); err != nil { - return nil, err - } - items = append(items, &i) - } - if err := rows.Err(); err != nil { return nil, interpretError(err) } - - return mapSliceError(items, jobRowFromInternal) -} - -func (e *Executor) JobListFields() string { - return "id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states" + return mapSliceError(jobs, jobRowFromInternal) } func (e *Executor) JobRescueMany(ctx context.Context, params *riverdriver.JobRescueManyParams) (*struct{}, error) {