Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 47 additions & 10 deletions br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,37 @@ func TestOwnerDropped(t *testing.T) {
c := createFakeCluster(t, 4, false)
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
installSubscribeSupport(c)
env := newTestEnv(c, t)
fp := "github.com/pingcap/tidb/br/pkg/streamhelper/get_subscriber"
getSubscriberReached := make(chan struct{})
beforeManualPollReached := make(chan struct{})
releaseSubscriber := make(chan struct{})
releaseManualPoll := make(chan struct{})
stopDone := make(chan struct{})
var getSubscriberReachedOnce sync.Once
var beforeManualPollReachedOnce sync.Once
timingHooksEnabled := atomic.NewBool(false)
// Keep the synchronization local to this fake env rather than a package-global
// failpoint: the flaky window is bounded by the subscription refresh (`Stores`)
// and the fallback manual poll (`RegionScan`). By wiring the hooks through this
// env, the repro stays test-local, immutable after construction, and the manual
// poll hook can wait for `OnStop` to finish before the poll phase is allowed to
// start, which makes the owner-loss handoff deterministic.
env := newTestEnv(c, t, withTestEnvTimingHooks(
func() {
if !timingHooksEnabled.Load() {
return
}
getSubscriberReachedOnce.Do(func() { close(getSubscriberReached) })
<-releaseSubscriber
},
func() {
if !timingHooksEnabled.Load() {
return
}
<-stopDone
beforeManualPollReachedOnce.Do(func() { close(beforeManualPollReached) })
<-releaseManualPoll
},
))
defer func() {
if t.Failed() {
fmt.Println(c)
Expand All @@ -437,18 +466,26 @@ func TestOwnerDropped(t *testing.T) {
adv.OnStart(ctx)
adv.SpawnSubscriptionHandler(ctx)
require.NoError(t, adv.OnTick(ctx))
failpoint.Enable(fp, "pause")
ch := make(chan struct{})
timingHooksEnabled.Store(true)

tickDone := make(chan error, 1)
go func() {
defer close(ch)
require.NoError(t, adv.OnTick(ctx))
tickDone <- adv.OnTick(ctx)
}()
adv.OnStop()
failpoint.Disable(fp)

// First hold the subscription refresh in-flight, then stop the owner. The
// fallback manual poll hook waits on `stopDone`, so the poll phase cannot start
// before the owner drop has completed.
<-getSubscriberReached
go func() {
adv.OnStop()
close(stopDone)
}()
close(releaseSubscriber)
<-beforeManualPollReached
cp := c.advanceCheckpoints()
c.flushAll()
<-ch
close(releaseManualPoll)
require.NoError(t, <-tickDone)
adv.WithCheckpoints(func(vsf *spans.ValueSortedFull) {
// Advancer will manually poll the checkpoint...
require.Equal(t, vsf.MinValue(), cp)
Expand Down
39 changes: 38 additions & 1 deletion br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,34 @@ type testEnv struct {
task streamhelper.TaskEvent

resolveLocks func([]*txnlock.Lock, *tikv.KeyLocation) (*tikv.KeyLocation, error)
beforeStores func()
beforeScan func()

mu sync.Mutex
pd.Client
}

func newTestEnv(c *fakeCluster, t *testing.T) *testEnv {
// testEnvOption mutates a testEnv during construction, before any test
// code runs against it.
type testEnvOption func(*testEnv)

// withTestEnvTimingHooks installs test-local immutable hooks at the fake-cluster
// boundaries used by subscription refresh and fallback polling. This keeps the
// synchronization scoped to a single test environment instead of a package-global
// failpoint shared by the whole package.
func withTestEnvTimingHooks(beforeStores, beforeScan func()) testEnvOption {
	return func(e *testEnv) {
		e.beforeScan = beforeScan
		e.beforeStores = beforeStores
	}
}

func newTestEnv(c *fakeCluster, t *testing.T, opts ...testEnvOption) *testEnv {
env := &testEnv{
Cluster: c.Cluster,
testCtx: t,
}
for _, opt := range opts {
opt(env)
}
rngs := env.ranges
if len(rngs) == 0 {
rngs = []kv.KeyRange{{}}
Expand Down Expand Up @@ -232,6 +250,25 @@ func (t *testEnv) putTask() {
}
}

// Stores lists the stores of the underlying fake cluster. If a timing hook
// was installed via withTestEnvTimingHooks, it runs before the delegation so
// tests can synchronize on the subscription-refresh path.
func (t *testEnv) Stores(ctx context.Context) ([]streamhelper.Store, error) {
	if hook := t.beforeStores; hook != nil {
		hook()
	}
	return t.Cluster.Stores(ctx)
}

// RegionScan scans regions within [key, endKey) from the fake cluster,
// returning at most limit entries. If a timing hook was installed via
// withTestEnvTimingHooks, it runs before the delegation so tests can
// synchronize on the fallback manual-poll path.
func (t *testEnv) RegionScan(
	ctx context.Context,
	key []byte,
	endKey []byte,
	limit int,
) ([]streamhelper.RegionWithLeader, error) {
	if hook := t.beforeScan; hook != nil {
		hook()
	}
	return t.Cluster.RegionScan(ctx, key, endKey, limit)
}

func (t *testEnv) ScanLocksInOneRegion(
bo *tikv.Backoffer,
key []byte,
Expand Down
Loading