Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Use channel semantics to manage LogStream lifecycle in Tailer. #893

Merged
merged 11 commits into from
Jul 5, 2024
6 changes: 1 addition & 5 deletions cmd/mtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ var (
pollInterval = flag.Duration("poll_interval", 250*time.Millisecond, "Set the interval to poll each log file for data; must be positive, or zero to disable polling. With polling mode, only the files found at mtail startup will be polled.")
pollLogInterval = flag.Duration("poll_log_interval", 250*time.Millisecond, "Set the interval to find all matched log files for polling; must be positive, or zero to disable polling. With polling mode, only the files found at mtail startup will be polled.")
expiredMetricGcTickInterval = flag.Duration("expired_metrics_gc_interval", time.Hour, "interval between expired metric garbage collection runs")
staleLogGcTickInterval = flag.Duration("stale_log_gc_interval", time.Hour, "interval between stale log garbage collection runs")
metricPushInterval = flag.Duration("metric_push_interval", time.Minute, "interval between metric pushes to passive collectors")
maxRegexpLength = flag.Int("max_regexp_length", 1024, "The maximum length a mtail regexp expression can have. Excessively long patterns are likely to cause compilation and runtime performance problems.")
maxRecursionDepth = flag.Int("max_recursion_depth", 100, "The maximum length a mtail statement can be, as measured by parsed tokens. Excessively long mtail expressions are likely to cause compilation and runtime performance problems.")
Expand All @@ -83,6 +82,7 @@ var (
// Deprecated.
_ = flag.Bool("disable_fsnotify", true, "DEPRECATED: this flag is no longer in use.")
_ = flag.Int("metric_push_interval_seconds", 0, "DEPRECATED: use --metric_push_interval instead")
_ = flag.Duration("stale_log_gc_interval", time.Hour, "DEPRECATED: this flag is no longer in use")
)

func init() {
Expand Down Expand Up @@ -180,10 +180,6 @@ func main() {
if *logRuntimeErrors {
opts = append(opts, mtail.LogRuntimeErrors)
}
if *staleLogGcTickInterval > 0 {
staleLogGcWaker := waker.NewTimed(ctx, *staleLogGcTickInterval)
opts = append(opts, mtail.GcWaker(staleLogGcWaker))
}
if *pollInterval > 0 {
logStreamPollWaker := waker.NewTimed(ctx, *pollInterval)
logPatternPollWaker := waker.NewTimed(ctx, *pollLogInterval)
Expand Down
1 change: 0 additions & 1 deletion internal/mtail/log_deletion_integration_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,5 @@ func TestLogDeletion(t *testing.T) {

m.AwakenLogStreams(1, 0) // run stream to observe it's missing
logCloseCheck()
m.AwakenGcPoller(1, 1)
logCountCheck()
}
8 changes: 3 additions & 5 deletions internal/mtail/log_glob_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestGlobBeforeStart(t *testing.T) {
log.Close()
}
m, stopM := mtail.TestStartServer(t, 0, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")))
stopM()
defer stopM()

if r := m.GetExpvar("log_count"); r.(*expvar.Int).Value() != count {
t.Errorf("Expecting log count of %d, received %d", count, r)
Expand Down Expand Up @@ -142,8 +142,7 @@ func TestGlobIgnoreFolder(t *testing.T) {
}

m, stopM := mtail.TestStartServer(t, 0, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")), mtail.IgnoreRegexPattern("\\.gz"))

stopM()
defer stopM()

if r := m.GetExpvar("log_count"); r.(*expvar.Int).Value() != count {
t.Errorf("Expecting log count of %d, received %v", count, r)
Expand Down Expand Up @@ -184,8 +183,7 @@ func TestFilenameRegexIgnore(t *testing.T) {
}

m, stopM := mtail.TestStartServer(t, 0, 0, mtail.LogPathPatterns(filepath.Join(workdir, "log*")), mtail.IgnoreRegexPattern("\\.gz"))

stopM()
defer stopM()

if r := m.GetExpvar("log_count"); r.(*expvar.Int).Value() != count {
t.Errorf("Log count not matching, expected: %d received: %v", count, r)
Expand Down
4 changes: 1 addition & 3 deletions internal/mtail/log_rotation_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func TestLogRotationBySoftLinkChange(t *testing.T) {
defer trueLog2.Close()
m.AwakenPatternPollers(1, 1)
m.AwakenLogStreams(1, 1)
m.AwakenGcPoller(1, 1)
logClosedCheck := m.ExpectMapExpvarDeltaWithDeadline("log_closes_total", logFilepath, 1)
logCompletedCheck := m.ExpectExpvarDeltaWithDeadline("log_count", -1)
testutil.FatalIfErr(t, os.Remove(logFilepath))
Expand All @@ -63,8 +62,7 @@ func TestLogRotationBySoftLinkChange(t *testing.T) {
// existing stream.
m.AwakenPatternPollers(1, 1) // simulate race condition with this poll.
m.AwakenLogStreams(1, 0)
logClosedCheck() // barrier until filestream closes fd
m.AwakenGcPoller(1, 1)
logClosedCheck() // barrier until filestream closes fd
logCompletedCheck() // barrier until the logstream is removed from tailer
}
testutil.FatalIfErr(t, os.Symlink(logFilepath+".true2", logFilepath))
Expand Down
3 changes: 1 addition & 2 deletions internal/mtail/log_rotation_integration_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ func TestLogRotationByRename(t *testing.T) {
// existing stream.
m.AwakenPatternPollers(1, 1) // simulate race condition with this poll.
m.AwakenLogStreams(1, 0)
logClosedCheck() // barrier until filestream closes fd
m.AwakenGcPoller(1, 1)
logClosedCheck() // barrier until filestream closes fd
logCompletedCheck() // barrier until the logstream is removed from tailer
}
glog.Info("create")
Expand Down
14 changes: 0 additions & 14 deletions internal/mtail/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,6 @@ func (opt overrideLocation) apply(m *Server) error {
return nil
}

// GcWaker triggers garbage collection runs for stale logs in the tailer.
func GcWaker(w waker.Waker) Option {
return &gcWaker{w}
}

type gcWaker struct {
waker.Waker
}

func (opt gcWaker) apply(m *Server) error {
m.tOpts = append(m.tOpts, tailer.GcWaker(opt.Waker))
return nil
}

// LogPatternPollWaker triggers polls on the filesystem for new logs that match the log glob patterns.
func LogPatternPollWaker(w waker.Waker) Option {
return &logPatternPollWaker{w}
Expand Down
5 changes: 0 additions & 5 deletions internal/mtail/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ type TestServer struct {
// method, synchronising the pattern poll with the test.
AwakenPatternPollers waker.WakeFunc // the glob awakens

gcWaker waker.Waker // activate the cleanup routines
AwakenGcPoller waker.WakeFunc

tb testing.TB

cancel context.CancelFunc
Expand Down Expand Up @@ -68,11 +65,9 @@ func TestMakeServer(tb testing.TB, patternWakers int, streamWakers int, options
}
ts.streamWaker, ts.AwakenLogStreams = waker.NewTest(ctx, streamWakers, "streams")
ts.patternWaker, ts.AwakenPatternPollers = waker.NewTest(ctx, patternWakers, "patterns")
ts.gcWaker, ts.AwakenGcPoller = waker.NewTest(ctx, 1, "gc")
options = append(options,
LogstreamPollWaker(ts.streamWaker),
LogPatternPollWaker(ts.patternWaker),
GcWaker(ts.gcWaker),
)
var err error
ts.Server, err = New(ctx, metrics.NewStore(), options...)
Expand Down
30 changes: 8 additions & 22 deletions internal/tailer/logstream/dgramstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ type dgramStream struct {
address string // Given name for the underlying socket path on the filesystem or hostport.

mu sync.RWMutex // protects following fields
completed bool // This pipestream is completed and can no longer be used.
lastReadTime time.Time // Last time a log line was read from this named pipe

staleTimer *time.Timer // Expire the stream if no read in 24h
}

func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, oneShot OneShotMode) (LogStream, error) {
Expand All @@ -40,12 +41,6 @@ func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker,
return ss, nil
}

func (ds *dgramStream) LastReadTime() time.Time {
ds.mu.RLock()
defer ds.mu.RUnlock()
return ds.lastReadTime
}

// The read buffer size for datagrams.
const datagramReadBufferSize = 131072

Expand All @@ -71,11 +66,8 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
glog.Info(err)
}
logCloses.Add(ds.address, 1)
ds.mu.Lock()
ds.completed = true
close(ds.lines)
ds.mu.Unlock()
ds.Stop()
ds.cancel()
}()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand All @@ -85,6 +77,10 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
n, _, err := c.ReadFrom(b)
glog.V(2).Infof("stream(%s:%s): read %d bytes, err is %v", ds.scheme, ds.address, n, err)

if ds.staleTimer != nil {
ds.staleTimer.Stop()
}

// This is a test-only trick that says if we've already put this
// logstream in graceful shutdown, then a zero-byte read is
// equivalent to an "EOF" in connection and file oriented streams.
Expand Down Expand Up @@ -114,6 +110,7 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
ds.mu.Lock()
ds.lastReadTime = time.Now()
ds.mu.Unlock()
ds.staleTimer = time.AfterFunc(time.Hour*24, ds.cancel)
}

if err != nil && IsEndOrCancel(err) {
Expand Down Expand Up @@ -143,17 +140,6 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
return nil
}

func (ds *dgramStream) IsComplete() bool {
ds.mu.RLock()
defer ds.mu.RUnlock()
return ds.completed
}

func (ds *dgramStream) Stop() {
glog.V(2).Infof("stream(%s:%s): Stop received on datagram stream.", ds.scheme, ds.address)
ds.cancel()
}

// Lines implements the LogStream interface, returning the output lines channel.
func (ds *dgramStream) Lines() <-chan *logline.LogLine {
return ds.lines
Expand Down
10 changes: 4 additions & 6 deletions internal/tailer/logstream/dgramstream_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {
}

ctx, cancel := context.WithCancel(context.Background())
// Stream is not shut down with cancel in this test
defer cancel()
waker, awaken := waker.NewTest(ctx, 1, "stream")

sockName := scheme + "://" + addr
Expand Down Expand Up @@ -67,10 +69,7 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {

checkLineDiff()

cancel()
wg.Wait()

if !ds.IsComplete() {
if v := <-ds.Lines(); v != nil {
t.Errorf("expecting dgramstream to be complete because socket closed")
}
}))
Expand Down Expand Up @@ -115,12 +114,11 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) {
awaken(0, 0) // Synchronise past read.

cancel() // This cancellation should cause the stream to shut down.

wg.Wait()

checkLineDiff()

if !ds.IsComplete() {
if v := <-ds.Lines(); v != nil {
t.Errorf("expecting dgramstream to be complete because cancel")
}
}))
Expand Down
37 changes: 8 additions & 29 deletions internal/tailer/logstream/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ type fileStream struct {

mu sync.RWMutex // protects following fields.
lastReadTime time.Time // Last time a log line was read from this file
completed bool // The filestream is completed and can no longer be used.

staleTimer *time.Timer // Expire the stream if no read in 24h
}

// newFileStream creates a new log stream from a regular file.
Expand All @@ -57,12 +58,6 @@ func newFileStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, p
return fs, nil
}

func (fs *fileStream) LastReadTime() time.Time {
fs.mu.RLock()
defer fs.mu.RUnlock()
return fs.lastReadTime
}

func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, fi os.FileInfo, oneShot OneShotMode, streamFromStart bool) error {
fd, err := os.OpenFile(fs.pathname, os.O_RDONLY, 0o600)
if err != nil {
Expand Down Expand Up @@ -107,6 +102,10 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
count, err := fd.Read(b)
glog.V(2).Infof("stream(%s): read %d bytes, err is %v", fs.pathname, count, err)

if fs.staleTimer != nil {
fs.staleTimer.Stop()
}

if count > 0 {
total += count
glog.V(2).Infof("stream(%s): decode and send", fs.pathname)
Expand All @@ -121,6 +120,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
fs.mu.Lock()
fs.lastReadTime = time.Now()
fs.mu.Unlock()
fs.staleTimer = time.AfterFunc(time.Hour*24, fs.cancel)
}

if err != nil && err != io.EOF {
Expand Down Expand Up @@ -154,17 +154,13 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
// file is in the middle of a rotation and gets recreated
// in the next moment. We can't rely on the Tailer to tell
// us we're deleted because the tailer can only tell us to
// cancel, which ends up causing us to race here against
// detection of IsCompleted.
// cancel.
if os.IsNotExist(serr) {
glog.V(2).Infof("stream(%s): source no longer exists, exiting", fs.pathname)
if partial.Len() > 0 {
sendLine(ctx, fs.pathname, partial, fs.lines)
}
fs.mu.Lock()
fs.completed = true
close(fs.lines)
fs.mu.Unlock()
return
}
logErrors.Add(fs.pathname, 1)
Expand Down Expand Up @@ -226,10 +222,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
if partial.Len() > 0 {
sendLine(ctx, fs.pathname, partial, fs.lines)
}
fs.mu.Lock()
fs.completed = true
close(fs.lines)
fs.mu.Unlock()
return
}
select {
Expand All @@ -238,10 +231,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
if partial.Len() > 0 {
sendLine(ctx, fs.pathname, partial, fs.lines)
}
fs.mu.Lock()
fs.completed = true
close(fs.lines)
fs.mu.Unlock()
return
default:
// keep going
Expand Down Expand Up @@ -273,17 +263,6 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
return nil
}

func (fs *fileStream) IsComplete() bool {
fs.mu.RLock()
defer fs.mu.RUnlock()
return fs.completed
}

// Stop implements the LogStream interface.
func (fs *fileStream) Stop() {
fs.cancel()
}

// Lines implements the LogStream interface, returning the output lines channel.
func (fs *fileStream) Lines() <-chan *logline.LogLine {
return fs.lines
Expand Down
Loading
Loading