Skip to content

Commit

Permalink
Fully drain messages during client.Stop
Browse files Browse the repository at this point in the history
Messages buffered by `SortLoop` were previously not drained and therefore not acked, leading to wrong metric counters.

The gist of this change is that `SortLoop` now waits for its buffered messages to be drained before returning.

PiperOrigin-RevId: 693705921
  • Loading branch information
torsm authored and copybara-github committed Nov 7, 2024
1 parent baa893c commit c15716f
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 27 deletions.
63 changes: 42 additions & 21 deletions fleetspeak/src/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ type Client struct {
outHigh chan service.AckMessage
outMedium chan service.AckMessage
outLow chan service.AckMessage
// used to wait until the retry loop goroutines are done
// used to wait until the retry and sort loop goroutines are done
retryLoopsDone sync.WaitGroup
sortLoopDone sync.WaitGroup

acks chan common.MessageID
errs chan *fspb.MessageErrorData
Expand Down Expand Up @@ -149,7 +150,11 @@ func New(cfg config.Configuration, cmps Components) (*Client, error) {
if f == nil {
f = flow.NewFilter()
}
go message.SortLoop(ret.outUnsorted, ret.outbox, f)
ret.sortLoopDone.Add(1)
go func() {
message.SortLoop(ret.outUnsorted, ret.outbox, f)
ret.sortLoopDone.Done()
}()

ssd := &serviceData{
config: ret.sc,
Expand Down Expand Up @@ -258,33 +263,49 @@ func (c *Client) ProcessMessage(ctx context.Context, am service.AckMessage) (err

// Stop shuts the client down gracefully. This includes stopping all communicators and services.
func (c *Client) Stop() {
log.Info("Stopping client...")
if c.com != nil {
c.com.Stop()
}
c.sc.Stop()
c.config.Stop()
close(c.outLow)
close(c.outMedium)
close(c.outHigh)
log.Info("Components have been stopped.")

// From here, shutdown is a little subtle:
//
// 1) At this point, the communicator is off, so nothing else should be
// draining outbox. We do this ourselves and Ack everything so that the
// RetryLoops are guaranteed to terminate.
// - At this point, the communicator is off, so nothing else should be
// draining outbox. We do this ourselves and Ack everything so that the
// RetryLoops are guaranteed to terminate.
//
// 2) The fake Acks in 1) are safe because the config manager is stopped.
// This means that client services are shut down and the Acks will not be
// reported outside of this process.
// - The fake Acks from the previous step are safe because the config manager is stopped.
// This means that client services are shut down and the Acks will not be
// reported outside of this process.

close(c.outLow)
close(c.outMedium)
close(c.outHigh)
c.retryLoopsDone.Wait()
log.Info("Retry loops have terminated.")

// - Now, no more messages enter outUnsorted.
//
// 3) Then we close outUnsorted so that the SortLoop terminates.
for {
select {
case m := <-c.outbox:
m.Ack()
default:
c.retryLoopsDone.Wait()
close(c.outUnsorted)
return
// - We close outUnsorted and drain outbox, to make sure no messages are lost.
// Once these two things are complete, SortLoop will return, and the client
// can be shut down.

done := make(chan struct{})
close(c.outUnsorted)
go func() {
for {
select {
case m := <-c.outbox:
m.Ack()
case <-done:
return
}
}
}
}()
c.sortLoopDone.Wait()
done <- struct{}{}
log.Info("Messages have been drained.")
}
4 changes: 2 additions & 2 deletions fleetspeak/src/client/internal/message/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func RetryLoop(in <-chan service.AckMessage, out chan<- comms.MessageInfo, stats
if sm.m.Ack != nil {
sm.m.Ack()
}
stats.MessageAcknowledged(sm.m.M, sm.size)
acks <- sm
},
Nack: func() { nacks <- sm },
Expand All @@ -68,10 +69,9 @@ func RetryLoop(in <-chan service.AckMessage, out chan<- comms.MessageInfo, stats
case sm := <-acks:
size -= sm.size
count--
stats.MessageAcknowledged(sm.m.M, sm.size)
case sm := <-nacks:
out <- makeInfo(sm)
stats.BeforeMessageRetry(sm.m.M)
out <- makeInfo(sm)
case m, ok := <-optIn:
if !ok {
return
Expand Down
22 changes: 18 additions & 4 deletions fleetspeak/src/client/internal/message/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,28 @@ import (
)

// SortLoop connects in and out in a sorted manner. That is, it acts essentially
// as a buffered channel between in and out which sorts any messages within
// it. Caller is responsible for implementing any needed size limit. Returns
// when "in" is closed.
// as a buffered channel between in and out which sorts any messages within it.
// The caller is responsible for implementing any needed size limit.
// SortLoop returns when in is closed, and the buffered messages have been
// drained through out, to make sure no messages are lost.
func SortLoop(in <-chan comms.MessageInfo, out chan<- comms.MessageInfo, f *flow.Filter) {
// Keep a slice of messages for each priority level. These are used as fifo
// queues, appending to the end and retreiving from head.
// queues, appending to the end and retrieving from head.
var low, medium, high []comms.MessageInfo

// Block until all messages have been drained.
defer func() {
for _, mi := range high {
out <- mi
}
for _, mi := range medium {
out <- mi
}
for _, mi := range low {
out <- mi
}
}()

// Append a message to the correct list.
appendMI := func(mi comms.MessageInfo) {
switch mi.M.Priority {
Expand Down
36 changes: 36 additions & 0 deletions fleetspeak/src/client/internal/message/sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,39 @@ func TestFilter(t *testing.T) {
}
}
}

// TestBlockUntilDrained verifies that SortLoop keeps running after its input
// channel is closed while it still holds a buffered message, and that it
// returns once that message has been received from the output channel.
func TestBlockUntilDrained(t *testing.T) {
	in := make(chan comms.MessageInfo)
	out := make(chan comms.MessageInfo)

	m := &fspb.Message{
		MessageId: []byte{0},
		Priority:  fspb.Message_HIGH,
	}

	// done is closed rather than sent on, so the goroutine can always exit
	// once SortLoop returns, even if the test has already failed and nobody
	// is receiving from done anymore.
	done := make(chan struct{})
	go func() {
		defer close(done)
		SortLoop(in, out, flow.NewFilter())
	}()

	in <- comms.MessageInfo{M: m}
	close(in)

	// Nobody is receiving from out yet, so a SortLoop that drains its buffer
	// before returning cannot have finished at this point.
	select {
	case <-done:
		t.Fatalf("SortLoop returned before all messages were drained.")
	case <-time.After(200 * time.Millisecond):
	}

	// Guard the receive with a timeout so that a SortLoop which never emits
	// the buffered message fails the test quickly instead of hanging it.
	select {
	case got := <-out:
		if !proto.Equal(got.M, m) {
			t.Errorf("Unexpected message from output, got %v want %v.", got.M, m)
		}
	case <-time.After(200 * time.Millisecond):
		t.Fatalf("SortLoop did not emit the buffered message.")
	}

	select {
	case <-done:
	case <-time.After(200 * time.Millisecond):
		t.Fatalf("SortLoop did not return after all messages were drained.")
	}
}

0 comments on commit c15716f

Please sign in to comment.