Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Pausing or resuming a queue that was already paused or not paused respectively no longer returns `rivertype.ErrNotFound`. The same goes for pausing or resuming using the all queues string (`*`) when no queues are in the database (previously that also returned `rivertype.ErrNotFound`). [PR #408](https://github.com/riverqueue/river/pull/408).

## [0.8.0] - 2024-06-25

### Added
Expand Down
61 changes: 55 additions & 6 deletions internal/riverinternaltest/riverdrivertest/riverdrivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1949,7 +1949,25 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv
t.Run("QueuePause", func(t *testing.T) {
t.Parallel()

t.Run("ExistingQueue", func(t *testing.T) {
t.Run("ExistingPausedQueue", func(t *testing.T) {
t.Parallel()

exec, _ := setupExecutor(ctx, t, driver, beginTx)

queue := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{
PausedAt: ptrutil.Ptr(time.Now()),
})

require.NoError(t, exec.QueuePause(ctx, queue.Name))

queueFetched, err := exec.QueueGet(ctx, queue.Name)
require.NoError(t, err)
require.NotNil(t, queueFetched.PausedAt)
requireEqualTime(t, *queue.PausedAt, *queueFetched.PausedAt) // paused_at stays unchanged
requireEqualTime(t, queue.UpdatedAt, queueFetched.UpdatedAt) // updated_at stays unchanged
})

t.Run("ExistingUnpausedQueue", func(t *testing.T) {
t.Parallel()

exec, _ := setupExecutor(ctx, t, driver, beginTx)
Expand All @@ -1974,7 +1992,7 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv
require.ErrorIs(t, err, rivertype.ErrNotFound)
})

t.Run("AllQueues", func(t *testing.T) {
t.Run("AllQueuesExistingQueues", func(t *testing.T) {
t.Parallel()

exec, _ := setupExecutor(ctx, t, driver, beginTx)
Expand All @@ -1998,25 +2016,48 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv
require.NotNil(t, queue2Fetched.PausedAt)
require.WithinDuration(t, now, *(queue2Fetched.PausedAt), 500*time.Millisecond)
})

t.Run("AllQueuesNoQueues", func(t *testing.T) {
t.Parallel()

exec, _ := setupExecutor(ctx, t, driver, beginTx)

require.NoError(t, exec.QueuePause(ctx, rivercommon.AllQueuesString))
})
})

t.Run("QueueResume", func(t *testing.T) {
t.Parallel()

t.Run("ExistingQueue", func(t *testing.T) {
t.Run("ExistingPausedQueue", func(t *testing.T) {
t.Parallel()

exec, _ := setupExecutor(ctx, t, driver, beginTx)

queue := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{
PausedAt: ptrutil.Ptr(time.Now()),
})

require.NoError(t, exec.QueueResume(ctx, queue.Name))

queueFetched, err := exec.QueueGet(ctx, queue.Name)
require.NoError(t, err)
require.Nil(t, queueFetched.PausedAt)
})

t.Run("ExistingUnpausedQueue", func(t *testing.T) {
t.Parallel()

exec, _ := setupExecutor(ctx, t, driver, beginTx)

queue := testfactory.Queue(ctx, t, exec, nil)
require.Nil(t, queue.PausedAt)

require.NoError(t, exec.QueuePause(ctx, queue.Name))
require.NoError(t, exec.QueueResume(ctx, queue.Name))

queueFetched, err := exec.QueueGet(ctx, queue.Name)
require.NoError(t, err)
require.Nil(t, queueFetched.PausedAt)
requireEqualTime(t, queue.UpdatedAt, queueFetched.UpdatedAt) // updated_at stays unchanged
})

t.Run("NonExistentQueue", func(t *testing.T) {
Expand All @@ -2028,7 +2069,7 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv
require.ErrorIs(t, err, rivertype.ErrNotFound)
})

t.Run("AllQueues", func(t *testing.T) {
t.Run("AllQueuesExistingQueues", func(t *testing.T) {
t.Parallel()

exec, _ := setupExecutor(ctx, t, driver, beginTx)
Expand All @@ -2049,6 +2090,14 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv
require.NoError(t, err)
require.Nil(t, queue2Fetched.PausedAt)
})

t.Run("AllQueuesNoQueues", func(t *testing.T) {
t.Parallel()

exec, _ := setupExecutor(ctx, t, driver, beginTx)

require.NoError(t, exec.QueueResume(ctx, rivercommon.AllQueuesString))
})
})
})
}
Expand Down
2 changes: 2 additions & 0 deletions riverdriver/river_driver_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/riverqueue/river/rivertype"
)

const AllQueuesString = "*"

var (
ErrClosedPool = errors.New("underlying driver pool is closed")
ErrNotImplemented = errors.New("driver does not implement this functionality")
Expand Down
53 changes: 35 additions & 18 deletions riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql
Original file line number Diff line number Diff line change
Expand Up @@ -48,32 +48,49 @@ LIMIT @limit_count::integer;

-- name: QueuePause :execresult
WITH queue_to_update AS (
SELECT name
SELECT name, paused_at
FROM river_queue
WHERE CASE WHEN @name::text = '*' THEN true ELSE river_queue.name = @name::text END
AND paused_at IS NULL
WHERE CASE WHEN @name::text = '*' THEN true ELSE name = @name END
FOR UPDATE
),
updated_queue AS (
UPDATE river_queue
SET
paused_at = now(),
updated_at = now()
FROM queue_to_update
WHERE river_queue.name = queue_to_update.name
AND river_queue.paused_at IS NULL
RETURNING river_queue.*
)

UPDATE river_queue
SET
paused_at = now(),
updated_at = now()
FROM queue_to_update
WHERE river_queue.name = queue_to_update.name;
SELECT *
FROM river_queue
WHERE name = @name
AND name NOT IN (SELECT name FROM updated_queue)
UNION
SELECT *
FROM updated_queue;

-- name: QueueResume :execresult
WITH queue_to_update AS (
SELECT name
FROM river_queue
WHERE CASE WHEN @name::text = '*' THEN true ELSE river_queue.name = @name::text END
AND paused_at IS NOT NULL
FOR UPDATE
),
updated_queue AS (
UPDATE river_queue
SET
paused_at = NULL,
updated_at = now()
FROM queue_to_update
WHERE river_queue.name = queue_to_update.name
RETURNING river_queue.*
)

UPDATE river_queue
SET
paused_at = NULL,
updated_at = now()
FROM queue_to_update
WHERE river_queue.name = queue_to_update.name;
SELECT *
FROM river_queue
WHERE name = @name
AND name NOT IN (SELECT name FROM updated_queue)
UNION
SELECT *
FROM updated_queue;
53 changes: 35 additions & 18 deletions riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions riverdriver/riverpgxv5/river_pgx_v5_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func (e *Executor) QueuePause(ctx context.Context, name string) error {
if err != nil {
return interpretError(err)
}
if res.RowsAffected() == 0 {
if res.RowsAffected() == 0 && name != riverdriver.AllQueuesString {
return rivertype.ErrNotFound
}
return nil
Expand All @@ -535,7 +535,7 @@ func (e *Executor) QueueResume(ctx context.Context, name string) error {
if err != nil {
return interpretError(err)
}
if res.RowsAffected() == 0 {
if res.RowsAffected() == 0 && name != riverdriver.AllQueuesString {
return rivertype.ErrNotFound
}
return nil
Expand Down