diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml deleted file mode 100644 index ece0e65..0000000 --- a/.github/workflows/golangci-lint.yml +++ /dev/null @@ -1,23 +0,0 @@ -name: golangci-lint - -on: [push, pull_request] - -jobs: - golangci: - name: lint - runs-on: ubuntu-latest - strategy: - matrix: - go-version: [1.19.x] - steps: - - name: Checkout code - uses: actions/checkout@v3 - - name: Install Go - uses: actions/setup-go@v4 - with: - go-version: ${{ matrix.go-version }} - stable: false - - name: golangci-lint - uses: golangci/golangci-lint-action@v3 - with: - skip-go-installation: true diff --git a/scaler.go b/scaler.go index 70b7547..e34cf68 100644 --- a/scaler.go +++ b/scaler.go @@ -7,6 +7,15 @@ import ( "time" ) +// MinWait is the absolute minimum wait time for the ticker. This is used to +// prevent the ticker from firing too often and causing too small of a wait +// time. +const MinWait = time.Millisecond + +// MinLife is the minimum life time for the scaler. This is used to prevent +// the scaler from exiting too quickly, and causing too small of a lifetime. +const MinLife = time.Millisecond + // Scaler implements generic auto-scaling logic which starts with a net-zero // set of processing routines (with the exception of the channel listener) and // then scales up and down based on the CPU contention of a system and the speed @@ -42,6 +51,10 @@ type Scaler[T, U any] struct { // that are CPU bound and need to scale up more/less quickly. WaitModifier DurationScaler + // Max is the maximum number of layer2 routines that will be spawned. + // If Max is set to 0, then there is no limit. + Max uint + wScale *DurationScaler } @@ -51,7 +64,7 @@ var ErrFnRequired = fmt.Errorf("nil InterceptFunc, Fn is required") // returns the output channel where the resulting data from the Fn function // will be sent. // -//nolint:funlen // This really can't be broken up any further +//nolint:funlen,gocognit // This really can't be broken up any further func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) { ctx = _ctx(ctx) @@ -72,13 +85,13 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) { // because the caller did not specify a wait time. This means Scaler will // likely always scale up rather than waiting for an existing layer2 routine // to pick up data. - if s.Wait <= 0 { - s.Wait = time.Nanosecond + if s.Wait <= MinWait { + s.Wait = MinWait } // Minimum life of a spawned layer2 should be 1ms - if s.Life < time.Microsecond { - s.Life = time.Microsecond + if s.Life < MinLife { + s.Life = MinLife } go func() { @@ -99,7 +112,16 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) { ticker := time.NewTicker(s.Wait) defer ticker.Stop() step := 0 - var stepMu sync.RWMutex + stepMu := sync.RWMutex{} + + var max chan struct{} + + if s.Max > 0 { + max = make(chan struct{}, s.Max) + for i := uint(0); i < s.Max; i++ { + max <- struct{}{} + } + } scaleLoop: for { @@ -117,6 +139,17 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) { case <-ctx.Done(): return case <-ticker.C: + if max != nil { + select { + case <-ctx.Done(): + return + case <-max: // start a new layer2 routine + default: + // wait for a layer2 routine to finish + continue l2loop + } + } + wgMu.Lock() wg.Add(1) wgMu.Unlock() @@ -129,6 +162,16 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) { go func() { defer wg.Done() + + if s.Max > 0 { + defer func() { + select { + case <-ctx.Done(): + case max <- struct{}{}: + } + }() + } + if !s.WaitModifier.inactive() { defer func() { stepMu.Lock() @@ -144,11 +187,16 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) { } } - stepMu.RLock() + stepN := 0 + if !s.WaitModifier.inactive() { + stepMu.RLock() + stepN = step + stepMu.RUnlock() + } + // Reset the ticker so that it does not immediately trip the // case statement on loop. - ticker.Reset(s.wScale.scaledDuration(s.Wait, step)) - stepMu.RUnlock() + ticker.Reset(s.wScale.scaledDuration(s.Wait, stepN)) } } }() @@ -261,6 +309,10 @@ func (t *DurationScaler) scaledDuration( dur time.Duration, currentInterval int, ) time.Duration { + if dur < MinWait { + dur = MinWait + } + if t.inactive() { return dur } @@ -272,7 +324,12 @@ func (t *DurationScaler) scaledDuration( if currentInterval%t.Interval == 0 { t.lastInterval = currentInterval - return dur + time.Duration(float64(t.originalDuration)*mod) + out := dur + time.Duration(float64(t.originalDuration)*mod) + if out < MinWait { + return MinWait + } + + return out } return dur diff --git a/scaler_test.go b/scaler_test.go index bc0bfed..28873e3 100644 --- a/scaler_test.go +++ b/scaler_test.go @@ -2,6 +2,8 @@ package stream import ( "context" + "fmt" + "sync" "testing" "time" @@ -377,6 +379,30 @@ func TestTickDur(t *testing.T) { currentStep: 2, expected: 10 * time.Second, }, + { + name: "Test case 7: testing below minwait", + tick: DurationScaler{ + Interval: 1, + ScalingFactor: -0.999, + originalDuration: time.Millisecond * 2, + lastInterval: 0, + }, + duration: MinWait, + currentStep: 1, + expected: MinWait, + }, + { + name: "Test case 8: testing below minwait", + tick: DurationScaler{ + Interval: 1, + ScalingFactor: -0.999, + originalDuration: time.Millisecond * 900, + lastInterval: 0, + }, + duration: MinWait, + currentStep: 1, + expected: MinWait, + }, } for _, tc := range testCases { @@ -391,7 +417,200 @@ func TestTickDur(t *testing.T) { func FuzzTick(f *testing.F) { f.Fuzz(func(t *testing.T, step, cStep int, mod float64, orig, dur int64) { - tick := &DurationScaler{Interval: step, ScalingFactor: mod, originalDuration: time.Duration(orig)} - _ = tick.scaledDuration(time.Duration(dur), cStep) + tick := &DurationScaler{ + Interval: step, + ScalingFactor: mod, + originalDuration: time.Duration(orig), + } + + v := tick.scaledDuration(time.Duration(dur), cStep) + if v < 0 { + t.Fatalf("negative duration: %v", v) + } + }) +} + +func FuzzScaler(f *testing.F) { + interceptFunc := func(ctx context.Context, t int) (string, bool) { + return fmt.Sprintf("%d", t), true + } + + f.Fuzz(func( + t *testing.T, + wait, life int64, + step, cStep int, + mod float64, + max uint, + in int, + ) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tick := DurationScaler{ + Interval: step, + ScalingFactor: mod, + } + + // Initialize Scaler + scaler := Scaler[int, string]{ + Wait: time.Millisecond * time.Duration(wait), + Life: time.Millisecond * time.Duration(life), + Fn: interceptFunc, + WaitModifier: tick, + Max: max, + } + + // Create a simple input channel + input := make(chan int, 1) + defer close(input) + + // Execute the Scaler + out, err := scaler.Exec(ctx, input) + if err != nil { + t.Errorf("Scaler Exec failed: %v", err) + t.Fail() + } + + // Send input value and check output + input <- in + + select { + case <-ctx.Done(): + t.Errorf("Scaler Exec timed out") + t.Fail() + case res := <-out: + if res != fmt.Sprintf("%d", in) { + t.Errorf("Scaler Exec failed: expected %d, got %s", in, res) + t.Fail() + } + + t.Logf("Scaler Exec succeeded: expected %d, got %s", in, res) + } }) } + +func Test_Scaler_Max(t *testing.T) { + tests := map[string]struct { + max uint + send int + expected int + }{ + "max 0": { + max: 0, + send: 1000, + expected: 1000, + }, + "max 1": { + max: 1, + send: 10, + expected: 10, + }, + "max 2": { + max: 2, + send: 10, + expected: 10, + }, + "max 3": { + max: 3, + send: 10, + expected: 10, + }, + "max 4": { + max: 4, + send: 100, + expected: 100, + }, + "max 1000": { + max: 1000, + send: 10000, + expected: 10000, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + inited := 0 + initedMu := sync.Mutex{} + release := make(chan struct{}) + + interceptFunc := func(ctx context.Context, t int) (int, bool) { + defer func() { + initedMu.Lock() + defer initedMu.Unlock() + inited-- + }() + + initedMu.Lock() + inited++ + initedMu.Unlock() + + <-release + + return t, true + } + + // Initialize Scaler + scaler := Scaler[int, int]{ + Wait: time.Millisecond, + Life: time.Millisecond, + Fn: interceptFunc, + Max: test.max, + } + + // Create a simple input channel + input := make(chan int, test.send) + defer close(input) + + for i := 0; i < test.send; i++ { + input <- i + } + + // Execute the Scaler + out, err := scaler.Exec(ctx, input) + if err != nil { + t.Errorf("Scaler Exec failed: %v", err) + t.Fail() + } + + recv := 1 + + tloop: + for { + select { + case <-ctx.Done(): + t.Errorf("Scaler Exec timed out") + case _, ok := <-out: + if !ok { + break tloop + } + + recv++ + t.Logf("received %d", recv) + if recv >= test.expected { + break tloop + } + default: + time.Sleep(time.Millisecond) + + initedMu.Lock() + if test.max > 0 && inited > int(test.max) { + t.Errorf("Scaler Exec failed: expected %d, got %d", test.max, inited) + t.Fail() + } + initedMu.Unlock() + + // Release one goroutine + release <- struct{}{} + } + } + + if recv != test.expected { + t.Errorf("Scaler Exec failed: expected %d, got %d", test.expected, recv) + t.Fail() + } + }) + } +} diff --git a/testdata/fuzz/FuzzTick/db1d459b216861c8 b/testdata/fuzz/FuzzTick/db1d459b216861c8 new file mode 100644 index 0000000..9da868e --- /dev/null +++ b/testdata/fuzz/FuzzTick/db1d459b216861c8 @@ -0,0 +1,6 @@ +go test fuzz v1 +int(-84) +int(0) +float64(-0.625) +int64(0) +int64(-94) diff --git a/testdata/fuzz/FuzzTick/eaae912ff48d75e8 b/testdata/fuzz/FuzzTick/eaae912ff48d75e8 new file mode 100644 index 0000000..c07be3d --- /dev/null +++ b/testdata/fuzz/FuzzTick/eaae912ff48d75e8 @@ -0,0 +1,6 @@ +go test fuzz v1 +int(0) +int(0) +float64(0) +int64(0) +int64(-90) diff --git a/testdata/fuzz/Fuzz_Scaler/555659aba42d18b7 b/testdata/fuzz/Fuzz_Scaler/555659aba42d18b7 new file mode 100644 index 0000000..8bdd246 --- /dev/null +++ b/testdata/fuzz/Fuzz_Scaler/555659aba42d18b7 @@ -0,0 +1,8 @@ +go test fuzz v1 +int64(0) +int64(0) +int(0) +int(29) +float64(0) +uint(36) +int(0) diff --git a/testdata/fuzz/Fuzz_Scaler/62629b316805e69d b/testdata/fuzz/Fuzz_Scaler/62629b316805e69d new file mode 100644 index 0000000..0f2cb3a --- /dev/null +++ b/testdata/fuzz/Fuzz_Scaler/62629b316805e69d @@ -0,0 +1,8 @@ +go test fuzz v1 +int64(-62) +int64(-78) +int(54) +int(103) +float64(-0.16666666666666666) +uint(33) +int(76) diff --git a/testdata/fuzz/Fuzz_Scaler/b9bef74a2c85cbf3 b/testdata/fuzz/Fuzz_Scaler/b9bef74a2c85cbf3 new file mode 100644 index 0000000..af8293a --- /dev/null +++ b/testdata/fuzz/Fuzz_Scaler/b9bef74a2c85cbf3 @@ -0,0 +1,8 @@ +go test fuzz v1 +int64(-158) +int64(-95) +int(54) +int(19) +float64(-0.16666666666666666) +uint(33) +int(76)