diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index ffcf578c8ce33..6851132699ee8 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -425,8 +425,37 @@ func TestOwnerDropped(t *testing.T) { c := createFakeCluster(t, 4, false) c.splitAndScatter("01", "02", "022", "023", "033", "04", "043") installSubscribeSupport(c) - env := newTestEnv(c, t) - fp := "github.com/pingcap/tidb/br/pkg/streamhelper/get_subscriber" + getSubscriberReached := make(chan struct{}) + beforeManualPollReached := make(chan struct{}) + releaseSubscriber := make(chan struct{}) + releaseManualPoll := make(chan struct{}) + stopDone := make(chan struct{}) + var getSubscriberReachedOnce sync.Once + var beforeManualPollReachedOnce sync.Once + timingHooksEnabled := atomic.NewBool(false) + // Keep the synchronization local to this fake env rather than a package-global + // failpoint: the flaky window is bounded by the subscription refresh (`Stores`) + // and the fallback manual poll (`RegionScan`). By wiring the hooks through this + // env, the repro stays test-local, immutable after construction, and the manual + // poll hook can wait for `OnStop` to finish before the poll phase is allowed to + // start, which makes the owner-loss handoff deterministic. + env := newTestEnv(c, t, withTestEnvTimingHooks( + func() { + if !timingHooksEnabled.Load() { + return + } + getSubscriberReachedOnce.Do(func() { close(getSubscriberReached) }) + <-releaseSubscriber + }, + func() { + if !timingHooksEnabled.Load() { + return + } + <-stopDone + beforeManualPollReachedOnce.Do(func() { close(beforeManualPollReached) }) + <-releaseManualPoll + }, + )) defer func() { if t.Failed() { fmt.Println(c) @@ -437,18 +466,26 @@ func TestOwnerDropped(t *testing.T) { adv.OnStart(ctx) adv.SpawnSubscriptionHandler(ctx) require.NoError(t, adv.OnTick(ctx)) - failpoint.Enable(fp, "pause") - ch := make(chan struct{}) + timingHooksEnabled.Store(true) + + tickDone := make(chan error, 1) go func() { - defer close(ch) - require.NoError(t, adv.OnTick(ctx)) + tickDone <- adv.OnTick(ctx) }() - adv.OnStop() - failpoint.Disable(fp) - + // First hold the subscription refresh in-flight, then stop the owner. The + // fallback manual poll hook waits on `stopDone`, so the poll phase cannot start + // before the owner drop has completed. + <-getSubscriberReached + go func() { + adv.OnStop() + close(stopDone) + }() + close(releaseSubscriber) + <-beforeManualPollReached cp := c.advanceCheckpoints() c.flushAll() - <-ch + close(releaseManualPoll) + require.NoError(t, <-tickDone) adv.WithCheckpoints(func(vsf *spans.ValueSortedFull) { // Advancer will manually poll the checkpoint... require.Equal(t, vsf.MinValue(), cp) diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index 2f25c8531eaa1..3a733e688139b 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -97,16 +97,34 @@ type testEnv struct { task streamhelper.TaskEvent resolveLocks func([]*txnlock.Lock, *tikv.KeyLocation) (*tikv.KeyLocation, error) + beforeStores func() + beforeScan func() mu sync.Mutex pd.Client } -func newTestEnv(c *fakeCluster, t *testing.T) *testEnv { +type testEnvOption func(*testEnv) + +// withTestEnvTimingHooks installs test-local immutable hooks at the fake-cluster +// boundaries used by subscription refresh and fallback polling. This keeps the +// synchronization scoped to a single test environment instead of a package-global +// failpoint shared by the whole package. +func withTestEnvTimingHooks(beforeStores, beforeScan func()) testEnvOption { + return func(env *testEnv) { + env.beforeStores = beforeStores + env.beforeScan = beforeScan + } +} + +func newTestEnv(c *fakeCluster, t *testing.T, opts ...testEnvOption) *testEnv { env := &testEnv{ Cluster: c.Cluster, testCtx: t, } + for _, opt := range opts { + opt(env) + } rngs := env.ranges if len(rngs) == 0 { rngs = []kv.KeyRange{{}} @@ -232,6 +250,25 @@ func (t *testEnv) putTask() { } } +func (t *testEnv) Stores(ctx context.Context) ([]streamhelper.Store, error) { + if t.beforeStores != nil { + t.beforeStores() + } + return t.Cluster.Stores(ctx) +} + +func (t *testEnv) RegionScan( + ctx context.Context, + key []byte, + endKey []byte, + limit int, +) ([]streamhelper.RegionWithLeader, error) { + if t.beforeScan != nil { + t.beforeScan() + } + return t.Cluster.RegionScan(ctx, key, endKey, limit) +} + func (t *testEnv) ScanLocksInOneRegion( bo *tikv.Backoffer, key []byte,