Skip to content

Commit 8575f01

Browse files
joy999Copilothailaz
authored
feat(container/gqueue): add generic queuefeature (#4497)
add TQueue --------- Co-authored-by: Copilot <[email protected]> Co-authored-by: hailaz <[email protected]>
1 parent ac75026 commit 8575f01

File tree

3 files changed

+356
-88
lines changed

3 files changed

+356
-88
lines changed

container/gqueue/gqueue.go

Lines changed: 7 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,9 @@
1717
// 4. Blocking when reading data from queue;
1818
package gqueue
1919

20-
import (
21-
"math"
22-
23-
"github.com/gogf/gf/v2/container/glist"
24-
"github.com/gogf/gf/v2/container/gtype"
25-
)
26-
2720
// Queue is a concurrent-safe queue built on doubly linked list and channel.
2821
type Queue struct {
29-
limit int // Limit for queue size.
30-
list *glist.List // Underlying list structure for data maintaining.
31-
closed *gtype.Bool // Whether queue is closed.
32-
events chan struct{} // Events for data writing.
33-
C chan any // Underlying channel for data reading.
22+
*TQueue[any]
3423
}
3524

3625
const (
@@ -42,74 +31,35 @@ const (
4231
// Optional parameter `limit` is used to limit the size of the queue, which is unlimited in default.
4332
// When `limit` is given, the queue will be static and high performance which is comparable with stdlib channel.
4433
func New(limit ...int) *Queue {
45-
q := &Queue{
46-
closed: gtype.NewBool(),
47-
}
48-
if len(limit) > 0 && limit[0] > 0 {
49-
q.limit = limit[0]
50-
q.C = make(chan any, limit[0])
51-
} else {
52-
q.list = glist.New(true)
53-
q.events = make(chan struct{}, math.MaxInt32)
54-
q.C = make(chan any, defaultQueueSize)
55-
go q.asyncLoopFromListToChannel()
34+
return &Queue{
35+
TQueue: NewTQueue[any](limit...),
5636
}
57-
return q
5837
}
5938

6039
// Push pushes the data `v` into the queue.
6140
// Note that it would panic if Push is called after the queue is closed.
6241
func (q *Queue) Push(v any) {
63-
if q.limit > 0 {
64-
q.C <- v
65-
} else {
66-
q.list.PushBack(v)
67-
if len(q.events) < defaultQueueSize {
68-
q.events <- struct{}{}
69-
}
70-
}
42+
q.TQueue.Push(v)
7143
}
7244

7345
// Pop pops an item from the queue in FIFO way.
7446
// Note that it would return nil immediately if Pop is called after the queue is closed.
7547
func (q *Queue) Pop() any {
76-
return <-q.C
48+
return q.TQueue.Pop()
7749
}
7850

7951
// Close closes the queue.
8052
// Notice: It would notify all goroutines return immediately,
8153
// which are being blocked reading using Pop method.
8254
func (q *Queue) Close() {
83-
if !q.closed.Cas(false, true) {
84-
return
85-
}
86-
if q.events != nil {
87-
close(q.events)
88-
}
89-
if q.limit > 0 {
90-
close(q.C)
91-
} else {
92-
for range defaultBatchSize {
93-
q.Pop()
94-
}
95-
}
55+
q.TQueue.Close()
9656
}
9757

9858
// Len returns the length of the queue.
9959
// Note that the result might not be accurate if using unlimited queue size as there's an
10060
// asynchronous channel reading the list constantly.
10161
func (q *Queue) Len() (length int64) {
102-
bufferedSize := int64(len(q.C))
103-
if q.limit > 0 {
104-
return bufferedSize
105-
}
106-
// If the queue is unlimited and the buffered size is exactly the default size,
107-
// it means there might be some data in the list not synchronized to channel yet.
108-
// So we need to add 1 to the buffered size to make the result more accurate.
109-
if bufferedSize == defaultQueueSize {
110-
bufferedSize++
111-
}
112-
return int64(q.list.Size()) + bufferedSize
62+
return q.TQueue.Len()
11363
}
11464

11565
// Size is alias of Len.
@@ -118,34 +68,3 @@ func (q *Queue) Len() (length int64) {
11868
func (q *Queue) Size() int64 {
11969
return q.Len()
12070
}
121-
122-
// asyncLoopFromListToChannel starts an asynchronous goroutine,
123-
// which handles the data synchronization from list `q.list` to channel `q.C`.
124-
func (q *Queue) asyncLoopFromListToChannel() {
125-
defer func() {
126-
if q.closed.Val() {
127-
_ = recover()
128-
}
129-
}()
130-
for !q.closed.Val() {
131-
<-q.events
132-
for !q.closed.Val() {
133-
if bufferLength := q.list.Len(); bufferLength > 0 {
134-
// When q.C is closed, it will panic here, especially q.C is being blocked for writing.
135-
// If any error occurs here, it will be caught by recover and be ignored.
136-
for range bufferLength {
137-
q.C <- q.list.PopFront()
138-
}
139-
} else {
140-
break
141-
}
142-
}
143-
// Clear q.events to remain just one event to do the next synchronization check.
144-
for i := 0; i < len(q.events)-1; i++ {
145-
<-q.events
146-
}
147-
}
148-
// It should be here to close `q.C` if `q` is unlimited size.
149-
// It's the sender's responsibility to close channel when it should be closed.
150-
close(q.C)
151-
}

container/gqueue/gqueue_t.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
2+
//
3+
// This Source Code Form is subject to the terms of the MIT License.
4+
// If a copy of the MIT was not distributed with this file,
5+
// You can obtain one at https://github.com/gogf/gf.
6+
package gqueue
7+
8+
import (
9+
"math"
10+
11+
"github.com/gogf/gf/v2/container/glist"
12+
"github.com/gogf/gf/v2/container/gtype"
13+
)
14+
15+
// TQueue is a concurrent-safe queue built on doubly linked list and channel.
16+
type TQueue[T any] struct {
17+
limit int // Limit for queue size.
18+
list *glist.TList[T] // Underlying list structure for data maintaining.
19+
closed *gtype.Bool // Whether queue is closed.
20+
events chan struct{} // Events for data writing.
21+
C chan T // Underlying channel for data reading.
22+
}
23+
24+
// NewTQueue returns an empty queue object.
25+
// Optional parameter `limit` is used to limit the size of the queue, which is unlimited in default.
26+
// When `limit` is given, the queue will be static and high performance which is comparable with stdlib channel.
27+
func NewTQueue[T any](limit ...int) *TQueue[T] {
28+
q := &TQueue[T]{
29+
closed: gtype.NewBool(),
30+
}
31+
if len(limit) > 0 && limit[0] > 0 {
32+
q.limit = limit[0]
33+
q.C = make(chan T, limit[0])
34+
} else {
35+
q.list = glist.NewT[T](true)
36+
q.events = make(chan struct{}, math.MaxInt32)
37+
q.C = make(chan T, defaultQueueSize)
38+
go q.asyncLoopFromListToChannel()
39+
}
40+
return q
41+
}
42+
43+
// Push pushes the data `v` into the queue.
44+
// Note that it would panic if Push is called after the queue is closed.
45+
func (q *TQueue[T]) Push(v T) {
46+
if q.limit > 0 {
47+
q.C <- v
48+
} else {
49+
q.list.PushBack(v)
50+
if len(q.events) < defaultQueueSize {
51+
q.events <- struct{}{}
52+
}
53+
}
54+
}
55+
56+
// Pop pops an item from the queue in FIFO way.
57+
// Note that it would return nil immediately if Pop is called after the queue is closed.
58+
func (q *TQueue[T]) Pop() T {
59+
return <-q.C
60+
}
61+
62+
// Close closes the queue.
63+
// Notice: It would notify all goroutines return immediately,
64+
// which are being blocked reading using Pop method.
65+
func (q *TQueue[T]) Close() {
66+
if !q.closed.Cas(false, true) {
67+
return
68+
}
69+
if q.events != nil {
70+
close(q.events)
71+
}
72+
if q.limit > 0 {
73+
close(q.C)
74+
} else {
75+
for range defaultBatchSize {
76+
q.Pop()
77+
}
78+
}
79+
}
80+
81+
// Len returns the length of the queue.
82+
// Note that the result might not be accurate if using unlimited queue size as there's an
83+
// asynchronous channel reading the list constantly.
84+
func (q *TQueue[T]) Len() (length int64) {
85+
bufferedSize := int64(len(q.C))
86+
if q.limit > 0 {
87+
return bufferedSize
88+
}
89+
// If the queue is unlimited and the buffered size is exactly the default size,
90+
// it means there might be some data in the list not synchronized to channel yet.
91+
// So we need to add 1 to the buffered size to make the result more accurate.
92+
if bufferedSize == defaultQueueSize {
93+
bufferedSize++
94+
}
95+
return int64(q.list.Size()) + bufferedSize
96+
}
97+
98+
// Size is alias of Len.
99+
//
100+
// Deprecated: use Len instead.
101+
func (q *TQueue[T]) Size() int64 {
102+
return q.Len()
103+
}
104+
105+
// asyncLoopFromListToChannel starts an asynchronous goroutine,
106+
// which handles the data synchronization from list `q.list` to channel `q.C`.
107+
func (q *TQueue[T]) asyncLoopFromListToChannel() {
108+
defer func() {
109+
if q.closed.Val() {
110+
_ = recover()
111+
}
112+
}()
113+
for !q.closed.Val() {
114+
<-q.events
115+
for !q.closed.Val() {
116+
if bufferLength := q.list.Len(); bufferLength > 0 {
117+
// When q.C is closed, it will panic here, especially q.C is being blocked for writing.
118+
// If any error occurs here, it will be caught by recover and be ignored.
119+
for range bufferLength {
120+
q.C <- q.list.PopFront()
121+
}
122+
} else {
123+
break
124+
}
125+
}
126+
// Clear q.events to remain just one event to do the next synchronization check.
127+
for i := 0; i < len(q.events)-1; i++ {
128+
<-q.events
129+
}
130+
}
131+
// It should be here to close `q.C` if `q` is unlimited size.
132+
// It's the sender's responsibility to close channel when it should be closed.
133+
close(q.C)
134+
}

0 commit comments

Comments
 (0)