Skip to content

Commit 2ce6f13

Browse files
committed
server: return io error in streaming List RPCs
Return an error of stream.Send() if it happens. Signed-off-by: FUJITA Tomonori <[email protected]>
1 parent 78d53ad commit 2ce6f13

File tree

1 file changed

+85
-22
lines changed

1 file changed

+85
-22
lines changed

pkg/server/grpc_server.go

Lines changed: 85 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -116,25 +116,35 @@ func (s *server) serve() error {
116116
func (s *server) ListDynamicNeighbor(r *api.ListDynamicNeighborRequest, stream api.GoBgpService_ListDynamicNeighborServer) error {
117117
ctx, cancel := context.WithCancel(stream.Context())
118118
defer cancel()
119+
var sendErr error
119120
fn := func(dn *api.DynamicNeighbor) {
120-
if err := stream.Send(&api.ListDynamicNeighborResponse{DynamicNeighbor: dn}); err != nil {
121+
if sendErr = stream.Send(&api.ListDynamicNeighborResponse{DynamicNeighbor: dn}); sendErr != nil {
121122
cancel()
122123
return
123124
}
124125
}
125-
return s.bgpServer.ListDynamicNeighbor(ctx, r, fn)
126+
err := s.bgpServer.ListDynamicNeighbor(ctx, r, fn)
127+
if sendErr != nil {
128+
return sendErr
129+
}
130+
return err
126131
}
127132

128133
func (s *server) ListPeerGroup(r *api.ListPeerGroupRequest, stream api.GoBgpService_ListPeerGroupServer) error {
129134
ctx, cancel := context.WithCancel(stream.Context())
130135
defer cancel()
136+
var sendErr error
131137
fn := func(pg *api.PeerGroup) {
132-
if err := stream.Send(&api.ListPeerGroupResponse{PeerGroup: pg}); err != nil {
138+
if sendErr = stream.Send(&api.ListPeerGroupResponse{PeerGroup: pg}); sendErr != nil {
133139
cancel()
134140
return
135141
}
136142
}
137-
return s.bgpServer.ListPeerGroup(ctx, r, fn)
143+
err := s.bgpServer.ListPeerGroup(ctx, r, fn)
144+
if sendErr != nil {
145+
return sendErr
146+
}
147+
return err
138148
}
139149

140150
func parseHost(host string) (string, string) {
@@ -148,13 +158,18 @@ func parseHost(host string) (string, string) {
148158
func (s *server) ListPeer(r *api.ListPeerRequest, stream api.GoBgpService_ListPeerServer) error {
149159
ctx, cancel := context.WithCancel(stream.Context())
150160
defer cancel()
161+
var sendErr error
151162
fn := func(p *api.Peer) {
152-
if err := stream.Send(&api.ListPeerResponse{Peer: p}); err != nil {
163+
if sendErr = stream.Send(&api.ListPeerResponse{Peer: p}); sendErr != nil {
153164
cancel()
154165
return
155166
}
156167
}
157-
return s.bgpServer.ListPeer(ctx, r, fn)
168+
err := s.bgpServer.ListPeer(ctx, r, fn)
169+
if sendErr != nil {
170+
return sendErr
171+
}
172+
return err
158173
}
159174

160175
func toApiState(s oc.RpkiValidationResultType) api.ValidationState {
@@ -729,12 +744,18 @@ func (s *server) DeleteBmp(ctx context.Context, r *api.DeleteBmpRequest) (*api.D
729744
func (s *server) ListBmp(r *api.ListBmpRequest, stream api.GoBgpService_ListBmpServer) error {
730745
ctx, cancel := context.WithCancel(stream.Context())
731746
defer cancel()
747+
var sendErr error
732748
fn := func(rsp *api.ListBmpResponse_BmpStation) {
733-
if err := stream.Send(&api.ListBmpResponse{Station: rsp}); err != nil {
749+
if sendErr = stream.Send(&api.ListBmpResponse{Station: rsp}); sendErr != nil {
734750
cancel()
751+
return
735752
}
736753
}
737-
return s.bgpServer.ListBmp(ctx, r, fn)
754+
err := s.bgpServer.ListBmp(ctx, r, fn)
755+
if sendErr != nil {
756+
return sendErr
757+
}
758+
return err
738759
}
739760

740761
func (s *server) AddRpki(ctx context.Context, r *api.AddRpkiRequest) (*api.AddRpkiResponse, error) {
@@ -760,23 +781,35 @@ func (s *server) ResetRpki(ctx context.Context, r *api.ResetRpkiRequest) (*api.R
760781
func (s *server) ListRpki(r *api.ListRpkiRequest, stream api.GoBgpService_ListRpkiServer) error {
761782
ctx, cancel := context.WithCancel(stream.Context())
762783
defer cancel()
784+
var sendErr error
763785
fn := func(r *api.Rpki) {
764-
if err := stream.Send(&api.ListRpkiResponse{Server: r}); err != nil {
786+
if sendErr = stream.Send(&api.ListRpkiResponse{Server: r}); sendErr != nil {
765787
cancel()
788+
return
766789
}
767790
}
768-
return s.bgpServer.ListRpki(ctx, r, fn)
791+
err := s.bgpServer.ListRpki(ctx, r, fn)
792+
if sendErr != nil {
793+
return sendErr
794+
}
795+
return err
769796
}
770797

771798
func (s *server) ListRpkiTable(r *api.ListRpkiTableRequest, stream api.GoBgpService_ListRpkiTableServer) error {
772799
ctx, cancel := context.WithCancel(stream.Context())
773800
defer cancel()
801+
var sendErr error
774802
fn := func(r *api.Roa) {
775-
if err := stream.Send(&api.ListRpkiTableResponse{Roa: r}); err != nil {
803+
if sendErr = stream.Send(&api.ListRpkiTableResponse{Roa: r}); sendErr != nil {
776804
cancel()
805+
return
777806
}
778807
}
779-
return s.bgpServer.ListRpkiTable(ctx, r, fn)
808+
err := s.bgpServer.ListRpkiTable(ctx, r, fn)
809+
if sendErr != nil {
810+
return sendErr
811+
}
812+
return err
780813
}
781814

782815
func (s *server) EnableZebra(ctx context.Context, r *api.EnableZebraRequest) (*api.EnableZebraResponse, error) {
@@ -786,12 +819,18 @@ func (s *server) EnableZebra(ctx context.Context, r *api.EnableZebraRequest) (*a
786819
func (s *server) ListVrf(r *api.ListVrfRequest, stream api.GoBgpService_ListVrfServer) error {
787820
ctx, cancel := context.WithCancel(stream.Context())
788821
defer cancel()
822+
var sendErr error
789823
fn := func(v *api.Vrf) {
790-
if err := stream.Send(&api.ListVrfResponse{Vrf: v}); err != nil {
824+
if sendErr = stream.Send(&api.ListVrfResponse{Vrf: v}); sendErr != nil {
791825
cancel()
826+
return
792827
}
793828
}
794-
return s.bgpServer.ListVrf(ctx, r, fn)
829+
err := s.bgpServer.ListVrf(ctx, r, fn)
830+
if sendErr != nil {
831+
return sendErr
832+
}
833+
return err
795834
}
796835

797836
func (s *server) AddVrf(ctx context.Context, r *api.AddVrfRequest) (*api.AddVrfResponse, error) {
@@ -1402,12 +1441,18 @@ var _regexpPrefixMaskLengthRange = regexp.MustCompile(`(\d+)\.\.(\d+)`)
14021441
func (s *server) ListDefinedSet(r *api.ListDefinedSetRequest, stream api.GoBgpService_ListDefinedSetServer) error {
14031442
ctx, cancel := context.WithCancel(stream.Context())
14041443
defer cancel()
1444+
var sendErr error
14051445
fn := func(d *api.DefinedSet) {
1406-
if err := stream.Send(&api.ListDefinedSetResponse{DefinedSet: d}); err != nil {
1446+
if sendErr = stream.Send(&api.ListDefinedSetResponse{DefinedSet: d}); sendErr != nil {
14071447
cancel()
1448+
return
14081449
}
14091450
}
1410-
return s.bgpServer.ListDefinedSet(ctx, r, fn)
1451+
err := s.bgpServer.ListDefinedSet(ctx, r, fn)
1452+
if sendErr != nil {
1453+
return sendErr
1454+
}
1455+
return err
14111456
}
14121457

14131458
func (s *server) AddDefinedSet(ctx context.Context, r *api.AddDefinedSetRequest) (*api.AddDefinedSetResponse, error) {
@@ -2161,12 +2206,18 @@ func newStatementFromApiStruct(a *api.Statement) (*table.Statement, error) {
21612206
func (s *server) ListStatement(r *api.ListStatementRequest, stream api.GoBgpService_ListStatementServer) error {
21622207
ctx, cancel := context.WithCancel(stream.Context())
21632208
defer cancel()
2209+
var sendErr error
21642210
fn := func(s *api.Statement) {
2165-
if err := stream.Send(&api.ListStatementResponse{Statement: s}); err != nil {
2211+
if sendErr = stream.Send(&api.ListStatementResponse{Statement: s}); sendErr != nil {
21662212
cancel()
2213+
return
21672214
}
21682215
}
2169-
return s.bgpServer.ListStatement(ctx, r, fn)
2216+
err := s.bgpServer.ListStatement(ctx, r, fn)
2217+
if sendErr != nil {
2218+
return sendErr
2219+
}
2220+
return err
21702221
}
21712222

21722223
func (s *server) AddStatement(ctx context.Context, r *api.AddStatementRequest) (*api.AddStatementResponse, error) {
@@ -2243,12 +2294,18 @@ func newRoaListFromTableStructList(origin []*table.ROA) []*api.Roa {
22432294
func (s *server) ListPolicy(r *api.ListPolicyRequest, stream api.GoBgpService_ListPolicyServer) error {
22442295
ctx, cancel := context.WithCancel(stream.Context())
22452296
defer cancel()
2297+
var sendErr error
22462298
fn := func(p *api.Policy) {
2247-
if err := stream.Send(&api.ListPolicyResponse{Policy: p}); err != nil {
2299+
if sendErr = stream.Send(&api.ListPolicyResponse{Policy: p}); sendErr != nil {
22482300
cancel()
2301+
return
22492302
}
22502303
}
2251-
return s.bgpServer.ListPolicy(ctx, r, fn)
2304+
err := s.bgpServer.ListPolicy(ctx, r, fn)
2305+
if sendErr != nil {
2306+
return sendErr
2307+
}
2308+
return err
22522309
}
22532310

22542311
func (s *server) AddPolicy(ctx context.Context, r *api.AddPolicyRequest) (*api.AddPolicyResponse, error) {
@@ -2262,12 +2319,18 @@ func (s *server) DeletePolicy(ctx context.Context, r *api.DeletePolicyRequest) (
22622319
func (s *server) ListPolicyAssignment(r *api.ListPolicyAssignmentRequest, stream api.GoBgpService_ListPolicyAssignmentServer) error {
22632320
ctx, cancel := context.WithCancel(stream.Context())
22642321
defer cancel()
2322+
var sendErr error
22652323
fn := func(a *api.PolicyAssignment) {
2266-
if err := stream.Send(&api.ListPolicyAssignmentResponse{Assignment: a}); err != nil {
2324+
if sendErr = stream.Send(&api.ListPolicyAssignmentResponse{Assignment: a}); sendErr != nil {
22672325
cancel()
2326+
return
22682327
}
22692328
}
2270-
return s.bgpServer.ListPolicyAssignment(ctx, r, fn)
2329+
err := s.bgpServer.ListPolicyAssignment(ctx, r, fn)
2330+
if sendErr != nil {
2331+
return sendErr
2332+
}
2333+
return err
22712334
}
22722335

22732336
func defaultRouteType(d api.RouteAction) table.RouteType {

0 commit comments

Comments
 (0)