Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1105,6 +1105,7 @@ To enable Prometheus integration set:
2. `PROMETHEUS_ADDR`: The port to listen on for Prometheus metrics. Defaults to `:9090`
3. `PROMETHEUS_PATH`: The path to listen on for Prometheus metrics. Defaults to `/metrics`
4. `PROMETHEUS_MAPPER_YAML`: The path to the YAML file that defines the mapping from statsd to prometheus metrics.
5. `PROMETHEUS_RESPONSE_TIME_AS_MILLISECONDS`: `true` to keep the legacy millisecond behavior for `ratelimit_server.*.response_time` in the built-in mapper. Ignored when `PROMETHEUS_MAPPER_YAML` is set.

Define the mapping from statsd to prometheus metrics in a YAML file.
Find more information about the mapping in the [Metric Mapping and Configuration](https://github.com/prometheus/statsd_exporter?tab=readme-ov-file#metric-mapping-and-configuration).
Expand Down Expand Up @@ -1181,6 +1182,7 @@ mappings: # Requires statsd exporter >= v0.6.0 since it uses the "drop" action.
- match: "ratelimit_server.*.response_time"
name: "ratelimit_service_response_time_seconds"
timer_type: histogram
scale: 0.001
labels:
grpc_method: "$1"

Expand Down
3 changes: 2 additions & 1 deletion src/service_cmd/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ func NewRunner(s settings.Settings) Runner {
}
logger.Info("Stats initialized for Prometheus")
store = gostats.NewStore(prom.NewPrometheusSink(prom.WithAddr(s.PrometheusAddr),
prom.WithPath(s.PrometheusPath), prom.WithMapperYamlPath(s.PrometheusMapperYaml)), false)
prom.WithPath(s.PrometheusPath), prom.WithMapperYamlPath(s.PrometheusMapperYaml),
prom.WithResponseTimeAsMilliseconds(s.PrometheusResponseTimeAsMilliseconds)), false)
default:
logger.Info("Stats initialized for stdout")
store = gostats.NewStore(gostats.NewLoggingSink(), false)
Expand Down
25 changes: 13 additions & 12 deletions src/settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,19 @@ type Settings struct {
XdsClientGrpcOptionsMaxMsgSizeInBytes int `envconfig:"XDS_CLIENT_MAX_MSG_SIZE_IN_BYTES" default:""`

// Stats-related settings
UseDogStatsd bool `envconfig:"USE_DOG_STATSD" default:"false"`
UseDogStatsdMogrifiers []string `envconfig:"USE_DOG_STATSD_MOGRIFIERS" default:""`
UseStatsd bool `envconfig:"USE_STATSD" default:"true"`
StatsdHost string `envconfig:"STATSD_HOST" default:"localhost"`
StatsdPort int `envconfig:"STATSD_PORT" default:"8125"`
ExtraTags map[string]string `envconfig:"EXTRA_TAGS" default:""`
StatsFlushInterval time.Duration `envconfig:"STATS_FLUSH_INTERVAL" default:"10s"`
DisableStats bool `envconfig:"DISABLE_STATS" default:"false"`
UsePrometheus bool `envconfig:"USE_PROMETHEUS" default:"false"`
PrometheusAddr string `envconfig:"PROMETHEUS_ADDR" default:":9090"`
PrometheusPath string `envconfig:"PROMETHEUS_PATH" default:"/metrics"`
PrometheusMapperYaml string `envconfig:"PROMETHEUS_MAPPER_YAML" default:""`
UseDogStatsd bool `envconfig:"USE_DOG_STATSD" default:"false"`
UseDogStatsdMogrifiers []string `envconfig:"USE_DOG_STATSD_MOGRIFIERS" default:""`
UseStatsd bool `envconfig:"USE_STATSD" default:"true"`
StatsdHost string `envconfig:"STATSD_HOST" default:"localhost"`
StatsdPort int `envconfig:"STATSD_PORT" default:"8125"`
ExtraTags map[string]string `envconfig:"EXTRA_TAGS" default:""`
StatsFlushInterval time.Duration `envconfig:"STATS_FLUSH_INTERVAL" default:"10s"`
DisableStats bool `envconfig:"DISABLE_STATS" default:"false"`
UsePrometheus bool `envconfig:"USE_PROMETHEUS" default:"false"`
PrometheusAddr string `envconfig:"PROMETHEUS_ADDR" default:":9090"`
PrometheusPath string `envconfig:"PROMETHEUS_PATH" default:"/metrics"`
PrometheusMapperYaml string `envconfig:"PROMETHEUS_MAPPER_YAML" default:""`
PrometheusResponseTimeAsMilliseconds bool `envconfig:"PROMETHEUS_RESPONSE_TIME_AS_MILLISECONDS" default:"false"`

// Settings for rate limit configuration
RuntimePath string `envconfig:"RUNTIME_ROOT" default:"/srv/runtime_data/current"`
Expand Down
19 changes: 19 additions & 0 deletions src/settings/settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,31 @@ import (
"github.com/stretchr/testify/assert"
)

const prometheusResponseTimeAsMillisecondsEnv = "PROMETHEUS_RESPONSE_TIME_AS_MILLISECONDS"

func TestSettingsTlsConfigUnmodified(t *testing.T) {
settings := NewSettings()
assert.NotNil(t, settings.RedisTlsConfig)
assert.Nil(t, settings.RedisTlsConfig.RootCAs)
}

func TestPrometheusResponseTimeAsMillisecondsDefault(t *testing.T) {
os.Unsetenv(prometheusResponseTimeAsMillisecondsEnv)

settings := NewSettings()

assert.False(t, settings.PrometheusResponseTimeAsMilliseconds)
}

func TestPrometheusResponseTimeAsMillisecondsEnabled(t *testing.T) {
os.Setenv(prometheusResponseTimeAsMillisecondsEnv, "true")
defer os.Unsetenv(prometheusResponseTimeAsMillisecondsEnv)

settings := NewSettings()

assert.True(t, settings.PrometheusResponseTimeAsMilliseconds)
}

// Tests for RedisPoolOnEmptyBehavior
func TestRedisPoolOnEmptyBehavior_Default(t *testing.T) {
os.Unsetenv("REDIS_POOL_ON_EMPTY_BEHAVIOR")
Expand Down
1 change: 1 addition & 0 deletions src/stats/prom/default_mapper.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ mappings:
- match: "ratelimit_server.*.response_time"
name: "ratelimit_service_response_time_seconds"
timer_type: histogram
scale: 0.001
labels:
grpc_method: "$1"

Expand Down
24 changes: 20 additions & 4 deletions src/stats/prom/prometheus_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package prom
import (
_ "embed"
"net/http"
"strings"

"github.com/go-kit/log"
gostats "github.com/lyft/gostats"
Expand Down Expand Up @@ -64,9 +65,10 @@ var (

type prometheusSink struct {
config struct {
addr string
path string
mapperYamlPath string
addr string
path string
mapperYamlPath string
responseTimeAsMilliseconds bool
}
mapper *mapper.MetricMapper
events chan event.Events
Expand All @@ -93,6 +95,20 @@ func WithMapperYamlPath(mapperYamlPath string) prometheusSinkOption {
}
}

func WithResponseTimeAsMilliseconds(responseTimeAsMilliseconds bool) prometheusSinkOption {
return func(sink *prometheusSink) {
sink.config.responseTimeAsMilliseconds = responseTimeAsMilliseconds
}
}

func (s *prometheusSink) mapperConfig() string {
if s.config.responseTimeAsMilliseconds {
return strings.Replace(defaultMapper, " scale: 0.001\n", "", 1)
}

return defaultMapper
}

// NewPrometheusSink returns a Sink that flushes stats to os.StdErr.
func NewPrometheusSink(opts ...prometheusSinkOption) gostats.Sink {
promRegistry := prometheus.DefaultRegisterer
Expand All @@ -119,7 +135,7 @@ func NewPrometheusSink(opts ...prometheusSinkOption) gostats.Sink {
if sink.config.mapperYamlPath != "" {
_ = sink.mapper.InitFromFile(sink.config.mapperYamlPath)
} else {
_ = sink.mapper.InitFromYAMLString(defaultMapper)
_ = sink.mapper.InitFromYAMLString(sink.mapperConfig())
}

sink.exp = exporter.NewExporter(promRegistry,
Expand Down
64 changes: 64 additions & 0 deletions src/stats/prom/prometheus_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,67 @@ func TestFlushTimer(t *testing.T) {
*m.Metric[0].Histogram.SampleSum == 1.0
}, time.Second, time.Millisecond)
}

func TestFlushResponseTimeConvertsMillisecondsToSeconds(t *testing.T) {
s.FlushTimer("ratelimit_server.ShouldRateLimit.response_time", 1000)
assert.Eventually(t, func() bool {
metricFamilies, err := prometheus.DefaultGatherer.Gather()
if err != nil {
return false
}

metrics := make(map[string]*dto.MetricFamily)
for _, metricFamily := range metricFamilies {
metrics[*metricFamily.Name] = metricFamily
}

m, ok := metrics["ratelimit_service_response_time_seconds"]
if !ok || len(m.Metric) != 1 {
return false
}

return *m.Metric[0].Histogram.SampleCount == uint64(1) &&
reflect.DeepEqual(toMap(m.Metric[0].Label), map[string]string{
"grpc_method": "ShouldRateLimit",
}) &&
*m.Metric[0].Histogram.SampleSum == 1.0
}, time.Second, time.Millisecond)
}

func TestFlushResponseTimeCanUseLegacyMilliseconds(t *testing.T) {
oldRegisterer := prometheus.DefaultRegisterer
oldGatherer := prometheus.DefaultGatherer
reg := prometheus.NewRegistry()
prometheus.DefaultRegisterer = reg
prometheus.DefaultGatherer = reg
defer func() {
prometheus.DefaultRegisterer = oldRegisterer
prometheus.DefaultGatherer = oldGatherer
}()

legacySink := NewPrometheusSink(WithAddr(":0"), WithPath("/metrics-legacy"), WithResponseTimeAsMilliseconds(true))
legacySink.FlushTimer("ratelimit_server.ShouldRateLimit.response_time", 1000)

assert.Eventually(t, func() bool {
metricFamilies, err := reg.Gather()
if err != nil {
return false
}

metrics := make(map[string]*dto.MetricFamily)
for _, metricFamily := range metricFamilies {
metrics[*metricFamily.Name] = metricFamily
}

m, ok := metrics["ratelimit_service_response_time_seconds"]
if !ok || len(m.Metric) != 1 {
return false
}

return *m.Metric[0].Histogram.SampleCount == uint64(1) &&
reflect.DeepEqual(toMap(m.Metric[0].Label), map[string]string{
"grpc_method": "ShouldRateLimit",
}) &&
*m.Metric[0].Histogram.SampleSum == 1000.0
}, time.Second, time.Millisecond)
}