diff --git a/.storybook/preview.tsx b/.storybook/preview.tsx index 251c68b6..2fe3c706 100644 --- a/.storybook/preview.tsx +++ b/.storybook/preview.tsx @@ -26,6 +26,7 @@ export const withFeatures: Decorator = (StoryFn, context) => { // Default features with story-specific overrides const features = { hasProducerTable: true, + producerQueries: true, ...context.parameters?.features, }; diff --git a/common_test.go b/common_test.go deleted file mode 100644 index 1f0c7908..00000000 --- a/common_test.go +++ /dev/null @@ -1,32 +0,0 @@ -package riverui - -import ( - "log/slog" - "testing" - - "github.com/jackc/pgx/v5" - "github.com/stretchr/testify/require" - - "github.com/riverqueue/river" - "github.com/riverqueue/river/riverdriver" - "github.com/riverqueue/river/riverdriver/riverpgxv5" - - "riverqueue.com/riverui/internal/uicommontest" -) - -func insertOnlyClient(t *testing.T, logger *slog.Logger) (*river.Client[pgx.Tx], riverdriver.Driver[pgx.Tx]) { - t.Helper() - - workers := river.NewWorkers() - river.AddWorker(workers, &uicommontest.NoOpWorker{}) - - driver := riverpgxv5.New(nil) - - client, err := river.NewClient(driver, &river.Config{ - Logger: logger, - Workers: workers, - }) - require.NoError(t, err) - - return client, driver -} diff --git a/go.mod b/go.mod index 42d3c7c6..9e4da0f2 100644 --- a/go.mod +++ b/go.mod @@ -7,11 +7,11 @@ toolchain go1.24.4 require ( github.com/jackc/pgx/v5 v5.7.6 github.com/riverqueue/apiframe v0.0.0-20250408034821-b206bbbd0fb4 - github.com/riverqueue/river v0.25.0 - github.com/riverqueue/river/riverdriver v0.25.0 - github.com/riverqueue/river/riverdriver/riverpgxv5 v0.25.0 - github.com/riverqueue/river/rivershared v0.25.0 - github.com/riverqueue/river/rivertype v0.25.0 + github.com/riverqueue/river v0.25.1-0.20251001013213-822d5bb676cc + github.com/riverqueue/river/riverdriver v0.25.1-0.20251001013213-822d5bb676cc + github.com/riverqueue/river/riverdriver/riverpgxv5 v0.25.1-0.20251001013213-822d5bb676cc + github.com/riverqueue/river/rivershared v0.25.1-0.20251001013213-822d5bb676cc + github.com/riverqueue/river/rivertype v0.25.1-0.20251001013213-822d5bb676cc github.com/rs/cors v1.11.1 github.com/samber/slog-http v1.8.2 github.com/stretchr/testify v1.11.1 diff --git a/go.sum b/go.sum index 7629371c..1727212c 100644 --- a/go.sum +++ b/go.sum @@ -35,16 +35,16 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/riverqueue/apiframe v0.0.0-20250408034821-b206bbbd0fb4 h1:ejJogJ57bF+jMbvGjZQ6H6LR0NCTDQr30SJ/wSVepgs= github.com/riverqueue/apiframe v0.0.0-20250408034821-b206bbbd0fb4/go.mod h1:6aXA9FSXKkxwjbOUSXdrIOuw478Lvtz/eEu45R4MoQk= -github.com/riverqueue/river v0.25.0 h1:dRnA9ltq9hTYRMmZgBnhqRh3AzBIFVu+qVLpBqy6b+g= -github.com/riverqueue/river v0.25.0/go.mod h1:KetN5MQQu9IjtganQrIt0OFubweeh+qkAqJaCdalwtI= -github.com/riverqueue/river/riverdriver v0.25.0 h1:RkvBWBlybYGaU1DoQ/mSwnWp1hm0FfS8yyksr/dM5tI= -github.com/riverqueue/river/riverdriver v0.25.0/go.mod h1:p2Jvr1N6NfPA+ngIKK8urqxG2vmusX4jO7g/UH/soQY= -github.com/riverqueue/river/riverdriver/riverpgxv5 v0.25.0 h1:Ed6dtSSwsj7VwbquG6Bh+2+271sBOL6WyRbisY/XHiY= -github.com/riverqueue/river/riverdriver/riverpgxv5 v0.25.0/go.mod h1:h77bWaGJyA5GMKEKmANQN9mhsV3XWYt4sRUx6FtQa84= -github.com/riverqueue/river/rivershared v0.25.0 h1:grjuTHJEVvi4srzcspQ2UXWjISxdqbubQl+9DDg3agQ= -github.com/riverqueue/river/rivershared v0.25.0/go.mod h1:ZdVeOnT8X8PiAZRUfWHc+Ne6fNXqe1oYb2eioZb6URM= -github.com/riverqueue/river/rivertype v0.25.0 h1:DPwd0DGqajLIv9zsB+BOwlum0D1/4Iiqz34+nwIZaZ0= -github.com/riverqueue/river/rivertype v0.25.0/go.mod h1:9bbWVYkr1B/YzW43lUs/Vk/tEYqLrabrZWrtUWQ+Goo= +github.com/riverqueue/river v0.25.1-0.20251001013213-822d5bb676cc h1:G9TV3+iiEMs18MAeexA8gb4KltmHNvIE1uB4ra6k6yY= +github.com/riverqueue/river v0.25.1-0.20251001013213-822d5bb676cc/go.mod h1:KetN5MQQu9IjtganQrIt0OFubweeh+qkAqJaCdalwtI= +github.com/riverqueue/river/riverdriver v0.25.1-0.20251001013213-822d5bb676cc h1:AAMdacQhDBNebXmjuiGaPHeZAoZT77ELelhIMoJLD7E= +github.com/riverqueue/river/riverdriver v0.25.1-0.20251001013213-822d5bb676cc/go.mod h1:p2Jvr1N6NfPA+ngIKK8urqxG2vmusX4jO7g/UH/soQY= +github.com/riverqueue/river/riverdriver/riverpgxv5 v0.25.1-0.20251001013213-822d5bb676cc h1:uOZcH6W9ItGhG4IFziy54Zkbo9O7t4BDELyCnCjX1y4= +github.com/riverqueue/river/riverdriver/riverpgxv5 v0.25.1-0.20251001013213-822d5bb676cc/go.mod h1:h77bWaGJyA5GMKEKmANQN9mhsV3XWYt4sRUx6FtQa84= +github.com/riverqueue/river/rivershared v0.25.1-0.20251001013213-822d5bb676cc h1:7KjQiHOdPioVtGLwO4imQvt7swiGeww1oF4JZfcEaLk= +github.com/riverqueue/river/rivershared v0.25.1-0.20251001013213-822d5bb676cc/go.mod h1:ZdVeOnT8X8PiAZRUfWHc+Ne6fNXqe1oYb2eioZb6URM= +github.com/riverqueue/river/rivertype v0.25.1-0.20251001013213-822d5bb676cc h1:75lj6WkDnCNaF6HJl3PKDQj+TlDypmvfGl9b28uLghA= +github.com/riverqueue/river/rivertype v0.25.1-0.20251001013213-822d5bb676cc/go.mod h1:9bbWVYkr1B/YzW43lUs/Vk/tEYqLrabrZWrtUWQ+Goo= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= diff --git a/handler.go b/handler.go index ccf8ea84..5f83e8fe 100644 --- a/handler.go +++ b/handler.go @@ -28,10 +28,6 @@ import ( "riverqueue.com/riverui/uiendpoints" ) -type endpointsExtensions interface { - Extensions() map[string]bool -} - // EndpointsOpts are the options for creating a new Endpoints bundle. type EndpointsOpts[TTx any] struct { // Tx is an optional transaction to wrap all database operations. It's mainly @@ -42,6 +38,7 @@ type EndpointsOpts[TTx any] struct { type endpoints[TTx any] struct { bundleOpts *uiendpoints.BundleOpts client *river.Client[TTx] + extensions func(ctx context.Context) (map[string]bool, error) opts *EndpointsOpts[TTx] } @@ -66,6 +63,19 @@ func (e *endpoints[TTx]) Configure(bundleOpts *uiendpoints.BundleOpts) { e.bundleOpts = bundleOpts } +func (e *endpoints[TTx]) Extensions(ctx context.Context) (map[string]bool, error) { + if e.extensions == nil { + return map[string]bool{}, nil + } + return e.extensions(ctx) +} + +// SetExtensionsProvider sets the extensions provider function for this bundle. +// This is a private "friend interface" method for use by riverproui. +func (e *endpoints[TTx]) SetExtensionsProvider(provider func(ctx context.Context) (map[string]bool, error)) { + e.extensions = provider +} + func (e *endpoints[TTx]) Validate() error { if e.client == nil { return errors.New("client is required") @@ -73,7 +83,7 @@ func (e *endpoints[TTx]) Validate() error { return nil } -func (e *endpoints[TTx]) MountEndpoints(archetype *baseservice.Archetype, logger *slog.Logger, mux *http.ServeMux, mountOpts *apiendpoint.MountOpts, extensions map[string]bool) []apiendpoint.EndpointInterface { +func (e *endpoints[TTx]) MountEndpoints(archetype *baseservice.Archetype, logger *slog.Logger, mux *http.ServeMux, mountOpts *apiendpoint.MountOpts) []apiendpoint.EndpointInterface { driver := e.client.Driver() var executor riverdriver.Executor if e.opts.Tx == nil { @@ -86,7 +96,7 @@ func (e *endpoints[TTx]) MountEndpoints(archetype *baseservice.Archetype, logger Client: e.client, DB: executor, Driver: driver, - Extensions: extensions, + Extensions: e.Extensions, JobListHideArgsByDefault: e.bundleOpts.JobListHideArgsByDefault, Logger: logger, } @@ -232,12 +242,7 @@ func NewHandler(opts *HandlerOpts) (*Handler, error) { Validator: apitype.NewValidator(), } - extensions := map[string]bool{} - if withExtensions, ok := opts.Endpoints.(endpointsExtensions); ok { - extensions = withExtensions.Extensions() - } - - endpoints := opts.Endpoints.MountEndpoints(baseservice.NewArchetype(opts.Logger), opts.Logger, mux, &mountOpts, extensions) + endpoints := opts.Endpoints.MountEndpoints(baseservice.NewArchetype(opts.Logger), opts.Logger, mux, &mountOpts) var services []startstop.Service diff --git a/handler_api_endpoint.go b/handler_api_endpoint.go index ef2c0544..58a109d5 100644 --- a/handler_api_endpoint.go +++ b/handler_api_endpoint.go @@ -176,62 +176,19 @@ type featuresGetRequest struct{} type featuresGetResponse struct { Extensions map[string]bool `json:"extensions"` - HasClientTable bool `json:"has_client_table"` - HasProducerTable bool `json:"has_producer_table"` - HasSequenceTable bool `json:"has_sequence_table"` - HasWorkflows bool `json:"has_workflows"` JobListHideArgsByDefault bool `json:"job_list_hide_args_by_default"` } func (a *featuresGetEndpoint[TTx]) Execute(ctx context.Context, _ *featuresGetRequest) (*featuresGetResponse, error) { - return dbutil.WithTxV(ctx, a.DB, func(ctx context.Context, execTx riverdriver.ExecutorTx) (*featuresGetResponse, error) { - tx := a.Driver.UnwrapTx(execTx) - - schema := a.Client.Schema() - hasClientTable, err := a.Driver.UnwrapExecutor(tx).TableExists(ctx, &riverdriver.TableExistsParams{ - Schema: schema, - Table: "river_client", - }) - if err != nil { - return nil, err - } - - hasProducerTable, err := a.Driver.UnwrapExecutor(tx).TableExists(ctx, &riverdriver.TableExistsParams{ - Schema: schema, - Table: "river_producer", - }) - if err != nil { - return nil, err - } - - hasSequenceTable, err := a.Driver.UnwrapExecutor(tx).TableExists(ctx, &riverdriver.TableExistsParams{ - Schema: schema, - Table: "river_job_sequence", - }) - if err != nil { - return nil, err - } - - indexResults, err := a.Driver.UnwrapExecutor(tx).IndexesExist(ctx, &riverdriver.IndexesExistParams{ - IndexNames: []string{ - "river_job_workflow_list_active", - "river_job_workflow_scheduling", - }, - Schema: schema, - }) - if err != nil { - return nil, err - } + extensions, err := a.Extensions(ctx) + if err != nil { + return nil, err + } - return &featuresGetResponse{ - Extensions: a.Extensions, - HasClientTable: hasClientTable, - HasProducerTable: hasProducerTable, - HasSequenceTable: hasSequenceTable, - HasWorkflows: indexResults["river_job_workflow_list_active"] || indexResults["river_job_workflow_scheduling"], - JobListHideArgsByDefault: a.JobListHideArgsByDefault, - }, nil - }) + return &featuresGetResponse{ + Extensions: extensions, + JobListHideArgsByDefault: a.JobListHideArgsByDefault, + }, nil } // diff --git a/handler_api_endpoint_test.go b/handler_api_endpoint_test.go index cd23af9d..8c9c5247 100644 --- a/handler_api_endpoint_test.go +++ b/handler_api_endpoint_test.go @@ -14,14 +14,15 @@ import ( "github.com/riverqueue/apiframe/apitest" "github.com/riverqueue/apiframe/apitype" "github.com/riverqueue/river" + "github.com/riverqueue/river/riverdbtest" "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/riverqueue/river/rivershared/riversharedtest" "github.com/riverqueue/river/rivershared/startstop" "github.com/riverqueue/river/rivershared/util/ptrutil" "github.com/riverqueue/river/rivertype" "riverqueue.com/riverui/internal/apibundle" - "riverqueue.com/riverui/internal/riverinternaltest" "riverqueue.com/riverui/internal/riverinternaltest/testfactory" "riverqueue.com/riverui/internal/uicommontest" ) @@ -37,18 +38,23 @@ func setupEndpoint[TEndpoint any](ctx context.Context, t *testing.T, initFunc fu t.Helper() var ( - logger = riverinternaltest.Logger(t) - client, driver = insertOnlyClient(t, logger) - tx = riverinternaltest.TestTx(ctx, t) - exec = driver.UnwrapExecutor(tx) + logger = riversharedtest.Logger(t) + driver = riverpgxv5.New(riversharedtest.DBPool(ctx, t)) + tx, _ = riverdbtest.TestTxPgxDriver(ctx, t, driver, nil) + exec = driver.UnwrapExecutor(tx) ) + client, err := river.NewClient(driver, &river.Config{ + Logger: logger, + }) + require.NoError(t, err) + endpoint := initFunc(apibundle.APIBundle[pgx.Tx]{ Archetype: riversharedtest.BaseServiceArchetype(t), Client: client, DB: exec, Driver: driver, - Extensions: map[string]bool{}, + Extensions: func(_ context.Context) (map[string]bool, error) { return map[string]bool{}, nil }, Logger: logger, }) @@ -68,7 +74,7 @@ func setupEndpoint[TEndpoint any](ctx context.Context, t *testing.T, initFunc fu func testMountOpts(t *testing.T) *apiendpoint.MountOpts { t.Helper() return &apiendpoint.MountOpts{ - Logger: riverinternaltest.Logger(t), + Logger: riversharedtest.Logger(t), Validator: apitype.NewValidator(), } } @@ -247,10 +253,6 @@ func TestAPIHandlerFeaturesGet(t *testing.T) { require.NoError(t, err) require.Equal(t, &featuresGetResponse{ Extensions: map[string]bool{}, - HasClientTable: false, - HasProducerTable: false, - HasSequenceTable: false, - HasWorkflows: false, JobListHideArgsByDefault: false, }, resp) }) @@ -276,10 +278,6 @@ func TestAPIHandlerFeaturesGet(t *testing.T) { require.NoError(t, err) require.Equal(t, &featuresGetResponse{ Extensions: map[string]bool{}, - HasClientTable: true, - HasProducerTable: true, - HasSequenceTable: true, - HasWorkflows: true, JobListHideArgsByDefault: true, }, resp) }) @@ -288,9 +286,11 @@ func TestAPIHandlerFeaturesGet(t *testing.T) { t.Parallel() endpoint, _ := setupEndpoint(ctx, t, newFeaturesGetEndpoint) - endpoint.Extensions = map[string]bool{ - "test_1": true, - "test_2": false, + endpoint.Extensions = func(_ context.Context) (map[string]bool, error) { + return map[string]bool{ + "test_1": true, + "test_2": false, + }, nil } resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &featuresGetRequest{}) diff --git a/handler_test.go b/handler_test.go index 2a2d8088..b80c1d2b 100644 --- a/handler_test.go +++ b/handler_test.go @@ -3,6 +3,7 @@ package riverui import ( "context" "fmt" + "log/slog" "net/http" "net/http/httptest" "testing" @@ -12,10 +13,12 @@ import ( "github.com/riverqueue/apiframe/apitype" "github.com/riverqueue/river" + "github.com/riverqueue/river/riverdbtest" "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/riversharedtest" "riverqueue.com/riverui/internal/handlertest" - "riverqueue.com/riverui/internal/riverinternaltest" "riverqueue.com/riverui/internal/riverinternaltest/testfactory" "riverqueue.com/riverui/internal/uicommontest" "riverqueue.com/riverui/uiendpoints" @@ -24,7 +27,19 @@ import ( func TestNewHandlerIntegration(t *testing.T) { t.Parallel() - createClient := insertOnlyClient + createClient := func(ctx context.Context, tb testing.TB, logger *slog.Logger) (*river.Client[pgx.Tx], riverdriver.Driver[pgx.Tx], pgx.Tx) { + tb.Helper() + + driver := riverpgxv5.New(riversharedtest.DBPool(ctx, tb)) + tx, _ := riverdbtest.TestTxPgxDriver(ctx, tb, driver, nil) + + client, err := river.NewClient(driver, &river.Config{ + Logger: logger, + }) + require.NoError(tb, err) + + return client, driver, tx + } createBundle := func(client *river.Client[pgx.Tx], tx pgx.Tx) uiendpoints.Bundle { return NewEndpoints(client, &EndpointsOpts[pgx.Tx]{ @@ -35,7 +50,7 @@ func TestNewHandlerIntegration(t *testing.T) { createHandler := func(t *testing.T, bundle uiendpoints.Bundle) http.Handler { t.Helper() - logger := riverinternaltest.Logger(t) + logger := riversharedtest.Logger(t) server, err := NewHandler(&HandlerOpts{ DevMode: true, Endpoints: bundle, @@ -105,7 +120,7 @@ func TestMountStaticFiles(t *testing.T) { t.Parallel() var ( - logger = riverinternaltest.Logger(t) + logger = riversharedtest.Logger(t) mux = http.NewServeMux() ) diff --git a/internal/apibundle/api_bundle.go b/internal/apibundle/api_bundle.go index e90127df..fda6db52 100644 --- a/internal/apibundle/api_bundle.go +++ b/internal/apibundle/api_bundle.go @@ -1,6 +1,7 @@ package apibundle import ( + "context" "log/slog" "github.com/riverqueue/river" @@ -14,7 +15,13 @@ type APIBundle[TTx any] struct { Client *river.Client[TTx] DB riverdriver.Executor Driver riverdriver.Driver[TTx] - Extensions map[string]bool + Extensions func(ctx context.Context) (map[string]bool, error) JobListHideArgsByDefault bool Logger *slog.Logger } + +// APIExtensionsProviderSetter is an interface to allow setting the extensions +// provider for the feature flag API. +type APIExtensionsProviderSetter interface { + SetExtensionsProvider(provider func(context.Context) (map[string]bool, error)) +} diff --git a/internal/handlertest/handlertest.go b/internal/handlertest/handlertest.go index 5fcad43d..c0726148 100644 --- a/internal/handlertest/handlertest.go +++ b/internal/handlertest/handlertest.go @@ -13,22 +13,21 @@ import ( "github.com/stretchr/testify/require" "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/rivershared/riversharedtest" - "riverqueue.com/riverui/internal/riverinternaltest" "riverqueue.com/riverui/uiendpoints" ) type APICallFunc = func(t *testing.T, testCaseName, method, path string, payload []byte) -func RunIntegrationTest[TClient any](t *testing.T, createClient func(t *testing.T, logger *slog.Logger) (TClient, riverdriver.Driver[pgx.Tx]), createBundle func(client TClient, tx pgx.Tx) uiendpoints.Bundle, createHandler func(t *testing.T, bundle uiendpoints.Bundle) http.Handler, testRunner func(exec riverdriver.Executor, makeAPICall APICallFunc)) { +func RunIntegrationTest[TClient any](t *testing.T, createClient func(ctx context.Context, tb testing.TB, logger *slog.Logger) (TClient, riverdriver.Driver[pgx.Tx], pgx.Tx), createBundle func(client TClient, tx pgx.Tx) uiendpoints.Bundle, createHandler func(t *testing.T, bundle uiendpoints.Bundle) http.Handler, testRunner func(exec riverdriver.Executor, makeAPICall APICallFunc)) { t.Helper() var ( - ctx = context.Background() - logger = riverinternaltest.Logger(t) - client, driver = createClient(t, logger) - tx = riverinternaltest.TestTx(ctx, t) - exec = driver.UnwrapExecutor(tx) + ctx = t.Context() + logger = riversharedtest.Logger(t) + client, driver, tx = createClient(ctx, t, logger) + exec = driver.UnwrapExecutor(tx) ) makeAPICall := func(t *testing.T, testCaseName, method, path string, payload []byte) { diff --git a/internal/querycacher/query_cacher_test.go b/internal/querycacher/query_cacher_test.go index 905666b7..df813462 100644 --- a/internal/querycacher/query_cacher_test.go +++ b/internal/querycacher/query_cacher_test.go @@ -7,13 +7,13 @@ import ( "github.com/stretchr/testify/require" + "github.com/riverqueue/river/riverdbtest" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/riverqueue/river/rivershared/riversharedtest" "github.com/riverqueue/river/rivershared/startstoptest" "github.com/riverqueue/river/rivertype" - "riverqueue.com/riverui/internal/riverinternaltest" "riverqueue.com/riverui/internal/riverinternaltest/testfactory" ) @@ -32,7 +32,7 @@ func TestQueryCacher(t *testing.T) { var ( archetype = riversharedtest.BaseServiceArchetype(t) driver = riverpgxv5.New(nil) - tx = riverinternaltest.TestTx(ctx, t) + tx = riverdbtest.TestTxPgx(ctx, t) runQuery = func(ctx context.Context) (map[rivertype.JobState]int, error) { return driver.UnwrapExecutor(tx).JobCountByAllStates(ctx, &riverdriver.JobCountByAllStatesParams{Schema: ""}) } diff --git a/internal/riverinternaltest/riverinternaltest.go b/internal/riverinternaltest/riverinternaltest.go deleted file mode 100644 index 55eac4ee..00000000 --- a/internal/riverinternaltest/riverinternaltest.go +++ /dev/null @@ -1,116 +0,0 @@ -package riverinternaltest - -import ( - "context" - "errors" - "log/slog" - "os" - "sync" - "testing" - - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgxpool" - "github.com/stretchr/testify/require" - - "github.com/riverqueue/river/rivershared/slogtest" -) - -// Logger returns a logger suitable for use in tests. -// -// Defaults to informational verbosity. If env is set with `RIVER_DEBUG=true`, -// debug level verbosity is activated. -func Logger(tb testing.TB) *slog.Logger { - tb.Helper() - - if os.Getenv("RIVER_DEBUG") == "1" || os.Getenv("RIVER_DEBUG") == "true" { - return slogtest.NewLogger(tb, &slog.HandlerOptions{Level: slog.LevelDebug}) - } - - return slogtest.NewLogger(tb, nil) -} - -// A pool and mutex to protect it, lazily initialized by TestTx. Once open, this -// pool is never explicitly closed, instead closing implicitly as the package -// tests finish. -var ( - dbPool *pgxpool.Pool //nolint:gochecknoglobals - dbPoolMu sync.RWMutex //nolint:gochecknoglobals -) - -// TestTx starts a test transaction that's rolled back automatically as the test -// case is cleaning itself up. This can be used as a lighter weight alternative -// to `testdb.Manager` in components where it's not necessary to have many -// connections open simultaneously. -func TestTx(ctx context.Context, tb testing.TB) pgx.Tx { - tb.Helper() - - tryPool := func() *pgxpool.Pool { - dbPoolMu.RLock() - defer dbPoolMu.RUnlock() - return dbPool - } - - getPool := func() *pgxpool.Pool { - if dbPool := tryPool(); dbPool != nil { - return dbPool - } - - dbPoolMu.Lock() - defer dbPoolMu.Unlock() - - // Multiple goroutines may have passed the initial `nil` check on start - // up, so check once more to make sure pool hasn't been set yet. - if dbPool != nil { - return dbPool - } - - testDatabaseURL := os.Getenv("TEST_DATABASE_URL") - if testDatabaseURL == "" { - testDatabaseURL = "postgres://localhost/river_test" - } - - var err error - dbPool, err = pgxpool.New(ctx, testDatabaseURL) - require.NoError(tb, err) - - return dbPool - } - - tx, err := getPool().Begin(ctx) - require.NoError(tb, err) - - tb.Cleanup(func() { - err := tx.Rollback(ctx) - if err == nil { - return - } - - // Try to look for an error on rollback because it does occasionally - // reveal a real problem in the way a test is written. However, allow - // tests to roll back their transaction early if they like, so ignore - // `ErrTxClosed`. - if errors.Is(err, pgx.ErrTxClosed) { - return - } - - // In case of a cancelled context during a database operation, which - // happens in many tests, pgx seems to not only roll back the - // transaction, but closes the connection, and returns this error on - // rollback. Allow this error since it's hard to prevent it in our flows - // that use contexts heavily. - if err.Error() == "conn closed" { - return - } - - // Similar to the above, but a newly appeared error that wraps the - // above. As far as I can tell, no error variables are available to use - // with `errors.Is`. - if err.Error() == "failed to deallocate cached statement(s): conn closed" { - return - } - - require.NoError(tb, err) - }) - - return tx -} diff --git a/internal/riveruicmd/riveruicmd.go b/internal/riveruicmd/riveruicmd.go index 8facc522..5528d7e7 100644 --- a/internal/riveruicmd/riveruicmd.go +++ b/internal/riveruicmd/riveruicmd.go @@ -129,7 +129,7 @@ type initServerResult struct { uiHandler *riverui.Handler // River UI handler } -func initServer[TClient any](ctx context.Context, logger *slog.Logger, pathPrefix string, createClient func(*pgxpool.Pool) (TClient, error), createBundler func(TClient) uiendpoints.Bundle) (*initServerResult, error) { +func initServer[TClient any](ctx context.Context, logger *slog.Logger, pathPrefix string, createClient func(*pgxpool.Pool) (TClient, error), createBundle func(TClient) uiendpoints.Bundle) (*initServerResult, error) { if !strings.HasPrefix(pathPrefix, "/") || pathPrefix == "" { return nil, fmt.Errorf("invalid path prefix: %s", pathPrefix) } @@ -170,7 +170,7 @@ func initServer[TClient any](ctx context.Context, logger *slog.Logger, pathPrefi uiHandler, err := riverui.NewHandler(&riverui.HandlerOpts{ DevMode: devMode, - Endpoints: createBundler(client), + Endpoints: createBundle(client), JobListHideArgsByDefault: jobListHideArgsByDefault, LiveFS: liveFS, Logger: logger, diff --git a/main_test.go b/main_test.go new file mode 100644 index 00000000..d4423e60 --- /dev/null +++ b/main_test.go @@ -0,0 +1,11 @@ +package riverui + +import ( + "testing" + + "github.com/riverqueue/river/rivershared/riversharedtest" +) + +func TestMain(m *testing.M) { + riversharedtest.WrapTestMain(m) +} diff --git a/riverproui/endpoints.go b/riverproui/endpoints.go index a13d0fdb..8472396e 100644 --- a/riverproui/endpoints.go +++ b/riverproui/endpoints.go @@ -1,11 +1,13 @@ package riverproui import ( + "context" "errors" "log/slog" "net/http" "github.com/riverqueue/apiframe/apiendpoint" + "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" "riverqueue.com/riverpro" @@ -47,6 +49,12 @@ type endpoints[TTx any] struct { func (e *endpoints[TTx]) Configure(bundleOpts *uiendpoints.BundleOpts) { e.bundleOpts = bundleOpts + + // Inject extensions provider into OSS via private setter + if s, ok := e.ossEndpoints.(apibundle.APIExtensionsProviderSetter); ok { + s.SetExtensionsProvider(e.Extensions) + } + e.ossEndpoints.Configure(bundleOpts) } @@ -60,7 +68,85 @@ func (e *endpoints[TTx]) Validate() error { return nil } -func (e *endpoints[TTx]) MountEndpoints(archetype *baseservice.Archetype, logger *slog.Logger, mux *http.ServeMux, mountOpts *apiendpoint.MountOpts, extensions map[string]bool) []apiendpoint.EndpointInterface { +func (e *endpoints[TTx]) Extensions(ctx context.Context) (map[string]bool, error) { + ossDriver := e.client.Driver() + driver, ok := ossDriver.(prodriver.ProDriver[TTx]) + if !ok { + panic("riverpro.Client is not configured with a ProDriver") + } + + var executor prodriver.ProExecutor + if e.proOpts.Tx == nil { + executor = driver.GetProExecutor() + } else { + executor = driver.UnwrapProExecutor(*e.proOpts.Tx) + } + + schema := e.client.Schema() + + execTx, err := executor.Begin(ctx) + if err != nil { + return nil, err + } + defer execTx.Rollback(ctx) + + hasClientTable, err := execTx.TableExists(ctx, &riverdriver.TableExistsParams{ + Schema: schema, + Table: "river_client", + }) + if err != nil { + return nil, err + } + + hasPeriodicJobTable, err := execTx.TableExists(ctx, &riverdriver.TableExistsParams{ + Schema: schema, + Table: "river_periodic_job", + }) + if err != nil { + return nil, err + } + + hasProducerTable, err := execTx.TableExists(ctx, &riverdriver.TableExistsParams{ + Schema: schema, + Table: "river_producer", + }) + if err != nil { + return nil, err + } + + hasSequenceTable, err := execTx.TableExists(ctx, &riverdriver.TableExistsParams{ + Schema: schema, + Table: "river_job_sequence", + }) + if err != nil { + return nil, err + } + + indexResults, err := execTx.IndexesExist(ctx, &riverdriver.IndexesExistParams{ + IndexNames: []string{ + "river_job_workflow_list_active", + "river_job_workflow_scheduling", + }, + Schema: schema, + }) + if err != nil { + return nil, err + } + + hasWorkflows := indexResults["river_job_workflow_list_active"] || indexResults["river_job_workflow_scheduling"] + + return map[string]bool{ + "durable_periodic_jobs": hasPeriodicJobTable, + "producer_queries": true, + "workflow_queries": true, + "has_client_table": hasClientTable, + "has_producer_table": hasProducerTable, + "has_sequence_table": hasSequenceTable, + "has_workflows": hasWorkflows, + }, nil +} + +func (e *endpoints[TTx]) MountEndpoints(archetype *baseservice.Archetype, logger *slog.Logger, mux *http.ServeMux, mountOpts *apiendpoint.MountOpts) []apiendpoint.EndpointInterface { ossDriver := e.client.Driver() driver, ok := ossDriver.(prodriver.ProDriver[TTx]) if !ok { @@ -79,6 +165,7 @@ func (e *endpoints[TTx]) MountEndpoints(archetype *baseservice.Archetype, logger Client: e.client.Client, DB: executor, Driver: driver, + Extensions: e.Extensions, JobListHideArgsByDefault: e.bundleOpts.JobListHideArgsByDefault, Logger: logger, }, @@ -86,8 +173,9 @@ func (e *endpoints[TTx]) MountEndpoints(archetype *baseservice.Archetype, logger DB: executor, } - endpoints := e.ossEndpoints.MountEndpoints(archetype, logger, mux, mountOpts, extensions) + endpoints := e.ossEndpoints.MountEndpoints(archetype, logger, mux, mountOpts) endpoints = append(endpoints, + apiendpoint.Mount(mux, prohandler.NewPeriodicJobListEndpoint(bundle), mountOpts), apiendpoint.Mount(mux, prohandler.NewProducerListEndpoint(bundle), mountOpts), apiendpoint.Mount(mux, prohandler.NewWorkflowCancelEndpoint(bundle), mountOpts), apiendpoint.Mount(mux, prohandler.NewWorkflowGetEndpoint(bundle), mountOpts), @@ -97,10 +185,3 @@ func (e *endpoints[TTx]) MountEndpoints(archetype *baseservice.Archetype, logger return endpoints } - -func (e *endpoints[TTx]) Extensions() map[string]bool { - return map[string]bool{ - "producer_queries": true, - "workflow_queries": true, - } -} diff --git a/riverproui/endpoints_test.go b/riverproui/endpoints_test.go new file mode 100644 index 00000000..b46cf7a2 --- /dev/null +++ b/riverproui/endpoints_test.go @@ -0,0 +1,216 @@ +package riverproui + +import ( + "context" + "log/slog" + "testing" + + "github.com/jackc/pgx/v5" + "github.com/stretchr/testify/require" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/riverdbtest" + "github.com/riverqueue/river/rivershared/riversharedtest" + + "riverqueue.com/riverpro" + "riverqueue.com/riverpro/driver/riverpropgxv5" + + "riverqueue.com/riverui/internal/uicommontest" +) + +func TestProEndpointsExtensions(t *testing.T) { + t.Parallel() + + // Most of these tests involve schema changes and can't be parallelized without + // causing deadlocks. + ctx := context.Background() + + type testBundle struct { + client *riverpro.Client[pgx.Tx] + endpoint *endpoints[pgx.Tx] + logger *slog.Logger + tx pgx.Tx + } + + setup := func(ctx context.Context, t *testing.T) *testBundle { + t.Helper() + + logger := riversharedtest.Logger(t) + + workers := river.NewWorkers() + river.AddWorker(workers, &uicommontest.NoOpWorker{}) + + // We're making DB schema changes, so we need to disable schema sharing: + driver := riverpropgxv5.New(riversharedtest.DBPool(ctx, t)) + tx, _ := riverdbtest.TestTxPgxDriver(ctx, t, driver, &riverdbtest.TestTxOpts{DisableSchemaSharing: true}) + client, err := riverpro.NewClient(driver, &riverpro.Config{ + Config: river.Config{ + Logger: logger, + Workers: workers, + }, + }) + require.NoError(t, err) + + endpoint := &endpoints[pgx.Tx]{ + client: client, + proOpts: &EndpointsOpts[pgx.Tx]{Tx: &tx}, + } + + return &testBundle{ + client: client, + endpoint: endpoint, + logger: logger, + tx: tx, + } + } + + t.Run("DurablePeriodicJobs", func(t *testing.T) { //nolint:dupl + t.Parallel() + + t.Run("NoPeriodicJobTable", func(t *testing.T) { + t.Parallel() + + bundle := setup(ctx, t) + + _, err := bundle.tx.Exec(ctx, `DROP TABLE IF EXISTS river_periodic_job;`) + require.NoError(t, err) + + ext, err := bundle.endpoint.Extensions(ctx) + require.NoError(t, err) + require.False(t, ext["durable_periodic_jobs"]) + }) + + t.Run("WithPeriodicJobTable", func(t *testing.T) { + t.Parallel() + + bundle := setup(ctx, t) + + _, err := bundle.tx.Exec(ctx, `CREATE TABLE IF NOT EXISTS river_periodic_job (id SERIAL PRIMARY KEY);`) + require.NoError(t, err) + + ext, err := bundle.endpoint.Extensions(ctx) + require.NoError(t, err) + require.True(t, ext["durable_periodic_jobs"]) + }) + }) + + t.Run("ClientTableDetection", func(t *testing.T) { //nolint:dupl + t.Parallel() + + t.Run("NoClientTable", func(t *testing.T) { + t.Parallel() + + bundle := setup(ctx, t) + + _, err := bundle.tx.Exec(ctx, `DROP TABLE IF EXISTS river_client CASCADE;`) + require.NoError(t, err) + + ext, err := bundle.endpoint.Extensions(ctx) + require.NoError(t, err) + require.False(t, ext["has_client_table"]) + }) + + t.Run("WithClientTable", func(t *testing.T) { + t.Parallel() + + bundle := setup(ctx, t) + + _, err := bundle.tx.Exec(ctx, `CREATE TABLE IF NOT EXISTS river_client (id SERIAL PRIMARY KEY);`) + require.NoError(t, err) + + ext, err := bundle.endpoint.Extensions(ctx) + require.NoError(t, err) + require.True(t, ext["has_client_table"]) + }) + }) + + t.Run("ProducerTableDetection", func(t *testing.T) { //nolint:dupl + t.Parallel() + + t.Run("NoProducerTable", func(t *testing.T) { + t.Parallel() + + bundle := setup(ctx, t) + + _, err := bundle.tx.Exec(ctx, `DROP TABLE IF EXISTS river_producer;`) + require.NoError(t, err) + + ext, err := bundle.endpoint.Extensions(ctx) + require.NoError(t, err) + require.False(t, ext["has_producer_table"]) + }) + + t.Run("WithProducerTable", func(t *testing.T) { + t.Parallel() + + bundle := setup(ctx, t) + + _, err := bundle.tx.Exec(ctx, `CREATE TABLE IF NOT EXISTS river_producer (id SERIAL PRIMARY KEY);`) + require.NoError(t, err) + + ext, err := bundle.endpoint.Extensions(ctx) + require.NoError(t, err) + require.True(t, ext["has_producer_table"]) + }) + }) + + t.Run("SequenceTableDetection", func(t *testing.T) { //nolint:dupl + t.Parallel() + + t.Run("NoSequenceTable", func(t *testing.T) { + t.Parallel() + + bundle := setup(ctx, t) + + _, err := bundle.tx.Exec(ctx, `DROP TABLE IF EXISTS river_job_sequence;`) + require.NoError(t, err) + + ext, err := bundle.endpoint.Extensions(ctx) + require.NoError(t, err) + require.False(t, ext["has_sequence_table"]) + }) + + t.Run("WithSequenceTable", func(t *testing.T) { + t.Parallel() + + bundle := setup(ctx, t) + + _, err := bundle.tx.Exec(ctx, `CREATE TABLE IF NOT EXISTS river_job_sequence (id SERIAL PRIMARY KEY);`) + require.NoError(t, err) + + ext, err := bundle.endpoint.Extensions(ctx) + require.NoError(t, err) + require.True(t, ext["has_sequence_table"]) + }) + }) + + t.Run("WorkflowsDetection", func(t *testing.T) { + t.Parallel() + + t.Run("NoWorkflowIndexes", func(t *testing.T) { + t.Parallel() + + bundle := setup(ctx, t) + + _, err := bundle.tx.Exec(ctx, `DROP INDEX IF EXISTS river_job_workflow_list_active;`) + require.NoError(t, err) + _, err = bundle.tx.Exec(ctx, `DROP INDEX IF EXISTS river_job_workflow_scheduling;`) + require.NoError(t, err) + + ext, err := bundle.endpoint.Extensions(ctx) + require.NoError(t, err) + require.False(t, ext["has_workflows"]) + }) + }) + + t.Run("StaticAttributesAlwaysTrue", func(t *testing.T) { + t.Parallel() + + bundle := setup(ctx, t) + + ext, err := bundle.endpoint.Extensions(ctx) + require.NoError(t, err) + require.True(t, ext["producer_queries"]) + require.True(t, ext["workflow_queries"]) + }) +} diff --git a/riverproui/go.mod b/riverproui/go.mod index 36713021..29eb67b7 100644 --- a/riverproui/go.mod +++ b/riverproui/go.mod @@ -8,10 +8,10 @@ require ( github.com/google/uuid v1.6.0 github.com/jackc/pgx/v5 v5.7.6 github.com/riverqueue/apiframe v0.0.0-20250708014637-e55c49c01ff7 - github.com/riverqueue/river v0.25.0 - github.com/riverqueue/river/riverdriver v0.25.0 - github.com/riverqueue/river/rivershared v0.25.0 - github.com/riverqueue/river/rivertype v0.25.0 + github.com/riverqueue/river v0.25.1-0.20251001013213-822d5bb676cc + github.com/riverqueue/river/riverdriver v0.25.1-0.20251001013213-822d5bb676cc + github.com/riverqueue/river/rivershared v0.25.1-0.20251001013213-822d5bb676cc + github.com/riverqueue/river/rivertype v0.25.1-0.20251001013213-822d5bb676cc github.com/stretchr/testify v1.11.1 riverqueue.com/riverpro v0.18.0 riverqueue.com/riverpro/driver v0.18.0 @@ -31,7 +31,7 @@ require ( github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/riverqueue/river/riverdriver/riverpgxv5 v0.25.0 // indirect + github.com/riverqueue/river/riverdriver/riverpgxv5 v0.25.1-0.20251001013213-822d5bb676cc // indirect github.com/rs/cors v1.11.1 // indirect github.com/samber/slog-http v1.8.2 // indirect github.com/tidwall/gjson v1.18.0 // indirect diff --git a/riverproui/go.sum b/riverproui/go.sum index ab91a07c..037bf55f 100644 --- a/riverproui/go.sum +++ b/riverproui/go.sum @@ -35,16 +35,16 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/riverqueue/apiframe v0.0.0-20250708014637-e55c49c01ff7 h1:A16RTdTAQ2cIY++FPjiQ9yL8/2FXR4wYmaN7SpP3yP0= github.com/riverqueue/apiframe v0.0.0-20250708014637-e55c49c01ff7/go.mod h1:jV49jb/qzxSqwDajmG4N2Cm50KATxblxlSiXMF9Ck1E= -github.com/riverqueue/river v0.25.0 h1:dRnA9ltq9hTYRMmZgBnhqRh3AzBIFVu+qVLpBqy6b+g= -github.com/riverqueue/river v0.25.0/go.mod h1:KetN5MQQu9IjtganQrIt0OFubweeh+qkAqJaCdalwtI= -github.com/riverqueue/river/riverdriver v0.25.0 h1:RkvBWBlybYGaU1DoQ/mSwnWp1hm0FfS8yyksr/dM5tI= -github.com/riverqueue/river/riverdriver v0.25.0/go.mod h1:p2Jvr1N6NfPA+ngIKK8urqxG2vmusX4jO7g/UH/soQY= -github.com/riverqueue/river/riverdriver/riverpgxv5 v0.25.0 h1:Ed6dtSSwsj7VwbquG6Bh+2+271sBOL6WyRbisY/XHiY= -github.com/riverqueue/river/riverdriver/riverpgxv5 v0.25.0/go.mod h1:h77bWaGJyA5GMKEKmANQN9mhsV3XWYt4sRUx6FtQa84= -github.com/riverqueue/river/rivershared v0.25.0 h1:grjuTHJEVvi4srzcspQ2UXWjISxdqbubQl+9DDg3agQ= -github.com/riverqueue/river/rivershared v0.25.0/go.mod h1:ZdVeOnT8X8PiAZRUfWHc+Ne6fNXqe1oYb2eioZb6URM= -github.com/riverqueue/river/rivertype v0.25.0 h1:DPwd0DGqajLIv9zsB+BOwlum0D1/4Iiqz34+nwIZaZ0= -github.com/riverqueue/river/rivertype v0.25.0/go.mod h1:9bbWVYkr1B/YzW43lUs/Vk/tEYqLrabrZWrtUWQ+Goo= +github.com/riverqueue/river v0.25.1-0.20251001013213-822d5bb676cc h1:G9TV3+iiEMs18MAeexA8gb4KltmHNvIE1uB4ra6k6yY= +github.com/riverqueue/river v0.25.1-0.20251001013213-822d5bb676cc/go.mod h1:KetN5MQQu9IjtganQrIt0OFubweeh+qkAqJaCdalwtI= +github.com/riverqueue/river/riverdriver v0.25.1-0.20251001013213-822d5bb676cc h1:AAMdacQhDBNebXmjuiGaPHeZAoZT77ELelhIMoJLD7E= +github.com/riverqueue/river/riverdriver v0.25.1-0.20251001013213-822d5bb676cc/go.mod h1:p2Jvr1N6NfPA+ngIKK8urqxG2vmusX4jO7g/UH/soQY= +github.com/riverqueue/river/riverdriver/riverpgxv5 v0.25.1-0.20251001013213-822d5bb676cc h1:uOZcH6W9ItGhG4IFziy54Zkbo9O7t4BDELyCnCjX1y4= +github.com/riverqueue/river/riverdriver/riverpgxv5 v0.25.1-0.20251001013213-822d5bb676cc/go.mod h1:h77bWaGJyA5GMKEKmANQN9mhsV3XWYt4sRUx6FtQa84= +github.com/riverqueue/river/rivershared v0.25.1-0.20251001013213-822d5bb676cc h1:7KjQiHOdPioVtGLwO4imQvt7swiGeww1oF4JZfcEaLk= +github.com/riverqueue/river/rivershared v0.25.1-0.20251001013213-822d5bb676cc/go.mod h1:ZdVeOnT8X8PiAZRUfWHc+Ne6fNXqe1oYb2eioZb6URM= +github.com/riverqueue/river/rivertype v0.25.1-0.20251001013213-822d5bb676cc h1:75lj6WkDnCNaF6HJl3PKDQj+TlDypmvfGl9b28uLghA= +github.com/riverqueue/river/rivertype v0.25.1-0.20251001013213-822d5bb676cc/go.mod h1:9bbWVYkr1B/YzW43lUs/Vk/tEYqLrabrZWrtUWQ+Goo= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= diff --git a/riverproui/internal/prohandler/pro_handler_api_endpoints.go b/riverproui/internal/prohandler/pro_handler_api_endpoints.go index 52bf6f52..36d8256f 100644 --- a/riverproui/internal/prohandler/pro_handler_api_endpoints.go +++ b/riverproui/internal/prohandler/pro_handler_api_endpoints.go @@ -39,6 +39,60 @@ func listResponseFrom[T any](data []*T) *listResponse[T] { return &listResponse[T]{Data: data} } +// +// periodicJobListEndpoint +// + +type periodicJobListEndpoint[TTx any] struct { + ProAPIBundle[TTx] + apiendpoint.Endpoint[periodicJobListRequest, listResponse[uitype.RiverPeriodicJob]] +} + +func NewPeriodicJobListEndpoint[TTx any](apiBundle ProAPIBundle[TTx]) *periodicJobListEndpoint[TTx] { + return &periodicJobListEndpoint[TTx]{ProAPIBundle: apiBundle} +} + +func (*periodicJobListEndpoint[TTx]) Meta() *apiendpoint.EndpointMeta { + return &apiendpoint.EndpointMeta{ + Pattern: "GET /api/pro/periodic-jobs", + StatusCode: http.StatusOK, + } +} + +type periodicJobListRequest struct { + Limit *int `json:"-" validate:"omitempty,min=0,max=1000"` // from ExtractRaw +} + +func (req *periodicJobListRequest) ExtractRaw(r *http.Request) error { + if limitStr := r.URL.Query().Get("limit"); limitStr != "" { + limit, err := strconv.Atoi(limitStr) + if err != nil { + return apierror.NewBadRequestf("Couldn't convert `limit` to integer: %s.", err) + } + + req.Limit = &limit + } + + return nil +} + +func (a *periodicJobListEndpoint[TTx]) Execute(ctx context.Context, req *periodicJobListRequest) (*listResponse[uitype.RiverPeriodicJob], error) { + result, err := a.DB.PeriodicJobGetAll(ctx, &riverprodriver.PeriodicJobGetAllParams{ + Max: ptrutil.ValOrDefault(req.Limit, 100), + Schema: a.Client.Schema(), + StaleUpdatedAtHorizon: time.Now().Add(-24 * time.Hour), + }) + if err != nil { + return nil, fmt.Errorf("error listing periodic jobs: %w", err) + } + + return listResponseFrom(sliceutil.Map(result, internalPeriodicJobToSerializablePeriodicJob)), nil +} + +func internalPeriodicJobToSerializablePeriodicJob(internal *riverprodriver.PeriodicJob) *uitype.RiverPeriodicJob { + return (*uitype.RiverPeriodicJob)(internal) +} + // // producerListEndpoint // @@ -74,7 +128,7 @@ func (req *producerListRequest) ExtractRaw(r *http.Request) error { func (a *producerListEndpoint[TTx]) Execute(ctx context.Context, req *producerListRequest) (*listResponse[uitype.RiverProducer], error) { result, err := a.DB.ProducerListByQueue(ctx, &riverprodriver.ProducerListByQueueParams{ QueueName: req.QueueName, - Schema: "", // TODO: need to inject schema from Client or params + Schema: a.Client.Schema(), }) if err != nil { return nil, fmt.Errorf("error listing producers: %w", err) diff --git a/riverproui/internal/prohandler/pro_handler_api_endpoints_test.go b/riverproui/internal/prohandler/pro_handler_api_endpoints_test.go index 00048c61..7a896cd5 100644 --- a/riverproui/internal/prohandler/pro_handler_api_endpoints_test.go +++ b/riverproui/internal/prohandler/pro_handler_api_endpoints_test.go @@ -14,6 +14,7 @@ import ( "github.com/riverqueue/apiframe/apitest" "github.com/riverqueue/apiframe/apitype" "github.com/riverqueue/river" + "github.com/riverqueue/river/riverdbtest" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/riversharedtest" "github.com/riverqueue/river/rivershared/startstop" @@ -25,14 +26,13 @@ import ( "riverqueue.com/riverpro/driver/riverpropgxv5" "riverqueue.com/riverui/internal/apibundle" - "riverqueue.com/riverui/internal/riverinternaltest" "riverqueue.com/riverui/internal/riverinternaltest/testfactory" - "riverqueue.com/riverui/internal/uicommontest" + "riverqueue.com/riverui/riverproui/internal/protestfactory" ) type setupEndpointTestBundle struct { client *riverpro.Client[pgx.Tx] - exec riverdriver.ExecutorTx + exec driver.ProExecutorTx logger *slog.Logger tx pgx.Tx } @@ -41,19 +41,27 @@ func setupEndpoint[TEndpoint any](ctx context.Context, t *testing.T, initFunc fu t.Helper() var ( - logger = riverinternaltest.Logger(t) - client, driver = insertOnlyClient(t, logger) - tx = riverinternaltest.TestTx(ctx, t) - exec = driver.UnwrapProExecutor(tx) + logger = riversharedtest.Logger(t) + driver = riverpropgxv5.New(riversharedtest.DBPool(ctx, t)) + tx, _ = riverdbtest.TestTxPgxDriver(ctx, t, driver, nil) + exec = driver.UnwrapProExecutor(tx) ) + client, err := riverpro.NewClient(driver, &riverpro.Config{ + Config: river.Config{ + Logger: logger, + }, + }) + require.NoError(t, err) + endpoint := initFunc(ProAPIBundle[pgx.Tx]{ APIBundle: apibundle.APIBundle[pgx.Tx]{ - Archetype: riversharedtest.BaseServiceArchetype(t), - Client: client.Client, - DB: exec, - Driver: driver, - Extensions: map[string]bool{}, + Archetype: riversharedtest.BaseServiceArchetype(t), + Client: client.Client, + DB: exec, + Driver: driver, + // Extensions aren't needed for any of these test endpoints + Extensions: func(_ context.Context) (map[string]bool, error) { return map[string]bool{}, nil }, Logger: logger, }, Client: client, @@ -73,31 +81,47 @@ func setupEndpoint[TEndpoint any](ctx context.Context, t *testing.T, initFunc fu } } -func insertOnlyClient(t *testing.T, logger *slog.Logger) (*riverpro.Client[pgx.Tx], driver.ProDriver[pgx.Tx]) { +func testMountOpts(t *testing.T) *apiendpoint.MountOpts { t.Helper() + return &apiendpoint.MountOpts{ + Logger: riversharedtest.Logger(t), + Validator: apitype.NewValidator(), + } +} - workers := river.NewWorkers() - river.AddWorker(workers, &uicommontest.NoOpWorker{}) +func TestProAPIHandlerPeriodicJobList(t *testing.T) { + t.Parallel() - driver := riverpropgxv5.New(nil) + ctx := context.Background() - client, err := riverpro.NewClient(driver, &riverpro.Config{ - Config: river.Config{ - Logger: logger, - Workers: workers, - }, + t.Run("Success", func(t *testing.T) { + t.Parallel() + + endpoint, bundle := setupEndpoint(ctx, t, NewPeriodicJobListEndpoint) + + job1 := protestfactory.PeriodicJob(ctx, t, bundle.exec, &protestfactory.PeriodicJobOpts{ID: ptrutil.Ptr("alpha"), NextRunAt: ptrutil.Ptr(time.Now().Add(time.Minute))}) + job2 := protestfactory.PeriodicJob(ctx, t, bundle.exec, &protestfactory.PeriodicJobOpts{ID: ptrutil.Ptr("beta"), NextRunAt: ptrutil.Ptr(time.Now().Add(2 * time.Minute))}) + + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &periodicJobListRequest{}) + require.NoError(t, err) + require.Len(t, resp.Data, 2) + require.Equal(t, job1.ID, resp.Data[0].ID) + require.Equal(t, job2.ID, resp.Data[1].ID) }) - require.NoError(t, err) - return client, driver -} + t.Run("Limit", func(t *testing.T) { + t.Parallel() -func testMountOpts(t *testing.T) *apiendpoint.MountOpts { - t.Helper() - return &apiendpoint.MountOpts{ - Logger: riverinternaltest.Logger(t), - Validator: apitype.NewValidator(), - } + endpoint, bundle := setupEndpoint(ctx, t, NewPeriodicJobListEndpoint) + + job1 := protestfactory.PeriodicJob(ctx, t, bundle.exec, nil) + _ = protestfactory.PeriodicJob(ctx, t, bundle.exec, nil) + + resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &periodicJobListRequest{Limit: ptrutil.Ptr(1)}) + require.NoError(t, err) + require.Len(t, resp.Data, 1) + require.Equal(t, job1.ID, resp.Data[0].ID) + }) } func TestProAPIHandlerWorkflowCancel(t *testing.T) { diff --git a/riverproui/internal/protestfactory/pro_test_factory.go b/riverproui/internal/protestfactory/pro_test_factory.go new file mode 100644 index 00000000..ee145a79 --- /dev/null +++ b/riverproui/internal/protestfactory/pro_test_factory.go @@ -0,0 +1,44 @@ +package protestfactory + +import ( + "context" + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/riverqueue/river/rivershared/util/ptrutil" + + "riverqueue.com/riverpro/driver" +) + +type PeriodicJobOpts struct { + ID *string + NextRunAt *time.Time + UpdatedAt *time.Time +} + +func PeriodicJob(ctx context.Context, tb testing.TB, exec driver.ProExecutor, opts *PeriodicJobOpts) *driver.PeriodicJob { + tb.Helper() + + if opts == nil { + opts = &PeriodicJobOpts{} + } + + periodicJob, err := exec.PeriodicJobInsert(ctx, &driver.PeriodicJobInsertParams{ + ID: ptrutil.ValOrDefaultFunc(opts.ID, func() string { return fmt.Sprintf("periodic_job_%05d", nextSeq()) }), + NextRunAt: ptrutil.ValOrDefaultFunc(opts.NextRunAt, time.Now), + UpdatedAt: opts.UpdatedAt, + Schema: "", + }) + require.NoError(tb, err) + return periodicJob +} + +var seq int64 = 1 //nolint:gochecknoglobals + +func nextSeq() int { + return int(atomic.AddInt64(&seq, 1)) +} diff --git a/riverproui/internal/uitype/ui_api_types.go b/riverproui/internal/uitype/ui_api_types.go index 0f844951..7490b871 100644 --- a/riverproui/internal/uitype/ui_api_types.go +++ b/riverproui/internal/uitype/ui_api_types.go @@ -13,6 +13,13 @@ type PartitionConfig struct { ByKind bool `json:"by_kind"` } +type RiverPeriodicJob struct { + ID string `json:"id"` + CreatedAt time.Time `json:"created_at"` + NextRunAt time.Time `json:"next_run_at"` + UpdatedAt time.Time `json:"updated_at"` +} + type RiverProducer struct { ID int64 `json:"id"` ClientID string `json:"client_id"` diff --git a/riverproui/pro_handler_test.go b/riverproui/pro_handler_test.go index d1c74859..8a87f70a 100644 --- a/riverproui/pro_handler_test.go +++ b/riverproui/pro_handler_test.go @@ -2,9 +2,11 @@ package riverproui import ( "context" + "encoding/json" "fmt" "log/slog" "net/http" + "net/http/httptest" "testing" "github.com/google/uuid" @@ -12,49 +14,53 @@ import ( "github.com/stretchr/testify/require" "github.com/riverqueue/river" + "github.com/riverqueue/river/riverdbtest" "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/rivershared/riversharedtest" "riverqueue.com/riverpro" + "riverqueue.com/riverpro/driver" "riverqueue.com/riverpro/driver/riverpropgxv5" "riverqueue.com/riverui" "riverqueue.com/riverui/internal/handlertest" - "riverqueue.com/riverui/internal/riverinternaltest" "riverqueue.com/riverui/internal/riverinternaltest/testfactory" "riverqueue.com/riverui/internal/uicommontest" + "riverqueue.com/riverui/riverproui/internal/protestfactory" "riverqueue.com/riverui/uiendpoints" ) -func insertOnlyProClient(t *testing.T, logger *slog.Logger) (*riverpro.Client[pgx.Tx], riverdriver.Driver[pgx.Tx]) { - t.Helper() +func TestProHandlerIntegration(t *testing.T) { + t.Parallel() - workers := river.NewWorkers() - river.AddWorker(workers, &uicommontest.NoOpWorker{}) + createBundle := func(client *riverpro.Client[pgx.Tx], tx pgx.Tx) uiendpoints.Bundle { + return NewEndpoints(client, &EndpointsOpts[pgx.Tx]{Tx: &tx}) + } - driver := riverpropgxv5.New(nil) + createClient := func(ctx context.Context, tb testing.TB, logger *slog.Logger) (*riverpro.Client[pgx.Tx], riverdriver.Driver[pgx.Tx], pgx.Tx) { + tb.Helper() - client, err := riverpro.NewClient(driver, &riverpro.Config{ - Config: river.Config{ - Logger: logger, - Workers: workers, - }, - }) - require.NoError(t, err) + workers := river.NewWorkers() + river.AddWorker(workers, &uicommontest.NoOpWorker{}) - return client, driver -} + driver := riverpropgxv5.New(riversharedtest.DBPool(ctx, tb)) + tx, _ := riverdbtest.TestTxPgxDriver(ctx, tb, driver, nil) -func TestProHandlerIntegration(t *testing.T) { - t.Parallel() + client, err := riverpro.NewClient(driver, &riverpro.Config{ + Config: river.Config{ + Logger: logger, + Workers: workers, + }, + }) + require.NoError(tb, err) - createBundle := func(client *riverpro.Client[pgx.Tx], tx pgx.Tx) uiendpoints.Bundle { - return NewEndpoints(client, &EndpointsOpts[pgx.Tx]{Tx: &tx}) + return client, driver, tx } createHandler := func(t *testing.T, bundle uiendpoints.Bundle) http.Handler { t.Helper() - logger := riverinternaltest.Logger(t) + logger := riversharedtest.Logger(t) opts := &riverui.HandlerOpts{ DevMode: true, Endpoints: bundle, @@ -69,6 +75,11 @@ func TestProHandlerIntegration(t *testing.T) { testRunner := func(exec riverdriver.Executor, makeAPICall handlertest.APICallFunc) { ctx := context.Background() + proExec, ok := exec.(driver.ProExecutor) + require.True(t, ok) + + _ = protestfactory.PeriodicJob(ctx, t, proExec, nil) + queue := testfactory.Queue(ctx, t, exec, nil) workflowID := uuid.New() @@ -76,11 +87,79 @@ func TestProHandlerIntegration(t *testing.T) { workflowID2 := uuid.New() _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Metadata: uicommontest.MustMarshalJSON(t, map[string]uuid.UUID{"workflow_id": workflowID2})}) + // Verify OSS features endpoint is mounted and returns success even w/ Pro bundle: + makeAPICall(t, "FeaturesGet", http.MethodGet, "/api/features", nil) + + makeAPICall(t, "PeriodicJobList", http.MethodGet, "/api/pro/periodic-jobs", nil) makeAPICall(t, "ProducerList", http.MethodGet, "/api/pro/producers?queue_name="+queue.Name, nil) makeAPICall(t, "WorkflowCancel", http.MethodPost, fmt.Sprintf("/api/pro/workflows/%s/cancel", workflowID), nil) makeAPICall(t, "WorkflowGet", http.MethodGet, fmt.Sprintf("/api/pro/workflows/%s", workflowID2), nil) makeAPICall(t, "WorkflowList", http.MethodGet, "/api/pro/workflows", nil) } - handlertest.RunIntegrationTest(t, insertOnlyProClient, createBundle, createHandler, testRunner) + handlertest.RunIntegrationTest(t, createClient, createBundle, createHandler, testRunner) +} + +func TestProFeaturesEndpointResponse(t *testing.T) { + t.Parallel() + + ctx := t.Context() + logger := riversharedtest.Logger(t) + + driver := riverpropgxv5.New(riversharedtest.DBPool(ctx, t)) + tx, _ := riverdbtest.TestTxPgxDriver(ctx, t, driver, &riverdbtest.TestTxOpts{DisableSchemaSharing: true}) + client, err := riverpro.NewClient(driver, &riverpro.Config{ + Config: river.Config{ + Logger: logger, + }, + }) + require.NoError(t, err) + + bundle := NewEndpoints(client, &EndpointsOpts[pgx.Tx]{Tx: &tx}) + + // Reuse the same handler creation pattern as integration tests + handler := func() http.Handler { + logger := riversharedtest.Logger(t) + opts := &riverui.HandlerOpts{ + DevMode: true, + Endpoints: bundle, + LiveFS: false, + Logger: logger, + } + h, err := riverui.NewHandler(opts) + require.NoError(t, err) + return h + }() + + recorder := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/api/features", nil) + + handler.ServeHTTP(recorder, req) + + status := recorder.Result().StatusCode + require.Equal(t, http.StatusOK, status) + + contentType := recorder.Header().Get("Content-Type") + // apiframe sets JSON content-type with charset; allow either exact or prefixed + require.Contains(t, contentType, "application/json") + + var resp struct { + Extensions map[string]bool `json:"extensions"` + JobListHideArgsByDefault bool `json:"job_list_hide_args_by_default"` + } + + require.NoError(t, json.Unmarshal(recorder.Body.Bytes(), &resp)) + require.NotNil(t, resp.Extensions) + + // Static flags always true; dynamic flags should also be true because pro migrations run for tests + expectedExtensions := map[string]bool{ + "durable_periodic_jobs": true, // dynamic + "has_client_table": true, // dynamic + "has_producer_table": true, // dynamic + "has_sequence_table": true, // dynamic + "has_workflows": true, // dynamic + "producer_queries": true, // static + "workflow_queries": true, // static + } + require.Equal(t, expectedExtensions, resp.Extensions) } diff --git a/src/components/JobList.stories.ts b/src/components/JobList.stories.ts index a80e06c3..d8c1f124 100644 --- a/src/components/JobList.stories.ts +++ b/src/components/JobList.stories.ts @@ -4,6 +4,7 @@ import { useFeatures } from "@contexts/Features.hook"; import { useSettings } from "@hooks/use-settings"; import { JobState } from "@services/types"; import { jobMinimalFactory } from "@test/factories/job"; +import { createFeatures } from "@test/utils/features"; import { vi } from "vitest"; import JobList from "./JobList"; @@ -38,9 +39,9 @@ export const Running: Story = { { hook: useFeatures, mockValue: { - features: { + features: createFeatures({ jobListHideArgsByDefault: false, - }, + }), }, }, { @@ -64,9 +65,9 @@ export const ArgsHiddenByDefault: Story = { { hook: useFeatures, mockValue: { - features: { + features: createFeatures({ jobListHideArgsByDefault: true, - }, + }), }, }, { @@ -90,9 +91,9 @@ export const ArgsVisibleUserOverride: Story = { { hook: useFeatures, mockValue: { - features: { + features: createFeatures({ jobListHideArgsByDefault: true, - }, + }), }, }, { @@ -116,9 +117,9 @@ export const ArgsHiddenUserOverride: Story = { { hook: useFeatures, mockValue: { - features: { + features: createFeatures({ jobListHideArgsByDefault: false, - }, + }), }, }, { diff --git a/src/components/JobList.test.tsx b/src/components/JobList.test.tsx index eb01df6d..fda42b07 100644 --- a/src/components/JobList.test.tsx +++ b/src/components/JobList.test.tsx @@ -2,6 +2,7 @@ import { FeaturesContext } from "@contexts/Features"; import { useSettings } from "@hooks/use-settings"; import { JobState } from "@services/types"; import { jobMinimalFactory } from "@test/factories/job"; +import { createFeatures } from "@test/utils/features"; import { render, screen } from "@testing-library/react"; import { describe, expect, it, vi } from "vitest"; @@ -33,14 +34,9 @@ vi.mock("@hooks/use-settings", () => ({ describe("JobList", () => { it("shows job args by default", () => { const job = jobMinimalFactory.build(); - const features = { - hasClientTable: false, - hasProducerTable: false, - hasWorkflows: false, + const features = createFeatures({ jobListHideArgsByDefault: false, - producerQueries: false, - workflowQueries: false, - }; + }); // Mock settings with no override (useSettings as unknown as ReturnType).mockReturnValue({ @@ -70,14 +66,9 @@ describe("JobList", () => { it("hides job args when jobListHideArgsByDefault is true", () => { const job = jobMinimalFactory.build(); - const features = { - hasClientTable: false, - hasProducerTable: false, - hasWorkflows: false, + const features = createFeatures({ jobListHideArgsByDefault: true, - producerQueries: false, - workflowQueries: false, - }; + }); // Mock settings with no override (useSettings as unknown as ReturnType).mockReturnValue({ @@ -109,14 +100,9 @@ describe("JobList", () => { it("shows job args when user overrides default hide setting", () => { const job = jobMinimalFactory.build(); - const features = { - hasClientTable: false, - hasProducerTable: false, - hasWorkflows: false, + const features = createFeatures({ jobListHideArgsByDefault: true, // Server default is to hide - producerQueries: false, - workflowQueries: false, - }; + }); // Mock settings with override to show args (useSettings as unknown as ReturnType).mockReturnValue({ @@ -147,14 +133,9 @@ describe("JobList", () => { it("hides job args when user overrides default show setting", () => { const job = jobMinimalFactory.build(); - const features = { - hasClientTable: false, - hasProducerTable: false, - hasWorkflows: false, + const features = createFeatures({ jobListHideArgsByDefault: false, // Server default is to show - producerQueries: false, - workflowQueries: false, - }; + }); // Mock settings with override to hide args (useSettings as unknown as ReturnType).mockReturnValue({ diff --git a/src/components/Layout.tsx b/src/components/Layout.tsx index 9bda8b01..d7b4c3bd 100644 --- a/src/components/Layout.tsx +++ b/src/components/Layout.tsx @@ -1,4 +1,5 @@ import Toast from "@components/Toast"; +import { useFeatures } from "@contexts/Features.hook"; import { useSidebarSetting } from "@contexts/SidebarSetting.hook"; import { Dialog, @@ -7,13 +8,13 @@ import { TransitionChild, } from "@headlessui/react"; import { + CalendarDaysIcon, Cog6ToothIcon, InboxStackIcon, QueueListIcon, RectangleGroupIcon, XMarkIcon, } from "@heroicons/react/24/outline"; -import useFeature from "@hooks/use-feature"; import { Link } from "@tanstack/react-router"; import { Fragment, PropsWithChildren, useMemo } from "react"; @@ -22,7 +23,7 @@ type LayoutProps = PropsWithChildren; const Layout = ({ children }: LayoutProps) => { const { open: sidebarOpen, setOpen: setSidebarOpen } = useSidebarSetting(); - const featureEnabledWorkflows = useFeature("ENABLE_WORKFLOWS", true); + const { features } = useFeatures(); const navigation = useMemo( () => @@ -34,14 +35,15 @@ const Layout = ({ children }: LayoutProps) => { search: {}, }, { href: "/queues", icon: InboxStackIcon, name: "Queues" }, + { href: "/workflows", icon: RectangleGroupIcon, name: "Workflows" }, { - hidden: !featureEnabledWorkflows, - href: "/workflows", - icon: RectangleGroupIcon, - name: "Workflows", + hidden: !features.durablePeriodicJobs, + href: "/periodic-jobs", + icon: CalendarDaysIcon, + name: "Periodic Jobs", }, ].filter((item) => item.hidden === undefined || item.hidden === false), - [featureEnabledWorkflows], + [features.durablePeriodicJobs], ); return ( @@ -163,8 +165,8 @@ const Layout = ({ children }: LayoutProps) => { {/* Static sidebar for desktop */} -
-
+
+