diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d5542af..55a503d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Pausing or resuming a queue that was already paused or not paused respectively no longer returns `rivertype.ErrNotFound`. The same goes for pausing or resuming using the all queues string (`*`) when no queues are in the database (previously that also returned `rivertype.ErrNotFound`). [PR #408](https://github.com/riverqueue/river/pull/408). + ## [0.8.0] - 2024-06-25 ### Added diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index ea4f61db..1e18538e 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -1949,7 +1949,25 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv t.Run("QueuePause", func(t *testing.T) { t.Parallel() - t.Run("ExistingQueue", func(t *testing.T) { + t.Run("ExistingPausedQueue", func(t *testing.T) { + t.Parallel() + + exec, _ := setupExecutor(ctx, t, driver, beginTx) + + queue := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{ + PausedAt: ptrutil.Ptr(time.Now()), + }) + + require.NoError(t, exec.QueuePause(ctx, queue.Name)) + + queueFetched, err := exec.QueueGet(ctx, queue.Name) + require.NoError(t, err) + require.NotNil(t, queueFetched.PausedAt) + requireEqualTime(t, *queue.PausedAt, *queueFetched.PausedAt) // paused_at stays unchanged + requireEqualTime(t, queue.UpdatedAt, queueFetched.UpdatedAt) // updated_at stays unchanged + }) + + t.Run("ExistingUnpausedQueue", func(t *testing.T) { t.Parallel() exec, _ := setupExecutor(ctx, t, driver, beginTx) @@ -1974,7 +1992,7 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv require.ErrorIs(t, err, rivertype.ErrNotFound) }) - t.Run("AllQueues", func(t *testing.T) { + t.Run("AllQueuesExistingQueues", func(t *testing.T) { t.Parallel() exec, _ := setupExecutor(ctx, t, driver, beginTx) @@ -1998,25 +2016,48 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv require.NotNil(t, queue2Fetched.PausedAt) require.WithinDuration(t, now, *(queue2Fetched.PausedAt), 500*time.Millisecond) }) + + t.Run("AllQueuesNoQueues", func(t *testing.T) { + t.Parallel() + + exec, _ := setupExecutor(ctx, t, driver, beginTx) + + require.NoError(t, exec.QueuePause(ctx, rivercommon.AllQueuesString)) + }) }) t.Run("QueueResume", func(t *testing.T) { t.Parallel() - t.Run("ExistingQueue", func(t *testing.T) { + t.Run("ExistingPausedQueue", func(t *testing.T) { + t.Parallel() + + exec, _ := setupExecutor(ctx, t, driver, beginTx) + + queue := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{ + PausedAt: ptrutil.Ptr(time.Now()), + }) + + require.NoError(t, exec.QueueResume(ctx, queue.Name)) + + queueFetched, err := exec.QueueGet(ctx, queue.Name) + require.NoError(t, err) + require.Nil(t, queueFetched.PausedAt) + }) + + t.Run("ExistingUnpausedQueue", func(t *testing.T) { t.Parallel() exec, _ := setupExecutor(ctx, t, driver, beginTx) queue := testfactory.Queue(ctx, t, exec, nil) - require.Nil(t, queue.PausedAt) - require.NoError(t, exec.QueuePause(ctx, queue.Name)) require.NoError(t, exec.QueueResume(ctx, queue.Name)) queueFetched, err := exec.QueueGet(ctx, queue.Name) require.NoError(t, err) require.Nil(t, queueFetched.PausedAt) + requireEqualTime(t, queue.UpdatedAt, queueFetched.UpdatedAt) // updated_at stays unchanged }) t.Run("NonExistentQueue", func(t *testing.T) { @@ -2028,7 +2069,7 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv require.ErrorIs(t, err, rivertype.ErrNotFound) }) - t.Run("AllQueues", func(t *testing.T) { + t.Run("AllQueuesExistingQueues", func(t *testing.T) { t.Parallel() exec, _ := setupExecutor(ctx, t, driver, beginTx) @@ -2049,6 +2090,14 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv require.NoError(t, err) require.Nil(t, queue2Fetched.PausedAt) }) + + t.Run("AllQueuesNoQueues", func(t *testing.T) { + t.Parallel() + + exec, _ := setupExecutor(ctx, t, driver, beginTx) + + require.NoError(t, exec.QueueResume(ctx, rivercommon.AllQueuesString)) + }) }) }) } diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 3a3a3d71..bc13f868 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -20,6 +20,8 @@ import ( "github.com/riverqueue/river/rivertype" ) +const AllQueuesString = "*" + var ( ErrClosedPool = errors.New("underlying driver pool is closed") ErrNotImplemented = errors.New("driver does not implement this functionality") diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql index 9323b774..0c4da1ab 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql @@ -48,32 +48,49 @@ LIMIT @limit_count::integer; -- name: QueuePause :execresult WITH queue_to_update AS ( - SELECT name + SELECT name, paused_at FROM river_queue - WHERE CASE WHEN @name::text = '*' THEN true ELSE river_queue.name = @name::text END - AND paused_at IS NULL + WHERE CASE WHEN @name::text = '*' THEN true ELSE name = @name END FOR UPDATE +), +updated_queue AS ( + UPDATE river_queue + SET + paused_at = now(), + updated_at = now() + FROM queue_to_update + WHERE river_queue.name = queue_to_update.name + AND river_queue.paused_at IS NULL + RETURNING river_queue.* ) - -UPDATE river_queue -SET - paused_at = now(), - updated_at = now() -FROM queue_to_update -WHERE river_queue.name = queue_to_update.name; +SELECT * +FROM river_queue +WHERE name = @name + AND name NOT IN (SELECT name FROM updated_queue) +UNION +SELECT * +FROM updated_queue; -- name: QueueResume :execresult WITH queue_to_update AS ( SELECT name FROM river_queue WHERE CASE WHEN @name::text = '*' THEN true ELSE river_queue.name = @name::text END - AND paused_at IS NOT NULL FOR UPDATE +), +updated_queue AS ( + UPDATE river_queue + SET + paused_at = NULL, + updated_at = now() + FROM queue_to_update + WHERE river_queue.name = queue_to_update.name + RETURNING river_queue.* ) - -UPDATE river_queue -SET - paused_at = NULL, - updated_at = now() -FROM queue_to_update -WHERE river_queue.name = queue_to_update.name; +SELECT * +FROM river_queue +WHERE name = @name + AND name NOT IN (SELECT name FROM updated_queue) +UNION +SELECT * +FROM updated_queue; \ No newline at end of file diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go index 838ff10e..7d2a7f1e 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go @@ -153,19 +153,28 @@ func (q *Queries) QueueList(ctx context.Context, db DBTX, limitCount int32) ([]* const queuePause = `-- name: QueuePause :execresult WITH queue_to_update AS ( - SELECT name + SELECT name, paused_at FROM river_queue - WHERE CASE WHEN $1::text = '*' THEN true ELSE river_queue.name = $1::text END - AND paused_at IS NULL + WHERE CASE WHEN $1::text = '*' THEN true ELSE name = $1 END FOR UPDATE +), +updated_queue AS ( + UPDATE river_queue + SET + paused_at = now(), + updated_at = now() + FROM queue_to_update + WHERE river_queue.name = queue_to_update.name + AND river_queue.paused_at IS NULL + RETURNING river_queue.name, river_queue.created_at, river_queue.metadata, river_queue.paused_at, river_queue.updated_at ) - -UPDATE river_queue -SET - paused_at = now(), - updated_at = now() -FROM queue_to_update -WHERE river_queue.name = queue_to_update.name +SELECT name, created_at, metadata, paused_at, updated_at +FROM river_queue +WHERE name = $1 + AND name NOT IN (SELECT name FROM updated_queue) +UNION +SELECT name, created_at, metadata, paused_at, updated_at +FROM updated_queue ` func (q *Queries) QueuePause(ctx context.Context, db DBTX, name string) (pgconn.CommandTag, error) { @@ -177,16 +186,24 @@ WITH queue_to_update AS ( SELECT name FROM river_queue WHERE CASE WHEN $1::text = '*' THEN true ELSE river_queue.name = $1::text END - AND paused_at IS NOT NULL FOR UPDATE +), +updated_queue AS ( + UPDATE river_queue + SET + paused_at = NULL, + updated_at = now() + FROM queue_to_update + WHERE river_queue.name = queue_to_update.name + RETURNING river_queue.name, river_queue.created_at, river_queue.metadata, river_queue.paused_at, river_queue.updated_at ) - -UPDATE river_queue -SET - paused_at = NULL, - updated_at = now() -FROM queue_to_update -WHERE river_queue.name = queue_to_update.name +SELECT name, created_at, metadata, paused_at, updated_at +FROM river_queue +WHERE name = $1 + AND name NOT IN (SELECT name FROM updated_queue) +UNION +SELECT name, created_at, metadata, paused_at, updated_at +FROM updated_queue ` func (q *Queries) QueueResume(ctx context.Context, db DBTX, name string) (pgconn.CommandTag, error) { diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 341b30fc..683e829a 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -524,7 +524,7 @@ func (e *Executor) QueuePause(ctx context.Context, name string) error { if err != nil { return interpretError(err) } - if res.RowsAffected() == 0 { + if res.RowsAffected() == 0 && name != riverdriver.AllQueuesString { return rivertype.ErrNotFound } return nil @@ -535,7 +535,7 @@ func (e *Executor) QueueResume(ctx context.Context, name string) error { if err != nil { return interpretError(err) } - if res.RowsAffected() == 0 { + if res.RowsAffected() == 0 && name != riverdriver.AllQueuesString { return rivertype.ErrNotFound } return nil