From 94a068846297d8b9b56891e486c8d3942aa84500 Mon Sep 17 00:00:00 2001 From: Torben Schmitz Date: Thu, 7 Nov 2024 06:12:33 -0800 Subject: [PATCH] Fully drain messages during `client.Stop` Messages buffered by `SortLoop` were previously not drained and therefore not acked, leading to wrong metric counters. The gist of this change is that `SortLoop` now waits for its buffered messages to be drained before returning. PiperOrigin-RevId: 694092374 --- fleetspeak/src/client/client.go | 63 ++++++++++++------- .../src/client/internal/message/retry.go | 4 +- .../src/client/internal/message/sort.go | 22 +++++-- .../src/client/internal/message/sort_test.go | 36 +++++++++++ 4 files changed, 98 insertions(+), 27 deletions(-) diff --git a/fleetspeak/src/client/client.go b/fleetspeak/src/client/client.go index b3f94adc..ac307350 100644 --- a/fleetspeak/src/client/client.go +++ b/fleetspeak/src/client/client.go @@ -74,8 +74,9 @@ type Client struct { outHigh chan service.AckMessage outMedium chan service.AckMessage outLow chan service.AckMessage - // used to wait until the retry loop goroutines are done + // used to wait until the retry and sort loop goroutines are done retryLoopsDone sync.WaitGroup + sortLoopDone sync.WaitGroup acks chan common.MessageID errs chan *fspb.MessageErrorData @@ -149,7 +150,11 @@ func New(cfg config.Configuration, cmps Components) (*Client, error) { if f == nil { f = flow.NewFilter() } - go message.SortLoop(ret.outUnsorted, ret.outbox, f) + ret.sortLoopDone.Add(1) + go func() { + message.SortLoop(ret.outUnsorted, ret.outbox, f) + ret.sortLoopDone.Done() + }() ssd := &serviceData{ config: ret.sc, @@ -258,33 +263,49 @@ func (c *Client) ProcessMessage(ctx context.Context, am service.AckMessage) (err // Stop shuts the client down gracefully. This includes stopping all communicators and services. func (c *Client) Stop() { + log.Info("Stopping client...") if c.com != nil { c.com.Stop() } c.sc.Stop() c.config.Stop() - close(c.outLow) - close(c.outMedium) - close(c.outHigh) + log.Info("Components have been stopped.") + // From here, shutdown is a little subtle: // - // 1) At this point, the communicator is off, so nothing else should be - // draining outbox. We do this ourselves and Ack everything so that the - // RetryLoops are guaranteed to terminate. + // - At this point, the communicator is off, so nothing else should be + // draining outbox. We do this ourselves and Ack everything so that the + // RetryLoops are guaranteed to terminate. // - // 2) The fake Acks in 1) are safe because the config manager is stopped. - // This means that client services are shut down and the Acks will not be - // reported outside of this process. + // - The fake Acks in 1) are safe because the config manager is stopped. + // This means that client services are shut down and the Acks will not be + // reported outside of this process. + + close(c.outLow) + close(c.outMedium) + close(c.outHigh) + c.retryLoopsDone.Wait() + log.Info("Retry loops have terminated.") + + // - Now, no more messages enter outUnsorted. // - // 3) Then we close outUnsorted so that the SortLoop terminates. - for { - select { - case m := <-c.outbox: - m.Ack() - default: - c.retryLoopsDone.Wait() - close(c.outUnsorted) - return + // - We close outUnsorted and drain outbox, to make sure no messages are lost. + // Once these two things are complete, SortLoop will return, and the client + // can be shut down. + + done := make(chan struct{}) + close(c.outUnsorted) + go func() { + for { + select { + case m := <-c.outbox: + m.Ack() + case <-done: + return + } } - } + }() + c.sortLoopDone.Wait() + done <- struct{}{} + log.Info("Messages have been drained.") } diff --git a/fleetspeak/src/client/internal/message/retry.go b/fleetspeak/src/client/internal/message/retry.go index 31c5dd74..e3356e0d 100644 --- a/fleetspeak/src/client/internal/message/retry.go +++ b/fleetspeak/src/client/internal/message/retry.go @@ -49,6 +49,7 @@ func RetryLoop(in <-chan service.AckMessage, out chan<- comms.MessageInfo, stats if sm.m.Ack != nil { sm.m.Ack() } + stats.MessageAcknowledged(sm.m.M, sm.size) acks <- sm }, Nack: func() { nacks <- sm }, @@ -68,10 +69,9 @@ func RetryLoop(in <-chan service.AckMessage, out chan<- comms.MessageInfo, stats case sm := <-acks: size -= sm.size count-- - stats.MessageAcknowledged(sm.m.M, sm.size) case sm := <-nacks: - out <- makeInfo(sm) stats.BeforeMessageRetry(sm.m.M) + out <- makeInfo(sm) case m, ok := <-optIn: if !ok { return diff --git a/fleetspeak/src/client/internal/message/sort.go b/fleetspeak/src/client/internal/message/sort.go index c15b6a5f..5f2614ca 100644 --- a/fleetspeak/src/client/internal/message/sort.go +++ b/fleetspeak/src/client/internal/message/sort.go @@ -24,14 +24,28 @@ import ( ) // SortLoop connects in and out in a sorted manner. That is, it acts essentially -// as a buffered channel between in and out which sorts any messages within -// it. Caller is responsible for implementing any needed size limit. Returns -// when "in" is closed. +// as a buffered channel between in and out which sorts any messages within it. +// The caller is responsible for implementing any needed size limit. +// SortLoop returns when in is closed, and the buffered messages have been +// drained through out, to make sure no messages are lost. func SortLoop(in <-chan comms.MessageInfo, out chan<- comms.MessageInfo, f *flow.Filter) { // Keep a slice of messages for each priority level. These are used as fifo - // queues, appending to the end and retreiving from head. + // queues, appending to the end and retrieving from head. var low, medium, high []comms.MessageInfo + // Block until all messages have been drained. + defer func() { + for _, mi := range high { + out <- mi + } + for _, mi := range medium { + out <- mi + } + for _, mi := range low { + out <- mi + } + }() + // Append a message to the correct list. appendMI := func(mi comms.MessageInfo) { switch mi.M.Priority { diff --git a/fleetspeak/src/client/internal/message/sort_test.go b/fleetspeak/src/client/internal/message/sort_test.go index c48fac8c..e54efd48 100644 --- a/fleetspeak/src/client/internal/message/sort_test.go +++ b/fleetspeak/src/client/internal/message/sort_test.go @@ -188,3 +188,39 @@ func TestFilter(t *testing.T) { } } } + +func TestBlockUntilDrained(t *testing.T) { + in := make(chan comms.MessageInfo) + out := make(chan comms.MessageInfo) + + m := &fspb.Message{ + MessageId: []byte{0}, + Priority: fspb.Message_HIGH, + } + + done := make(chan struct{}) + go func() { + SortLoop(in, out, flow.NewFilter()) + done <- struct{}{} + }() + + in <- comms.MessageInfo{M: m} + close(in) + + select { + case <-done: + t.Fatalf("SortLoop returned before all messages were drained.") + case <-time.After(200 * time.Millisecond): + } + + got := <-out + if !proto.Equal(got.M, m) { + t.Errorf("Unexpected message from output, got %v want %v.", got.M, m) + } + + select { + case <-done: + case <-time.After(200 * time.Millisecond): + t.Fatalf("SortLoop did not return after all messages were drained.") + } +}