diff --git a/cmd/mtail/main.go b/cmd/mtail/main.go index b3dc7b0a9..1637df286 100644 --- a/cmd/mtail/main.go +++ b/cmd/mtail/main.go @@ -182,7 +182,7 @@ func main() { } if *staleLogGcTickInterval > 0 { staleLogGcWaker := waker.NewTimed(ctx, *staleLogGcTickInterval) - opts = append(opts, mtail.StaleLogGcWaker(staleLogGcWaker)) + opts = append(opts, mtail.GcWaker(staleLogGcWaker)) } if *pollInterval > 0 { logStreamPollWaker := waker.NewTimed(ctx, *pollInterval) diff --git a/internal/mtail/basic_tail_integration_test.go b/internal/mtail/basic_tail_integration_test.go index 25b6d9fd0..1bfc95f9f 100644 --- a/internal/mtail/basic_tail_integration_test.go +++ b/internal/mtail/basic_tail_integration_test.go @@ -21,7 +21,7 @@ func TestBasicTail(t *testing.T) { } logDir := testutil.TestTempDir(t) - m, stopM := mtail.TestStartServer(t, 2, 1, mtail.LogPathPatterns(logDir+"/*"), mtail.ProgramPath("../../examples/linecount.mtail")) + m, stopM := mtail.TestStartServer(t, 1, 1, mtail.LogPathPatterns(logDir+"/*"), mtail.ProgramPath("../../examples/linecount.mtail")) defer stopM() logFile := filepath.Join(logDir, "log") @@ -31,7 +31,7 @@ func TestBasicTail(t *testing.T) { f := testutil.TestOpenFile(t, logFile) defer f.Close() - m.AwakenPatternPollers(2, 2) // Find `logFile` + m.AwakenPatternPollers(1, 1) // Find `logFile` m.AwakenLogStreams(1, 1) // Force a sync to EOF for i := 1; i <= 3; i++ { diff --git a/internal/mtail/log_deletion_integration_unix_test.go b/internal/mtail/log_deletion_integration_unix_test.go index d02d032dc..0d7a845ca 100644 --- a/internal/mtail/log_deletion_integration_unix_test.go +++ b/internal/mtail/log_deletion_integration_unix_test.go @@ -39,6 +39,6 @@ func TestLogDeletion(t *testing.T) { m.AwakenLogStreams(1, 0) // run stream to observe it's missing logCloseCheck() - m.AwakenPatternPollers(1, 1) + m.AwakenGcPoller(1, 1) logCountCheck() } diff --git a/internal/mtail/log_rotation_integration_test.go b/internal/mtail/log_rotation_integration_test.go index 
7bf0c452c..398168761 100644 --- a/internal/mtail/log_rotation_integration_test.go +++ b/internal/mtail/log_rotation_integration_test.go @@ -54,6 +54,7 @@ func TestLogRotationBySoftLinkChange(t *testing.T) { defer trueLog2.Close() m.AwakenPatternPollers(1, 1) m.AwakenLogStreams(1, 1) + m.AwakenGcPoller(1, 1) logClosedCheck := m.ExpectMapExpvarDeltaWithDeadline("log_closes_total", logFilepath, 1) logCompletedCheck := m.ExpectExpvarDeltaWithDeadline("log_count", -1) testutil.FatalIfErr(t, os.Remove(logFilepath)) @@ -63,7 +64,7 @@ func TestLogRotationBySoftLinkChange(t *testing.T) { m.AwakenPatternPollers(1, 1) // simulate race condition with this poll. m.AwakenLogStreams(1, 0) logClosedCheck() // barrier until filestream closes fd - m.AwakenPatternPollers(1, 1) + m.AwakenGcPoller(1, 1) logCompletedCheck() // barrier until the logstream is removed from tailer } testutil.FatalIfErr(t, os.Symlink(logFilepath+".true2", logFilepath)) diff --git a/internal/mtail/log_rotation_integration_unix_test.go b/internal/mtail/log_rotation_integration_unix_test.go index a3a240f3e..c772abcfc 100644 --- a/internal/mtail/log_rotation_integration_unix_test.go +++ b/internal/mtail/log_rotation_integration_unix_test.go @@ -69,7 +69,7 @@ func TestLogRotationByRename(t *testing.T) { m.AwakenPatternPollers(1, 1) // simulate race condition with this poll. m.AwakenLogStreams(1, 0) logClosedCheck() // barrier until filestream closes fd - m.AwakenPatternPollers(1, 1) + m.AwakenGcPoller(1, 1) logCompletedCheck() // barrier until the logstream is removed from tailer } glog.Info("create") diff --git a/internal/mtail/options.go b/internal/mtail/options.go index 71dc062b0..5d4b3699d 100644 --- a/internal/mtail/options.go +++ b/internal/mtail/options.go @@ -109,17 +109,17 @@ func (opt overrideLocation) apply(m *Server) error { return nil } -// StaleLogGcWaker triggers garbage collection runs for stale logs in the tailer. 
-func StaleLogGcWaker(w waker.Waker) Option { - return &staleLogGcWaker{w} +// GcWaker triggers garbage collection runs for stale logs in the tailer. +func GcWaker(w waker.Waker) Option { + return &gcWaker{w} } -type staleLogGcWaker struct { +type gcWaker struct { waker.Waker } -func (opt staleLogGcWaker) apply(m *Server) error { - m.tOpts = append(m.tOpts, tailer.StaleLogGcWaker(opt.Waker)) +func (opt gcWaker) apply(m *Server) error { + m.tOpts = append(m.tOpts, tailer.GcWaker(opt.Waker)) return nil } diff --git a/internal/mtail/testing.go b/internal/mtail/testing.go index edd70f2c5..25c4670b7 100644 --- a/internal/mtail/testing.go +++ b/internal/mtail/testing.go @@ -34,6 +34,9 @@ type TestServer struct { // method, synchronising the pattern poll with the test. AwakenPatternPollers waker.WakeFunc // the glob awakens + gcWaker waker.Waker // activate the cleanup routines + AwakenGcPoller waker.WakeFunc + tb testing.TB cancel context.CancelFunc @@ -65,9 +68,11 @@ func TestMakeServer(tb testing.TB, patternWakers int, streamWakers int, options } ts.streamWaker, ts.AwakenLogStreams = waker.NewTest(ctx, streamWakers, "streams") ts.patternWaker, ts.AwakenPatternPollers = waker.NewTest(ctx, patternWakers, "patterns") + ts.gcWaker, ts.AwakenGcPoller = waker.NewTest(ctx, 1, "gc") options = append(options, LogstreamPollWaker(ts.streamWaker), LogPatternPollWaker(ts.patternWaker), + GcWaker(ts.gcWaker), ) var err error ts.Server, err = New(ctx, metrics.NewStore(), options...) diff --git a/internal/tailer/tail.go b/internal/tailer/tail.go index e6c7473a2..9d03dd15e 100644 --- a/internal/tailer/tail.go +++ b/internal/tailer/tail.go @@ -49,7 +49,7 @@ type Tailer struct { logstreamsMu sync.RWMutex // protects `logstreams`. logstreams map[string]logstream.LogStream // Map absolte pathname to logstream reading that pathname. 
- staleLogGcWaker waker.Waker // Used to wake stale log GC + gcWaker waker.Waker // Used to wake stale log and completion pollers initDone chan struct{} } @@ -85,8 +85,8 @@ func (opt IgnoreRegex) apply(t *Tailer) error { return t.SetIgnorePattern(string(opt)) } -// StaleLogGcWaker triggers garbage collection runs for stale logs in the tailer. -func StaleLogGcWaker(w waker.Waker) Option { +// GcWaker triggers garbage collection runs for stale logs in the tailer. +func GcWaker(w waker.Waker) Option { return &staleLogGcWaker{w} } @@ -95,7 +95,7 @@ type staleLogGcWaker struct { } func (opt staleLogGcWaker) apply(t *Tailer) error { - t.staleLogGcWaker = opt.Waker + t.gcWaker = opt.Waker return nil } @@ -158,8 +158,7 @@ func New(ctx context.Context, wg *sync.WaitGroup, lines chan<- *logline.LogLine, } } // Start the routine for checking if logstreams have completed. - t.startPollLogStreamsForCompletion(ctx, wg) - t.StartStaleLogstreamExpirationLoop(ctx, wg) + t.StartGcPoller(ctx) // This goroutine cancels the Tailer if all of our dependent subroutines are done. // These are any live logstreams, and any log pattern pollers. @@ -299,45 +298,6 @@ func (t *Tailer) TailPath(pathname string) error { return nil } -// ExpireStaleLogstreams removes logstreams that have had no reads for 1h or more. -func (t *Tailer) ExpireStaleLogstreams() error { - t.logstreamsMu.Lock() - defer t.logstreamsMu.Unlock() - for _, v := range t.logstreams { - if time.Since(v.LastReadTime()) > (time.Hour * 24) { - v.Stop() - } - } - return nil -} - -// StartStaleLogstreamExpirationLoop runs a permanent goroutine to expire stale logstreams. 
-func (t *Tailer) StartStaleLogstreamExpirationLoop(ctx context.Context, wg *sync.WaitGroup) { - if t.staleLogGcWaker == nil { - glog.InfoContext(ctx, "Log handle expiration disabled") - return - } - wg.Add(1) - go func() { - defer wg.Done() - <-t.initDone - if t.oneShot { - glog.InfoContext(ctx, "No gc loop in oneshot mode.") - return - } - for { - select { - case <-ctx.Done(): - return - case <-t.staleLogGcWaker.Wake(): - if err := t.ExpireStaleLogstreams(); err != nil { - glog.Info(err) - } - } - } - }() -} - // pollLogPattern runs a permanent goroutine to poll for new log files that // match `pattern`. It is on the subroutine waitgroup as we do not want to // shut down the tailer when there are outstanding patterns to poll for. @@ -396,47 +356,58 @@ func (t *Tailer) doPatternGlob(pattern string) error { return nil } -// PollLogStreamsForCompletion looks at the existing paths and checks if they're already -// complete, removing it from the map if so. -func (t *Tailer) PollLogStreamsForCompletion() error { - t.logstreamsMu.Lock() - defer t.logstreamsMu.Unlock() - for name, l := range t.logstreams { - if l.IsComplete() { - glog.Infof("%s is complete", name) - delete(t.logstreams, name) - logCount.Add(-1) - continue - } - } - return nil -} - -// StartPollLogStreamsForCompletion runs a permanent goroutine to poll for -// completed LogStreams. It is on the parent waitgroup as it should exit only once the tailer is shutting down. -func (t *Tailer) startPollLogStreamsForCompletion(ctx context.Context, wg *sync.WaitGroup) { - // Uses the log pattern poll waker for maintenance. - if t.logPatternPollWaker == nil { - glog.InfoContext(ctx, "Log completion polling disabled by no waker") +// StartGcPoller runs a permanent goroutine to expire stale logstreams and clean up completed streams. This background goroutine isn't waited for during shutdown. 
+func (t *Tailer) StartGcPoller(ctx context.Context) { + if t.gcWaker == nil { + glog.InfoContext(ctx, "stream gc disabled because no waker") return } - wg.Add(1) go func() { - defer wg.Done() <-t.initDone if t.oneShot { - glog.InfoContext(ctx, "No logstream completion polling loop in oneshot mode.") + glog.InfoContext(ctx, "No gc loop in oneshot mode.") return } for { select { case <-ctx.Done(): return - case <-t.logPatternPollWaker.Wake(): - if err := t.PollLogStreamsForCompletion(); err != nil { + case <-t.gcWaker.Wake(): + if err := t.ExpireStaleLogstreams(); err != nil { + glog.Info(err) + } + if err := t.RemoveCompletedLogstreams(); err != nil { glog.Info(err) } } } }() } + +// RemoveCompletedLogstreams checks if current logstreams have completed, +// removing it from the map if so. +func (t *Tailer) RemoveCompletedLogstreams() error { + t.logstreamsMu.Lock() + defer t.logstreamsMu.Unlock() + for name, l := range t.logstreams { + if l.IsComplete() { + glog.Infof("%s is complete", name) + delete(t.logstreams, name) + logCount.Add(-1) + continue + } + } + return nil +} + +// ExpireStaleLogstreams stops logstreams that have had no reads for more than 24 hours. 
+func (t *Tailer) ExpireStaleLogstreams() error { + t.logstreamsMu.Lock() + defer t.logstreamsMu.Unlock() + for _, v := range t.logstreams { + if time.Since(v.LastReadTime()) > (time.Hour * 24) { + v.Stop() + } + } + return nil +} diff --git a/internal/tailer/tail_unix_test.go b/internal/tailer/tail_unix_test.go index 421e89d36..57e00002b 100644 --- a/internal/tailer/tail_unix_test.go +++ b/internal/tailer/tail_unix_test.go @@ -36,26 +36,26 @@ func TestTailerOpenRetries(t *testing.T) { if err := ta.TailPath(logfile); err == nil || !os.IsPermission(err) { t.Fatalf("Expected a permission denied error here: %s", err) } - testutil.FatalIfErr(t, ta.PollLogStreamsForCompletion()) + testutil.FatalIfErr(t, ta.RemoveCompletedLogstreams()) ta.awakenPattern(1, 1) glog.Info("remove") if err := os.Remove(logfile); err != nil { t.Fatal(err) } - testutil.FatalIfErr(t, ta.PollLogStreamsForCompletion()) + testutil.FatalIfErr(t, ta.RemoveCompletedLogstreams()) ta.awakenPattern(1, 1) glog.Info("openfile") f, err := os.OpenFile(logfile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0) //nolint:staticcheck // test code defer f.Close() testutil.FatalIfErr(t, err) - testutil.FatalIfErr(t, ta.PollLogStreamsForCompletion()) + testutil.FatalIfErr(t, ta.RemoveCompletedLogstreams()) ta.awakenPattern(1, 1) glog.Info("chmod") if err := os.Chmod(logfile, 0o666); err != nil { t.Fatal(err) } - testutil.FatalIfErr(t, ta.PollLogStreamsForCompletion()) + testutil.FatalIfErr(t, ta.RemoveCompletedLogstreams()) ta.awakenPattern(1, 1) ta.awakenStreams(1, 1) // force sync to EOF glog.Info("write string")