Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/pkg/filtermanager/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ type StreamFilterCallbacks interface {
// PluginState returns the PluginState associated to this request.
PluginState() PluginState

// Metrics API for plugins
GetCounterMetrics(pluginName, metricsName string) api.CounterMetric
GetGaugeMetrics(pluginName, metricsName string) api.GaugeMetric

// WithLogArg injectes `key: value` as the suffix of application log created by this
// callback's Log* methods. The injected log arguments are only valid in the current request.
// This method can be used to inject IDs or other context information into the logs.
Expand Down
37 changes: 37 additions & 0 deletions api/pkg/filtermanager/api_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"mosn.io/htnn/api/internal/cookie"
"mosn.io/htnn/api/internal/pluginstate"
"mosn.io/htnn/api/pkg/filtermanager/api"
"mosn.io/htnn/api/pkg/plugins"
)

type filterManagerRequestHeaderMap struct {
Expand Down Expand Up @@ -146,6 +147,7 @@ type filterManagerCallbackHandler struct {
namespace string
consumer api.Consumer
pluginState api.PluginState
metrics map[string]plugins.MetricsWriter

streamInfo *filterManagerStreamInfo

Expand All @@ -164,6 +166,7 @@ func (cb *filterManagerCallbackHandler) Reset() {
cb.streamInfo = nil
cb.logArgNames = ""
cb.logArgs = nil
cb.metrics = nil

cb.cacheLock.Unlock()
}
Expand Down Expand Up @@ -207,6 +210,40 @@ func (cb *filterManagerCallbackHandler) PluginState() api.PluginState {
return cb.pluginState
}

func (cb *filterManagerCallbackHandler) GetCounterMetrics(pluginName, metricName string) capi.CounterMetric {
if cb.metrics == nil {
api.LogErrorf("metrics not exist or not initialized for plugin %s", pluginName)
return nil
}
writer, ok := cb.metrics[pluginName]
if !ok {
api.LogErrorf("metrics writer for plugin %s not found", pluginName)
return nil
}
if writer.Counters == nil || writer.Counters[metricName] == nil {
api.LogErrorf("counter metric %s not found in plugin %s", metricName, pluginName)
return nil
}
return writer.Counters[metricName]
}

func (cb *filterManagerCallbackHandler) GetGaugeMetrics(pluginName, metricName string) capi.GaugeMetric {
if cb.metrics == nil {
api.LogErrorf("metrics not exist or not initialized for plugin %s", pluginName)
return nil
}
writer, ok := cb.metrics[pluginName]
if !ok {
api.LogErrorf("metrics writer for plugin %s not found", pluginName)
return nil
}
if writer.Gaugers == nil || writer.Gaugers[metricName] == nil {
api.LogErrorf("gauge metric %s not found in plugin %s", metricName, pluginName)
return nil
}
return writer.Gaugers[metricName]
}

func (cb *filterManagerCallbackHandler) WithLogArg(key string, value any) api.StreamFilterCallbacks {
// As the log is embedded into the Envoy's log, it's not so necessary to use structural logging
// here. So far the value is just an ID string, introduce complex processions like quoting is
Expand Down
26 changes: 26 additions & 0 deletions api/pkg/filtermanager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"reflect"
"sort"
"strings"
"sync"

xds "github.com/cncf/xds/go/xds/type/v3"
Expand All @@ -28,6 +29,7 @@ import (

"mosn.io/htnn/api/pkg/filtermanager/api"
"mosn.io/htnn/api/pkg/filtermanager/model"
"mosn.io/htnn/api/pkg/plugins"
pkgPlugins "mosn.io/htnn/api/pkg/plugins"
)

Expand Down Expand Up @@ -57,6 +59,8 @@ type filterManagerConfig struct {
parsed []*model.ParsedFilterConfig
pool *sync.Pool

metricsWriters map[string]plugins.MetricsWriter

namespace string

enableDebugMode bool
Expand Down Expand Up @@ -92,6 +96,9 @@ func (conf *filterManagerConfig) Merge(another *filterManagerConfig) *filterMana
ns = another.namespace
}

// Pass LDS metrics writers to the merged config for golang filter to use at route level
conf.metricsWriters = another.metricsWriters

// It's tough to do the data plane merge right. We don't use shallow copy, which may share
// data structure accidentally. We don't use deep copy all the fields, which may copy unexpected computed data.
// Let's copy fields manually.
Expand Down Expand Up @@ -179,9 +186,25 @@ func (conf *filterManagerConfig) InitOnce() {
func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigCallbackHandler) (interface{}, error) {
configStruct := &xds.TypedStruct{}

var metricsWriters = map[string]plugins.MetricsWriter{}

if callbacks != nil {
// If callbacks is not nil, it means this filter is configured in the LDS level.
// We need to initialize the metrics for all golang plugins here.
registers := plugins.GetMetricsDefinitions()
registeredPlugins := []string{}
for pluginName, register := range registers {
api.LogInfof("registering metrics for golang plugin %s", pluginName)
metricsWriters[pluginName] = register(callbacks)
registeredPlugins = append(registeredPlugins, pluginName)
}
capi.LogInfof("metrics registered for plugins: [%s]", strings.Join(registeredPlugins, ", "))
}

// No configuration
if any.GetTypeUrl() == "" {
conf := initFilterManagerConfig("")
conf.metricsWriters = metricsWriters
return conf, nil
}

Expand Down Expand Up @@ -216,6 +239,7 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC

for _, proto := range plugins {
name := proto.Name

if plugin := pkgPlugins.LoadHTTPFilterFactoryAndParser(name); plugin != nil {
config, err := plugin.ConfigParser.Parse(proto.Config)
if err != nil {
Expand Down Expand Up @@ -264,6 +288,7 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC
if needInit {
conf.initOnce = &sync.Once{}
}
conf.metricsWriters = metricsWriters

return conf, nil
}
Expand All @@ -279,6 +304,7 @@ func (p *FilterManagerConfigParser) Merge(parent interface{}, child interface{})
}

if httpFilterCfg == nil || len(httpFilterCfg.parsed) == 0 {
routeCfg.metricsWriters = httpFilterCfg.metricsWriters
return routeCfg
}

Expand Down
2 changes: 2 additions & 0 deletions api/pkg/filtermanager/filtermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ func FilterManagerFactory(c interface{}, cb capi.FilterCallbackHandler) (streamF

fm.callbacks.FilterCallbackHandler = cb

fm.callbacks.metrics = conf.metricsWriters

canSkipMethods := fm.canSkipMethods
canSyncRunMethods := fm.canSyncRunMethods
if canSkipMethods == nil {
Expand Down
18 changes: 18 additions & 0 deletions api/pkg/plugins/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"errors"
"runtime/debug"

capi "github.com/envoyproxy/envoy/contrib/golang/common/go/api"

"mosn.io/htnn/api/internal/proto"
"mosn.io/htnn/api/pkg/filtermanager/api"
"mosn.io/htnn/api/pkg/log"
Expand All @@ -31,6 +33,7 @@ var (
pluginTypes = map[string]Plugin{}
plugins = map[string]Plugin{}
httpFilterFactoryAndParser = map[string]*FilterFactoryAndParser{}
metricsDefinitions = map[string]MetricsRegister{}
)

// Here we introduce extra struct to avoid cyclic import between pkg/filtermanager and pkg/plugins
Expand Down Expand Up @@ -234,3 +237,18 @@ func ComparePluginOrderInt(a, b string) int {
}
return cmp.Compare(a, b)
}

type MetricsWriter struct {
Counters map[string]capi.CounterMetric
Gaugers map[string]capi.GaugeMetric
}

type MetricsRegister func(capi.ConfigCallbacks) MetricsWriter

func RegisterMetricsDefinitions(pluginName string, definition MetricsRegister) {
metricsDefinitions[pluginName] = definition
}

func GetMetricsDefinitions() map[string]MetricsRegister {
return metricsDefinitions
}
21 changes: 21 additions & 0 deletions api/plugins/tests/integration/dataplane/data_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,27 @@ func (dp *DataPlane) do(method string, path string, header http.Header, body io.
return resp, err
}

func (dp *DataPlane) GetAdmin(path string) (*http.Response, error) {
req, err := http.NewRequest("GET", "http://localhost:"+dp.adminAPIPort+path, nil)
if err != nil {
return nil, err
}
tr := &http.Transport{
DialContext: func(ctx context.Context, proto, addr string) (conn net.Conn, err error) {
return net.DialTimeout("tcp", ":"+dp.adminAPIPort, 1*time.Second)
},
}

client := &http.Client{Transport: tr,
Timeout: 10 * time.Second,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
resp, err := client.Do(req)
return resp, err
}

func (dp *DataPlane) doWithTrailer(method string, path string, header http.Header, body io.Reader, trailer http.Header) (*http.Response, error) {
req, err := http.NewRequest(method, "http://localhost:"+dp.dataPlanePort+path, body)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions api/plugins/tests/pkg/envoy/capi.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,13 @@ func (i *filterCallbackHandler) PluginState() api.PluginState {
return i.pluginState
}

func (i *filterCallbackHandler) GetCounterMetrics(pluginName, metricsName string) capi.CounterMetric {
return nil
}
func (i *filterCallbackHandler) GetGaugeMetrics(pluginName, metricsName string) capi.GaugeMetric {
return nil
}

func (i *filterCallbackHandler) WithLogArg(key string, value any) api.StreamFilterCallbacks {
return i
}
Expand Down
54 changes: 54 additions & 0 deletions api/tests/integration/filtermanager_latest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package integration
import (
"bytes"
_ "embed"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -339,3 +341,55 @@ func TestFilterManagerLogWithTrailers(t *testing.T) {
require.Nil(t, err)
assert.Equal(t, 200, resp.StatusCode)
}

func TestMetricsEnabledPlugin(t *testing.T) {
dp, err := dataplane.StartDataPlane(t, &dataplane.Option{
LogLevel: "debug",
})
if err != nil {
t.Fatalf("failed to start data plane: %v", err)
return
}
defer dp.Stop()

lp := &filtermanager.FilterManagerConfig{
Plugins: []*model.FilterConfig{
{
Name: "metrics",
Config: &Config{},
},
},
}

controlPlane.UseGoPluginConfig(t, lp, dp)
hdr := http.Header{}
resp, err := dp.Get("/", hdr)
require.Nil(t, err)
body, err := io.ReadAll(resp.Body)
require.Nil(t, err)
assert.Equal(t, 200, resp.StatusCode, "response: %s", string(body))
resp.Body.Close()

resp, err = dp.GetAdmin("/stats")
require.Nil(t, err)
body, err = io.ReadAll(resp.Body)
require.Nil(t, err)
lines := strings.Split(string(body), "\n")

var found int

for _, l := range lines {
if !strings.Contains(l, "metrics-test") {
continue
}
if strings.Contains(l, "usage.counter") {
found++
assert.Equal(t, "metrics-test.usage.counter: 1", l)
}
if strings.Contains(l, "usage.gauge") {
found++
assert.Contains(t, "metrics-test.usage.gauge: 2", l)
}
}
assert.Equal(t, 2, found, "expect to have metrics usage.counter and usage.gauge")
}
Loading