Closed
Description
Why?
- Opening a span for tracing and stashing it on the ctx
- Generic "starting job" logging
(Distinct from https://riverqueue.com/docs/subscriptions because this needs to be synchronous / needs direct access to ctx.)
User experience
Define an optional hooks interface
// WorkerWithHooks is a job worker that provides hooks.
type WorkerWithHooks[T JobArgs] interface {
	PreRun(ctx context.Context, job *Job[T]) (context.Context, error) // Naming inspired by cobra's PreRun
}
and then use a type assertion to check whether a river.Worker satisfies this optional interface.
Implementation
E.g. in jobExecutor{}.execute() you could make it possible to run a hook like
ctx, err := e.WorkUnit.PreWork(ctx)
And you could invoke it in wrapperWorkUnit{}.PreWork(), similar to Work():
// noOpPreRun is a `PreRun()` function that does nothing.
func noOpPreRun[T JobArgs](ctx context.Context, _ *Job[T]) (context.Context, error) {
	return ctx, nil
}

func (w *workUnitFactoryWrapper[T]) MakeUnit(jobRow *rivertype.JobRow) workunit.WorkUnit {
	preRun := noOpPreRun[T]
	// To make this type assertion cheaper, it should probably happen when
	// `workUnitFactoryWrapper{}` is created (not when `MakeUnit()` is invoked)
	//
	// A method value obtained from a non-nil interface is never nil, so
	// checking `ok` is sufficient.
	if wh, ok := w.worker.(WorkerWithHooks[T]); ok {
		preRun = wh.PreRun
	}
	return &wrapperWorkUnit[T]{jobRow: jobRow, worker: w.worker, preRun: preRun}
}

// wrapperWorkUnit implements workUnit for a job and Worker.
type wrapperWorkUnit[T JobArgs] struct {
	job    *Job[T] // not set until after UnmarshalJob is invoked
	jobRow *rivertype.JobRow
	worker Worker[T]
	preRun func(ctx context.Context, job *Job[T]) (context.Context, error)
}