Skip to content
/ stream Public
generated from devnw/oss-template
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 0 additions & 23 deletions .github/workflows/golangci-lint.yml

This file was deleted.

77 changes: 67 additions & 10 deletions scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ import (
"time"
)

// MinWait is the absolute minimum wait time for the ticker. This is used to
// prevent the ticker from firing too often and causing too small of a wait
// time.
const MinWait = time.Millisecond

// MinLife is the minimum life time for the scaler. This is used to prevent
// the scaler from exiting too quickly, and causing too small of a lifetime.
const MinLife = time.Millisecond

// Scaler implements generic auto-scaling logic which starts with a net-zero
// set of processing routines (with the exception of the channel listener) and
// then scales up and down based on the CPU contention of a system and the speed
Expand Down Expand Up @@ -42,6 +51,10 @@ type Scaler[T, U any] struct {
// that are CPU bound and need to scale up more/less quickly.
WaitModifier DurationScaler

// Max is the maximum number of layer2 routines that will be spawned.
// If Max is set to 0, then there is no limit.
Max uint

wScale *DurationScaler
}

Expand All @@ -51,7 +64,7 @@ var ErrFnRequired = fmt.Errorf("nil InterceptFunc, Fn is required")
// returns the output channel where the resulting data from the Fn function
// will be sent.
//
//nolint:funlen // This really can't be broken up any further
//nolint:funlen,gocognit // This really can't be broken up any further
func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
ctx = _ctx(ctx)

Expand All @@ -72,13 +85,13 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
// because the caller did not specify a wait time. This means Scaler will
// likely always scale up rather than waiting for an existing layer2 routine
// to pick up data.
if s.Wait <= 0 {
s.Wait = time.Nanosecond
if s.Wait <= MinWait {
s.Wait = MinWait
}

// Minimum life of a spawned layer2 should be 1ms
if s.Life < time.Microsecond {
s.Life = time.Microsecond
if s.Life < MinLife {
s.Life = MinLife
}

go func() {
Expand All @@ -99,7 +112,16 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
ticker := time.NewTicker(s.Wait)
defer ticker.Stop()
step := 0
var stepMu sync.RWMutex
stepMu := sync.RWMutex{}

var max chan struct{}

if s.Max > 0 {
max = make(chan struct{}, s.Max)
for i := uint(0); i < s.Max; i++ {
max <- struct{}{}
}
}

scaleLoop:
for {
Expand All @@ -117,6 +139,17 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
case <-ctx.Done():
return
case <-ticker.C:
if max != nil {
select {
case <-ctx.Done():
return
case <-max: // start a new layer2 routine
default:
// wait for a layer2 routine to finish
continue l2loop
}
}

wgMu.Lock()
wg.Add(1)
wgMu.Unlock()
Expand All @@ -129,6 +162,16 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {

go func() {
defer wg.Done()

if s.Max > 0 {
defer func() {
select {
case <-ctx.Done():
case max <- struct{}{}:
}
}()
}

if !s.WaitModifier.inactive() {
defer func() {
stepMu.Lock()
Expand All @@ -144,11 +187,16 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
}
}

stepMu.RLock()
stepN := 0
if !s.WaitModifier.inactive() {
stepMu.RLock()
stepN = step
stepMu.RUnlock()
}

// Reset the ticker so that it does not immediately trip the
// case statement on loop.
ticker.Reset(s.wScale.scaledDuration(s.Wait, step))
stepMu.RUnlock()
ticker.Reset(s.wScale.scaledDuration(s.Wait, stepN))
}
}
}()
Expand Down Expand Up @@ -261,6 +309,10 @@ func (t *DurationScaler) scaledDuration(
dur time.Duration,
currentInterval int,
) time.Duration {
if dur < MinWait {
dur = MinWait
}

if t.inactive() {
return dur
}
Expand All @@ -272,7 +324,12 @@ func (t *DurationScaler) scaledDuration(

if currentInterval%t.Interval == 0 {
t.lastInterval = currentInterval
return dur + time.Duration(float64(t.originalDuration)*mod)
out := dur + time.Duration(float64(t.originalDuration)*mod)
if out < MinWait {
return MinWait
}

return out
}

return dur
Expand Down
Loading