-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
152 lines (139 loc) · 3.96 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package main
import (
"fmt"
"github.com/inSituo/LeveledLogger"
zmq "github.com/pebbe/zmq4"
"os"
"sync"
"time"
)
const (
POLL_TIMEOUT = 1 * time.Millisecond
THREADS_SLEEP = 5 * time.Millisecond
)
type Server struct {
port int
log *LeveledLogger.Logger
ll_level int
dbconf *MongoConf
wn int
wbuff int
}
func NewServer(port int, wn int, wbuff int, dbconf *MongoConf, ll_level int) *Server {
return &Server{
port: port,
log: LeveledLogger.New(os.Stdout, ll_level),
dbconf: dbconf,
wn: wn,
wbuff: wbuff,
ll_level: ll_level,
}
}
func (s *Server) Run() error {
iname := "Server.Run"
addr := fmt.Sprintf("tcp://*:%d", s.port)
s.log.Info(
iname,
"connecting to MongoDB",
*s.dbconf.Host,
*s.dbconf.Port,
*s.dbconf.DB,
)
db, err := NewDB(s.dbconf)
if err != nil {
return err
}
defer db.Close()
s.log.Debug(iname, "creating frontend socket")
felock := sync.Mutex{}
context, err := zmq.NewContext()
if err != nil {
return err
}
frontend, err := context.NewSocket(zmq.ROUTER)
if err != nil {
return err
}
defer frontend.Close()
s.log.Debug(iname, "binding frontend socket", addr)
if err := frontend.Bind(
fmt.Sprintf("tcp://*:%d", s.port),
); err != nil {
return err
}
outgoing := make(chan *Product, s.wbuff*s.wn)
incoming := make(chan []string, s.wbuff*s.wn)
workq := make(chan *Work, s.wbuff*s.wn)
// pool of worker goroutines
s.log.Debug(iname, "creating workers pool")
for i := 0; i < s.wn; i++ {
worker := NewWorker(i, workq, outgoing, db, s.ll_level)
go worker.Run()
defer worker.Stop()
}
// receiver:
go func() {
poller := zmq.NewPoller()
poller.Add(frontend, zmq.POLLIN)
s.log.Info(iname, "listening to incoming requests", addr)
for {
felock.Lock()
polled, err := poller.Poll(POLL_TIMEOUT)
if err == nil {
if len(polled) > 0 {
msg, err := frontend.RecvMessage(0)
if err == nil {
incoming <- msg
} else {
s.log.Warn(iname, "failed to receive incoming message", err)
}
}
} else {
s.log.Warn(iname, "failed to poll socket", err)
}
felock.Unlock()
// give other threads a chance to obtain the lock:
time.Sleep(THREADS_SLEEP)
}
}()
// dispatcher
go func() {
for {
msg := <-incoming
s.log.Debug(iname, "message received", msg)
if len(msg) < 3 {
s.log.Debug(iname, "not enough message parts", len(msg))
if len(msg) == 2 {
outgoing <- &Product{
id: msg,
success: false,
empty: false,
payload: []byte("no task specified"),
}
}
continue
}
work := Work{
id: msg[:2],
params: msg[2:],
}
workq <- &work
}
}()
// reply to requests pending in the replies quque
s.log.Debug(iname, "waiting for payloads")
for {
prod := <-outgoing
s.log.Debug(iname, "sending reply", prod)
felock.Lock()
_, err := frontend.SendMessage(prod.id, prod.success, prod.empty, prod.payload)
felock.Unlock()
if err != nil {
s.log.Warn(iname, "unable to send reply", err)
}
// give other threads a chance to obtain the lock:
time.Sleep(THREADS_SLEEP)
}
return nil
// now the deferred worker.Stop and frontend.Close will be called
}