Skip to content

Commit

Permalink
Move the stream cleanup onto the GC poller.
Browse files Browse the repository at this point in the history
Rename the waker option and the goroutine launcher as it's not just for stale
logs anymore.

Fix tests that were relying on the pattern poller for log completion.
  • Loading branch information
jaqx0r committed May 26, 2024
1 parent 455da56 commit 6e3f971
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 88 deletions.
2 changes: 1 addition & 1 deletion cmd/mtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func main() {
}
if *staleLogGcTickInterval > 0 {
staleLogGcWaker := waker.NewTimed(ctx, *staleLogGcTickInterval)
opts = append(opts, mtail.StaleLogGcWaker(staleLogGcWaker))
opts = append(opts, mtail.GcWaker(staleLogGcWaker))
}
if *pollInterval > 0 {
logStreamPollWaker := waker.NewTimed(ctx, *pollInterval)
Expand Down
4 changes: 2 additions & 2 deletions internal/mtail/basic_tail_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestBasicTail(t *testing.T) {
}
logDir := testutil.TestTempDir(t)

m, stopM := mtail.TestStartServer(t, 2, 1, mtail.LogPathPatterns(logDir+"/*"), mtail.ProgramPath("../../examples/linecount.mtail"))
m, stopM := mtail.TestStartServer(t, 1, 1, mtail.LogPathPatterns(logDir+"/*"), mtail.ProgramPath("../../examples/linecount.mtail"))
defer stopM()

logFile := filepath.Join(logDir, "log")
Expand All @@ -31,7 +31,7 @@ func TestBasicTail(t *testing.T) {

f := testutil.TestOpenFile(t, logFile)
defer f.Close()
m.AwakenPatternPollers(2, 2) // Find `logFile`
m.AwakenPatternPollers(1, 1) // Find `logFile`
m.AwakenLogStreams(1, 1) // Force a sync to EOF

for i := 1; i <= 3; i++ {
Expand Down
2 changes: 1 addition & 1 deletion internal/mtail/log_deletion_integration_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ func TestLogDeletion(t *testing.T) {

m.AwakenLogStreams(1, 0) // run stream to observe it's missing
logCloseCheck()
m.AwakenPatternPollers(1, 1)
m.AwakenGcPoller(1, 1)
logCountCheck()
}
3 changes: 2 additions & 1 deletion internal/mtail/log_rotation_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func TestLogRotationBySoftLinkChange(t *testing.T) {
defer trueLog2.Close()
m.AwakenPatternPollers(1, 1)
m.AwakenLogStreams(1, 1)
m.AwakenGcPoller(1, 1)
logClosedCheck := m.ExpectMapExpvarDeltaWithDeadline("log_closes_total", logFilepath, 1)
logCompletedCheck := m.ExpectExpvarDeltaWithDeadline("log_count", -1)
testutil.FatalIfErr(t, os.Remove(logFilepath))
Expand All @@ -63,7 +64,7 @@ func TestLogRotationBySoftLinkChange(t *testing.T) {
m.AwakenPatternPollers(1, 1) // simulate race condition with this poll.
m.AwakenLogStreams(1, 0)
logClosedCheck() // barrier until filestream closes fd
m.AwakenPatternPollers(1, 1)
m.AwakenGcPoller(1, 1)
logCompletedCheck() // barrier until the logstream is removed from tailer
}
testutil.FatalIfErr(t, os.Symlink(logFilepath+".true2", logFilepath))
Expand Down
2 changes: 1 addition & 1 deletion internal/mtail/log_rotation_integration_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestLogRotationByRename(t *testing.T) {
m.AwakenPatternPollers(1, 1) // simulate race condition with this poll.
m.AwakenLogStreams(1, 0)
logClosedCheck() // barrier until filestream closes fd
m.AwakenPatternPollers(1, 1)
m.AwakenGcPoller(1, 1)
logCompletedCheck() // barrier until the logstream is removed from tailer
}
glog.Info("create")
Expand Down
12 changes: 6 additions & 6 deletions internal/mtail/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,17 @@ func (opt overrideLocation) apply(m *Server) error {
return nil
}

// StaleLogGcWaker triggers garbage collection runs for stale logs in the tailer.
func StaleLogGcWaker(w waker.Waker) Option {
return &staleLogGcWaker{w}
// GcWaker triggers garbage collection runs for stale logs in the tailer.
func GcWaker(w waker.Waker) Option {
return &gcWaker{w}
}

type staleLogGcWaker struct {
type gcWaker struct {
waker.Waker
}

func (opt staleLogGcWaker) apply(m *Server) error {
m.tOpts = append(m.tOpts, tailer.StaleLogGcWaker(opt.Waker))
func (opt gcWaker) apply(m *Server) error {
m.tOpts = append(m.tOpts, tailer.GcWaker(opt.Waker))
return nil
}

Expand Down
5 changes: 5 additions & 0 deletions internal/mtail/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type TestServer struct {
// method, synchronising the pattern poll with the test.
AwakenPatternPollers waker.WakeFunc // the glob awakens

gcWaker waker.Waker // activate the cleanup routines
AwakenGcPoller waker.WakeFunc

tb testing.TB

cancel context.CancelFunc
Expand Down Expand Up @@ -65,9 +68,11 @@ func TestMakeServer(tb testing.TB, patternWakers int, streamWakers int, options
}
ts.streamWaker, ts.AwakenLogStreams = waker.NewTest(ctx, streamWakers, "streams")
ts.patternWaker, ts.AwakenPatternPollers = waker.NewTest(ctx, patternWakers, "patterns")
ts.gcWaker, ts.AwakenGcPoller = waker.NewTest(ctx, 1, "gc")
options = append(options,
LogstreamPollWaker(ts.streamWaker),
LogPatternPollWaker(ts.patternWaker),
GcWaker(ts.gcWaker),
)
var err error
ts.Server, err = New(ctx, metrics.NewStore(), options...)
Expand Down
115 changes: 43 additions & 72 deletions internal/tailer/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Tailer struct {
logstreamsMu sync.RWMutex // protects `logstreams`.
logstreams map[string]logstream.LogStream // Map absolte pathname to logstream reading that pathname.

staleLogGcWaker waker.Waker // Used to wake stale log GC
gcWaker waker.Waker // Used to wake stale log and completion pollers

initDone chan struct{}
}
Expand Down Expand Up @@ -85,8 +85,8 @@ func (opt IgnoreRegex) apply(t *Tailer) error {
return t.SetIgnorePattern(string(opt))
}

// StaleLogGcWaker triggers garbage collection runs for stale logs in the tailer.
func StaleLogGcWaker(w waker.Waker) Option {
// GcWaker triggers garbage collection runs for stale logs in the tailer.
func GcWaker(w waker.Waker) Option {
return &staleLogGcWaker{w}
}

Expand All @@ -95,7 +95,7 @@ type staleLogGcWaker struct {
}

func (opt staleLogGcWaker) apply(t *Tailer) error {
t.staleLogGcWaker = opt.Waker
t.gcWaker = opt.Waker
return nil
}

Expand Down Expand Up @@ -158,8 +158,7 @@ func New(ctx context.Context, wg *sync.WaitGroup, lines chan<- *logline.LogLine,
}
}
// Start the routine for checking if logstreams have completed.
t.startPollLogStreamsForCompletion(ctx, wg)
t.StartStaleLogstreamExpirationLoop(ctx, wg)
t.StartGcPoller(ctx)

// This goroutine cancels the Tailer if all of our dependent subroutines are done.
// These are any live logstreams, and any log pattern pollers.
Expand Down Expand Up @@ -299,45 +298,6 @@ func (t *Tailer) TailPath(pathname string) error {
return nil
}

// ExpireStaleLogstreams removes logstreams that have had no reads for 1h or more.
func (t *Tailer) ExpireStaleLogstreams() error {
t.logstreamsMu.Lock()
defer t.logstreamsMu.Unlock()
for _, v := range t.logstreams {
if time.Since(v.LastReadTime()) > (time.Hour * 24) {
v.Stop()
}
}
return nil
}

// StartStaleLogstreamExpirationLoop runs a permanent goroutine to expire stale logstreams.
func (t *Tailer) StartStaleLogstreamExpirationLoop(ctx context.Context, wg *sync.WaitGroup) {
if t.staleLogGcWaker == nil {
glog.InfoContext(ctx, "Log handle expiration disabled")
return
}
wg.Add(1)
go func() {
defer wg.Done()
<-t.initDone
if t.oneShot {
glog.InfoContext(ctx, "No gc loop in oneshot mode.")
return
}
for {
select {
case <-ctx.Done():
return
case <-t.staleLogGcWaker.Wake():
if err := t.ExpireStaleLogstreams(); err != nil {
glog.Info(err)
}
}
}
}()
}

// pollLogPattern runs a permanent goroutine to poll for new log files that
// match `pattern`. It is on the subroutine waitgroup as we do not want to
// shut down the tailer when there are outstanding patterns to poll for.
Expand Down Expand Up @@ -396,47 +356,58 @@ func (t *Tailer) doPatternGlob(pattern string) error {
return nil
}

// PollLogStreamsForCompletion looks at the existing paths and checks if they're already
// complete, removing it from the map if so.
func (t *Tailer) PollLogStreamsForCompletion() error {
t.logstreamsMu.Lock()
defer t.logstreamsMu.Unlock()
for name, l := range t.logstreams {
if l.IsComplete() {
glog.Infof("%s is complete", name)
delete(t.logstreams, name)
logCount.Add(-1)
continue
}
}
return nil
}

// StartPollLogStreamsForCompletion runs a permanent goroutine to poll for
// completed LogStreams. It is on the parent waitgroup as it should exit only once the tailer is shutting down.
func (t *Tailer) startPollLogStreamsForCompletion(ctx context.Context, wg *sync.WaitGroup) {
// Uses the log pattern poll waker for maintenance.
if t.logPatternPollWaker == nil {
glog.InfoContext(ctx, "Log completion polling disabled by no waker")
// StartGcPoller runs a permanent goroutine to expire stale logstreams and clean up completed streams. This background goroutine isn't waited for during shutdown.
func (t *Tailer) StartGcPoller(ctx context.Context) {
if t.gcWaker == nil {
glog.InfoContext(ctx, "stream gc disabled because no waker")
return
}
wg.Add(1)
go func() {
defer wg.Done()
<-t.initDone
if t.oneShot {
glog.InfoContext(ctx, "No logstream completion polling loop in oneshot mode.")
glog.InfoContext(ctx, "No gc loop in oneshot mode.")
return
}
for {
select {
case <-ctx.Done():
return
case <-t.logPatternPollWaker.Wake():
if err := t.PollLogStreamsForCompletion(); err != nil {
case <-t.gcWaker.Wake():
if err := t.ExpireStaleLogstreams(); err != nil {
glog.Info(err)
}
if err := t.RemoveCompletedLogstreams(); err != nil {
glog.Info(err)
}
}
}
}()
}

// RemoveCompletedLogstreams checks if current logstreams have completed,
// removing it from the map if so.
func (t *Tailer) RemoveCompletedLogstreams() error {
t.logstreamsMu.Lock()
defer t.logstreamsMu.Unlock()
for name, l := range t.logstreams {
if l.IsComplete() {
glog.Infof("%s is complete", name)
delete(t.logstreams, name)
logCount.Add(-1)
continue
}
}
return nil
}

// ExpireStaleLogstreams removes logstreams that have had no reads for 1h or more.
func (t *Tailer) ExpireStaleLogstreams() error {
t.logstreamsMu.Lock()
defer t.logstreamsMu.Unlock()
for _, v := range t.logstreams {
if time.Since(v.LastReadTime()) > (time.Hour * 24) {
v.Stop()
}
}
return nil
}
8 changes: 4 additions & 4 deletions internal/tailer/tail_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,26 @@ func TestTailerOpenRetries(t *testing.T) {
if err := ta.TailPath(logfile); err == nil || !os.IsPermission(err) {
t.Fatalf("Expected a permission denied error here: %s", err)
}
testutil.FatalIfErr(t, ta.PollLogStreamsForCompletion())
testutil.FatalIfErr(t, ta.RemoveCompletedLogstreams())
ta.awakenPattern(1, 1)
glog.Info("remove")
if err := os.Remove(logfile); err != nil {
t.Fatal(err)
}
testutil.FatalIfErr(t, ta.PollLogStreamsForCompletion())
testutil.FatalIfErr(t, ta.RemoveCompletedLogstreams())
ta.awakenPattern(1, 1)
glog.Info("openfile")
f, err := os.OpenFile(logfile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0)
//nolint:staticcheck // test code
defer f.Close()
testutil.FatalIfErr(t, err)
testutil.FatalIfErr(t, ta.PollLogStreamsForCompletion())
testutil.FatalIfErr(t, ta.RemoveCompletedLogstreams())
ta.awakenPattern(1, 1)
glog.Info("chmod")
if err := os.Chmod(logfile, 0o666); err != nil {
t.Fatal(err)
}
testutil.FatalIfErr(t, ta.PollLogStreamsForCompletion())
testutil.FatalIfErr(t, ta.RemoveCompletedLogstreams())
ta.awakenPattern(1, 1)
ta.awakenStreams(1, 1) // force sync to EOF
glog.Info("write string")
Expand Down

0 comments on commit 6e3f971

Please sign in to comment.