diff --git a/README.md b/README.md index b3d0290..b505d47 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,4 @@ # stream - -- import "." @@ -50,7 +49,6 @@ var ErrFnRequired = fmt.Errorf("nil InterceptFunc, Fn is required") ```go func Any[T any](ctx context.Context, in <-chan T) <-chan any ``` - Any accepts an incoming data channel and converts the channel to a readonly channel of the `any` type. @@ -58,10 +56,9 @@ channel of the `any` type. ```go func Distribute[T any]( - ctx context.Context, in <-chan T, out ...chan<- T, + ctx context.Context, in <-chan T, out ...chan<- T, ) ``` - Distribute accepts an incoming data channel and distributes the data among the supplied outgoing data channels using a dynamic select statement. @@ -74,7 +71,6 @@ ensure that the goroutine is properly terminated. ```go func Drain[T any](ctx context.Context, in <-chan T) ``` - Drain accepts a channel and drains the channel until the channel is closed or the context is canceled. @@ -83,7 +79,6 @@ the context is canceled. ```go func FanIn[T any](ctx context.Context, in ...<-chan T) <-chan T ``` - FanIn accepts incoming data channels and forwards returns a single channel that receives all the data from the supplied channels. @@ -95,10 +90,9 @@ ensure that the goroutine is terminated. ```go func FanOut[T any]( - ctx context.Context, in <-chan T, out ...chan<- T, + ctx context.Context, in <-chan T, out ...chan<- T, ) ``` - FanOut accepts an incoming data channel and copies the data to each of the supplied outgoing data channels. @@ -110,12 +104,11 @@ ensure that the goroutine is properly terminated. ```go func Intercept[T, U any]( - ctx context.Context, - in <-chan T, - fn InterceptFunc[T, U], + ctx context.Context, + in <-chan T, + fn InterceptFunc[T, U], ) <-chan U ``` - Intercept accepts an incoming data channel and a function literal that accepts the incoming data and returns data of the same type and a boolean indicating whether the data should be forwarded to the output channel. The function is @@ -126,10 +119,9 @@ not canceled or the incoming channel remains open. ```go func Pipe[T any]( - ctx context.Context, in <-chan T, out chan<- T, + ctx context.Context, in <-chan T, out chan<- T, ) ``` - Pipe accepts an incoming data channel and pipes it to the supplied outgoing data channel. @@ -141,25 +133,25 @@ that the goroutine is properly terminated. ```go type DurationScaler struct { - // Interval is the number the current step must be divisible by in order - // to modify the time.Duration. - Interval int - - // ScalingFactor is a value between -1 and 1 that is used to modify the - // time.Duration of a ticker or timer. The value is multiplied by - // the ScalingFactor is multiplied by the duration for scaling. - // - // For example, if the ScalingFactor is 0.5, then the duration will be - // multiplied by 0.5. If the ScalingFactor is -0.5, then the duration will - // be divided by 0.5. If the ScalingFactor is 0, then the duration will - // not be modified. - // - // A negative ScalingFactor will cause the duration to decrease as the - // step value increases causing the ticker or timer to fire more often - // and create more routines. A positive ScalingFactor will cause the - // duration to increase as the step value increases causing the ticker - // or timer to fire less often and create less routines. - ScalingFactor float64 + // Interval is the number the current step must be divisible by in order + // to modify the time.Duration. + Interval int + + // ScalingFactor is a value between -1 and 1 that is used to modify the + // time.Duration of a ticker or timer. The value is multiplied by + // the ScalingFactor is multiplied by the duration for scaling. + // + // For example, if the ScalingFactor is 0.5, then the duration will be + // multiplied by 0.5. If the ScalingFactor is -0.5, then the duration will + // be divided by 0.5. If the ScalingFactor is 0, then the duration will + // not be modified. + // + // A negative ScalingFactor will cause the duration to decrease as the + // step value increases causing the ticker or timer to fire more often + // and create more routines. A positive ScalingFactor will cause the + // duration to increase as the step value increases causing the ticker + // or timer to fire less often and create less routines. + ScalingFactor float64 } ``` @@ -172,18 +164,19 @@ a configured step value and modifier (between -1 and 1) value. type InterceptFunc[T, U any] func(context.Context, T) (U, bool) ``` + #### type Scaler ```go type Scaler[T, U any] struct { - Wait time.Duration - Life time.Duration - Fn InterceptFunc[T, U] - - // WaitModifier is used to modify the Wait time based on the number of - // times the Scaler has scaled up. This is useful for systems - // that are CPU bound and need to scale up more/less quickly. - WaitModifier DurationScaler + Wait time.Duration + Life time.Duration + Fn InterceptFunc[T, U] + + // WaitModifier is used to modify the Wait time based on the number of + // times the Scaler has scaled up. This is useful for systems + // that are CPU bound and need to scale up more/less quickly. + WaitModifier DurationScaler } ``` @@ -199,7 +192,7 @@ To use Scalar, simply create a new Scaler[T, U], configuring the Wait, Life, and InterceptFunc fields. These fields are what configure the functionality of the Scaler. -NOTE: Fn is REQUIRED! +NOTE: Fn is REQUIRED! Defaults: Wait = 1ns, Life = 1µs After creating the Scaler instance and configuring it, call the Exec method passing the appropriate context and input channel. @@ -216,7 +209,6 @@ successful send occurs. (This should only loop twice). ```go func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) ``` - Exec starts the internal Scaler routine (the first layer of processing) and returns the output channel where the resulting data from the Fn function will be sent.