-
Notifications
You must be signed in to change notification settings - Fork 242
/
process.go
90 lines (73 loc) · 2.16 KB
/
process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package goworker
import (
"fmt"
"math/rand"
"os"
"strings"
"time"
)
// process identifies a single running worker process. Its String form
// ("hostname:pid-id:queue1,queue2") is what the methods in this file embed
// in the Redis keys and set members they write.
type process struct {
// Hostname is the host name of the machine; newProcess fills it from os.Hostname.
Hostname string
// Pid is the operating-system process ID; newProcess fills it from os.Getpid.
Pid int
// ID is the caller-supplied identifier passed to newProcess.
ID string
// Queues lists the queue names associated with this process, in the order given.
Queues []string
}
// newProcess builds a process descriptor for the current OS process.
//
// The host name and PID are looked up from the operating system; id and
// queues are stored as given (the queues slice is not copied). If the host
// name cannot be determined, a nil process and the lookup error are returned.
func newProcess(id string, queues []string) (*process, error) {
	host, err := os.Hostname()
	if err != nil {
		return nil, err
	}

	proc := &process{
		Hostname: host,
		Pid:      os.Getpid(),
		ID:       id,
		Queues:   queues,
	}
	return proc, nil
}
// String renders the process as "hostname:pid-id:queues", where queues is
// the comma-separated list of queue names. Implementing fmt.Stringer lets
// the process be formatted directly with %s / %v.
func (p *process) String() string {
	queueList := strings.Join(p.Queues, ",")
	return fmt.Sprintf("%s:%d-%s:%s", p.Hostname, p.Pid, p.ID, queueList)
}
// open registers the process in Redis. It queues three commands on conn and
// then flushes them:
//
//   - SADD of the process to the "<namespace>workers" set,
//   - SET "<namespace>stat:processed:<process>" to "0",
//   - SET "<namespace>stat:failed:<process>" to "0".
//
// The first error reported by conn.Send or conn.Flush is returned and no
// further commands are issued after it. (Earlier versions discarded these
// errors and always returned nil.)
func (p *process) open(conn *RedisConn) error {
	ns := workerSettings.Namespace

	if err := conn.Send("SADD", fmt.Sprintf("%sworkers", ns), p); err != nil {
		return err
	}
	if err := conn.Send("SET", fmt.Sprintf("%sstat:processed:%s", ns, p), "0"); err != nil {
		return err
	}
	if err := conn.Send("SET", fmt.Sprintf("%sstat:failed:%s", ns, p), "0"); err != nil {
		return err
	}
	return conn.Flush()
}
// close logs the shutdown and removes the process's registration from
// Redis. It queues three commands on conn and then flushes them:
//
//   - SREM of the process from the "<namespace>workers" set,
//   - DEL "<namespace>stat:processed:<process>",
//   - DEL "<namespace>stat:failed:<process>".
//
// The first error reported by conn.Send or conn.Flush is returned and no
// further commands are issued after it. (Earlier versions discarded these
// errors and always returned nil.)
func (p *process) close(conn *RedisConn) error {
	logger.Infof("%v shutdown", p)

	ns := workerSettings.Namespace

	if err := conn.Send("SREM", fmt.Sprintf("%sworkers", ns), p); err != nil {
		return err
	}
	if err := conn.Send("DEL", fmt.Sprintf("%sstat:processed:%s", ns, p)); err != nil {
		return err
	}
	if err := conn.Send("DEL", fmt.Sprintf("%sstat:failed:%s", ns, p)); err != nil {
		return err
	}
	return conn.Flush()
}
// start records when the process began working: it queues a SET of
// "<namespace>worker:<process>:started" to the current time (formatted with
// time.Time.String) and then flushes conn.
//
// An error from conn.Send or conn.Flush is returned to the caller. (Earlier
// versions discarded these errors and always returned nil.)
func (p *process) start(conn *RedisConn) error {
	key := fmt.Sprintf("%sworker:%s:started", workerSettings.Namespace, p)
	if err := conn.Send("SET", key, time.Now().String()); err != nil {
		return err
	}
	return conn.Flush()
}
// finish clears the process's working state from Redis. It queues a DEL of
// "<namespace>worker:<process>" and a DEL of
// "<namespace>worker:<process>:started" on conn and then flushes them.
//
// The first error reported by conn.Send or conn.Flush is returned and no
// further commands are issued after it. (Earlier versions discarded these
// errors and always returned nil.)
func (p *process) finish(conn *RedisConn) error {
	ns := workerSettings.Namespace

	if err := conn.Send("DEL", fmt.Sprintf("%sworker:%s", ns, p)); err != nil {
		return err
	}
	if err := conn.Send("DEL", fmt.Sprintf("%sworker:%s:started", ns, p)); err != nil {
		return err
	}
	return conn.Flush()
}
// fail counts one failure in Redis. It queues an INCR of the global
// "<namespace>stat:failed" counter and an INCR of the per-process
// "<namespace>stat:failed:<process>" counter on conn and then flushes them.
//
// The first error reported by conn.Send or conn.Flush is returned and no
// further commands are issued after it. (Earlier versions discarded these
// errors and always returned nil.)
func (p *process) fail(conn *RedisConn) error {
	ns := workerSettings.Namespace

	if err := conn.Send("INCR", fmt.Sprintf("%sstat:failed", ns)); err != nil {
		return err
	}
	if err := conn.Send("INCR", fmt.Sprintf("%sstat:failed:%s", ns, p)); err != nil {
		return err
	}
	return conn.Flush()
}
// queues returns the queue names to consult.
//
// With strict set, the process's own Queues slice is returned as-is, so the
// configured order is kept. Otherwise a newly allocated slice holding the
// same names in a random order (drawn from rand.Perm) is returned; p.Queues
// itself is never reordered.
func (p *process) queues(strict bool) []string {
	if strict {
		// Strict ordering: hand back the configured list untouched.
		return p.Queues
	}

	// Non-strict ordering: copy the names out following a random permutation
	// of their indices.
	order := rand.Perm(len(p.Queues))
	shuffled := make([]string, 0, len(order))
	for _, idx := range order {
		shuffled = append(shuffled, p.Queues[idx])
	}
	return shuffled
}