diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index ffcf578c8ce33..3391714e7513b 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -309,12 +309,19 @@ func TestBlocked(t *testing.T) { ctx := context.Background() req := require.New(t) c.splitAndScatter("0012", "0034", "0048") + blockReq := make(chan struct{}) + defer close(blockReq) + firstBlocked := make(chan struct{}, 1) + firstBlockedOnce := sync.Once{} marked := false for _, s := range c.storeList() { s.SetGetRegionCheckpointHook(func(glftrr *logbackup.GetLastFlushTSOfRegionRequest) error { + firstBlockedOnce.Do(func() { + firstBlocked <- struct{}{} + }) // blocking the thread. // this may happen when TiKV goes down or too busy. - <-(chan struct{})(nil) + <-blockReq return nil }) marked = true @@ -324,14 +331,21 @@ func TestBlocked(t *testing.T) { adv := streamhelper.NewCheckpointAdvancer(env) adv.StartTaskListener(ctx) adv.UpdateConfigWith(func(c *config.CommandConfig) { - // ... So the tick timeout would be 100ms - c.TickDuration = 10 * time.Millisecond + // keep enough headroom so the blocked rpc request is observed before timeout. + c.TickDuration = 100 * time.Millisecond + }) + errCh := make(chan error, 1) + go func() { + errCh <- adv.OnTick(ctx) + }() + shouldFinishInTime(t, 5*time.Second, "wait until blocked request observed", func() { + <-firstBlocked }) var err error - shouldFinishInTime(t, time.Second, "ticking", func() { - err = adv.OnTick(ctx) + shouldFinishInTime(t, 5*time.Second, "ticking", func() { + err = <-errCh }) - req.ErrorIs(errors.Cause(err), context.DeadlineExceeded) + req.ErrorIs(err, context.DeadlineExceeded) } func TestResolveLock(t *testing.T) {