Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
* New `MockRequestNamed` type for verifying specific requests in tests.
* New `test_helpers.ExecuteOnAll` function to execute operations on all
instances in parallel with context support.
* New `pool.Pool.WaitConnected(ctx, mode)` method (also added to the
`pool.Pooler` interface) that blocks until the pool holds a connection
satisfying the mode, or `ctx` is done, or the pool is closed. `pool.New`
still returns a pool even when no instance is reachable, so callers that
require a usable connection before proceeding should use `WaitConnected`
instead of the racy `ConnectedNow` snapshot.

### Changed

Expand All @@ -49,6 +55,14 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
* `box.New` returns an error instead of panic (#448).
* Now cases of `<-ctx.Done()` returns wrapped error provided by `ctx.Cause()`.
Allows you compare it using `errors.Is/As` (#457).
* `pool.NewWithOpts`/`pool.New` no longer use `Opts.CheckTimeout` to bound the
initial connect: the wait for the instance dials is now bounded only by the
supplied `ctx`, so a slow or unreachable instance no longer blocks past the
`ctx` deadline (previously the wait was capped at `Opts.CheckTimeout` after
the first successful dial). `Opts.CheckTimeout` is unchanged for the
reconnect/role-relocate timer. Pass a `context.Context` with a deadline if
you cannot tolerate an unbounded wait, and use `pool.Pool.WaitConnected` if
you need a connection in a specific mode to be ready before proceeding.
* Removed deprecated `pool` methods, related interfaces and tests are
updated (#478).
* Removed deprecated `box.session.push()` support: Future.AppendPush()
Expand Down
72 changes: 72 additions & 0 deletions MIGRATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,78 @@ TODO
* `pool.Pool.DoInstance()` renamed to `pool.Pool.DoOn()`.
* `pool.Connect()` renamed to `pool.New()`, `pool.ConnectWithOpts()` renamed to
`pool.NewWithOpts()`.
* `pool.New()`/`pool.NewWithOpts()` no longer use `pool.Opts.CheckTimeout` to
bound the initial connect. They still dial all instances concurrently and
wait for those dials to finish, but the wait is now bounded only by the
supplied `ctx` (previously it was additionally capped at `CheckTimeout`
after the first successful dial). Consequence: if `ctx` has no deadline and
an instance is unreachable in a way that hangs (e.g. a black-hole address),
the constructor blocks until that dial times out on its own instead of
returning after `CheckTimeout` — pass a `context.Context` with a deadline.
Instances that fail to connect are left to the background controllers, as
before. `pool.Opts.CheckTimeout` is unchanged for the reconnect/role-relocate
timer. If you need a connection in a specific mode to be ready before
proceeding, use the new `pool.Pool.WaitConnected(ctx, mode)` — the readiness
counterpart to the racy `ConnectedNow` snapshot, also part of the
`pool.Pooler` interface.

Before:
```Go
connPool, err := pool.ConnectWithOpts(ctx, instances, poolOpts)
// a working connection was likely (but not guaranteed) available here
```

After:
```Go
connPool, err := pool.NewWithOpts(ctx, instances, poolOpts)
if err != nil {
// ...
}
// ctx should carry a deadline; WaitConnected returns ctx.Err() on timeout.
if err := connPool.WaitConnected(ctx, pool.ModeRW); err != nil {
// ...
}
```

If you relied on the old behaviour where the constructor effectively
blocked until a connection was (likely) available, reproduce it explicitly
by giving the call a deadline via `context.WithTimeout` and waiting for the
pool to become ready:
```Go
ctx, cancel := context.WithTimeout(context.Background(), connectTimeout)
defer cancel()

connPool, err := pool.NewWithOpts(ctx, instances, poolOpts)
if err != nil {
// ctx expired before any instance connected, or an invalid argument
return err
}
if err := connPool.WaitConnected(ctx, pool.ModeRW); err != nil {
// no usable connection within connectTimeout (returns ctx.Err()),
// or the pool was closed (pool.ErrClosed)
_ = connPool.Close()
return err
}
// here a master connection is guaranteed to be available
```
Note `pool.NewWithOpts` no longer takes any timeout of its own — pass a
bounded `context.Context` instead. `pool.Opts.CheckTimeout` only controls
the background reconnect/role-relocate timer now.
* New `pool.Pool.WaitConnected(ctx context.Context, mode pool.Mode) error`
method (also added to the `pool.Pooler` interface). It blocks until the pool
holds a connection satisfying `mode`, returning `nil`; it returns `ctx.Err()`
if `ctx` is done first and `pool.ErrClosed` if the pool is (or becomes)
closed. Because `pool.New`/`pool.NewWithOpts` may return a pool with no live
connections (they do not fail when instances are unreachable), prefer
`WaitConnected` over the racy `ConnectedNow` snapshot when you need a usable
connection before proceeding. Typical usage:
```Go
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := connPool.WaitConnected(ctx, pool.ModeRW); err != nil {
// no master available within the timeout (or pool closed)
}
```
* `pool` enum constants renamed to use prefix: `ANY` → `ModeAny`, `RW` →
`ModeRW`, `RO` → `ModeRO`, `PreferRW` → `ModePreferRW`, `PreferRO` →
`ModePreferRO`, `UnknownRole` → `RoleUnknown`, `MasterRole` → `RoleMaster`,
Expand Down
149 changes: 105 additions & 44 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,16 @@ type Pool struct {
// WithGroup("tarantool") namespace.
rawLogger *slog.Logger

state state
done chan struct{}
roPool *roundRobinStrategy
rwPool *roundRobinStrategy
anyPool *roundRobinStrategy
poolsMutex sync.RWMutex
state state
done chan struct{}
roPool *roundRobinStrategy
rwPool *roundRobinStrategy
anyPool *roundRobinStrategy
poolsMutex sync.RWMutex
// connEvent is closed and replaced with a fresh channel whenever the set
// of active connections changes. It lets WaitConnected block until the
// pool becomes usable. Guarded by poolsMutex.
connEvent chan struct{}
watcherContainer watcherContainer
}

Expand Down Expand Up @@ -204,6 +208,14 @@ func newEndpoint(name string, dialer tarantool.Dialer, opts tarantool.Opts) *end

// NewWithOpts creates pool for instances with specified instances and
// opts. Instances must have unique names.
//
// It dials the instances concurrently and waits for the dials to finish;
// ctx bounds that wait, so a slow or unreachable instance does not delay it
// past the ctx deadline. If ctx is done before any instance connects, an
// error is returned. Otherwise the pool is returned even if some (or all)
// instances did not connect — the pool keeps reconnecting them in the
// background; use Pool.WaitConnected if you need a usable connection before
// proceeding.
func NewWithOpts(ctx context.Context, instances []Instance,
opts Opts) (*Pool, error) {
unique := make(map[string]bool)
Expand Down Expand Up @@ -232,13 +244,14 @@ func NewWithOpts(ctx context.Context, instances []Instance,
anyPool := newRoundRobinStrategy(size)

p := &Pool{
ends: make(map[string]*endpoint),
opts: opts,
state: connectedState,
done: make(chan struct{}),
rwPool: rwPool,
roPool: roPool,
anyPool: anyPool,
ends: make(map[string]*endpoint),
opts: opts,
state: connectedState,
done: make(chan struct{}),
rwPool: rwPool,
roPool: roPool,
anyPool: anyPool,
connEvent: make(chan struct{}),
}

if opts.Logger == nil {
Expand All @@ -248,33 +261,25 @@ func NewWithOpts(ctx context.Context, instances []Instance,
p.rawLogger = opts.Logger
}

// Dial all instances concurrently and wait for every dial to finish.
// The only deadline is ctx: when it expires the in-flight dials are
// cancelled, so a slow or unreachable instance does not block past it.
// Instances that fail to connect are left to the background controllers,
// which keep retrying. If nothing connects and ctx is done, fail;
// otherwise return the pool (which may have no live connections —
// callers that require one should follow up with Pool.WaitConnected).
fillCtx, fillCancel := context.WithCancel(ctx)
defer fillCancel()

var timeout <-chan time.Time

timeout = make(chan time.Time)
filled := p.fillPools(fillCtx, instances)
done := 0
success := len(instances) == 0

for done < len(instances) {
select {
case <-timeout:
fillCancel()
// To be sure that the branch is called only once.
timeout = make(chan time.Time)
case err := <-filled:
done++

if err == nil && !success {
timeout = time.After(opts.CheckTimeout)
success = true
}
connected := len(instances) == 0
for range instances {
if err := <-filled; err == nil {
connected = true
}
}

if !success && ctx.Err() != nil {
if !connected && ctx.Err() != nil {
p.state.set(closedState)
return nil, ctx.Err()
}
Expand All @@ -297,30 +302,68 @@ func New(ctx context.Context, instances []Instance) (*Pool, error) {
return NewWithOpts(ctx, instances, opts)
}

// ConnectedNow gets connected status of pool.
func (p *Pool) ConnectedNow(mode Mode) (bool, error) {
p.poolsMutex.RLock()
defer p.poolsMutex.RUnlock()

if p.state.get() != connectedState {
return false, nil
}
// hasConnection reports whether the pool currently holds a connection
// satisfying mode. The caller must hold poolsMutex (read or write).
func (p *Pool) hasConnection(mode Mode) (bool, error) {
switch mode {
case ModeAny:
return !p.anyPool.IsEmpty(), nil
case ModeRW:
return !p.rwPool.IsEmpty(), nil
case ModeRO:
return !p.roPool.IsEmpty(), nil
case ModePreferRW:
fallthrough
case ModePreferRO:
case ModePreferRW, ModePreferRO:
return !p.rwPool.IsEmpty() || !p.roPool.IsEmpty(), nil
default:
return false, ErrNoHealthyInstance
}
}

// ConnectedNow gets connected status of pool.
func (p *Pool) ConnectedNow(mode Mode) (bool, error) {
p.poolsMutex.RLock()
defer p.poolsMutex.RUnlock()

if p.state.get() != connectedState {
return false, nil
}
return p.hasConnection(mode)
}

// WaitConnected blocks until the pool holds at least one connection
// satisfying mode, returning nil. It returns ctx.Err() if ctx is done first
// and ErrClosed if the pool is (or becomes) closed.
//
// NewWithOpts does not fail when instances are unreachable: it returns a pool
// that may have no live connections yet and keeps reconnecting in the
// background. Callers that need a usable connection before proceeding should
// use WaitConnected (typically with a deadline on ctx) instead of relying on
// the racy ConnectedNow snapshot.
func (p *Pool) WaitConnected(ctx context.Context, mode Mode) error {
for {
p.poolsMutex.RLock()
closed := p.state.get() != connectedState
ok, err := p.hasConnection(mode)
event := p.connEvent
p.poolsMutex.RUnlock()

switch {
case closed:
return ErrClosed
case err != nil:
return err
case ok:
return nil
}

select {
case <-ctx.Done():
return ctx.Err()
case <-event:
}
}
}

// ConfiguredTimeout gets timeout of current connection.
func (p *Pool) ConfiguredTimeout(mode Mode) (time.Duration, error) {
conn, err := p.getNextConnection(mode)
Expand Down Expand Up @@ -457,6 +500,10 @@ func (p *Pool) Close() error {
close(s.close)
}
p.endsMutex.RUnlock()

p.poolsMutex.Lock()
p.notifyConnEvent()
p.poolsMutex.Unlock()
}

return p.waitClose()
Expand All @@ -472,6 +519,10 @@ func (p *Pool) CloseGraceful() error {
close(s.shutdown)
}
p.endsMutex.RUnlock()

p.poolsMutex.Lock()
p.notifyConnEvent()
p.poolsMutex.Unlock()
}

return p.waitClose()
Expand Down Expand Up @@ -692,11 +743,20 @@ func (p *Pool) getConnectionFromPool(name string) (*tarantool.Connection, Role)
return p.anyPool.GetConnection(name), RoleUnknown
}

// notifyConnEvent wakes WaitConnected callers by closing the current
// connEvent channel and installing a fresh one. The caller must hold
// poolsMutex for writing.
func (p *Pool) notifyConnEvent() {
close(p.connEvent)
p.connEvent = make(chan struct{})
}

func (p *Pool) deleteConnection(name string) {
if conn := p.anyPool.DeleteConnection(name); conn != nil {
if conn := p.rwPool.DeleteConnection(name); conn == nil {
p.roPool.DeleteConnection(name)
}
p.notifyConnEvent()
// The internal connection deinitialization.
p.watcherContainer.mutex.RLock()
defer p.watcherContainer.mutex.RUnlock()
Expand Down Expand Up @@ -754,6 +814,7 @@ func (p *Pool) addConnection(name string,
case RoleReplica:
p.roPool.AddConnection(name, conn)
}
p.notifyConnEvent()
return nil
}

Expand Down
44 changes: 44 additions & 0 deletions pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,50 @@ func TestNewWithOpts_error_no_timeout(t *testing.T) {
require.ErrorIs(t, err, pool.ErrWrongCheckTimeout)
}

func TestPool_WaitConnected(t *testing.T) {
ctx, cancel := test_helpers.GetPoolConnectContext()
defer cancel()
connPool, err := pool.New(ctx, makeInstances(servers, connOpts))
require.NoError(t, err)
require.NotNil(t, connPool)
defer func() { _ = connPool.Close() }()

wctx, wcancel := context.WithTimeout(context.Background(), 10*time.Second)
defer wcancel()

require.NoError(t, connPool.WaitConnected(wctx, pool.ModeAny))
require.NoError(t, connPool.WaitConnected(wctx, pool.ModeRW))
require.NoError(t, connPool.WaitConnected(wctx, pool.ModePreferRO))
}

func TestPool_WaitConnected_context_canceled(t *testing.T) {
ctx, cancel := test_helpers.GetPoolConnectContext()
defer cancel()
// "err" does not resolve, so the pool stays without connections.
connPool, err := pool.New(ctx, makeInstances([]string{"err"}, connOpts))
require.NoError(t, err)
require.NotNil(t, connPool)
defer func() { _ = connPool.Close() }()

wctx, wcancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer wcancel()

err = connPool.WaitConnected(wctx, pool.ModeAny)
require.ErrorIs(t, err, context.DeadlineExceeded)
}

func TestPool_WaitConnected_closed(t *testing.T) {
ctx, cancel := test_helpers.GetPoolConnectContext()
defer cancel()
connPool, err := pool.New(ctx, makeInstances([]string{"err"}, connOpts))
require.NoError(t, err)
require.NotNil(t, connPool)
require.NoError(t, connPool.Close())

err = connPool.WaitConnected(context.Background(), pool.ModeAny)
require.ErrorIs(t, err, pool.ErrClosed)
}

func TestConnectWithOpts_error_reconnect(t *testing.T) {
ctx, cancel := test_helpers.GetPoolConnectContext()
defer cancel()
Expand Down
Loading
Loading