Skip to content

Commit 0994c23

Browse files
committed
Add balancer aggregator
1 parent 82799c7 commit 0994c23

File tree

4 files changed

+207
-163
lines changed

4 files changed

+207
-163
lines changed

examples/features/customloadbalancer/client/customroundrobin/customroundrobin.go

Lines changed: 39 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,10 @@ import (
2323
"fmt"
2424
"sync/atomic"
2525

26-
"google.golang.org/grpc"
2726
"google.golang.org/grpc/balancer"
2827
"google.golang.org/grpc/connectivity"
2928
"google.golang.org/grpc/grpclog"
30-
"google.golang.org/grpc/resolver"
29+
"google.golang.org/grpc/internal/balancer/balanceraggregator"
3130
"google.golang.org/grpc/serviceconfig"
3231
)
3332

@@ -40,6 +39,11 @@ const customRRName = "custom_round_robin"
4039
type customRRConfig struct {
4140
serviceconfig.LoadBalancingConfig `json:"-"`
4241

42+
// ChildPolicy is the child policy of this balancer. This will be hardcoded
43+
// to a graceful switch config which wraps a pick first with no shuffling
44+
// enabled.
45+
ChildPolicy serviceconfig.LoadBalancingConfig `json:"childPolicy"`
46+
4347
// ChooseSecond represents how often pick iterations choose the second
4448
// SubConn in the list. Defaults to 3. If 0 never choose the second SubConn.
4549
ChooseSecond uint32 `json:"chooseSecond,omitempty"`
@@ -48,9 +52,17 @@ type customRRConfig struct {
4852
type customRoundRobinBuilder struct{}
4953

5054
func (customRoundRobinBuilder) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
55+
// Hardcode a pick first with no shuffling, since this is a petiole, and
56+
// that is what petiole policies will interact with.
57+
gspf, err := balanceraggregator.ParseConfig(json.RawMessage(balanceraggregator.PickFirstConfig))
58+
if err != nil {
59+
return nil, fmt.Errorf("error parsing hardcoded pick_first config: %v", err)
60+
}
5161
lbConfig := &customRRConfig{
5262
ChooseSecond: 3,
63+
ChildPolicy: gspf,
5364
}
65+
5466
if err := json.Unmarshal(s, lbConfig); err != nil {
5567
return nil, fmt.Errorf("custom-round-robin: unable to unmarshal customRRConfig: %v", err)
5668
}
@@ -62,16 +74,12 @@ func (customRoundRobinBuilder) Name() string {
6274
}
6375

6476
func (customRoundRobinBuilder) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
65-
pfBuilder := balancer.Get(grpc.PickFirstBalancerName)
66-
if pfBuilder == nil {
67-
return nil
68-
}
69-
return &customRoundRobin{
70-
cc: cc,
71-
bOpts: bOpts,
72-
pfs: resolver.NewEndpointMap(),
73-
pickFirstBuilder: pfBuilder,
77+
crr := &customRoundRobin{
78+
ClientConn: cc,
79+
bOpts: bOpts,
7480
}
81+
crr.balancerAggregator = balanceraggregator.Build(crr, bOpts)
82+
return crr
7583
}
7684

7785
var logger = grpclog.Component("example")
@@ -83,14 +91,12 @@ type customRoundRobin struct {
8391
// balancer.Balancer calls as well, and children are called one at a time),
8492
// in which calls are guaranteed to come synchronously. Thus, no extra
8593
// synchronization is required in this balancer.
86-
cc balancer.ClientConn
94+
balancer.ClientConn
8795
bOpts balancer.BuildOptions
88-
// Note that this balancer is a petiole policy which wraps pick first (see
89-
// gRFC A61). This is the intended way a user written custom lb should be
90-
// specified, as pick first will contain a lot of useful functionality, such
91-
// as Sticky Transient Failure, Happy Eyeballs, and Health Checking.
92-
pickFirstBuilder balancer.Builder
93-
pfs *resolver.EndpointMap
96+
97+
// hardcoded child config, graceful switch that wraps a pick_first with no
98+
// shuffling enabled.
99+
balancerAggregator balancer.Balancer
94100

95101
cfg *customRRConfig
96102

@@ -105,59 +111,14 @@ func (crr *customRoundRobin) UpdateClientConnState(state balancer.ClientConnStat
105111
return balancer.ErrBadResolverState
106112
}
107113
crr.cfg = crrCfg
108-
109-
endpointSet := resolver.NewEndpointMap()
110-
crr.inhibitPickerUpdates = true
111-
for _, endpoint := range state.ResolverState.Endpoints {
112-
endpointSet.Set(endpoint, nil)
113-
var pickFirst *balancerWrapper
114-
if pf, ok := crr.pfs.Get(endpoint); ok {
115-
pickFirst = pf.(*balancerWrapper)
116-
} else {
117-
pickFirst = &balancerWrapper{
118-
ClientConn: crr.cc,
119-
crr: crr,
120-
}
121-
pfb := crr.pickFirstBuilder.Build(pickFirst, crr.bOpts)
122-
pickFirst.Balancer = pfb
123-
crr.pfs.Set(endpoint, pickFirst)
124-
}
125-
// Update child uncondtionally, in case attributes or address ordering
126-
// changed. Let pick first deal with any potential diffs, too
127-
// complicated to only update if we know something changed.
128-
pickFirst.UpdateClientConnState(balancer.ClientConnState{
129-
ResolverState: resolver.State{
130-
Endpoints: []resolver.Endpoint{endpoint},
131-
Attributes: state.ResolverState.Attributes,
132-
},
133-
// no service config, never needed to turn on address list shuffling
134-
// bool in petiole policies.
135-
})
136-
// Ignore error because just care about ready children.
137-
}
138-
for _, e := range crr.pfs.Keys() {
139-
ep, _ := crr.pfs.Get(e)
140-
pickFirst := ep.(balancer.Balancer)
141-
// pick first was removed by resolver (unique endpoint logically
142-
// corresponding to pick first child was removed).
143-
if _, ok := endpointSet.Get(e); !ok {
144-
pickFirst.Close()
145-
crr.pfs.Delete(e)
146-
}
147-
}
148-
crr.inhibitPickerUpdates = false
149-
crr.regeneratePicker() // one synchronous picker update per UpdateClientConnState operation.
150-
return nil
114+
return crr.balancerAggregator.UpdateClientConnState(balancer.ClientConnState{
115+
BalancerConfig: crrCfg.ChildPolicy,
116+
ResolverState: state.ResolverState,
117+
})
151118
}
152119

153120
func (crr *customRoundRobin) ResolverError(err error) {
154-
crr.inhibitPickerUpdates = true
155-
for _, pf := range crr.pfs.Values() {
156-
pickFirst := pf.(*balancerWrapper)
157-
pickFirst.ResolverError(err)
158-
}
159-
crr.inhibitPickerUpdates = false
160-
crr.regeneratePicker()
121+
crr.balancerAggregator.ResolverError(err)
161122
}
162123

163124
// This function is deprecated. SubConn state updates now come through listener
@@ -168,32 +129,24 @@ func (crr *customRoundRobin) UpdateSubConnState(sc balancer.SubConn, state balan
168129
}
169130

170131
func (crr *customRoundRobin) Close() {
171-
for _, pf := range crr.pfs.Values() {
172-
pickFirst := pf.(balancer.Balancer)
173-
pickFirst.Close()
174-
}
132+
crr.balancerAggregator.Close()
175133
}
176134

177-
// regeneratePicker generates a picker based off persisted child balancer state
178-
// and forwards it upward. This is intended to be fully executed once per
179-
// relevant balancer.Balancer operation into custom round robin balancer.
180-
func (crr *customRoundRobin) regeneratePicker() {
181-
if crr.inhibitPickerUpdates {
182-
return
183-
}
184-
135+
// regeneratePicker generates a picker if both child balancers are READY and
136+
// forwards it upward.
137+
func (crr *customRoundRobin) UpdateChildState(childStates []balanceraggregator.ChildState) {
185138
var readyPickers []balancer.Picker
186-
for _, bw := range crr.pfs.Values() {
187-
pickFirst := bw.(*balancerWrapper)
188-
if pickFirst.state.ConnectivityState == connectivity.Ready {
189-
readyPickers = append(readyPickers, pickFirst.state.Picker)
139+
for _, childState := range childStates {
140+
if childState.State.ConnectivityState == connectivity.Ready {
141+
readyPickers = append(readyPickers, childState.State.Picker)
190142
}
191143
}
192144

193145
// For determinism, this balancer only updates it's picker when both
194146
// backends of the example are ready. Thus, no need to keep track of
195147
// aggregated state and can simply specify this balancer is READY once it
196-
// has two ready children.
148+
// has two ready children. Other balancers can keep track of aggregated
149+
// state and interact with errors as part of picker.
197150
if len(readyPickers) != 2 {
198151
return
199152
}
@@ -202,35 +155,12 @@ func (crr *customRoundRobin) regeneratePicker() {
202155
chooseSecond: crr.cfg.ChooseSecond,
203156
next: 0,
204157
}
205-
crr.cc.UpdateState(balancer.State{
158+
crr.ClientConn.UpdateState(balancer.State{
206159
ConnectivityState: connectivity.Ready,
207160
Picker: picker,
208161
})
209162
}
210163

211-
type balancerWrapper struct {
212-
balancer.Balancer // Simply forward balancer.Balancer operations
213-
balancer.ClientConn // embed to intercept UpdateState, doesn't deal with SubConns
214-
215-
crr *customRoundRobin
216-
217-
state balancer.State
218-
}
219-
220-
// Picker updates from pick first are all triggered by synchronous calls down
221-
// into balancer.Balancer (client conn state updates, resolver errors, subconn
222-
// state updates (through listener callbacks, which is still treated as part of
223-
// balancer API)).
224-
func (bw *balancerWrapper) UpdateState(state balancer.State) {
225-
bw.state = state
226-
// Calls back into this inline will be inhibited when part of
227-
// UpdateClientConnState() and ResolverError(), and regenerate picker will
228-
// be called manually at the end of those operations. However, for
229-
// UpdateSubConnState() and subsequent UpdateState(), this needs to update
230-
// picker, so call this regeneratePicker() here.
231-
bw.crr.regeneratePicker()
232-
}
233-
234164
type customRoundRobinPicker struct {
235165
pickers []balancer.Picker
236166
chooseSecond uint32
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/*
2+
*
3+
* Copyright 2024 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
// Package balanceraggregator implements a BalancerAggregator helper.
20+
package balanceraggregator
21+
22+
import (
23+
"encoding/json"
24+
"fmt"
25+
26+
"google.golang.org/grpc/balancer"
27+
"google.golang.org/grpc/internal/balancer/gracefulswitch"
28+
"google.golang.org/grpc/resolver"
29+
"google.golang.org/grpc/serviceconfig"
30+
)
31+
32+
// ChildState is the balancer state of the child along with the
33+
// endpoint which ID's the child balancer.
34+
type ChildState struct {
35+
Endpoint resolver.Endpoint
36+
State balancer.State
37+
}
38+
39+
// Parent is a balancer.ClientConn that can also receive
40+
// child state updates.
41+
type Parent interface {
42+
balancer.ClientConn
43+
UpdateChildState(childStates []ChildState)
44+
}
45+
46+
// Build returns a new BalancerAggregator.
47+
func Build(parent Parent, opts balancer.BuildOptions) *BalancerAggregator {
48+
return &BalancerAggregator{
49+
parent: parent,
50+
bOpts: opts,
51+
children: resolver.NewEndpointMap(),
52+
}
53+
}
54+
55+
// BalancerAggregator is a balancer that wraps child balancers. It creates a
56+
// child balancer with child config for every Endpoint received. It updates the
57+
// child states on any update from parent or child.
58+
type BalancerAggregator struct {
59+
parent Parent
60+
bOpts balancer.BuildOptions
61+
62+
children *resolver.EndpointMap
63+
64+
inhibitChildUpdates bool
65+
}
66+
67+
func (ba *BalancerAggregator) UpdateClientConnState(state balancer.ClientConnState) error {
68+
endpointSet := resolver.NewEndpointMap()
69+
ba.inhibitChildUpdates = true
70+
// Update/Create new children.
71+
for _, endpoint := range state.ResolverState.Endpoints {
72+
endpointSet.Set(endpoint, nil)
73+
var bal *balancerWrapper
74+
if child, ok := ba.children.Get(endpoint); ok {
75+
bal = child.(*balancerWrapper)
76+
} else {
77+
bal = &balancerWrapper{
78+
endpoint: endpoint,
79+
ClientConn: ba.parent,
80+
ba: ba,
81+
}
82+
bal.Balancer = gracefulswitch.NewBalancer(bal, ba.bOpts)
83+
ba.children.Set(endpoint, bal)
84+
}
85+
if err := bal.UpdateClientConnState(balancer.ClientConnState{
86+
BalancerConfig: state.BalancerConfig,
87+
ResolverState: resolver.State{
88+
Endpoints: []resolver.Endpoint{endpoint},
89+
Attributes: state.ResolverState.Attributes,
90+
},
91+
}); err != nil {
92+
return fmt.Errorf("error updating child balancer: %v", err)
93+
}
94+
}
95+
// Delete old children that are no longer present.
96+
for _, e := range ba.children.Keys() {
97+
child, _ := ba.children.Get(e)
98+
bal := child.(balancer.Balancer)
99+
if _, ok := endpointSet.Get(e); !ok {
100+
bal.Close()
101+
ba.children.Delete(e)
102+
}
103+
}
104+
ba.inhibitChildUpdates = false
105+
ba.updateChildStates()
106+
return nil
107+
}
108+
109+
func (ba *BalancerAggregator) ResolverError(err error) {
110+
ba.inhibitChildUpdates = true
111+
for _, child := range ba.children.Values() {
112+
bal := child.(balancer.Balancer)
113+
bal.ResolverError(err)
114+
}
115+
ba.inhibitChildUpdates = false
116+
ba.updateChildStates()
117+
}
118+
119+
func (ba *BalancerAggregator) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
120+
// UpdateSubConnState is deprecated.
121+
}
122+
123+
func (ba *BalancerAggregator) Close() {
124+
for _, child := range ba.children.Values() {
125+
bal := child.(balancer.Balancer)
126+
bal.Close()
127+
}
128+
}
129+
130+
func (ba *BalancerAggregator) updateChildStates() {
131+
if ba.inhibitChildUpdates {
132+
return
133+
}
134+
135+
childUpdates := make([]ChildState, ba.children.Len())
136+
for _, child := range ba.children.Values() {
137+
bw := child.(*balancerWrapper)
138+
childUpdates = append(childUpdates, ChildState{
139+
Endpoint: bw.endpoint,
140+
State: bw.state,
141+
})
142+
}
143+
ba.parent.UpdateChildState(childUpdates)
144+
}
145+
146+
// balancerWrapper is a wrapper of a balancer. It ID's a child balancer by
147+
// endpoint, and persists recent child balancer state.
148+
type balancerWrapper struct {
149+
balancer.Balancer // Simply forward balancer.Balancer operations.
150+
balancer.ClientConn // embed to intercept UpdateState, doesn't deal with SubConns
151+
152+
ba *BalancerAggregator
153+
154+
endpoint resolver.Endpoint
155+
state balancer.State
156+
}
157+
158+
func (bw *balancerWrapper) UpdateState(state balancer.State) {
159+
bw.state = state
160+
bw.ba.updateChildStates()
161+
}
162+
163+
func ParseConfig(cfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
164+
return gracefulswitch.ParseConfig(cfg)
165+
}
166+
167+
// PickFirstConfig is a pick first config without shuffling enabled.
168+
const PickFirstConfig = "[{\"pick_first\": {}}]"

0 commit comments

Comments
 (0)