From 6fcd412cc025f90269498e636edffda344ac18e6 Mon Sep 17 00:00:00 2001 From: Eugene Blikh Date: Tue, 12 May 2026 20:16:14 +0300 Subject: [PATCH] pool: bound New connect by context, add WaitConnected pool.New / pool.NewWithOpts no longer use Opts.CheckTimeout to bound the initial connect: they dial all instances concurrently and wait for those dials to finish, bounded only by the supplied context. Previously the wait was additionally capped at CheckTimeout after the first successful dial. Opts.CheckTimeout now only drives the reconnect and role-relocate timer. Add Pool.WaitConnected(ctx, mode) (also on the Pooler interface) to block until a connection in the given mode is ready, since New may return a pool with no live connections when instances are unreachable. --- CHANGELOG.md | 14 +++++ MIGRATION.md | 72 ++++++++++++++++++++++ pool/pool.go | 149 ++++++++++++++++++++++++++++++++-------------- pool/pool_test.go | 44 ++++++++++++++ pool/pooler.go | 3 + 5 files changed, 238 insertions(+), 44 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 69f745266..3e0c033d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,12 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. * New `MockRequestNamed` type for verifying specific requests in tests. * New `test_helpers.ExecuteOnAll` function to execute operations on all instances in parallel with context support. +* New `pool.Pool.WaitConnected(ctx, mode)` method (also added to the + `pool.Pooler` interface) that blocks until the pool holds a connection + satisfying the mode, or `ctx` is done, or the pool is closed. `pool.New` + still returns a pool even when no instance is reachable, so callers that + require a usable connection before proceeding should use `WaitConnected` + instead of the racy `ConnectedNow` snapshot. ### Changed @@ -49,6 +55,14 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. * `box.New` returns an error instead of panic (#448). 
* Now cases of `<-ctx.Done()` returns wrapped error provided by `ctx.Cause()`. Allows you compare it using `errors.Is/As` (#457). +* `pool.NewWithOpts`/`pool.New` no longer use `Opts.CheckTimeout` to bound the + initial connect: the wait for the instance dials is now bounded only by the + supplied `ctx`, so a slow or unreachable instance no longer blocks past the + `ctx` deadline (previously the wait was capped at `Opts.CheckTimeout` after + the first successful dial). `Opts.CheckTimeout` is unchanged for the + reconnect/role-relocate timer. Pass a `context.Context` with a deadline if + you cannot tolerate an unbounded wait, and use `pool.Pool.WaitConnected` if + you need a connection in a specific mode to be ready before proceeding. * Removed deprecated `pool` methods, related interfaces and tests are updated (#478). * Removed deprecated `box.session.push()` support: Future.AppendPush() diff --git a/MIGRATION.md b/MIGRATION.md index dd33cd4c8..71d80d054 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -75,6 +75,78 @@ TODO * `pool.Pool.DoInstance()` renamed to `pool.Pool.DoOn()`. * `pool.Connect()` renamed to `pool.New()`, `pool.ConnectWithOpts()` renamed to `pool.NewWithOpts()`. +* `pool.New()`/`pool.NewWithOpts()` no longer use `pool.Opts.CheckTimeout` to + bound the initial connect. They still dial all instances concurrently and + wait for those dials to finish, but the wait is now bounded only by the + supplied `ctx` (previously it was additionally capped at `CheckTimeout` + after the first successful dial). Consequence: if `ctx` has no deadline and + an instance is unreachable in a way that hangs (e.g. a black-hole address), + the constructor blocks until that dial times out on its own instead of + returning after `CheckTimeout` — pass a `context.Context` with a deadline. + Instances that fail to connect are left to the background controllers, as + before. `pool.Opts.CheckTimeout` is unchanged for the reconnect/role-relocate + timer. 
If you need a connection in a specific mode to be ready before + proceeding, use the new `pool.Pool.WaitConnected(ctx, mode)` — the readiness + counterpart to the racy `ConnectedNow` snapshot, also part of the + `pool.Pooler` interface. + + Before: + ```Go + connPool, err := pool.ConnectWithOpts(ctx, instances, poolOpts) + // a working connection was likely (but not guaranteed) available here + ``` + + After: + ```Go + connPool, err := pool.NewWithOpts(ctx, instances, poolOpts) + if err != nil { + // ... + } + // ctx should carry a deadline; WaitConnected returns ctx.Err() on timeout. + if err := connPool.WaitConnected(ctx, pool.ModeRW); err != nil { + // ... + } + ``` + + If you relied on the old behaviour where the constructor effectively + blocked until a connection was (likely) available, reproduce it explicitly + by giving the call a deadline via `context.WithTimeout` and waiting for the + pool to become ready: + ```Go + ctx, cancel := context.WithTimeout(context.Background(), connectTimeout) + defer cancel() + + connPool, err := pool.NewWithOpts(ctx, instances, poolOpts) + if err != nil { + // ctx expired before any instance connected, or an invalid argument + return err + } + if err := connPool.WaitConnected(ctx, pool.ModeRW); err != nil { + // no usable connection within connectTimeout (returns ctx.Err()), + // or the pool was closed (pool.ErrClosed) + _ = connPool.Close() + return err + } + // here a master connection was available when WaitConnected returned + ``` + Note `pool.NewWithOpts` no longer takes any timeout of its own — pass a + bounded `context.Context` instead. `pool.Opts.CheckTimeout` only controls + the background reconnect/role-relocate timer now. +* New `pool.Pool.WaitConnected(ctx context.Context, mode pool.Mode) error` + method (also added to the `pool.Pooler` interface). 
It blocks until the pool + holds a connection satisfying `mode`, returning `nil`; it returns `ctx.Err()` + if `ctx` is done first and `pool.ErrClosed` if the pool is (or becomes) + closed. Because `pool.New`/`pool.NewWithOpts` may return a pool with no live + connections (they do not fail when instances are unreachable), prefer + `WaitConnected` over the racy `ConnectedNow` snapshot when you need a usable + connection before proceeding. Typical usage: + ```Go + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := connPool.WaitConnected(ctx, pool.ModeRW); err != nil { + // no master available within the timeout (or pool closed) + } + ``` * `pool` enum constants renamed to use prefix: `ANY` → `ModeAny`, `RW` → `ModeRW`, `RO` → `ModeRO`, `PreferRW` → `ModePreferRW`, `PreferRO` → `ModePreferRO`, `UnknownRole` → `RoleUnknown`, `MasterRole` → `RoleMaster`, diff --git a/pool/pool.go b/pool/pool.go index 8e924829c..66b5de11e 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -147,12 +147,16 @@ type Pool struct { // WithGroup("tarantool") namespace. rawLogger *slog.Logger - state state - done chan struct{} - roPool *roundRobinStrategy - rwPool *roundRobinStrategy - anyPool *roundRobinStrategy - poolsMutex sync.RWMutex + state state + done chan struct{} + roPool *roundRobinStrategy + rwPool *roundRobinStrategy + anyPool *roundRobinStrategy + poolsMutex sync.RWMutex + // connEvent is closed and replaced with a fresh channel whenever the set + // of active connections changes. It lets WaitConnected block until the + // pool becomes usable. Guarded by poolsMutex. + connEvent chan struct{} watcherContainer watcherContainer } @@ -204,6 +208,14 @@ func newEndpoint(name string, dialer tarantool.Dialer, opts tarantool.Opts) *end // NewWithOpts creates pool for instances with specified instances and // opts. Instances must have unique names. 
+// +// It dials the instances concurrently and waits for the dials to finish; +// ctx bounds that wait, so a slow or unreachable instance does not delay it +// past the ctx deadline. If ctx is done before any instance connects, an +// error is returned. Otherwise the pool is returned even if some (or all) +// instances did not connect — the pool keeps reconnecting them in the +// background; use Pool.WaitConnected if you need a usable connection before +// proceeding. func NewWithOpts(ctx context.Context, instances []Instance, opts Opts) (*Pool, error) { unique := make(map[string]bool) @@ -232,13 +244,14 @@ func NewWithOpts(ctx context.Context, instances []Instance, anyPool := newRoundRobinStrategy(size) p := &Pool{ - ends: make(map[string]*endpoint), - opts: opts, - state: connectedState, - done: make(chan struct{}), - rwPool: rwPool, - roPool: roPool, - anyPool: anyPool, + ends: make(map[string]*endpoint), + opts: opts, + state: connectedState, + done: make(chan struct{}), + rwPool: rwPool, + roPool: roPool, + anyPool: anyPool, + connEvent: make(chan struct{}), } if opts.Logger == nil { @@ -248,33 +261,25 @@ func NewWithOpts(ctx context.Context, instances []Instance, p.rawLogger = opts.Logger } + // Dial all instances concurrently and wait for every dial to finish. + // The only deadline is ctx: when it expires the in-flight dials are + // cancelled, so a slow or unreachable instance does not block past it. + // Instances that fail to connect are left to the background controllers, + // which keep retrying. If nothing connects and ctx is done, fail; + // otherwise return the pool (which may have no live connections — + // callers that require one should follow up with Pool.WaitConnected). 
fillCtx, fillCancel := context.WithCancel(ctx) defer fillCancel() - var timeout <-chan time.Time - - timeout = make(chan time.Time) filled := p.fillPools(fillCtx, instances) - done := 0 - success := len(instances) == 0 - - for done < len(instances) { - select { - case <-timeout: - fillCancel() - // To be sure that the branch is called only once. - timeout = make(chan time.Time) - case err := <-filled: - done++ - - if err == nil && !success { - timeout = time.After(opts.CheckTimeout) - success = true - } + connected := len(instances) == 0 + for range instances { + if err := <-filled; err == nil { + connected = true } } - if !success && ctx.Err() != nil { + if !connected && ctx.Err() != nil { p.state.set(closedState) return nil, ctx.Err() } @@ -297,14 +302,9 @@ func New(ctx context.Context, instances []Instance) (*Pool, error) { return NewWithOpts(ctx, instances, opts) } -// ConnectedNow gets connected status of pool. -func (p *Pool) ConnectedNow(mode Mode) (bool, error) { - p.poolsMutex.RLock() - defer p.poolsMutex.RUnlock() - - if p.state.get() != connectedState { - return false, nil - } +// hasConnection reports whether the pool currently holds a connection +// satisfying mode. The caller must hold poolsMutex (read or write). +func (p *Pool) hasConnection(mode Mode) (bool, error) { switch mode { case ModeAny: return !p.anyPool.IsEmpty(), nil @@ -312,15 +312,58 @@ func (p *Pool) ConnectedNow(mode Mode) (bool, error) { return !p.rwPool.IsEmpty(), nil case ModeRO: return !p.roPool.IsEmpty(), nil - case ModePreferRW: - fallthrough - case ModePreferRO: + case ModePreferRW, ModePreferRO: return !p.rwPool.IsEmpty() || !p.roPool.IsEmpty(), nil default: return false, ErrNoHealthyInstance } } +// ConnectedNow gets connected status of pool. 
+func (p *Pool) ConnectedNow(mode Mode) (bool, error) { + p.poolsMutex.RLock() + defer p.poolsMutex.RUnlock() + + if p.state.get() != connectedState { + return false, nil + } + return p.hasConnection(mode) +} + +// WaitConnected blocks until the pool holds at least one connection +// satisfying mode, returning nil. It returns ctx.Err() if ctx is done first +// and ErrClosed if the pool is (or becomes) closed. +// +// NewWithOpts does not fail when instances are unreachable: it returns a pool +// that may have no live connections yet and keeps reconnecting in the +// background. Callers that need a usable connection before proceeding should +// use WaitConnected (typically with a deadline on ctx) instead of relying on +// the racy ConnectedNow snapshot. +func (p *Pool) WaitConnected(ctx context.Context, mode Mode) error { + for { + p.poolsMutex.RLock() + closed := p.state.get() != connectedState + ok, err := p.hasConnection(mode) + event := p.connEvent + p.poolsMutex.RUnlock() + + switch { + case closed: + return ErrClosed + case err != nil: + return err + case ok: + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-event: + } + } +} + // ConfiguredTimeout gets timeout of current connection. func (p *Pool) ConfiguredTimeout(mode Mode) (time.Duration, error) { conn, err := p.getNextConnection(mode) @@ -457,6 +500,10 @@ func (p *Pool) Close() error { close(s.close) } p.endsMutex.RUnlock() + + p.poolsMutex.Lock() + p.notifyConnEvent() + p.poolsMutex.Unlock() } return p.waitClose() @@ -472,6 +519,10 @@ func (p *Pool) CloseGraceful() error { close(s.shutdown) } p.endsMutex.RUnlock() + + p.poolsMutex.Lock() + p.notifyConnEvent() + p.poolsMutex.Unlock() } return p.waitClose() @@ -692,11 +743,20 @@ func (p *Pool) getConnectionFromPool(name string) (*tarantool.Connection, Role) return p.anyPool.GetConnection(name), RoleUnknown } +// notifyConnEvent wakes WaitConnected callers by closing the current +// connEvent channel and installing a fresh one. 
The caller must hold +// poolsMutex for writing. +func (p *Pool) notifyConnEvent() { + close(p.connEvent) + p.connEvent = make(chan struct{}) +} + func (p *Pool) deleteConnection(name string) { if conn := p.anyPool.DeleteConnection(name); conn != nil { if conn := p.rwPool.DeleteConnection(name); conn == nil { p.roPool.DeleteConnection(name) } + p.notifyConnEvent() // The internal connection deinitialization. p.watcherContainer.mutex.RLock() defer p.watcherContainer.mutex.RUnlock() @@ -754,6 +814,7 @@ func (p *Pool) addConnection(name string, case RoleReplica: p.roPool.AddConnection(name, conn) } + p.notifyConnEvent() return nil } diff --git a/pool/pool_test.go b/pool/pool_test.go index 758b46a2b..38f32ba9b 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -196,6 +196,50 @@ func TestNewWithOpts_error_no_timeout(t *testing.T) { require.ErrorIs(t, err, pool.ErrWrongCheckTimeout) } +func TestPool_WaitConnected(t *testing.T) { + ctx, cancel := test_helpers.GetPoolConnectContext() + defer cancel() + connPool, err := pool.New(ctx, makeInstances(servers, connOpts)) + require.NoError(t, err) + require.NotNil(t, connPool) + defer func() { _ = connPool.Close() }() + + wctx, wcancel := context.WithTimeout(context.Background(), 10*time.Second) + defer wcancel() + + require.NoError(t, connPool.WaitConnected(wctx, pool.ModeAny)) + require.NoError(t, connPool.WaitConnected(wctx, pool.ModeRW)) + require.NoError(t, connPool.WaitConnected(wctx, pool.ModePreferRO)) +} + +func TestPool_WaitConnected_context_canceled(t *testing.T) { + ctx, cancel := test_helpers.GetPoolConnectContext() + defer cancel() + // "err" does not resolve, so the pool stays without connections. 
+ connPool, err := pool.New(ctx, makeInstances([]string{"err"}, connOpts)) + require.NoError(t, err) + require.NotNil(t, connPool) + defer func() { _ = connPool.Close() }() + + wctx, wcancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer wcancel() + + err = connPool.WaitConnected(wctx, pool.ModeAny) + require.ErrorIs(t, err, context.DeadlineExceeded) +} + +func TestPool_WaitConnected_closed(t *testing.T) { + ctx, cancel := test_helpers.GetPoolConnectContext() + defer cancel() + connPool, err := pool.New(ctx, makeInstances([]string{"err"}, connOpts)) + require.NoError(t, err) + require.NotNil(t, connPool) + require.NoError(t, connPool.Close()) + + err = connPool.WaitConnected(context.Background(), pool.ModeAny) + require.ErrorIs(t, err, pool.ErrClosed) +} + func TestConnectWithOpts_error_reconnect(t *testing.T) { ctx, cancel := test_helpers.GetPoolConnectContext() defer cancel() diff --git a/pool/pooler.go b/pool/pooler.go index a0f81f3d3..bd12dd760 100644 --- a/pool/pooler.go +++ b/pool/pooler.go @@ -19,6 +19,9 @@ type Pooler interface { TopologyEditor ConnectedNow(mode Mode) (bool, error) + // WaitConnected blocks until the pool holds a connection satisfying mode, + // or ctx is done, or the pool is closed. + WaitConnected(ctx context.Context, mode Mode) error Close() error // CloseGraceful closes connections in the Pool gracefully. It waits // for all requests to complete.