-
Notifications
You must be signed in to change notification settings - Fork 85
/
tube.go
112 lines (102 loc) · 2.94 KB
/
tube.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package beanstalk
import (
"time"
)
// Tube represents tube Name on the server connected to by Conn.
// It has methods for commands that operate on a single tube.
type Tube struct {
Conn *Conn
Name string
}
// NewTube returns a new Tube representing the given name.
func NewTube(c *Conn, name string) *Tube {
return &Tube{c, name}
}
// Put puts a job into tube t with priority pri and TTR ttr, and returns
// the id of the newly-created job. If delay is nonzero, the server will
// wait the given amount of time after returning to the client and before
// putting the job into the ready queue.
func (t *Tube) Put(body []byte, pri uint32, delay, ttr time.Duration) (id uint64, err error) {
r, err := t.Conn.cmd(t, nil, body, "put", pri, dur(delay), dur(ttr))
if err != nil {
return 0, err
}
_, err = t.Conn.readResp(r, false, "INSERTED %d", &id)
if err != nil {
return 0, err
}
return id, nil
}
// PeekReady gets a copy of the job at the front of t's ready queue.
func (t *Tube) PeekReady() (id uint64, body []byte, err error) {
r, err := t.Conn.cmd(t, nil, nil, "peek-ready")
if err != nil {
return 0, nil, err
}
body, err = t.Conn.readResp(r, true, "FOUND %d", &id)
if err != nil {
return 0, nil, err
}
return id, body, nil
}
// PeekDelayed gets a copy of the delayed job that is next to be
// put in t's ready queue.
func (t *Tube) PeekDelayed() (id uint64, body []byte, err error) {
r, err := t.Conn.cmd(t, nil, nil, "peek-delayed")
if err != nil {
return 0, nil, err
}
body, err = t.Conn.readResp(r, true, "FOUND %d", &id)
if err != nil {
return 0, nil, err
}
return id, body, nil
}
// PeekBuried gets a copy of the job in the holding area that would
// be kicked next by Kick.
func (t *Tube) PeekBuried() (id uint64, body []byte, err error) {
r, err := t.Conn.cmd(t, nil, nil, "peek-buried")
if err != nil {
return 0, nil, err
}
body, err = t.Conn.readResp(r, true, "FOUND %d", &id)
if err != nil {
return 0, nil, err
}
return id, body, nil
}
// Kick takes up to bound jobs from the holding area and moves them into
// the ready queue, then returns the number of jobs moved. Jobs will be
// taken in the order in which they were last buried.
func (t *Tube) Kick(bound int) (n int, err error) {
r, err := t.Conn.cmd(t, nil, nil, "kick", bound)
if err != nil {
return 0, err
}
_, err = t.Conn.readResp(r, false, "KICKED %d", &n)
if err != nil {
return 0, err
}
return n, nil
}
// Stats retrieves statistics about tube t.
func (t *Tube) Stats() (map[string]string, error) {
r, err := t.Conn.cmd(nil, nil, nil, "stats-tube", t.Name)
if err != nil {
return nil, err
}
body, err := t.Conn.readResp(r, true, "OK")
return parseDict(body), err
}
// Pause pauses new reservations in t for time d.
func (t *Tube) Pause(d time.Duration) error {
r, err := t.Conn.cmd(nil, nil, nil, "pause-tube", t.Name, dur(d))
if err != nil {
return err
}
_, err = t.Conn.readResp(r, false, "PAUSED")
if err != nil {
return err
}
return nil
}