diff --git a/filebeat/input/log/reader.go b/filebeat/input/log/reader.go index ceffc6c19ee3..09a2d6d22822 100644 --- a/filebeat/input/log/reader.go +++ b/filebeat/input/log/reader.go @@ -358,9 +358,11 @@ func (h *FileHarvester) Run() { case <-tick.C: if len(newForwarders) > 0 { logp.Info("found new forward join, reload file:%s", h.state.Source) + h.forwardersLock.Lock() for _, reuseReader := range newForwarders { h.forwarders[reuseReader.HarvesterID] = reuseReader } + h.forwardersLock.Unlock() newForwarders = make([]*ReuseHarvester, 0) offset, err := h.reloadFileOffset() @@ -378,6 +380,7 @@ func (h *FileHarvester) Run() { h.readerDone.Add(1) go h.loopRead() } else { + h.forwardersLock.Lock() for _, reuseReader := range h.forwarders { select { case <-reuseReader.done: @@ -386,6 +389,7 @@ func (h *FileHarvester) Run() { default: } } + h.forwardersLock.Unlock() } if len(h.forwarders) > 0 { @@ -449,8 +453,13 @@ func (h *FileHarvester) forward(message reader.Message, err error) { message: message, error: err, } - + reuseReaders := make([]*ReuseHarvester, 0) + h.forwardersLock.Lock() for _, reuseReader := range h.forwarders { + reuseReaders = append(reuseReaders, reuseReader) + } + h.forwardersLock.Unlock() + for _, reuseReader := range reuseReaders { select { case <-h.done: return