From da348aec59b69b5cf8fc17da7d17ebab412cb9dd Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 18 Apr 2026 17:16:19 +0900 Subject: [PATCH 01/14] feat(raft): replace unary Send with long-lived client-streaming SendStream per peer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem Per-peer heartbeat channels (cap=64) fill up because the unary Send RPC blocks for one full RTT per message. At 100 heartbeats/s per peer, any RTT above ~640 ms causes the channel to overflow and messages to be dropped. Disk I/O and CPU are not the bottleneck; the RTT wait in the dispatch worker is the root cause. ## Solution Add SendStream (client-streaming RPC) to the EtcdRaft gRPC service. stream.Send() writes to the gRPC send buffer and returns immediately — no RTT wait — so the dispatch worker can enqueue messages at line rate. ### Protocol - New proto RPC: SendStream(stream EtcdRaftMessage) returns (EtcdRaftAck) - Old unary Send RPC kept for backward compatibility ### Sender (GRPCTransport) - One long-lived peerStream per peer node, opened on first dispatch - getOrOpenStream: double-checked locking without holding t.mu during dial, avoiding lock-order inversion with t.mu (clientFor) vs streamsMu - On stream.Send error: close stream, return error; Raft retransmits - Backward compat: codes.Unimplemented → add to noStream map, fall back to dispatchUnary (old path); noStream cleared on UpsertPeer address change - Close/RemovePeer/UpsertPeer: release t.mu before calling closeStream to maintain lock-order invariant (streamsMu → t.mu, never the reverse) ### Receiver (GRPCTransport) - SendStream server handler: recv loop → handle each message → SendAndClose ### Dispatch goroutine model (Engine) - Single multiplexing goroutine per peer (runMultiplexDispatchWorker) replaces the previous two-goroutine model; gRPC stream.Send is not goroutine-safe so a single writer is required - Biased-select pattern: drainPriorityChannel 
(non-blocking heartbeat drain) before waitForChannel (blocks on either channel), ensuring heartbeats are never starved by normal log-entry traffic - runDispatchWorker kept for backward compatibility with existing tests --- internal/raftengine/etcd/engine.go | 80 ++++++- internal/raftengine/etcd/grpc_transport.go | 198 ++++++++++++++++-- .../raftengine/etcd/grpc_transport_test.go | 6 + proto/etcd_raft.pb.go | 18 +- proto/etcd_raft.proto | 1 + proto/etcd_raft_grpc.pb.go | 33 ++- 6 files changed, 311 insertions(+), 25 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 6e6c41b05..8f5ce2118 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -2019,11 +2019,8 @@ func (e *Engine) startPeerDispatcher(nodeID uint64) { cancel: cancel, } e.peerDispatchers[nodeID] = pd - workers := []chan dispatchRequest{pd.normal, pd.heartbeat} - e.dispatchWG.Add(len(workers)) - for _, w := range workers { - go e.runDispatchWorker(ctx, w) - } + e.dispatchWG.Add(1) + go e.runMultiplexDispatchWorker(ctx, pd) } // runDispatchWorker drains ch until the channel is closed, the engine stops, @@ -2053,6 +2050,79 @@ func (e *Engine) runDispatchWorker(ctx context.Context, ch chan dispatchRequest) } } +// runMultiplexDispatchWorker is the single per-peer goroutine that reads from +// both pd.heartbeat and pd.normal with a biased select: it drains the +// heartbeat channel completely before waiting on the normal channel, bounding +// heartbeat delay to one normal-message send time. +// +// A single writer goroutine per peer is required because gRPC stream.Send is +// not goroutine-safe. The biased-select replaces the two-goroutine model from +// runDispatchWorker while preserving heartbeat priority. 
+func (e *Engine) runMultiplexDispatchWorker(ctx context.Context, pd *peerQueues) { + defer e.dispatchWG.Done() + for { + drained, stop := e.drainPriorityChannel(ctx, pd) + if stop { + return + } + if drained { + continue // re-check priority before waiting + } + if e.waitForChannel(ctx, pd) { + return + } + } +} + +// drainPriorityChannel non-blockingly dequeues one heartbeat message. +// Returns (true, false) when a message was processed (caller should loop), +// (false, true) when the worker must stop, (false, false) when no message +// was pending (caller should block on waitForChannel). +func (e *Engine) drainPriorityChannel(ctx context.Context, pd *peerQueues) (drained bool, stop bool) { + select { + case <-e.dispatchStopCh: + return false, true + case <-ctx.Done(): + return false, true + case req, ok := <-pd.heartbeat: + stop = e.handleChannelReq(ctx, req, ok) + return !stop, stop + default: + return false, false + } +} + +// waitForChannel blocks until a message arrives on either channel or the +// worker should stop. Returns true if the worker must exit. +func (e *Engine) waitForChannel(ctx context.Context, pd *peerQueues) (stop bool) { + select { + case <-e.dispatchStopCh: + return true + case <-ctx.Done(): + return true + case req, ok := <-pd.heartbeat: + return e.handleChannelReq(ctx, req, ok) + case req, ok := <-pd.normal: + return e.handleChannelReq(ctx, req, ok) + } +} + +// handleChannelReq processes one dequeued dispatch request. +// Returns true if the worker must stop (channel closed or context cancelled). 
+func (e *Engine) handleChannelReq(ctx context.Context, req dispatchRequest, ok bool) (stop bool) { + if !ok { + return true + } + if ctx.Err() != nil { + if err := req.Close(); err != nil { + slog.Error("etcd raft dispatch: failed to close request", "err", err) + } + return true + } + e.handleDispatchRequest(ctx, req) + return false +} + func (e *Engine) handleDispatchRequest(ctx context.Context, req dispatchRequest) { dispatchErr := e.dispatchTransport(ctx, req) if err := req.Close(); err != nil { diff --git a/internal/raftengine/etcd/grpc_transport.go b/internal/raftengine/etcd/grpc_transport.go index 6ab53d3d7..c221c22ce 100644 --- a/internal/raftengine/etcd/grpc_transport.go +++ b/internal/raftengine/etcd/grpc_transport.go @@ -14,6 +14,8 @@ import ( raftpb "go.etcd.io/raft/v3/raftpb" "golang.org/x/sync/singleflight" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const defaultSnapshotChunkSize = 16 << 20 @@ -33,8 +35,20 @@ var ( errSnapshotMetadataDuplicate = errors.New("etcd raft snapshot metadata was sent more than once") errSnapshotMessageNil = errors.New("etcd raft snapshot message is required") errSnapshotStreamShort = errors.New("etcd raft snapshot stream closed before final chunk") + // errStreamNotSupported is returned by getOrOpenStream when the remote peer + // responded with codes.Unimplemented on a previous SendStream attempt. + // dispatchRegular falls back to the unary Send path on this error. + errStreamNotSupported = errors.New("etcd raft peer does not support SendStream") ) +// peerStream holds a long-lived client-streaming gRPC stream to one peer. +// cancel tears down the stream context; the transport deletes the entry on +// any Send error and re-opens on the next dispatch attempt. 
+type peerStream struct { + stream pb.EtcdRaft_SendStreamClient + cancel context.CancelFunc +} + var grpcNewClient = grpc.NewClient type MessageHandler func(context.Context, raftpb.Message) error @@ -61,6 +75,16 @@ type GRPCTransport struct { // that aggregate in-memory allocation stays bounded even when multiple // dispatch workers run simultaneously. bridgeSem chan struct{} + + // streamsMu protects streams and noStream. + // Lock ordering: always acquire streamsMu before t.mu (never the reverse). + streamsMu sync.Mutex + // streams holds one long-lived SendStream RPC per peer node ID. + // Each entry is owned by the single per-peer multiplexing dispatch goroutine. + streams map[uint64]*peerStream + // noStream records peers that returned codes.Unimplemented on SendStream; + // dispatchRegular falls back to unary Send for those peers. + noStream map[uint64]struct{} } func NewGRPCTransport(peers []Peer) *GRPCTransport { @@ -83,6 +107,8 @@ func NewGRPCTransport(peers []Peer) *GRPCTransport { conns: make(map[string]*grpc.ClientConn), snapshotChunkSize: defaultSnapshotChunkSize, bridgeSem: make(chan struct{}, defaultBridgeMaterializeLimit), + streams: make(map[uint64]*peerStream), + noStream: make(map[uint64]struct{}), } } @@ -145,34 +171,53 @@ func (t *GRPCTransport) UpsertPeer(peer Peer) { if t == nil || peer.NodeID == 0 { return } + addressChanged := false t.mu.Lock() - defer t.mu.Unlock() - if existing, ok := t.peers[peer.NodeID]; ok && existing.Address != "" && existing.Address != peer.Address { t.closePeerConnLocked(existing.Address) + addressChanged = true } t.peers[peer.NodeID] = peer + t.mu.Unlock() + + // Clear stream state outside mu to avoid lock-order inversion + // (getOrOpenStream acquires streamsMu then mu; callers of UpsertPeer must + // not hold streamsMu when calling here). 
+ if addressChanged { + t.closeStream(peer.NodeID) + t.clearNoStream(peer.NodeID) + } } func (t *GRPCTransport) RemovePeer(nodeID uint64) { if t == nil || nodeID == 0 { return } + removed := false t.mu.Lock() - defer t.mu.Unlock() - peer, ok := t.peers[nodeID] - if !ok { - return + if ok { + delete(t.peers, nodeID) + t.closePeerConnLocked(peer.Address) + removed = true + } + t.mu.Unlock() + + // Tear down the stream outside mu to avoid lock-order inversion. + if removed { + t.closeStream(nodeID) + t.clearNoStream(nodeID) } - delete(t.peers, nodeID) - t.closePeerConnLocked(peer.Address) } func (t *GRPCTransport) Close() error { if t == nil { return nil } + // Cancel all streams before closing connections so in-flight Send calls + // fail cleanly rather than blocking on a half-closed TCP connection. + t.closeAllStreams() + t.mu.Lock() defer t.mu.Unlock() @@ -319,14 +364,32 @@ func (t *GRPCTransport) applyBridgeMode(ctx context.Context, msg raftpb.Message) } func (t *GRPCTransport) dispatchRegular(ctx context.Context, msg raftpb.Message) error { - ctx, cancel := transportContext(ctx, defaultDispatchTimeout) - defer cancel() - raw, err := msg.Marshal() if err != nil { return errors.WithStack(err) } - client, err := t.clientFor(msg.To) + + stream, err := t.getOrOpenStream(msg.To) + if err != nil { + if errors.Is(err, errStreamNotSupported) { + return t.dispatchUnary(ctx, raw, msg.To) + } + return err + } + + if err := stream.Send(&pb.EtcdRaftMessage{Message: raw}); err != nil { + t.closeStream(msg.To) + return errors.WithStack(err) + } + return nil +} + +// dispatchUnary sends a single Raft message via the legacy unary Send RPC. +// Used as a fallback when the remote peer does not support SendStream. 
+func (t *GRPCTransport) dispatchUnary(ctx context.Context, raw []byte, to uint64) error { + ctx, cancel := transportContext(ctx, defaultDispatchTimeout) + defer cancel() + client, err := t.clientFor(to) if err != nil { return err } @@ -371,6 +434,28 @@ func (t *GRPCTransport) Send(ctx context.Context, req *pb.EtcdRaftMessage) (*pb. return &pb.EtcdRaftAck{}, nil } +// SendStream is the server-side handler for the client-streaming SendStream RPC. +// The client sends a sequence of Raft messages over one long-lived stream; +// this handler processes each one and closes with a single EtcdRaftAck. +func (t *GRPCTransport) SendStream(stream pb.EtcdRaft_SendStreamServer) error { + for { + req, err := stream.Recv() + if err != nil { + if errors.Is(err, io.EOF) { + return errors.WithStack(stream.SendAndClose(&pb.EtcdRaftAck{})) + } + return errors.WithStack(err) + } + var msg raftpb.Message + if err := msg.Unmarshal(req.Message); err != nil { + return errors.WithStack(err) + } + if err := t.handle(stream.Context(), msg); err != nil { + return err + } + } +} + func (t *GRPCTransport) sendSnapshot(ctx context.Context, msg raftpb.Message) error { client, err := t.clientFor(msg.To) if err != nil { @@ -689,6 +774,95 @@ func appendSnapshotChunk(metadata *raftpb.Message, payload io.Writer, chunk *pb. return seenMetadata, nil } +// getOrOpenStream returns the live SendStream to nodeID, opening one if +// necessary. Returns errStreamNotSupported if the peer previously returned +// codes.Unimplemented; callers should fall back to the unary Send path. +// +// Lock ordering: this method acquires streamsMu without holding t.mu; it +// calls clientFor (which may acquire t.mu) only after releasing streamsMu. +func (t *GRPCTransport) getOrOpenStream(nodeID uint64) (pb.EtcdRaft_SendStreamClient, error) { + // Fast path: stream already open or peer known to be unary-only. 
+ t.streamsMu.Lock() + _, skip := t.noStream[nodeID] + ps, ok := t.streams[nodeID] + t.streamsMu.Unlock() + + if skip { + return nil, errStreamNotSupported + } + if ok { + return ps.stream, nil + } + + // Need a new stream. Dial without holding streamsMu to avoid lock-order + // inversion with t.mu (clientFor acquires t.mu internally). + client, err := t.clientFor(nodeID) + if err != nil { + return nil, err + } + + // Re-acquire streamsMu to install the stream atomically. + t.streamsMu.Lock() + defer t.streamsMu.Unlock() + + // Re-check: another goroutine or closeStream may have raced. + if _, skip = t.noStream[nodeID]; skip { + return nil, errStreamNotSupported + } + if ps, ok = t.streams[nodeID]; ok { + return ps.stream, nil + } + + streamCtx, cancel := context.WithCancel(context.Background()) + stream, err := client.SendStream(streamCtx) + if err != nil { + cancel() + if status.Code(err) == codes.Unimplemented { + t.noStream[nodeID] = struct{}{} + return nil, errStreamNotSupported + } + return nil, errors.WithStack(err) + } + t.streams[nodeID] = &peerStream{stream: stream, cancel: cancel} + return stream, nil +} + +// closeStream tears down the SendStream for nodeID. Safe to call with no +// stream open. The caller must not hold streamsMu. +func (t *GRPCTransport) closeStream(nodeID uint64) { + t.streamsMu.Lock() + ps, ok := t.streams[nodeID] + if ok { + delete(t.streams, nodeID) + } + t.streamsMu.Unlock() + if ok { + ps.cancel() + } +} + +// clearNoStream removes nodeID from the noStream set so that the next +// dispatch attempt will probe for SendStream support again (e.g. after an +// upgrade that adds streaming to a previously unary-only peer). +func (t *GRPCTransport) clearNoStream(nodeID uint64) { + t.streamsMu.Lock() + delete(t.noStream, nodeID) + t.streamsMu.Unlock() +} + +// closeAllStreams cancels every open stream and resets the streams/noStream +// maps. Called by Close before tearing down the underlying connections. 
+func (t *GRPCTransport) closeAllStreams() { + t.streamsMu.Lock() + old := t.streams + t.streams = make(map[uint64]*peerStream) + t.noStream = make(map[uint64]struct{}) + t.streamsMu.Unlock() + for _, ps := range old { + ps.cancel() + } +} + func buildSnapshotMessage(metadata raftpb.Message, spool *snapshotSpool, seenMetadata bool) (raftpb.Message, error) { if !seenMetadata || metadata.Snapshot == nil { return raftpb.Message{}, errors.WithStack(errSnapshotMetadataNil) diff --git a/internal/raftengine/etcd/grpc_transport_test.go b/internal/raftengine/etcd/grpc_transport_test.go index 13a5b9801..bf2a4769a 100644 --- a/internal/raftengine/etcd/grpc_transport_test.go +++ b/internal/raftengine/etcd/grpc_transport_test.go @@ -16,7 +16,9 @@ import ( "github.com/stretchr/testify/require" raftpb "go.etcd.io/raft/v3/raftpb" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" ) func TestTransportContextAppliesTimeoutWhenUnset(t *testing.T) { @@ -357,6 +359,10 @@ func (c *testEtcdRaftClient) Send(_ context.Context, _ *pb.EtcdRaftMessage, _ .. 
return &pb.EtcdRaftAck{}, nil } +func (c *testEtcdRaftClient) SendStream(_ context.Context, _ ...grpc.CallOption) (pb.EtcdRaft_SendStreamClient, error) { + return nil, status.Error(codes.Unimplemented, "SendStream not implemented in test mock") +} + func (c *testEtcdRaftClient) SendSnapshot(_ context.Context, _ ...grpc.CallOption) (pb.EtcdRaft_SendSnapshotClient, error) { return c.stream, nil } diff --git a/proto/etcd_raft.pb.go b/proto/etcd_raft.pb.go index bba5e7917..9eb466a73 100644 --- a/proto/etcd_raft.pb.go +++ b/proto/etcd_raft.pb.go @@ -172,9 +172,11 @@ const file_etcd_raft_proto_rawDesc = "" + "\bmetadata\x18\x01 \x01(\fR\bmetadata\x12\x14\n" + "\x05chunk\x18\x02 \x01(\fR\x05chunk\x12\x14\n" + "\x05final\x18\x03 \x01(\bR\x05final\"\r\n" + - "\vEtcdRaftAck2n\n" + + "\vEtcdRaftAck2\xa0\x01\n" + "\bEtcdRaft\x12(\n" + - "\x04Send\x12\x10.EtcdRaftMessage\x1a\f.EtcdRaftAck\"\x00\x128\n" + + "\x04Send\x12\x10.EtcdRaftMessage\x1a\f.EtcdRaftAck\"\x00\x120\n" + + "\n" + + "SendStream\x12\x10.EtcdRaftMessage\x1a\f.EtcdRaftAck\"\x00(\x01\x128\n" + "\fSendSnapshot\x12\x16.EtcdRaftSnapshotChunk\x1a\f.EtcdRaftAck\"\x00(\x01B#Z!github.com/bootjp/elastickv/protob\x06proto3" var ( @@ -197,11 +199,13 @@ var file_etcd_raft_proto_goTypes = []any{ } var file_etcd_raft_proto_depIdxs = []int32{ 0, // 0: EtcdRaft.Send:input_type -> EtcdRaftMessage - 1, // 1: EtcdRaft.SendSnapshot:input_type -> EtcdRaftSnapshotChunk - 2, // 2: EtcdRaft.Send:output_type -> EtcdRaftAck - 2, // 3: EtcdRaft.SendSnapshot:output_type -> EtcdRaftAck - 2, // [2:4] is the sub-list for method output_type - 0, // [0:2] is the sub-list for method input_type + 0, // 1: EtcdRaft.SendStream:input_type -> EtcdRaftMessage + 1, // 2: EtcdRaft.SendSnapshot:input_type -> EtcdRaftSnapshotChunk + 2, // 3: EtcdRaft.Send:output_type -> EtcdRaftAck + 2, // 4: EtcdRaft.SendStream:output_type -> EtcdRaftAck + 2, // 5: EtcdRaft.SendSnapshot:output_type -> EtcdRaftAck + 3, // [3:6] is the sub-list for method output_type + 0, 
// [0:3] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name diff --git a/proto/etcd_raft.proto b/proto/etcd_raft.proto index 87c0b2a73..e043eb8f7 100644 --- a/proto/etcd_raft.proto +++ b/proto/etcd_raft.proto @@ -4,6 +4,7 @@ option go_package = "github.com/bootjp/elastickv/proto"; service EtcdRaft { rpc Send(EtcdRaftMessage) returns (EtcdRaftAck) {} + rpc SendStream(stream EtcdRaftMessage) returns (EtcdRaftAck) {} rpc SendSnapshot(stream EtcdRaftSnapshotChunk) returns (EtcdRaftAck) {} } diff --git a/proto/etcd_raft_grpc.pb.go b/proto/etcd_raft_grpc.pb.go index 8c0a5915d..74b349bef 100644 --- a/proto/etcd_raft_grpc.pb.go +++ b/proto/etcd_raft_grpc.pb.go @@ -20,6 +20,7 @@ const _ = grpc.SupportPackageIsVersion9 const ( EtcdRaft_Send_FullMethodName = "/EtcdRaft/Send" + EtcdRaft_SendStream_FullMethodName = "/EtcdRaft/SendStream" EtcdRaft_SendSnapshot_FullMethodName = "/EtcdRaft/SendSnapshot" ) @@ -28,6 +29,7 @@ const ( // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type EtcdRaftClient interface { Send(ctx context.Context, in *EtcdRaftMessage, opts ...grpc.CallOption) (*EtcdRaftAck, error) + SendStream(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[EtcdRaftMessage, EtcdRaftAck], error) SendSnapshot(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[EtcdRaftSnapshotChunk, EtcdRaftAck], error) } @@ -49,9 +51,22 @@ func (c *etcdRaftClient) Send(ctx context.Context, in *EtcdRaftMessage, opts ... return out, nil } +func (c *etcdRaftClient) SendStream(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[EtcdRaftMessage, EtcdRaftAck], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) 
+ stream, err := c.cc.NewStream(ctx, &EtcdRaft_ServiceDesc.Streams[0], EtcdRaft_SendStream_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[EtcdRaftMessage, EtcdRaftAck]{ClientStream: stream} + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type EtcdRaft_SendStreamClient = grpc.ClientStreamingClient[EtcdRaftMessage, EtcdRaftAck] + func (c *etcdRaftClient) SendSnapshot(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[EtcdRaftSnapshotChunk, EtcdRaftAck], error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - stream, err := c.cc.NewStream(ctx, &EtcdRaft_ServiceDesc.Streams[0], EtcdRaft_SendSnapshot_FullMethodName, cOpts...) + stream, err := c.cc.NewStream(ctx, &EtcdRaft_ServiceDesc.Streams[1], EtcdRaft_SendSnapshot_FullMethodName, cOpts...) if err != nil { return nil, err } @@ -67,6 +82,7 @@ type EtcdRaft_SendSnapshotClient = grpc.ClientStreamingClient[EtcdRaftSnapshotCh // for forward compatibility. 
type EtcdRaftServer interface { Send(context.Context, *EtcdRaftMessage) (*EtcdRaftAck, error) + SendStream(grpc.ClientStreamingServer[EtcdRaftMessage, EtcdRaftAck]) error SendSnapshot(grpc.ClientStreamingServer[EtcdRaftSnapshotChunk, EtcdRaftAck]) error mustEmbedUnimplementedEtcdRaftServer() } @@ -81,6 +97,9 @@ type UnimplementedEtcdRaftServer struct{} func (UnimplementedEtcdRaftServer) Send(context.Context, *EtcdRaftMessage) (*EtcdRaftAck, error) { return nil, status.Error(codes.Unimplemented, "method Send not implemented") } +func (UnimplementedEtcdRaftServer) SendStream(grpc.ClientStreamingServer[EtcdRaftMessage, EtcdRaftAck]) error { + return status.Error(codes.Unimplemented, "method SendStream not implemented") +} func (UnimplementedEtcdRaftServer) SendSnapshot(grpc.ClientStreamingServer[EtcdRaftSnapshotChunk, EtcdRaftAck]) error { return status.Error(codes.Unimplemented, "method SendSnapshot not implemented") } @@ -123,6 +142,13 @@ func _EtcdRaft_Send_Handler(srv interface{}, ctx context.Context, dec func(inter return interceptor(ctx, in, info, handler) } +func _EtcdRaft_SendStream_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(EtcdRaftServer).SendStream(&grpc.GenericServerStream[EtcdRaftMessage, EtcdRaftAck]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. 
+type EtcdRaft_SendStreamServer = grpc.ClientStreamingServer[EtcdRaftMessage, EtcdRaftAck] + func _EtcdRaft_SendSnapshot_Handler(srv interface{}, stream grpc.ServerStream) error { return srv.(EtcdRaftServer).SendSnapshot(&grpc.GenericServerStream[EtcdRaftSnapshotChunk, EtcdRaftAck]{ServerStream: stream}) } @@ -143,6 +169,11 @@ var EtcdRaft_ServiceDesc = grpc.ServiceDesc{ }, }, Streams: []grpc.StreamDesc{ + { + StreamName: "SendStream", + Handler: _EtcdRaft_SendStream_Handler, + ClientStreams: true, + }, { StreamName: "SendSnapshot", Handler: _EtcdRaft_SendSnapshot_Handler, From 444eeb14234705bc466269d176dc20bd8c5094d7 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 18 Apr 2026 17:42:00 +0900 Subject: [PATCH 02/14] fix(raft): drain all pending dispatch requests on worker exit and fix doc issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - runMultiplexDispatchWorker now defers drainPendingRequests so every buffered dispatchRequest in both priority and normal channels is closed when the per-peer context is cancelled (e.g. peer removal), not just the first one encountered (addresses Gemini review) - Expand defaultHeartbeatBufPerPeer comment to list all Resp variants routed to the priority channel (MsgHeartbeatResp, MsgVoteResp, etc.) 
- Fix design doc: heartbeat channel capacity shown as defaultHeartbeatBufPerPeer=64 rather than the incorrect MaxInflightMsg - Fix design doc: §3.6 → §4.1 (was under the wrong heading level) - Fix design doc: replace non-existent defaultDispatchWorkersPerPeer=2 with the accurate single-multiplexing-goroutine description - Update status header from "proposed" to "implemented" --- docs/design/raft-grpc-streaming-transport.md | 15 ++++----- internal/raftengine/etcd/engine.go | 34 +++++++++++++++++--- 2 files changed, 37 insertions(+), 12 deletions(-) diff --git a/docs/design/raft-grpc-streaming-transport.md b/docs/design/raft-grpc-streaming-transport.md index 3a4ff426c..bee58bf16 100644 --- a/docs/design/raft-grpc-streaming-transport.md +++ b/docs/design/raft-grpc-streaming-transport.md @@ -1,9 +1,8 @@ # Design: gRPC Streaming Transport for Raft Messages -> **Status: proposed — not yet implemented.** +> **Status: implemented.** > PR #522 delivers the per-peer dispatch channel foundation described in §2. -> This document specifies the next step: replacing per-message unary RPCs with -> a long-lived client-streaming gRPC connection per peer. +> This document specifies the streaming transport replacement implemented on top of it. ## 1. Background and motivation @@ -22,9 +21,9 @@ messages per follower before receiving ACK. A single dispatch worker serialises those 256 sends: at 1 ms RTT, throughput is capped at ~1 000 msg/s per peer regardless of bandwidth. -PR #522 introduced per-peer dispatch channels and `defaultDispatchWorkersPerPeer = 2` -(one goroutine for normal messages, one dedicated to heartbeats) to eliminate -cross-peer head-of-line blocking. The remaining bottleneck is the +PR #522 introduced per-peer dispatch channels with a single multiplexing dispatch +goroutine per peer (biased-select: heartbeat priority over normal messages) to +eliminate cross-peer head-of-line blocking. The remaining bottleneck is the per-message RTT of unary gRPC. 
### Goal @@ -52,7 +51,7 @@ Engine run loop │ ↓ │ receive EtcdRaftAck │ - └─ heartbeat messages → peerDispatchers[nodeID].heartbeat (chan, cap MaxInflightMsg) + └─ heartbeat messages → peerDispatchers[nodeID].heartbeat (chan, cap defaultHeartbeatBufPerPeer=64) ↓ runDispatchWorker (1 goroutine/peer, dedicated) ↓ @@ -199,7 +198,7 @@ be used to skip the probe after the first successful stream is established. | **Ordering** | Single stream per peer preserves FIFO — safe for Raft | | **Heartbeat starvation** | Under a burst of log entries, heartbeats could be delayed in the send buffer. Mitigated via biased-select (see below). | -### 3.6 Heartbeat starvation mitigation — biased select +### 4.1 Heartbeat starvation mitigation — biased select When the multiplexing dispatch worker (Option A from §3.2) reads from both channels, a sustained `MsgApp` burst can delay heartbeats. The mitigation is diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 8f5ce2118..377d5b57b 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -33,10 +33,11 @@ const ( defaultMaxInflightMsg = 256 defaultMaxSizePerMsg = 1 << 20 // defaultHeartbeatBufPerPeer is the capacity of the priority dispatch channel. - // It carries only truly low-frequency control messages (heartbeats, votes, - // read-index, leader-transfer). MsgAppResp is intentionally kept in the - // normal channel: followers — the only senders of MsgAppResp — do not send - // MsgApp, so there is no head-of-line blocking risk there. + // It carries low-latency control messages: MsgHeartbeat, MsgHeartbeatResp, + // MsgVote, MsgVoteResp, MsgPreVote, MsgPreVoteResp, MsgReadIndex, + // MsgReadIndexResp, and MsgTimeoutNow. MsgAppResp is intentionally kept in + // the normal channel: followers — the only senders of MsgAppResp — do not + // send MsgApp, so there is no head-of-line blocking risk there. 
defaultHeartbeatBufPerPeer = 64 defaultSnapshotEvery = 10_000 defaultSnapshotQueueSize = 1 @@ -2060,6 +2061,7 @@ func (e *Engine) runDispatchWorker(ctx context.Context, ch chan dispatchRequest) // runDispatchWorker while preserving heartbeat priority. func (e *Engine) runMultiplexDispatchWorker(ctx context.Context, pd *peerQueues) { defer e.dispatchWG.Done() + defer e.drainPendingRequests(pd) for { drained, stop := e.drainPriorityChannel(ctx, pd) if stop { @@ -2107,6 +2109,30 @@ func (e *Engine) waitForChannel(ctx context.Context, pd *peerQueues) (stop bool) } } +// drainPendingRequests closes all messages remaining in pd's channels. +// Called as a deferred step when the multiplexing worker exits so that any +// buffered dispatchRequests are properly released regardless of why the +// worker stopped. +func (e *Engine) drainPendingRequests(pd *peerQueues) { + drainCh := func(ch <-chan dispatchRequest) { + for { + select { + case req, ok := <-ch: + if !ok { + return + } + if err := req.Close(); err != nil { + slog.Error("etcd raft dispatch: failed to close pending request", "err", err) + } + default: + return + } + } + } + drainCh(pd.heartbeat) + drainCh(pd.normal) +} + // handleChannelReq processes one dequeued dispatch request. // Returns true if the worker must stop (channel closed or context cancelled). 
func (e *Engine) handleChannelReq(ctx context.Context, req dispatchRequest, ok bool) (stop bool) { From 40d65a06f8598ec7c17895cf4c244818d0c649fe Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 18 Apr 2026 17:51:21 +0900 Subject: [PATCH 03/14] fix(raft): address CodeRabbit and Gemini review comments on streaming transport - closePeerConnLocked now returns the set of node IDs that were using the closed address; UpsertPeer and RemovePeer clear stream/noStream for all of them, so a peer that upgrades to support streaming at the same address is re-probed on the next dispatch rather than staying stuck in unary mode - SendStream server handler now continues on errStepQueueFull instead of tearing down the stream; transient backpressure drops are logged and the long-lived stream is preserved (matches unary Send behaviour) - dispatchRegular checks ctx.Err() before attempting stream.Send so a cancelled dispatch context is honoured on the streaming path - streamsMu changed from sync.Mutex to sync.RWMutex; fast-path reads in getOrOpenStream use RLock to reduce contention on the dispatch hot path - Lock-ordering comment updated from "acquire streamsMu before t.mu" to the accurate invariant: never hold both simultaneously --- internal/raftengine/etcd/grpc_transport.go | 75 ++++++++++++++-------- 1 file changed, 50 insertions(+), 25 deletions(-) diff --git a/internal/raftengine/etcd/grpc_transport.go b/internal/raftengine/etcd/grpc_transport.go index c221c22ce..64d3185cf 100644 --- a/internal/raftengine/etcd/grpc_transport.go +++ b/internal/raftengine/etcd/grpc_transport.go @@ -77,8 +77,12 @@ type GRPCTransport struct { bridgeSem chan struct{} // streamsMu protects streams and noStream. - // Lock ordering: always acquire streamsMu before t.mu (never the reverse). - streamsMu sync.Mutex + // Invariant: never hold streamsMu and t.mu simultaneously. 
All paths that + // need both release the first before acquiring the second: + // getOrOpenStream: releases streamsMu before calling clientFor (t.mu), then re-acquires streamsMu. + // UpsertPeer/RemovePeer: release t.mu before calling closeStream/clearNoStream (streamsMu). + // Using RWMutex so concurrent reads on the hot dispatch path do not contend. + streamsMu sync.RWMutex // streams holds one long-lived SendStream RPC per peer node ID. // Each entry is owned by the single per-peer multiplexing dispatch goroutine. streams map[uint64]*peerStream @@ -171,21 +175,18 @@ func (t *GRPCTransport) UpsertPeer(peer Peer) { if t == nil || peer.NodeID == 0 { return } - addressChanged := false t.mu.Lock() + var closedNodeIDs []uint64 if existing, ok := t.peers[peer.NodeID]; ok && existing.Address != "" && existing.Address != peer.Address { - t.closePeerConnLocked(existing.Address) - addressChanged = true + closedNodeIDs = t.closePeerConnLocked(existing.Address) } t.peers[peer.NodeID] = peer t.mu.Unlock() - // Clear stream state outside mu to avoid lock-order inversion - // (getOrOpenStream acquires streamsMu then mu; callers of UpsertPeer must - // not hold streamsMu when calling here). - if addressChanged { - t.closeStream(peer.NodeID) - t.clearNoStream(peer.NodeID) + // Clear stream state outside mu (never hold streamsMu and t.mu simultaneously). + for _, id := range closedNodeIDs { + t.closeStream(id) + t.clearNoStream(id) } } @@ -193,20 +194,18 @@ func (t *GRPCTransport) RemovePeer(nodeID uint64) { if t == nil || nodeID == 0 { return } - removed := false t.mu.Lock() - peer, ok := t.peers[nodeID] - if ok { + var closedNodeIDs []uint64 + if peer, ok := t.peers[nodeID]; ok { delete(t.peers, nodeID) - t.closePeerConnLocked(peer.Address) - removed = true + closedNodeIDs = t.closePeerConnLocked(peer.Address) } t.mu.Unlock() - // Tear down the stream outside mu to avoid lock-order inversion. 
- if removed { - t.closeStream(nodeID) - t.clearNoStream(nodeID) + // Tear down stream state outside mu (never hold streamsMu and t.mu simultaneously). + for _, id := range closedNodeIDs { + t.closeStream(id) + t.clearNoStream(id) } } @@ -230,20 +229,32 @@ func (t *GRPCTransport) Close() error { return errors.WithStack(err) } -func (t *GRPCTransport) closePeerConnLocked(address string) { +// closePeerConnLocked closes the gRPC connection for address and returns the +// node IDs that were using it. Callers should clear stream/noStream state for +// those IDs after releasing t.mu, so the next dispatch re-probes streaming +// support (e.g. after a peer upgrade at the same address). +// Caller must hold t.mu. +func (t *GRPCTransport) closePeerConnLocked(address string) []uint64 { if address == "" { - return + return nil + } + var nodeIDs []uint64 + for id, peer := range t.peers { + if peer.Address == address { + nodeIDs = append(nodeIDs, id) + } } conn, ok := t.conns[address] if !ok { delete(t.clients, address) - return + return nodeIDs } delete(t.conns, address) delete(t.clients, address) if err := conn.Close(); err != nil { slog.Warn("failed to close etcd raft peer connection", "address", address, "error", err) } + return nodeIDs } func (t *GRPCTransport) Dispatch(ctx context.Context, msg raftpb.Message) error { @@ -364,6 +375,10 @@ func (t *GRPCTransport) applyBridgeMode(ctx context.Context, msg raftpb.Message) } func (t *GRPCTransport) dispatchRegular(ctx context.Context, msg raftpb.Message) error { + if err := ctx.Err(); err != nil { + return errors.WithStack(err) + } + raw, err := msg.Marshal() if err != nil { return errors.WithStack(err) @@ -437,6 +452,8 @@ func (t *GRPCTransport) Send(ctx context.Context, req *pb.EtcdRaftMessage) (*pb. // SendStream is the server-side handler for the client-streaming SendStream RPC. 
// The client sends a sequence of Raft messages over one long-lived stream; // this handler processes each one and closes with a single EtcdRaftAck. +// Transient backpressure (errStepQueueFull) is logged and skipped rather than +// tearing down the stream — identical to how the unary Send handler behaves. func (t *GRPCTransport) SendStream(stream pb.EtcdRaft_SendStreamServer) error { for { req, err := stream.Recv() @@ -451,6 +468,14 @@ func (t *GRPCTransport) SendStream(stream pb.EtcdRaft_SendStreamServer) error { return errors.WithStack(err) } if err := t.handle(stream.Context(), msg); err != nil { + if errors.Is(err, errStepQueueFull) { + slog.Warn("etcd raft SendStream: step queue full, dropping message", + "type", msg.Type.String(), + "from", msg.From, + "to", msg.To, + ) + continue + } return err } } @@ -782,10 +807,10 @@ func appendSnapshotChunk(metadata *raftpb.Message, payload io.Writer, chunk *pb. // calls clientFor (which may acquire t.mu) only after releasing streamsMu. func (t *GRPCTransport) getOrOpenStream(nodeID uint64) (pb.EtcdRaft_SendStreamClient, error) { // Fast path: stream already open or peer known to be unary-only. 
- t.streamsMu.Lock() + t.streamsMu.RLock() _, skip := t.noStream[nodeID] ps, ok := t.streams[nodeID] - t.streamsMu.Unlock() + t.streamsMu.RUnlock() if skip { return nil, errStreamNotSupported From 18a6dc2c76467dfa99c56a6f8848e2d052ecd696 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 18 Apr 2026 18:05:47 +0900 Subject: [PATCH 04/14] fix(raft): thread per-peer ctx into getOrOpenStream; clear noStream on UpsertPeer --- internal/raftengine/etcd/grpc_transport.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/internal/raftengine/etcd/grpc_transport.go b/internal/raftengine/etcd/grpc_transport.go index 35109ea9c..c58952cbd 100644 --- a/internal/raftengine/etcd/grpc_transport.go +++ b/internal/raftengine/etcd/grpc_transport.go @@ -188,6 +188,11 @@ func (t *GRPCTransport) UpsertPeer(peer Peer) { t.closeStream(id) t.clearNoStream(id) } + // Always clear noStream for the upserted peer so that a same-address + // upgrade (peer binary updated without changing address) re-probes + // streaming support on the next dispatch rather than staying stuck on + // the unary fallback indefinitely. + t.clearNoStream(peer.NodeID) } func (t *GRPCTransport) RemovePeer(nodeID uint64) { @@ -387,7 +392,7 @@ func (t *GRPCTransport) dispatchRegular(ctx context.Context, msg raftpb.Message) return errors.WithStack(err) } - stream, err := t.getOrOpenStream(msg.To) + stream, err := t.getOrOpenStream(ctx, msg.To) if err != nil { if errors.Is(err, errStreamNotSupported) { return t.dispatchUnary(ctx, raw, msg.To) @@ -806,9 +811,13 @@ func appendSnapshotChunk(metadata *raftpb.Message, payload io.Writer, chunk *pb. // necessary. Returns errStreamNotSupported if the peer previously returned // codes.Unimplemented; callers should fall back to the unary Send path. 
// +// ctx is the per-peer dispatch context: the stream's lifetime is derived from +// it so that a cancelled context (peer removed, engine shutdown) unblocks any +// pending stream.Send without waiting for a TCP timeout. +// // Lock ordering: this method acquires streamsMu without holding t.mu; it // calls clientFor (which may acquire t.mu) only after releasing streamsMu. -func (t *GRPCTransport) getOrOpenStream(nodeID uint64) (pb.EtcdRaft_SendStreamClient, error) { +func (t *GRPCTransport) getOrOpenStream(ctx context.Context, nodeID uint64) (pb.EtcdRaft_SendStreamClient, error) { // Fast path: stream already open or peer known to be unary-only. t.streamsMu.RLock() _, skip := t.noStream[nodeID] @@ -841,7 +850,7 @@ func (t *GRPCTransport) getOrOpenStream(nodeID uint64) (pb.EtcdRaft_SendStreamCl return ps.stream, nil } - streamCtx, cancel := context.WithCancel(context.Background()) + streamCtx, cancel := context.WithCancel(ctx) stream, err := client.SendStream(streamCtx) if err != nil { cancel() From fa1f633aace146783625a37fc0933b2b7d69455b Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 18 Apr 2026 18:13:24 +0900 Subject: [PATCH 05/14] test(raft): add unit tests for streaming dispatch and biased-select priority - TestDispatchRegularUsesStreamWhenSupported: verifies messages go through stream.Send (not unary) when the peer supports SendStream - TestDispatchRegularFallsBackToUnaryOnUnimplemented: verifies noStream is set and unary Send is used when peer returns Unimplemented on SendStream - TestDispatchRegularClosesStreamOnSendError: verifies the stream is evicted from the cache after stream.Send returns an error - TestRunMultiplexDispatchWorkerPrioritizesHeartbeats: verifies the biased- select drains all heartbeats before processing normal messages --- internal/raftengine/etcd/engine_test.go | 64 +++++++++++ .../raftengine/etcd/grpc_transport_test.go | 102 +++++++++++++++++- 2 files changed, 164 insertions(+), 2 deletions(-) diff --git 
a/internal/raftengine/etcd/engine_test.go b/internal/raftengine/etcd/engine_test.go index 780674c21..7c367330e 100644 --- a/internal/raftengine/etcd/engine_test.go +++ b/internal/raftengine/etcd/engine_test.go @@ -1606,3 +1606,67 @@ func mustRawNode(t *testing.T, storage *etcdraft.MemoryStorage, nodeID uint64) * require.NoError(t, err) return rawNode } + +// TestRunMultiplexDispatchWorkerPrioritizesHeartbeats verifies the biased-select +// invariant: heartbeat-channel messages are always drained before normal messages +// when both channels have pending entries. +func TestRunMultiplexDispatchWorkerPrioritizesHeartbeats(t *testing.T) { + var ( + dispatched []raftpb.MessageType + mu sync.Mutex + ) + e := &Engine{ + dispatchFn: func(_ context.Context, req dispatchRequest) error { + mu.Lock() + dispatched = append(dispatched, req.msg.Type) + mu.Unlock() + return nil + }, + dispatchStopCh: make(chan struct{}), + } + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + pd := &peerQueues{ + normal: make(chan dispatchRequest, 10), + heartbeat: make(chan dispatchRequest, 10), + ctx: ctx, + cancel: cancel, + } + + // Pre-fill: 3 normal messages then 3 heartbeats so both channels are + // non-empty when the worker first runs. + for range 3 { + pd.normal <- prepareDispatchRequest(raftpb.Message{Type: raftpb.MsgApp}) + } + for range 3 { + pd.heartbeat <- prepareDispatchRequest(raftpb.Message{Type: raftpb.MsgHeartbeat}) + } + + e.dispatchWG.Add(1) + go e.runMultiplexDispatchWorker(ctx, pd) + + // Wait until all 6 messages are dispatched, then stop. + require.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() + return len(dispatched) == 6 + }, time.Second, time.Millisecond) + cancel() + e.dispatchWG.Wait() + + mu.Lock() + defer mu.Unlock() + // The biased select drains heartbeat before waiting on normal, so the first + // 3 dispatched messages must all be heartbeats. 
+ require.Len(t, dispatched, 6) + for i := range 3 { + require.Equal(t, raftpb.MsgHeartbeat, dispatched[i], + "position %d should be MsgHeartbeat (heartbeat priority)", i) + } + for i := 3; i < 6; i++ { + require.Equal(t, raftpb.MsgApp, dispatched[i], + "position %d should be MsgApp (normal channel)", i) + } +} diff --git a/internal/raftengine/etcd/grpc_transport_test.go b/internal/raftengine/etcd/grpc_transport_test.go index bf2a4769a..cded5a0e1 100644 --- a/internal/raftengine/etcd/grpc_transport_test.go +++ b/internal/raftengine/etcd/grpc_transport_test.go @@ -352,14 +352,20 @@ func (*testSnapshotSendClient) RecvMsg(any) error { return nil } // testEtcdRaftClient is a minimal mock of pb.EtcdRaftClient that routes // SendSnapshot calls to a pre-wired testSnapshotSendClient. type testEtcdRaftClient struct { - stream *testSnapshotSendClient + stream *testSnapshotSendClient + sendStreamFn func(ctx context.Context, opts ...grpc.CallOption) (pb.EtcdRaft_SendStreamClient, error) + sendCallCount atomic.Int32 } func (c *testEtcdRaftClient) Send(_ context.Context, _ *pb.EtcdRaftMessage, _ ...grpc.CallOption) (*pb.EtcdRaftAck, error) { + c.sendCallCount.Add(1) return &pb.EtcdRaftAck{}, nil } -func (c *testEtcdRaftClient) SendStream(_ context.Context, _ ...grpc.CallOption) (pb.EtcdRaft_SendStreamClient, error) { +func (c *testEtcdRaftClient) SendStream(ctx context.Context, opts ...grpc.CallOption) (pb.EtcdRaft_SendStreamClient, error) { + if c.sendStreamFn != nil { + return c.sendStreamFn(ctx, opts...) + } return nil, status.Error(codes.Unimplemented, "SendStream not implemented in test mock") } @@ -367,6 +373,38 @@ func (c *testEtcdRaftClient) SendSnapshot(_ context.Context, _ ...grpc.CallOptio return c.stream, nil } +// testSendStreamClient is a mock of pb.EtcdRaft_SendStreamClient that records +// sent messages and optionally returns a configured error. 
+type testSendStreamClient struct { + mu sync.Mutex + sent []*pb.EtcdRaftMessage + sendErr error +} + +func (c *testSendStreamClient) Send(msg *pb.EtcdRaftMessage) error { + c.mu.Lock() + defer c.mu.Unlock() + if c.sendErr != nil { + return c.sendErr + } + c.sent = append(c.sent, msg) + return nil +} + +func (c *testSendStreamClient) SentCount() int { + c.mu.Lock() + defer c.mu.Unlock() + return len(c.sent) +} + +func (c *testSendStreamClient) CloseAndRecv() (*pb.EtcdRaftAck, error) { return &pb.EtcdRaftAck{}, nil } +func (c *testSendStreamClient) Header() (metadata.MD, error) { return nil, nil } +func (c *testSendStreamClient) Trailer() metadata.MD { return nil } +func (c *testSendStreamClient) CloseSend() error { return nil } +func (c *testSendStreamClient) Context() context.Context { return context.Background() } +func (c *testSendStreamClient) SendMsg(any) error { return nil } +func (c *testSendStreamClient) RecvMsg(any) error { return nil } + // injectClient pre-populates the transport's client cache for the given peer // address so calls to clientFor return the mock without dialling. 
func injectClient(t *testing.T, transport *GRPCTransport, address string, client pb.EtcdRaftClient) { @@ -588,3 +626,63 @@ func TestDispatchSnapshotTokenNoOpenerFallsBackToBridge(t *testing.T) { } require.Equal(t, token, got) } + +// --- dispatchRegular streaming path tests --- + +func TestDispatchRegularUsesStreamWhenSupported(t *testing.T) { + streamClient := &testSendStreamClient{} + mock := &testEtcdRaftClient{ + sendStreamFn: func(_ context.Context, _ ...grpc.CallOption) (pb.EtcdRaft_SendStreamClient, error) { + return streamClient, nil + }, + } + transport := NewGRPCTransport([]Peer{{NodeID: 5, Address: "host:5"}}) + t.Cleanup(func() { _ = transport.Close() }) + injectClient(t, transport, "host:5", mock) + + msg := raftpb.Message{Type: raftpb.MsgApp, To: 5, From: 1} + require.NoError(t, transport.dispatchRegular(context.Background(), msg)) + + require.Equal(t, 1, streamClient.SentCount(), "message should be sent via stream.Send") + require.Equal(t, int32(0), mock.sendCallCount.Load(), "unary Send should not be called") +} + +func TestDispatchRegularFallsBackToUnaryOnUnimplemented(t *testing.T) { + mock := &testEtcdRaftClient{} // sendStreamFn nil → returns Unimplemented + transport := NewGRPCTransport([]Peer{{NodeID: 6, Address: "host:6"}}) + t.Cleanup(func() { _ = transport.Close() }) + injectClient(t, transport, "host:6", mock) + + msg := raftpb.Message{Type: raftpb.MsgApp, To: 6, From: 1} + require.NoError(t, transport.dispatchRegular(context.Background(), msg)) + + require.Equal(t, int32(1), mock.sendCallCount.Load(), "unary Send should be called as fallback") + + // noStream must be set so subsequent dispatches skip the probe. 
+ transport.streamsMu.RLock() + _, skipped := transport.noStream[uint64(6)] + transport.streamsMu.RUnlock() + require.True(t, skipped, "noStream[6] should be set after Unimplemented") +} + +func TestDispatchRegularClosesStreamOnSendError(t *testing.T) { + streamClient := &testSendStreamClient{sendErr: errors.New("send failed")} + mock := &testEtcdRaftClient{ + sendStreamFn: func(_ context.Context, _ ...grpc.CallOption) (pb.EtcdRaft_SendStreamClient, error) { + return streamClient, nil + }, + } + transport := NewGRPCTransport([]Peer{{NodeID: 7, Address: "host:7"}}) + t.Cleanup(func() { _ = transport.Close() }) + injectClient(t, transport, "host:7", mock) + + msg := raftpb.Message{Type: raftpb.MsgApp, To: 7, From: 1} + err := transport.dispatchRegular(context.Background(), msg) + require.Error(t, err) + + // Stream must be removed from the cache after a Send error. + transport.streamsMu.RLock() + _, stillCached := transport.streams[uint64(7)] + transport.streamsMu.RUnlock() + require.False(t, stillCached, "stream should be evicted from cache after Send error") +} From 7a4f9d847b86d51e567307a3227298d62d409c3e Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 18 Apr 2026 18:19:06 +0900 Subject: [PATCH 06/14] perf(raft): open SendStream outside streamsMu to avoid blocking dispatch reads client.SendStream is a network operation (TLS handshake + connect) that can block for hundreds of milliseconds. Holding streamsMu.Lock during that call stalls all concurrent RLock readers on the dispatch hot path. Fix: release streamsMu before calling client.SendStream, then re-acquire to install the new stream. A final re-check after re-acquire handles the race where two goroutines both observe no stream and both connect; the loser cancels its stream and returns the winner's. 
--- internal/raftengine/etcd/grpc_transport.go | 27 ++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/internal/raftengine/etcd/grpc_transport.go b/internal/raftengine/etcd/grpc_transport.go index c58952cbd..fe6c153e8 100644 --- a/internal/raftengine/etcd/grpc_transport.go +++ b/internal/raftengine/etcd/grpc_transport.go @@ -838,28 +838,47 @@ func (t *GRPCTransport) getOrOpenStream(ctx context.Context, nodeID uint64) (pb. return nil, err } - // Re-acquire streamsMu to install the stream atomically. + // Double-check under write lock before making the network call. + // Another goroutine may have installed a stream while we dialled. t.streamsMu.Lock() - defer t.streamsMu.Unlock() - - // Re-check: another goroutine or closeStream may have raced. if _, skip = t.noStream[nodeID]; skip { + t.streamsMu.Unlock() return nil, errStreamNotSupported } if ps, ok = t.streams[nodeID]; ok { + t.streamsMu.Unlock() return ps.stream, nil } + t.streamsMu.Unlock() + // Open the stream outside any lock: SendStream is a network operation that + // can block during TLS handshake or connect, and holding streamsMu would + // stall all concurrent reads on the dispatch hot path. streamCtx, cancel := context.WithCancel(ctx) stream, err := client.SendStream(streamCtx) if err != nil { cancel() if status.Code(err) == codes.Unimplemented { + t.streamsMu.Lock() t.noStream[nodeID] = struct{}{} + t.streamsMu.Unlock() return nil, errStreamNotSupported } return nil, errors.WithStack(err) } + + // Install under the lock. Another goroutine may have raced and already + // opened a stream while we were connecting; prefer the existing one. 
+ t.streamsMu.Lock() + defer t.streamsMu.Unlock() + if _, skip = t.noStream[nodeID]; skip { + cancel() + return nil, errStreamNotSupported + } + if ps, ok = t.streams[nodeID]; ok { + cancel() + return ps.stream, nil + } t.streams[nodeID] = &peerStream{stream: stream, cancel: cancel} return stream, nil } From 27f128df6d675a0ed05cc59b88844969334bd1d2 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 18 Apr 2026 18:25:21 +0900 Subject: [PATCH 07/14] fix(test): fix gci import ordering in grpc_transport_test.go --- internal/raftengine/etcd/grpc_transport_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/raftengine/etcd/grpc_transport_test.go b/internal/raftengine/etcd/grpc_transport_test.go index cded5a0e1..4fba714d4 100644 --- a/internal/raftengine/etcd/grpc_transport_test.go +++ b/internal/raftengine/etcd/grpc_transport_test.go @@ -352,9 +352,9 @@ func (*testSnapshotSendClient) RecvMsg(any) error { return nil } // testEtcdRaftClient is a minimal mock of pb.EtcdRaftClient that routes // SendSnapshot calls to a pre-wired testSnapshotSendClient. 
type testEtcdRaftClient struct { - stream *testSnapshotSendClient - sendStreamFn func(ctx context.Context, opts ...grpc.CallOption) (pb.EtcdRaft_SendStreamClient, error) - sendCallCount atomic.Int32 + stream *testSnapshotSendClient + sendStreamFn func(ctx context.Context, opts ...grpc.CallOption) (pb.EtcdRaft_SendStreamClient, error) + sendCallCount atomic.Int32 } func (c *testEtcdRaftClient) Send(_ context.Context, _ *pb.EtcdRaftMessage, _ ...grpc.CallOption) (*pb.EtcdRaftAck, error) { From 1e9f2f8ae48aa9a759599c96811158542f681856 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 18 Apr 2026 20:42:48 +0900 Subject: [PATCH 08/14] fix(raft): close gRPC conn outside t.mu and document channel drain on shutdown - closePeerConnLocked now returns the *grpc.ClientConn instead of calling conn.Close() internally; UpsertPeer and RemovePeer close the connection after releasing t.mu so a blocking conn.Close() cannot stall concurrent management operations holding the same lock - Add inline comment on runMultiplexDispatchWorker explaining why drainPendingRequests is deferred: ensures every buffered dispatchRequest is properly closed on worker exit regardless of the stop reason --- internal/raftengine/etcd/engine.go | 5 ++ internal/raftengine/etcd/grpc_transport.go | 53 +++++++++++++++------- 2 files changed, 41 insertions(+), 17 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 32a6eca2f..423cb9bc1 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -2062,6 +2062,11 @@ func (e *Engine) runDispatchWorker(ctx context.Context, ch chan dispatchRequest) // runDispatchWorker while preserving heartbeat priority. func (e *Engine) runMultiplexDispatchWorker(ctx context.Context, pd *peerQueues) { defer e.dispatchWG.Done() + // drainPendingRequests runs after the main loop exits regardless of the + // stop reason (peer removal, engine shutdown, channel close). 
It closes + // every dispatchRequest still sitting in either channel so that callers + // waiting on response futures are not left blocked, and so that + // dispatchRequest.Close() can release any associated resources. defer e.drainPendingRequests(pd) for { drained, stop := e.drainPriorityChannel(ctx, pd) diff --git a/internal/raftengine/etcd/grpc_transport.go b/internal/raftengine/etcd/grpc_transport.go index fe6c153e8..c3dffe450 100644 --- a/internal/raftengine/etcd/grpc_transport.go +++ b/internal/raftengine/etcd/grpc_transport.go @@ -176,13 +176,23 @@ func (t *GRPCTransport) UpsertPeer(peer Peer) { return } t.mu.Lock() - var closedNodeIDs []uint64 + var ( + closedNodeIDs []uint64 + oldConn *grpc.ClientConn + ) if existing, ok := t.peers[peer.NodeID]; ok && existing.Address != "" && existing.Address != peer.Address { - closedNodeIDs = t.closePeerConnLocked(existing.Address) + closedNodeIDs, oldConn = t.closePeerConnLocked(existing.Address) } t.peers[peer.NodeID] = peer t.mu.Unlock() + // Close the old connection outside mu: conn.Close is a network operation + // that can block; holding t.mu during it would stall concurrent callers. + if oldConn != nil { + if err := oldConn.Close(); err != nil { + slog.Warn("failed to close etcd raft peer connection", "address", peer.Address, "error", err) + } + } // Clear stream state outside mu (never hold streamsMu and t.mu simultaneously). for _, id := range closedNodeIDs { t.closeStream(id) @@ -200,16 +210,26 @@ func (t *GRPCTransport) RemovePeer(nodeID uint64) { return } t.mu.Lock() - var closedNodeIDs []uint64 + var ( + closedNodeIDs []uint64 + oldConn *grpc.ClientConn + ) if peer, ok := t.peers[nodeID]; ok { // Collect affected nodeIDs before deleting: closePeerConnLocked iterates // t.peers to find all nodes using the address, so nodeID must still be // present at this point or it won't be included in the cleanup list. 
- closedNodeIDs = t.closePeerConnLocked(peer.Address) + closedNodeIDs, oldConn = t.closePeerConnLocked(peer.Address) delete(t.peers, nodeID) } t.mu.Unlock() + // Close the old connection outside mu: conn.Close is a network operation + // that can block; holding t.mu during it would stall concurrent callers. + if oldConn != nil { + if err := oldConn.Close(); err != nil { + slog.Warn("failed to close etcd raft peer connection", "address", "", "error", err) + } + } // Tear down stream state outside mu (never hold streamsMu and t.mu simultaneously). for _, id := range closedNodeIDs { t.closeStream(id) @@ -237,32 +257,31 @@ func (t *GRPCTransport) Close() error { return errors.WithStack(err) } -// closePeerConnLocked closes the gRPC connection for address and returns the -// node IDs that were using it. Callers should clear stream/noStream state for -// those IDs after releasing t.mu, so the next dispatch re-probes streaming -// support (e.g. after a peer upgrade at the same address). +// closePeerConnLocked removes the gRPC connection for address from the +// transport maps and returns (nodeIDs using that address, the connection to +// close). The caller must call conn.Close() after releasing t.mu; doing so +// inside the lock would block management operations on a network operation. +// Callers should also clear stream/noStream state for the returned nodeIDs +// after releasing t.mu so the next dispatch re-probes streaming support. // Caller must hold t.mu. 
-func (t *GRPCTransport) closePeerConnLocked(address string) []uint64 { +func (t *GRPCTransport) closePeerConnLocked(address string) (nodeIDs []uint64, conn *grpc.ClientConn) { if address == "" { - return nil + return nil, nil } - var nodeIDs []uint64 for id, peer := range t.peers { if peer.Address == address { nodeIDs = append(nodeIDs, id) } } - conn, ok := t.conns[address] + var ok bool + conn, ok = t.conns[address] if !ok { delete(t.clients, address) - return nodeIDs + return nodeIDs, nil } delete(t.conns, address) delete(t.clients, address) - if err := conn.Close(); err != nil { - slog.Warn("failed to close etcd raft peer connection", "address", address, "error", err) - } - return nodeIDs + return nodeIDs, conn } func (t *GRPCTransport) Dispatch(ctx context.Context, msg raftpb.Message) error { From c8b65be2eb21cd883bc32c513acf0b48adbb38b9 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 18 Apr 2026 20:56:06 +0900 Subject: [PATCH 09/14] fix(raft/transport): address CodeRabbit/Copilot/Gemini review comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix slog.Warn in UpsertPeer to log old connection address (existing.Address) instead of the new peer address; fix RemovePeer to log the captured peer address instead of an empty string literal - Call ps.stream.CloseSend() before ps.cancel() in closeStream so the server receives io.EOF rather than a context-cancelled error - Add comment to drainPendingRequests explaining why the non-blocking select is safe: the engine event loop (sole producer) stops enqueuing before the multiplexing worker exits - Fix design doc §1 inconsistency: PR #522 introduced two workers per peer; the single multiplexing worker is introduced by this PR --- docs/design/raft-grpc-streaming-transport.md | 11 +++++++---- internal/raftengine/etcd/engine.go | 6 ++++++ internal/raftengine/etcd/grpc_transport.go | 11 +++++++++-- 3 files changed, 22 insertions(+), 6 deletions(-) diff 
--git a/docs/design/raft-grpc-streaming-transport.md b/docs/design/raft-grpc-streaming-transport.md index bee58bf16..15bd9ba5b 100644 --- a/docs/design/raft-grpc-streaming-transport.md +++ b/docs/design/raft-grpc-streaming-transport.md @@ -21,10 +21,13 @@ messages per follower before receiving ACK. A single dispatch worker serialises those 256 sends: at 1 ms RTT, throughput is capped at ~1 000 msg/s per peer regardless of bandwidth. -PR #522 introduced per-peer dispatch channels with a single multiplexing dispatch -goroutine per peer (biased-select: heartbeat priority over normal messages) to -eliminate cross-peer head-of-line blocking. The remaining bottleneck is the -per-message RTT of unary gRPC. +PR #522 introduced per-peer dispatch channels with **two** dispatch goroutines +per peer (one for normal messages, one for heartbeats) to eliminate cross-peer +head-of-line blocking. This PR replaces those two workers with a **single +multiplexing dispatch goroutine** per peer (biased-select: heartbeat priority +over normal messages) — a prerequisite for the streaming transport, since gRPC +requires a single writer per stream. The remaining bottleneck addressed here is +the per-message RTT of unary gRPC. ### Goal diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 423cb9bc1..d37fb4b8b 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -2119,6 +2119,12 @@ func (e *Engine) waitForChannel(ctx context.Context, pd *peerQueues) (stop bool) // Called as a deferred step when the multiplexing worker exits so that any // buffered dispatchRequests are properly released regardless of why the // worker stopped. +// +// The non-blocking select (default branch) is safe here: the multiplexing +// worker is the only consumer of pd's channels, and its goroutine has already +// exited (or is about to via defer) before this runs. 
The engine's event loop +// — the only producer — stops enqueuing to these channels before it waits for +// the worker to finish, so no new items will appear after the drain starts. func (e *Engine) drainPendingRequests(pd *peerQueues) { drainCh := func(ch <-chan dispatchRequest) { for { diff --git a/internal/raftengine/etcd/grpc_transport.go b/internal/raftengine/etcd/grpc_transport.go index c3dffe450..5cb7b70b9 100644 --- a/internal/raftengine/etcd/grpc_transport.go +++ b/internal/raftengine/etcd/grpc_transport.go @@ -179,8 +179,10 @@ func (t *GRPCTransport) UpsertPeer(peer Peer) { var ( closedNodeIDs []uint64 oldConn *grpc.ClientConn + oldAddress string ) if existing, ok := t.peers[peer.NodeID]; ok && existing.Address != "" && existing.Address != peer.Address { + oldAddress = existing.Address closedNodeIDs, oldConn = t.closePeerConnLocked(existing.Address) } t.peers[peer.NodeID] = peer @@ -190,7 +192,7 @@ func (t *GRPCTransport) UpsertPeer(peer Peer) { // that can block; holding t.mu during it would stall concurrent callers. if oldConn != nil { if err := oldConn.Close(); err != nil { - slog.Warn("failed to close etcd raft peer connection", "address", peer.Address, "error", err) + slog.Warn("failed to close etcd raft peer connection", "address", oldAddress, "error", err) } } // Clear stream state outside mu (never hold streamsMu and t.mu simultaneously). @@ -213,8 +215,10 @@ func (t *GRPCTransport) RemovePeer(nodeID uint64) { var ( closedNodeIDs []uint64 oldConn *grpc.ClientConn + oldAddress string ) if peer, ok := t.peers[nodeID]; ok { + oldAddress = peer.Address // Collect affected nodeIDs before deleting: closePeerConnLocked iterates // t.peers to find all nodes using the address, so nodeID must still be // present at this point or it won't be included in the cleanup list. @@ -227,7 +231,7 @@ func (t *GRPCTransport) RemovePeer(nodeID uint64) { // that can block; holding t.mu during it would stall concurrent callers. 
if oldConn != nil { if err := oldConn.Close(); err != nil { - slog.Warn("failed to close etcd raft peer connection", "address", "", "error", err) + slog.Warn("failed to close etcd raft peer connection", "address", oldAddress, "error", err) } } // Tear down stream state outside mu (never hold streamsMu and t.mu simultaneously). @@ -912,6 +916,9 @@ func (t *GRPCTransport) closeStream(nodeID uint64) { } t.streamsMu.Unlock() if ok { + // CloseSend signals EOF to the server before cancelling the context so + // the server's Recv loop sees io.EOF rather than a context-cancelled error. + _ = ps.stream.CloseSend() ps.cancel() } } From 7fe0c60c59ba55ab0df5aed4e161ded1ab7e20ae Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 18 Apr 2026 21:03:27 +0900 Subject: [PATCH 10/14] fix(raft/transport): closeAllStreams CloseSend + stream.Send blocking comment - Call CloseSend before cancel in closeAllStreams for consistent EOF signalling to server (matching closeStream which was already fixed) - Add comment in dispatchRegular explaining why stream.Send does not need a per-send deadline: gRPC send buffer is non-blocking under normal load (Raft bounds inflight messages), stream ctx cancellation handles shutdown, and gRPC keepalive handles stalled TCP connections --- internal/raftengine/etcd/grpc_transport.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/internal/raftengine/etcd/grpc_transport.go b/internal/raftengine/etcd/grpc_transport.go index 5cb7b70b9..da50b041e 100644 --- a/internal/raftengine/etcd/grpc_transport.go +++ b/internal/raftengine/etcd/grpc_transport.go @@ -423,6 +423,13 @@ func (t *GRPCTransport) dispatchRegular(ctx context.Context, msg raftpb.Message) return err } + // stream.Send enqueues the message into gRPC's send buffer and returns + // immediately under normal conditions. 
It can block briefly under HTTP/2 + // flow control, but Raft bounds in-flight messages via MaxInflightMsg, so + // the send buffer will not saturate during steady-state operation. + // Stalled TCP connections are detected by gRPC keepalive and fail the + // stream; the stream context (derived from ctx) is also cancelled on engine + // shutdown, unblocking any in-progress Send. if err := stream.Send(&pb.EtcdRaftMessage{Message: raw}); err != nil { t.closeStream(msg.To) return errors.WithStack(err) @@ -941,6 +948,9 @@ func (t *GRPCTransport) closeAllStreams() { t.noStream = make(map[uint64]struct{}) t.streamsMu.Unlock() for _, ps := range old { + // CloseSend signals EOF to the server before cancelling the context so + // the server's Recv loop sees io.EOF rather than a context-cancelled error. + _ = ps.stream.CloseSend() ps.cancel() } } From 2034e4ce4659506c95b2de48780074f75906a204 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 18 Apr 2026 21:09:52 +0900 Subject: [PATCH 11/14] fix(raft/transport): log CloseSend errors instead of silently ignoring them --- internal/raftengine/etcd/grpc_transport.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/raftengine/etcd/grpc_transport.go b/internal/raftengine/etcd/grpc_transport.go index da50b041e..5faddd973 100644 --- a/internal/raftengine/etcd/grpc_transport.go +++ b/internal/raftengine/etcd/grpc_transport.go @@ -925,7 +925,9 @@ func (t *GRPCTransport) closeStream(nodeID uint64) { if ok { // CloseSend signals EOF to the server before cancelling the context so // the server's Recv loop sees io.EOF rather than a context-cancelled error. 
- _ = ps.stream.CloseSend() + if err := ps.stream.CloseSend(); err != nil { + slog.Warn("etcd raft: CloseSend on peer stream failed", "nodeID", nodeID, "error", err) + } ps.cancel() } } @@ -950,7 +952,9 @@ func (t *GRPCTransport) closeAllStreams() { for _, ps := range old { // CloseSend signals EOF to the server before cancelling the context so // the server's Recv loop sees io.EOF rather than a context-cancelled error. - _ = ps.stream.CloseSend() + if err := ps.stream.CloseSend(); err != nil { + slog.Warn("etcd raft: CloseSend on peer stream failed during shutdown", "error", err) + } ps.cancel() } } From 35022fd708f7daa5a1b509fd7dd2a3d15e9a22a9 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 18 Apr 2026 21:20:58 +0900 Subject: [PATCH 12/14] fix(raft/transport): log gRPC status code on SendStream Recv error --- internal/raftengine/etcd/grpc_transport.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/raftengine/etcd/grpc_transport.go b/internal/raftengine/etcd/grpc_transport.go index 5faddd973..e34be69f2 100644 --- a/internal/raftengine/etcd/grpc_transport.go +++ b/internal/raftengine/etcd/grpc_transport.go @@ -499,6 +499,10 @@ func (t *GRPCTransport) SendStream(stream pb.EtcdRaft_SendStreamServer) error { if errors.Is(err, io.EOF) { return errors.WithStack(stream.SendAndClose(&pb.EtcdRaftAck{})) } + slog.Warn("etcd raft SendStream: Recv error", + "code", status.Code(err).String(), + "error", err, + ) return errors.WithStack(err) } var msg raftpb.Message From e065c8d1cbdf61a51eb34ff8cf3a8c1e31585218 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 18 Apr 2026 21:27:01 +0900 Subject: [PATCH 13/14] docs(raft/transport): fix misleading comments surfaced by Copilot review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - peerStream: document single-writer requirement (stream.Send is not goroutine-safe; must be owned by exactly one goroutine — the per-peer multiplexing dispatch 
worker) - SendStream doc: replace "identical to unary" with accurate description: errStepQueueFull drops the message and keeps the stream alive; other errors close the stream and rely on Raft retransmission - dispatchRegular: correct keepalive claim — gRPC keepalive is not configured here; stalled TCP connections fall back to OS-level keepalive unless the caller adds grpc.WithKeepaliveParams --- internal/raftengine/etcd/grpc_transport.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/internal/raftengine/etcd/grpc_transport.go b/internal/raftengine/etcd/grpc_transport.go index e34be69f2..32f15ea91 100644 --- a/internal/raftengine/etcd/grpc_transport.go +++ b/internal/raftengine/etcd/grpc_transport.go @@ -44,6 +44,11 @@ var ( // peerStream holds a long-lived client-streaming gRPC stream to one peer. // cancel tears down the stream context; the transport deletes the entry on // any Send error and re-opens on the next dispatch attempt. +// +// stream.Send is NOT goroutine-safe. This struct must be owned by exactly +// one goroutine at a time — in practice the per-peer multiplexing dispatch +// worker (runMultiplexDispatchWorker). Never call stream.Send from more than +// one goroutine concurrently. type peerStream struct { stream pb.EtcdRaft_SendStreamClient cancel context.CancelFunc @@ -427,9 +432,10 @@ func (t *GRPCTransport) dispatchRegular(ctx context.Context, msg raftpb.Message) // immediately under normal conditions. It can block briefly under HTTP/2 // flow control, but Raft bounds in-flight messages via MaxInflightMsg, so // the send buffer will not saturate during steady-state operation. - // Stalled TCP connections are detected by gRPC keepalive and fail the - // stream; the stream context (derived from ctx) is also cancelled on engine - // shutdown, unblocking any in-progress Send. + // The stream context is derived from ctx and is cancelled on engine + // shutdown, unblocking any in-progress Send.
Note: gRPC keepalive is not + // configured here; stalled TCP connections rely on the OS-level TCP + // keepalive (typically ~2 h) unless the caller configures grpc.WithKeepalive. if err := stream.Send(&pb.EtcdRaftMessage{Message: raw}); err != nil { t.closeStream(msg.To) return errors.WithStack(err) @@ -490,8 +496,9 @@ func (t *GRPCTransport) Send(ctx context.Context, req *pb.EtcdRaftMessage) (*pb. // SendStream is the server-side handler for the client-streaming SendStream RPC. // The client sends a sequence of Raft messages over one long-lived stream; // this handler processes each one and closes with a single EtcdRaftAck. -// Transient backpressure (errStepQueueFull) is logged and skipped rather than -// tearing down the stream — identical to how the unary Send handler behaves. +// Transient backpressure (errStepQueueFull) is logged and the message is +// dropped so the stream stays alive; the sender's Raft layer retransmits. +// Other handler errors close the stream; Raft retransmits on reconnect. func (t *GRPCTransport) SendStream(stream pb.EtcdRaft_SendStreamServer) error { for { req, err := stream.Recv() From 66e7c038a0d1fe72e2d1e41b0540732e4bbde9c6 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 18 Apr 2026 21:32:16 +0900 Subject: [PATCH 14/14] feat(grpc): add keepalive to all gRPC connections MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Without keepalive, stalled TCP connections (e.g. silently dropped by a NAT or load balancer) are not detected until the OS-level TCP keepalive fires — typically ~2 h. Long-lived SendStream RPCs introduced in this PR make this gap much more visible than the previous per-message unary model. 
Client: ping every 10 s while a stream is active; declare dead after 3 s Server: enforce MinTime=5 s so GOAWAY is not sent before the first ping Stalled connections are now detected within ~13 s (PingTime + PingTimeout) across all callers of GRPCDialOptions/GRPCServerOptions (Raft transport, KV client connection cache, Hashicorp Raft transport, demo server). --- internal/grpc.go | 37 ++++++++++++++++++++++ internal/raftengine/etcd/grpc_transport.go | 6 ++-- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/internal/grpc.go b/internal/grpc.go index 590926157..dbf608741 100644 --- a/internal/grpc.go +++ b/internal/grpc.go @@ -1,18 +1,46 @@ package internal import ( + "time" + "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" ) const GRPCMaxMessageBytes = 64 << 20 +// keepalive timing constants. +// - PingTime: interval between keepalive pings while a stream is active. +// - PingTimeout: how long to wait for a ping ACK before declaring the peer dead. +// - ServerMinPingTime: minimum interval the server accepts from clients; must +// be shorter than PingTime so the server never sends GOAWAY before the first ping. +// +// With these values a stalled TCP connection is detected within ~13 s +// (PingTime + PingTimeout) rather than the OS default of ~2 hours. +const ( + keepalivePingTime = 10 * time.Second + keepalivePingTimeout = 3 * time.Second + keepaliveServerMinTime = 5 * time.Second +) + // GRPCServerOptions keeps Raft replication and the public/internal APIs aligned // on the same message-size budget. func GRPCServerOptions() []grpc.ServerOption { return []grpc.ServerOption{ grpc.MaxRecvMsgSize(GRPCMaxMessageBytes), grpc.MaxSendMsgSize(GRPCMaxMessageBytes), + // Accept client keepalive pings no more frequently than keepaliveServerMinTime. + // Must be less than the client's PingTime so the server does not send + // GOAWAY before the client's first ping. 
+ grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: keepaliveServerMinTime, + PermitWithoutStream: false, + }), + grpc.KeepaliveParams(keepalive.ServerParameters{ + Time: keepalivePingTime, + Timeout: keepalivePingTimeout, + }), } } @@ -25,5 +53,14 @@ func GRPCDialOptions() []grpc.DialOption { grpc.MaxCallRecvMsgSize(GRPCMaxMessageBytes), grpc.MaxCallSendMsgSize(GRPCMaxMessageBytes), ), + // Send a keepalive ping every keepalivePingTime while a stream is active + // so that stalled TCP connections (e.g. silently dropped by a NAT or + // load balancer) are detected within ~13 s rather than the OS default + // of ~2 h. + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: keepalivePingTime, + Timeout: keepalivePingTimeout, + PermitWithoutStream: false, + }), } } diff --git a/internal/raftengine/etcd/grpc_transport.go b/internal/raftengine/etcd/grpc_transport.go index 32f15ea91..b574da9dd 100644 --- a/internal/raftengine/etcd/grpc_transport.go +++ b/internal/raftengine/etcd/grpc_transport.go @@ -433,9 +433,9 @@ func (t *GRPCTransport) dispatchRegular(ctx context.Context, msg raftpb.Message) // flow control, but Raft bounds in-flight messages via MaxInflightMsg, so // the send buffer will not saturate during steady-state operation. // The stream context is derived from ctx and is cancelled on engine - // shutdown, unblocking any in-progress Send. Note: gRPC keepalive is not - // configured here; stalled TCP connections rely on the OS-level TCP - // keepalive (typically ~2 h) unless the caller configures grpc.WithKeepalive. + // shutdown, unblocking any in-progress Send. Stalled TCP connections are + // detected within ~13 s by the gRPC keepalive configured in GRPCDialOptions + // (10 s ping interval + 3 s timeout). if err := stream.Send(&pb.EtcdRaftMessage{Message: raw}); err != nil { t.closeStream(msg.To) return errors.WithStack(err)