Skip to content

Commit ae5f664

Browse files
committed
daemon/logger: open log reader synchronously
The asynchronous startup of the log-reading goroutine made the follow-tail tests nondeterministic. The Log calls in the tests which were supposed to happen after the reader started reading would sometimes execute before the reader, throwing off the counts. Tweak the ReadLogs implementation so that the order of operations is deterministic. Signed-off-by: Cory Snider <[email protected]>
1 parent 9aa9d6f commit ae5f664

6 files changed

Lines changed: 66 additions & 109 deletions

File tree

daemon/logger/jsonfilelog/jsonfilelog.go

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"encoding/json"
99
"fmt"
1010
"strconv"
11-
"sync"
1211

1312
"github.com/docker/docker/daemon/logger"
1413
"github.com/docker/docker/daemon/logger/jsonfilelog/jsonlog"
@@ -22,11 +21,8 @@ const Name = "json-file"
2221

2322
// JSONFileLogger is Logger implementation for default Docker logging.
2423
type JSONFileLogger struct {
25-
mu sync.Mutex
26-
closed bool
27-
writer *loggerutils.LogFile
28-
readers map[*logger.LogWatcher]struct{} // stores the active log followers
29-
tag string // tag values requested by the user to log
24+
writer *loggerutils.LogFile
25+
tag string // tag values requested by the user to log
3026
}
3127

3228
func init() {
@@ -115,18 +111,14 @@ func New(info logger.Info) (logger.Logger, error) {
115111
}
116112

117113
return &JSONFileLogger{
118-
writer: writer,
119-
readers: make(map[*logger.LogWatcher]struct{}),
120-
tag: tag,
114+
writer: writer,
115+
tag: tag,
121116
}, nil
122117
}
123118

124119
// Log converts logger.Message to jsonlog.JSONLog and serializes it to file.
125120
func (l *JSONFileLogger) Log(msg *logger.Message) error {
126-
l.mu.Lock()
127-
err := l.writer.WriteLogEntry(msg)
128-
l.mu.Unlock()
129-
return err
121+
return l.writer.WriteLogEntry(msg)
130122
}
131123

132124
func marshalMessage(msg *logger.Message, extra json.RawMessage, buf *bytes.Buffer) error {
@@ -169,15 +161,7 @@ func ValidateLogOpt(cfg map[string]string) error {
169161
// Close closes underlying file and signals all the readers
170162
// that the logs producer is gone.
171163
func (l *JSONFileLogger) Close() error {
172-
l.mu.Lock()
173-
l.closed = true
174-
err := l.writer.Close()
175-
for r := range l.readers {
176-
r.ProducerGone()
177-
delete(l.readers, r)
178-
}
179-
l.mu.Unlock()
180-
return err
164+
return l.writer.Close()
181165
}
182166

183167
// Name returns name of this logger.

daemon/logger/jsonfilelog/read.go

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,7 @@ const maxJSONDecodeRetry = 20000
1818
// ReadLogs implements the logger's LogReader interface for the logs
1919
// created by this driver.
2020
func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
21-
logWatcher := logger.NewLogWatcher()
22-
23-
go l.readLogs(logWatcher, config)
24-
return logWatcher
25-
}
26-
27-
func (l *JSONFileLogger) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) {
28-
defer close(watcher.Msg)
29-
30-
l.mu.Lock()
31-
l.readers[watcher] = struct{}{}
32-
l.mu.Unlock()
33-
34-
l.writer.ReadLogs(config, watcher)
35-
36-
l.mu.Lock()
37-
delete(l.readers, watcher)
38-
l.mu.Unlock()
21+
return l.writer.ReadLogs(config)
3922
}
4023

4124
func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {

daemon/logger/local/local.go

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"encoding/binary"
55
"io"
66
"strconv"
7-
"sync"
87
"time"
98

109
"github.com/docker/docker/api/types/backend"
@@ -56,10 +55,7 @@ func init() {
5655
}
5756

5857
type driver struct {
59-
mu sync.Mutex
60-
closed bool
6158
logfile *loggerutils.LogFile
62-
readers map[*logger.LogWatcher]struct{} // stores the active log followers
6359
}
6460

6561
// New creates a new local logger
@@ -145,7 +141,6 @@ func newDriver(logPath string, cfg *CreateConfig) (logger.Logger, error) {
145141
}
146142
return &driver{
147143
logfile: lf,
148-
readers: make(map[*logger.LogWatcher]struct{}),
149144
}, nil
150145
}
151146

@@ -154,22 +149,11 @@ func (d *driver) Name() string {
154149
}
155150

156151
func (d *driver) Log(msg *logger.Message) error {
157-
d.mu.Lock()
158-
err := d.logfile.WriteLogEntry(msg)
159-
d.mu.Unlock()
160-
return err
152+
return d.logfile.WriteLogEntry(msg)
161153
}
162154

163155
func (d *driver) Close() error {
164-
d.mu.Lock()
165-
d.closed = true
166-
err := d.logfile.Close()
167-
for r := range d.readers {
168-
r.ProducerGone()
169-
delete(d.readers, r)
170-
}
171-
d.mu.Unlock()
172-
return err
156+
return d.logfile.Close()
173157
}
174158

175159
func messageToProto(msg *logger.Message, proto *logdriver.LogEntry, partial *logdriver.PartialLogEntryMetadata) {

daemon/logger/local/read.go

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,7 @@ import (
1919
const maxMsgLen int = 1e6 // 1MB.
2020

2121
func (d *driver) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
22-
logWatcher := logger.NewLogWatcher()
23-
24-
go d.readLogs(logWatcher, config)
25-
return logWatcher
26-
}
27-
28-
func (d *driver) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) {
29-
defer close(watcher.Msg)
30-
31-
d.mu.Lock()
32-
d.readers[watcher] = struct{}{}
33-
d.mu.Unlock()
34-
35-
d.logfile.ReadLogs(config, watcher)
36-
37-
d.mu.Lock()
38-
delete(d.readers, watcher)
39-
d.mu.Unlock()
22+
return d.logfile.ReadLogs(config)
4023
}
4124

4225
func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) {

daemon/logger/loggerutils/logfile.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ type LogFile struct {
8787
lastTimestamp time.Time // timestamp of the last log
8888
filesRefCounter refCounter // keep reference-counted of decompressed files
8989
notifyReaders *pubsub.Publisher
90+
readers map[*logger.LogWatcher]struct{} // stores the active log followers
9091
marshal logger.MarshalFunc
9192
createDecoder MakeDecoderFn
9293
getTailReader GetTailReaderFunc
@@ -141,6 +142,7 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, mar
141142
compress: compress,
142143
filesRefCounter: refCounter{counter: make(map[string]int)},
143144
notifyReaders: pubsub.NewPublisher(0, 1),
145+
readers: make(map[*logger.LogWatcher]struct{}),
144146
marshal: marshaller,
145147
createDecoder: decodeFunc,
146148
perms: perms,
@@ -342,6 +344,10 @@ func (w *LogFile) Close() error {
342344
if w.closed {
343345
return nil
344346
}
347+
for r := range w.readers {
348+
r.ProducerGone()
349+
delete(w.readers, r)
350+
}
345351
if err := w.f.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
346352
return err
347353
}
@@ -353,8 +359,29 @@ func (w *LogFile) Close() error {
353359
//
354360
// Note: Using the follow option can become inconsistent in cases with very frequent rotations and max log files is 1.
355361
// TODO: Consider a different implementation which can effectively follow logs under frequent rotations.
356-
func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher) {
362+
func (w *LogFile) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
363+
watcher := logger.NewLogWatcher()
364+
w.mu.Lock()
365+
w.readers[watcher] = struct{}{}
366+
w.mu.Unlock()
367+
368+
// Lock before starting the reader goroutine to synchronize operations
369+
// for race-free unit testing. The writer is locked out until the reader
370+
// has opened the log file and set the read cursor to the current
371+
// position.
357372
w.mu.RLock()
373+
go w.readLogsLocked(config, watcher)
374+
return watcher
375+
}
376+
377+
func (w *LogFile) readLogsLocked(config logger.ReadConfig, watcher *logger.LogWatcher) {
378+
defer func() {
379+
close(watcher.Msg)
380+
w.mu.Lock()
381+
delete(w.readers, watcher)
382+
w.mu.Unlock()
383+
}()
384+
358385
currentFile, err := open(w.f.Name())
359386
if err != nil {
360387
w.mu.RUnlock()

daemon/logger/loggerutils/logfile_test.go

Lines changed: 28 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"time"
1616

1717
"github.com/docker/docker/daemon/logger"
18-
"github.com/docker/docker/pkg/pubsub"
1918
"github.com/docker/docker/pkg/tailfile"
2019
"gotest.tools/v3/assert"
2120
"gotest.tools/v3/poll"
@@ -247,60 +246,57 @@ func TestFollowLogsProducerGone(t *testing.T) {
247246
}
248247

249248
func TestCheckCapacityAndRotate(t *testing.T) {
250-
dir, err := os.MkdirTemp("", t.Name())
251-
assert.NilError(t, err)
252-
defer os.RemoveAll(dir)
253-
254-
f, err := os.CreateTemp(dir, "log")
255-
assert.NilError(t, err)
249+
dir := t.TempDir()
256250

257-
l := &LogFile{
258-
f: f,
259-
capacity: 5,
260-
maxFiles: 3,
261-
compress: true,
262-
notifyReaders: pubsub.NewPublisher(0, 1),
263-
perms: 0600,
264-
filesRefCounter: refCounter{counter: make(map[string]int)},
265-
getTailReader: func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
266-
return tailfile.NewTailReader(ctx, r, lines)
267-
},
268-
createDecoder: func(io.Reader) Decoder {
269-
return dummyDecoder{}
270-
},
271-
marshal: func(msg *logger.Message) ([]byte, error) {
272-
return msg.Line, nil
273-
},
251+
logPath := filepath.Join(dir, "log")
252+
getTailReader := func(ctx context.Context, r SizeReaderAt, lines int) (io.Reader, int, error) {
253+
return tailfile.NewTailReader(ctx, r, lines)
274254
}
255+
createDecoder := func(io.Reader) Decoder {
256+
return dummyDecoder{}
257+
}
258+
marshal := func(msg *logger.Message) ([]byte, error) {
259+
return msg.Line, nil
260+
}
261+
l, err := NewLogFile(
262+
logPath,
263+
5, // capacity
264+
3, // maxFiles
265+
true, // compress
266+
marshal,
267+
createDecoder,
268+
0600, // perms
269+
getTailReader,
270+
)
271+
assert.NilError(t, err)
275272
defer l.Close()
276273

277274
ls := dirStringer{dir}
278275

279276
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
280-
_, err = os.Stat(f.Name() + ".1")
277+
_, err = os.Stat(logPath + ".1")
281278
assert.Assert(t, os.IsNotExist(err), ls)
282279

283280
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
284-
poll.WaitOn(t, checkFileExists(f.Name()+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
281+
poll.WaitOn(t, checkFileExists(logPath+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
285282

286283
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
287-
poll.WaitOn(t, checkFileExists(f.Name()+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
288-
poll.WaitOn(t, checkFileExists(f.Name()+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
284+
poll.WaitOn(t, checkFileExists(logPath+".1.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
285+
poll.WaitOn(t, checkFileExists(logPath+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
289286

290287
t.Run("closed log file", func(t *testing.T) {
291288
// Now let's simulate a failed rotation where the file was able to be closed but something else happened elsewhere
292289
// down the line.
293290
// We want to make sure that we can recover in the case that `l.f` was closed while attempting a rotation.
294291
l.f.Close()
295292
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world!")}))
296-
assert.NilError(t, os.Remove(f.Name()+".2.gz"))
293+
assert.NilError(t, os.Remove(logPath+".2.gz"))
297294
})
298295

299296
t.Run("with log reader", func(t *testing.T) {
300297
// Make sure rotate works with an active reader
301-
lw := logger.NewLogWatcher()
298+
lw := l.ReadLogs(logger.ReadConfig{Follow: true, Tail: 1000})
302299
defer lw.ConsumerGone()
303-
go l.ReadLogs(logger.ReadConfig{Follow: true, Tail: 1000}, lw)
304300

305301
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 0!")}), ls)
306302
// make sure the log reader is primed
@@ -310,7 +306,7 @@ func TestCheckCapacityAndRotate(t *testing.T) {
310306
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 2!")}), ls)
311307
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 3!")}), ls)
312308
assert.NilError(t, l.WriteLogEntry(&logger.Message{Line: []byte("hello world 4!")}), ls)
313-
poll.WaitOn(t, checkFileExists(f.Name()+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
309+
poll.WaitOn(t, checkFileExists(logPath+".2.gz"), poll.WithDelay(time.Millisecond), poll.WithTimeout(30*time.Second))
314310
})
315311
}
316312

0 commit comments

Comments
 (0)