Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 153 additions & 16 deletions tail.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
// Copyright (c) 2019 FOSS contributors of https://github.com/nxadm/tail
// Copyright (c) 2015 HPE Software Inc. All rights reserved.
// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.

//nxadm/tail provides a Go library that emulates the features of the BSD `tail`
//program. The library comes with full support for truncation/move detection as
//it is designed to work with log rotation tools. The library works on all
//operating systems supported by Go, including POSIX systems like Linux and
//*BSD, and MS Windows. Go 1.9 is the oldest compiler release supported.
// nxadm/tail provides a Go library that emulates the features of the BSD `tail`
// program. The library comes with full support for truncation/move detection as
// it is designed to work with log rotation tools. The library works on all
// operating systems supported by Go, including POSIX systems like Linux and
// *BSD, and MS Windows. Go 1.9 is the oldest compiler release supported.
package tail

import (
Expand Down Expand Up @@ -67,19 +66,29 @@ type logger interface {
Println(v ...interface{})
}

type ReOpenBackoff struct {
Enable bool // Whether to enable the reopen backoff
MaxBackoff time.Duration // The maximum backoff amount. Default is 1s. Set to a negative value to uncap back off time.
MaxAttempts int // The maximum number of back off attempts. Default is 10. Set to a negative value for unlimited backoffs.
InitialBackoff time.Duration // The starting backoff duration. Default is 100ms.
}

// Config is used to specify how a file must be tailed.
type Config struct {
// File-specifc
Location *SeekInfo // Tail from this location. If nil, start at the beginning of the file
ReOpen bool // Reopen recreated files (tail -F)
MustExist bool // Fail early if the file does not exist
Poll bool // Poll for file changes instead of using the default inotify
Pipe bool // The file is a named pipe (mkfifo)
Location *SeekInfo // Tail from this location. If nil, start at the beginning of the file
ReOpen bool // Reopen recreated files (tail -F)
MustExist bool // Fail early if the file does not exist
Poll bool // Poll for file changes instead of using the default inotify
Pipe bool // The file is a named pipe (mkfifo)
ReOpenBackoff ReOpenBackoff

// Generic IO
Follow bool // Continue looking for new lines (tail -f)
MaxLineSize int // If non-zero, split longer lines into multiple lines
CompleteLines bool // Only return complete lines (that end with "\n" or EOF when Follow is false)
Follow bool // Continue looking for new lines (tail -f)
MaxLineSize int // If non-zero, split longer lines into multiple lines
CompleteLines bool // Only return complete lines (that end with "\n" or EOF when Follow is false)
KeepBufferOnReOpen bool // Keep line buffer accross reopening a file.
CloseWhileWaiting bool // Whether or not to close the file while waiting for newlines.

// Optionally, use a ratelimiter (e.g. created by the ratelimiter/NewLeakyBucket function)
RateLimiter *ratelimiter.LeakyBucket
Expand All @@ -98,6 +107,11 @@ type Tail struct {
reader *bufio.Reader
lineNum int

// Values to handle closing during wait
lastOffset int64
lastLineNum int
keepBuffer bool

lineBuf *strings.Builder

watcher watch.FileWatcher
Expand All @@ -115,6 +129,26 @@ var (
DiscardingLogger = log.New(ioutil.Discard, "", 0)
)

func validateBackoffConfig(bc ReOpenBackoff) ReOpenBackoff {
if !bc.Enable {
return bc
}

if bc.MaxAttempts == 0 {
bc.MaxAttempts = 10
}

if bc.InitialBackoff <= 0 {
bc.InitialBackoff = 100 * time.Millisecond
}

if bc.MaxBackoff == 0 {
bc.MaxBackoff = 10 * time.Second
}

return bc
}

// TailFile begins tailing the file. And returns a pointer to a Tail struct
// and an error. An output stream is made available via the Tail.Lines
// channel (e.g. to be looped and printed). To handle errors during tailing,
Expand All @@ -125,6 +159,8 @@ func TailFile(filename string, config Config) (*Tail, error) {
util.Fatal("cannot set ReOpen without Follow.")
}

config.ReOpenBackoff = validateBackoffConfig(config.ReOpenBackoff)

t := &Tail{
Filename: filename,
Lines: make(chan *Line),
Expand Down Expand Up @@ -208,15 +244,61 @@ func (tail *Tail) closeFile() {
}
}

func (tail *Tail) openFileWithBackoff() (*os.File, error) {
f, err := OpenFile(tail.Filename)
if err != nil && os.IsNotExist(err) {
return nil, err
}
if err == nil {
return f, nil
}

if !tail.ReOpenBackoff.Enable || tail.ReOpenBackoff.MaxAttempts == 0 {
return nil, err
}

curBackoff := tail.ReOpenBackoff.InitialBackoff
currAttempt := 0

var lastErr error = err

for {
if tail.ReOpenBackoff.MaxAttempts > 0 &&
tail.ReOpenBackoff.MaxAttempts < currAttempt {
return nil, lastErr
}
select {
case <-tail.Dying():
return nil, ErrStop
case <-time.After(curBackoff):
}

currAttempt++
f, err := OpenFile(tail.Filename)
if err != nil && os.IsNotExist(err) {
return nil, err
}
if err == nil {
return f, nil
}
lastErr = err
curBackoff = curBackoff * 2
if tail.ReOpenBackoff.MaxBackoff <= 0 &&
curBackoff > tail.ReOpenBackoff.MaxBackoff {
curBackoff = tail.ReOpenBackoff.MaxBackoff
}
}
}

func (tail *Tail) reopen() error {
if tail.lineBuf != nil {
if !tail.keepBuffer && !tail.KeepBufferOnReOpen && tail.lineBuf != nil {
tail.lineBuf.Reset()
}
tail.closeFile()
tail.lineNum = 0
for {
var err error
tail.file, err = OpenFile(tail.Filename)
tail.file, err = tail.openFileWithBackoff()
if err != nil {
if os.IsNotExist(err) {
tail.Logger.Printf("Waiting for %s to appear...", tail.Filename)
Expand Down Expand Up @@ -267,6 +349,36 @@ func (tail *Tail) readLine() (string, error) {
}
}

func (tail *Tail) waitCloseFile() {
tail.lastLineNum = tail.lineNum
loc, err := tail.Tell()
tail.closeFile()

if err != nil {
return
}

tail.lastOffset = loc
}

func (tail *Tail) waitReopenFile() {
err := tail.reopen()
tail.lineNum = tail.lastLineNum
if err != nil {
if err != tomb.ErrDying {
tail.Kill(err)
}
return
}

_, err = tail.file.Seek(tail.lastOffset, io.SeekStart)
if err != nil {
tail.Killf("Seek error on %s: %s", tail.Filename, err)
return
}
tail.openReader()
}

func (tail *Tail) tailFileSync() {
defer tail.Done()
defer tail.close()
Expand Down Expand Up @@ -314,6 +426,7 @@ func (tail *Tail) tailFileSync() {
// file when rate limit is reached.
msg := ("Too much log activity; waiting a second before resuming tailing")
offset, _ := tail.Tell()
tail.lastOffset = offset
tail.Lines <- &Line{msg, tail.lineNum, SeekInfo{Offset: offset}, time.Now(), errors.New(msg)}
select {
case <-time.After(time.Second):
Expand Down Expand Up @@ -377,16 +490,33 @@ func (tail *Tail) waitForChanges() error {
if err != nil {
return err
}
tail.lastOffset = pos
tail.changes, err = tail.watcher.ChangeEvents(&tail.Tomb, pos)
if err != nil {
return err
}
}

if tail.CloseWhileWaiting {
// Since we are just waiting for the file to update, we don't want
// to reset our line buffer
tail.keepBuffer = true
tail.waitCloseFile()
}

select {
case <-tail.changes.Modified:
if tail.CloseWhileWaiting {
tail.waitReopenFile()
// Now that the file is re-opened, we want to keep the normal
// line buffer reset logic
tail.keepBuffer = false
}
return nil
case <-tail.changes.Deleted:
// If the event is not an updated modification, we want to use the
// normal line buffer reset logic
tail.keepBuffer = false
tail.changes = nil
if tail.ReOpen {
// XXX: we must not log from a library.
Expand All @@ -401,6 +531,9 @@ func (tail *Tail) waitForChanges() error {
tail.Logger.Printf("Stopping tail as file no longer exists: %s", tail.Filename)
return ErrStop
case <-tail.changes.Truncated:
// If the event is not an updated modification, we want to use the
// normal line buffer reset logic
tail.keepBuffer = false
// Always reopen truncated files (Follow is true)
tail.Logger.Printf("Re-opening truncated file %s ...", tail.Filename)
if err := tail.reopen(); err != nil {
Expand All @@ -410,6 +543,9 @@ func (tail *Tail) waitForChanges() error {
tail.openReader()
return nil
case <-tail.Dying():
// If the event is not an updated modification, we want to use the
// normal line buffer reset logic
tail.keepBuffer = false
return ErrStop
}
}
Expand Down Expand Up @@ -453,6 +589,7 @@ func (tail *Tail) sendLine(line string) bool {
for _, line := range lines {
tail.lineNum++
offset, _ := tail.Tell()
tail.lastOffset = offset
select {
case tail.Lines <- &Line{line, tail.lineNum, SeekInfo{Offset: offset}, now, nil}:
case <-tail.Dying():
Expand Down
Loading