Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: LogStreams create and close their own lines channels. #889

Merged
merged 1 commit into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 46 additions & 40 deletions internal/tailer/logstream/dgramstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
type dgramStream struct {
cancel context.CancelFunc

lines chan<- *logline.LogLine
lines chan *logline.LogLine

scheme string // Datagram scheme, either "unixgram" or "udp".
address string // Given name for the underlying socket path on the filesystem or hostport.
Expand All @@ -28,78 +28,79 @@ type dgramStream struct {
lastReadTime time.Time // Last time a log line was read from this named pipe
}

func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, lines chan<- *logline.LogLine, oneShot OneShotMode) (LogStream, error) {
func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, oneShot OneShotMode) (LogStream, error) {
if address == "" {
return nil, ErrEmptySocketAddress
}
ctx, cancel := context.WithCancel(ctx)
ss := &dgramStream{cancel: cancel, scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines}
ss := &dgramStream{cancel: cancel, scheme: scheme, address: address, lastReadTime: time.Now(), lines: make(chan *logline.LogLine)}
if err := ss.stream(ctx, wg, waker, oneShot); err != nil {
return nil, err
}
return ss, nil
}

func (ss *dgramStream) LastReadTime() time.Time {
ss.mu.RLock()
defer ss.mu.RUnlock()
return ss.lastReadTime
func (ds *dgramStream) LastReadTime() time.Time {
ds.mu.RLock()
defer ds.mu.RUnlock()
return ds.lastReadTime
}

// The read buffer size for datagrams.
const datagramReadBufferSize = 131072

func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, oneShot OneShotMode) error {
c, err := net.ListenPacket(ss.scheme, ss.address)
func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, oneShot OneShotMode) error {
c, err := net.ListenPacket(ds.scheme, ds.address)
if err != nil {
logErrors.Add(ss.address, 1)
logErrors.Add(ds.address, 1)
return err
}
glog.V(2).Infof("stream(%s:%s): opened new datagram socket %v", ss.scheme, ss.address, c)
glog.V(2).Infof("stream(%s:%s): opened new datagram socket %v", ds.scheme, ds.address, c)
b := make([]byte, datagramReadBufferSize)
partial := bytes.NewBufferString("")
var total int
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
glog.V(2).Infof("stream(%s:%s): read total %d bytes", ss.scheme, ss.address, total)
glog.V(2).Infof("stream(%s:%s): closing connection", ss.scheme, ss.address)
glog.V(2).Infof("stream(%s:%s): read total %d bytes", ds.scheme, ds.address, total)
glog.V(2).Infof("stream(%s:%s): closing connection", ds.scheme, ds.address)
err := c.Close()
if err != nil {
logErrors.Add(ss.address, 1)
logErrors.Add(ds.address, 1)
glog.Info(err)
}
logCloses.Add(ss.address, 1)
ss.mu.Lock()
ss.completed = true
ss.mu.Unlock()
ss.Stop()
logCloses.Add(ds.address, 1)
ds.mu.Lock()
ds.completed = true
close(ds.lines)
ds.mu.Unlock()
ds.Stop()
}()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
SetReadDeadlineOnDone(ctx, c)

for {
n, _, err := c.ReadFrom(b)
glog.V(2).Infof("stream(%s:%s): read %d bytes, err is %v", ss.scheme, ss.address, n, err)
glog.V(2).Infof("stream(%s:%s): read %d bytes, err is %v", ds.scheme, ds.address, n, err)

// This is a test-only trick that says if we've already put this
// logstream in graceful shutdown, then a zero-byte read is
// equivalent to an "EOF" in connection and file oriented streams.
if n == 0 {
if oneShot {
glog.V(2).Infof("stream(%s:%s): exiting because zero byte read and one shot", ss.scheme, ss.address)
glog.V(2).Infof("stream(%s:%s): exiting because zero byte read and one shot", ds.scheme, ds.address)
if partial.Len() > 0 {
sendLine(ctx, ss.address, partial, ss.lines)
sendLine(ctx, ds.address, partial, ds.lines)
}
return
}
select {
case <-ctx.Done():
glog.V(2).Infof("stream(%s:%s): exiting because zero byte read after cancellation", ss.scheme, ss.address)
glog.V(2).Infof("stream(%s:%s): exiting because zero byte read after cancellation", ds.scheme, ds.address)
if partial.Len() > 0 {
sendLine(ctx, ss.address, partial, ss.lines)
sendLine(ctx, ds.address, partial, ds.lines)
}
return
default:
Expand All @@ -109,46 +110,51 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
if n > 0 {
total += n
//nolint:contextcheck
decodeAndSend(ctx, ss.lines, ss.address, n, b[:n], partial)
ss.mu.Lock()
ss.lastReadTime = time.Now()
ss.mu.Unlock()
decodeAndSend(ctx, ds.lines, ds.address, n, b[:n], partial)
ds.mu.Lock()
ds.lastReadTime = time.Now()
ds.mu.Unlock()
}

if err != nil && IsEndOrCancel(err) {
if partial.Len() > 0 {
sendLine(ctx, ss.address, partial, ss.lines)
sendLine(ctx, ds.address, partial, ds.lines)
}
glog.V(2).Infof("stream(%s:%s): exiting, stream has error %s", ss.scheme, ss.address, err)
glog.V(2).Infof("stream(%s:%s): exiting, stream has error %s", ds.scheme, ds.address, err)
return
}

// Yield and wait
glog.V(2).Infof("stream(%s:%s): waiting", ss.scheme, ss.address)
glog.V(2).Infof("stream(%s:%s): waiting", ds.scheme, ds.address)
select {
case <-ctx.Done():
// We may have started waiting here when the stop signal
// arrives, but since that wait the file may have been
// written to. The file is not technically yet at EOF so
// we need to go back and try one more read. We'll exit
// the stream in the zero byte handler above.
glog.V(2).Infof("stream(%s:%s): Stopping after next zero byte read", ss.scheme, ss.address)
glog.V(2).Infof("stream(%s:%s): Stopping after next zero byte read", ds.scheme, ds.address)
case <-waker.Wake():
// sleep until next Wake()
glog.V(2).Infof("stream(%s:%s): Wake received", ss.scheme, ss.address)
glog.V(2).Infof("stream(%s:%s): Wake received", ds.scheme, ds.address)
}
}
}()
return nil
}

func (ss *dgramStream) IsComplete() bool {
ss.mu.RLock()
defer ss.mu.RUnlock()
return ss.completed
func (ds *dgramStream) IsComplete() bool {
ds.mu.RLock()
defer ds.mu.RUnlock()
return ds.completed
}

func (ss *dgramStream) Stop() {
glog.V(2).Infof("stream(%s:%s): Stop received on datagram stream.", ss.scheme, ss.address)
ss.cancel()
func (ds *dgramStream) Stop() {
glog.V(2).Infof("stream(%s:%s): Stop received on datagram stream.", ds.scheme, ds.address)
ds.cancel()
}

// Lines implements the LogStream interface, returning the output lines channel.
func (ds *dgramStream) Lines() <-chan *logline.LogLine {
return ds.lines
}
36 changes: 18 additions & 18 deletions internal/tailer/logstream/dgramstream_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,19 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {
default:
t.Fatalf("bad scheme %s", scheme)
}
lines := make(chan *logline.LogLine, 1)

ctx, cancel := context.WithCancel(context.Background())
waker, awaken := waker.NewTest(ctx, 1, "stream")

sockName := scheme + "://" + addr
ss, err := logstream.New(ctx, &wg, waker, sockName, lines, logstream.OneShotEnabled)
ds, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotEnabled)
testutil.FatalIfErr(t, err)

expected := []*logline.LogLine{
{Context: context.TODO(), Filename: addr, Line: "1"},
}
checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ds.Lines())

s, err := net.Dial(scheme, addr)
testutil.FatalIfErr(t, err)

Expand All @@ -59,18 +64,13 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {
testutil.FatalIfErr(t, err)

wg.Wait()
close(lines)

received := testutil.LinesReceived(lines)
expected := []*logline.LogLine{
{Context: context.TODO(), Filename: addr, Line: "1"},
}
testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context"))
checkLineDiff()

cancel()
wg.Wait()

if !ss.IsComplete() {
if !ds.IsComplete() {
t.Errorf("expecting dgramstream to be complete because socket closed")
}
}))
Expand All @@ -93,14 +93,19 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) {
default:
t.Fatalf("bad scheme %s", scheme)
}
lines := make(chan *logline.LogLine, 1)

ctx, cancel := context.WithCancel(context.Background())
waker, awaken := waker.NewTest(ctx, 1, "stream")

sockName := scheme + "://" + addr
ss, err := logstream.New(ctx, &wg, waker, sockName, lines, logstream.OneShotDisabled)
ds, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotDisabled)
testutil.FatalIfErr(t, err)

expected := []*logline.LogLine{
{Context: context.TODO(), Filename: addr, Line: "1"},
}
checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ds.Lines())

s, err := net.Dial(scheme, addr)
testutil.FatalIfErr(t, err)

Expand All @@ -112,15 +117,10 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) {
cancel() // This cancellation should cause the stream to shut down.

wg.Wait()
close(lines)

received := testutil.LinesReceived(lines)
expected := []*logline.LogLine{
{Context: context.TODO(), Filename: addr, Line: "1"},
}
testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context"))
checkLineDiff()

if !ss.IsComplete() {
if !ds.IsComplete() {
t.Errorf("expecting dgramstream to be complete because cancel")
}
}))
Expand Down
14 changes: 11 additions & 3 deletions internal/tailer/logstream/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var fileTruncates = expvar.NewMap("file_truncates_total")
type fileStream struct {
cancel context.CancelFunc

lines chan<- *logline.LogLine
lines chan *logline.LogLine

pathname string // Given name for the underlying file on the filesystem

Expand All @@ -46,9 +46,9 @@ type fileStream struct {
}

// newFileStream creates a new log stream from a regular file.
func newFileStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo, lines chan<- *logline.LogLine, oneShot OneShotMode) (LogStream, error) {
func newFileStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo, oneShot OneShotMode) (LogStream, error) {
ctx, cancel := context.WithCancel(ctx)
fs := &fileStream{cancel: cancel, pathname: pathname, lastReadTime: time.Now(), lines: lines}
fs := &fileStream{cancel: cancel, pathname: pathname, lastReadTime: time.Now(), lines: make(chan *logline.LogLine)}
// Stream from the start of the file when in one shot mode.
streamFromStart := oneShot == OneShotEnabled
if err := fs.stream(ctx, wg, waker, fi, oneShot, streamFromStart); err != nil {
Expand Down Expand Up @@ -163,6 +163,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
}
fs.mu.Lock()
fs.completed = true
close(fs.lines)
fs.mu.Unlock()
return
}
Expand Down Expand Up @@ -227,6 +228,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
}
fs.mu.Lock()
fs.completed = true
close(fs.lines)
fs.mu.Unlock()
return
}
Expand All @@ -238,6 +240,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
}
fs.mu.Lock()
fs.completed = true
close(fs.lines)
fs.mu.Unlock()
return
default:
Expand Down Expand Up @@ -280,3 +283,8 @@ func (fs *fileStream) IsComplete() bool {
func (fs *fileStream) Stop() {
fs.cancel()
}

// Lines implements the LogStream interface, returning the output lines channel.
func (fs *fileStream) Lines() <-chan *logline.LogLine {
return fs.lines
}
Loading
Loading