Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ func (t *Telegraf) loadConfiguration() (*config.Config, error) {
c.OutputFilters = t.outputFilters
c.InputFilters = t.inputFilters
c.SecretStoreFilters = t.secretstoreFilters
c.TestMode = !t.once && (t.test || t.testWait != 0)

if err := t.getConfigFiles(); err != nil {
return c, err
Expand Down
31 changes: 31 additions & 0 deletions cmd/telegraf/telegraf_config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package main

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf/internal"
)

func TestLoadConfigurationTestModeSkipsDiskOutputBuffer(t *testing.T) {
savedVersion := internal.Version
internal.Version = "0.0.0"
defer func() {
internal.Version = savedVersion
}()

agent := &Telegraf{
GlobalFlags: GlobalFlags{
config: []string{"testdata/test_mode_disk_buffer.conf"},
},
}
_, err := agent.loadConfiguration()
require.ErrorContains(t, err, "creating buffer failed")

agent.test = true
cfg, err := agent.loadConfiguration()
require.NoError(t, err)
require.Len(t, cfg.Outputs, 1)
require.Equal(t, "discard", cfg.Outputs[0].Config.BufferStrategy)
}
1 change: 1 addition & 0 deletions cmd/telegraf/testdata/buffer-not-directory
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
not a directory
9 changes: 9 additions & 0 deletions cmd/telegraf/testdata/test_mode_disk_buffer.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[agent]
buffer_strategy = "disk"
buffer_directory = "testdata/buffer-not-directory"

[[inputs.cpu]]

[[outputs.influxdb]]
urls = ["http://localhost:8086"]
database = "telegraf"
10 changes: 9 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ type Config struct {
InputFilters []string
OutputFilters []string
SecretStoreFilters []string
// TestMode keeps output parsing in place while avoiding resources only
// needed when outputs are actually used.
TestMode bool

SecretStores map[string]telegraf.SecretStore
secretStoreSource map[string][]string
Expand Down Expand Up @@ -1784,7 +1787,12 @@ func (c *Config) buildOutput(name, source string, tbl *ast.Table) (*models.Outpu
return nil, c.firstErr()
}

if oc.BufferStrategy == "disk_write_through" {
if err := models.CheckBufferSettings(oc.BufferStrategy); err != nil {
return nil, err
}
if c.TestMode {
oc.BufferStrategy = "discard"
} else if oc.BufferStrategy == "disk_write_through" {
log.Printf("W! Using disk-write-through buffer strategy for plugin outputs.%s, this is an experimental feature", name)
}

Expand Down
16 changes: 16 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,22 @@ func TestConfig_InlineTables(t *testing.T) {
require.Equal(t, []string{"org_id"}, c.Outputs[0].Config.Filter.TagInclude)
}

func TestConfig_TestModeSkipsOutputBuffer(t *testing.T) {
c := config.NewConfig()
err := c.LoadConfig("testdata/test_mode_disk_buffer.toml")
require.ErrorContains(t, err, "creating buffer failed")

c = config.NewConfig()
c.TestMode = true
require.NoError(t, c.LoadConfig("testdata/test_mode_disk_buffer.toml"))
require.Len(t, c.Outputs, 1)
require.Equal(t, "discard", c.Outputs[0].Config.BufferStrategy)

output, ok := c.Outputs[0].Output.(*MockupOutputPluginSerializerNew)
require.True(t, ok)
require.NotNil(t, output.Serializer)
}

func TestConfig_SliceComment(t *testing.T) {
c := config.NewConfig()
require.NoError(t, c.LoadConfig("./testdata/slice_comment.toml"))
Expand Down
1 change: 1 addition & 0 deletions config/testdata/buffer-not-directory
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
not a directory
6 changes: 6 additions & 0 deletions config/testdata/test_mode_disk_buffer.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[agent]
buffer_strategy = "disk"
buffer_directory = "testdata/buffer-not-directory"

[[outputs.serializer_test_new]]
data_format = "json"
12 changes: 12 additions & 0 deletions models/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,22 @@ func NewBuffer(name, id, alias string, capacity int, strategy, path string, disk
return NewMemoryBuffer(capacity, bs)
case "disk_write_through":
return NewDiskBuffer(id, path, bs, diskSync)
case "discard":
return newDiscardBuffer(bs), nil
}
return nil, fmt.Errorf("invalid buffer strategy %q", strategy)
}

// CheckBufferSettings verifies that the buffer settings are valid without
// opening or allocating the buffer.
func CheckBufferSettings(strategy string) error {
switch strategy {
case "", "memory", "disk_write_through":
return nil
}
return fmt.Errorf("invalid buffer strategy %q", strategy)
}

func NewBufferStats(tags map[string]string, capacity int) BufferStats {
bs := BufferStats{
MetricsAdded: selfstat.Register(
Expand Down
37 changes: 37 additions & 0 deletions models/buffer_discard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package models

import "github.com/influxdata/telegraf"

type discardBuffer struct {
BufferStats
}

func newDiscardBuffer(stats BufferStats) *discardBuffer {
return &discardBuffer{BufferStats: stats}
}

func (*discardBuffer) Len() int {
return 0
}

func (b *discardBuffer) Add(metrics ...telegraf.Metric) int {
for _, m := range metrics {
b.metricDropped(m)
}
return len(metrics)
}

func (*discardBuffer) BeginTransaction(int) *Transaction {
return &Transaction{}
}

func (*discardBuffer) EndTransaction(*Transaction) {
}

func (b *discardBuffer) Stats() BufferStats {
return b.BufferStats
}

func (*discardBuffer) Close() error {
return nil
}
28 changes: 28 additions & 0 deletions models/buffer_mem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,34 @@ func TestMemoryBufferAcceptCallsMetricAccept(t *testing.T) {
require.Equal(t, 2, accept)
}

func TestCheckBufferSettings(t *testing.T) {
for _, strategy := range []string{"", "memory", "disk_write_through"} {
require.NoError(t, CheckBufferSettings(strategy))
}
require.ErrorContains(t, CheckBufferSettings("discard"), `invalid buffer strategy "discard"`)
require.ErrorContains(t, CheckBufferSettings("unknown"), `invalid buffer strategy "unknown"`)
}

func TestDiscardBufferDropsMetrics(t *testing.T) {
buf, err := NewBuffer("test", "123", "", 5, "discard", "", true)
require.NoError(t, err)
buf.Stats().MetricsDropped.Set(0)
defer buf.Close()

var rejected int
mm := &mockMetric{
Metric: metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)),
RejectF: func() {
rejected++
},
}
require.Equal(t, 1, buf.Add(mm))
require.Equal(t, 0, buf.Len())
require.Equal(t, 1, rejected)
require.Equal(t, int64(1), buf.Stats().MetricsDropped.Get())
require.Empty(t, buf.BeginTransaction(1).Batch)
}

func BenchmarkMemoryBufferAddMetrics(b *testing.B) {
buf, err := NewBuffer("test", "123", "", 10000, "memory", "", true)
require.NoError(b, err)
Expand Down
Loading