-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathconn.go
More file actions
377 lines (322 loc) · 10.2 KB
/
conn.go
File metadata and controls
377 lines (322 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
// Package socket provides a simple TCP server framework for Go.
// It supports custom message encoding/decoding, asynchronous I/O operations,
// and connection management with idle timeout monitoring.
package socket
import (
"bufio"
"context"
"errors"
"io"
"net"
"sync/atomic"
"time"
"golang.org/x/sync/errgroup"
)
// Sentinel errors reported by connection setup and I/O.
// Callers should match them with errors.Is.
var (
	// ErrInvalidCodec reports that the options carry no codec.
	ErrInvalidCodec = errors.New("invalid codec callback")

	// ErrInvalidOnMessage reports that the options carry no message handler.
	ErrInvalidOnMessage = errors.New("invalid on message callback")

	// ErrMessageTooLarge reports that a single message exceeded the
	// configured read limit.
	ErrMessageTooLarge = errors.New("message too large")

	// ErrConnectionClosed reports an operation attempted on a closed connection.
	ErrConnectionClosed = errors.New("connection closed")
)
// limitedReader is an io.Reader that passes through at most a fixed number of
// bytes from the wrapped reader and fails with ErrMessageTooLarge once that
// budget is used up.
type limitedReader struct {
	r         io.Reader // wrapped source
	remaining int64     // bytes that may still be read before the limit trips
}

// newLimitedReader wraps r with a byte budget of limit.
func newLimitedReader(r io.Reader, limit int64) *limitedReader {
	lr := new(limitedReader)
	lr.r = r
	lr.remaining = limit
	return lr
}

// Read implements io.Reader. It never asks the wrapped reader for more than
// the remaining budget, and returns (0, ErrMessageTooLarge) without touching
// the wrapped reader once the budget is zero or negative.
func (l *limitedReader) Read(p []byte) (n int, err error) {
	budget := l.remaining
	if budget <= 0 {
		return 0, ErrMessageTooLarge
	}
	// Shrink the destination so a single read cannot overshoot the budget.
	if int64(len(p)) > budget {
		p = p[:budget]
	}
	n, err = l.r.Read(p)
	l.remaining = budget - int64(n)
	return n, err
}

// reset restores the byte budget to limit so the reader can be reused for the
// next message. The wrapped reader is left untouched; only the counter changes.
func (l *limitedReader) reset(limit int64) { l.remaining = limit }
// Conn represents a client connection to a TCP server.
// It manages the underlying TCP connection, message encoding/decoding,
// and provides read/write loops for asynchronous communication.
type Conn struct {
	// rawConn is the underlying TCP connection; deadlines, writes and Close
	// are applied to it directly.
	rawConn *net.TCPConn

	// reader buffers reads from rawConn.
	reader *bufio.Reader

	// limitedReader wraps reader and is what the codec decodes from; its
	// byte budget is reset before every Decode call in readLoop.
	limitedReader *limitedReader

	// logger receives connection lifecycle and error messages.
	logger Logger

	// opts holds the validated options the connection was created with.
	opts options

	// sendMsg queues already-encoded messages for writeLoop.
	sendMsg chan []byte

	// closed is set once Close or closeConn has run.
	closed atomic.Bool

	// cancel stops the loops started by Run. It is nil until Run is called.
	cancel context.CancelFunc
}
// Fallback values applied by checkOptions when an option is left unset.
const (
	// defaultBufferSize is the capacity of the outgoing message queue.
	defaultBufferSize = 1

	// defaultMaxPackageLength is the largest accepted single message, 1 MiB.
	defaultMaxPackageLength = 1 << 20
)
// NewConn wraps conn in a Conn configured by the given options.
// Options are applied in argument order onto a zero-valued option set, which
// is then validated and completed with defaults by checkOptions.
// It returns the validation error (for example a missing codec or message
// handler) and a nil *Conn when validation fails.
func NewConn(conn *net.TCPConn, opt ...Option) (*Conn, error) {
	var opts options
	for i := range opt {
		opt[i](&opts)
	}
	if err := checkOptions(&opts); err != nil {
		return nil, err
	}
	return newClientConnWithOptions(conn, opts), nil
}
// checkOptions fills in defaults for unset options and rejects option sets
// that lack a required callback.
//
// Checks run in a fixed order, and defaults applied before a failing check
// remain set on opts: sizes, then the message handler, then the idle timeout,
// then the codec, then the error handler and logger.
// It returns ErrInvalidOnMessage or ErrInvalidCodec, or nil on success.
func checkOptions(opts *options) error {
	// Sizing defaults.
	if opts.bufferSize <= 0 {
		opts.bufferSize = defaultBufferSize
	}
	if opts.maxReadLength <= 0 {
		opts.maxReadLength = defaultMaxPackageLength
	}

	// A message handler is mandatory.
	if opts.onMessage == nil {
		return ErrInvalidOnMessage
	}

	if opts.idleTimeout <= 0 {
		opts.idleTimeout = 30 * time.Second
	}

	// A codec is mandatory.
	if opts.codec == nil {
		return ErrInvalidCodec
	}

	// Without a handler, every error disconnects.
	if opts.onError == nil {
		disconnectAlways := func(err error) ErrorAction { return Disconnect }
		opts.onError = disconnectAlways
	}
	if opts.logger == nil {
		opts.logger = defaultLogger()
	}
	return nil
}
// newClientConnWithOptions builds a Conn around c from already-validated
// options. The read side is a bufio.Reader sized to maxReadLength, wrapped in
// a limitedReader with the same budget; the send queue has bufferSize slots.
func newClientConnWithOptions(c *net.TCPConn, opts options) *Conn {
	buffered := bufio.NewReaderSize(c, opts.maxReadLength)
	return &Conn{
		rawConn:       c,
		reader:        buffered,
		limitedReader: newLimitedReader(buffered, int64(opts.maxReadLength)),
		logger:        opts.logger,
		opts:          opts,
		sendMsg:       make(chan []byte, opts.bufferSize),
	}
}
// Run starts the connection's read and write loops and blocks until both
// have stopped.
//
// The loops run in an errgroup derived from ctx. They stop when ctx is
// canceled, when Close is called, or when either loop returns an error. As
// soon as the group context ends the socket is closed, so a read that is
// blocked inside the codec returns immediately instead of waiting for the
// read deadline. The connection is always closed by the time Run returns.
//
// Run returns the first error reported by either loop. A read failure that
// happens only because the connection is already shutting down is reported
// as the context's error rather than as the resulting I/O error.
//
// NOTE(review): c.cancel is assigned here without synchronization and is read
// by Close; calling Close concurrently with the start of Run looks racy —
// confirm callers only invoke Close once Run is underway.
func (c *Conn) Run(ctx context.Context) error {
	c.logger.Info("connection established", "addr", c.Addr())
	c.logger.Debug("connection options", "addr", c.Addr(),
		"buffer_size", c.opts.bufferSize,
		"max_read_length", c.opts.maxReadLength,
		"idle_timeout", c.opts.idleTimeout)

	ctx, cancel := context.WithCancel(ctx)
	c.cancel = cancel
	// Release the context's resources even if Close is never called.
	defer cancel()

	group, child := errgroup.WithContext(ctx)
	group.Go(func() error {
		err := c.readLoop(child)
		if ctxErr := child.Err(); ctxErr != nil {
			// The group was already shutting down, so the read error is
			// only a side effect of the socket being closed under it.
			return ctxErr
		}
		return err
	})
	group.Go(func() error {
		return c.writeLoop(child)
	})

	// readLoop cannot observe cancellation while it is blocked in Decode.
	// Closing the socket when the group context ends unblocks it at once.
	// child is always canceled by the time group.Wait returns, so this
	// goroutine cannot outlive Run by more than an instant.
	go func() {
		<-child.Done()
		c.closeConn()
	}()

	err := group.Wait()
	c.closeConn()

	if err != nil && !errors.Is(err, context.Canceled) {
		c.logger.Info("connection closed with error", "addr", c.Addr(), "error", err)
	} else {
		c.logger.Info("connection closed", "addr", c.Addr())
	}
	return err
}
// Close shuts the connection down.
// The first call marks the connection closed, cancels the context installed
// by Run (if Run has been called), and closes the underlying TCP connection,
// returning that close error. Every later call is a no-op that returns nil.
func (c *Conn) Close() error {
	alreadyClosed := c.closed.Swap(true)
	if alreadyClosed {
		return nil
	}
	if stop := c.cancel; stop != nil {
		stop()
	}
	return c.rawConn.Close()
}
// IsClosed reports whether the connection has been marked closed.
func (c *Conn) IsClosed() bool {
	closed := c.closed.Load()
	return closed
}
// ErrBufferFull is returned when the send buffer is full and cannot accept more messages.
// Write returns it immediately when the queue has no free slot; WriteTimeout returns it
// when its timeout expires before a slot becomes free.
// This error indicates backpressure: queued messages are not being written out
// as fast as new ones are produced.
// Recommended handling strategies:
// - Drop the message (for non-critical data like metrics)
// - Use WriteBlocking or WriteTimeout to wait for buffer space
// - Implement application-level flow control
var ErrBufferFull = errors.New("send buffer full")
// Write encodes message with the configured codec and tries to queue it for
// sending without ever blocking.
//
// Returns:
//   - nil: the encoded message was queued (it has not necessarily been sent)
//   - ErrBufferFull: the send queue had no free slot; the message was NOT queued
//   - ErrConnectionClosed: the connection was already marked closed
//   - the codec's error: if Encode fails
//
// Use this method when dropping messages under backpressure is acceptable,
// when the caller has its own retry logic, or when blocking is not an option.
// To wait for queue space, use WriteBlocking or WriteTimeout instead.
func (c *Conn) Write(message Message) error {
	if c.closed.Load() {
		return ErrConnectionClosed
	}

	payload, encodeErr := c.opts.codec.Encode(message)
	if encodeErr != nil {
		return encodeErr
	}

	select {
	case c.sendMsg <- payload:
	default:
		return ErrBufferFull
	}
	return nil
}
// WriteBlocking encodes message with the configured codec and waits until it
// has been placed on the send queue or ctx is done.
//
// Returns:
//   - nil: the encoded message was queued (it has not necessarily been sent)
//   - ctx.Err() (context.Canceled or context.DeadlineExceeded): ctx ended first
//   - ErrConnectionClosed: the connection was already marked closed on entry
//   - the codec's error: if Encode fails
//
// The closed check happens only on entry; while waiting for queue space the
// call is released only by a free slot or by ctx, so pass a context with a
// deadline or cancellation when blocking indefinitely is not acceptable.
func (c *Conn) WriteBlocking(ctx context.Context, message Message) error {
	if c.closed.Load() {
		return ErrConnectionClosed
	}

	payload, encodeErr := c.opts.codec.Encode(message)
	if encodeErr != nil {
		return encodeErr
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case c.sendMsg <- payload:
		return nil
	}
}
// WriteTimeout encodes message with the configured codec and waits up to
// timeout for a free slot on the send queue. It sits between Write (never
// blocks) and WriteBlocking (blocks until its context ends).
//
// If the queue has room the message is queued immediately, whatever the value
// of timeout. A zero or negative timeout therefore behaves like Write.
//
// Returns:
//   - nil: the encoded message was queued (it has not necessarily been sent)
//   - ErrBufferFull: no slot became free before the timeout expired
//   - ErrConnectionClosed: the connection was already marked closed on entry
//   - the codec's error: if Encode fails
func (c *Conn) WriteTimeout(message Message, timeout time.Duration) error {
	if c.closed.Load() {
		return ErrConnectionClosed
	}
	payload, err := c.opts.codec.Encode(message)
	if err != nil {
		return err
	}

	// Fast path. Trying the send on its own first guarantees that free queue
	// space always wins; selecting against an already-expired timer would
	// otherwise pick between the two ready cases at random.
	select {
	case c.sendMsg <- payload:
		return nil
	default:
	}
	if timeout <= 0 {
		return ErrBufferFull
	}

	// Stop the timer on return so it is released as soon as the send wins,
	// rather than lingering until the timeout fires.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case c.sendMsg <- payload:
		return nil
	case <-timer.C:
		return ErrBufferFull
	}
}
// Addr reports the address of the remote peer, as given by the underlying
// TCP connection.
func (c *Conn) Addr() net.Addr {
	remote := c.rawConn.RemoteAddr()
	return remote
}
// readLoop decodes messages from the connection one at a time and passes each
// to the configured message handler.
//
// Before every message it checks ctx, pushes the read deadline out to twice
// the idle timeout, and resets the per-message byte budget to maxReadLength;
// the limited reader yields ErrMessageTooLarge once that budget is used up.
//
// The loop returns:
//   - ctx.Err() once ctx is done (checked between messages)
//   - the handler's error, if onMessage fails
//   - a decode error, if onError answers Disconnect for it
//   - a decode error that is io.EOF or net.ErrClosed, whatever onError
//     answers: the stream is finished, and retrying would fail instantly
//     forever and spin the loop
//
// Any other decode error for which onError does not answer Disconnect is
// skipped and reading continues.
func (c *Conn) readLoop(ctx context.Context) error {
	for {
		if err := ctx.Err(); err != nil {
			return err
		}

		// Deliberately best effort: the result of the deadline update is discarded.
		_ = c.rawConn.SetReadDeadline(time.Now().Add(c.opts.idleTimeout * 2))

		// Each message gets a fresh byte budget.
		c.limitedReader.reset(int64(c.opts.maxReadLength))

		message, err := c.opts.codec.Decode(c.limitedReader)
		if err != nil {
			c.logger.Debug("read error", "addr", c.Addr(), "error", err)
			// The handler is always told about the error, but it cannot keep
			// the loop alive once the stream has ended.
			action := c.opts.onError(err)
			if action == Disconnect || errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) {
				return err
			}
			continue
		}

		if err := c.opts.onMessage(message); err != nil {
			return err
		}
	}
}
// writeLoop drains the send queue, handing each encoded message to write.
// It returns ctx.Err() once ctx is done, or the first error that write
// propagates.
func (c *Conn) writeLoop(ctx context.Context) error {
	done := ctx.Done()
	for {
		select {
		case <-done:
			return ctx.Err()
		case payload := <-c.sendMsg:
			if err := c.write(payload); err != nil {
				return err
			}
		}
	}
}
// write sends data on the underlying connection, with the write deadline set
// to twice the idle timeout from now.
// A write error is logged and passed to onError; it is returned only when
// onError answers Disconnect. Otherwise the error is dropped and nil is
// returned so the write loop keeps going.
func (c *Conn) write(data []byte) error {
	deadline := time.Now().Add(c.opts.idleTimeout * 2)
	// Deliberately best effort: the result of the deadline update is discarded.
	_ = c.rawConn.SetWriteDeadline(deadline)

	if _, err := c.rawConn.Write(data); err != nil {
		c.logger.Debug("write error", "addr", c.Addr(), "error", err)
		if c.opts.onError(err) == Disconnect {
			return err
		}
	}
	return nil
}
// closeConn flags the connection as closed and then closes the underlying
// TCP connection. The close error is discarded.
func (c *Conn) closeConn() {
	c.closed.Store(true)
	_ = c.rawConn.Close()
}