Skip to content

Commit 451ff12

Browse files
committed
server: Add nonblockSendChannel utility for non-blocking channel sends
Extract repeated non-blocking channel send pattern into a reusable generic utility function. Replace 5 instances of inline select/case/ default with nonblockSendChannel() calls. Signed-off-by: FUJITA Tomonori <[email protected]>
1 parent 0bfe6cc commit 451ff12

File tree

2 files changed

+20
-32
lines changed

2 files changed

+20
-32
lines changed

pkg/server/fsm.go

Lines changed: 9 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -551,10 +551,9 @@ func (h *fsmHandler) connectLoop(ctx context.Context, wg *sync.WaitGroup) {
551551
}
552552

553553
if err == nil {
554-
select {
555-
case fsm.connCh <- conn:
554+
if nonblockSendChannel(fsm.connCh, conn) {
556555
return
557-
default:
556+
} else {
558557
conn.Close()
559558
fsm.logger.Warn("active conn is closed to avoid being blocked")
560559
}
@@ -877,14 +876,6 @@ func (h *fsmHandler) handlingError(m *bgp.BGPMessage, e error, useRevisedError b
877876
}
878877

879878
func (h *fsmHandler) recvMessageWithError(conn net.Conn, stateReasonCh chan<- fsmStateReason) (*fsmMsg, error) {
880-
sendToStateReasonCh := func(typ fsmStateReasonType, notif *bgp.BGPMessage) {
881-
// probably doesn't happen but be cautious
882-
select {
883-
case stateReasonCh <- *newfsmStateReason(typ, notif, nil):
884-
default:
885-
}
886-
}
887-
888879
headerBuf, err := readAll(conn, bgp.BGP_HEADER_LENGTH)
889880
if errors.Is(err, os.ErrDeadlineExceeded) {
890881
// we set a read deadline when we cancel the FSM handler context,
@@ -893,7 +884,7 @@ func (h *fsmHandler) recvMessageWithError(conn net.Conn, stateReasonCh chan<- fs
893884
// shutting down.
894885
return nil, nil
895886
} else if err != nil {
896-
sendToStateReasonCh(fsmReadFailed, nil)
887+
nonblockSendChannel(stateReasonCh, *newfsmStateReason(fsmReadFailed, nil, nil))
897888
return nil, err
898889
}
899890

@@ -920,7 +911,7 @@ func (h *fsmHandler) recvMessageWithError(conn net.Conn, stateReasonCh chan<- fs
920911
if errors.Is(err, os.ErrDeadlineExceeded) {
921912
return nil, nil
922913
} else if err != nil {
923-
sendToStateReasonCh(fsmReadFailed, nil)
914+
nonblockSendChannel(stateReasonCh, *newfsmStateReason(fsmReadFailed, nil, nil))
924915
return nil, err
925916
}
926917

@@ -1360,14 +1351,6 @@ func (h *fsmHandler) openconfirm(ctx context.Context) (bgp.FSMState, *fsmStateRe
13601351
}
13611352

13621353
func (h *fsmHandler) sendMessageloop(ctx context.Context, conn net.Conn, stateReasonCh chan<- fsmStateReason, wg *sync.WaitGroup) error {
1363-
sendToStateReasonCh := func(typ fsmStateReasonType, notif *bgp.BGPMessage) {
1364-
// probably doesn't happen but be cautious
1365-
select {
1366-
case stateReasonCh <- *newfsmStateReason(typ, notif, nil):
1367-
default:
1368-
}
1369-
}
1370-
13711354
defer wg.Done()
13721355
fsm := h.fsm
13731356
ticker := keepaliveTicker(fsm)
@@ -1394,7 +1377,7 @@ func (h *fsmHandler) sendMessageloop(ctx context.Context, conn net.Conn, stateRe
13941377
slog.String("State", fsm.state.String()),
13951378
slog.Any("Data", err))
13961379

1397-
sendToStateReasonCh(fsmWriteFailed, nil)
1380+
nonblockSendChannel(stateReasonCh, *newfsmStateReason(fsmWriteFailed, nil, nil))
13981381
conn.Close()
13991382
return fmt.Errorf("closed")
14001383
}
@@ -1463,10 +1446,7 @@ func (h *fsmHandler) recvMessageloop(ctx context.Context, conn net.Conn, holdtim
14631446
// if the length of holdtimerResetCh
14641447
// isn't zero, the timer will be reset
14651448
// soon anyway.
1466-
select {
1467-
case holdtimerResetCh <- struct{}{}:
1468-
default:
1469-
}
1449+
nonblockSendChannel(holdtimerResetCh, struct{}{})
14701450
body := m.Body.(*bgp.BGPUpdate)
14711451

14721452
rfMap := h.fsm.familyMap.Load().(map[bgp.Family]bgp.BGPAddPathMode)
@@ -1511,10 +1491,7 @@ func (h *fsmHandler) recvMessageloop(ctx context.Context, conn net.Conn, holdtim
15111491
// if the length of holdtimerResetCh
15121492
// isn't zero, the timer will be reset
15131493
// soon anyway.
1514-
select {
1515-
case holdtimerResetCh <- struct{}{}:
1516-
default:
1517-
}
1494+
nonblockSendChannel(holdtimerResetCh, struct{}{})
15181495
if m.Header.Type == bgp.BGP_MSG_KEEPALIVE {
15191496
doCallback = false
15201497
}
@@ -1541,9 +1518,9 @@ func (h *fsmHandler) recvMessageloop(ctx context.Context, conn net.Conn, holdtim
15411518
hardReset := s.Enabled && s.NotificationEnabled && body.ErrorCode == bgp.BGP_ERROR_CEASE && body.ErrorSubcode == bgp.BGP_ERROR_SUB_HARD_RESET
15421519
h.fsm.lock.Unlock()
15431520
if hardReset {
1544-
stateReasonCh <- *newfsmStateReason(fsmHardReset, m, nil)
1521+
nonblockSendChannel(stateReasonCh, *newfsmStateReason(fsmHardReset, m, nil))
15451522
} else {
1546-
stateReasonCh <- *newfsmStateReason(fsmNotificationRecv, m, nil)
1523+
nonblockSendChannel(stateReasonCh, *newfsmStateReason(fsmNotificationRecv, m, nil))
15471524
}
15481525
}
15491526

pkg/server/util.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,17 @@ import (
2121
"github.com/osrg/gobgp/v4/pkg/packet/bgp"
2222
)
2323

24+
func nonblockSendChannel[T any](ch chan<- T, item T) bool {
25+
select {
26+
case ch <- item:
27+
// sent
28+
return true
29+
default:
30+
// drop the item
31+
return false
32+
}
33+
}
34+
2435
func drainChannel[T any](ch <-chan T) {
2536
for {
2637
select {

0 commit comments

Comments
 (0)