Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 88 additions & 20 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,13 @@ type Config struct {
// Defaults to DefaultRetryPolicy.
RetryPolicy ClientRetryPolicy

// schema is a non-standard schema where River tables are located. All table
// references in database queries will use this value as a prefix.
//
// Defaults to empty, which causes the client to look for tables using the
// setting of Postgres `search_path`.
schema string
Comment on lines +285 to +290
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder, would we be better off defaulting to this explicitly setting public vs letting it default to conn string search path? Probably not, just trying to think through the full cycle of existing users migrating to this setup once we have full schema support.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thinkk at this point we'll probably have to leave empty as default for backwards compatibility if nothing else.

Starting from scratch, yeah fair question. If we gave it a default though, we'd have to also provide a way to zero it for anyone who'd prefer to use search_path by choice.


// SkipUnknownJobCheck is a flag to control whether the client should skip
// checking to see if a registered worker exists in the client's worker bundle
// for a job arg prior to insertion.
Expand Down Expand Up @@ -376,6 +383,7 @@ func (c *Config) WithDefaults() *Config {
ReindexerSchedule: c.ReindexerSchedule,
RescueStuckJobsAfter: valutil.ValOrDefault(c.RescueStuckJobsAfter, rescueAfter),
RetryPolicy: retryPolicy,
schema: c.schema,
SkipUnknownJobCheck: c.SkipUnknownJobCheck,
Test: c.Test,
TestOnly: c.TestOnly,
Expand Down Expand Up @@ -668,7 +676,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
// uses listen/notify. Instead, each service polls for changes it's
// interested in. e.g. Elector polls to see if leader has expired.
if !config.PollOnly {
client.notifier = notifier.New(archetype, driver.GetListener())
client.notifier = notifier.New(archetype, driver.GetListener(config.schema))
client.services = append(client.services, client.notifier)
}
} else {
Expand Down Expand Up @@ -705,6 +713,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
CancelledJobRetentionPeriod: config.CancelledJobRetentionPeriod,
CompletedJobRetentionPeriod: config.CompletedJobRetentionPeriod,
DiscardedJobRetentionPeriod: config.DiscardedJobRetentionPeriod,
Schema: config.schema,
Timeout: config.JobCleanerTimeout,
}, driver.GetExecutor())
maintenanceServices = append(maintenanceServices, jobCleaner)
Expand All @@ -715,6 +724,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
jobRescuer := maintenance.NewRescuer(archetype, &maintenance.JobRescuerConfig{
ClientRetryPolicy: config.RetryPolicy,
RescueAfter: config.RescueStuckJobsAfter,
Schema: config.schema,
WorkUnitFactoryFunc: func(kind string) workunit.WorkUnitFactory {
if workerInfo, ok := config.Workers.workersMap[kind]; ok {
return workerInfo.workUnitFactory
Expand All @@ -730,6 +740,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
jobScheduler := maintenance.NewJobScheduler(archetype, &maintenance.JobSchedulerConfig{
Interval: config.schedulerInterval,
NotifyInsert: client.maybeNotifyInsertForQueues,
Schema: config.schema,
}, driver.GetExecutor())
maintenanceServices = append(maintenanceServices, jobScheduler)
client.testSignals.jobScheduler = &jobScheduler.TestSignals
Expand All @@ -750,6 +761,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
{
queueCleaner := maintenance.NewQueueCleaner(archetype, &maintenance.QueueCleanerConfig{
RetentionPeriod: maintenance.QueueRetentionPeriodDefault,
Schema: config.schema,
}, driver.GetExecutor())
maintenanceServices = append(maintenanceServices, queueCleaner)
client.testSignals.queueCleaner = &queueCleaner.TestSignals
Expand All @@ -761,7 +773,10 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
scheduleFunc = config.ReindexerSchedule.Next
}

reindexer := maintenance.NewReindexer(archetype, &maintenance.ReindexerConfig{ScheduleFunc: scheduleFunc}, driver.GetExecutor())
reindexer := maintenance.NewReindexer(archetype, &maintenance.ReindexerConfig{
ScheduleFunc: scheduleFunc,
Schema: config.schema,
}, driver.GetExecutor())
maintenanceServices = append(maintenanceServices, reindexer)
client.testSignals.reindexer = &reindexer.TestSignals
}
Expand Down Expand Up @@ -1236,14 +1251,18 @@ func (c *Client[TTx]) jobCancel(ctx context.Context, exec riverdriver.Executor,
ID: jobID,
CancelAttemptedAt: c.baseService.Time.NowUTC(),
ControlTopic: string(notifier.NotificationTopicControl),
Schema: c.config.schema,
})
}

// JobDelete deletes the job with the given ID from the database, returning the
// deleted row if it was deleted. Jobs in the running state are not deleted,
// instead returning rivertype.ErrJobRunning.
func (c *Client[TTx]) JobDelete(ctx context.Context, id int64) (*rivertype.JobRow, error) {
return c.driver.GetExecutor().JobDelete(ctx, id)
return c.driver.GetExecutor().JobDelete(ctx, &riverdriver.JobDeleteParams{
ID: id,
Schema: c.config.schema,
})
}

// JobDelete deletes the job with the given ID from the database, returning the
Expand All @@ -1253,20 +1272,29 @@ func (c *Client[TTx]) JobDelete(ctx context.Context, id int64) (*rivertype.JobRo
// until the transaction commits, and if the transaction rolls back, so too is
// the deleted job.
func (c *Client[TTx]) JobDeleteTx(ctx context.Context, tx TTx, id int64) (*rivertype.JobRow, error) {
return c.driver.UnwrapExecutor(tx).JobDelete(ctx, id)
return c.driver.UnwrapExecutor(tx).JobDelete(ctx, &riverdriver.JobDeleteParams{
ID: id,
Schema: c.config.schema,
})
}

// JobGet fetches a single job by its ID. Returns the up-to-date JobRow for the
// specified jobID if it exists. Returns ErrNotFound if the job doesn't exist.
func (c *Client[TTx]) JobGet(ctx context.Context, id int64) (*rivertype.JobRow, error) {
return c.driver.GetExecutor().JobGetByID(ctx, id)
return c.driver.GetExecutor().JobGetByID(ctx, &riverdriver.JobGetByIDParams{
ID: id,
Schema: c.config.schema,
})
}

// JobGetTx fetches a single job by its ID, within a transaction. Returns the
// up-to-date JobRow for the specified jobID if it exists. Returns ErrNotFound
// if the job doesn't exist.
func (c *Client[TTx]) JobGetTx(ctx context.Context, tx TTx, id int64) (*rivertype.JobRow, error) {
return c.driver.UnwrapExecutor(tx).JobGetByID(ctx, id)
return c.driver.UnwrapExecutor(tx).JobGetByID(ctx, &riverdriver.JobGetByIDParams{
ID: id,
Schema: c.config.schema,
})
}

// JobRetry updates the job with the given ID to make it immediately available
Expand All @@ -1278,7 +1306,10 @@ func (c *Client[TTx]) JobGetTx(ctx context.Context, tx TTx, id int64) (*rivertyp
// MaxAttempts is also incremented by one if the job has already exhausted its
// max attempts.
func (c *Client[TTx]) JobRetry(ctx context.Context, id int64) (*rivertype.JobRow, error) {
return c.driver.GetExecutor().JobRetry(ctx, id)
return c.driver.GetExecutor().JobRetry(ctx, &riverdriver.JobRetryParams{
ID: id,
Schema: c.config.schema,
})
}

// JobRetryTx updates the job with the given ID to make it immediately available
Expand All @@ -1295,7 +1326,10 @@ func (c *Client[TTx]) JobRetry(ctx context.Context, id int64) (*rivertype.JobRow
// MaxAttempts is also incremented by one if the job has already exhausted its
// max attempts.
func (c *Client[TTx]) JobRetryTx(ctx context.Context, tx TTx, id int64) (*rivertype.JobRow, error) {
return c.driver.UnwrapExecutor(tx).JobRetry(ctx, id)
return c.driver.UnwrapExecutor(tx).JobRetry(ctx, &riverdriver.JobRetryParams{
ID: id,
Schema: c.config.schema,
})
}

// ID returns the unique ID of this client as set in its config or
Expand Down Expand Up @@ -1561,7 +1595,10 @@ func (c *Client[TTx]) validateParamsAndInsertMany(ctx context.Context, tx riverd
// by the PeriodicJobEnqueuer.
func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) ([]*rivertype.JobInsertResult, error) {
return c.insertManyShared(ctx, tx, insertParams, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) {
results, err := c.pilot.JobInsertMany(ctx, tx, insertParams)
results, err := c.pilot.JobInsertMany(ctx, tx, &riverdriver.JobInsertFastManyParams{
Jobs: insertParams,
Schema: c.config.schema,
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1731,7 +1768,10 @@ func (c *Client[TTx]) insertManyFast(ctx context.Context, tx riverdriver.Executo
}

results, err := c.insertManyShared(ctx, tx, insertParams, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) {
count, err := tx.JobInsertFastManyNoReturning(ctx, insertParams)
count, err := tx.JobInsertFastManyNoReturning(ctx, &riverdriver.JobInsertFastManyParams{
Jobs: insertParams,
Schema: c.config.schema,
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1773,8 +1813,9 @@ func (c *Client[TTx]) maybeNotifyInsertForQueues(ctx context.Context, tx riverdr
}

err := tx.NotifyMany(ctx, &riverdriver.NotifyManyParams{
Topic: string(notifier.NotificationTopicInsert),
Payload: payloads,
Schema: c.config.schema,
Topic: string(notifier.NotificationTopicInsert),
})
if err != nil {
c.baseService.Logger.ErrorContext(
Expand Down Expand Up @@ -1804,6 +1845,7 @@ func (c *Client[TTx]) notifyQueuePauseOrResume(ctx context.Context, tx riverdriv

err = tx.NotifyMany(ctx, &riverdriver.NotifyManyParams{
Payload: []string{string(payload)},
Schema: c.config.schema,
Topic: string(notifier.NotificationTopicControl),
})
if err != nil {
Expand Down Expand Up @@ -1850,6 +1892,7 @@ func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) *pr
QueueEventCallback: c.subscriptionManager.distributeQueueEvent,
RetryPolicy: c.config.RetryPolicy,
SchedulerInterval: c.config.schedulerInterval,
Schema: c.config.schema,
StaleProducerRetentionPeriod: 5 * time.Minute,
Workers: c.config.Workers,
})
Expand Down Expand Up @@ -1966,7 +2009,10 @@ func (c *Client[TTx]) Queues() *QueueBundle { return c.queues }
// The provided context is used for the underlying Postgres query and can be
// used to cancel the operation or apply a timeout.
func (c *Client[TTx]) QueueGet(ctx context.Context, name string) (*rivertype.Queue, error) {
return c.driver.GetExecutor().QueueGet(ctx, name)
return c.driver.GetExecutor().QueueGet(ctx, &riverdriver.QueueGetParams{
Name: name,
Schema: c.config.schema,
})
}

// QueueGetTx returns the queue with the given name. If the queue has not recently
Expand All @@ -1975,7 +2021,10 @@ func (c *Client[TTx]) QueueGet(ctx context.Context, name string) (*rivertype.Que
// The provided context is used for the underlying Postgres query and can be
// used to cancel the operation or apply a timeout.
func (c *Client[TTx]) QueueGetTx(ctx context.Context, tx TTx, name string) (*rivertype.Queue, error) {
return c.driver.UnwrapExecutor(tx).QueueGet(ctx, name)
return c.driver.UnwrapExecutor(tx).QueueGet(ctx, &riverdriver.QueueGetParams{
Name: name,
Schema: c.config.schema,
})
}

// QueueListResult is the result of a job list operation. It contains a list of
Expand All @@ -2001,7 +2050,10 @@ func (c *Client[TTx]) QueueList(ctx context.Context, params *QueueListParams) (*
params = NewQueueListParams()
}

queues, err := c.driver.GetExecutor().QueueList(ctx, int(params.paginationCount))
queues, err := c.driver.GetExecutor().QueueList(ctx, &riverdriver.QueueListParams{
Limit: int(params.paginationCount),
Schema: c.config.schema,
})
if err != nil {
return nil, err
}
Expand All @@ -2025,7 +2077,10 @@ func (c *Client[TTx]) QueueListTx(ctx context.Context, tx TTx, params *QueueList
params = NewQueueListParams()
}

queues, err := c.driver.UnwrapExecutor(tx).QueueList(ctx, int(params.paginationCount))
queues, err := c.driver.UnwrapExecutor(tx).QueueList(ctx, &riverdriver.QueueListParams{
Limit: int(params.paginationCount),
Schema: c.config.schema,
})
if err != nil {
return nil, err
}
Expand All @@ -2051,7 +2106,10 @@ func (c *Client[TTx]) QueuePause(ctx context.Context, name string, opts *QueuePa
}
defer tx.Rollback(ctx)

if err := tx.QueuePause(ctx, name); err != nil {
if err := tx.QueuePause(ctx, &riverdriver.QueuePauseParams{
Name: name,
Schema: c.config.schema,
}); err != nil {
return err
}

Expand All @@ -2076,7 +2134,10 @@ func (c *Client[TTx]) QueuePause(ctx context.Context, name string, opts *QueuePa
func (c *Client[TTx]) QueuePauseTx(ctx context.Context, tx TTx, name string, opts *QueuePauseOpts) error {
executorTx := c.driver.UnwrapExecutor(tx)

if err := executorTx.QueuePause(ctx, name); err != nil {
if err := executorTx.QueuePause(ctx, &riverdriver.QueuePauseParams{
Name: name,
Schema: c.config.schema,
}); err != nil {
return err
}

Expand Down Expand Up @@ -2106,7 +2167,10 @@ func (c *Client[TTx]) QueueResume(ctx context.Context, name string, opts *QueueP
}
defer tx.Rollback(ctx)

if err := tx.QueueResume(ctx, name); err != nil {
if err := tx.QueueResume(ctx, &riverdriver.QueueResumeParams{
Name: name,
Schema: c.config.schema,
}); err != nil {
return err
}

Expand All @@ -2132,7 +2196,10 @@ func (c *Client[TTx]) QueueResume(ctx context.Context, name string, opts *QueueP
func (c *Client[TTx]) QueueResumeTx(ctx context.Context, tx TTx, name string, opts *QueuePauseOpts) error {
executorTx := c.driver.UnwrapExecutor(tx)

if err := executorTx.QueueResume(ctx, name); err != nil {
if err := executorTx.QueueResume(ctx, &riverdriver.QueueResumeParams{
Name: name,
Schema: c.config.schema,
}); err != nil {
return err
}

Expand Down Expand Up @@ -2202,8 +2269,9 @@ func (c *Client[TTx]) queueUpdate(ctx context.Context, executorTx riverdriver.Ex
}

if err := executorTx.NotifyMany(ctx, &riverdriver.NotifyManyParams{
Topic: string(notifier.NotificationTopicControl),
Payload: []string{string(payload)},
Schema: c.config.schema,
Topic: string(notifier.NotificationTopicControl),
}); err != nil {
return nil, err
}
Expand Down
Loading
Loading