Skip to content
2 changes: 2 additions & 0 deletions br/cmd/br/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("google.golang.org/grpc.(*ClientConn).WaitForStateChange"),
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/config/retry.(*Config).createBackoffFn.newBackoffFn.func2"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.NewCallbackSerializer"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"),
goleak.IgnoreTopFunction("google.golang.org/grpc/test/bufconn.(*Listener).Accept"),
)
}

Expand Down
10 changes: 9 additions & 1 deletion pkg/metrics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,20 @@ go_library(
"//pkg/metrics/common",
"//pkg/parser/terror",
"//pkg/timer/metrics",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/promutil",
"@com_github_pingcap_errors//:errors",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_prometheus_client_golang//prometheus/collectors",
"@com_github_prometheus_client_model//go",
"@com_github_tikv_client_go_v2//metrics",
"@com_github_tikv_client_go_v2//util/collectors",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//channelz/grpc_channelz_v1",
"@org_golang_google_grpc//channelz/service",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_grpc//test/bufconn",
"@org_uber_go_zap//:zap",
],
)
Expand All @@ -61,11 +68,12 @@ go_test(
],
embed = [":metrics"],
flaky = True,
shard_count = 5,
shard_count = 8,
deps = [
"//pkg/parser/terror",
"//pkg/statistics/handle/cache",
"//pkg/testkit/testsetup",
"//pkg/util/intest",
"@com_github_pingcap_errors//:errors",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_prometheus_client_model//go",
Expand Down
1 change: 1 addition & 0 deletions pkg/metrics/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.Cleanup(func(int) { cleanupGrpcChannelzCollectorForTest() }),
}
goleak.VerifyTestMain(m, opts...)
}
153 changes: 153 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,26 @@
package metrics

import (
"context"
"net"
"sync"

"github.com/pingcap/tidb/pkg/dxf/framework/dxfmetric"
"github.com/pingcap/tidb/pkg/ingestor/ingestmetric"
metricscommon "github.com/pingcap/tidb/pkg/metrics/common"
timermetrics "github.com/pingcap/tidb/pkg/timer/metrics"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
tikvmetrics "github.com/tikv/client-go/v2/metrics"
tikvcollectors "github.com/tikv/client-go/v2/util/collectors"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/channelz/grpc_channelz_v1"
"google.golang.org/grpc/channelz/service"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
)

var (
Expand Down Expand Up @@ -394,6 +403,9 @@ func RegisterMetrics() {
// StmtSummary
prometheus.MustRegister(StmtSummaryWindowRecordCount)
prometheus.MustRegister(StmtSummaryWindowEvictedCount)

// Channelz
setupChannelzCollector()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would other goleak-based suites calling metrics.RegisterMetrics() directly get go leak check error in some caces?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolved by c33b6e1

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case some of integration tests which call RegisterMetrics but are NOT compiled with -tags=intest, a6303d8 added the goroutine to the goleak whitelist.

}

// Register registers custom collectors.
Expand Down Expand Up @@ -458,3 +470,144 @@ func ToggleSimplifiedMode(simplified bool) {
}
}
}

var grpcChannelzCollector struct {
mu sync.Mutex

listener *bufconn.Listener
server *grpc.Server
conn *grpc.ClientConn

collector prometheus.Collector
registered bool
}

func setupChannelzCollector() {
if intest.InTest {
return
}

grpcChannelzCollector.mu.Lock()
defer grpcChannelzCollector.mu.Unlock()

if err := initGrpcChannelzCollectorLocked(); err != nil {
logutil.BgLogger().Warn("setup internal channelz collector failed", zap.Error(err))
return
}
if grpcChannelzCollector.registered {
return
}
prometheus.MustRegister(grpcChannelzCollector.collector)
grpcChannelzCollector.registered = true
}

// initGrpcChannelzCollectorLocked initializes the singleton channelz collector.
// It must be called with grpcChannelzCollector.mu held.
func initGrpcChannelzCollectorLocked() error {
if grpcChannelzCollector.collector != nil {
return nil
}

grpcChannelzCollector.listener = bufconn.Listen(1 << 20)
grpcChannelzCollector.server = grpc.NewServer()
service.RegisterChannelzServiceToServer(grpcChannelzCollector.server)
go func(listener *bufconn.Listener, server *grpc.Server) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is graceful shutdown of tidb needs to be considered here to close this background thread properly?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, it's just for collecting channelz data and won't block graceful shutdown.

if err := server.Serve(listener); err != nil {
logutil.BgLogger().Warn("internal channelz grpc server stopped", zap.Error(err))
}
}(grpcChannelzCollector.listener, grpcChannelzCollector.server)

listener := grpcChannelzCollector.listener
conn, err := grpc.NewClient(
"passthrough:///bufnet",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
return listener.DialContext(ctx)
}),
)
if err != nil {
stopGrpcChannelzCollectorLocked()
return err
}

grpcChannelzCollector.conn = conn
grpcChannelzCollector.collector = tikvcollectors.NewChannelzCollector(conn, channelzCollectorOpts())
return nil
}

func channelzCollectorOpts() tikvcollectors.ChannelzCollectorOpts {
return tikvcollectors.ChannelzCollectorOpts{
Namespace: namespace,
Filter: func(node any) (collect bool, walkChildren bool) {
Comment on lines +538 to +541
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This filter seems to include the collector’s own internal bufnet connection, so scraping
may inflate tidb_grpc_channelz_* by itself. Should we exclude it here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolved by c5f3b5b

// Only collect socket and leaf subchannel info, which are more useful for troubleshooting network issues.
switch n := node.(type) {
case *grpc_channelz_v1.Channel:
if isInternalChannelzTarget(n.GetData().GetTarget()) {
return false, false
}
return false, true

case *grpc_channelz_v1.Subchannel:
if isInternalChannelzTarget(n.GetData().GetTarget()) {
return false, false
}
isLeaf := len(n.GetSocketRef()) > 0 &&
len(n.GetChannelRef()) == 0 &&
len(n.GetSubchannelRef()) == 0

return isLeaf, true

case *grpc_channelz_v1.Socket:
if isInternalChannelzSocket(n) {
return false, false
}
Comment on lines +561 to +563
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this check necessary?

Since the parent channel is already filtered by isInternalChannelzTarget (with walkChildren=false), child sockets should never be visited during the walk, making this socket-level filter redundant.

AI suggests that it it is kept, remote == nil && remoteName == "" could accidentally match legitimate sockets in transient states (e.g., connection handshaking).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the leaf socket nodes should never be visited, and I think it is reasonable to filter out those sockets in transient states, otherwise, we may still write related PromQLs like ...{..., remote!=""}.

return true, false

default:
return false, true
}
},
}
}

// isInternalChannelzTarget returns true if the target is used for internal channelz collector, which is identified by
// the fact that its target is "bufnet" or "passthrough:///bufnet".
func isInternalChannelzTarget(target string) bool {
return target == "bufnet" || target == "passthrough:///bufnet"
}

// isInternalChannelzSocket returns true if the socket is created by the internal channelz collector for scrapping
// channelz metrics, which is identified by the fact that it has no remote endpoint.
func isInternalChannelzSocket(socket *grpc_channelz_v1.Socket) bool {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better adding comments to explain the meaning of internal channel.

return socket.GetRemote() == nil && socket.GetRemoteName() == ""
}

func cleanupGrpcChannelzCollectorForTest() {
grpcChannelzCollector.mu.Lock()
defer grpcChannelzCollector.mu.Unlock()

stopGrpcChannelzCollectorLocked()
}

// stopGrpcChannelzCollectorLocked stops and resets the singleton channelz collector.
// It must be called with grpcChannelzCollector.mu held.
func stopGrpcChannelzCollectorLocked() {
if grpcChannelzCollector.registered && grpcChannelzCollector.collector != nil {
prometheus.Unregister(grpcChannelzCollector.collector)
}
if grpcChannelzCollector.conn != nil {
_ = grpcChannelzCollector.conn.Close()
}
if grpcChannelzCollector.server != nil {
grpcChannelzCollector.server.Stop()
}
if grpcChannelzCollector.listener != nil {
_ = grpcChannelzCollector.listener.Close()
}

grpcChannelzCollector.server = nil
grpcChannelzCollector.listener = nil
grpcChannelzCollector.conn = nil
grpcChannelzCollector.collector = nil
grpcChannelzCollector.registered = false
}
101 changes: 101 additions & 0 deletions pkg/metrics/metrics_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package metrics

import (
"strings"
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -64,3 +66,102 @@ func TestStmtSummaryMetricLabels(t *testing.T) {
require.Equal(t, 5.0, readGaugeValue(t, StmtSummaryWindowRecordCount.WithLabelValues(StmtSummaryTypeV2)))
require.Equal(t, 2.0, readGaugeValue(t, StmtSummaryWindowEvictedCount.WithLabelValues(StmtSummaryTypeV2)))
}

func TestGrpcChannelzCollectorSingleton(t *testing.T) {
cleanupGrpcChannelzCollectorForTest()
t.Cleanup(cleanupGrpcChannelzCollectorForTest)

func() {
grpcChannelzCollector.mu.Lock()
defer grpcChannelzCollector.mu.Unlock()

require.NoError(t, initGrpcChannelzCollectorLocked())
firstServer := grpcChannelzCollector.server
firstListener := grpcChannelzCollector.listener
firstConn := grpcChannelzCollector.conn
firstCollector := grpcChannelzCollector.collector

require.NoError(t, initGrpcChannelzCollectorLocked())
require.Same(t, firstServer, grpcChannelzCollector.server)
require.Same(t, firstListener, grpcChannelzCollector.listener)
require.Same(t, firstConn, grpcChannelzCollector.conn)
require.True(t, firstCollector == grpcChannelzCollector.collector)
}()

cleanupGrpcChannelzCollectorForTest()

func() {
grpcChannelzCollector.mu.Lock()
defer grpcChannelzCollector.mu.Unlock()

require.Nil(t, grpcChannelzCollector.server)
require.Nil(t, grpcChannelzCollector.listener)
require.Nil(t, grpcChannelzCollector.conn)
require.Nil(t, grpcChannelzCollector.collector)
require.False(t, grpcChannelzCollector.registered)
}()
}

func TestSetupChannelzCollectorSkippedInTest(t *testing.T) {
cleanupGrpcChannelzCollectorForTest()
t.Cleanup(cleanupGrpcChannelzCollectorForTest)
require.True(t, intest.InTest)

setupChannelzCollector()

func() {
grpcChannelzCollector.mu.Lock()
defer grpcChannelzCollector.mu.Unlock()

require.Nil(t, grpcChannelzCollector.collector)
require.False(t, grpcChannelzCollector.registered)
}()
}

func TestGrpcChannelzCollectorGather(t *testing.T) {
cleanupGrpcChannelzCollectorForTest()
t.Cleanup(cleanupGrpcChannelzCollectorForTest)

var collector prometheus.Collector
func() {
grpcChannelzCollector.mu.Lock()
defer grpcChannelzCollector.mu.Unlock()

require.NoError(t, initGrpcChannelzCollectorLocked())
collector = grpcChannelzCollector.collector
}()

registry := prometheus.NewRegistry()
require.NoError(t, registry.Register(collector))
families, err := registry.Gather()
require.NoError(t, err)

require.NotNil(t, findMetricFamily(families, "tidb_grpc_channelz_fetch_errors_total"))
for _, family := range families {
for _, metric := range family.GetMetric() {
require.False(t, metricHasLabelValue(metric, "target", "bufnet"))
require.False(t, metricHasLabelValue(metric, "target", "passthrough:///bufnet"))
if strings.HasPrefix(family.GetName(), "tidb_grpc_channelz_socket_") {
require.False(t, metricHasLabelValue(metric, "remote", ""))
}
}
}
}

func findMetricFamily(families []*dto.MetricFamily, name string) *dto.MetricFamily {
for _, family := range families {
if family.GetName() == name {
return family
}
}
return nil
}

func metricHasLabelValue(metric *dto.Metric, name string, value string) bool {
for _, label := range metric.GetLabel() {
if label.GetName() == name && label.GetValue() == value {
return true
}
}
return false
}
2 changes: 2 additions & 0 deletions pkg/server/handler/extractorhandler/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("github.com/go-sql-driver/mysql.(*mysqlConn).startWatcher.func1"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"),
goleak.IgnoreTopFunction("google.golang.org/grpc/test/bufconn.(*Listener).Accept"),
}

goleak.VerifyTestMain(m, opts...)
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/handler/optimizor/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("github.com/go-sql-driver/mysql.(*mysqlConn).startWatcher.func1"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"),
goleak.IgnoreTopFunction("google.golang.org/grpc/test/bufconn.(*Listener).Accept"),
}

goleak.VerifyTestMain(m, opts...)
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/handler/tests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("github.com/go-sql-driver/mysql.(*mysqlConn).startWatcher.func1"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"),
goleak.IgnoreTopFunction("google.golang.org/grpc/test/bufconn.(*Listener).Accept"),
}

goleak.VerifyTestMain(m, opts...)
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("google.golang.org/grpc.(*ccBalancerWrapper).watcher"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*controlBuffer).get"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*http2Client).keepalive"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"),
goleak.IgnoreTopFunction("google.golang.org/grpc/test/bufconn.(*Listener).Accept"),
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/txnkv/transaction.keepAlive"),
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/server/tests/commontest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("github.com/go-sql-driver/mysql.(*mysqlConn).startWatcher.func1"),
goleak.IgnoreTopFunction("google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run"),
goleak.IgnoreTopFunction("google.golang.org/grpc/test/bufconn.(*Listener).Accept"),
}

goleak.VerifyTestMain(m, opts...)
Expand Down
Loading
Loading