-
Notifications
You must be signed in to change notification settings - Fork 28
/
seqbuf.go
420 lines (346 loc) · 9.47 KB
/
seqbuf.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
package main
import (
"errors"
"sync"
"time"
)
type seqNum int
func (s *seqNum) inc(maxSeqNum seqNum) seqNum {
if *s == maxSeqNum {
return 0
}
return *s + 1
}
func (s *seqNum) dec(maxSeqNum seqNum) seqNum {
if *s == 0 {
return maxSeqNum
}
return *s - 1
}
type seqNumRange [2]seqNum
func (r *seqNumRange) getDiff(maxSeqNum seqNum) (diff int) {
from := r[0]
to := r[1]
if to >= from {
diff = int(to) - int(from)
} else {
r[0] = from
r[1] = to
diff = (int(maxSeqNum) + 1) - int(from) + int(to)
}
return
}
type seqBufEntry struct {
seq seqNum
data []byte
}
type requestRetransmitCallbackType func(r seqNumRange) error
type seqBuf struct {
length time.Duration
maxSeqNum seqNum
maxSeqNumDiff seqNum
requestRetransmitCallback requestRetransmitCallbackType
// Available entries coming out from the seqbuf will be sent to entryChan.
entryChan chan seqBufEntry
// If this is true then the seqBuf is locked, which means no entries will be sent to entryChan.
lockedByInvalidSeq bool
lockedAt time.Time
// This is false until no packets have been sent to the entryChan.
alreadyReturnedFirstSeq bool
// The seqNum of the last packet sent to entryChan.
lastReturnedSeq seqNum
requestedRetransmit bool
lastRequestedRetransmitRange seqNumRange
ignoreMissingPktsUntilEnabled bool
ignoreMissingPktsUntilSeq seqNum
// Note that the most recently added entry is stored as the 0th entry.
entries []seqBufEntry
mutex sync.RWMutex
entryAddedChan chan bool
watcherCloseNeededChan chan bool
watcherCloseDoneChan chan bool
errOutOfOrder error
}
// func (s *seqBuf) string() (out string) {
// if len(s.entries) == 0 {
// return "empty"
// }
// for _, e := range s.entries {
// if out != "" {
// out += " "
// }
// out += fmt.Sprint(e.seq)
// }
// return out
// }
func (s *seqBuf) createEntry(seq seqNum, data []byte) seqBufEntry {
return seqBufEntry{
seq: seq,
data: data,
}
}
func (s *seqBuf) notifyWatcher() {
select {
case s.entryAddedChan <- true:
default:
}
}
func (s *seqBuf) addToFront(seq seqNum, data []byte) {
e := s.createEntry(seq, data)
s.entries = append([]seqBufEntry{e}, s.entries...)
s.notifyWatcher()
}
func (s *seqBuf) addToBack(seq seqNum, data []byte) {
e := s.createEntry(seq, data)
s.entries = append(s.entries, e)
s.notifyWatcher()
}
func (s *seqBuf) insert(seq seqNum, data []byte, toPos int) {
if toPos == 0 {
s.addToFront(seq, data)
return
}
if toPos >= len(s.entries) {
s.addToBack(seq, data)
return
}
sliceBefore := s.entries[:toPos]
sliceAfter := s.entries[toPos:]
e := s.createEntry(seq, data)
s.entries = append(sliceBefore, append([]seqBufEntry{e}, sliceAfter...)...)
s.notifyWatcher()
}
func (s *seqBuf) getDiff(seq1, seq2 seqNum) seqNum {
if seq1 >= seq2 {
return seq1 - seq2
}
seq2Overflowed := s.maxSeqNum + 1 - seq2
return seq2Overflowed + seq1
}
type seqBufCompareResult int
const (
larger = seqBufCompareResult(iota)
smaller
equal
)
// Compares seq to toSeq, considering the seq turnover at maxSeqNum.
// Example: returns larger for seq=2 toSeq=1
// returns smaller for seq=0 toSeq=1
// returns smaller for seq=39 toSeq=1 if maxSeqNum is 40
func (s *seqBuf) compareSeq(seq, toSeq seqNum) seqBufCompareResult {
diff1 := s.getDiff(seq, toSeq)
diff2 := s.getDiff(toSeq, seq)
if diff1 == diff2 {
return equal
}
if diff1 > diff2 {
// This will cause an insert at the current position.
if s.maxSeqNumDiff > 0 && diff2 > s.maxSeqNumDiff {
return larger
}
return smaller
}
return larger
}
func (s *seqBuf) add(seq seqNum, data []byte) error {
s.mutex.Lock()
defer s.mutex.Unlock()
// log.Debug("inserting ", seq)
// defer func() {
// log.Print(s.String())
// }()
if seq > s.maxSeqNum {
return errors.New("seq out of range")
}
if len(s.entries) == 0 {
s.addToFront(seq, data)
return nil
}
if s.entries[0].seq == seq { // Dropping duplicate seq.
return nil
}
// Checking the first entry.
if s.compareSeq(seq, s.entries[0].seq) == larger {
s.addToFront(seq, data)
return nil
}
// Parsing through other entries if there are more than 1.
for i := 1; i < len(s.entries); i++ {
// This seqnum is already in the queue? Ignoring it.
if s.entries[i].seq == seq {
return nil
}
if s.compareSeq(seq, s.entries[i].seq) == larger {
// log.Debug("left for ", s.entries[i].seq)
s.insert(seq, data, i)
return nil
}
// log.Debug("right for ", s.entries[i].seq)
}
// No place found for the item?
s.addToBack(seq, data)
return nil
}
func (s *seqBuf) checkLockTimeout() (timeout bool, shouldRetryIn time.Duration) {
timeSinceLastInvalidSeq := time.Since(s.lockedAt)
lockDuration := s.length
if lockDuration < controlStreamLatency*2 {
lockDuration = controlStreamLatency * 2
}
if lockDuration > timeSinceLastInvalidSeq {
shouldRetryIn = lockDuration - timeSinceLastInvalidSeq
return
}
s.lockedByInvalidSeq = false
// log.Debug("lock timeout")
if s.requestedRetransmit {
s.ignoreMissingPktsUntilSeq = s.lastRequestedRetransmitRange[1]
s.ignoreMissingPktsUntilEnabled = true
}
return true, 0
}
// Returns true if all entries from the requested retransmit range have been received.
func (s *seqBuf) gotRetransmitRange() bool {
entryIdx := len(s.entries)
rangeSeq := s.lastRequestedRetransmitRange[0]
for {
entryIdx--
if entryIdx < 0 {
return false
}
if s.entries[entryIdx].seq != rangeSeq {
// log.Debug("entry idx ", entryIdx, " seq #", s.entries[entryIdx].seq, " does not match ", rangeSeq)
// log.Debug(s.string())
return false
}
if rangeSeq == s.lastRequestedRetransmitRange[1] {
return true
}
rangeSeq = rangeSeq.inc(s.maxSeqNum)
}
}
// shouldRetryIn is only filled when no entry is available, but there are entries in the seqbuf.
// err is not nil if the seqbuf is empty.
func (s *seqBuf) get() (e seqBufEntry, shouldRetryIn time.Duration, err error) {
s.mutex.Lock()
defer s.mutex.Unlock()
if len(s.entries) == 0 {
return e, 0, errors.New("seqbuf is empty")
}
entryCount := len(s.entries)
lastEntryIdx := entryCount - 1
e = s.entries[lastEntryIdx]
if s.alreadyReturnedFirstSeq {
if s.lockedByInvalidSeq {
if s.requestedRetransmit && s.gotRetransmitRange() {
s.lockedByInvalidSeq = false
// log.Debug("lock over")
} else {
var timeout bool
if timeout, shouldRetryIn = s.checkLockTimeout(); !timeout {
return
}
}
} else {
if s.compareSeq(e.seq, seqNum(s.lastReturnedSeq)) != larger {
// log.Debug("ignoring out of order seq ", e.seq)
s.entries = s.entries[:lastEntryIdx]
err = s.errOutOfOrder
return
}
if s.ignoreMissingPktsUntilEnabled {
if s.compareSeq(e.seq, s.ignoreMissingPktsUntilSeq) == larger {
// log.Debug("ignore over ", e.seq, " ", s.ignoreMissingPktsUntilSeq)
s.ignoreMissingPktsUntilEnabled = false
} //else {
// log.Debug("ignoring missing pkt, seq #", e.seq, " until ", s.ignoreMissingPktsUntilSeq)
//}
} else {
expectedNextSeq := s.lastReturnedSeq.inc(s.maxSeqNum)
if e.seq != expectedNextSeq {
// log.Debug("lock on, expected seq ", expectedNextSeq, " got ", e.seq)
s.lockedByInvalidSeq = true
s.lockedAt = time.Now()
s.requestedRetransmit = false
s.ignoreMissingPktsUntilEnabled = false
shouldRetryIn = s.length
if s.requestRetransmitCallback != nil {
s.lastRequestedRetransmitRange[0] = expectedNextSeq
s.lastRequestedRetransmitRange[1] = e.seq.dec(s.maxSeqNum)
if err = s.requestRetransmitCallback(s.lastRequestedRetransmitRange); err == nil {
s.requestedRetransmit = true
}
}
return
}
}
}
}
s.lastReturnedSeq = e.seq
s.alreadyReturnedFirstSeq = true
s.entries = s.entries[:lastEntryIdx]
return e, 0, nil
}
func (s *seqBuf) watcher() {
defer func() {
s.watcherCloseDoneChan <- true
}()
entryAvailableTimer := time.NewTimer(0)
<-entryAvailableTimer.C
var entryAvailableTimerRunning bool
for {
retry := true
for retry {
retry = false
e, t, err := s.get()
if err == nil && t == 0 {
if s.entryChan != nil {
select {
case s.entryChan <- e:
case <-s.watcherCloseNeededChan:
return
}
}
// We may have further available entries.
retry = true
} else {
if err == s.errOutOfOrder {
retry = true
} else if !entryAvailableTimerRunning && t > 0 {
// An entry will be available later, waiting for it.
entryAvailableTimer.Reset(t)
entryAvailableTimerRunning = true
}
}
}
select {
case <-s.watcherCloseNeededChan:
return
case <-s.entryAddedChan:
case <-entryAvailableTimer.C:
entryAvailableTimerRunning = false
}
}
}
// Setting a max. seqnum diff is optional. If it's 0 then the diff will be half of the maxSeqNum range.
// Available entries coming out from the seqbuf will be sent to entryChan.
func (s *seqBuf) init(length time.Duration, maxSeqNum, maxSeqNumDiff seqNum, entryChan chan seqBufEntry,
requestRetransmitCallback requestRetransmitCallbackType) {
s.length = length
s.maxSeqNum = maxSeqNum
s.maxSeqNumDiff = maxSeqNumDiff
s.entryChan = entryChan
s.requestRetransmitCallback = requestRetransmitCallback
s.entryAddedChan = make(chan bool)
s.watcherCloseNeededChan = make(chan bool)
s.watcherCloseDoneChan = make(chan bool)
s.errOutOfOrder = errors.New("out of order pkt")
go s.watcher()
}
func (s *seqBuf) deinit() {
if s.watcherCloseNeededChan == nil { // Init has not ran?
return
}
s.watcherCloseNeededChan <- true
<-s.watcherCloseDoneChan
}