diff --git a/container/gqueue/gqueue.go b/container/gqueue/gqueue.go index d9455136909..57cbf064e39 100644 --- a/container/gqueue/gqueue.go +++ b/container/gqueue/gqueue.go @@ -17,20 +17,9 @@ // 4. Blocking when reading data from queue; package gqueue -import ( - "math" - - "github.com/gogf/gf/v2/container/glist" - "github.com/gogf/gf/v2/container/gtype" -) - // Queue is a concurrent-safe queue built on doubly linked list and channel. type Queue struct { - limit int // Limit for queue size. - list *glist.List // Underlying list structure for data maintaining. - closed *gtype.Bool // Whether queue is closed. - events chan struct{} // Events for data writing. - C chan any // Underlying channel for data reading. + *TQueue[any] } const ( @@ -42,74 +31,35 @@ const ( // Optional parameter `limit` is used to limit the size of the queue, which is unlimited in default. // When `limit` is given, the queue will be static and high performance which is comparable with stdlib channel. func New(limit ...int) *Queue { - q := &Queue{ - closed: gtype.NewBool(), - } - if len(limit) > 0 && limit[0] > 0 { - q.limit = limit[0] - q.C = make(chan any, limit[0]) - } else { - q.list = glist.New(true) - q.events = make(chan struct{}, math.MaxInt32) - q.C = make(chan any, defaultQueueSize) - go q.asyncLoopFromListToChannel() + return &Queue{ + TQueue: NewTQueue[any](limit...), } - return q } // Push pushes the data `v` into the queue. // Note that it would panic if Push is called after the queue is closed. func (q *Queue) Push(v any) { - if q.limit > 0 { - q.C <- v - } else { - q.list.PushBack(v) - if len(q.events) < defaultQueueSize { - q.events <- struct{}{} - } - } + q.TQueue.Push(v) } // Pop pops an item from the queue in FIFO way. // Note that it would return nil immediately if Pop is called after the queue is closed. func (q *Queue) Pop() any { - return <-q.C + return q.TQueue.Pop() } // Close closes the queue. // Notice: It would notify all goroutines return immediately, // which are being blocked reading using Pop method. func (q *Queue) Close() { - if !q.closed.Cas(false, true) { - return - } - if q.events != nil { - close(q.events) - } - if q.limit > 0 { - close(q.C) - } else { - for range defaultBatchSize { - q.Pop() - } - } + q.TQueue.Close() } // Len returns the length of the queue. // Note that the result might not be accurate if using unlimited queue size as there's an // asynchronous channel reading the list constantly. func (q *Queue) Len() (length int64) { - bufferedSize := int64(len(q.C)) - if q.limit > 0 { - return bufferedSize - } - // If the queue is unlimited and the buffered size is exactly the default size, - // it means there might be some data in the list not synchronized to channel yet. - // So we need to add 1 to the buffered size to make the result more accurate. - if bufferedSize == defaultQueueSize { - bufferedSize++ - } - return int64(q.list.Size()) + bufferedSize + return q.TQueue.Len() } // Size is alias of Len. @@ -118,34 +68,3 @@ func (q *Queue) Len() (length int64) { func (q *Queue) Size() int64 { return q.Len() } - -// asyncLoopFromListToChannel starts an asynchronous goroutine, -// which handles the data synchronization from list `q.list` to channel `q.C`. -func (q *Queue) asyncLoopFromListToChannel() { - defer func() { - if q.closed.Val() { - _ = recover() - } - }() - for !q.closed.Val() { - <-q.events - for !q.closed.Val() { - if bufferLength := q.list.Len(); bufferLength > 0 { - // When q.C is closed, it will panic here, especially q.C is being blocked for writing. - // If any error occurs here, it will be caught by recover and be ignored. - for range bufferLength { - q.C <- q.list.PopFront() - } - } else { - break - } - } - // Clear q.events to remain just one event to do the next synchronization check. - for i := 0; i < len(q.events)-1; i++ { - <-q.events - } - } - // It should be here to close `q.C` if `q` is unlimited size. - // It's the sender's responsibility to close channel when it should be closed. - close(q.C) -} diff --git a/container/gqueue/gqueue_t.go b/container/gqueue/gqueue_t.go new file mode 100644 index 00000000000..74e56e6b05f --- /dev/null +++ b/container/gqueue/gqueue_t.go @@ -0,0 +1,134 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. +package gqueue + +import ( + "math" + + "github.com/gogf/gf/v2/container/glist" + "github.com/gogf/gf/v2/container/gtype" +) + +// TQueue is a concurrent-safe queue built on doubly linked list and channel. +type TQueue[T any] struct { + limit int // Limit for queue size. + list *glist.TList[T] // Underlying list structure for data maintaining. + closed *gtype.Bool // Whether queue is closed. + events chan struct{} // Events for data writing. + C chan T // Underlying channel for data reading. +} + +// NewTQueue returns an empty queue object. +// Optional parameter `limit` is used to limit the size of the queue, which is unlimited in default. +// When `limit` is given, the queue will be static and high performance which is comparable with stdlib channel. +func NewTQueue[T any](limit ...int) *TQueue[T] { + q := &TQueue[T]{ + closed: gtype.NewBool(), + } + if len(limit) > 0 && limit[0] > 0 { + q.limit = limit[0] + q.C = make(chan T, limit[0]) + } else { + q.list = glist.NewT[T](true) + q.events = make(chan struct{}, math.MaxInt32) + q.C = make(chan T, defaultQueueSize) + go q.asyncLoopFromListToChannel() + } + return q +} + +// Push pushes the data `v` into the queue. +// Note that it would panic if Push is called after the queue is closed. +func (q *TQueue[T]) Push(v T) { + if q.limit > 0 { + q.C <- v + } else { + q.list.PushBack(v) + if len(q.events) < defaultQueueSize { + q.events <- struct{}{} + } + } +} + +// Pop pops an item from the queue in FIFO way. +// Note that it would return nil immediately if Pop is called after the queue is closed. +func (q *TQueue[T]) Pop() T { + return <-q.C +} + +// Close closes the queue. +// Notice: It would notify all goroutines return immediately, +// which are being blocked reading using Pop method. +func (q *TQueue[T]) Close() { + if !q.closed.Cas(false, true) { + return + } + if q.events != nil { + close(q.events) + } + if q.limit > 0 { + close(q.C) + } else { + for range defaultBatchSize { + q.Pop() + } + } +} + +// Len returns the length of the queue. +// Note that the result might not be accurate if using unlimited queue size as there's an +// asynchronous channel reading the list constantly. +func (q *TQueue[T]) Len() (length int64) { + bufferedSize := int64(len(q.C)) + if q.limit > 0 { + return bufferedSize + } + // If the queue is unlimited and the buffered size is exactly the default size, + // it means there might be some data in the list not synchronized to channel yet. + // So we need to add 1 to the buffered size to make the result more accurate. + if bufferedSize == defaultQueueSize { + bufferedSize++ + } + return int64(q.list.Size()) + bufferedSize +} + +// Size is alias of Len. +// +// Deprecated: use Len instead. +func (q *TQueue[T]) Size() int64 { + return q.Len() +} + +// asyncLoopFromListToChannel starts an asynchronous goroutine, +// which handles the data synchronization from list `q.list` to channel `q.C`. +func (q *TQueue[T]) asyncLoopFromListToChannel() { + defer func() { + if q.closed.Val() { + _ = recover() + } + }() + for !q.closed.Val() { + <-q.events + for !q.closed.Val() { + if bufferLength := q.list.Len(); bufferLength > 0 { + // When q.C is closed, it will panic here, especially q.C is being blocked for writing. + // If any error occurs here, it will be caught by recover and be ignored. + for range bufferLength { + q.C <- q.list.PopFront() + } + } else { + break + } + } + // Clear q.events to remain just one event to do the next synchronization check. + for i := 0; i < len(q.events)-1; i++ { + <-q.events + } + } + // It should be here to close `q.C` if `q` is unlimited size. + // It's the sender's responsibility to close channel when it should be closed. + close(q.C) +} diff --git a/container/gqueue/gqueue_z_unit_test.go b/container/gqueue/gqueue_z_unit_test.go index 88ed6f962ac..75fe87716e0 100644 --- a/container/gqueue/gqueue_z_unit_test.go +++ b/container/gqueue/gqueue_z_unit_test.go @@ -128,3 +128,218 @@ func TestIssue4376(t *testing.T) { t.Log(gq.Len(), len(cq)) }) } + +// Test static queue (with limit) close operation +func TestQueue_StaticClose(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + q := gqueue.New(10) + defer func() { + if err := recover(); err == nil { + t.Log("Close succeeded") + } + }() + q.Push(1) + q.Push(2) + q.Close() + // After closing, Pop should return nil + v := q.Pop() + t.Assert(v, nil) + }) +} + +// Test Size() method (deprecated alias of Len) +func TestQueue_Size(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + q := gqueue.New(20) + for i := range 10 { + q.Push(i) + } + t.Assert(q.Size(), 10) + t.Assert(q.Len(), 10) + q.Close() + }) + gtest.C(t, func(t *gtest.T) { + q := gqueue.New() + for i := range 15 { + q.Push(i) + } + time.Sleep(10 * time.Millisecond) + t.Assert(q.Size(), q.Len()) + q.Close() + }) +} + +// Test TQueue directly with generic type +func TestTQueue_Generic(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + // Test with custom type + q := gqueue.NewTQueue[string]() + defer q.Close() + q.Push("hello") + q.Push("world") + t.Assert(q.Pop(), "hello") + t.Assert(q.Pop(), "world") + }) +} + +// Test TQueue Size method directly +func TestTQueue_Size(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + q := gqueue.NewTQueue[int]() + defer q.Close() + for i := range 10 { + q.Push(i) + } + time.Sleep(10 * time.Millisecond) + // Size is an alias of Len for TQueue + t.Assert(q.Size(), q.Len()) + }) +} + +// Test TQueue with static limit +func TestTQueue_StaticLimit(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + q := gqueue.NewTQueue[int](5) + defer q.Close() + for i := range 5 { + q.Push(i) + } + t.Assert(q.Len(), 5) + for i := range 5 { + t.Assert(q.Pop(), i) + } + t.Assert(q.Len(), 0) + }) +} + +// Test queue with large data push/pop +func TestQueue_LargeDataScale(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + q := gqueue.New() + defer q.Close() + n := 5000 + for i := range n { + q.Push(i) + } + time.Sleep(50 * time.Millisecond) + // Pop should retrieve all items in order + for i := range n { + v := q.Pop() + t.Assert(v, i) + } + }) +} + +// Test double close (idempotent close) +func TestQueue_DoubleClose(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + q := gqueue.New() + q.Push(1) + q.Close() + // Second close should not panic + q.Close() + t.Assert(q.Pop(), nil) + }) + gtest.C(t, func(t *gtest.T) { + q := gqueue.New(10) + q.Push(1) + q.Close() + // Second close should not panic for static queue + q.Close() + // Pop from closed static queue returns the buffered value + v := q.Pop() + t.Assert(v, 1) + }) +} + +// Test concurrent push and pop +func TestQueue_ConcurrentPushPop(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + q := gqueue.New() + defer q.Close() + // Producer goroutine + go func() { + for i := range 100 { + q.Push(i) + } + time.Sleep(50 * time.Millisecond) + q.Close() + }() + // Consumer + count := 0 + for { + v := q.Pop() + if v == nil { + break + } + count++ + } + t.AssertGE(count, 1) + }) +} + +// Test Pop on empty queue returns nil when closed +func TestQueue_PopEmptyClosed(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + q := gqueue.New() + q.Close() + v := q.Pop() + t.Assert(v, nil) + }) + gtest.C(t, func(t *gtest.T) { + q := gqueue.New(10) + q.Close() + v := q.Pop() + t.Assert(v, nil) + }) +} + +// Test Len with dynamic queue at capacity boundary +func TestQueue_LenAtBoundary(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + q := gqueue.New() + defer q.Close() + // Push exactly defaultQueueSize items to test boundary condition + for i := range 10000 { + q.Push(i) + } + time.Sleep(50 * time.Millisecond) + len := q.Len() + t.AssertGE(len, 0) + }) +} + +// Test Close on dynamic queue with pending asyncLoopFromListToChannel +func TestQueue_CloseWithAsyncLoop(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + q := gqueue.New() + // Push some data to activate asyncLoopFromListToChannel + for i := range 100 { + q.Push(i) + } + // Immediately close + q.Close() + // Pop should return values until exhausted, then nil + for { + v := q.Pop() + if v == nil { + break + } + } + t.Assert(q.Pop(), nil) + }) +} + +// Test static queue edge case with zero limit (should create unlimited queue) +func TestQueue_ZeroLimitCreatesUnlimited(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + q := gqueue.New(0) + defer q.Close() + for i := range 100 { + q.Push(i) + } + time.Sleep(10 * time.Millisecond) + len := q.Len() + t.Assert(len, 100) + }) +}