diff --git a/DEPS.bzl b/DEPS.bzl index 58a8557ec712e..d0993ae30eb4d 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -7805,13 +7805,13 @@ def go_deps(): build_tags = ["nextgen", "intest"], build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sha256 = "46ee8c64e0f95ad1514e824506e49ea1d9f75329ee6419aae4b0817336550557", - strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20260401083018-b7f9a9e9d2ab", + sha256 = "a5e70899561f437abf34016a1fd907ab6394ab4771f75ba509b61dd5bb806351", + strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20260414033830-1adc54c38a51", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260401083018-b7f9a9e9d2ab.zip", - "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260401083018-b7f9a9e9d2ab.zip", - "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260401083018-b7f9a9e9d2ab.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260401083018-b7f9a9e9d2ab.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260414033830-1adc54c38a51.zip", + "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260414033830-1adc54c38a51.zip", + "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260414033830-1adc54c38a51.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20260414033830-1adc54c38a51.zip", ], ) go_repository( diff --git a/go.mod b/go.mod index 5f8f48bcf091c..24df31229957c 100644 --- a/go.mod +++ b/go.mod @@ -124,7 +124,7 @@ require ( github.com/stathat/consistent v1.0.0 github.com/stretchr/testify v1.11.1 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.8-0.20260401083018-b7f9a9e9d2ab + github.com/tikv/client-go/v2 v2.0.8-0.20260414033830-1adc54c38a51 github.com/tikv/pd/client v0.0.0-20260404141330-8a6813497b52 github.com/timakin/bodyclose v0.0.0-20241222091800-1db5c5ca4d67 github.com/twmb/murmur3 v1.1.6 diff --git a/go.sum b/go.sum index 4177918239d12..99ef1f773df09 100644 --- a/go.sum +++ b/go.sum @@ -895,8 +895,8 @@ github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= -github.com/tikv/client-go/v2 v2.0.8-0.20260401083018-b7f9a9e9d2ab h1:t9Kh7tVsSSUMuSPpHChPF6W3VtN4Kq7Gi8EgWPbYRyY= -github.com/tikv/client-go/v2 v2.0.8-0.20260401083018-b7f9a9e9d2ab/go.mod h1:lfRxHwyBp1rjTmNC04SUZ+dqk7i1R1AeJ2zraMQaNvY= +github.com/tikv/client-go/v2 v2.0.8-0.20260414033830-1adc54c38a51 h1:HiKahyqWSI5aijRsSk1JciFg61D5a0x3TFyleYERBl4= +github.com/tikv/client-go/v2 v2.0.8-0.20260414033830-1adc54c38a51/go.mod h1:PILS4Yr8mWPD7J6W0+hVq4Z+lwhTIYxPYUA/OTxPSvg= github.com/tikv/pd/client v0.0.0-20260404141330-8a6813497b52 h1:fXIMowblD3qdfHXJYGJpe7SbBlTO4S9GPVZZvL3CPG8= github.com/tikv/pd/client v0.0.0-20260404141330-8a6813497b52/go.mod h1:I2yRx/Yf8Y8kgM5f3VNp4a8fWpnjPC4TxWk554AY8bM= github.com/timakin/bodyclose v0.0.0-20241222091800-1db5c5ca4d67 h1:9LPGD+jzxMlnk5r6+hJnar67cgpDIz/iyD+rfl5r2Vk= diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index a8b013ef506c2..20a5a334d8ec5 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -51,6 +51,7 @@ import ( "github.com/pingcap/tidb/pkg/util/chunk" contextutil "github.com/pingcap/tidb/pkg/util/context" "github.com/pingcap/tidb/pkg/util/dbterror" + "github.com/pingcap/tidb/pkg/util/execdetails" "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/size" @@ -585,7 +586,7 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor terror.Call(rs.Close) return err } - w.collector.Accepted(execDetails.UnpackedBytesReceivedKVTotal) + w.collector.Accepted(execdetails.LoadTiKVExecDetails(&execDetails).UnpackedBytesReceivedKVTotal) execDetails = kvutil.ExecDetails{} _, tableScanRowCount := distsqlCtx.RuntimeStatsColl.GetCopCountAndRows(tableScanCopID) diff --git a/pkg/distsql/context/BUILD.bazel b/pkg/distsql/context/BUILD.bazel index 77d1f113ad403..1535c0384d938 100644 --- a/pkg/distsql/context/BUILD.bazel +++ b/pkg/distsql/context/BUILD.bazel @@ -18,7 +18,6 @@ go_library( "//pkg/util/tiflash", "//pkg/util/topsql/stmtstats", "@com_github_tikv_client_go_v2//kv", - "@com_github_tikv_client_go_v2//tikvrpc/interceptor", "@org_uber_go_atomic//:atomic", ], ) @@ -43,7 +42,6 @@ go_test( "//pkg/util/tiflash", "//pkg/util/topsql/stmtstats", "@com_github_tikv_client_go_v2//kv", - "@com_github_tikv_client_go_v2//tikvrpc/interceptor", "@org_uber_go_atomic//:atomic", ], ) diff --git a/pkg/distsql/context/context.go b/pkg/distsql/context/context.go index 7f6d71f0de678..65796efc7bff5 100644 --- a/pkg/distsql/context/context.go +++ b/pkg/distsql/context/context.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/tidb/pkg/util/tiflash" "github.com/pingcap/tidb/pkg/util/topsql/stmtstats" tikvstore "github.com/tikv/client-go/v2/kv" - "github.com/tikv/client-go/v2/tikvrpc/interceptor" "go.uber.org/atomic" ) @@ -46,7 +45,6 @@ type DistSQLContext struct { KVVars *tikvstore.Variables KvExecCounter *stmtstats.KvExecCounter RUV2Metrics *execdetails.RUV2Metrics - RUV2RPCInterceptor interceptor.RPCInterceptor SessionMemTracker *memory.Tracker Location *time.Location diff --git a/pkg/distsql/context/context_test.go b/pkg/distsql/context/context_test.go index 5872cc174046e..122eba4d8b624 100644 --- a/pkg/distsql/context/context_test.go +++ b/pkg/distsql/context/context_test.go @@ -32,7 +32,6 @@ import ( "github.com/pingcap/tidb/pkg/util/tiflash" "github.com/pingcap/tidb/pkg/util/topsql/stmtstats" tikvstore "github.com/tikv/client-go/v2/kv" - "github.com/tikv/client-go/v2/tikvrpc/interceptor" "go.uber.org/atomic" ) @@ -55,10 +54,7 @@ func TestContextDetach(t *testing.T) { KVVars: kvVars, KvExecCounter: &stmtstats.KvExecCounter{}, RUV2Metrics: execdetails.NewRUV2Metrics(), - RUV2RPCInterceptor: interceptor.NewRPCInterceptor("test", func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc { - return next - }), - SessionMemTracker: &memory.Tracker{}, + SessionMemTracker: &memory.Tracker{}, Location: time.Local, RuntimeStatsColl: &execdetails.RuntimeStatsColl{}, @@ -113,7 +109,6 @@ func TestContextDetach(t *testing.T) { "$.RunawayChecker", "$.RUConsumptionReporter", "$.ExecDetails", - "$.RUV2RPCInterceptor", })) staticObj := obj.Detach() @@ -130,7 +125,6 @@ func TestContextDetach(t *testing.T) { "$.RUConsumptionReporter", "$.ExecDetails", "$.RUV2Metrics", - "$.RUV2RPCInterceptor", "$.KVVars.Killed", "$.KvExecCounter", "$.SessionMemTracker", diff --git a/pkg/distsql/distsql.go b/pkg/distsql/distsql.go index cf7cadf95520b..2f18414d56a80 100644 --- a/pkg/distsql/distsql.go +++ b/pkg/distsql/distsql.go @@ -78,7 +78,6 @@ func Select(ctx context.Context, dctx *distsqlctx.DistSQLContext, kvReq *kv.Requ } ctx = WithSQLKvExecCounterInterceptor(ctx, dctx.KvExecCounter) - ctx = WithRUV2MetricsInterceptor(ctx, dctx.RUV2RPCInterceptor) option := &kv.ClientSendOption{ SessionMemTracker: dctx.SessionMemTracker, EnabledRateLimitAction: enabledRateLimitAction, @@ -178,7 +177,6 @@ func SelectWithRuntimeStats(ctx context.Context, dctx *distsqlctx.DistSQLContext func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars any, isRestrict bool, dctx *distsqlctx.DistSQLContext) (SelectResult, error) { ctx = WithSQLKvExecCounterInterceptor(ctx, dctx.KvExecCounter) - ctx = WithRUV2MetricsInterceptor(ctx, dctx.RUV2RPCInterceptor) failpoint.Inject("mockAnalyzeRequestWaitForCancel", func(val failpoint.Value) { if val.(bool) { <-ctx.Done() @@ -290,12 +288,3 @@ func WithSQLKvExecCounterInterceptor(ctx context.Context, counter *stmtstats.KvE } return ctx } - -// WithRUV2MetricsInterceptor binds an interceptor for client-go to collect -// statement-level RUv2 request counters and TiKV ExecDetailsV2-based metrics. -func WithRUV2MetricsInterceptor(ctx context.Context, ruv2Interceptor interceptor.RPCInterceptor) context.Context { - if ruv2Interceptor != nil { - return interceptor.WithRPCInterceptor(ctx, ruv2Interceptor) - } - return ctx -} diff --git a/pkg/executor/BUILD.bazel b/pkg/executor/BUILD.bazel index d06cc1c4ecae0..3d5b26fb28112 100644 --- a/pkg/executor/BUILD.bazel +++ b/pkg/executor/BUILD.bazel @@ -441,6 +441,7 @@ go_test( "//pkg/executor/internal/testutil", "//pkg/executor/join", "//pkg/executor/sortexec", + "//pkg/executor/staticrecordset", "//pkg/executor/unionexec", "//pkg/expression", "//pkg/expression/aggregation", diff --git a/pkg/executor/adapter.go b/pkg/executor/adapter.go index 4dd26ea9cd8b9..61709332bda59 100644 --- a/pkg/executor/adapter.go +++ b/pkg/executor/adapter.go @@ -185,8 +185,10 @@ func (a *recordSet) Next(ctx context.Context, req *chunk.Chunk) (err error) { if len(a.traceID) > 0 { ctx = tikvtrace.ContextWithTraceID(ctx, a.traceID) } + ctx = inheritStmtRUV2Context(ctx, a.stmt) err = a.stmt.next(ctx, a.executor, req) + syncRUV2MetricsAfterExec(a.stmt) if err != nil { a.lastErrs = append(a.lastErrs, err) return err @@ -204,6 +206,25 @@ func (a *recordSet) Next(ctx context.Context, req *chunk.Chunk) (err error) { return nil } +func inheritStmtRUV2Context(ctx context.Context, stmt *ExecStmt) context.Context { + if stmt == nil || stmt.GoCtx == nil { + return ctx + } + return execdetails.ContextWithInheritedRUV2Details(ctx, stmt.GoCtx) +} + +func syncRUV2MetricsAfterExec(stmt *ExecStmt) { + if stmt == nil || stmt.GoCtx == nil { + return + } + sessVars := stmt.Ctx.GetSessionVars() + if sessVars.RUV2Metrics == nil { + return + } + ruDetail, _ := stmt.GoCtx.Value(util.RUDetailsCtxKey).(*util.RUDetails) + execdetails.SyncRUV2MetricsFromRUDetails(sessVars.RUV2Metrics, ruDetail) +} + // NewChunk create a chunk base on top-level executor's exec.NewFirstChunk(). func (a *recordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk { if alloc == nil { @@ -259,7 +280,7 @@ func (a *recordSet) TryDetach() (sqlexec.RecordSet, bool, error) { if !ok { return nil, false, nil } - return staticrecordset.New(a.Fields(), e, a.stmt.GetTextToLog(false)), true, nil + return staticrecordset.New(a.Fields(), e, a.stmt.GetTextToLog(false), a.stmt.GoCtx), true, nil } // GetExecutor4Test exports the internal executor for test purpose. @@ -1110,6 +1131,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e exec.Executor) ( } err = a.next(ctx, e, exec.TryNewCacheChunk(e)) + syncRUV2MetricsAfterExec(a) if err != nil { return nil, err } @@ -1717,6 +1739,7 @@ func (a *ExecStmt) finalizeStatementRUV2Metrics() { if ruDetail == nil { return } + execdetails.SyncRUV2MetricsFromRUDetails(sessVars.RUV2Metrics, ruDetail) weights := sessVars.RUV2Weights() tidbRU := sessVars.RUV2Metrics.CalculateRUValues(weights) @@ -1951,7 +1974,7 @@ func (a *ExecStmt) updateNetworkTrafficStatsAndMetrics() { hasMPPTraffic := a.updateMPPNetworkTraffic() tikvExecDetailRaw := a.GoCtx.Value(util.ExecDetailsKey) if tikvExecDetailRaw != nil { - tikvExecDetail := tikvExecDetailRaw.(*util.ExecDetails) + tikvExecDetail := execdetails.LoadTiKVExecDetails(tikvExecDetailRaw.(*util.ExecDetails)) executor_metrics.ExecutorNetworkTransmissionSentTiKVTotal.Add(float64(tikvExecDetail.UnpackedBytesSentKVTotal)) executor_metrics.ExecutorNetworkTransmissionSentTiKVCrossZone.Add(float64(tikvExecDetail.UnpackedBytesSentKVCrossZone)) executor_metrics.ExecutorNetworkTransmissionReceivedTiKVTotal.Add(float64(tikvExecDetail.UnpackedBytesReceivedKVTotal)) diff --git a/pkg/executor/adapter_test.go b/pkg/executor/adapter_test.go index fe233070f312f..77063f076c36c 100644 --- a/pkg/executor/adapter_test.go +++ b/pkg/executor/adapter_test.go @@ -20,10 +20,12 @@ import ( "slices" "strings" "sync" + "sync/atomic" "testing" "time" "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/kvrpcpb" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/tidb/pkg/config" distsqlctx "github.com/pingcap/tidb/pkg/distsql/context" @@ -453,7 +455,7 @@ func TestWriteSlowLog(t *testing.T) { checkWriteSlowLog(true) } -func TestFinishExecuteStmtReportsTiDBRUV2WithoutSyncingRUDetails(t *testing.T) { +func TestFinishExecuteStmtSyncsTiDBRUV2FromRUDetails(t *testing.T) { original := config.GetGlobalConfig() originalGenerateBinaryPlan := variable.GenerateBinaryPlan.Load() t.Cleanup(func() { @@ -488,11 +490,19 @@ func TestFinishExecuteStmtReportsTiDBRUV2WithoutSyncingRUDetails(t *testing.T) { sessVars.RUV2Metrics.AddResultChunkCells(100) sessVars.RUV2Metrics.AddPlanCnt(2) sessVars.RUV2Metrics.AddSessionParserTotal(3) - - expected := sessVars.RUV2Metrics.CalculateRUValues(sessVars.RUV2Weights()) ruDetails := goCtx.Value(util.RUDetailsCtxKey).(*util.RUDetails) ruDetails.AddTiKVRUV2(23456) + rawRUV2 := &kvrpcpb.RUV2{ + ReadRpcCount: 5, + WriteRpcCount: 7, + StorageProcessedKeysBatchGet: 11, + } + ruDetails.AddRUV2(rawRUV2) ruDetails.UpdateTiFlash(&rmpb.Consumption{RRU: 345, WRU: 67}) + // Build expected metrics by cloning the current state and manually adding + // the RUV2 counters (without draining ruDetails, since FinishExecuteStmt will drain). + expected := sessVars.RUV2Metrics.Clone() + execdetails.UpdateRUV2MetricsFromRUV2(expected, rawRUV2) execStmt := &executor.ExecStmt{ Ctx: ctx, @@ -502,9 +512,12 @@ func TestFinishExecuteStmtReportsTiDBRUV2WithoutSyncingRUDetails(t *testing.T) { execStmt.FinishExecuteStmt(0, nil, false) require.Equal(t, float64(23456), ruDetails.TiKVRUV2()) + require.Equal(t, int64(5), sessVars.RUV2Metrics.ResourceManagerReadCnt()) + require.Equal(t, int64(7), sessVars.RUV2Metrics.ResourceManagerWriteCnt()) + require.Equal(t, int64(11), sessVars.RUV2Metrics.TiKVStorageProcessedKeysBatchGet()) require.Equal(t, "rg1", reporter.group) require.Equal(t, float64(23456), reporter.tikvRUV2) - require.Equal(t, expected, reporter.tidbRUV2) + require.Equal(t, expected.CalculateRUValues(sessVars.RUV2Weights()), reporter.tidbRUV2) require.Equal(t, float64(412), reporter.tiflashRU) t.Run("stmt summary ignores optimistic autocommit retry count", func(t *testing.T) { @@ -569,6 +582,60 @@ func TestFinishExecuteStmtReportsTiDBRUV2WithoutSyncingRUDetails(t *testing.T) { require.Zero(t, reporter.tidbRUV2) require.Zero(t, reporter.tiflashRU) }) + + t.Run("network traffic stats are read atomically", func(t *testing.T) { + reporter := &mockRUV2ConsumptionReporter{} + ctx := &mockRUV2ReportingContext{ + Context: mock.NewContext(), + reporter: reporter, + } + sessVars := ctx.GetSessionVars() + sessVars.StartTime = time.Now() + sessVars.StmtCtx.StmtType = "Select" + sessVars.StmtCtx.OriginalSQL = "select 1" + sessVars.StmtCtx.ResetSQLDigest(sessVars.StmtCtx.OriginalSQL) + sessVars.RUV2Metrics = execdetails.NewRUV2Metrics() + + goCtx := execdetails.ContextWithInitializedExecDetails(context.Background()) + tikvExecDetail := goCtx.Value(util.ExecDetailsKey).(*util.ExecDetails) + execStmt := &executor.ExecStmt{ + Ctx: ctx, + GoCtx: goCtx, + StmtNode: &ast.SelectStmt{}, + } + + done := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-done: + return + default: + atomic.AddInt64(&tikvExecDetail.WaitKVRespDuration, int64(time.Millisecond)) + atomic.AddInt64(&tikvExecDetail.WaitPDRespDuration, int64(time.Millisecond)) + atomic.AddInt64(&tikvExecDetail.BackoffDuration, int64(time.Millisecond)) + atomic.AddInt64(&tikvExecDetail.UnpackedBytesSentKVTotal, 1) + atomic.AddInt64(&tikvExecDetail.UnpackedBytesReceivedKVTotal, 1) + atomic.AddInt64(&tikvExecDetail.UnpackedBytesSentKVCrossZone, 1) + atomic.AddInt64(&tikvExecDetail.UnpackedBytesReceivedKVCrossZone, 1) + atomic.AddInt64(&tikvExecDetail.UnpackedBytesSentMPPTotal, 1) + atomic.AddInt64(&tikvExecDetail.UnpackedBytesReceivedMPPTotal, 1) + atomic.AddInt64(&tikvExecDetail.UnpackedBytesSentMPPCrossZone, 1) + atomic.AddInt64(&tikvExecDetail.UnpackedBytesReceivedMPPCrossZone, 1) + } + } + }() + + for range 64 { + execStmt.FinishExecuteStmt(0, nil, false) + } + + close(done) + wg.Wait() + }) } func TestSlowLogMaxPerSec(t *testing.T) { diff --git a/pkg/executor/explain.go b/pkg/executor/explain.go index 426e9d6692798..106b69f4fa65d 100644 --- a/pkg/executor/explain.go +++ b/pkg/executor/explain.go @@ -143,7 +143,7 @@ func (e *ExplainExec) executeAnalyzeExec(ctx context.Context) (err error) { if ruDetailsRaw != nil { ruDetails = ruDetailsRaw.(*clientutil.RUDetails).Clone() } - ruv2Metrics := execdetails.RUV2MetricsFromContext(ctx) + ruv2Metrics := execdetails.SyncRUV2MetricsFromContext(ctx) if ruDetails != nil || ruv2Metrics != nil { ruVersion := rmclient.DefaultRUVersion if do := domain.GetDomain(e.Ctx()); do != nil { diff --git a/pkg/executor/explain_unit_test.go b/pkg/executor/explain_unit_test.go index 28505b9f18e41..5e4923ff878af 100644 --- a/pkg/executor/explain_unit_test.go +++ b/pkg/executor/explain_unit_test.go @@ -19,7 +19,9 @@ import ( "errors" "testing" + "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/pkg/executor/internal/exec" + "github.com/pingcap/tidb/pkg/executor/staticrecordset" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/planner/core" @@ -31,6 +33,7 @@ import ( "github.com/pingcap/tidb/pkg/util/execdetails" "github.com/pingcap/tidb/pkg/util/mock" "github.com/stretchr/testify/require" + clientutil "github.com/tikv/client-go/v2/util" ) var ( @@ -47,6 +50,12 @@ type mockEmptyOperator struct { exec.BaseExecutor } +type mockExecDetailsObserver struct { + exec.BaseExecutor + seenMetrics *execdetails.RUV2Metrics + seenRUDetails *clientutil.RUDetails +} + func (e *mockErrorOperator) Open(_ context.Context) error { return nil } @@ -76,6 +85,21 @@ func (e *mockEmptyOperator) Close() error { return nil } +func (e *mockExecDetailsObserver) Open(_ context.Context) error { + return nil +} + +func (e *mockExecDetailsObserver) Next(ctx context.Context, chk *chunk.Chunk) error { + chk.Reset() + e.seenMetrics = execdetails.RUV2MetricsFromContext(ctx) + e.seenRUDetails, _ = ctx.Value(clientutil.RUDetailsCtxKey).(*clientutil.RUDetails) + return nil +} + +func (e *mockExecDetailsObserver) Close() error { + return nil +} + func getColumns() []*expression.Column { return []*expression.Column{ {Index: 1, RetType: types.NewFieldType(mysql.TypeLonglong)}, @@ -149,4 +173,67 @@ func TestExplainAnalyzeInvokeNextAndClose(t *testing.T) { recordInsertRows2Metrics(ctx.GetSessionVars()) require.Equal(t, int64(5), ctx.GetSessionVars().RUV2Metrics.ExecutorL5InsertRows()) }) + + t.Run("explain analyze drains pending raw ruv2 before snapshot", func(t *testing.T) { + ctx := mock.NewContext() + ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl(nil) + + goCtx := execdetails.ContextWithInitializedExecDetails(context.Background()) + ctx.GetSessionVars().RUV2Metrics = execdetails.RUV2MetricsFromContext(goCtx) + require.NotNil(t, ctx.GetSessionVars().RUV2Metrics) + + ruDetails := goCtx.Value(clientutil.RUDetailsCtxKey).(*clientutil.RUDetails) + ruDetails.AddRUV2(&kvrpcpb.RUV2{ + ReadRpcCount: 2, + WriteRpcCount: 3, + }) + + analyzeExec := &mockEmptyOperator{ + BaseExecutor: exec.NewBaseExecutor(ctx, expression.NewSchema(), 1), + } + targetPlan := physicalop.PhysicalTableDual{RowCount: 1}.Init(ctx, &property.StatsInfo{RowCount: 1}, 0) + explainExec := &ExplainExec{ + BaseExecutor: exec.NewBaseExecutor(ctx, expression.NewSchema(getColumns()...), 0), + explain: &core.Explain{ + Analyze: true, + TargetPlan: targetPlan, + }, + analyzeExec: analyzeExec, + } + + require.NoError(t, explainExec.executeAnalyzeExec(goCtx)) + + rootStats := ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(targetPlan.ID()) + _, groups := rootStats.MergeStats() + var ruStats *execdetails.RURuntimeStats + for _, group := range groups { + if stats, ok := group.(*execdetails.RURuntimeStats); ok { + ruStats = stats + break + } + } + require.NotNil(t, ruStats) + require.Equal(t, int64(2), ruStats.Metrics.ResourceManagerReadCnt()) + require.Equal(t, int64(3), ruStats.Metrics.ResourceManagerWriteCnt()) + }) + + t.Run("detached static recordset inherits statement ru context", func(t *testing.T) { + ctx := mock.NewContext() + observer := &mockExecDetailsObserver{ + BaseExecutor: exec.NewBaseExecutor(ctx, expression.NewSchema(), 0), + } + + sourceCtx := execdetails.ContextWithInitializedExecDetails(context.Background()) + sourceMetrics := execdetails.RUV2MetricsFromContext(sourceCtx) + sourceRUDetails := sourceCtx.Value(clientutil.RUDetailsCtxKey).(*clientutil.RUDetails) + rs := staticrecordset.New(nil, observer, "select 1", sourceCtx) + + fetchCtx := execdetails.ContextWithInitializedExecDetails(context.Background()) + require.NotSame(t, sourceMetrics, execdetails.RUV2MetricsFromContext(fetchCtx)) + require.NotSame(t, sourceRUDetails, fetchCtx.Value(clientutil.RUDetailsCtxKey).(*clientutil.RUDetails)) + + require.NoError(t, rs.Next(fetchCtx, rs.NewChunk(nil))) + require.Same(t, sourceMetrics, observer.seenMetrics) + require.Same(t, sourceRUDetails, observer.seenRUDetails) + }) } diff --git a/pkg/executor/staticrecordset/BUILD.bazel b/pkg/executor/staticrecordset/BUILD.bazel index 571ef0eea98a6..4a13ae7c15b46 100644 --- a/pkg/executor/staticrecordset/BUILD.bazel +++ b/pkg/executor/staticrecordset/BUILD.bazel @@ -14,8 +14,10 @@ go_library( "//pkg/session/cursor", "//pkg/util", "//pkg/util/chunk", + "//pkg/util/execdetails", "//pkg/util/logutil", "//pkg/util/sqlexec", + "@com_github_tikv_client_go_v2//util", "@org_uber_go_zap//:zap", ], ) diff --git a/pkg/executor/staticrecordset/recordset.go b/pkg/executor/staticrecordset/recordset.go index a3e6fa63ee0d5..5ced67e5f7ee2 100644 --- a/pkg/executor/staticrecordset/recordset.go +++ b/pkg/executor/staticrecordset/recordset.go @@ -21,8 +21,10 @@ import ( "github.com/pingcap/tidb/pkg/planner/core/resolve" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/chunk" + "github.com/pingcap/tidb/pkg/util/execdetails" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/sqlexec" + clientutil "github.com/tikv/client-go/v2/util" "go.uber.org/zap" ) @@ -32,15 +34,21 @@ type staticRecordSet struct { fields []*resolve.ResultField executor exec.Executor - sqlText string + sqlText string + sourceCtx context.Context } // New creates a new staticRecordSet -func New(fields []*resolve.ResultField, executor exec.Executor, sqlText string) sqlexec.RecordSet { +func New(fields []*resolve.ResultField, executor exec.Executor, sqlText string, sourceCtx ...context.Context) sqlexec.RecordSet { + var inherited context.Context + if len(sourceCtx) > 0 { + inherited = sourceCtx[0] + } return &staticRecordSet{ - fields: fields, - executor: executor, - sqlText: sqlText, + fields: fields, + executor: executor, + sqlText: sqlText, + sourceCtx: inherited, } } @@ -49,6 +57,14 @@ func (s *staticRecordSet) Fields() []*resolve.ResultField { } func (s *staticRecordSet) Next(ctx context.Context, req *chunk.Chunk) (err error) { + if s.sourceCtx != nil { + if ruDetails, _ := s.sourceCtx.Value(clientutil.RUDetailsCtxKey).(*clientutil.RUDetails); ruDetails != nil { + ctx = context.WithValue(ctx, clientutil.RUDetailsCtxKey, ruDetails) + } + if metrics := execdetails.RUV2MetricsFromContext(s.sourceCtx); metrics != nil { + ctx = context.WithValue(ctx, execdetails.RUV2MetricsCtxKey, metrics) + } + } defer func() { r := recover() if r == nil { diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 1e90c56a99d76..9335ac7fceb62 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -214,6 +214,7 @@ go_test( "@com_github_docker_go_units//:go-units", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_kvproto//pkg/resource_manager", "@com_github_prometheus_client_golang//prometheus/testutil", diff --git a/pkg/server/conn_stmt_test.go b/pkg/server/conn_stmt_test.go index 8bc927d6cb708..59a808c52b783 100644 --- a/pkg/server/conn_stmt_test.go +++ b/pkg/server/conn_stmt_test.go @@ -22,6 +22,7 @@ import ( "fmt" "testing" + "github.com/pingcap/kvproto/pkg/kvrpcpb" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/meta/model" @@ -205,6 +206,7 @@ func TestCursorWithParams(t *testing.T) { ruv2Metrics.AddPlanCnt(2) ruDetails := goCtx.Value(clientutil.RUDetailsCtxKey).(*clientutil.RUDetails) ruDetails.AddTiKVRUV2(11) + ruDetails.AddRUV2(&kvrpcpb.RUV2{ReadRpcCount: 3}) weights := execdetails.RUV2Weights{ RUScale: cfg.RUV2.RUScale, ResultChunkCells: cfg.RUV2.ResultChunkCells, @@ -219,9 +221,9 @@ func TestCursorWithParams(t *testing.T) { SessionParserTotal: cfg.RUV2.SessionParserTotal, TxnCnt: cfg.RUV2.TxnCnt, } - baselineTiDBRU := ruv2Metrics.CalculateRUValues(weights) - tracker := resultset.NewCursorRUV2Tracker(reporter, "rg1", ruv2Metrics, ruDetails, weights) + require.Equal(t, int64(3), ruv2Metrics.ResourceManagerReadCnt()) + baselineTiDBRU := ruv2Metrics.CalculateRUValues(weights) resultsetRS := resultset.New(&mockCursorTrackerRecordSet{}, nil) resultset.AttachCursorRUV2Tracker(resultsetRS, tracker) resultset.ReportCursorRUV2Delta(resultsetRS, 6) @@ -232,6 +234,12 @@ func TestCursorWithParams(t *testing.T) { require.Equal(t, 0.0, reporter.tikvRUV2) require.Equal(t, 0.0, reporter.tiflashRU) + postBaselineTiDBRU := ruv2Metrics.CalculateRUValues(weights) + ruDetails.AddRUV2(&kvrpcpb.RUV2{WriteRpcCount: 4}) + resultset.ReportCursorRUV2Delta(resultsetRS, 0) + expectedPendingRawDelta := ruv2Metrics.CalculateRUValues(weights) - postBaselineTiDBRU + require.Equal(t, expectedCursorDelta+expectedPendingRawDelta, reporter.tidbRUV2) + ruDetails.AddTiKVRUV2(7) ruDetails.UpdateTiFlash(&rmpb.Consumption{RRU: 5, WRU: 8}) resultset.ReportCursorRUV2Delta(resultsetRS, 0) diff --git a/pkg/server/internal/resultset/resultset.go b/pkg/server/internal/resultset/resultset.go index 9276aae2245d6..031aa8dae7aa1 100644 --- a/pkg/server/internal/resultset/resultset.go +++ b/pkg/server/internal/resultset/resultset.go @@ -100,6 +100,7 @@ func NewCursorRUV2Tracker( ruDetails: ruDetails, weights: weights, } + execdetails.SyncRUV2MetricsFromRUDetails(tracker.metrics, tracker.ruDetails) if metrics != nil { tracker.reportedTiDBRU = metrics.CalculateRUValues(weights) } @@ -126,6 +127,7 @@ func (t *CursorRUV2Tracker) reportDelta() { var currentTiDBRU float64 if t.metrics != nil { + execdetails.SyncRUV2MetricsFromRUDetails(t.metrics, t.ruDetails) currentTiDBRU = t.metrics.CalculateRUValues(t.weights) } currentTiKVRUV2 := t.reportedTiKVRUV2 diff --git a/pkg/session/BUILD.bazel b/pkg/session/BUILD.bazel index 8ad1c907d87cb..8730e6e9ea553 100644 --- a/pkg/session/BUILD.bazel +++ b/pkg/session/BUILD.bazel @@ -95,7 +95,6 @@ go_library( "//pkg/statistics/handle/usage/indexusage", "//pkg/store", "//pkg/store/driver/error", - "//pkg/store/driver/txn", "//pkg/store/helper", "//pkg/store/mockstore", "//pkg/table", diff --git a/pkg/session/session.go b/pkg/session/session.go index dcfd740395092..a95f8d4b9502c 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -109,7 +109,6 @@ import ( "github.com/pingcap/tidb/pkg/statistics/handle/usage/indexusage" kvstore "github.com/pingcap/tidb/pkg/store" storeerr "github.com/pingcap/tidb/pkg/store/driver/error" - drivertxn "github.com/pingcap/tidb/pkg/store/driver/txn" "github.com/pingcap/tidb/pkg/store/helper" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/table/tblctx" @@ -2425,6 +2424,7 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex func (s *session) executeStmtImpl(ctx context.Context, stmtNode ast.StmtNode) (sqlexec.RecordSet, error) { r, ctx := tracing.StartRegionEx(ctx, "session.ExecuteStmt") defer r.End() + ctx = execdetails.ContextWithMissingExecDetailsInitialized(ctx) if err := s.PrepareTxnCtx(ctx, stmtNode); err != nil { return nil, err @@ -2446,10 +2446,6 @@ func (s *session) executeStmtImpl(ctx context.Context, stmtNode ast.StmtNode) (s return nil, err } ruv2Metrics := execdetails.RUV2MetricsFromContext(ctx) - if ruv2Metrics == nil { - ruv2Metrics = execdetails.NewRUV2Metrics() - ctx = context.WithValue(ctx, execdetails.RUV2MetricsCtxKey, ruv2Metrics) - } sessVars.RUV2Metrics = ruv2Metrics bypass := shouldBypass(ctx, stmtNode, sessVars) if ruv2Metrics != nil { @@ -3351,7 +3347,6 @@ func (s *session) GetDistSQLCtx() *distsqlctx.DistSQLContext { KVVars: vars.KVVars, KvExecCounter: sc.KvExecCounter, RUV2Metrics: vars.RUV2Metrics, - RUV2RPCInterceptor: drivertxn.NewStatementRUV2RPCInterceptor(vars.RUV2Metrics), SessionMemTracker: vars.MemTracker, Location: sc.TimeZone(), @@ -3407,7 +3402,6 @@ func (s *session) GetDistSQLCtx() *distsqlctx.DistSQLContext { } if dctx.RUV2Metrics != vars.RUV2Metrics { dctx.RUV2Metrics = vars.RUV2Metrics - dctx.RUV2RPCInterceptor = drivertxn.NewStatementRUV2RPCInterceptor(vars.RUV2Metrics) } return dctx diff --git a/pkg/session/tidb_test.go b/pkg/session/tidb_test.go index 413c99107dd14..b72430bfed24d 100644 --- a/pkg/session/tidb_test.go +++ b/pkg/session/tidb_test.go @@ -92,7 +92,6 @@ func TestRUV2SessionParserTotalDoesNotLeakAcrossStandaloneParse(t *testing.T) { dctx := se.GetDistSQLCtx() require.Same(t, se.sessionVars.RUV2Metrics, dctx.RUV2Metrics) - require.NotNil(t, dctx.RUV2RPCInterceptor) }) t.Run("internal others bypass skips parser ru accounting", func(t *testing.T) { @@ -198,9 +197,8 @@ func TestRUV2MetricsIsolatedPerStatementInExplicitTxn(t *testing.T) { require.NoError(t, err) metrics2 := se.sessionVars.RUV2Metrics - // Each statement must get a fresh RUV2Metrics object so that the - // interceptor bound during execution targets the current statement, - // not a previous one. + // Each statement must get a fresh RUV2Metrics object so that RUv2 accounting + // stays isolated per statement, not reused from a previous one. require.NotNil(t, metrics2) require.NotSame(t, metricsBegin, metrics1, "stmt1 should have different metrics from BEGIN") require.NotSame(t, metrics1, metrics2, "stmt2 should have different metrics from stmt1") diff --git a/pkg/sessionctx/variable/slow_log.go b/pkg/sessionctx/variable/slow_log.go index 2d824348c7aae..7db2b4b2340b9 100644 --- a/pkg/sessionctx/variable/slow_log.go +++ b/pkg/sessionctx/variable/slow_log.go @@ -332,17 +332,18 @@ func kvExecDetailFormat(buf *bytes.Buffer, kvExecDetail *util.ExecDetails) { writeSlowLogItem(buf, SlowLogUnpackedBytesReceivedTiFlashCrossZone, zeroStr) return } - writeSlowLogItem(buf, SlowLogKVTotal, strconv.FormatFloat(time.Duration(kvExecDetail.WaitKVRespDuration).Seconds(), 'f', -1, 64)) - writeSlowLogItem(buf, SlowLogPDTotal, strconv.FormatFloat(time.Duration(kvExecDetail.WaitPDRespDuration).Seconds(), 'f', -1, 64)) - writeSlowLogItem(buf, SlowLogBackoffTotal, strconv.FormatFloat(time.Duration(kvExecDetail.BackoffDuration).Seconds(), 'f', -1, 64)) - writeSlowLogItem(buf, SlowLogUnpackedBytesSentTiKVTotal, strconv.FormatInt(kvExecDetail.UnpackedBytesSentKVTotal, 10)) - writeSlowLogItem(buf, SlowLogUnpackedBytesReceivedTiKVTotal, strconv.FormatInt(kvExecDetail.UnpackedBytesReceivedKVTotal, 10)) - writeSlowLogItem(buf, SlowLogUnpackedBytesSentTiKVCrossZone, strconv.FormatInt(kvExecDetail.UnpackedBytesSentKVCrossZone, 10)) - writeSlowLogItem(buf, SlowLogUnpackedBytesReceivedTiKVCrossZone, strconv.FormatInt(kvExecDetail.UnpackedBytesReceivedKVCrossZone, 10)) - writeSlowLogItem(buf, SlowLogUnpackedBytesSentTiFlashTotal, strconv.FormatInt(kvExecDetail.UnpackedBytesSentMPPTotal, 10)) - writeSlowLogItem(buf, SlowLogUnpackedBytesReceivedTiFlashTotal, strconv.FormatInt(kvExecDetail.UnpackedBytesReceivedMPPTotal, 10)) - writeSlowLogItem(buf, SlowLogUnpackedBytesSentTiFlashCrossZone, strconv.FormatInt(kvExecDetail.UnpackedBytesSentMPPCrossZone, 10)) - writeSlowLogItem(buf, SlowLogUnpackedBytesReceivedTiFlashCrossZone, strconv.FormatInt(kvExecDetail.UnpackedBytesReceivedMPPCrossZone, 10)) + snapshot := execdetails.LoadTiKVExecDetails(kvExecDetail) + writeSlowLogItem(buf, SlowLogKVTotal, strconv.FormatFloat(time.Duration(snapshot.WaitKVRespDuration).Seconds(), 'f', -1, 64)) + writeSlowLogItem(buf, SlowLogPDTotal, strconv.FormatFloat(time.Duration(snapshot.WaitPDRespDuration).Seconds(), 'f', -1, 64)) + writeSlowLogItem(buf, SlowLogBackoffTotal, strconv.FormatFloat(time.Duration(snapshot.BackoffDuration).Seconds(), 'f', -1, 64)) + writeSlowLogItem(buf, SlowLogUnpackedBytesSentTiKVTotal, strconv.FormatInt(snapshot.UnpackedBytesSentKVTotal, 10)) + writeSlowLogItem(buf, SlowLogUnpackedBytesReceivedTiKVTotal, strconv.FormatInt(snapshot.UnpackedBytesReceivedKVTotal, 10)) + writeSlowLogItem(buf, SlowLogUnpackedBytesSentTiKVCrossZone, strconv.FormatInt(snapshot.UnpackedBytesSentKVCrossZone, 10)) + writeSlowLogItem(buf, SlowLogUnpackedBytesReceivedTiKVCrossZone, strconv.FormatInt(snapshot.UnpackedBytesReceivedKVCrossZone, 10)) + writeSlowLogItem(buf, SlowLogUnpackedBytesSentTiFlashTotal, strconv.FormatInt(snapshot.UnpackedBytesSentMPPTotal, 10)) + writeSlowLogItem(buf, SlowLogUnpackedBytesReceivedTiFlashTotal, strconv.FormatInt(snapshot.UnpackedBytesReceivedMPPTotal, 10)) + writeSlowLogItem(buf, SlowLogUnpackedBytesSentTiFlashCrossZone, strconv.FormatInt(snapshot.UnpackedBytesSentMPPCrossZone, 10)) + writeSlowLogItem(buf, SlowLogUnpackedBytesReceivedTiFlashCrossZone, strconv.FormatInt(snapshot.UnpackedBytesReceivedMPPCrossZone, 10)) } // SlowLogFormat uses for formatting slow log. @@ -645,7 +646,8 @@ func makeKVExecDetailAccessor(parse func(string) (any, error), if items.KVExecDetail == nil { tikvExecDetailRaw := ctx.Value(util.ExecDetailsKey) if tikvExecDetailRaw != nil { - items.KVExecDetail = tikvExecDetailRaw.(*util.ExecDetails) + snapshot := execdetails.LoadTiKVExecDetails(tikvExecDetailRaw.(*util.ExecDetails)) + items.KVExecDetail = &snapshot } } }, diff --git a/pkg/sessionctx/variable/tests/slowlog/slow_log_test.go b/pkg/sessionctx/variable/tests/slowlog/slow_log_test.go index 63f7fc0bfeb05..79afe57e8ef24 100644 --- a/pkg/sessionctx/variable/tests/slowlog/slow_log_test.go +++ b/pkg/sessionctx/variable/tests/slowlog/slow_log_test.go @@ -155,6 +155,21 @@ func TestMatchSpecialTypeConditions(t *testing.T) { accessor.Setter(childCtx, nil, items) checkRet(true, slowlogrule.SlowLogCondition{Field: variable.SlowLogKVTotal, Threshold: 0.00000001}) checkRet(true, slowlogrule.SlowLogCondition{Field: variable.SlowLogKVTotal, Threshold: 0.0}) + + t.Run("setter snapshots kv exec details", func(t *testing.T) { + items := &variable.SlowQueryLogItems{} + tikvExecDetail := util.ExecDetails{ + WaitKVRespDuration: (10 * time.Second).Nanoseconds(), + } + accessor := variable.SlowLogRuleFieldAccessors[strings.ToLower(variable.SlowLogKVTotal)] + accessor.Setter(context.WithValue(context.Background(), util.ExecDetailsKey, &tikvExecDetail), nil, items) + tikvExecDetail.WaitKVRespDuration = (20 * time.Second).Nanoseconds() + + require.NotNil(t, items.KVExecDetail) + require.Equal(t, (10 * time.Second).Nanoseconds(), items.KVExecDetail.WaitKVRespDuration) + require.True(t, accessor.Match(nil, items, 9.0)) + require.False(t, accessor.Match(nil, items, 11.0)) + }) }) t.Run("execdetails.ExecDetails type", func(t *testing.T) { diff --git a/pkg/sessiontxn/isolation/BUILD.bazel b/pkg/sessiontxn/isolation/BUILD.bazel index abd9eff2f60fb..eae35af4a6131 100644 --- a/pkg/sessiontxn/isolation/BUILD.bazel +++ b/pkg/sessiontxn/isolation/BUILD.bazel @@ -42,7 +42,6 @@ go_library( "@com_github_tikv_client_go_v2//error", "@com_github_tikv_client_go_v2//kv", "@com_github_tikv_client_go_v2//oracle", - "@com_github_tikv_client_go_v2//tikvrpc/interceptor", "@com_github_tikv_client_go_v2//txnkv/transaction", "@org_uber_go_zap//:zap", ], @@ -52,17 +51,16 @@ go_test( name = "isolation_test", timeout = "short", srcs = [ - "base_test.go", "main_test.go", "optimistic_test.go", "readcommitted_test.go", "repeatable_read_test.go", "serializable_test.go", ], - embed = [":isolation"], flaky = True, - shard_count = 29, + shard_count = 28, deps = [ + ":isolation", "//pkg/config", "//pkg/config/kerneltype", "//pkg/executor", @@ -75,12 +73,10 @@ go_test( "//pkg/planner/core/resolve", "//pkg/session", "//pkg/sessionctx", - "//pkg/sessionctx/variable", "//pkg/sessiontxn", "//pkg/testkit", "//pkg/testkit/testfork", "//pkg/testkit/testsetup", - "//pkg/util/execdetails", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/kvrpcpb", @@ -88,7 +84,6 @@ go_test( "@com_github_tikv_client_go_v2//error", "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_client_go_v2//tikv", - "@com_github_tikv_client_go_v2//tikvrpc", "@org_uber_go_goleak//:goleak", ], ) diff --git a/pkg/sessiontxn/isolation/base.go b/pkg/sessiontxn/isolation/base.go index 131aaee754f6f..3251b42b0848b 100644 --- a/pkg/sessiontxn/isolation/base.go +++ b/pkg/sessiontxn/isolation/base.go @@ -42,7 +42,6 @@ import ( "github.com/pingcap/tidb/pkg/util/tracing" tikvstore "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" - "github.com/tikv/client-go/v2/tikvrpc/interceptor" "github.com/tikv/client-go/v2/txnkv/transaction" "go.uber.org/zap" ) @@ -78,13 +77,6 @@ type baseTxnContextProvider struct { constStartTS uint64 } -func currentStatementRUV2RPCInterceptor(sessVars *variable.SessionVars) interceptor.RPCInterceptor { - if sessVars == nil { - return nil - } - return drivertxn.NewStatementRUV2RPCInterceptor(sessVars.RUV2Metrics) -} - // OnInitialize is the hook that should be called when enter a new txn with this provider func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn.EnterNewTxnType) (err error) { if p.getStmtReadTSFunc == nil || p.getStmtForUpdateTSFunc == nil { @@ -463,14 +455,10 @@ func (p *baseTxnContextProvider) getSnapshotByTS(snapshotTS uint64) (kv.Snapshot } sessVars := p.sctx.GetSessionVars() - ruv2Interceptor := currentStatementRUV2RPCInterceptor(sessVars) txnCtx := sessVars.TxnCtx var snapshot kv.Snapshot if kvTxn.Valid() && txnCtx.StartTS == txnCtx.GetForUpdateTS() && txnCtx.StartTS == snapshotTS { - if ruv2Interceptor != nil { - kvTxn.SetOption(kv.RPCInterceptor, ruv2Interceptor) - } snapshot = kvTxn.GetSnapshot() } else { snapshot = internal.GetSnapshotWithTS( @@ -478,9 +466,6 @@ func (p *baseTxnContextProvider) getSnapshotByTS(snapshotTS uint64) (kv.Snapshot snapshotTS, temptable.SessionSnapshotInterceptor(p.sctx, p.infoSchema), ) - if ruv2Interceptor != nil { - snapshot.SetOption(kv.RPCInterceptor, ruv2Interceptor) - } } snapshot.SetOption(kv.ReplicaRead, p.sctx.GetSessionVars().GetReplicaRead()) @@ -533,9 +518,6 @@ func (p *baseTxnContextProvider) SetOptionsOnTxnActive(txn kv.Transaction) { // Bind an interceptor for client-go to count the number of SQL executions of each TiKV. txn.SetOption(kv.RPCInterceptor, sessVars.StmtCtx.KvExecCounter.RPCInterceptor()) } - if interceptor := currentStatementRUV2RPCInterceptor(sessVars); interceptor != nil { - txn.SetOption(kv.RPCInterceptor, interceptor) - } txn.SetOption(kv.ResourceGroupTagger, sessVars.StmtCtx.GetResourceGroupTagger()) txn.SetOption(kv.ExplicitRequestSourceType, sessVars.ExplicitRequestSourceType) @@ -635,9 +617,6 @@ func (p *baseTxnContextProvider) SetOptionsBeforeCommit( // Bind an interceptor for client-go to count the number of SQL executions of each TiKV. txn.SetOption(kv.RPCInterceptor, sessVars.StmtCtx.KvExecCounter.RPCInterceptor()) } - if interceptor := currentStatementRUV2RPCInterceptor(sessVars); interceptor != nil { - txn.SetOption(kv.RPCInterceptor, interceptor) - } if tables := sessVars.TxnCtx.TemporaryTables; len(tables) > 0 { txn.SetOption(kv.KVFilter, temporaryTableKVFilter(tables)) diff --git a/pkg/sessiontxn/isolation/base_test.go b/pkg/sessiontxn/isolation/base_test.go deleted file mode 100644 index c8f22a6e79703..0000000000000 --- a/pkg/sessiontxn/isolation/base_test.go +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright 2026 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package isolation - -import ( - "testing" - - "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/tidb/pkg/sessionctx/variable" - "github.com/pingcap/tidb/pkg/util/execdetails" - "github.com/stretchr/testify/require" - "github.com/tikv/client-go/v2/tikvrpc" -) - -func TestCurrentStatementRUV2RPCInterceptorCapturesMetrics(t *testing.T) { - sessVars := variable.NewSessionVars(nil) - metrics1 := execdetails.NewRUV2Metrics() - metrics2 := execdetails.NewRUV2Metrics() - sessVars.RUV2Metrics = metrics1 - - it := currentStatementRUV2RPCInterceptor(sessVars) - require.NotNil(t, it) - - // Simulate the next statement replacing SessionVars.RUV2Metrics while the old - // statement still has async cleanup RPCs in flight. - sessVars.RUV2Metrics = metrics2 - - wrapFn := it.Wrap(func(_ string, req *tikvrpc.Request) (*tikvrpc.Response, error) { - switch req.Type { - case tikvrpc.CmdBatchGet: - return &tikvrpc.Response{ - Resp: &kvrpcpb.BatchGetResponse{ - ExecDetailsV2: &kvrpcpb.ExecDetailsV2{ - RuV2: &kvrpcpb.RUV2{ - StorageProcessedKeysBatchGet: 2, - }, - }, - }, - }, nil - case tikvrpc.CmdPrewrite: - return &tikvrpc.Response{Resp: &kvrpcpb.PrewriteResponse{}}, nil - default: - return &tikvrpc.Response{}, nil - } - }) - - _, err := wrapFn("tikv-1", &tikvrpc.Request{Type: tikvrpc.CmdBatchGet, StoreTp: tikvrpc.TiKV}) - require.NoError(t, err) - _, err = wrapFn("tikv-1", &tikvrpc.Request{Type: tikvrpc.CmdPrewrite, StoreTp: tikvrpc.TiKV}) - require.NoError(t, err) - - require.Equal(t, int64(1), metrics1.ResourceManagerReadCnt()) - require.Equal(t, int64(1), metrics1.ResourceManagerWriteCnt()) - require.Equal(t, int64(2), metrics1.TiKVStorageProcessedKeysBatchGet()) - require.Equal(t, int64(0), metrics2.ResourceManagerReadCnt()) - require.Equal(t, int64(0), metrics2.ResourceManagerWriteCnt()) -} diff --git a/pkg/sessiontxn/staleread/BUILD.bazel b/pkg/sessiontxn/staleread/BUILD.bazel index aac026f56b4a5..211a6c87ef43e 100644 --- a/pkg/sessiontxn/staleread/BUILD.bazel +++ b/pkg/sessiontxn/staleread/BUILD.bazel @@ -27,7 +27,6 @@ go_library( "//pkg/sessionctx/variable", "//pkg/sessiontxn", "//pkg/sessiontxn/internal", - "//pkg/store/driver/txn", "//pkg/table/temptable", "//pkg/types", "//pkg/util/dbterror/plannererrors", diff --git a/pkg/sessiontxn/staleread/provider.go b/pkg/sessiontxn/staleread/provider.go index 24a58698dffc0..319700220d8c2 100644 --- a/pkg/sessiontxn/staleread/provider.go +++ b/pkg/sessiontxn/staleread/provider.go @@ -28,7 +28,6 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/sessiontxn" "github.com/pingcap/tidb/pkg/sessiontxn/internal" - driver "github.com/pingcap/tidb/pkg/store/driver/txn" "github.com/pingcap/tidb/pkg/table/temptable" ) @@ -254,13 +253,9 @@ func (p *StalenessTxnContextProvider) GetSnapshotWithStmtReadTS() (kv.Snapshot, } sessVars := p.sctx.GetSessionVars() - ruv2Interceptor := driver.NewStatementRUV2RPCInterceptor(sessVars.RUV2Metrics) var snapshot kv.Snapshot if txn.Valid() { - if ruv2Interceptor != nil { - txn.SetOption(kv.RPCInterceptor, ruv2Interceptor) - } snapshot = txn.GetSnapshot() } else { snapshot = internal.GetSnapshotWithTS( @@ -268,9 +263,6 @@ func (p *StalenessTxnContextProvider) GetSnapshotWithStmtReadTS() (kv.Snapshot, p.ts, temptable.SessionSnapshotInterceptor(p.sctx, p.is), ) - if ruv2Interceptor != nil { - snapshot.SetOption(kv.RPCInterceptor, ruv2Interceptor) - } } replicaReadType := sessVars.GetReplicaRead() diff --git a/pkg/store/copr/BUILD.bazel b/pkg/store/copr/BUILD.bazel index 735ff9821db62..b9ffd0a3a1ace 100644 --- a/pkg/store/copr/BUILD.bazel +++ b/pkg/store/copr/BUILD.bazel @@ -12,7 +12,6 @@ go_library( "mpp_probe.go", "range_diagnostics.go", "region_cache.go", - "ruv2_metrics.go", "store.go", ], importpath = "github.com/pingcap/tidb/pkg/store/copr", diff --git a/pkg/store/copr/coprocessor.go b/pkg/store/copr/coprocessor.go index c36de0b8c0f3a..b99f12498c31a 100644 --- a/pkg/store/copr/coprocessor.go +++ b/pkg/store/copr/coprocessor.go @@ -2495,9 +2495,6 @@ func (worker *copIteratorWorker) getLockResolverDetails() *util.ResolveLockDetai } func (worker *copIteratorWorker) handleCollectExecutionInfo(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse) error { - if resp != nil && resp.pbResp != nil { - updateRUV2MetricsFromExecDetailsV2(bo.GetCtx(), resp.pbResp.ExecDetailsV2) - } if worker.stats == nil { return nil } diff --git a/pkg/store/copr/ruv2_metrics.go b/pkg/store/copr/ruv2_metrics.go deleted file mode 100644 index 508aff20ca4a6..0000000000000 --- a/pkg/store/copr/ruv2_metrics.go +++ /dev/null @@ -1,90 +0,0 @@ -// Copyright 2026 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package copr - -import ( - "context" - - "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/tidb/pkg/util/execdetails" -) - -func updateRUV2MetricsFromExecDetailsV2(ctx context.Context, details *kvrpcpb.ExecDetailsV2) { - if details == nil || details.RuV2 == nil { - return - } - ru := details.RuV2 - ruv2Metrics := execdetails.RUV2MetricsFromContext(ctx) - // Coprocessor responses only own cop-side ExecDetailsV2 fields here. - // StorageProcessedKeysBatchGet/Get are collected by pkg/store/driver/txn for - // Get/BatchGet responses and are not expected on cop responses. - if ru.KvEngineCacheMiss != 0 { - if ruv2Metrics != nil { - ruv2Metrics.AddTiKVKVEngineCacheMiss(int64(ru.KvEngineCacheMiss)) - } - } - if ru.CoprocessorExecutorIterations != 0 { - if ruv2Metrics != nil { - ruv2Metrics.AddTiKVCoprocessorExecutorIterations(int64(ru.CoprocessorExecutorIterations)) - } - } - if ru.CoprocessorResponseBytes != 0 { - if ruv2Metrics != nil { - ruv2Metrics.AddTiKVCoprocessorResponseBytes(int64(ru.CoprocessorResponseBytes)) - } - } - if ru.RaftstoreStoreWriteTriggerWbBytes != 0 { - if ruv2Metrics != nil { - ruv2Metrics.AddTiKVRaftstoreStoreWriteTriggerWB(int64(ru.RaftstoreStoreWriteTriggerWbBytes)) - } - } - if inputs := ru.ExecutorInputs; inputs != nil { - if inputs.TikvCoprocessorExecutorWorkTotalBatchIndexScan != 0 { - if ruv2Metrics != nil { - ruv2Metrics.AddTiKVCoprocessorWorkTotal("BatchIndexScan", int64(inputs.TikvCoprocessorExecutorWorkTotalBatchIndexScan)) - } - } - if inputs.TikvCoprocessorExecutorWorkTotalBatchTableScan != 0 { - if ruv2Metrics != nil { - ruv2Metrics.AddTiKVCoprocessorWorkTotal("BatchTableScan", int64(inputs.TikvCoprocessorExecutorWorkTotalBatchTableScan)) - } - } - if inputs.TikvCoprocessorExecutorWorkTotalBatchSelection != 0 { - if ruv2Metrics != nil { - ruv2Metrics.AddTiKVCoprocessorWorkTotal("BatchSelection", int64(inputs.TikvCoprocessorExecutorWorkTotalBatchSelection)) - } - } - if inputs.TikvCoprocessorExecutorWorkTotalBatchTopN != 0 { - if ruv2Metrics != nil { - ruv2Metrics.AddTiKVCoprocessorWorkTotal("BatchTopN", int64(inputs.TikvCoprocessorExecutorWorkTotalBatchTopN)) - } - } - if inputs.TikvCoprocessorExecutorWorkTotalBatchLimit != 0 { - if ruv2Metrics != nil { - ruv2Metrics.AddTiKVCoprocessorWorkTotal("BatchLimit", int64(inputs.TikvCoprocessorExecutorWorkTotalBatchLimit)) - } - } - if inputs.TikvCoprocessorExecutorWorkTotalBatchSimpleAggr != 0 { - if ruv2Metrics != nil { - ruv2Metrics.AddTiKVCoprocessorWorkTotal("BatchSimpleAggr", int64(inputs.TikvCoprocessorExecutorWorkTotalBatchSimpleAggr)) - } - } - if inputs.TikvCoprocessorExecutorWorkTotalBatchFastHashAggr != 0 { - if ruv2Metrics != nil { - ruv2Metrics.AddTiKVCoprocessorWorkTotal("BatchFastHashAggr", int64(inputs.TikvCoprocessorExecutorWorkTotalBatchFastHashAggr)) - } - } - } -} diff --git a/pkg/store/driver/txn/BUILD.bazel b/pkg/store/driver/txn/BUILD.bazel index 82e390882b702..67c68366026c6 100644 --- a/pkg/store/driver/txn/BUILD.bazel +++ b/pkg/store/driver/txn/BUILD.bazel @@ -5,7 +5,6 @@ go_library( srcs = [ "batch_getter.go", "error.go", - "ruv2_metrics.go", "scanner.go", "snapshot.go", "txn_driver.go", @@ -25,7 +24,6 @@ go_library( "//pkg/tablecodec", "//pkg/types", "//pkg/util", - "//pkg/util/execdetails", "//pkg/util/intest", "//pkg/util/logutil", "//pkg/util/tracing", @@ -53,22 +51,19 @@ go_test( "batch_getter_test.go", "driver_test.go", "main_test.go", - "ruv2_metrics_test.go", "union_iter_test.go", ], embed = [":txn"], flaky = True, - shard_count = 8, + shard_count = 5, deps = [ "//pkg/kv", "//pkg/testkit/testsetup", - "//pkg/util/execdetails", "//pkg/util/mock", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", - "@com_github_tikv_client_go_v2//tikvrpc", "@org_uber_go_goleak//:goleak", ], ) diff --git a/pkg/store/driver/txn/ruv2_metrics.go b/pkg/store/driver/txn/ruv2_metrics.go deleted file mode 100644 index d4d8be25b9114..0000000000000 --- a/pkg/store/driver/txn/ruv2_metrics.go +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright 2026 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package txn - -import ( - "github.com/pingcap/tidb/pkg/util/execdetails" - "github.com/tikv/client-go/v2/tikvrpc" - "github.com/tikv/client-go/v2/tikvrpc/interceptor" -) - -const statementRUV2MetricsInterceptorName = "ruv2-statement-metrics" - -// NewStatementRUV2RPCInterceptor creates an interceptor that collects -// statement-level RUv2 request counters and response-side metrics from -// ExecDetailsV2.RuV2. -func NewStatementRUV2RPCInterceptor(ruv2Metrics *execdetails.RUV2Metrics) interceptor.RPCInterceptor { - if ruv2Metrics == nil { - return nil - } - return newStatementRUV2RPCInterceptorWithGetter(func() *execdetails.RUV2Metrics { - return ruv2Metrics - }) -} - -// newStatementRUV2RPCInterceptorWithGetter creates an interceptor that collects -// statement-level RUv2 request counters and response-side metrics from -// ExecDetailsV2.RuV2 using the metrics returned by getter at request time. -func newStatementRUV2RPCInterceptorWithGetter(getter func() *execdetails.RUV2Metrics) interceptor.RPCInterceptor { - if getter == nil { - return nil - } - return interceptor.NewRPCInterceptor(statementRUV2MetricsInterceptorName, func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc { - return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) { - resp, err := next(target, req) - if err == nil && resp != nil { - ruv2Metrics := getter() - updateResourceManagerRUV2Metrics(ruv2Metrics, req) - updateStorageProcessedKeysRUV2Metrics(ruv2Metrics, resp) - } - return resp, err - } - }) -} - -func updateResourceManagerRUV2Metrics(ruv2Metrics *execdetails.RUV2Metrics, req *tikvrpc.Request) { - if ruv2Metrics == nil || req == nil || req.StoreTp != tikvrpc.TiKV || req.IsDebugReq() { - return - } - if req.IsTxnWriteRequest() || req.IsRawWriteRequest() { - ruv2Metrics.AddResourceManagerWriteCnt(1) - return - } - ruv2Metrics.AddResourceManagerReadCnt(1) -} - -func updateStorageProcessedKeysRUV2Metrics(ruv2Metrics *execdetails.RUV2Metrics, resp *tikvrpc.Response) { - if ruv2Metrics == nil || resp == nil { - return - } - details := resp.GetExecDetailsV2() - if details == nil || details.RuV2 == nil { - return - } - if details.RuV2.StorageProcessedKeysBatchGet != 0 { - ruv2Metrics.AddTiKVStorageProcessedKeysBatchGet(int64(details.RuV2.StorageProcessedKeysBatchGet)) - } - if details.RuV2.StorageProcessedKeysGet != 0 { - ruv2Metrics.AddTiKVStorageProcessedKeysGet(int64(details.RuV2.StorageProcessedKeysGet)) - } -} diff --git a/pkg/store/driver/txn/ruv2_metrics_test.go b/pkg/store/driver/txn/ruv2_metrics_test.go deleted file mode 100644 index d36d1c833a46f..0000000000000 --- a/pkg/store/driver/txn/ruv2_metrics_test.go +++ /dev/null @@ -1,129 +0,0 @@ -// Copyright 2026 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package txn - -import ( - "testing" - - "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/tidb/pkg/util/execdetails" - "github.com/stretchr/testify/require" - "github.com/tikv/client-go/v2/tikvrpc" -) - -func TestStatementRUV2RPCInterceptor(t *testing.T) { - ruv2Metrics := execdetails.NewRUV2Metrics() - it := NewStatementRUV2RPCInterceptor(ruv2Metrics) - require.NotNil(t, it) - - readReq := &tikvrpc.Request{Type: tikvrpc.CmdBatchGet, StoreTp: tikvrpc.TiKV} - writeReq := &tikvrpc.Request{Type: tikvrpc.CmdPrewrite, StoreTp: tikvrpc.TiKV} - - wrapFn := it.Wrap(func(_ string, req *tikvrpc.Request) (*tikvrpc.Response, error) { - switch req.Type { - case tikvrpc.CmdBatchGet: - return &tikvrpc.Response{ - Resp: &kvrpcpb.BatchGetResponse{ - ExecDetailsV2: &kvrpcpb.ExecDetailsV2{ - RuV2: &kvrpcpb.RUV2{ - StorageProcessedKeysBatchGet: 9, - StorageProcessedKeysGet: 1, - }, - }, - }, - }, nil - case tikvrpc.CmdPrewrite: - return &tikvrpc.Response{Resp: &kvrpcpb.PrewriteResponse{}}, nil - default: - return &tikvrpc.Response{}, nil - } - }) - _, err := wrapFn("tikv-1", readReq) - require.NoError(t, err) - _, err = wrapFn("tikv-1", writeReq) - require.NoError(t, err) - - require.Equal(t, int64(1), ruv2Metrics.ResourceManagerReadCnt()) - require.Equal(t, int64(1), ruv2Metrics.ResourceManagerWriteCnt()) - require.Equal(t, int64(9), ruv2Metrics.TiKVStorageProcessedKeysBatchGet()) - require.Equal(t, int64(1), ruv2Metrics.TiKVStorageProcessedKeysGet()) - - t.Run("bypass ru skips interceptor accounting", func(t *testing.T) { - bypassed := execdetails.NewRUV2Metrics() - bypassed.SetBypass(true) - it := NewStatementRUV2RPCInterceptor(bypassed) - require.NotNil(t, it) - - wrapFn := it.Wrap(func(_ string, req *tikvrpc.Request) (*tikvrpc.Response, error) { - return &tikvrpc.Response{ - Resp: &kvrpcpb.BatchGetResponse{ - ExecDetailsV2: &kvrpcpb.ExecDetailsV2{ - RuV2: &kvrpcpb.RUV2{StorageProcessedKeysBatchGet: 4}, - }, - }, - }, nil - }) - _, err := wrapFn("tikv-1", &tikvrpc.Request{Type: tikvrpc.CmdBatchGet, StoreTp: tikvrpc.TiKV}) - require.NoError(t, err) - require.Zero(t, bypassed.ResourceManagerReadCnt()) - require.Zero(t, bypassed.TiKVStorageProcessedKeysBatchGet()) - }) -} - -func TestStatementRUV2RPCInterceptorNilMetrics(t *testing.T) { - require.Nil(t, NewStatementRUV2RPCInterceptor(nil)) -} - -func TestStatementRUV2RPCInterceptorWithGetterFollowsCurrentStatement(t *testing.T) { - metrics1 := execdetails.NewRUV2Metrics() - metrics2 := execdetails.NewRUV2Metrics() - current := metrics1 - - it := newStatementRUV2RPCInterceptorWithGetter(func() *execdetails.RUV2Metrics { - return current - }) - require.NotNil(t, it) - - wrapFn := it.Wrap(func(_ string, req *tikvrpc.Request) (*tikvrpc.Response, error) { - switch req.Type { - case tikvrpc.CmdBatchGet: - return &tikvrpc.Response{ - Resp: &kvrpcpb.BatchGetResponse{ - ExecDetailsV2: &kvrpcpb.ExecDetailsV2{ - RuV2: &kvrpcpb.RUV2{ - StorageProcessedKeysBatchGet: 2, - }, - }, - }, - }, nil - case tikvrpc.CmdPrewrite: - return &tikvrpc.Response{Resp: &kvrpcpb.PrewriteResponse{}}, nil - default: - return &tikvrpc.Response{}, nil - } - }) - - _, err := wrapFn("tikv-1", &tikvrpc.Request{Type: tikvrpc.CmdBatchGet, StoreTp: tikvrpc.TiKV}) - require.NoError(t, err) - - current = metrics2 - _, err = wrapFn("tikv-1", &tikvrpc.Request{Type: tikvrpc.CmdPrewrite, StoreTp: tikvrpc.TiKV}) - require.NoError(t, err) - - require.Equal(t, int64(1), metrics1.ResourceManagerReadCnt()) - require.Equal(t, int64(0), metrics1.ResourceManagerWriteCnt()) - require.Equal(t, int64(1), metrics2.ResourceManagerWriteCnt()) - require.Equal(t, int64(0), metrics2.ResourceManagerReadCnt()) -} diff --git a/pkg/util/execdetails/BUILD.bazel b/pkg/util/execdetails/BUILD.bazel index d61540606d219..817101c4a953b 100644 --- a/pkg/util/execdetails/BUILD.bazel +++ b/pkg/util/execdetails/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/kv", "//pkg/metrics", "@com_github_influxdata_tdigest//:tdigest", + "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_pingcap_kvproto//pkg/resource_manager", "@com_github_pingcap_tipb//go-tipb", "@com_github_tikv_client_go_v2//util", @@ -37,6 +38,7 @@ go_test( "//pkg/config", "//pkg/kv", "//pkg/testkit/testsetup", + "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_pingcap_kvproto//pkg/resource_manager", "@com_github_pingcap_tipb//go-tipb", "@com_github_stretchr_testify//require", diff --git a/pkg/util/execdetails/execdetails_test.go b/pkg/util/execdetails/execdetails_test.go index cfe92ca597dee..35bcb024fa53f 100644 --- a/pkg/util/execdetails/execdetails_test.go +++ b/pkg/util/execdetails/execdetails_test.go @@ -18,9 +18,11 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "testing" "time" + "github.com/pingcap/kvproto/pkg/kvrpcpb" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/kv" @@ -253,6 +255,36 @@ func TestString(t *testing.T) { require.Equal(t, expected, detail.String()) detail = &ExecDetails{} require.Equal(t, "", detail.String()) + + t.Run("load tikv exec details snapshot", func(t *testing.T) { + tikvExecDetail := &util.ExecDetails{} + atomic.StoreInt64(&tikvExecDetail.BackoffCount, 2) + atomic.StoreInt64(&tikvExecDetail.BackoffDuration, int64(3*time.Second)) + atomic.StoreInt64(&tikvExecDetail.WaitKVRespDuration, int64(4*time.Second)) + atomic.StoreInt64(&tikvExecDetail.WaitPDRespDuration, int64(5*time.Second)) + atomic.StoreInt64(&tikvExecDetail.UnpackedBytesSentKVTotal, 11) + atomic.StoreInt64(&tikvExecDetail.UnpackedBytesReceivedKVTotal, 12) + atomic.StoreInt64(&tikvExecDetail.UnpackedBytesSentKVCrossZone, 13) + atomic.StoreInt64(&tikvExecDetail.UnpackedBytesReceivedKVCrossZone, 14) + atomic.StoreInt64(&tikvExecDetail.UnpackedBytesSentMPPTotal, 15) + atomic.StoreInt64(&tikvExecDetail.UnpackedBytesReceivedMPPTotal, 16) + atomic.StoreInt64(&tikvExecDetail.UnpackedBytesSentMPPCrossZone, 17) + atomic.StoreInt64(&tikvExecDetail.UnpackedBytesReceivedMPPCrossZone, 18) + + snapshot := LoadTiKVExecDetails(tikvExecDetail) + require.Equal(t, int64(2), snapshot.BackoffCount) + require.Equal(t, int64(3*time.Second), snapshot.BackoffDuration) + require.Equal(t, int64(4*time.Second), snapshot.WaitKVRespDuration) + require.Equal(t, int64(5*time.Second), snapshot.WaitPDRespDuration) + require.Equal(t, int64(11), snapshot.UnpackedBytesSentKVTotal) + require.Equal(t, int64(12), snapshot.UnpackedBytesReceivedKVTotal) + require.Equal(t, int64(13), snapshot.UnpackedBytesSentKVCrossZone) + require.Equal(t, int64(14), snapshot.UnpackedBytesReceivedKVCrossZone) + require.Equal(t, int64(15), snapshot.UnpackedBytesSentMPPTotal) + require.Equal(t, int64(16), snapshot.UnpackedBytesReceivedMPPTotal) + require.Equal(t, int64(17), snapshot.UnpackedBytesSentMPPCrossZone) + require.Equal(t, int64(18), snapshot.UnpackedBytesReceivedMPPCrossZone) + }) } func mockExecutorExecutionSummary(TimeProcessedNs, NumProducedRows, NumIterations uint64) *tipb.ExecutorExecutionSummary { @@ -411,6 +443,119 @@ func TestRUV2MetricsSnapshotFreezesRUValues(t *testing.T) { require.NotEqual(t, baseline, metrics.CalculateRUValues(updated)) } +func TestUpdateRUV2MetricsFromRUV2(t *testing.T) { + metrics := NewRUV2Metrics() + UpdateRUV2MetricsFromRUV2(metrics, &kvrpcpb.RUV2{ + ReadRpcCount: 2, + WriteRpcCount: 3, + KvEngineCacheMiss: 5, + CoprocessorExecutorIterations: 7, + CoprocessorResponseBytes: 11, + RaftstoreStoreWriteTriggerWbBytes: 13, + StorageProcessedKeysBatchGet: 17, + StorageProcessedKeysGet: 19, + ExecutorInputs: &kvrpcpb.ExecutorInputs{ + TikvCoprocessorExecutorWorkTotalBatchIndexScan: 23, + TikvCoprocessorExecutorWorkTotalBatchTableScan: 29, + TikvCoprocessorExecutorWorkTotalBatchSelection: 31, + TikvCoprocessorExecutorWorkTotalBatchTopN: 37, + TikvCoprocessorExecutorWorkTotalBatchLimit: 41, + TikvCoprocessorExecutorWorkTotalBatchSimpleAggr: 43, + TikvCoprocessorExecutorWorkTotalBatchFastHashAggr: 47, + }, + }) + require.Equal(t, int64(2), metrics.ResourceManagerReadCnt()) + require.Equal(t, int64(3), metrics.ResourceManagerWriteCnt()) + require.Equal(t, int64(5), metrics.TiKVKVEngineCacheMiss()) + require.Equal(t, int64(7), metrics.TiKVCoprocessorExecutorIterations()) + require.Equal(t, int64(11), metrics.TiKVCoprocessorResponseBytes()) + require.Equal(t, int64(13), metrics.TiKVRaftstoreStoreWriteTriggerWB()) + require.Equal(t, int64(17), metrics.TiKVStorageProcessedKeysBatchGet()) + require.Equal(t, int64(19), metrics.TiKVStorageProcessedKeysGet()) + + detail := FormatRUV2Metrics(metrics, defaultRUV2WeightsForTest(), 0, 0) + require.Contains(t, detail, "resource_manager_read_cnt:2") + require.Contains(t, detail, "resource_manager_write_cnt:3") + require.Contains(t, detail, "tikv_storage_processed_keys_batch_get:17") + require.Contains(t, detail, "tikv_storage_processed_keys_get:19") + require.Contains(t, detail, "BatchFastHashAggr:47") +} + +func TestSyncRUV2MetricsFromRUDetailsIncremental(t *testing.T) { + metrics := NewRUV2Metrics() + ruDetails := util.NewRUDetails() + ruDetails.AddRUV2(&kvrpcpb.RUV2{ + ReadRpcCount: 2, + WriteRpcCount: 3, + KvEngineCacheMiss: 5, + RaftstoreStoreWriteTriggerWbBytes: 17, + StorageProcessedKeysBatchGet: 7, + StorageProcessedKeysGet: 19, + ExecutorInputs: &kvrpcpb.ExecutorInputs{ + TikvCoprocessorExecutorWorkTotalBatchIndexScan: 11, + TikvCoprocessorExecutorWorkTotalBatchFastHashAggr: 23, + }, + }) + + // First drain picks up all counters. + SyncRUV2MetricsFromRUDetails(metrics, ruDetails) + require.Equal(t, int64(2), metrics.ResourceManagerReadCnt()) + require.Equal(t, int64(3), metrics.ResourceManagerWriteCnt()) + require.Equal(t, int64(5), metrics.TiKVKVEngineCacheMiss()) + require.Equal(t, int64(17), metrics.TiKVRaftstoreStoreWriteTriggerWB()) + require.Equal(t, int64(7), metrics.TiKVStorageProcessedKeysBatchGet()) + require.Equal(t, int64(19), metrics.TiKVStorageProcessedKeysGet()) + + // Second drain without new data is a no-op. + SyncRUV2MetricsFromRUDetails(metrics, ruDetails) + require.Equal(t, int64(2), metrics.ResourceManagerReadCnt()) + require.Equal(t, int64(3), metrics.ResourceManagerWriteCnt()) + + // New counters accumulate after the first drain. + ruDetails.AddRUV2(&kvrpcpb.RUV2{ + ReadRpcCount: 10, + StorageProcessedKeysBatchGet: 100, + }) + SyncRUV2MetricsFromRUDetails(metrics, ruDetails) + require.Equal(t, int64(12), metrics.ResourceManagerReadCnt()) + require.Equal(t, int64(107), metrics.TiKVStorageProcessedKeysBatchGet()) + + detail := FormatRUV2Metrics(metrics, defaultRUV2WeightsForTest(), 0, 0) + require.Contains(t, detail, "resource_manager_read_cnt:12") + require.Contains(t, detail, "resource_manager_write_cnt:3") + require.Contains(t, detail, "tikv_storage_processed_keys_batch_get:107") + require.Contains(t, detail, "tikv_storage_processed_keys_get:19") + require.Contains(t, detail, "BatchIndexScan:11") + require.Contains(t, detail, "BatchFastHashAggr:23") +} + +func TestSyncRUV2MetricsFromRUDetailsBypass(t *testing.T) { + metrics := NewRUV2Metrics() + metrics.SetBypass(true) + ruDetails := util.NewRUDetails() + ruDetails.AddRUV2(&kvrpcpb.RUV2{ + StorageProcessedKeysBatchGet: 7, + }) + + SyncRUV2MetricsFromRUDetails(metrics, ruDetails) + require.Zero(t, metrics.ResourceManagerReadCnt()) + require.Zero(t, metrics.ResourceManagerWriteCnt()) + require.Zero(t, metrics.TiKVStorageProcessedKeysBatchGet()) +} + +func TestUpdateRUV2MetricsFromRUV2Bypass(t *testing.T) { + metrics := NewRUV2Metrics() + metrics.SetBypass(true) + UpdateRUV2MetricsFromRUV2(metrics, &kvrpcpb.RUV2{ + ReadRpcCount: 1, + WriteRpcCount: 1, + StorageProcessedKeysBatchGet: 1, + }) + require.Zero(t, metrics.ResourceManagerReadCnt()) + require.Zero(t, metrics.ResourceManagerWriteCnt()) + require.Zero(t, metrics.TiKVStorageProcessedKeysBatchGet()) +} + func TestFormatRUV2MetricsIncludesRUValuesFirst(t *testing.T) { weights := defaultRUV2WeightsForTest() metrics := NewRUV2Metrics() diff --git a/pkg/util/execdetails/ruv2_metrics.go b/pkg/util/execdetails/ruv2_metrics.go index 30bb3d057c411..978021402ea49 100644 --- a/pkg/util/execdetails/ruv2_metrics.go +++ b/pkg/util/execdetails/ruv2_metrics.go @@ -23,7 +23,9 @@ import ( "sync" "sync/atomic" + "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/pkg/metrics" + tikvutil "github.com/tikv/client-go/v2/util" ) type ruv2MetricsKeyType struct{} @@ -58,6 +60,70 @@ func RUV2MetricsFromContext(ctx context.Context) *RUV2Metrics { return metrics } +// UpdateRUV2MetricsFromRUV2 adds raw RUv2 counters into the statement-level metrics snapshot. +func UpdateRUV2MetricsFromRUV2(metrics *RUV2Metrics, ru *kvrpcpb.RUV2) { + if metrics == nil || ru == nil { + return + } + if ru.ReadRpcCount != 0 { + metrics.AddResourceManagerReadCnt(int64(ru.ReadRpcCount)) + } + if ru.WriteRpcCount != 0 { + metrics.AddResourceManagerWriteCnt(int64(ru.WriteRpcCount)) + } + if ru.KvEngineCacheMiss != 0 { + metrics.AddTiKVKVEngineCacheMiss(int64(ru.KvEngineCacheMiss)) + } + if ru.CoprocessorExecutorIterations != 0 { + metrics.AddTiKVCoprocessorExecutorIterations(int64(ru.CoprocessorExecutorIterations)) + } + if ru.CoprocessorResponseBytes != 0 { + metrics.AddTiKVCoprocessorResponseBytes(int64(ru.CoprocessorResponseBytes)) + } + if ru.RaftstoreStoreWriteTriggerWbBytes != 0 { + metrics.AddTiKVRaftstoreStoreWriteTriggerWB(int64(ru.RaftstoreStoreWriteTriggerWbBytes)) + } + if ru.StorageProcessedKeysBatchGet != 0 { + metrics.AddTiKVStorageProcessedKeysBatchGet(int64(ru.StorageProcessedKeysBatchGet)) + } + if ru.StorageProcessedKeysGet != 0 { + metrics.AddTiKVStorageProcessedKeysGet(int64(ru.StorageProcessedKeysGet)) + } + if inputs := ru.ExecutorInputs; inputs != nil { + if inputs.TikvCoprocessorExecutorWorkTotalBatchIndexScan != 0 { + metrics.AddTiKVCoprocessorWorkTotal("BatchIndexScan", int64(inputs.TikvCoprocessorExecutorWorkTotalBatchIndexScan)) + } + if inputs.TikvCoprocessorExecutorWorkTotalBatchTableScan != 0 { + metrics.AddTiKVCoprocessorWorkTotal("BatchTableScan", int64(inputs.TikvCoprocessorExecutorWorkTotalBatchTableScan)) + } + if inputs.TikvCoprocessorExecutorWorkTotalBatchSelection != 0 { + metrics.AddTiKVCoprocessorWorkTotal("BatchSelection", int64(inputs.TikvCoprocessorExecutorWorkTotalBatchSelection)) + } + if inputs.TikvCoprocessorExecutorWorkTotalBatchTopN != 0 { + metrics.AddTiKVCoprocessorWorkTotal("BatchTopN", int64(inputs.TikvCoprocessorExecutorWorkTotalBatchTopN)) + } + if inputs.TikvCoprocessorExecutorWorkTotalBatchLimit != 0 { + metrics.AddTiKVCoprocessorWorkTotal("BatchLimit", int64(inputs.TikvCoprocessorExecutorWorkTotalBatchLimit)) + } + if inputs.TikvCoprocessorExecutorWorkTotalBatchSimpleAggr != 0 { + metrics.AddTiKVCoprocessorWorkTotal("BatchSimpleAggr", int64(inputs.TikvCoprocessorExecutorWorkTotalBatchSimpleAggr)) + } + if inputs.TikvCoprocessorExecutorWorkTotalBatchFastHashAggr != 0 { + metrics.AddTiKVCoprocessorWorkTotal("BatchFastHashAggr", int64(inputs.TikvCoprocessorExecutorWorkTotalBatchFastHashAggr)) + } + } +} + +// SyncRUV2MetricsFromRUDetails drains the raw RUv2 counters accumulated in +// RUDetails since the last drain and adds them into the statement-level metrics. +// It is safe to call multiple times; each call transfers only the delta. +func SyncRUV2MetricsFromRUDetails(metrics *RUV2Metrics, ruDetails *tikvutil.RUDetails) { + if metrics == nil || ruDetails == nil || metrics.Bypass() { + return + } + UpdateRUV2MetricsFromRUV2(metrics, ruDetails.DrainRUV2()) +} + // RUV2Metrics stores statement-level RUv2 metrics. type RUV2Metrics struct { bypass atomic.Bool diff --git a/pkg/util/execdetails/tiflash_stats.go b/pkg/util/execdetails/tiflash_stats.go index d36a6e4d18d26..4b6bf2fce291a 100644 --- a/pkg/util/execdetails/tiflash_stats.go +++ b/pkg/util/execdetails/tiflash_stats.go @@ -21,6 +21,7 @@ import ( "math" "strconv" "strings" + "sync/atomic" "time" "github.com/pingcap/kvproto/pkg/resource_manager" @@ -631,13 +632,13 @@ func (networkTraffic *TiFlashNetworkTrafficSummary) UpdateTiKVExecDetails(tikvDe if tikvDetails == nil { return } - tikvDetails.UnpackedBytesSentMPPCrossZone += int64(networkTraffic.interZoneSendBytes) - tikvDetails.UnpackedBytesSentMPPTotal += int64(networkTraffic.interZoneSendBytes) - tikvDetails.UnpackedBytesSentMPPTotal += int64(networkTraffic.innerZoneSendBytes) + atomic.AddInt64(&tikvDetails.UnpackedBytesSentMPPCrossZone, int64(networkTraffic.interZoneSendBytes)) + atomic.AddInt64(&tikvDetails.UnpackedBytesSentMPPTotal, int64(networkTraffic.interZoneSendBytes)) + atomic.AddInt64(&tikvDetails.UnpackedBytesSentMPPTotal, int64(networkTraffic.innerZoneSendBytes)) - tikvDetails.UnpackedBytesReceivedMPPCrossZone += int64(networkTraffic.interZoneReceiveBytes) - tikvDetails.UnpackedBytesReceivedMPPTotal += int64(networkTraffic.interZoneReceiveBytes) - tikvDetails.UnpackedBytesReceivedMPPTotal += int64(networkTraffic.innerZoneReceiveBytes) + atomic.AddInt64(&tikvDetails.UnpackedBytesReceivedMPPCrossZone, int64(networkTraffic.interZoneReceiveBytes)) + atomic.AddInt64(&tikvDetails.UnpackedBytesReceivedMPPTotal, int64(networkTraffic.interZoneReceiveBytes)) + atomic.AddInt64(&tikvDetails.UnpackedBytesReceivedMPPTotal, int64(networkTraffic.innerZoneReceiveBytes)) } // Clone implements the deep copy of * TiFlashNetworkTrafficSummary diff --git a/pkg/util/execdetails/util.go b/pkg/util/execdetails/util.go index de150ccdc57d5..8961c2a748d5b 100644 --- a/pkg/util/execdetails/util.go +++ b/pkg/util/execdetails/util.go @@ -19,6 +19,7 @@ import ( "context" "math" "slices" + "sync/atomic" "time" "github.com/influxdata/tdigest" @@ -34,6 +35,55 @@ func ContextWithInitializedExecDetails(ctx context.Context) context.Context { return ctx } +// ContextWithMissingExecDetailsInitialized initializes any missing statement execution, execution, +// and resource usage details in the context while preserving existing objects. +func ContextWithMissingExecDetailsInitialized(ctx context.Context) context.Context { + if ctx.Value(StmtExecDetailKey) == nil { + ctx = context.WithValue(ctx, StmtExecDetailKey, &StmtExecDetails{}) + } + if ctx.Value(util.ExecDetailsKey) == nil { + ctx = context.WithValue(ctx, util.ExecDetailsKey, &util.ExecDetails{}) + } + if ctx.Value(util.RUDetailsCtxKey) == nil { + ctx = context.WithValue(ctx, util.RUDetailsCtxKey, util.NewRUDetails()) + } + if ctx.Value(RUV2MetricsCtxKey) == nil { + ctx = context.WithValue(ctx, RUV2MetricsCtxKey, NewRUV2Metrics()) + } + return ctx +} + +// ContextWithInheritedRUV2Details reuses statement-level RUDetails and RUv2 metrics +// from source when ctx does not already carry them. +func ContextWithInheritedRUV2Details(ctx, source context.Context) context.Context { + if source == nil { + return ctx + } + if ctx.Value(util.RUDetailsCtxKey) == nil { + if ruDetails, _ := source.Value(util.RUDetailsCtxKey).(*util.RUDetails); ruDetails != nil { + ctx = context.WithValue(ctx, util.RUDetailsCtxKey, ruDetails) + } + } + if ctx.Value(RUV2MetricsCtxKey) == nil { + if metrics := RUV2MetricsFromContext(source); metrics != nil { + ctx = context.WithValue(ctx, RUV2MetricsCtxKey, metrics) + } + } + return ctx +} + +// SyncRUV2MetricsFromContext drains any raw RUv2 counters in ctx's RUDetails into +// the statement-level RUv2 metrics and returns the metrics object. +func SyncRUV2MetricsFromContext(ctx context.Context) *RUV2Metrics { + metrics := RUV2MetricsFromContext(ctx) + if metrics == nil { + return nil + } + ruDetails, _ := ctx.Value(util.RUDetailsCtxKey).(*util.RUDetails) + SyncRUV2MetricsFromRUDetails(metrics, ruDetails) + return metrics +} + // GetExecDetailsFromContext gets stmt execution, execution and resource usage details from context. func GetExecDetailsFromContext(ctx context.Context) (stmtDetail StmtExecDetails, tikvExecDetail util.ExecDetails, ruDetails *util.RUDetails) { stmtDetailRaw := ctx.Value(StmtExecDetailKey) @@ -42,7 +92,7 @@ func GetExecDetailsFromContext(ctx context.Context) (stmtDetail StmtExecDetails, } tikvExecDetailRaw := ctx.Value(util.ExecDetailsKey) if tikvExecDetailRaw != nil { - tikvExecDetail = *(tikvExecDetailRaw.(*util.ExecDetails)) + tikvExecDetail = LoadTiKVExecDetails(tikvExecDetailRaw.(*util.ExecDetails)) } if ruDetailsVal := ctx.Value(util.RUDetailsCtxKey); ruDetailsVal != nil { ruDetails = ruDetailsVal.(*util.RUDetails) @@ -53,6 +103,29 @@ func GetExecDetailsFromContext(ctx context.Context) (stmtDetail StmtExecDetails, return } +// LoadTiKVExecDetails snapshots the fields in util.ExecDetails that are updated via atomic operations. +func LoadTiKVExecDetails(detail *util.ExecDetails) util.ExecDetails { + if detail == nil { + return util.ExecDetails{} + } + return util.ExecDetails{ + BackoffCount: atomic.LoadInt64(&detail.BackoffCount), + BackoffDuration: atomic.LoadInt64(&detail.BackoffDuration), + WaitKVRespDuration: atomic.LoadInt64(&detail.WaitKVRespDuration), + WaitPDRespDuration: atomic.LoadInt64(&detail.WaitPDRespDuration), + TrafficDetails: util.TrafficDetails{ + UnpackedBytesSentKVTotal: atomic.LoadInt64(&detail.UnpackedBytesSentKVTotal), + UnpackedBytesReceivedKVTotal: atomic.LoadInt64(&detail.UnpackedBytesReceivedKVTotal), + UnpackedBytesSentKVCrossZone: atomic.LoadInt64(&detail.UnpackedBytesSentKVCrossZone), + UnpackedBytesReceivedKVCrossZone: atomic.LoadInt64(&detail.UnpackedBytesReceivedKVCrossZone), + UnpackedBytesSentMPPTotal: atomic.LoadInt64(&detail.UnpackedBytesSentMPPTotal), + UnpackedBytesReceivedMPPTotal: atomic.LoadInt64(&detail.UnpackedBytesReceivedMPPTotal), + UnpackedBytesSentMPPCrossZone: atomic.LoadInt64(&detail.UnpackedBytesSentMPPCrossZone), + UnpackedBytesReceivedMPPCrossZone: atomic.LoadInt64(&detail.UnpackedBytesReceivedMPPCrossZone), + }, + } +} + type canGetFloat64 interface { GetFloat64() float64 } diff --git a/pkg/util/stmtsummary/statement_summary.go b/pkg/util/stmtsummary/statement_summary.go index b3a823a6324a6..06017d458e2e7 100644 --- a/pkg/util/stmtsummary/statement_summary.go +++ b/pkg/util/stmtsummary/statement_summary.go @@ -1118,13 +1118,14 @@ func (s *StmtNetworkTrafficSummary) Merge(other *StmtNetworkTrafficSummary) { // Add add a new sample value to the ru summary record. func (s *StmtNetworkTrafficSummary) Add(info *util.ExecDetails) { if info != nil { - s.UnpackedBytesSentTiKVTotal += info.UnpackedBytesSentKVTotal - s.UnpackedBytesReceivedTiKVTotal += info.UnpackedBytesReceivedKVTotal - s.UnpackedBytesSentTiKVCrossZone += info.UnpackedBytesSentKVCrossZone - s.UnpackedBytesReceivedTiKVCrossZone += info.UnpackedBytesReceivedKVCrossZone - s.UnpackedBytesSentTiFlashTotal += info.UnpackedBytesSentMPPTotal - s.UnpackedBytesReceivedTiFlashTotal += info.UnpackedBytesReceivedMPPTotal - s.UnpackedBytesSentTiFlashCrossZone += info.UnpackedBytesSentMPPCrossZone - s.UnpackedBytesReceivedTiFlashCrossZone += info.UnpackedBytesReceivedMPPCrossZone + snapshot := execdetails.LoadTiKVExecDetails(info) + s.UnpackedBytesSentTiKVTotal += snapshot.UnpackedBytesSentKVTotal + s.UnpackedBytesReceivedTiKVTotal += snapshot.UnpackedBytesReceivedKVTotal + s.UnpackedBytesSentTiKVCrossZone += snapshot.UnpackedBytesSentKVCrossZone + s.UnpackedBytesReceivedTiKVCrossZone += snapshot.UnpackedBytesReceivedKVCrossZone + s.UnpackedBytesSentTiFlashTotal += snapshot.UnpackedBytesSentMPPTotal + s.UnpackedBytesReceivedTiFlashTotal += snapshot.UnpackedBytesReceivedMPPTotal + s.UnpackedBytesSentTiFlashCrossZone += snapshot.UnpackedBytesSentMPPCrossZone + s.UnpackedBytesReceivedTiFlashCrossZone += snapshot.UnpackedBytesReceivedMPPCrossZone } } diff --git a/pkg/util/stmtsummary/v2/record.go b/pkg/util/stmtsummary/v2/record.go index 4f67707138fb8..e7fa2d38900ac 100644 --- a/pkg/util/stmtsummary/v2/record.go +++ b/pkg/util/stmtsummary/v2/record.go @@ -432,15 +432,16 @@ func (r *StmtRecord) Add(info *stmtsummary.StmtExecInfo) { } else { r.MinResultRows = 0 } - r.SumKVTotal += time.Duration(info.TiKVExecDetails.WaitKVRespDuration) - r.SumPDTotal += time.Duration(info.TiKVExecDetails.WaitPDRespDuration) - r.SumBackoffTotal += time.Duration(info.TiKVExecDetails.BackoffDuration) + tikvExecDetails := execdetails.LoadTiKVExecDetails(info.TiKVExecDetails) + r.SumKVTotal += time.Duration(tikvExecDetails.WaitKVRespDuration) + r.SumPDTotal += time.Duration(tikvExecDetails.WaitPDRespDuration) + r.SumBackoffTotal += time.Duration(tikvExecDetails.BackoffDuration) r.SumWriteSQLRespTotal += info.StmtExecDetails.WriteSQLRespDuration r.SumTidbCPU += info.CPUUsages.TidbCPUTime r.SumTikvCPU += info.CPUUsages.TikvCPUTime // Networks - r.StmtNetworkTrafficSummary.Add(info.TiKVExecDetails) + r.StmtNetworkTrafficSummary.Add(&tikvExecDetails) // RU r.StmtRUSummary.Add(info.RUDetail, info.TotalRUV2)