diff --git a/api/pkg/filtermanager/api/api.go b/api/pkg/filtermanager/api/api.go index 8f310bf2..40c9d16d 100644 --- a/api/pkg/filtermanager/api/api.go +++ b/api/pkg/filtermanager/api/api.go @@ -229,6 +229,10 @@ type StreamFilterCallbacks interface { // PluginState returns the PluginState associated to this request. PluginState() PluginState + // Metrics API for plugins + GetCounterMetrics(pluginName, metricsName string) api.CounterMetric + GetGaugeMetrics(pluginName, metricsName string) api.GaugeMetric + // WithLogArg injectes `key: value` as the suffix of application log created by this // callback's Log* methods. The injected log arguments are only valid in the current request. // This method can be used to inject IDs or other context information into the logs. diff --git a/api/pkg/filtermanager/api_impl.go b/api/pkg/filtermanager/api_impl.go index 24078e62..44ea830d 100644 --- a/api/pkg/filtermanager/api_impl.go +++ b/api/pkg/filtermanager/api_impl.go @@ -30,6 +30,7 @@ import ( "mosn.io/htnn/api/internal/cookie" "mosn.io/htnn/api/internal/pluginstate" "mosn.io/htnn/api/pkg/filtermanager/api" + "mosn.io/htnn/api/pkg/plugins" ) type filterManagerRequestHeaderMap struct { @@ -146,6 +147,7 @@ type filterManagerCallbackHandler struct { namespace string consumer api.Consumer pluginState api.PluginState + metrics map[string]plugins.MetricsWriter streamInfo *filterManagerStreamInfo @@ -164,6 +166,7 @@ func (cb *filterManagerCallbackHandler) Reset() { cb.streamInfo = nil cb.logArgNames = "" cb.logArgs = nil + cb.metrics = nil cb.cacheLock.Unlock() } @@ -207,6 +210,40 @@ func (cb *filterManagerCallbackHandler) PluginState() api.PluginState { return cb.pluginState } +func (cb *filterManagerCallbackHandler) GetCounterMetrics(pluginName, metricName string) capi.CounterMetric { + if cb.metrics == nil { + api.LogErrorf("metrics not exist or not initialized for plugin %s", pluginName) + return nil + } + writer, ok := cb.metrics[pluginName] + if !ok { + api.LogErrorf("metrics writer for plugin %s not found", pluginName) + return nil + } + if writer.Counters == nil || writer.Counters[metricName] == nil { + api.LogErrorf("counter metric %s not found in plugin %s", metricName, pluginName) + return nil + } + return writer.Counters[metricName] +} + +func (cb *filterManagerCallbackHandler) GetGaugeMetrics(pluginName, metricName string) capi.GaugeMetric { + if cb.metrics == nil { + api.LogErrorf("metrics not exist or not initialized for plugin %s", pluginName) + return nil + } + writer, ok := cb.metrics[pluginName] + if !ok { + api.LogErrorf("metrics writer for plugin %s not found", pluginName) + return nil + } + if writer.Gaugers == nil || writer.Gaugers[metricName] == nil { + api.LogErrorf("gauge metric %s not found in plugin %s", metricName, pluginName) + return nil + } + return writer.Gaugers[metricName] +} + func (cb *filterManagerCallbackHandler) WithLogArg(key string, value any) api.StreamFilterCallbacks { // As the log is embedded into the Envoy's log, it's not so necessary to use structural logging // here. So far the value is just an ID string, introduce complex processions like quoting is diff --git a/api/pkg/filtermanager/config.go b/api/pkg/filtermanager/config.go index 6cd411fe..5baed145 100644 --- a/api/pkg/filtermanager/config.go +++ b/api/pkg/filtermanager/config.go @@ -20,6 +20,7 @@ import ( "fmt" "reflect" "sort" + "strings" "sync" xds "github.com/cncf/xds/go/xds/type/v3" @@ -28,6 +29,7 @@ import ( "mosn.io/htnn/api/pkg/filtermanager/api" "mosn.io/htnn/api/pkg/filtermanager/model" + "mosn.io/htnn/api/pkg/plugins" pkgPlugins "mosn.io/htnn/api/pkg/plugins" ) @@ -57,6 +59,8 @@ type filterManagerConfig struct { parsed []*model.ParsedFilterConfig pool *sync.Pool + metricsWriters map[string]plugins.MetricsWriter + namespace string enableDebugMode bool @@ -92,6 +96,9 @@ func (conf *filterManagerConfig) Merge(another *filterManagerConfig) *filterMana ns = another.namespace } + // Pass LDS metrics writers to the merged config for golang filter to use at route level + conf.metricsWriters = another.metricsWriters + // It's tough to do the data plane merge right. We don't use shallow copy, which may share // data structure accidentally. We don't use deep copy all the fields, which may copy unexpected computed data. // Let's copy fields manually. @@ -179,9 +186,25 @@ func (conf *filterManagerConfig) InitOnce() { func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigCallbackHandler) (interface{}, error) { configStruct := &xds.TypedStruct{} + var metricsWriters = map[string]plugins.MetricsWriter{} + + if callbacks != nil { + // If callbacks is not nil, it means this filter is configured in the LDS level. + // We need to initialize the metrics for all golang plugins here. + registers := plugins.GetMetricsDefinitions() + registeredPlugins := []string{} + for pluginName, register := range registers { + api.LogInfof("registering metrics for golang plugin %s", pluginName) + metricsWriters[pluginName] = register(callbacks) + registeredPlugins = append(registeredPlugins, pluginName) + } + capi.LogInfof("metrics registered for plugins: [%s]", strings.Join(registeredPlugins, ", ")) + } + // No configuration if any.GetTypeUrl() == "" { conf := initFilterManagerConfig("") + conf.metricsWriters = metricsWriters return conf, nil } @@ -216,6 +239,7 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC for _, proto := range plugins { name := proto.Name + if plugin := pkgPlugins.LoadHTTPFilterFactoryAndParser(name); plugin != nil { config, err := plugin.ConfigParser.Parse(proto.Config) if err != nil { @@ -264,6 +288,7 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC if needInit { conf.initOnce = &sync.Once{} } + conf.metricsWriters = metricsWriters return conf, nil } @@ -279,6 +304,7 @@ func (p *FilterManagerConfigParser) Merge(parent interface{}, child interface{}) } if httpFilterCfg == nil || len(httpFilterCfg.parsed) == 0 { + routeCfg.metricsWriters = httpFilterCfg.metricsWriters return routeCfg } diff --git a/api/pkg/filtermanager/filtermanager.go b/api/pkg/filtermanager/filtermanager.go index 30c4271b..69c19540 100644 --- a/api/pkg/filtermanager/filtermanager.go +++ b/api/pkg/filtermanager/filtermanager.go @@ -153,6 +153,8 @@ func FilterManagerFactory(c interface{}, cb capi.FilterCallbackHandler) (streamF fm.callbacks.FilterCallbackHandler = cb + fm.callbacks.metrics = conf.metricsWriters + canSkipMethods := fm.canSkipMethods canSyncRunMethods := fm.canSyncRunMethods if canSkipMethods == nil { diff --git a/api/pkg/plugins/plugins.go b/api/pkg/plugins/plugins.go index 5ab2742a..538dc898 100644 --- a/api/pkg/plugins/plugins.go +++ b/api/pkg/plugins/plugins.go @@ -20,6 +20,8 @@ import ( "errors" "runtime/debug" + capi "github.com/envoyproxy/envoy/contrib/golang/common/go/api" + "mosn.io/htnn/api/internal/proto" "mosn.io/htnn/api/pkg/filtermanager/api" "mosn.io/htnn/api/pkg/log" @@ -31,6 +33,7 @@ var ( pluginTypes = map[string]Plugin{} plugins = map[string]Plugin{} httpFilterFactoryAndParser = map[string]*FilterFactoryAndParser{} + metricsDefinitions = map[string]MetricsRegister{} ) // Here we introduce extra struct to avoid cyclic import between pkg/filtermanager and pkg/plugins @@ -234,3 +237,18 @@ func ComparePluginOrderInt(a, b string) int { } return cmp.Compare(a, b) } + +type MetricsWriter struct { + Counters map[string]capi.CounterMetric + Gaugers map[string]capi.GaugeMetric +} + +type MetricsRegister func(capi.ConfigCallbacks) MetricsWriter + +func RegisterMetricsDefinitions(pluginName string, definition MetricsRegister) { + metricsDefinitions[pluginName] = definition +} + +func GetMetricsDefinitions() map[string]MetricsRegister { + return metricsDefinitions +} diff --git a/api/plugins/tests/integration/dataplane/data_plane.go b/api/plugins/tests/integration/dataplane/data_plane.go index 4e29b0e0..ef178477 100644 --- a/api/plugins/tests/integration/dataplane/data_plane.go +++ b/api/plugins/tests/integration/dataplane/data_plane.go @@ -543,6 +543,27 @@ func (dp *DataPlane) do(method string, path string, header http.Header, body io. return resp, err } +func (dp *DataPlane) GetAdmin(path string) (*http.Response, error) { + req, err := http.NewRequest("GET", "http://localhost:"+dp.adminAPIPort+path, nil) + if err != nil { + return nil, err + } + tr := &http.Transport{ + DialContext: func(ctx context.Context, proto, addr string) (conn net.Conn, err error) { + return net.DialTimeout("tcp", ":"+dp.adminAPIPort, 1*time.Second) + }, + } + + client := &http.Client{Transport: tr, + Timeout: 10 * time.Second, + CheckRedirect: func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + }, + } + resp, err := client.Do(req) + return resp, err +} + func (dp *DataPlane) doWithTrailer(method string, path string, header http.Header, body io.Reader, trailer http.Header) (*http.Response, error) { req, err := http.NewRequest(method, "http://localhost:"+dp.dataPlanePort+path, body) if err != nil { diff --git a/api/plugins/tests/pkg/envoy/capi.go b/api/plugins/tests/pkg/envoy/capi.go index e26d7328..ed10d146 100644 --- a/api/plugins/tests/pkg/envoy/capi.go +++ b/api/plugins/tests/pkg/envoy/capi.go @@ -583,6 +583,13 @@ func (i *filterCallbackHandler) PluginState() api.PluginState { return i.pluginState } +func (i *filterCallbackHandler) GetCounterMetrics(pluginName, metricsName string) capi.CounterMetric { + return nil +} +func (i *filterCallbackHandler) GetGaugeMetrics(pluginName, metricsName string) capi.GaugeMetric { + return nil +} + func (i *filterCallbackHandler) WithLogArg(key string, value any) api.StreamFilterCallbacks { return i } diff --git a/api/tests/integration/filtermanager_latest_test.go b/api/tests/integration/filtermanager_latest_test.go index 308433e5..033d2dd3 100644 --- a/api/tests/integration/filtermanager_latest_test.go +++ b/api/tests/integration/filtermanager_latest_test.go @@ -19,9 +19,11 @@ package integration import ( "bytes" _ "embed" + "io" "net/http" "os" "path/filepath" + "strings" "testing" "github.com/stretchr/testify/assert" @@ -339,3 +341,55 @@ func TestFilterManagerLogWithTrailers(t *testing.T) { require.Nil(t, err) assert.Equal(t, 200, resp.StatusCode) } + +func TestMetricsEnabledPlugin(t *testing.T) { + dp, err := dataplane.StartDataPlane(t, &dataplane.Option{ + LogLevel: "debug", + }) + if err != nil { + t.Fatalf("failed to start data plane: %v", err) + return + } + defer dp.Stop() + + lp := &filtermanager.FilterManagerConfig{ + Plugins: []*model.FilterConfig{ + { + Name: "metrics", + Config: &Config{}, + }, + }, + } + + controlPlane.UseGoPluginConfig(t, lp, dp) + hdr := http.Header{} + resp, err := dp.Get("/", hdr) + require.Nil(t, err) + body, err := io.ReadAll(resp.Body) + require.Nil(t, err) + assert.Equal(t, 200, resp.StatusCode, "response: %s", string(body)) + resp.Body.Close() + + resp, err = dp.GetAdmin("/stats") + require.Nil(t, err) + body, err = io.ReadAll(resp.Body) + require.Nil(t, err) + lines := strings.Split(string(body), "\n") + + var found int + + for _, l := range lines { + if !strings.Contains(l, "metrics-test") { + continue + } + if strings.Contains(l, "usage.counter") { + found++ + assert.Equal(t, "metrics-test.usage.counter: 1", l) + } + if strings.Contains(l, "usage.gauge") { + found++ + assert.Contains(t, "metrics-test.usage.gauge: 2", l) + } + } + assert.Equal(t, 2, found, "expect to have metrics usage.counter and usage.gauge") +} diff --git a/api/tests/integration/test_plugins.go b/api/tests/integration/test_plugins.go index dac15c4c..9ac0b005 100644 --- a/api/tests/integration/test_plugins.go +++ b/api/tests/integration/test_plugins.go @@ -21,6 +21,8 @@ import ( "strconv" "strings" + capi "github.com/envoyproxy/envoy/contrib/golang/common/go/api" + "mosn.io/htnn/api/pkg/filtermanager/api" "mosn.io/htnn/api/pkg/plugins" ) @@ -619,6 +621,68 @@ func (f *onLogFilter) OnLog(reqHeaders api.RequestHeaderMap, reqTrailers api.Req api.LogWarnf("receive request trailers: %+v", trailers) } +type metricsConfig struct { + Config +} + +type metricsPlugin struct { + plugins.PluginMethodDefaultImpl +} + +func (p *metricsPlugin) Config() api.PluginConfig { + return &metricsConfig{} +} + +func (p *metricsPlugin) Factory() api.FilterFactory { + return metricsFactory +} + +func metricsFactory(c interface{}, callbacks api.FilterCallbackHandler) api.Filter { + return &metricsFilter{ + callbacks: callbacks, + config: c.(*metricsConfig), + } +} + +type metricsFilter struct { + api.PassThroughFilter + + callbacks api.FilterCallbackHandler + config *metricsConfig +} + +const metricsUsageCounter = "metrics-test.usage.counter" +const metricsGauge = "metrics-test.usage.gauge" + +func RegisterMetrics(c capi.ConfigCallbacks) plugins.MetricsWriter { + writer := plugins.MetricsWriter{ + Counters: map[string]capi.CounterMetric{}, + Gaugers: map[string]capi.GaugeMetric{}, + } + writer.Counters[metricsUsageCounter] = c.DefineCounterMetric(metricsUsageCounter) + writer.Gaugers[metricsGauge] = c.DefineGaugeMetric(metricsGauge) + return writer +} + +func (f *metricsFilter) DecodeHeaders(headers api.RequestHeaderMap, endStream bool) api.ResultAction { + usageCounter := f.callbacks.GetCounterMetrics("metrics", metricsUsageCounter) + if usageCounter != nil { + usageCounter.Increment(1) + } else { + return &api.LocalResponse{Code: 500, Msg: "metrics config counter is nil"} + } + + gauger := f.callbacks.GetGaugeMetrics("metrics", metricsGauge) + if gauger != nil { + gauger.Record(2) + } else { + return &api.LocalResponse{Code: 500, Msg: "metrics config gauge is nil"} + } + return &api.LocalResponse{Code: 200, Msg: "metrics works"} +} + +var mp = &metricsPlugin{} + func init() { plugins.RegisterPlugin("stream", &streamPlugin{}) plugins.RegisterPlugin("buffer", &bufferPlugin{}) @@ -631,4 +695,9 @@ func init() { plugins.RegisterPlugin("beforeConsumerAndHasOtherMethod", &beforeConsumerAndHasOtherMethodPlugin{}) plugins.RegisterPlugin("beforeConsumerAndHasDecodeRequest", &beforeConsumerAndHasDecodeRequestPlugin{}) plugins.RegisterPlugin("onLog", &onLogPlugin{}) + // register plugin "metrics" for plugin execution + plugins.RegisterPlugin("metrics", mp) + // register metrics definition for plugin "metrics" + plugins.RegisterMetricsDefinitions("metrics", RegisterMetrics) + // TODO(wonderflow): allow metrics to contains runtime information especially for listener name, this require support from envoy upstream: https://github.com/envoyproxy/envoy/issues/37808 }