Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 18 additions & 26 deletions balance_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"pkg.akt.dev/go/util/pubsub"
netutil "pkg.akt.dev/node/v2/util/network"
"pkg.akt.dev/node/v2/util/runner"
"pkg.akt.dev/node/v2/x/escrow/client/util"

"github.com/akash-network/provider/event"
Expand All @@ -43,6 +42,7 @@ const (
// BalanceCheckerConfig holds the tunables for the balance checker, which
// periodically inspects lease escrow accounts and triggers withdrawals.
type BalanceCheckerConfig struct {
// WithdrawalPeriod is the period between withdrawals of accrued lease funds.
WithdrawalPeriod time.Duration
// LeaseFundsCheckInterval is the interval at which each lease is checked
// for funds available on its escrow account.
LeaseFundsCheckInterval time.Duration
// WithdrawalBatchMaxMsgs caps the number of MsgWithdrawLease messages
// coalesced into a single broadcast by the withdraw batcher.
WithdrawalBatchMaxMsgs int
}

type leaseState struct {
Expand All @@ -60,6 +60,7 @@ type balanceChecker struct {
aqc aclient.QueryClient
leases map[mtypes.LeaseID]*leaseState
cfg BalanceCheckerConfig
batcher *withdrawBatcher
}

type leaseCheckResponse struct {
Expand All @@ -77,16 +78,18 @@ func newBalanceChecker(
bus pubsub.Bus,
cfg BalanceCheckerConfig,
) (*balanceChecker, error) {
bcLog := clientSession.Log().With("cmp", "balance-checker")
bc := &balanceChecker{
ctx: ctx,
session: clientSession,
log: clientSession.Log().With("cmp", "balance-checker"),
log: bcLog,
bus: bus,
lc: lifecycle.New(),
ownAddr: accAddr,
aqc: aqc,
leases: make(map[mtypes.LeaseID]*leaseState),
cfg: cfg,
batcher: newWithdrawBatcher(clientSession.Client().Tx(), bcLog.With("cmp", "withdraw-batcher"), withdrawTimeout, cfg.WithdrawalBatchMaxMsgs),
}

startCh := make(chan error, 1)
Expand Down Expand Up @@ -199,18 +202,6 @@ func (bc *balanceChecker) doEscrowCheck(ctx context.Context, lid mtypes.LeaseID,
return resp
}

func (bc *balanceChecker) startWithdraw(ctx context.Context, lid mtypes.LeaseID) error {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was moved to withdrawal_batcher.go.

ctx, cancel := context.WithTimeout(ctx, withdrawTimeout)
defer cancel()

msg := &mvbeta.MsgWithdrawLease{
ID: lid,
}

_, err := bc.session.Client().Tx().BroadcastMsgs(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError())
return err
}

func (bc *balanceChecker) run(startCh chan<- error) {
ctx, cancel := context.WithCancel(bc.ctx)

Expand All @@ -228,16 +219,13 @@ func (bc *balanceChecker) run(startCh chan<- error) {
}()

leaseCheckCh := make(chan leaseCheckResponse, 1)
var resultch chan runner.Result

subscriber, err := bc.bus.Subscribe()
startCh <- err
if err != nil {
return
}

resultch = make(chan runner.Result, 1)

loop:
for {
select {
Expand Down Expand Up @@ -282,6 +270,7 @@ loop:
}

delete(bc.leases, ev.LeaseID)
bc.batcher.Remove(ev.LeaseID)
}
case res := <-leaseCheckCh:
// we may have timer fired just a heart beat ahead of lease remove event.
Expand Down Expand Up @@ -325,17 +314,20 @@ loop:
}

if withdraw {
go func() {
select {
case <-ctx.Done():
case resultch <- runner.NewResult(res.lid, bc.startWithdraw(ctx, res.lid)):
}
}()
bc.batcher.Enqueue(res.lid)
bc.batcher.Flush(ctx)
}
case res := <-resultch:
if err := res.Error(); err != nil {
bc.log.Error("failed to do lease withdrawal", "err", err, "LeaseID", res.Value().(mtypes.LeaseID))
case err := <-bc.batcher.Done():
bc.batcher.MarkDone()
if err != nil {
// Skip immediate re-flush on failure: let the next per-lease
// timer trigger Enqueue+Flush, which gives natural backoff.
// Pending ids stay queued; Enqueue dedupes so a lease that
// re-triggers while still pending won't duplicate the msg.
bc.log.Error("failed to do lease withdrawal", "err", err, "pending", bc.batcher.Pending())
continue loop
}
bc.batcher.Flush(ctx)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions bidengine/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ loop:
Sources: deposit.Sources{deposit.SourceBalance},
}, offer)
bidch = runner.Do(func() runner.Result {
return runner.NewResult(o.session.Client().Tx().BroadcastMsgs(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError()))
return runner.NewResult(o.session.Client().Tx().BroadcastMsgs(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError(), aclient.WithPriority()))
})

case result := <-bidch:
Expand Down Expand Up @@ -493,7 +493,7 @@ loop:
Reason: mtypes.LeaseClosedReasonUnspecified,
}

_, err := o.session.Client().Tx().BroadcastMsgs(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError())
_, err := o.session.Client().Tx().BroadcastMsgs(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError(), aclient.WithPriority())
if err != nil {
o.log.Error("closing bid", "error", err)
bidCounter.WithLabelValues("close", metricsutils.FailLabel).Inc()
Expand Down
8 changes: 4 additions & 4 deletions bidengine/order_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func makeMocks(s *orderTestScaffold) {
txMocks := &clientmocks.TxClient{}
s.broadcasts = make(chan []sdk.Msg, 1)

txMocks.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
txMocks.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
s.broadcasts <- args.Get(1).([]sdk.Msg)
}).Return(&sdk.Result{}, nil)

Expand Down Expand Up @@ -553,7 +553,7 @@ func Test_ShouldCloseBidWhenAlreadySetAndOld(t *testing.T) {
Reason: mtypes.LeaseClosedReasonUnspecified,
}}

scaffold.txClient.AssertCalled(t, "BroadcastMsgs", mock.Anything, expMsgs, mock.Anything)
scaffold.txClient.AssertCalled(t, "BroadcastMsgs", mock.Anything, expMsgs, mock.Anything, mock.Anything)
}

func Test_ShouldExitWhenAlreadySetAndLost(t *testing.T) {
Expand All @@ -578,7 +578,7 @@ func Test_ShouldExitWhenAlreadySetAndLost(t *testing.T) {
ID: mtypes.MakeBidID(order.orderID, scaffold.testAddr),
}

scaffold.txClient.AssertNotCalled(t, "BroadcastMsgs", mock.Anything, expMsgs, mock.Anything)
scaffold.txClient.AssertNotCalled(t, "BroadcastMsgs", mock.Anything, expMsgs, mock.Anything, mock.Anything)
}

func Test_ShouldCloseBidWhenAlreadySetAndThenTimeout(t *testing.T) {
Expand All @@ -605,7 +605,7 @@ func Test_ShouldCloseBidWhenAlreadySetAndThenTimeout(t *testing.T) {
Reason: mtypes.LeaseClosedReasonUnspecified,
},
}
scaffold.txClient.AssertCalled(t, "BroadcastMsgs", mock.Anything, expMsgs, mock.Anything)
scaffold.txClient.AssertCalled(t, "BroadcastMsgs", mock.Anything, expMsgs, mock.Anything, mock.Anything)

// Should have called unreserve
scaffold.cluster.AssertCalled(t, "Unreserve", scaffold.orderID)
Expand Down
5 changes: 5 additions & 0 deletions cluster/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,11 @@ func countPendingIPs(state *inventoryServiceState) uint {
}

func (is *inventoryService) handleRequest(req inventoryRequest, state *inventoryServiceState) {
if state.inventory == nil {
req.ch <- inventoryResponse{err: errInventoryNotAvailableYet}
return
}

// convert the resources to the committed amount
resourcesToCommit := is.resourcesToCommit(req.resources)
// create new registration if capacity available
Expand Down
2 changes: 1 addition & 1 deletion cluster/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (m *deploymentMonitor) runCloseLease(ctx context.Context) <-chan runner.Res
ID: m.deployment.LeaseID().BidID(),
Reason: mv1.LeaseClosedReasonUnstable,
}
res, err := m.session.Client().Tx().BroadcastMsgs(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError())
res, err := m.session.Client().Tx().BroadcastMsgs(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError(), aclient.WithPriority())
if err != nil {
m.log.Error("closing deployment", "err", err)
} else {
Expand Down
6 changes: 6 additions & 0 deletions cmd/provider-services/cmd/flags.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"fmt"
"time"

"github.com/go-acme/lego/v4/lego"
Expand Down Expand Up @@ -196,6 +197,11 @@ func addRunFlags(cmd *cobra.Command) error {
return err
}

cmd.Flags().Int(FlagWithdrawalBatchMaxMsgs, 50, fmt.Sprintf("max number of MsgWithdrawLease messages coalesced into a single broadcast. valid range [%d, %d]", withdrawalBatchMaxMsgsMin, withdrawalBatchMaxMsgsMax))
if err := viper.BindPFlag(FlagWithdrawalBatchMaxMsgs, cmd.Flags().Lookup(FlagWithdrawalBatchMaxMsgs)); err != nil {
return err
}

cmd.Flags().Duration(FlagLeaseFundsMonitorInterval, time.Minute*10, "interval at which lease is checked for funds available on the escrow accounts. >= 1m")
if err := viper.BindPFlag(FlagLeaseFundsMonitorInterval, cmd.Flags().Lookup(FlagLeaseFundsMonitorInterval)); err != nil {
return err
Expand Down
18 changes: 15 additions & 3 deletions cmd/provider-services/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ const (
FlagManifestTimeout = "manifest-timeout"
FlagMetricsListener = "metrics-listener"
FlagWithdrawalPeriod = "withdrawal-period"
FlagWithdrawalBatchMaxMsgs = "withdrawal-batch-max-msgs"
FlagLeaseFundsMonitorInterval = "lease-funds-monitor-interval"
FlagMinimumBalance = "minimum-balance"
FlagProviderConfig = "provider-config"
Expand Down Expand Up @@ -132,8 +133,10 @@ const (
)

const (
serviceIPOperator = "ip-operator"
serviceHostnameOperator = "hostname-operator"
serviceIPOperator = "ip-operator"
serviceHostnameOperator = "hostname-operator"
withdrawalBatchMaxMsgsMin = 10
withdrawalBatchMaxMsgsMax = 100
)

var (
Expand Down Expand Up @@ -189,6 +192,10 @@ func RunCmd() *cobra.Command {
Short: "run akash provider",
SilenceUsage: true,
PreRunE: func(cmd *cobra.Command, args []string) error {
// Store logger in context before TxPersistentPreRunE so that the
// serialBroadcaster (created during DiscoverClient) picks it up via ctxlog.Logger(ctx).
fromctx.CmdSetContextValue(cmd, fromctx.CtxKeyLogc, log.NewLogger(os.Stderr))
Comment thread
cloud-j-luna marked this conversation as resolved.

err := TxPersistentPreRunE(cmd, args)
if err != nil {
return err
Expand All @@ -205,6 +212,10 @@ func RunCmd() *cobra.Command {
return fmt.Errorf(`flag "%s" value must be > "%s"`, FlagWithdrawalPeriod, FlagLeaseFundsMonitorInterval) // nolint: err113
}

if maxMsgs := viper.GetInt(FlagWithdrawalBatchMaxMsgs); maxMsgs < withdrawalBatchMaxMsgsMin || maxMsgs > withdrawalBatchMaxMsgsMax {
return fmt.Errorf(`flag "%s" contains invalid value %d. expected range [%d, %d]`, FlagWithdrawalBatchMaxMsgs, maxMsgs, withdrawalBatchMaxMsgsMin, withdrawalBatchMaxMsgsMax) // nolint: err113
}

if viper.GetDuration(FlagMonitorRetryPeriod) < 4*time.Second {
return fmt.Errorf(`flag "%s" value must be > "%s"`, FlagMonitorRetryPeriod, 4*time.Second) // nolint: err113
}
Expand Down Expand Up @@ -265,7 +276,7 @@ func RunCmd() *cobra.Command {
return err
}

logger := log.NewLogger(os.Stderr)
logger := ctxlog.LogcFromCtx(cmd.Context())

kubeLog := logger.With("component", "k8s")

Expand Down Expand Up @@ -675,6 +686,7 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error {
config.BalanceCheckerCfg = provider.BalanceCheckerConfig{
WithdrawalPeriod: viper.GetDuration(FlagWithdrawalPeriod),
LeaseFundsCheckInterval: viper.GetDuration(FlagLeaseFundsMonitorInterval),
WithdrawalBatchMaxMsgs: viper.GetInt(FlagWithdrawalBatchMaxMsgs),
}

config.BidPricingStrategy = pricing
Expand Down
1 change: 1 addition & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func NewDefaultConfig() Config {
BalanceCheckerCfg: BalanceCheckerConfig{
LeaseFundsCheckInterval: 1 * time.Minute,
WithdrawalPeriod: 24 * time.Hour,
WithdrawalBatchMaxMsgs: 50,
},
MaxGroupVolumes: constants.DefaultMaxGroupVolumes,
Config: cluster.NewDefaultConfig(),
Expand Down
2 changes: 1 addition & 1 deletion manifest/watchdog.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
Reason: mtypes.LeaseClosedReasonManifestTimeout,
}

return runner.NewResult(wd.sess.Client().Tx().BroadcastMsgs(wd.ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError()))
return runner.NewResult(wd.sess.Client().Tx().BroadcastMsgs(wd.ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError(), aclient.WithPriority())) // bidClose should be priority to release the reserved resources ASAP

Check failure on line 79 in manifest/watchdog.go

View workflow job for this annotation

GitHub Actions / build-bins

undefined: aclient.WithPriority

Check failure on line 79 in manifest/watchdog.go

View workflow job for this annotation

GitHub Actions / lint

undefined: aclient.WithPriority (typecheck)

Check failure on line 79 in manifest/watchdog.go

View workflow job for this annotation

GitHub Actions / tests

undefined: aclient.WithPriority
})
case err = <-wd.lc.ShutdownRequest():
}
Expand Down
2 changes: 1 addition & 1 deletion manifest/watchdog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func makeWatchdogTestScaffold(t *testing.T, timeout time.Duration) (*watchdog, *
scaffold.broadcasts = make(chan []sdk.Msg, 1)

txClientMock := &clientmocks.TxClient{}
txClientMock.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
txClientMock.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
scaffold.broadcasts <- args.Get(1).([]sdk.Msg)
}).Return(&sdk.Result{}, nil)

Expand Down
Loading
Loading