Skip to content

Commit c9e353b

Browse files
prometherionoktalz
authored andcommitted
FEATURE/MEDIUM: transaction: rate limiting on max amount of open transactions
1 parent e48bc52 commit c9e353b

File tree

9 files changed

+275
-2
lines changed

9 files changed

+275
-2
lines changed

configuration/configuration.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ type HAProxyConfiguration struct {
5858
UpdateMapFilesPeriod int64 `long:"update-map-files-period" description:"Elapsed time in seconds between two maps syncing operations" default:"10"`
5959
ClusterTLSCertDir string `long:"cluster-tls-dir" description:"Path where cluster tls certificates will be stored. Defaults to same directory as dataplane configuration file"`
6060
MasterWorkerMode bool `long:"master-worker-mode" description:"Flag to enable helpers when running within HAProxy"`
61+
MaxOpenTransactions int64 `long:"max-open-transactions" description:"Limit for active transaction in pending state" default:"20"`
6162
}
6263

6364
type APIConfiguration struct {

configure_data_plane.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ import (
5454
dataplaneapi_config "github.com/haproxytech/dataplaneapi/configuration"
5555
"github.com/haproxytech/dataplaneapi/handlers"
5656
"github.com/haproxytech/dataplaneapi/haproxy"
57+
"github.com/haproxytech/dataplaneapi/rate"
5758

5859
"github.com/go-openapi/errors"
5960
"github.com/go-openapi/runtime"
@@ -102,6 +103,15 @@ func configureFlags(api *operations.DataPlaneAPI) {
102103
api.CommandLineOptionsGroups = append(api.CommandLineOptionsGroups, apiOptionsGroup)
103104
}
104105

106+
func currentOpenTransactions(client *client_native.HAProxyClient) int {
107+
ts, err := client.Configuration.GetTransactions("in_progress")
108+
if err != nil {
109+
log.Errorf("Cannot retrieve current open transactions for rate limit, default to zero (%s)", err.Error())
110+
return 0
111+
}
112+
return len(*ts)
113+
}
114+
105115
func configureAPI(api *operations.DataPlaneAPI) http.Handler {
106116
cfg := dataplaneapi_config.Get()
107117

@@ -261,6 +271,23 @@ func configureAPI(api *operations.DataPlaneAPI) http.Handler {
261271
api.TransactionsGetTransactionHandler = &handlers.GetTransactionHandlerImpl{Client: client}
262272
api.TransactionsGetTransactionsHandler = &handlers.GetTransactionsHandlerImpl{Client: client}
263273
api.TransactionsCommitTransactionHandler = &handlers.CommitTransactionHandlerImpl{Client: client, ReloadAgent: ra}
274+
if cfg.HAProxy.MaxOpenTransactions > 0 {
275+
// creating the threshold limit using the CLI flag as hard quota and current open transactions as starting point
276+
transactionLimiter := rate.NewThresholdLimit(uint64(cfg.HAProxy.MaxOpenTransactions), uint64(currentOpenTransactions(client)))
277+
278+
api.TransactionsStartTransactionHandler = &handlers.RateLimitedStartTransactionHandlerImpl{
279+
TransactionCounter: transactionLimiter,
280+
Handler: api.TransactionsStartTransactionHandler,
281+
}
282+
api.TransactionsDeleteTransactionHandler = &handlers.RateLimitedDeleteTransactionHandlerImpl{
283+
TransactionCounter: transactionLimiter,
284+
Handler: api.TransactionsDeleteTransactionHandler,
285+
}
286+
api.TransactionsCommitTransactionHandler = &handlers.RateLimitedCommitTransactionHandlerImpl{
287+
TransactionCounter: transactionLimiter,
288+
Handler: api.TransactionsCommitTransactionHandler,
289+
}
290+
}
264291

265292
// setup sites handlers
266293
api.SitesCreateSiteHandler = &handlers.CreateSiteHandlerImpl{Client: client, ReloadAgent: ra}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ require (
2727
github.com/rs/cors v1.7.0
2828
github.com/shirou/gopsutil v2.20.3+incompatible
2929
github.com/sirupsen/logrus v1.5.0
30+
github.com/stretchr/testify v1.6.1
3031
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0
3132
golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f
3233
gopkg.in/yaml.v2 v2.3.0

handlers/transaction.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,26 @@ import (
2121
"github.com/haproxytech/dataplaneapi/haproxy"
2222
"github.com/haproxytech/dataplaneapi/misc"
2323
"github.com/haproxytech/dataplaneapi/operations/transactions"
24+
"github.com/haproxytech/dataplaneapi/rate"
2425
)
2526

27+
// RateLimitedStartTransactionHandlerImpl decorates StartTransactionHandlerImpl with the rate limiting logic
28+
type RateLimitedStartTransactionHandlerImpl struct {
29+
TransactionCounter rate.Threshold
30+
Handler transactions.StartTransactionHandler
31+
}
32+
2633
//StartTransactionHandlerImpl implementation of the StartTransactionHandler interface using client-native client
2734
type StartTransactionHandlerImpl struct {
2835
Client *client_native.HAProxyClient
2936
}
3037

38+
// RateLimitedDeleteTransactionHandlerImpl decorates the DeleteTransactionHandlerImpl with the rate limiting logic
39+
type RateLimitedDeleteTransactionHandlerImpl struct {
40+
TransactionCounter rate.Threshold
41+
Handler transactions.DeleteTransactionHandler
42+
}
43+
3144
//DeleteTransactionHandlerImpl implementation of the DeleteTransactionHandler interface using client-native client
3245
type DeleteTransactionHandlerImpl struct {
3346
Client *client_native.HAProxyClient
@@ -43,6 +56,12 @@ type GetTransactionsHandlerImpl struct {
4356
Client *client_native.HAProxyClient
4457
}
4558

59+
// RateLimitedCommitTransactionHandlerImpl decorates the CommitTransactionHandlerImpl with the rate limiting logic
60+
type RateLimitedCommitTransactionHandlerImpl struct {
61+
TransactionCounter rate.Threshold
62+
Handler transactions.CommitTransactionHandler
63+
}
64+
4665
//CommitTransactionHandlerImpl implementation of the CommitTransactionHandlerImpl interface using client-native client
4766
type CommitTransactionHandlerImpl struct {
4867
Client *client_native.HAProxyClient
@@ -111,3 +130,36 @@ func (th *CommitTransactionHandlerImpl) Handle(params transactions.CommitTransac
111130
rID := th.ReloadAgent.Reload()
112131
return transactions.NewCommitTransactionAccepted().WithReloadID(rID).WithPayload(t)
113132
}
133+
134+
// Handle executes the decorated Handler and, in case of successful deletion, decrease the counter
135+
func (r RateLimitedDeleteTransactionHandlerImpl) Handle(params transactions.DeleteTransactionParams, principal interface{}) middleware.Responder {
136+
res := r.Handler.Handle(params, principal)
137+
if _, ok := res.(*transactions.DeleteTransactionNoContent); ok {
138+
r.TransactionCounter.Decrease()
139+
}
140+
return res
141+
}
142+
143+
// Handle executes the decorated Handler and, in case of successful creation, increase the counter if this is
144+
func (r RateLimitedStartTransactionHandlerImpl) Handle(params transactions.StartTransactionParams, principal interface{}) middleware.Responder {
145+
if err := r.TransactionCounter.LimitReached(); err != nil {
146+
e := misc.HandleError(err)
147+
return transactions.NewStartTransactionDefault(int(*e.Code)).WithPayload(e)
148+
}
149+
res := r.Handler.Handle(params, principal)
150+
if _, ok := res.(*transactions.StartTransactionCreated); ok {
151+
r.TransactionCounter.Increase()
152+
}
153+
return res
154+
}
155+
156+
func (r RateLimitedCommitTransactionHandlerImpl) Handle(params transactions.CommitTransactionParams, principal interface{}) middleware.Responder {
157+
res := r.Handler.Handle(params, principal)
158+
switch res.(type) {
159+
case *transactions.CommitTransactionOK:
160+
r.TransactionCounter.Decrease()
161+
case *transactions.CommitTransactionAccepted:
162+
r.TransactionCounter.Decrease()
163+
}
164+
return res
165+
}

misc/misc.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@ import (
2222
"strconv"
2323
"strings"
2424

25-
"github.com/haproxytech/dataplaneapi/haproxy"
26-
2725
"github.com/haproxytech/client-native/v2/configuration"
2826
client_errors "github.com/haproxytech/client-native/v2/errors"
2927
"github.com/haproxytech/models/v2"
28+
29+
"github.com/haproxytech/dataplaneapi/haproxy"
30+
"github.com/haproxytech/dataplaneapi/rate"
3031
)
3132

3233
const (
@@ -38,6 +39,8 @@ const (
3839
ErrHTTPInternalServerError = int64(500)
3940
// ErrHTTPBadRequest HTTP status code 400
4041
ErrHTTPBadRequest = int64(400)
42+
// ErrHTTPRateLimit HTTP status code 429
43+
ErrHTTPRateLimit = int64(429)
4144
)
4245

4346
// HandleError translates error codes from client native into models.Error with appropriate http status code
@@ -61,6 +64,10 @@ func HandleError(err error) *models.Error {
6164
httpCode := ErrHTTPBadRequest
6265
msg := t.Error()
6366
return &models.Error{Code: &httpCode, Message: &msg}
67+
case *rate.TransactionLimitReachedErr:
68+
httpCode := ErrHTTPRateLimit
69+
msg := t.Error()
70+
return &models.Error{Code: &httpCode, Message: &msg}
6471
default:
6572
msg := t.Error()
6673
code := ErrHTTPInternalServerError

rate/error.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright 2019 HAProxy Technologies
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package rate
16+
17+
import "fmt"
18+
19+
func NewTransactionLimitReachedError(limit uint64) error {
20+
return &TransactionLimitReachedErr{limit: limit}
21+
}
22+
23+
type TransactionLimitReachedErr struct {
24+
limit uint64
25+
}
26+
27+
func (l TransactionLimitReachedErr) Error() string {
28+
return fmt.Sprintf("cannot start a new transaction, reached the maximum amount of %d active transactions available", l.limit)
29+
}

rate/limiter.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright 2019 HAProxy Technologies
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package rate
16+
17+
type Threshold interface {
18+
LimitReached() error
19+
Increase()
20+
Decrease()
21+
}

rate/threshold_limit.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright 2019 HAProxy Technologies
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package rate
16+
17+
import (
18+
"sync/atomic"
19+
)
20+
21+
type thresholdLimit struct {
22+
limit *uint64
23+
actual *uint64
24+
}
25+
26+
func (t *thresholdLimit) LimitReached() (err error) {
27+
if *t.actual >= *t.limit {
28+
err = NewTransactionLimitReachedError(*t.limit)
29+
}
30+
return
31+
}
32+
33+
func NewThresholdLimit(limit uint64, startingFrom uint64) Threshold {
34+
return &thresholdLimit{
35+
actual: &startingFrom,
36+
limit: &limit,
37+
}
38+
}
39+
40+
func (t *thresholdLimit) Increase() {
41+
atomic.AddUint64(t.actual, 1)
42+
}
43+
44+
func (t thresholdLimit) Decrease() {
45+
atomic.AddUint64(t.actual, ^uint64(0))
46+
}

rate/threshold_limit_test.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// Copyright 2019 HAProxy Technologies
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package rate
16+
17+
import (
18+
"fmt"
19+
"testing"
20+
21+
"github.com/stretchr/testify/assert"
22+
)
23+
24+
func TestNewThresholdLimit(t *testing.T) {
25+
type tc struct {
26+
limit uint64
27+
actual uint64
28+
}
29+
for name, tc := range map[string]tc{
30+
"from zero": {10, 0},
31+
"reached": {5, 5},
32+
} {
33+
t.Run(name, func(t *testing.T) {
34+
l := NewThresholdLimit(tc.limit, tc.actual).(*thresholdLimit)
35+
assert.Equal(t, *l.limit, tc.limit)
36+
assert.Equal(t, *l.actual, tc.actual)
37+
})
38+
}
39+
}
40+
41+
func Test_thresholdLimit_Decrease(t *testing.T) {
42+
for _, tc := range []uint64{5, 10} {
43+
t.Run(fmt.Sprintf("%d", tc), func(t *testing.T) {
44+
l := thresholdLimit{
45+
limit: func(v uint64) *uint64 {
46+
return &v
47+
}(tc),
48+
actual: func(v uint64) *uint64 {
49+
return &v
50+
}(tc),
51+
}
52+
var counter uint64
53+
for *l.actual > 0 {
54+
l.Decrease()
55+
counter++
56+
}
57+
assert.Equal(t, tc, counter)
58+
})
59+
}
60+
}
61+
62+
func Test_thresholdLimit_Increase(t *testing.T) {
63+
t.Run("success", func(t *testing.T) {
64+
l := thresholdLimit{
65+
limit: func(v uint64) *uint64 {
66+
return &v
67+
}(10),
68+
actual: func(v uint64) *uint64 {
69+
return &v
70+
}(10),
71+
}
72+
var counter int
73+
for *l.actual < *l.limit {
74+
l.Increase()
75+
counter++
76+
}
77+
})
78+
t.Run("failure", func(t *testing.T) {
79+
l := thresholdLimit{
80+
limit: func(v uint64) *uint64 {
81+
return &v
82+
}(10),
83+
actual: func(v uint64) *uint64 {
84+
return &v
85+
}(10),
86+
}
87+
assert.NotNil(t, l.LimitReached())
88+
})
89+
}

0 commit comments

Comments
 (0)