diff --git a/cmd/telegraf/telegraf.go b/cmd/telegraf/telegraf.go index 2546f65725571..ec2922c29ef82 100644 --- a/cmd/telegraf/telegraf.go +++ b/cmd/telegraf/telegraf.go @@ -402,6 +402,7 @@ func (t *Telegraf) loadConfiguration() (*config.Config, error) { c.OutputFilters = t.outputFilters c.InputFilters = t.inputFilters c.SecretStoreFilters = t.secretstoreFilters + c.TestMode = !t.once && (t.test || t.testWait != 0) if err := t.getConfigFiles(); err != nil { return c, err diff --git a/cmd/telegraf/telegraf_config_test.go b/cmd/telegraf/telegraf_config_test.go new file mode 100644 index 0000000000000..a5563d57e1cfd --- /dev/null +++ b/cmd/telegraf/telegraf_config_test.go @@ -0,0 +1,31 @@ +package main + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/internal" +) + +func TestLoadConfigurationTestModeSkipsDiskOutputBuffer(t *testing.T) { + savedVersion := internal.Version + internal.Version = "0.0.0" + defer func() { + internal.Version = savedVersion + }() + + agent := &Telegraf{ + GlobalFlags: GlobalFlags{ + config: []string{"testdata/test_mode_disk_buffer.conf"}, + }, + } + _, err := agent.loadConfiguration() + require.ErrorContains(t, err, "creating buffer failed") + + agent.test = true + cfg, err := agent.loadConfiguration() + require.NoError(t, err) + require.Len(t, cfg.Outputs, 1) + require.Equal(t, "discard", cfg.Outputs[0].Config.BufferStrategy) +} diff --git a/cmd/telegraf/testdata/buffer-not-directory b/cmd/telegraf/testdata/buffer-not-directory new file mode 100644 index 0000000000000..59768066c4326 --- /dev/null +++ b/cmd/telegraf/testdata/buffer-not-directory @@ -0,0 +1 @@ +not a directory diff --git a/cmd/telegraf/testdata/test_mode_disk_buffer.conf b/cmd/telegraf/testdata/test_mode_disk_buffer.conf new file mode 100644 index 0000000000000..891b4d51a5a5d --- /dev/null +++ b/cmd/telegraf/testdata/test_mode_disk_buffer.conf @@ -0,0 +1,9 @@ +[agent] + buffer_strategy = "disk" + buffer_directory = "testdata/buffer-not-directory" + +[[inputs.cpu]] + +[[outputs.influxdb]] + urls = ["http://localhost:8086"] + database = "telegraf" diff --git a/config/config.go b/config/config.go index c4cd1ea8fe1f2..02c9aa268aa59 100644 --- a/config/config.go +++ b/config/config.go @@ -88,6 +88,9 @@ type Config struct { InputFilters []string OutputFilters []string SecretStoreFilters []string + // TestMode keeps output parsing in place while avoiding resources only + // needed when outputs are actually used. + TestMode bool SecretStores map[string]telegraf.SecretStore secretStoreSource map[string][]string @@ -1784,7 +1787,12 @@ func (c *Config) buildOutput(name, source string, tbl *ast.Table) (*models.Outpu return nil, c.firstErr() } - if oc.BufferStrategy == "disk_write_through" { + if err := models.CheckBufferSettings(oc.BufferStrategy); err != nil { + return nil, err + } + if c.TestMode { + oc.BufferStrategy = "discard" + } else if oc.BufferStrategy == "disk_write_through" { log.Printf("W! Using disk-write-through buffer strategy for plugin outputs.%s, this is an experimental feature", name) } diff --git a/config/config_test.go b/config/config_test.go index fb9e4cd4fd37e..39eba4b8b380a 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -504,6 +504,22 @@ func TestConfig_InlineTables(t *testing.T) { require.Equal(t, []string{"org_id"}, c.Outputs[0].Config.Filter.TagInclude) } +func TestConfig_TestModeSkipsOutputBuffer(t *testing.T) { + c := config.NewConfig() + err := c.LoadConfig("testdata/test_mode_disk_buffer.toml") + require.ErrorContains(t, err, "creating buffer failed") + + c = config.NewConfig() + c.TestMode = true + require.NoError(t, c.LoadConfig("testdata/test_mode_disk_buffer.toml")) + require.Len(t, c.Outputs, 1) + require.Equal(t, "discard", c.Outputs[0].Config.BufferStrategy) + + output, ok := c.Outputs[0].Output.(*MockupOutputPluginSerializerNew) + require.True(t, ok) + require.NotNil(t, output.Serializer) +} + func TestConfig_SliceComment(t *testing.T) { c := config.NewConfig() require.NoError(t, c.LoadConfig("./testdata/slice_comment.toml")) diff --git a/config/testdata/buffer-not-directory b/config/testdata/buffer-not-directory new file mode 100644 index 0000000000000..59768066c4326 --- /dev/null +++ b/config/testdata/buffer-not-directory @@ -0,0 +1 @@ +not a directory diff --git a/config/testdata/test_mode_disk_buffer.toml b/config/testdata/test_mode_disk_buffer.toml new file mode 100644 index 0000000000000..f2c11a5f24918 --- /dev/null +++ b/config/testdata/test_mode_disk_buffer.toml @@ -0,0 +1,6 @@ +[agent] + buffer_strategy = "disk" + buffer_directory = "testdata/buffer-not-directory" + +[[outputs.serializer_test_new]] + data_format = "json" diff --git a/models/buffer.go b/models/buffer.go index 6ad1b2be380ee..473a154545db7 100644 --- a/models/buffer.go +++ b/models/buffer.go @@ -114,10 +114,22 @@ func NewBuffer(name, id, alias string, capacity int, strategy, path string, disk return NewMemoryBuffer(capacity, bs) case "disk_write_through": return NewDiskBuffer(id, path, bs, diskSync) + case "discard": + return newDiscardBuffer(bs), nil } return nil, fmt.Errorf("invalid buffer strategy %q", strategy) } +// CheckBufferSettings verifies that the buffer settings are valid without +// opening or allocating the buffer. +func CheckBufferSettings(strategy string) error { + switch strategy { + case "", "memory", "disk_write_through": + return nil + } + return fmt.Errorf("invalid buffer strategy %q", strategy) +} + func NewBufferStats(tags map[string]string, capacity int) BufferStats { bs := BufferStats{ MetricsAdded: selfstat.Register( diff --git a/models/buffer_discard.go b/models/buffer_discard.go new file mode 100644 index 0000000000000..24437a954cd8f --- /dev/null +++ b/models/buffer_discard.go @@ -0,0 +1,37 @@ +package models + +import "github.com/influxdata/telegraf" + +type discardBuffer struct { + BufferStats +} + +func newDiscardBuffer(stats BufferStats) *discardBuffer { + return &discardBuffer{BufferStats: stats} +} + +func (*discardBuffer) Len() int { + return 0 +} + +func (b *discardBuffer) Add(metrics ...telegraf.Metric) int { + for _, m := range metrics { + b.metricDropped(m) + } + return len(metrics) +} + +func (*discardBuffer) BeginTransaction(int) *Transaction { + return &Transaction{} +} + +func (*discardBuffer) EndTransaction(*Transaction) { +} + +func (b *discardBuffer) Stats() BufferStats { + return b.BufferStats +} + +func (*discardBuffer) Close() error { + return nil +} diff --git a/models/buffer_mem_test.go b/models/buffer_mem_test.go index bbf7858789ba5..baa2600a37451 100644 --- a/models/buffer_mem_test.go +++ b/models/buffer_mem_test.go @@ -31,6 +31,34 @@ func TestMemoryBufferAcceptCallsMetricAccept(t *testing.T) { require.Equal(t, 2, accept) } +func TestCheckBufferSettings(t *testing.T) { + for _, strategy := range []string{"", "memory", "disk_write_through"} { + require.NoError(t, CheckBufferSettings(strategy)) + } + require.ErrorContains(t, CheckBufferSettings("discard"), `invalid buffer strategy "discard"`) + require.ErrorContains(t, CheckBufferSettings("unknown"), `invalid buffer strategy "unknown"`) +} + +func TestDiscardBufferDropsMetrics(t *testing.T) { + buf, err := NewBuffer("test", "123", "", 5, "discard", "", true) + require.NoError(t, err) + buf.Stats().MetricsDropped.Set(0) + defer buf.Close() + + var rejected int + mm := &mockMetric{ + Metric: metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)), + RejectF: func() { + rejected++ + }, + } + require.Equal(t, 1, buf.Add(mm)) + require.Equal(t, 0, buf.Len()) + require.Equal(t, 1, rejected) + require.Equal(t, int64(1), buf.Stats().MetricsDropped.Get()) + require.Empty(t, buf.BeginTransaction(1).Batch) +} + func BenchmarkMemoryBufferAddMetrics(b *testing.B) { buf, err := NewBuffer("test", "123", "", 10000, "memory", "", true) require.NoError(b, err)