Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,13 +374,14 @@ func (b *pickfirstBalancer) closeSubConnsLocked() {

// deDupAddresses ensures that each address appears only once in the slice.
func deDupAddresses(addrs []resolver.Address) []resolver.Address {
seenAddrs := resolver.NewAddressMapV2[*scData]()
seenAddrs := resolver.NewAddressMapV2[bool]()
retAddrs := []resolver.Address{}

for _, addr := range addrs {
if _, ok := seenAddrs.Get(addr); ok {
continue
}
seenAddrs.Set(addr, true)
retAddrs = append(retAddrs, addr)
}
return retAddrs
Expand Down
1 change: 1 addition & 0 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1152,6 +1152,7 @@ func (s) TestPickFirstLeaf_InterleavingIPv6Preffered(t *testing.T) {
ResolverState: resolver.State{
Endpoints: []resolver.Endpoint{
{Addresses: []resolver.Address{{Addr: "[0001:0001:0001:0001:0001:0001:0001:0001]:8080"}}},
{Addresses: []resolver.Address{{Addr: "[0001:0001:0001:0001:0001:0001:0001:0001]:8080"}}}, // duplicate, should be ignored.
{Addresses: []resolver.Address{{Addr: "1.1.1.1:1111"}}},
{Addresses: []resolver.Address{{Addr: "2.2.2.2:2"}}},
{Addresses: []resolver.Address{{Addr: "3.3.3.3:3"}}},
Expand Down
46 changes: 46 additions & 0 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,9 @@ func (s) TestPickFirstLeaf_TFPickerUpdate(t *testing.T) {
ResolverState: resolver.State{
Endpoints: []resolver.Endpoint{
{Addresses: []resolver.Address{{Addr: "1.1.1.1:1"}}},
{Addresses: []resolver.Address{{Addr: "1.1.1.1:1"}}}, // duplicate, should be ignored.
{Addresses: []resolver.Address{{Addr: "2.2.2.2:2"}}},
{Addresses: []resolver.Address{{Addr: "1.1.1.1:1"}}}, // duplicate, should be ignored.
},
},
}
Expand All @@ -213,14 +215,35 @@ func (s) TestPickFirstLeaf_TFPickerUpdate(t *testing.T) {
// once.
tfErr := fmt.Errorf("test err: connection refused")
sc1 := <-cc.NewSubConnCh
select {
case <-sc1.ConnectCh:
case <-ctx.Done():
t.Fatal("Context timed out waiting for Connect() to be called on sc1.")
}
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure, ConnectionError: tfErr})

// Move the subconn back to IDLE, it should not be re-connected until the
// first pass is complete.
shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer shortCancel()
sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
select {
case <-sc1.ConnectCh:
t.Fatal("Connect() unexpectedly called on sc1.")
case <-shortCtx.Done():
}

if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
t.Fatalf("cc.WaitForPickerWithErr(%v) returned error: %v", balancer.ErrNoSubConnAvailable, err)
}

sc2 := <-cc.NewSubConnCh
select {
case <-sc2.ConnectCh:
case <-ctx.Done():
t.Fatal("Context timed out waiting for Connect() to be called on sc2.")
}
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure, ConnectionError: tfErr})

Expand All @@ -230,6 +253,29 @@ func (s) TestPickFirstLeaf_TFPickerUpdate(t *testing.T) {

// Subsequent TRANSIENT_FAILUREs should be reported only after seeing "# of SubConns"
// TRANSIENT_FAILUREs.

// Both the subconns should be connected in parallel.
select {
case <-sc1.ConnectCh:
case <-ctx.Done():
t.Fatal("Context timed out waiting for Connect() to be called on sc1.")
}

shortCtx, shortCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
defer shortCancel()
select {
case <-sc2.ConnectCh:
t.Fatal("Connect() called on sc2 before it completed backing-off.")
case <-shortCtx.Done():
}

sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
select {
case <-sc2.ConnectCh:
case <-ctx.Done():
t.Fatal("Context timed out waiting for Connect() to be called on sc2.")
}

newTfErr := fmt.Errorf("test err: unreachable")
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure, ConnectionError: newTfErr})
select {
Expand Down