Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ lint:tools/bin/revive
go run tools/dashboard-linter/main.go pkg/metrics/nextgengrafana/tidb_with_keyspace_name.json
go run tools/dashboard-linter/main.go pkg/metrics/nextgengrafana/tidb_worker.json

# Kept only as an alias: legacy CI/automation still invokes `make nogo`,
# which now simply delegates to the `lint` target.
.PHONY: nogo
nogo: lint ## Backward-compatible alias for legacy automation invoking `make nogo`

.PHONY: license
license:
bazel $(BAZEL_GLOBAL_CONFIG) run $(BAZEL_CMD_CONFIG) \
Expand Down
23 changes: 23 additions & 0 deletions pkg/util/topsql/reporter/datamodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,9 +623,13 @@ type planMeta struct {
// normalizedSQLMap is a wrapped map used to register normalizedSQL.
//
// data points at the live digest->sqlMeta sync.Map. register holds mu with
// a read lock around its Load+LoadOrStore, and take holds mu with a write
// lock, so an in-flight registration that already snapshotted the old map
// pointer finishes before take hands the map off (see register/take below).
type normalizedSQLMap struct {
data atomic.Pointer[sync.Map]
// mu: RLock in register, Lock in take — serializes pointer use vs. swap.
mu sync.RWMutex
// length tracks the number of registered entries; read by take, and
// presumably checked against a size cap before the
// IgnoreExceedSQLCounter bailout — confirm against the elided guard.
length atomic2.Int64
}

// normalizedMetaRegisterAfterLoadHook is a test hook for coordinating stale-pointer registrations.
// When non-nil, it is invoked by the register methods of both the SQL and
// plan meta maps immediately after loading the map pointer and before the
// LoadOrStore, so a test can pause a registration at exactly the point
// where it holds a possibly-stale map pointer. Nil outside of tests.
var normalizedMetaRegisterAfterLoadHook func()

func newNormalizedSQLMap() *normalizedSQLMap {
r := &normalizedSQLMap{}
r.data.Store(&sync.Map{})
Expand All @@ -639,7 +643,13 @@ func (m *normalizedSQLMap) register(sqlDigest []byte, normalizedSQL string, isIn
reporter_metrics.IgnoreExceedSQLCounter.Inc()
return
}
m.mu.RLock()
defer m.mu.RUnlock()

data := m.data.Load()
if normalizedMetaRegisterAfterLoadHook != nil {
normalizedMetaRegisterAfterLoadHook()
}
_, loaded := data.LoadOrStore(string(sqlDigest), sqlMeta{
normalizedSQL: normalizedSQL,
isInternal: isInternal,
Expand All @@ -651,6 +661,9 @@ func (m *normalizedSQLMap) register(sqlDigest []byte, normalizedSQL string, isIn

// take away all data inside normalizedSQLMap, put them in the returned new normalizedSQLMap.
func (m *normalizedSQLMap) take() *normalizedSQLMap {
m.mu.Lock()
defer m.mu.Unlock()

data := m.data.Load()
length := m.length.Load()
r := &normalizedSQLMap{}
Expand Down Expand Up @@ -688,6 +701,7 @@ type planBinaryCompressFunc func([]byte) string
// normalizedPlanMap is a wrapped map used to register normalizedPlan.
//
// Mirrors normalizedSQLMap: data points at the live digest->planMeta
// sync.Map; register takes mu's read lock around Load+LoadOrStore and take
// takes the write lock, so an in-flight registration completes before the
// map is handed off (see the register/take methods below).
type normalizedPlanMap struct {
data atomic.Pointer[sync.Map]
// mu: RLock in register, Lock in take — serializes pointer use vs. swap.
mu sync.RWMutex
// length tracks registered entries; read by take, and presumably checked
// against a cap before the IgnoreExceedPlanCounter bailout — confirm
// against the elided guard.
length atomic2.Int64
}

Expand All @@ -704,7 +718,13 @@ func (m *normalizedPlanMap) register(planDigest []byte, normalizedPlan string, i
reporter_metrics.IgnoreExceedPlanCounter.Inc()
return
}
m.mu.RLock()
defer m.mu.RUnlock()

data := m.data.Load()
if normalizedMetaRegisterAfterLoadHook != nil {
normalizedMetaRegisterAfterLoadHook()
}
_, loaded := data.LoadOrStore(string(planDigest), planMeta{
binaryNormalizedPlan: normalizedPlan,
isLarge: isLarge,
Expand All @@ -716,6 +736,9 @@ func (m *normalizedPlanMap) register(planDigest []byte, normalizedPlan string, i

// take away all data inside normalizedPlanMap, put them in the returned new normalizedPlanMap.
func (m *normalizedPlanMap) take() *normalizedPlanMap {
m.mu.Lock()
defer m.mu.Unlock()

data := m.data.Load()
length := m.length.Load()
r := &normalizedPlanMap{}
Expand Down
13 changes: 9 additions & 4 deletions pkg/util/topsql/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ const (

var nowFunc = time.Now

// reportWorkerBeforeBuildReportDataHook is a test hook for coordinating reportWorker timing.
// When non-nil, reportWorker invokes it right after dequeuing a payload from
// reportCollectedDataChan and before the settle sleep / protobuf conversion,
// letting tests observe exactly when a payload has been picked up. Nil
// outside of tests.
var reportWorkerBeforeBuildReportDataHook func()

// TopSQLReporter collects Top SQL metrics.
type TopSQLReporter interface {
collector.Collector
Expand Down Expand Up @@ -371,10 +374,12 @@ func (tsr *RemoteTopSQLReporter) reportWorker() {
for {
select {
case data := <-tsr.reportCollectedDataChan:
// When `reportCollectedDataChan` receives something, there could be ongoing
// `RegisterSQL` and `RegisterPlan` running, who writes to the data structure
// that `data` contains. So we wait for a little while to ensure that writes
// are finished.
if reportWorkerBeforeBuildReportDataHook != nil {
reportWorkerBeforeBuildReportDataHook()
}
// RegisterSQL/RegisterPlan may still be in flight against the taken maps
// after the payload is queued, so allow those stale-pointer writes to
// settle before converting the payload to protobuf.
time.Sleep(time.Millisecond * 100)
Comment thread
yinsustart marked this conversation as resolved.
rs := data.collected.getReportRecords()
// Convert to protobuf data and do report.
Expand Down
135 changes: 134 additions & 1 deletion pkg/util/topsql/reporter/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,116 @@ func TestDoReportSendsMetaWhenRURecordsEmpty(t *testing.T) {
}
}

func TestReportWorkerWaitsForInFlightSQLMetaRegistration(t *testing.T) {
origReportHook := reportWorkerBeforeBuildReportDataHook
origRegisterHook := normalizedMetaRegisterAfterLoadHook

registerLoadedOldMap := make(chan struct{})
releaseRegister := make(chan struct{})
reportWorkerReceivedData := make(chan struct{})
takeDone := make(chan struct{})

reportWorkerBeforeBuildReportDataHook = func() {
select {
case <-reportWorkerReceivedData:
default:
close(reportWorkerReceivedData)
}
}

tsr := NewRemoteTopSQLReporter(mockPlanBinaryDecoderFunc, mockPlanBinaryCompressFunc)
tsr.BindKeyspaceName([]byte("ks-race"))
t.Cleanup(func() {
select {
case <-releaseRegister:
default:
close(releaseRegister)
}
normalizedMetaRegisterAfterLoadHook = origRegisterHook
reportWorkerBeforeBuildReportDataHook = origReportHook
})
Comment on lines +198 to +208
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Wait for reportWorker() to exit before restoring the package hooks.

t.Cleanup runs LIFO, so this currently restores reportWorkerBeforeBuildReportDataHook / normalizedMetaRegisterAfterLoadHook before tsr.Close(). Also, Close() does not join reportWorker(). On an early-failing path, that leaves a real background worker racing with hook restoration and can leak hook state into later tests.

Track the worker with a done channel, close the reporter first, wait for the goroutine to exit, and only then restore the package globals.

Also applies to: 210-212

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/util/topsql/reporter/reporter_test.go` around lines 197 - 208, The test
must ensure reportWorker() exits before restoring package hooks; change the
cleanup order so tsr.Close() runs first, then wait for the worker goroutine to
finish via a done channel, and only after that restore
reportWorkerBeforeBuildReportDataHook and normalizedMetaRegisterAfterLoadHook
and close releaseRegister; specifically, create a done chan (e.g. done :=
make(chan struct{}), have the worker or test signal close(done) when
reportWorker returns, call tsr.Close() in cleanup, then block on <-done before
resetting the package globals and closing releaseRegister to avoid races and
leaks.

t.Cleanup(tsr.Close)

ch := make(chan *ReportData, 1)
require.NoError(t, tsr.Register(newMockDataSink(ch)))
go tsr.reportWorker()

seedDigest := []byte("sql-seed")
racingDigest := []byte("sql-racing")
tsr.RegisterSQL(seedDigest, "select 1", false)
normalizedMetaRegisterAfterLoadHook = func() {
select {
case <-registerLoadedOldMap:
default:
close(registerLoadedOldMap)
}
<-releaseRegister
}

registerDone := make(chan struct{})
go func() {
tsr.RegisterSQL(racingDigest, "select 2", false)
close(registerDone)
}()

select {
case <-registerLoadedOldMap:
case <-time.After(time.Second):
t.Fatal("timed out waiting for RegisterSQL to snapshot the old SQL meta map")
}

go func() {
tsr.takeDataAndSendToReportChan(60)
close(takeDone)
}()

require.Never(t, func() bool {
select {
case <-takeDone:
return true
default:
return false
}
}, 300*time.Millisecond, 5*time.Millisecond)

require.Never(t, func() bool {
select {
case <-ch:
return true
default:
return false
}
}, 300*time.Millisecond, 5*time.Millisecond)

close(releaseRegister)
select {
case <-registerDone:
case <-time.After(time.Second):
t.Fatal("timed out waiting for the in-flight SQL meta registration to finish")
}
select {
case <-takeDone:
case <-time.After(time.Second):
t.Fatal("timed out waiting for takeDataAndSendToReportChan to return")
}
select {
case <-reportWorkerReceivedData:
case <-time.After(time.Second):
t.Fatal("timed out waiting for reportWorker to dequeue the payload")
}

select {
case payload := <-ch:
require.Len(t, payload.SQLMetas, 2)
_, ok := findSQLMeta(payload.SQLMetas, seedDigest)
require.True(t, ok, "missing seed SQL meta")
_, ok = findSQLMeta(payload.SQLMetas, racingDigest)
require.True(t, ok, "missing in-flight SQL meta")
case <-time.After(time.Second):
t.Fatal("timeout waiting for the report payload")
}
}

func TestCollectAndSendBatch(t *testing.T) {
tsr, ds := setupRemoteTopSQLReporter(t, maxSQLNum, 1)
populateCache(tsr, 0, maxSQLNum, 1)
Expand Down Expand Up @@ -1026,9 +1136,32 @@ func TestTopRUPipelineInProcessIntegration(t *testing.T) {
}, rmclient.DefaultRUVersion)

require.Eventually(t, func() bool {
// Wait until collectWorker has actually merged both hot-key batches.
// Channel drain is not enough because receive and addBatch are separate steps.
tsr.ruAggregator.mu.Lock()
defer tsr.ruAggregator.mu.Unlock()
return len(tsr.ruAggregator.buckets) > 0

for _, bucket := range tsr.ruAggregator.buckets {
if bucket == nil || bucket.collecting == nil {
continue
}
userCollecting, ok := bucket.collecting.users["user-hot"]
if !ok {
continue
}
hotRecord, ok := userCollecting.records[makeKey(stmtstats.BinaryDigest("sql-hot"), stmtstats.BinaryDigest("plan-hot"))]
if !ok {
continue
}

hotTotalRU, hotExecCount := 0.0, uint64(0)
for _, item := range hotRecord.items {
hotTotalRU += item.totalRU
hotExecCount += item.execCount
}
return hotTotalRU >= 17.0 && hotExecCount >= 3
}
return false
}, time.Second, 10*time.Millisecond)

tsr.takeDataAndSendToReportChan(60)
Expand Down
Loading