-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsingletonqueue.go
More file actions
156 lines (137 loc) · 4.43 KB
/
singletonqueue.go
File metadata and controls
156 lines (137 loc) · 4.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// Package singletonqueue implements global singleton queues of tasks that can be accessed by their IDs.
// Package structure is loosely based on https://golang.org/pkg/container/heap/ .
package singletonqueue
import (
	"os"
	"os/signal"
	"strings"
	"sync/atomic"
	"syscall"

	"github.com/garyburd/redigo/redis"
	log "gopkg.in/Sirupsen/logrus.v0"
	"gopkg.in/vmihailenco/msgpack.v2"
)
// RedisPool is the reference to the pool used to draw connections from. Set this before calling any functions in this package.
var RedisPool *redis.Pool
// Interface is implemented by task types that want a singleton queue.
// QueueID identifies the queue; Process is invoked once per queued message
// by the single worker; New reconstructs an instance from a queue ID so
// workers can be respawned (see Respawn).
type Interface interface {
	QueueID() string               // Unique ID
	Process(message Message) error // The function that will process messages
	New(queueID string) Interface  // When trying to reinitialize interface given a queueID
}
// Message wraps a single queued job; it is handed to Interface.Process by
// the worker. Payload holds the raw bytes given to Push / PushIfEmpty.
type Message struct {
	Payload []byte
	// TODO: put creation timestamp
}
// redisQueueKey returns the redis key name of the list used for the queue.
func redisQueueKey(q Interface) string {
	return "sq:q:" + q.QueueID()
}
// redisLockKey returns the redis key name used to "lock" the worker instance such that only one will run at a time.
func redisLockKey(q Interface) string {
	return "sq:lock:" + q.QueueID()
}
// Length returns the number of jobs currently in the queue.
func Length(q Interface) (int, error) {
	// Delegate to safeDo so pooled-connection checkout/return lives in one place,
	// consistent with the rest of the package.
	return redis.Int(safeDo("LLEN", redisQueueKey(q)))
}
// Push queues a new job and runs the worker in a separate goroutine when none is running.
// The payload is wrapped in a Message and msgpack-encoded before being pushed
// onto the queue's Redis list. Returns any marshal or Redis error; the worker
// is only started when the push actually succeeded.
func Push(q Interface, payload []byte) error {
	message := Message{Payload: payload}
	marshaled, err := msgpack.Marshal(message)
	if err != nil {
		return err
	}
	if _, err = safeDo("LPUSH", redisQueueKey(q), marshaled); err != nil {
		// Nothing was enqueued, so there is no point waking a worker.
		return err
	}
	go EnsureWorkerIsRunning(q)
	return nil
}
// PushIfEmpty queues a new job only when the queue is empty. Useful for cached values that are refreshed only every so often - this will ensure that there is no dogpile effect (multiple attempts to refresh cache at the same time). Note that the job may still be invoked multiple times in a small amount of time (eg if the second push happens after the worker has done processing the first one), thus the task should be idempotent.
func PushIfEmpty(q Interface, payload []byte) error {
	queueLength, err := Length(q)
	if err != nil {
		return err
	}
	if queueLength > 0 {
		// Jobs are already queued; just make sure a worker is draining them.
		// Run in a goroutine so the caller is not blocked for the lifetime of
		// the worker (EnsureWorkerIsRunning recommends this itself).
		go EnsureWorkerIsRunning(q)
		return nil
	}
	// Propagate Push's error instead of silently discarding it.
	return Push(q, payload)
}
// EnsureWorkerIsRunning runs as a worker if none runs, but returns immediately if one already runs. It is recommended to run this in a goroutine.
// It pops messages with RPOP until the queue is empty or a shutdown signal
// (SIGINT/SIGTERM) is received, calling q.Process on each decoded Message.
func EnsureWorkerIsRunning(q Interface) {
	logger := log.WithField("worker", q.QueueID())
	// SET ... NX succeeds only when the key does not yet exist, so at most one
	// worker per queue ID runs at a time.
	// NOTE(review): the lock has no expiry — a crashed process leaves the queue
	// locked until the key is removed manually. Consider adding an EX option.
	ret, err := redis.String(safeDo("SET", redisLockKey(q), 1, "NX"))
	if err != nil || ret != "OK" {
		logger.Debug("Unable to acquire lock, another worker is possibly running")
		return
	}
	logger.Info("Worker is starting")
	defer logger.Info("Worker has terminated")
	defer safeDo("DEL", redisLockKey(q)) // release the lock on every exit path
	quit := make(chan struct{})
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt)
	signal.Notify(sigs, syscall.SIGTERM)
	// quitting is written by the signal goroutine and read by the processing
	// loop; it is accessed via sync/atomic to avoid the data race the previous
	// plain-bool version had.
	var quitting int32
	go func() {
		// Flag shutdown on either an OS signal or the quit channel closing.
		select {
		case <-sigs:
			atomic.StoreInt32(&quitting, 1)
		case <-quit:
			atomic.StoreInt32(&quitting, 1)
		}
	}()
	for {
		if atomic.LoadInt32(&quitting) != 0 {
			signal.Stop(sigs)
			close(sigs)
			return
		}
		ret, err := redis.Bytes(safeDo("RPOP", redisQueueKey(q)))
		// NOTE(review): any RPOP failure with an empty reply is treated as
		// "queue drained", so a transient Redis error also stops the worker.
		// The lock is released below, so a later Push respawns it.
		if err != nil && len(ret) == 0 {
			logger.Debug("No more jobs in the queue, exiting.")
			close(quit)
			return // no more jobs in the queue
		}
		var message Message
		err = msgpack.Unmarshal(ret, &message)
		if err != nil {
			logger.WithError(err).Error("Unmarshal failed, skipping this message")
			// skip this message
			continue
		}
		if err = q.Process(message); err != nil {
			// Log once with full context (the old code logged the same error twice).
			logger.WithError(err).WithField("message", message).Error("Worker returned error")
			// TODO option to retry job if fail
		}
	}
}
// safeDo executes a single Redis command on a connection drawn from
// RedisPool, guaranteeing the connection is returned to the pool afterwards.
func safeDo(commandName string, args ...interface{}) (interface{}, error) {
	c := RedisPool.Get()
	defer c.Close()
	return c.Do(commandName, args...)
}
// Respawn finds all existing queues whose IDs start with prefix and restarts
// a worker for each, using queue.New to rebuild an instance from each queue ID.
// Intended to be called at process startup to resume interrupted queues.
func Respawn(prefix string, queue Interface) {
	log.Info(prefix)
	// NOTE(review): KEYS blocks the Redis server and is discouraged in
	// production; consider migrating to SCAN for large keyspaces.
	keys, err := redis.Strings(safeDo("KEYS", "sq:q:"+prefix+"*"))
	if err != nil {
		// Previously this error was silently discarded.
		log.Error("singletonqueue: Respawn could not list queue keys: ", err)
		return
	}
	for _, key := range keys {
		queueID := strings.TrimPrefix(key, "sq:q:")
		if newQueue := queue.New(queueID); newQueue != nil {
			go EnsureWorkerIsRunning(newQueue)
		}
	}
}