Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .storybook/preview.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export const withFeatures: Decorator = (StoryFn, context) => {
// Default features with story-specific overrides
const features = {
hasProducerTable: true,
producerQueries: true,
...context.parameters?.features,
};

Expand Down
32 changes: 0 additions & 32 deletions common_test.go

This file was deleted.

10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ toolchain go1.24.4
require (
github.com/jackc/pgx/v5 v5.7.6
github.com/riverqueue/apiframe v0.0.0-20250408034821-b206bbbd0fb4
github.com/riverqueue/river v0.25.0
github.com/riverqueue/river/riverdriver v0.25.0
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.25.0
github.com/riverqueue/river/rivershared v0.25.0
github.com/riverqueue/river/rivertype v0.25.0
github.com/riverqueue/river v0.25.1-0.20251001013213-822d5bb676cc
github.com/riverqueue/river/riverdriver v0.25.1-0.20251001013213-822d5bb676cc
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.25.1-0.20251001013213-822d5bb676cc
github.com/riverqueue/river/rivershared v0.25.1-0.20251001013213-822d5bb676cc
github.com/riverqueue/river/rivertype v0.25.1-0.20251001013213-822d5bb676cc
github.com/rs/cors v1.11.1
github.com/samber/slog-http v1.8.2
github.com/stretchr/testify v1.11.1
Expand Down
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/riverqueue/apiframe v0.0.0-20250408034821-b206bbbd0fb4 h1:ejJogJ57bF+jMbvGjZQ6H6LR0NCTDQr30SJ/wSVepgs=
github.com/riverqueue/apiframe v0.0.0-20250408034821-b206bbbd0fb4/go.mod h1:6aXA9FSXKkxwjbOUSXdrIOuw478Lvtz/eEu45R4MoQk=
github.com/riverqueue/river v0.25.0 h1:dRnA9ltq9hTYRMmZgBnhqRh3AzBIFVu+qVLpBqy6b+g=
github.com/riverqueue/river v0.25.0/go.mod h1:KetN5MQQu9IjtganQrIt0OFubweeh+qkAqJaCdalwtI=
github.com/riverqueue/river/riverdriver v0.25.0 h1:RkvBWBlybYGaU1DoQ/mSwnWp1hm0FfS8yyksr/dM5tI=
github.com/riverqueue/river/riverdriver v0.25.0/go.mod h1:p2Jvr1N6NfPA+ngIKK8urqxG2vmusX4jO7g/UH/soQY=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.25.0 h1:Ed6dtSSwsj7VwbquG6Bh+2+271sBOL6WyRbisY/XHiY=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.25.0/go.mod h1:h77bWaGJyA5GMKEKmANQN9mhsV3XWYt4sRUx6FtQa84=
github.com/riverqueue/river/rivershared v0.25.0 h1:grjuTHJEVvi4srzcspQ2UXWjISxdqbubQl+9DDg3agQ=
github.com/riverqueue/river/rivershared v0.25.0/go.mod h1:ZdVeOnT8X8PiAZRUfWHc+Ne6fNXqe1oYb2eioZb6URM=
github.com/riverqueue/river/rivertype v0.25.0 h1:DPwd0DGqajLIv9zsB+BOwlum0D1/4Iiqz34+nwIZaZ0=
github.com/riverqueue/river/rivertype v0.25.0/go.mod h1:9bbWVYkr1B/YzW43lUs/Vk/tEYqLrabrZWrtUWQ+Goo=
github.com/riverqueue/river v0.25.1-0.20251001013213-822d5bb676cc h1:G9TV3+iiEMs18MAeexA8gb4KltmHNvIE1uB4ra6k6yY=
github.com/riverqueue/river v0.25.1-0.20251001013213-822d5bb676cc/go.mod h1:KetN5MQQu9IjtganQrIt0OFubweeh+qkAqJaCdalwtI=
github.com/riverqueue/river/riverdriver v0.25.1-0.20251001013213-822d5bb676cc h1:AAMdacQhDBNebXmjuiGaPHeZAoZT77ELelhIMoJLD7E=
github.com/riverqueue/river/riverdriver v0.25.1-0.20251001013213-822d5bb676cc/go.mod h1:p2Jvr1N6NfPA+ngIKK8urqxG2vmusX4jO7g/UH/soQY=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.25.1-0.20251001013213-822d5bb676cc h1:uOZcH6W9ItGhG4IFziy54Zkbo9O7t4BDELyCnCjX1y4=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.25.1-0.20251001013213-822d5bb676cc/go.mod h1:h77bWaGJyA5GMKEKmANQN9mhsV3XWYt4sRUx6FtQa84=
github.com/riverqueue/river/rivershared v0.25.1-0.20251001013213-822d5bb676cc h1:7KjQiHOdPioVtGLwO4imQvt7swiGeww1oF4JZfcEaLk=
github.com/riverqueue/river/rivershared v0.25.1-0.20251001013213-822d5bb676cc/go.mod h1:ZdVeOnT8X8PiAZRUfWHc+Ne6fNXqe1oYb2eioZb6URM=
github.com/riverqueue/river/rivertype v0.25.1-0.20251001013213-822d5bb676cc h1:75lj6WkDnCNaF6HJl3PKDQj+TlDypmvfGl9b28uLghA=
github.com/riverqueue/river/rivertype v0.25.1-0.20251001013213-822d5bb676cc/go.mod h1:9bbWVYkr1B/YzW43lUs/Vk/tEYqLrabrZWrtUWQ+Goo=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
Expand Down
29 changes: 17 additions & 12 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ import (
"riverqueue.com/riverui/uiendpoints"
)

type endpointsExtensions interface {
Extensions() map[string]bool
}

// EndpointsOpts are the options for creating a new Endpoints bundle.
type EndpointsOpts[TTx any] struct {
// Tx is an optional transaction to wrap all database operations. It's mainly
Expand All @@ -42,6 +38,7 @@ type EndpointsOpts[TTx any] struct {
type endpoints[TTx any] struct {
bundleOpts *uiendpoints.BundleOpts
client *river.Client[TTx]
extensions func(ctx context.Context) (map[string]bool, error)
opts *EndpointsOpts[TTx]
}

Expand All @@ -66,14 +63,27 @@ func (e *endpoints[TTx]) Configure(bundleOpts *uiendpoints.BundleOpts) {
e.bundleOpts = bundleOpts
}

func (e *endpoints[TTx]) Extensions(ctx context.Context) (map[string]bool, error) {
if e.extensions == nil {
return map[string]bool{}, nil
}
return e.extensions(ctx)
}

// SetExtensionsProvider sets the extensions provider function for this bundle.
// This is a private "friend interface" method for use by riverproui.
func (e *endpoints[TTx]) SetExtensionsProvider(provider func(ctx context.Context) (map[string]bool, error)) {
e.extensions = provider
}

func (e *endpoints[TTx]) Validate() error {
if e.client == nil {
return errors.New("client is required")
}
return nil
}

func (e *endpoints[TTx]) MountEndpoints(archetype *baseservice.Archetype, logger *slog.Logger, mux *http.ServeMux, mountOpts *apiendpoint.MountOpts, extensions map[string]bool) []apiendpoint.EndpointInterface {
func (e *endpoints[TTx]) MountEndpoints(archetype *baseservice.Archetype, logger *slog.Logger, mux *http.ServeMux, mountOpts *apiendpoint.MountOpts) []apiendpoint.EndpointInterface {
driver := e.client.Driver()
var executor riverdriver.Executor
if e.opts.Tx == nil {
Expand All @@ -86,7 +96,7 @@ func (e *endpoints[TTx]) MountEndpoints(archetype *baseservice.Archetype, logger
Client: e.client,
DB: executor,
Driver: driver,
Extensions: extensions,
Extensions: e.Extensions,
JobListHideArgsByDefault: e.bundleOpts.JobListHideArgsByDefault,
Logger: logger,
}
Expand Down Expand Up @@ -232,12 +242,7 @@ func NewHandler(opts *HandlerOpts) (*Handler, error) {
Validator: apitype.NewValidator(),
}

extensions := map[string]bool{}
if withExtensions, ok := opts.Endpoints.(endpointsExtensions); ok {
extensions = withExtensions.Extensions()
}

endpoints := opts.Endpoints.MountEndpoints(baseservice.NewArchetype(opts.Logger), opts.Logger, mux, &mountOpts, extensions)
endpoints := opts.Endpoints.MountEndpoints(baseservice.NewArchetype(opts.Logger), opts.Logger, mux, &mountOpts)

var services []startstop.Service

Expand Down
59 changes: 8 additions & 51 deletions handler_api_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,62 +176,19 @@ type featuresGetRequest struct{}

type featuresGetResponse struct {
Extensions map[string]bool `json:"extensions"`
HasClientTable bool `json:"has_client_table"`
HasProducerTable bool `json:"has_producer_table"`
HasSequenceTable bool `json:"has_sequence_table"`
HasWorkflows bool `json:"has_workflows"`
JobListHideArgsByDefault bool `json:"job_list_hide_args_by_default"`
}

func (a *featuresGetEndpoint[TTx]) Execute(ctx context.Context, _ *featuresGetRequest) (*featuresGetResponse, error) {
return dbutil.WithTxV(ctx, a.DB, func(ctx context.Context, execTx riverdriver.ExecutorTx) (*featuresGetResponse, error) {
tx := a.Driver.UnwrapTx(execTx)

schema := a.Client.Schema()
hasClientTable, err := a.Driver.UnwrapExecutor(tx).TableExists(ctx, &riverdriver.TableExistsParams{
Schema: schema,
Table: "river_client",
})
if err != nil {
return nil, err
}

hasProducerTable, err := a.Driver.UnwrapExecutor(tx).TableExists(ctx, &riverdriver.TableExistsParams{
Schema: schema,
Table: "river_producer",
})
if err != nil {
return nil, err
}

hasSequenceTable, err := a.Driver.UnwrapExecutor(tx).TableExists(ctx, &riverdriver.TableExistsParams{
Schema: schema,
Table: "river_job_sequence",
})
if err != nil {
return nil, err
}

indexResults, err := a.Driver.UnwrapExecutor(tx).IndexesExist(ctx, &riverdriver.IndexesExistParams{
IndexNames: []string{
"river_job_workflow_list_active",
"river_job_workflow_scheduling",
},
Schema: schema,
})
if err != nil {
return nil, err
}
extensions, err := a.Extensions(ctx)
if err != nil {
return nil, err
}

return &featuresGetResponse{
Extensions: a.Extensions,
HasClientTable: hasClientTable,
HasProducerTable: hasProducerTable,
HasSequenceTable: hasSequenceTable,
HasWorkflows: indexResults["river_job_workflow_list_active"] || indexResults["river_job_workflow_scheduling"],
JobListHideArgsByDefault: a.JobListHideArgsByDefault,
}, nil
})
return &featuresGetResponse{
Extensions: extensions,
JobListHideArgsByDefault: a.JobListHideArgsByDefault,
}, nil
}

//
Expand Down
36 changes: 18 additions & 18 deletions handler_api_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ import (
"github.com/riverqueue/apiframe/apitest"
"github.com/riverqueue/apiframe/apitype"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdbtest"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/util/ptrutil"
"github.com/riverqueue/river/rivertype"

"riverqueue.com/riverui/internal/apibundle"
"riverqueue.com/riverui/internal/riverinternaltest"
"riverqueue.com/riverui/internal/riverinternaltest/testfactory"
"riverqueue.com/riverui/internal/uicommontest"
)
Expand All @@ -37,18 +38,23 @@ func setupEndpoint[TEndpoint any](ctx context.Context, t *testing.T, initFunc fu
t.Helper()

var (
logger = riverinternaltest.Logger(t)
client, driver = insertOnlyClient(t, logger)
tx = riverinternaltest.TestTx(ctx, t)
exec = driver.UnwrapExecutor(tx)
logger = riversharedtest.Logger(t)
driver = riverpgxv5.New(riversharedtest.DBPool(ctx, t))
tx, _ = riverdbtest.TestTxPgxDriver(ctx, t, driver, nil)
exec = driver.UnwrapExecutor(tx)
)

client, err := river.NewClient(driver, &river.Config{
Logger: logger,
})
require.NoError(t, err)

endpoint := initFunc(apibundle.APIBundle[pgx.Tx]{
Archetype: riversharedtest.BaseServiceArchetype(t),
Client: client,
DB: exec,
Driver: driver,
Extensions: map[string]bool{},
Extensions: func(_ context.Context) (map[string]bool, error) { return map[string]bool{}, nil },
Logger: logger,
})

Expand All @@ -68,7 +74,7 @@ func setupEndpoint[TEndpoint any](ctx context.Context, t *testing.T, initFunc fu
func testMountOpts(t *testing.T) *apiendpoint.MountOpts {
t.Helper()
return &apiendpoint.MountOpts{
Logger: riverinternaltest.Logger(t),
Logger: riversharedtest.Logger(t),
Validator: apitype.NewValidator(),
}
}
Expand Down Expand Up @@ -247,10 +253,6 @@ func TestAPIHandlerFeaturesGet(t *testing.T) {
require.NoError(t, err)
require.Equal(t, &featuresGetResponse{
Extensions: map[string]bool{},
HasClientTable: false,
HasProducerTable: false,
HasSequenceTable: false,
HasWorkflows: false,
JobListHideArgsByDefault: false,
}, resp)
})
Expand All @@ -276,10 +278,6 @@ func TestAPIHandlerFeaturesGet(t *testing.T) {
require.NoError(t, err)
require.Equal(t, &featuresGetResponse{
Extensions: map[string]bool{},
HasClientTable: true,
HasProducerTable: true,
HasSequenceTable: true,
HasWorkflows: true,
JobListHideArgsByDefault: true,
}, resp)
})
Expand All @@ -288,9 +286,11 @@ func TestAPIHandlerFeaturesGet(t *testing.T) {
t.Parallel()

endpoint, _ := setupEndpoint(ctx, t, newFeaturesGetEndpoint)
endpoint.Extensions = map[string]bool{
"test_1": true,
"test_2": false,
endpoint.Extensions = func(_ context.Context) (map[string]bool, error) {
return map[string]bool{
"test_1": true,
"test_2": false,
}, nil
}

resp, err := apitest.InvokeHandler(ctx, endpoint.Execute, testMountOpts(t), &featuresGetRequest{})
Expand Down
23 changes: 19 additions & 4 deletions handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package riverui
import (
"context"
"fmt"
"log/slog"
"net/http"
"net/http/httptest"
"testing"
Expand All @@ -12,10 +13,12 @@ import (

"github.com/riverqueue/apiframe/apitype"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdbtest"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivershared/riversharedtest"

"riverqueue.com/riverui/internal/handlertest"
"riverqueue.com/riverui/internal/riverinternaltest"
"riverqueue.com/riverui/internal/riverinternaltest/testfactory"
"riverqueue.com/riverui/internal/uicommontest"
"riverqueue.com/riverui/uiendpoints"
Expand All @@ -24,7 +27,19 @@ import (
func TestNewHandlerIntegration(t *testing.T) {
t.Parallel()

createClient := insertOnlyClient
createClient := func(ctx context.Context, tb testing.TB, logger *slog.Logger) (*river.Client[pgx.Tx], riverdriver.Driver[pgx.Tx], pgx.Tx) {
tb.Helper()

driver := riverpgxv5.New(riversharedtest.DBPool(ctx, tb))
tx, _ := riverdbtest.TestTxPgxDriver(ctx, tb, driver, nil)

client, err := river.NewClient(driver, &river.Config{
Logger: logger,
})
require.NoError(tb, err)

return client, driver, tx
}

createBundle := func(client *river.Client[pgx.Tx], tx pgx.Tx) uiendpoints.Bundle {
return NewEndpoints(client, &EndpointsOpts[pgx.Tx]{
Expand All @@ -35,7 +50,7 @@ func TestNewHandlerIntegration(t *testing.T) {
createHandler := func(t *testing.T, bundle uiendpoints.Bundle) http.Handler {
t.Helper()

logger := riverinternaltest.Logger(t)
logger := riversharedtest.Logger(t)
server, err := NewHandler(&HandlerOpts{
DevMode: true,
Endpoints: bundle,
Expand Down Expand Up @@ -105,7 +120,7 @@ func TestMountStaticFiles(t *testing.T) {
t.Parallel()

var (
logger = riverinternaltest.Logger(t)
logger = riversharedtest.Logger(t)
mux = http.NewServeMux()
)

Expand Down
Loading
Loading