Skip to content

Commit e257c14

Browse files
authored
fix(agent): Use a unique WAL file for plugin instances of the same type (#15966)
1 parent 39a5ca2 commit e257c14

7 files changed

+17
-9
lines changed

docs/CONFIGURATION.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ The agent table configures Telegraf and the defaults used across all plugins.
362362

363363
- **buffer_directory**:
364364
The directory to use when in `disk` buffer mode. Each output plugin will make
365-
another subdirectory in this directory with the output plugin's name.
365+
another subdirectory in this directory with the output plugin's ID.
366366

367367
## Plugins
368368

models/buffer.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ type BufferStats struct {
4949
}
5050

5151
// NewBuffer returns a new empty Buffer with the given capacity.
52-
func NewBuffer(name string, alias string, capacity int, strategy string, path string) (Buffer, error) {
52+
func NewBuffer(name, id, alias string, capacity int, strategy, path string) (Buffer, error) {
5353
registerGob()
5454

5555
bs := NewBufferStats(name, alias, capacity)
@@ -58,7 +58,7 @@ func NewBuffer(name string, alias string, capacity int, strategy string, path st
5858
case "", "memory":
5959
return NewMemoryBuffer(capacity, bs)
6060
case "disk":
61-
return NewDiskBuffer(name, path, bs)
61+
return NewDiskBuffer(name, id, path, bs)
6262
}
6363
return nil, fmt.Errorf("invalid buffer strategy %q", strategy)
6464
}

models/buffer_disk.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,19 @@ type DiskBuffer struct {
3333
isEmpty bool
3434
}
3535

36-
func NewDiskBuffer(name string, path string, stats BufferStats) (*DiskBuffer, error) {
37-
filePath := filepath.Join(path, name)
36+
func NewDiskBuffer(name, id, path string, stats BufferStats) (*DiskBuffer, error) {
37+
filePath := filepath.Join(path, id)
3838
walFile, err := wal.Open(filePath, nil)
3939
if err != nil {
4040
return nil, fmt.Errorf("failed to open wal file: %w", err)
4141
}
42+
//nolint:errcheck // cannot error here
43+
if index, _ := walFile.FirstIndex(); index == 0 {
44+
// simple way to test if the walfile is freshly initialized, meaning no existing file was found
45+
log.Printf("I! WAL file not found for plugin outputs.%s (%s), "+
46+
"this can safely be ignored if you added this plugin instance for the first time", name, id)
47+
}
48+
4249
buf := &DiskBuffer{
4350
BufferStats: stats,
4451
file: walFile,

models/buffer_disk_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func newTestDiskBuffer(t testing.TB) Buffer {
2222

2323
func newTestDiskBufferWithPath(t testing.TB, name string, path string) Buffer {
2424
t.Helper()
25-
buf, err := NewBuffer(name, "", 0, "disk", path)
25+
buf, err := NewBuffer(name, "123", "", 0, "disk", path)
2626
require.NoError(t, err)
2727
buf.Stats().MetricsAdded.Set(0)
2828
buf.Stats().MetricsWritten.Set(0)
@@ -45,6 +45,7 @@ func TestBuffer_RetainsTrackingInformation(t *testing.T) {
4545
func TestBuffer_TrackingDroppedFromOldWal(t *testing.T) {
4646
path, err := os.MkdirTemp("", "*-buffer-test")
4747
require.NoError(t, err)
48+
path = filepath.Join(path, "123")
4849
walfile, err := wal.Open(path, nil)
4950
require.NoError(t, err)
5051

models/buffer_mem_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88

99
func newTestMemoryBuffer(t testing.TB, capacity int) Buffer {
1010
t.Helper()
11-
buf, err := NewBuffer("test", "", capacity, "memory", "")
11+
buf, err := NewBuffer("test", "123", "", capacity, "memory", "")
1212
require.NoError(t, err)
1313
buf.Stats().MetricsAdded.Set(0)
1414
buf.Stats().MetricsWritten.Set(0)

models/buffer_suite_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func MetricTime(sec int64) telegraf.Metric {
8484

8585
func (s *BufferSuiteTest) newTestBuffer(capacity int) Buffer {
8686
s.T().Helper()
87-
buf, err := NewBuffer("test", "", capacity, s.bufferType, s.bufferPath)
87+
buf, err := NewBuffer("test", "123", "", capacity, s.bufferType, s.bufferPath)
8888
s.Require().NoError(err)
8989
buf.Stats().MetricsAdded.Set(0)
9090
buf.Stats().MetricsWritten.Set(0)

models/running_output.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func NewRunningOutput(
104104
batchSize = DefaultMetricBatchSize
105105
}
106106

107-
b, err := NewBuffer(config.Name, config.Alias, bufferLimit, config.BufferStrategy, config.BufferDirectory)
107+
b, err := NewBuffer(config.Name, config.ID, config.Alias, bufferLimit, config.BufferStrategy, config.BufferDirectory)
108108
if err != nil {
109109
panic(err)
110110
}

0 commit comments

Comments
 (0)