Skip to content

Commit 3484022

Browse files
gqcnDGuang21housemeFTLLOVEhailaz
authored
move go-redis implements Adapter from package gredis to contrib/nosql/redis; add redis string operation functions for package gredis (#2240)
* unify configuration pattern of for package gdb * version updates * improve implements `internal/rwmutex` and `internal/mutex`; add `TablesFields` cache implements in `gdb.Core` instead of `contrib/drivers`; add `ClearTableFields` and `ClearCache` functions for `gdb.Core` (#2128) * add ClearTableFiels/ClearCache for Core of package gdb * improve TableFields for contrib/drivers * fix UT case for contrib/drivers/clickhouse * remove unecessary attribute state for internal/rwmutex and internal/mutex * add ClearTableFieldsAll/ClearCacheAll for gdb.Core * improve clickhouse driver * improve clickhouse driver * fix ut * feat: improve import Co-authored-by: daguang <[email protected]> Co-authored-by: houseme <[email protected]> * refract builtin rules management mechanism, add `eq/not-eq/gt/gte/lt/lte/before/before-equal/after/after-equal/array/not-regex` rules for for package `gvalid` (#2133) * refract builtin rules management for package gvalid * refract builtin rules management for package gvalid * refract builtin rules management for package gvalid * add valiation rules and implements for package gvalid * UT cases update for package gvalid * improve error message of fields validation for package gvalid * up * add more validation rules for package gvalid * add validation rule foreach for package gvalid (#2136) * add ToSQL/CatchSQL funcions for package gdb (#2137) * add ToSQL/CatchSQL funcions for package gdb * Update gdb_core_underlying.go * fix ci Co-authored-by: houseme <[email protected]> * add redis interface for package gredis * up * remove `FilteredLink` function for DB and all driver implements and improve details for package gdb (#2142) * fix: pgsql DoExec Transaction checks (#2101) Co-authored-by: John Guo <[email protected]> * improve package gdb * up * up * up * up * up * add DriverWrapper and DriverWarapperDB for package gdb * add DriverWrapper and DriverWarapperDB for package gdb * up Co-authored-by: HaiLaz <[email protected]> * add new database driver `dm` * add drivers dm * upd go version * add gf ci yaml Co-authored-by: Xu <zhenghao.xu> * move go-redis implements from package gredis to contrib/nosql/redis; add redis string operation functions for package gredis * improve `contrib/drivers/dm` (#2144) * improve contrib/drivers/dm * format TODO list info * 1) add config.Name is required 2) The upper layer no longer needs to specify the schema 3) Adjust unit tests Co-authored-by: Xu <zhenghao.xu> Co-authored-by: houseme <[email protected]> * move redis adapter related ut case from package gcache/gsession to package contrib/nosql/redis * up * up * up * up * up * improve comment * add implements of `gcfg.Adapter` using kubernetes configmap (#2145) * remove Logger from kubecm.Client * README updates for package kubecm * error message update for package gredis * comment update for package gdb * Feature/v2.2.0 gredis (#2155) * improve package gredis (#2162) * improve package gredis * Update gredis_redis_group_list.go * fix * up Co-authored-by: houseme <[email protected]> * up * up * up * up * up * up * add func Test_GroupScript_Eval * ut cases for group string * UT cases update for group script * mv redis operation implements to contrib/nosql/redis from package gredis * test: add redis group list unit test (#2248) * test: add redis group list unit test * improve comment * test: fix redis group list unit test Co-authored-by: houseme <[email protected]> * up * add func Test_GroupGeneric_Copy, Test_GroupGeneric_Exists,Test_GroupGeneric_Type,Test_GroupGeneric_Unlink,Test_GroupGeneric_Rename,Test_GroupGeneric_Move,Test_GroupGeneric_Del * add Redis GroupGeneric UnitTest (#2253) add func Test_GroupGeneric_RandomKey,Test_GroupGeneric_DBSize,Test_GroupGeneric_Keys,Test_GroupGeneric_FlushDB,Test_GroupGeneric_FlushAll,Test_GroupGeneric_Expire,Test_GroupGeneric_ExpireAt * hash test case completed (#2260) Co-authored-by: junler <[email protected]> * add Redis GroupGeneric Unit Test part2 (#2258) * up * ci updates * ci updates * up * Feature/contrib redis fsprouts (#2274) * Feature/contrib redis starck (#2275) * up * up * fix `/*` router supported for handler of package ghttp; fix json tag name issue when it contains `,` for package goai; add proxy example for http server (#2294) * fix router supported for handler of package ghttp; fix json tag name issue when it contains for package goai * add proxy example for http server * fix: update szenius/[email protected] (#2293) * add Tag* functions to retreive most commonly used tag value from struct field for package gstructs; use description tag as default value if brief is empty for gcmd.Argument (#2299) * fix cache issue in Count/Value functions for gdb.Model (#2300) * add Tag* functions to retreive most commonly used tag value from struct field for package gstructs; use description tag as default value if brief is empty for gcmd.Argument * fix cache issue in Count/Value functions for gdb.Model * add more ut case for package gdb * version updates * add minus of `start` parameter support for `gstr.Substr`, like the `substr` function in `PHP` (#2297) * Make the substr like the substr in PHP Make the substr like the substr in PHP * Update gstr_z_unit_test.go * Update gstr_z_unit_test.go * Make the SubStrRune like the mb_substr in PHP Make the SubStrRune like the mb_substr in PHP * Update gstr_z_unit_test.go * Update gstr_z_unit_test.go * Update gins_z_unit_view_test.go * Update gview_z_unit_test.go * add ut cases for package gcode (#2307) * add ut cases for package gerror (#2304) * add ut cases for package gerror * add ut cases for package gerror * add ut cases for package gtime (#2303) * add ut cases for package gtime * add ut cases for package gtime * add ut cases for package gtime * add ut cases for package glog (#2302) * add ut cases for package glog * add ut cases for package glog * add ut cases for package glog * add ut cases for package glog * add ut cases for package glog * add ut cases for package glog * change result data type of function Count from int to int64 for package gdb (#2298) * feat: modify model count value int64 * fix * fix:modify int64 * fix * feat: cmd gf prebuild suport oracle (#2312) * add ut cases for package g (#2315) * add ut cases for package gdebug (#2313) * add ut cases for package gdebug * add ut cases for package gdebug * add ut cases for package gdebug Co-authored-by: houseme <[email protected]> * add zookeeper registry support (#2284) * add ut cases for package glog part2 (#2317) * fix invalid UpdatedAt usage in soft deleting feature for package gdb (#2323) * fix issue in failed installing when there's shortcut between file paths for command install (#2326) * fix issue in failed installing when has shortcut between file paths for command install * version updates * template for command gf updates * improve lru clearing for package gcache (#2327) * add ut cases for package ghttp_middleware and ghttp_request (#2344) * add ut cases for package ghttp_middleware * add ut cases for package ghttp_request * add ut cases for package ghttp_request * add ut cases for package ghttp_response (#2352) * add ut cases for package ghttp_response * add ut cases for package ghttp_response * add ut cases for package ghttp_response * add ut cases for package ghttp_request (#2351) * add ut cases for package ghttp_middleware * add ut cases for package ghttp_request * add ut cases for package ghttp_request * add ut cases for package ghttp_request * add ut cases for package ghttp_request - form * add ut cases for package ghttp_request - query * add ut cases for package ghttp_request - request * add ut cases for package ghttp_request - router * add ut cases for package gcache (#2341) * gTcp Example Function: 1.NewConn 2.NewConnTLS 3.NewConnKeyCrt * gTcp Example Function: 1.Send * add example function ExampleConn_Recv and ExampleConn_RecvWithTimeout * add example function 1. ExampleConn_SendWithTimeout 2. ExampleConn_RecvLine 3. ExampleConn_RecvTill * add example function 1. ExampleConn_SendRecv 2. ExampleConn_SendRecvWithTimeout 3. ExampleConn_SetDeadline 4. ExampleConn_SetReceiveBufferWait * add gtcp test function 1. Test_Package_Option_HeadSize4 2. Test_Package_Option_Error * add gtcp example function 1. ExampleGetFreePorts 2. ExampleSend 3. ExampleSendRecv 4. ExampleSendWithTimeout 5. ExampleSendRecvWithTimeout 6. ExampleMustGetFreePort * add gtcp example function 1. ExampleSendPkg 2. ExampleSendRecvPkg 3. ExampleSendPkgWithTimeout 4. ExampleSendRecvPkgWithTimeout * add gtcp test function 1. Test_Pool_Send 2. Test_Pool_Recv 3. Test_Pool_RecvLine 4. Test_Pool_RecvTill 5. Test_Pool_RecvWithTimeout 6. Test_Pool_SendWithTimeout 7. Test_Pool_SendRecvWithTimeout * fix * add gtcp example function 1. ExampleGetServer 2. ExampleSetAddress 3. ExampleSetHandler 4. ExampleRun_NilHandle * exec CI * exec CI * exec CI * modify test server address * modify and exec CI * modify and exec CI * modify and exec CI * modify and exec CI * modify and exec CI * modify and exec CI * add example funcion ExampleConn_Recv_Once and fix * fix * add some error case in example function * add some error case in example function * 1.add example function ExampleNewServerKeyCrt 2.add function SendRecvPkgWithTimeout unit test * add function Test_Server_NewServerKeyCrt unit test * revert * add function Test_Package_Timeout, Test_Package_Option_HeadSize3, Test_Conn_RecvPkgError unit test * fix * add example function 1.ExampleClient_Clone 2.ExampleLoadKeyCrt * add example function 1.ExampleNewNetConnKeyCrt * fix * add example function 1.ExampleClient_DeleteBytes 2.ExampleClient_HeadBytes 3.ExampleClient_PatchBytes 4.ExampleClient_ConnectBytes 5.ExampleClient_OptionsBytes 6.ExampleClient_TraceBytes 7.ExampleClient_PutBytes * add example function 1.ExampleClient_Prefix 2.ExampleClient_Retry 3.ExampleClient_RedirectLimit * add example function 1.ExampleClient_SetBrowserMode 2.ExampleClient_SetHeader 3.ExampleClient_SetRedirectLimit * add example function 1.ExampleClient_SetTLSKeyCrt 2.ExampleClient_SetTLSConfig modify example funcion 1.ExampleClient_SetProxy 2.ExampleClient_Proxy * add example function 1.ExampleClient_PutContent 2.ExampleClient_DeleteContent 3.ExampleClient_HeadContent 4.ExampleClient_PatchContent 5.ExampleClient_ConnectContent 6.ExampleClient_OptionsContent 7.ExampleClient_TraceContent 8.ExampleClient_RequestContent * add example function 1.ExampleClient_RawRequest * add unit function 1.TestGetFreePorts 2.TestNewConn 3.TestNewConnTLS 4.TestNewConnKeyCrt 5.TestConn_SendWithTimeout * add unit function 1.TestConn_Send 2.TestConn_SendRecv 3.TestConn_SendRecvWithTimeout * modify * modify * add example function 1.TestConn_SetReceiveBufferWait 2.TestNewNetConnKeyCrt 3.TestSend * add example function 1.TestSendRecv 2.TestSendWithTimeout * add unit function 1.TestMustGetFreePort 2.TestSendRecvWithTimeout 3.TestSendPkg * add client recevied server's response content assert * modify * modify * add example function 1.TestSendRecvPkg 2.TestSendPkgWithTimeout 3.TestSendRecvPkgWithTimeout * add GetAddress() function add unit funciton 1.TestNewServer 2.TestGetServer 3.TestServer_SetAddress 4.TestServer_SetHandler 5.TestServer_Run * modify * modify * add unit funciton 1.TestLoadKeyCrt * modify * delete function fromHex * add gclient dump unit test * add example function 1.ExampleClient_Put 2.ExampleClient_Delete 3.ExampleClient_Head 4.ExampleClient_Patch 5.ExampleClient_Connect 6.ExampleClient_Options 7.ExampleClient_Trace * add example function 1.TestClient_DoRequest * add example function 1.ExampleClient_PutVar 2.ExampleClient_DeleteVar 3.ExampleClient_HeadVar 4.ExampleClient_PatchVar 5.ExampleClient_ConnectVar 6.ExampleClient_OptionsVar 7.ExampleClient_TraceVar * modify * modify * add CustomProvider function * modify * add unit funciton 1.Test_NewConn 2.Test_GetFreePorts * add unit funciton 1.Test_Server * garray_normal_any code converage * garray_normal_int code converage * garray_normal_str code converage * garray_sorted_any code converage * garray_sorted_int code converage * garray_sorted_str code converage * glist code converage * gmap, gmap_hash_any_any_map code converage * gmap_hash_int_any_map code converage * gmap_hash_int_any_map code converage * gmap_hash_int_int_map code converage * gmap_hash_int_str_map code converage * gmap_hash_str_any_map code converage * gmap_hash_str_int_map code converage * gmap_hash_str_str_map code converage * gmap_list_map code converage * gmap_list_map code converage * revert gf.yml * add gtest unit test function * add ut cases for package gcache * add ut cases for package gcache * add ut cases for package gcache * add ut cases for package gcache * add ut cases for package gcache * modify Co-authored-by: John Guo <[email protected]> * improve ut case for package internal/rwmutex (#2364) * fix issue when only one file was uploaded in batch receiver attribute (#2365) * fix fixed An error occurred when only one file was uploaded in batches and add unit testing(#2092) * fix issue uploading files for ghttp.Server Co-authored-by: yxh <[email protected]> * fix issue #2334 when accessing static files with cache time (#2366) * Solve the problem of error when accessing static files with cache time. Error message: 2022-11-29 19:40:11.090 [ERRO] http: superfluous response.WriteHeader call from github.com/gogf/gf/v2/net/ghttp.(*ResponseWriter).Flush (ghttp_response_writer.go:58) Stack: Verification method: curl 'http://127.0.0.1:8000/' -H 'If-Modified-Since: Thu, 08 Dec 2022 03:13:55 GMT' --compressed * Solve the problem of error when accessing static files with cache time. Error message: 2022-11-29 19:40:11.090 [ERRO] http: superfluous response.WriteHeader call from github.com/gogf/gf/v2/net/ghttp.(*ResponseWriter).Flush (ghttp_response_writer.go:58) Stack: Verification method: curl 'http://127.0.0.1:8000/' -H 'If-Modified-Since: Thu, 08 Dec 2022 03:13:55 GMT' --compressed * Solve the problem of error when accessing static files with cache time. Error message: 2022-11-29 19:40:11.090 [ERRO] http: superfluous response.WriteHeader call from github.com/gogf/gf/v2/net/ghttp.(*ResponseWriter).Flush (ghttp_response_writer.go:58) Stack: Verification method: curl 'http://127.0.0.1:8000/' -H 'If-Modified-Since: Thu, 08 Dec 2022 03:13:55 GMT' --compressed * fix issue #2334 when accessing static files with cache time * up Co-authored-by: 曾洪亮 <[email protected]> Co-authored-by: houseme <[email protected]> * fix issue in cycle dumping for g.Dump (#2367) * fix issue in cycle dumping for g.Dump * up * up * up Co-authored-by: houseme <[email protected]> * 由于 clickhouse 的 position的初始值为 1,导致gdb_core_utility.HasField 中对 fieldsArray 初始化出错 (#2346) * 由于 clickhouse 的 position的初始值为 1,导致gdb_core_utility.HasField 中对 fieldsArray 初始化出错 * 修复单元测试 * 修复单元测试 * 补充单元测试 * 增加CK防御性代码 Co-authored-by: longl <[email protected]> Co-authored-by: houseme <[email protected]> * fix: ghttp server static path config (#2335) Co-authored-by: daguang <[email protected]> Co-authored-by: houseme <[email protected]> Co-authored-by: ftl <[email protected]> Co-authored-by: HaiLaz <[email protected]> Co-authored-by: zhonghuaxunGM <[email protected]> Co-authored-by: huangqian <[email protected]> Co-authored-by: junler <[email protected]> Co-authored-by: junler <[email protected]> Co-authored-by: Starccck <[email protected]> Co-authored-by: Jinhongyu <[email protected]> Co-authored-by: YuanXin Hu <[email protected]> Co-authored-by: yxh <[email protected]> Co-authored-by: 曾洪亮 <[email protected]> Co-authored-by: long <[email protected]> Co-authored-by: longl <[email protected]>
1 parent 1f4e9c8 commit 3484022

File tree

108 files changed

+8301
-2291
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

108 files changed

+8301
-2291
lines changed

.github/workflows/gf.yml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ jobs:
3737
services:
3838
# Redis backend server.
3939
redis:
40-
image : loads/redis:latest
40+
image : loads/redis:7.0
4141
options: >-
4242
--health-cmd "redis-cli ping"
4343
--health-interval 10s
@@ -77,7 +77,7 @@ jobs:
7777
7878
# MSSQL backend server.
7979
mssql:
80-
image: loads/mssqldocker:latest
80+
image: loads/mssqldocker:14.0.3391.2
8181
env:
8282
ACCEPT_EULA: Y
8383
SA_PASSWORD: LoremIpsum86
@@ -94,25 +94,25 @@ jobs:
9494
--health-retries 10
9595
9696
# ClickHouse backend server.
97-
# docker run -d --name clickhouse -p 9000:9000 -p 8123:8123 -p 9001:9001 loads/clickhouse-server:latest
97+
# docker run -d --name clickhouse -p 9000:9000 -p 8123:8123 -p 9001:9001 loads/clickhouse-server:22.1.3.7
9898
clickhouse-server:
99-
image: loads/clickhouse-server:latest
99+
image: loads/clickhouse-server:22.1.3.7
100100
ports:
101101
- 9000:9000
102102
- 8123:8123
103103
- 9001:9001
104104

105105
# Polaris backend server.
106106
polaris:
107-
image: loads/polaris-server-standalone:latest
107+
image: loads/polaris-server-standalone:1.11.2
108108
ports:
109109
- 8090:8090
110110
- 8091:8091
111111
- 8093:8093
112112

113113
# Oracle 11g server
114114
oracle-server:
115-
image: loads/oracle-xe-11g-r2:latest
115+
image: loads/oracle-xe-11g-r2:11.2.0
116116
env:
117117
ORACLE_ALLOW_REMOTE: true
118118
ORACLE_SID: XE

cmd/gf/go.mod

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,13 @@ require (
1515

1616
require (
1717
github.com/BurntSushi/toml v1.2.1 // indirect
18-
github.com/cespare/xxhash/v2 v2.1.2 // indirect
1918
github.com/clbanning/mxj/v2 v2.5.5 // indirect
2019
github.com/denisenkom/go-mssqldb v0.11.0 // indirect
21-
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
2220
github.com/fatih/color v1.13.0 // indirect
2321
github.com/fsnotify/fsnotify v1.5.4 // indirect
2422
github.com/glebarez/go-sqlite v1.17.3 // indirect
2523
github.com/go-logr/logr v1.2.3 // indirect
2624
github.com/go-logr/stdr v1.2.2 // indirect
27-
github.com/go-redis/redis/v8 v8.11.5 // indirect
2825
github.com/go-sql-driver/mysql v1.6.0 // indirect
2926
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect
3027
github.com/google/uuid v1.3.0 // indirect

cmd/gf/go.sum

Lines changed: 1 addition & 74 deletions
Large diffs are not rendered by default.

container/gpool/gpool.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,13 @@ func (p *Pool) Put(value interface{}) error {
8484
return nil
8585
}
8686

87+
// MustPut puts an item to pool, it panics if any error occurs.
88+
func (p *Pool) MustPut(value interface{}) {
89+
if err := p.Put(value); err != nil {
90+
panic(err)
91+
}
92+
}
93+
8794
// Clear clears pool, which means it will remove all items from pool.
8895
func (p *Pool) Clear() {
8996
if p.ExpireFunc != nil {

container/gpool/gpool_z_example_test.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func ExamplePool_Put() {
6161
// conn.(*DBConn).Conn.QueryContext(context.Background(), "select * from user")
6262

6363
// put back conn
64-
dbConnPool.Put(conn)
64+
dbConnPool.MustPut(conn)
6565

6666
fmt.Println(conn.(*DBConn).Limit)
6767

@@ -88,8 +88,8 @@ func ExamplePool_Clear() {
8888
})
8989

9090
conn, _ := dbConnPool.Get()
91-
dbConnPool.Put(conn)
92-
dbConnPool.Put(conn)
91+
dbConnPool.MustPut(conn)
92+
dbConnPool.MustPut(conn)
9393
fmt.Println(dbConnPool.Size())
9494
dbConnPool.Clear()
9595
fmt.Println(dbConnPool.Size())
@@ -144,8 +144,8 @@ func ExamplePool_Size() {
144144

145145
conn, _ := dbConnPool.Get()
146146
fmt.Println(dbConnPool.Size())
147-
dbConnPool.Put(conn)
148-
dbConnPool.Put(conn)
147+
dbConnPool.MustPut(conn)
148+
dbConnPool.MustPut(conn)
149149
fmt.Println(dbConnPool.Size())
150150

151151
// Output:
@@ -158,21 +158,22 @@ func ExamplePool_Close() {
158158
Conn *sql.Conn
159159
Limit int
160160
}
161-
162-
dbConnPool := gpool.New(time.Hour,
163-
func() (interface{}, error) {
161+
var (
162+
newFunc = func() (interface{}, error) {
164163
dbConn := new(DBConn)
165164
dbConn.Limit = 10
166165
return dbConn, nil
167-
},
168-
func(i interface{}) {
166+
}
167+
closeFunc = func(i interface{}) {
169168
fmt.Println("Close The Pool")
170169
// sample : close db conn
171170
// i.(DBConn).Conn.Close()
172-
})
171+
}
172+
)
173+
dbConnPool := gpool.New(time.Hour, newFunc, closeFunc)
173174

174175
conn, _ := dbConnPool.Get()
175-
dbConnPool.Put(conn)
176+
dbConnPool.MustPut(conn)
176177

177178
dbConnPool.Close()
178179

container/gqueue/gqueue.go

Lines changed: 43 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
// Queue is a concurrent-safe queue built on doubly linked list and channel.
2828
type Queue struct {
2929
limit int // Limit for queue size.
30+
length *gtype.Int64 // Queue length.
3031
list *glist.List // Underlying list structure for data maintaining.
3132
closed *gtype.Bool // Whether queue is closed.
3233
events chan struct{} // Events for data writing.
@@ -44,6 +45,7 @@ const (
4445
func New(limit ...int) *Queue {
4546
q := &Queue{
4647
closed: gtype.NewBool(),
48+
length: gtype.NewInt64(),
4749
}
4850
if len(limit) > 0 && limit[0] > 0 {
4951
q.limit = limit[0]
@@ -57,43 +59,10 @@ func New(limit ...int) *Queue {
5759
return q
5860
}
5961

60-
// asyncLoopFromListToChannel starts an asynchronous goroutine,
61-
// which handles the data synchronization from list `q.list` to channel `q.C`.
62-
func (q *Queue) asyncLoopFromListToChannel() {
63-
defer func() {
64-
if q.closed.Val() {
65-
_ = recover()
66-
}
67-
}()
68-
for !q.closed.Val() {
69-
<-q.events
70-
for !q.closed.Val() {
71-
if length := q.list.Len(); length > 0 {
72-
if length > defaultBatchSize {
73-
length = defaultBatchSize
74-
}
75-
for _, v := range q.list.PopFronts(length) {
76-
// When q.C is closed, it will panic here, especially q.C is being blocked for writing.
77-
// If any error occurs here, it will be caught by recover and be ignored.
78-
q.C <- v
79-
}
80-
} else {
81-
break
82-
}
83-
}
84-
// Clear q.events to remain just one event to do the next synchronization check.
85-
for i := 0; i < len(q.events)-1; i++ {
86-
<-q.events
87-
}
88-
}
89-
// It should be here to close `q.C` if `q` is unlimited size.
90-
// It's the sender's responsibility to close channel when it should be closed.
91-
close(q.C)
92-
}
93-
9462
// Push pushes the data `v` into the queue.
9563
// Note that it would panic if Push is called after the queue is closed.
9664
func (q *Queue) Push(v interface{}) {
65+
q.length.Add(1)
9766
if q.limit > 0 {
9867
q.C <- v
9968
} else {
@@ -107,7 +76,9 @@ func (q *Queue) Push(v interface{}) {
10776
// Pop pops an item from the queue in FIFO way.
10877
// Note that it would return nil immediately if Pop is called after the queue is closed.
10978
func (q *Queue) Pop() interface{} {
110-
return <-q.C
79+
item := <-q.C
80+
q.length.Add(-1)
81+
return item
11182
}
11283

11384
// Close closes the queue.
@@ -130,15 +101,45 @@ func (q *Queue) Close() {
130101
// Len returns the length of the queue.
131102
// Note that the result might not be accurate as there's an
132103
// asynchronous channel reading the list constantly.
133-
func (q *Queue) Len() (length int) {
134-
if q.list != nil {
135-
length += q.list.Len()
136-
}
137-
length += len(q.C)
138-
return
104+
func (q *Queue) Len() (length int64) {
105+
return q.length.Val()
139106
}
140107

141108
// Size is alias of Len.
142-
func (q *Queue) Size() int {
109+
func (q *Queue) Size() int64 {
143110
return q.Len()
144111
}
112+
113+
// asyncLoopFromListToChannel starts an asynchronous goroutine,
114+
// which handles the data synchronization from list `q.list` to channel `q.C`.
115+
func (q *Queue) asyncLoopFromListToChannel() {
116+
defer func() {
117+
if q.closed.Val() {
118+
_ = recover()
119+
}
120+
}()
121+
for !q.closed.Val() {
122+
<-q.events
123+
for !q.closed.Val() {
124+
if length := q.list.Len(); length > 0 {
125+
if length > defaultBatchSize {
126+
length = defaultBatchSize
127+
}
128+
for _, v := range q.list.PopFronts(length) {
129+
// When q.C is closed, it will panic here, especially q.C is being blocked for writing.
130+
// If any error occurs here, it will be caught by recover and be ignored.
131+
q.C <- v
132+
}
133+
} else {
134+
break
135+
}
136+
}
137+
// Clear q.events to remain just one event to do the next synchronization check.
138+
for i := 0; i < len(q.events)-1; i++ {
139+
<-q.events
140+
}
141+
}
142+
// It should be here to close `q.C` if `q` is unlimited size.
143+
// It's the sender's responsibility to close channel when it should be closed.
144+
close(q.C)
145+
}

0 commit comments

Comments
 (0)