41 changes: 16 additions & 25 deletions balance_checker.go
@@ -21,7 +21,6 @@ import (

"pkg.akt.dev/go/util/pubsub"
netutil "pkg.akt.dev/node/v2/util/network"
"pkg.akt.dev/node/v2/util/runner"
"pkg.akt.dev/node/v2/x/escrow/client/util"

"github.com/akash-network/provider/event"
@@ -43,6 +42,7 @@ const (
type BalanceCheckerConfig struct {
WithdrawalPeriod time.Duration
LeaseFundsCheckInterval time.Duration
WithdrawalBatchMaxMsgs int
}

type leaseState struct {
@@ -60,6 +60,7 @@ type balanceChecker struct {
aqc aclient.QueryClient
leases map[mtypes.LeaseID]*leaseState
cfg BalanceCheckerConfig
batcher *withdrawBatcher
}

type leaseCheckResponse struct {
@@ -87,6 +88,7 @@ func newBalanceChecker(
aqc: aqc,
leases: make(map[mtypes.LeaseID]*leaseState),
cfg: cfg,
batcher: newWithdrawBatcher(clientSession.Client().Tx(), withdrawTimeout, cfg.WithdrawalBatchMaxMsgs),
}

startCh := make(chan error, 1)
@@ -199,18 +201,6 @@ func (bc *balanceChecker) doEscrowCheck(ctx context.Context, lid mtypes.LeaseID,
return resp
}

func (bc *balanceChecker) startWithdraw(ctx context.Context, lid mtypes.LeaseID) error {
Review comment from the author: this was moved to withdraw_batcher.go.

ctx, cancel := context.WithTimeout(ctx, withdrawTimeout)
defer cancel()

msg := &mvbeta.MsgWithdrawLease{
ID: lid,
}

_, err := bc.session.Client().Tx().BroadcastMsgs(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError())
return err
}

func (bc *balanceChecker) run(startCh chan<- error) {
ctx, cancel := context.WithCancel(bc.ctx)

@@ -228,16 +218,13 @@ func (bc *balanceChecker) run(startCh chan<- error) {
}()

leaseCheckCh := make(chan leaseCheckResponse, 1)
var resultch chan runner.Result

subscriber, err := bc.bus.Subscribe()
startCh <- err
if err != nil {
return
}

resultch = make(chan runner.Result, 1)

loop:
for {
select {
@@ -282,6 +269,7 @@
}

delete(bc.leases, ev.LeaseID)
bc.batcher.Remove(ev.LeaseID)
}
case res := <-leaseCheckCh:
// we may have timer fired just a heart beat ahead of lease remove event.
@@ -325,17 +313,20 @@
}

if withdraw {
go func() {
select {
case <-ctx.Done():
case resultch <- runner.NewResult(res.lid, bc.startWithdraw(ctx, res.lid)):
}
}()
bc.batcher.Enqueue(res.lid)
bc.batcher.Flush(ctx)
}
case res := <-resultch:
if err := res.Error(); err != nil {
bc.log.Error("failed to do lease withdrawal", "err", err, "LeaseID", res.Value().(mtypes.LeaseID))
case err := <-bc.batcher.Done():
bc.batcher.MarkDone()
if err != nil {
// Skip immediate re-flush on failure: let the next per-lease
// timer trigger Enqueue+Flush, which gives natural backoff.
// Pending ids stay queued; Enqueue dedupes so a lease that
// re-triggers while still pending won't duplicate the msg.
bc.log.Error("failed to do lease withdrawal", "err", err, "pending", bc.batcher.Pending())
continue loop
}
bc.batcher.Flush(ctx)
}
}
}
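For readability, here is a condensed restatement of the run-loop change above: timer-driven lease checks now enqueue into the shared batcher instead of spawning one broadcast goroutine per lease, and the batcher's Done/MarkDone/Flush handshake replaces the old runner.Result channel. This is an illustrative sketch only (subscriber handling and the withdraw-decision logic are omitted), assuming the balanceChecker and withdrawBatcher types from this PR; condensedWithdrawLoop is a hypothetical name, not a function in the changeset.

func (bc *balanceChecker) condensedWithdrawLoop(ctx context.Context, leaseCheckCh <-chan leaseCheckResponse) {
	for {
		select {
		case <-ctx.Done():
			return
		case res := <-leaseCheckCh:
			// lease needs a withdrawal: queue it and try to start a broadcast
			bc.batcher.Enqueue(res.lid) // deduped if the lease is already pending
			bc.batcher.Flush(ctx)       // no-op while a broadcast is in flight
		case err := <-bc.batcher.Done():
			bc.batcher.MarkDone()
			if err != nil {
				// skip the immediate re-flush; the next per-lease timer
				// re-triggers Enqueue+Flush, which gives natural backoff
				bc.log.Error("failed to do lease withdrawal", "err", err, "pending", bc.batcher.Pending())
				continue
			}
			bc.batcher.Flush(ctx) // drain ids coalesced during the broadcast
		}
	}
}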
4 changes: 2 additions & 2 deletions bidengine/order.go
@@ -436,7 +436,7 @@ loop:
Sources: deposit.Sources{deposit.SourceBalance},
}, offer)
bidch = runner.Do(func() runner.Result {
return runner.NewResult(o.session.Client().Tx().BroadcastMsgs(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError()))
return runner.NewResult(o.session.Client().Tx().BroadcastMsgs(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError(), aclient.WithPriority()))
})

case result := <-bidch:
@@ -493,7 +493,7 @@ loop:
Reason: mtypes.LeaseClosedReasonUnspecified,
}

_, err := o.session.Client().Tx().BroadcastMsgs(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError())
_, err := o.session.Client().Tx().BroadcastMsgs(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError(), aclient.WithPriority())
if err != nil {
o.log.Error("closing bid", "error", err)
bidCounter.WithLabelValues("close", metricsutils.FailLabel).Inc()
8 changes: 4 additions & 4 deletions bidengine/order_test.go
@@ -121,7 +121,7 @@ func makeMocks(s *orderTestScaffold) {
txMocks := &clientmocks.TxClient{}
s.broadcasts = make(chan []sdk.Msg, 1)

txMocks.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
txMocks.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
s.broadcasts <- args.Get(1).([]sdk.Msg)
}).Return(&sdk.Result{}, nil)

@@ -553,7 +553,7 @@ func Test_ShouldCloseBidWhenAlreadySetAndOld(t *testing.T) {
Reason: mtypes.LeaseClosedReasonUnspecified,
}}

scaffold.txClient.AssertCalled(t, "BroadcastMsgs", mock.Anything, expMsgs, mock.Anything)
scaffold.txClient.AssertCalled(t, "BroadcastMsgs", mock.Anything, expMsgs, mock.Anything, mock.Anything)
}

func Test_ShouldExitWhenAlreadySetAndLost(t *testing.T) {
@@ -578,7 +578,7 @@ func Test_ShouldExitWhenAlreadySetAndLost(t *testing.T) {
ID: mtypes.MakeBidID(order.orderID, scaffold.testAddr),
}

scaffold.txClient.AssertNotCalled(t, "BroadcastMsgs", mock.Anything, expMsgs, mock.Anything)
scaffold.txClient.AssertNotCalled(t, "BroadcastMsgs", mock.Anything, expMsgs, mock.Anything, mock.Anything)
}

func Test_ShouldCloseBidWhenAlreadySetAndThenTimeout(t *testing.T) {
@@ -605,7 +605,7 @@ func Test_ShouldCloseBidWhenAlreadySetAndThenTimeout(t *testing.T) {
Reason: mtypes.LeaseClosedReasonUnspecified,
},
}
scaffold.txClient.AssertCalled(t, "BroadcastMsgs", mock.Anything, expMsgs, mock.Anything)
scaffold.txClient.AssertCalled(t, "BroadcastMsgs", mock.Anything, expMsgs, mock.Anything, mock.Anything)

// Should have called unreserve
scaffold.cluster.AssertCalled(t, "Unreserve", scaffold.orderID)
2 changes: 1 addition & 1 deletion cluster/monitor.go
@@ -199,7 +199,7 @@ func (m *deploymentMonitor) runCloseLease(ctx context.Context) <-chan runner.Res
ID: m.deployment.LeaseID().BidID(),
Reason: mv1.LeaseClosedReasonUnstable,
}
res, err := m.session.Client().Tx().BroadcastMsgs(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError())
res, err := m.session.Client().Tx().BroadcastMsgs(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError(), aclient.WithPriority())
if err != nil {
m.log.Error("closing deployment", "err", err)
} else {
5 changes: 5 additions & 0 deletions cmd/provider-services/cmd/flags.go
@@ -196,6 +196,11 @@ func addRunFlags(cmd *cobra.Command) error {
return err
}

cmd.Flags().Int(FlagWithdrawalBatchMaxMsgs, 50, "max number of MsgWithdrawLease messages coalesced into a single broadcast. valid range [10, 100]")
if err := viper.BindPFlag(FlagWithdrawalBatchMaxMsgs, cmd.Flags().Lookup(FlagWithdrawalBatchMaxMsgs)); err != nil {
return err
}

cmd.Flags().Duration(FlagLeaseFundsMonitorInterval, time.Minute*10, "interval at which lease is checked for funds available on the escrow accounts. >= 1m")
if err := viper.BindPFlag(FlagLeaseFundsMonitorInterval, cmd.Flags().Lookup(FlagLeaseFundsMonitorInterval)); err != nil {
return err
6 changes: 6 additions & 0 deletions cmd/provider-services/cmd/run.go
@@ -96,6 +96,7 @@ const (
FlagManifestTimeout = "manifest-timeout"
FlagMetricsListener = "metrics-listener"
FlagWithdrawalPeriod = "withdrawal-period"
FlagWithdrawalBatchMaxMsgs = "withdrawal-batch-max-msgs"
FlagLeaseFundsMonitorInterval = "lease-funds-monitor-interval"
FlagMinimumBalance = "minimum-balance"
FlagProviderConfig = "provider-config"
@@ -205,6 +206,10 @@ func RunCmd() *cobra.Command {
return fmt.Errorf(`flag "%s" value must be > "%s"`, FlagWithdrawalPeriod, FlagLeaseFundsMonitorInterval) // nolint: err113
}

if maxMsgs := viper.GetInt(FlagWithdrawalBatchMaxMsgs); maxMsgs < 10 || maxMsgs > 100 {
return fmt.Errorf(`flag "%s" contains invalid value %d. expected range [10, 100]`, FlagWithdrawalBatchMaxMsgs, maxMsgs) // nolint: err113
}

if viper.GetDuration(FlagMonitorRetryPeriod) < 4*time.Second {
return fmt.Errorf(`flag "%s" value must be > "%s"`, FlagMonitorRetryPeriod, 4*time.Second) // nolint: err113
}
@@ -675,6 +680,7 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error {
config.BalanceCheckerCfg = provider.BalanceCheckerConfig{
WithdrawalPeriod: viper.GetDuration(FlagWithdrawalPeriod),
LeaseFundsCheckInterval: viper.GetDuration(FlagLeaseFundsMonitorInterval),
WithdrawalBatchMaxMsgs: viper.GetInt(FlagWithdrawalBatchMaxMsgs),
}

config.BidPricingStrategy = pricing
1 change: 1 addition & 0 deletions config.go
@@ -36,6 +36,7 @@ func NewDefaultConfig() Config {
BalanceCheckerCfg: BalanceCheckerConfig{
LeaseFundsCheckInterval: 1 * time.Minute,
WithdrawalPeriod: 24 * time.Hour,
WithdrawalBatchMaxMsgs: 50,
},
MaxGroupVolumes: constants.DefaultMaxGroupVolumes,
Config: cluster.NewDefaultConfig(),
2 changes: 1 addition & 1 deletion manifest/watchdog.go
@@ -76,7 +76,7 @@
Reason: mtypes.LeaseClosedReasonManifestTimeout,
}

return runner.NewResult(wd.sess.Client().Tx().BroadcastMsgs(wd.ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError()))
return runner.NewResult(wd.sess.Client().Tx().BroadcastMsgs(wd.ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError(), aclient.WithPriority())) // bidClose should be priority to release the reserved resources ASAP

Check failure on line 79 in manifest/watchdog.go (GitHub Actions: lint, tests, build-bins): undefined: aclient.WithPriority (typecheck)
})
case err = <-wd.lc.ShutdownRequest():
}
2 changes: 1 addition & 1 deletion manifest/watchdog_test.go
@@ -38,7 +38,7 @@ func makeWatchdogTestScaffold(t *testing.T, timeout time.Duration) (*watchdog, *
scaffold.broadcasts = make(chan []sdk.Msg, 1)

txClientMock := &clientmocks.TxClient{}
txClientMock.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
txClientMock.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
scaffold.broadcasts <- args.Get(1).([]sdk.Msg)
}).Return(&sdk.Result{}, nil)

131 changes: 131 additions & 0 deletions withdraw_batcher.go
@@ -0,0 +1,131 @@
package provider

import (
"context"
"fmt"
"slices"
"time"

sdk "github.com/cosmos/cosmos-sdk/types"

aclient "pkg.akt.dev/go/node/client/v1beta3"
mtypes "pkg.akt.dev/go/node/market/v1"
mvbeta "pkg.akt.dev/go/node/market/v1beta5"
)

// withdrawBatcher coalesces MsgWithdrawLease requests into single multi-msg
// transactions using opportunistic in-flight batching:
//
// - Idle: Flush fires a 1-msg TX immediately.
// - In-flight: subsequent Enqueue calls accumulate in pending.
// - On MarkDone: callers invoke Flush which drains up to maxMsgs from pending.
//
// Not safe for concurrent use. All methods except the internal broadcast
// goroutine must be called from a single goroutine.
type withdrawBatcher struct {
tx aclient.TxClient
timeout time.Duration
maxMsgs int

pending []mtypes.LeaseID
inFlight bool
doneCh chan error
}

func newWithdrawBatcher(tx aclient.TxClient, timeout time.Duration, maxMsgs int) *withdrawBatcher {
if maxMsgs < 1 {
panic(fmt.Sprintf("withdrawBatcher: maxMsgs must be >= 1, got %d", maxMsgs))
}
return &withdrawBatcher{
tx: tx,
timeout: timeout,
maxMsgs: maxMsgs,
doneCh: make(chan error, 1),
}
}

// After an in-flight broadcast fails, items coalesced during the in-flight
// window remain in pending (run-loop skips re-flush on error for natural
// backoff). If the same lease re-triggers before pending drains, Enqueue must
// dedupe so the next batch doesn't carry a duplicate MsgWithdrawLease, which
// would risk failing the entire atomic tx on the second message.
func (b *withdrawBatcher) Enqueue(lid mtypes.LeaseID) {
if slices.Contains(b.pending, lid) {
return
}
b.pending = append(b.pending, lid)
}

// Remove drops a lease id from the pending batch.
// Does not affect an in-flight broadcast.
func (b *withdrawBatcher) Remove(lid mtypes.LeaseID) {
b.pending = slices.DeleteFunc(b.pending, func(p mtypes.LeaseID) bool {
return p == lid
})
}

// InFlight reports whether a broadcast is currently running.
func (b *withdrawBatcher) InFlight() bool {
return b.inFlight
}

// Pending reports the number of queued lease ids not yet broadcast.
func (b *withdrawBatcher) Pending() int {
return len(b.pending)
}

// Flush starts a broadcast with up to maxMsgs pending lease ids when idle.
// Returns true if a broadcast was started, false if nothing to do or already in-flight.
func (b *withdrawBatcher) Flush(ctx context.Context) bool {
if b.inFlight || len(b.pending) == 0 {
return false
}

n := len(b.pending)
if n > b.maxMsgs {
n = b.maxMsgs
}

batch := make([]mtypes.LeaseID, n)
copy(batch, b.pending[:n])
b.pending = b.pending[n:]
b.inFlight = true

go func() {
err := b.broadcast(ctx, batch)
select {
case <-ctx.Done():
case b.doneCh <- err:
}
}()

return true
}

// Done returns a channel that delivers the broadcast result of each completed batch.
// Callers must invoke MarkDone after reading to unblock the next Flush.
func (b *withdrawBatcher) Done() <-chan error {
return b.doneCh
}

// MarkDone clears the in-flight flag. Must be called after reading Done().
func (b *withdrawBatcher) MarkDone() {
b.inFlight = false
}

func (b *withdrawBatcher) broadcast(ctx context.Context, lids []mtypes.LeaseID) error {
if len(lids) == 0 {
return nil
}

ctx, cancel := context.WithTimeout(ctx, b.timeout)
defer cancel()

msgs := make([]sdk.Msg, 0, len(lids))
for _, lid := range lids {
msgs = append(msgs, &mvbeta.MsgWithdrawLease{ID: lid})
}

_, err := b.tx.BroadcastMsgs(ctx, msgs, aclient.WithResultCodeAsError())
return err
}
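A minimal usage sketch of the batcher's contract (not part of the changeset): the first Flush broadcasts immediately when idle, ids enqueued during the in-flight window coalesce and dedupe, and MarkDone must follow a read from Done() before the next Flush can start another batch. The helper name and the lease-id arguments are placeholders; txClient, withdrawTimeout, and the lease ids are assumed to come from the provider session and lease events.

func sketchWithdrawBatching(ctx context.Context, txClient aclient.TxClient, lidA, lidB mtypes.LeaseID) {
	b := newWithdrawBatcher(txClient, withdrawTimeout, 50)

	b.Enqueue(lidA)
	b.Flush(ctx)    // idle: starts a 1-msg broadcast immediately
	b.Enqueue(lidB) // coalesces while the first broadcast is in flight
	b.Enqueue(lidB) // deduped: pending still holds a single entry for lidB

	err := <-b.Done() // result of the first broadcast
	b.MarkDone()      // clear the in-flight flag before flushing again
	if err == nil {
		b.Flush(ctx) // drains up to maxMsgs pending ids (here just lidB)
	}
}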