From 954d9e00f1d242552eb77122ae419a8a419c6c95 Mon Sep 17 00:00:00 2001 From: benjivesterby Date: Mon, 27 Mar 2023 12:12:06 +0000 Subject: [PATCH] docs(README.md): update go documentation --- README.md | 184 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 182 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 9de3cf9..83f4444 100644 --- a/README.md +++ b/README.md @@ -37,5 +37,185 @@ be seen on our [Benchmark Report Card]. [Benchmark Report Card]: https://devnw.github.io/stream/dev/bench/ - - +# stream +-- + import "." + +Package stream provides a set of generic functions for working concurrent design +patterns in Go. + +## Usage + +```go +var ErrFnRequired = fmt.Errorf("nil InterceptFunc, Fn is required") +``` + +#### func Any + +```go +func Any[T any](ctx context.Context, in <-chan T) <-chan any +``` +Any accepts an incoming data channel and converts the channel to a readonly +channel of the `any` type. + +#### func Distribute + +```go +func Distribute[T any]( + ctx context.Context, in <-chan T, out ...chan<- T, +) +``` +Distribute accepts an incoming data channel and distributes the data among the +supplied outgoing data channels using a dynamic select statement. + +NOTE: Execute the Distribute function in a goroutine if parallel execution is +desired. Canceling the context or closing the incoming channel is important to +ensure that the goroutine is properly terminated. + +#### func Drain + +```go +func Drain[T any](ctx context.Context, in <-chan T) +``` +Drain accepts a channel and drains the channel until the channel is closed or +the context is canceled. + +#### func FanIn + +```go +func FanIn[T any](ctx context.Context, in ...<-chan T) <-chan T +``` +FanIn accepts incoming data channels and forwards returns a single channel that +receives all the data from the supplied channels. + +NOTE: The transfer takes place in a goroutine for each channel so ensuring that +the context is canceled or the incoming channels are closed is important to +ensure that the goroutine is terminated. + +#### func FanOut + +```go +func FanOut[T any]( + ctx context.Context, in <-chan T, out ...chan<- T, +) +``` +FanOut accepts an incoming data channel and copies the data to each of the +supplied outgoing data channels. + +NOTE: Execute the FanOut function in a goroutine if parallel execution is +desired. Canceling the context or closing the incoming channel is important to +ensure that the goroutine is properly terminated. + +#### func Intercept + +```go +func Intercept[T, U any]( + ctx context.Context, + in <-chan T, + fn InterceptFunc[T, U], +) <-chan U +``` +Intercept accepts an incoming data channel and a function literal that accepts +the incoming data and returns data of the same type and a boolean indicating +whether the data should be forwarded to the output channel. The function is +executed for each data item in the incoming channel as long as the context is +not canceled or the incoming channel remains open. + +#### func Pipe + +```go +func Pipe[T any]( + ctx context.Context, in <-chan T, out chan<- T, +) +``` +Pipe accepts an incoming data channel and pipes it to the supplied outgoing data +channel. + +NOTE: Execute the Pipe function in a goroutine if parallel execution is desired. +Canceling the context or closing the incoming channel is important to ensure +that the goroutine is properly terminated. + +#### type DurationScaler + +```go +type DurationScaler struct { + // Interval is the number the current step must be divisible by in order + // to modify the time.Duration. + Interval int + + // ScalingFactor is a value between -1 and 1 that is used to modify the + // time.Duration of a ticker or timer. The value is multiplied by + // the ScalingFactor is multiplied by the duration for scaling. + // + // For example, if the ScalingFactor is 0.5, then the duration will be + // multiplied by 0.5. If the ScalingFactor is -0.5, then the duration will + // be divided by 0.5. If the ScalingFactor is 0, then the duration will + // not be modified. + // + // A negative ScalingFactor will cause the duration to decrease as the + // step value increases causing the ticker or timer to fire more often + // and create more routines. A positive ScalingFactor will cause the + // duration to increase as the step value increases causing the ticker + // or timer to fire less often and create less routines. + ScalingFactor float64 +} +``` + +DurationScaler is used to modify the time.Duration of a ticker or timer based on +a configured step value and modifier (between -1 and 1) value. + +#### type InterceptFunc + +```go +type InterceptFunc[T, U any] func(context.Context, T) (U, bool) +``` + + +#### type Scaler + +```go +type Scaler[T, U any] struct { + Wait time.Duration + Life time.Duration + Fn InterceptFunc[T, U] + + // WaitModifier is used to modify the Wait time based on the number of + // times the Scaler has scaled up. This is useful for systems + // that are CPU bound and need to scale up more/less quickly. + WaitModifier DurationScaler +} +``` + +Scaler implements generic auto-scaling logic which starts with a net-zero set of +processing routines (with the exception of the channel listener) and then scales +up and down based on the CPU contention of a system and the speed at which the +InterceptionFunc is able to process data. Once the incoming channel becomes +blocked (due to nothing being sent) each of the spawned routines will finish out +their execution of Fn and then the internal timer will collapse brining the +routine count back to zero until there is more to be done. + +To use Scalar, simply create a new Scaler[T, U], configuring the Wait, Life, and +InterceptFunc fields. These fields are what configure the functionality of the +Scaler. + +NOTE: Fn is REQUIRED! + +After creating the Scaler instance and configuring it, call the Exec method +passing the appropriate context and input channel. + +Internally the Scaler implementation will wait for data on the incoming channel +and attempt to send it to a layer2 channel. If the layer2 channel is blocking +and the Wait time has been reached, then the Scaler will spawn a new layer2 +which will increase throughput for the Scaler, and Scaler will attempt to send +the data to the layer2 channel once more. This process will repeat until a +successful send occurs. (This should only loop twice). + +#### func (Scaler[T, U]) Exec + +```go +func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) +``` +Exec starts the internal Scaler routine (the first layer of processing) and +returns the output channel where the resulting data from the Fn function will be +sent. +sent.