diff --git a/bidengine/bid_batcher.go b/bidengine/bid_batcher.go new file mode 100644 index 00000000..6c55ecfc --- /dev/null +++ b/bidengine/bid_batcher.go @@ -0,0 +1,188 @@ +package bidengine + +import ( + "context" + "fmt" + "regexp" + "slices" + "strconv" + "sync/atomic" + "time" + + "cosmossdk.io/log" + sdk "github.com/cosmos/cosmos-sdk/types" + + aclient "pkg.akt.dev/go/node/client/v1beta3" +) + +// bidRequest is a single MsgCreateBid submission with a reply channel. +type bidRequest struct { + msg sdk.Msg + replyCh chan error +} + +// reMsgIndex matches the Cosmos SDK error format "message index: N" to identify +// which message in a multi-msg tx failed. +var reMsgIndex = regexp.MustCompile(`message index:\s*(\d+)`) + +// parseMsgFailIndex extracts the 0-based failing message index from a Cosmos SDK +// tx error. Returns -1 if the error does not identify a specific message. +func parseMsgFailIndex(err error) int { + m := reMsgIndex.FindStringSubmatch(err.Error()) + if m == nil { + return -1 + } + idx, e := strconv.Atoi(m[1]) + if e != nil { + return -1 + } + return idx +} + +// bidBatcher coalesces MsgCreateBid requests into single multi-msg transactions +// using opportunistic in-flight batching. +// +// On broadcast failure, it parses the Cosmos SDK error to find which message +// failed, fans the error to that caller, removes it, and retries the remaining +// messages. This continues until all messages are resolved individually. +// +// Not safe for concurrent use. All public methods must be called from a single +// goroutine. Concurrent calls panic to surface developer mistakes early. +type bidBatcher struct { + tx aclient.TxClient + log log.Logger + timeout time.Duration + maxMsgs int + + inUse atomic.Bool + + pending []bidRequest + inFlight bool + doneCh chan struct{} +} + +func (b *bidBatcher) enter() { + if b.inUse.Swap(true) { + panic("bidBatcher: concurrent use detected") + } +} + +func (b *bidBatcher) exit() { + b.inUse.Store(false) +} + +func newBidBatcher(tx aclient.TxClient, logger log.Logger, timeout time.Duration, maxMsgs int) *bidBatcher { + if maxMsgs < 1 { + panic(fmt.Sprintf("bidBatcher: maxMsgs must be >= 1, got %d", maxMsgs)) + } + return &bidBatcher{ + tx: tx, + log: logger, + timeout: timeout, + maxMsgs: maxMsgs, + doneCh: make(chan struct{}, 1), + } +} + +func (b *bidBatcher) InFlight() bool { + b.enter() + defer b.exit() + return b.inFlight +} + +func (b *bidBatcher) Pending() int { + b.enter() + defer b.exit() + return len(b.pending) +} + +func (b *bidBatcher) Enqueue(req bidRequest) { + b.enter() + defer b.exit() + b.pending = append(b.pending, req) + b.log.Debug("bid batcher: enqueue", "pending", len(b.pending), "inFlight", b.inFlight) +} + +// Flush starts a broadcast with up to maxMsgs pending requests when idle. +// Returns true if a broadcast was started. +func (b *bidBatcher) Flush(ctx context.Context) bool { + b.enter() + defer b.exit() + if len(b.pending) == 0 { + return false + } + if b.inFlight { + b.log.Debug("bid batcher: flush skipped (in-flight)", "pending", len(b.pending)) + return false + } + + n := min(len(b.pending), b.maxMsgs) + + batch := make([]bidRequest, n) + copy(batch, b.pending[:n]) + b.pending = b.pending[n:] + b.inFlight = true + + b.log.Info("bid batcher: flush", "batch", n, "remaining", len(b.pending), "maxMsgs", b.maxMsgs) + + go func() { + b.broadcastWithRetry(ctx, batch) + select { + case <-ctx.Done(): + case b.doneCh <- struct{}{}: + } + }() + + return true +} + +// Done returns a channel that signals when the current batch is fully resolved. +// Call MarkDone after receiving, then Flush to start the next batch. +func (b *bidBatcher) Done() <-chan struct{} { + return b.doneCh +} + +// MarkDone clears the in-flight flag. Must be called after receiving from Done(). +func (b *bidBatcher) MarkDone() { + b.enter() + defer b.exit() + b.inFlight = false +} + +// broadcastWithRetry broadcasts remaining requests, removing individual failures +// by parsing the Cosmos SDK "message index: N" error. Each resolved request +// receives its own error or nil via its replyCh. +func (b *bidBatcher) broadcastWithRetry(ctx context.Context, batch []bidRequest) { + for len(batch) > 0 { + msgs := make([]sdk.Msg, len(batch)) + for i, req := range batch { + msgs[i] = req.msg + } + + broadcastCtx, cancel := context.WithTimeout(ctx, b.timeout) + _, err := b.tx.BroadcastMsgs(broadcastCtx, msgs, aclient.WithResultCodeAsError(), aclient.WithPriority()) + cancel() + + if err == nil { + b.log.Info("bid batcher: batch succeeded", "count", len(batch)) + for _, req := range batch { + req.replyCh <- nil + } + return + } + + idx := parseMsgFailIndex(err) + if idx < 0 || idx >= len(batch) { + // Error is not message-specific (e.g. network/sequence error): fail all. + b.log.Error("bid batcher: unrecoverable batch error", "err", err, "remaining", len(batch)) + for _, req := range batch { + req.replyCh <- err + } + return + } + + b.log.Error("bid batcher: message failed, retrying remainder", "idx", idx, "err", err, "remaining", len(batch)-1) + batch[idx].replyCh <- err + batch = slices.Delete(batch, idx, idx+1) + } +} diff --git a/bidengine/bid_batcher_test.go b/bidengine/bid_batcher_test.go new file mode 100644 index 00000000..133b1ca6 --- /dev/null +++ b/bidengine/bid_batcher_test.go @@ -0,0 +1,281 @@ +package bidengine + +import ( + "context" + "errors" + "fmt" + "io" + "testing" + "time" + + "cosmossdk.io/log" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + sdk "github.com/cosmos/cosmos-sdk/types" + + clientmocks "pkg.akt.dev/go/mocks/node/client" +) + +const twoSecondsTestTimeout = 2 * time.Second + +func testBidLogger() log.Logger { return log.NewLogger(io.Discard) } + +func newBidReq() bidRequest { + return bidRequest{ + msg: &sdk.TxResponse{}, // placeholder non-nil msg + replyCh: make(chan error, 1), + } +} + +func waitReply(t *testing.T, req bidRequest) error { + t.Helper() + select { + case err := <-req.replyCh: + return err + case <-time.After(twoSecondsTestTimeout): + t.Fatal("timed out waiting for reply") + return nil + } +} + +func waitDone(t *testing.T, b *bidBatcher) { + t.Helper() + select { + case <-b.Done(): + case <-time.After(twoSecondsTestTimeout): + t.Fatal("timed out waiting for batcher done") + } +} + +// bidFixture sets up a bidBatcher with a controllable mock TxClient. +type bidFixture struct { + t *testing.T + tx *clientmocks.TxClient + batcher *bidBatcher + release chan struct{} + captured chan []sdk.Msg +} + +func newBidFixture(t *testing.T, maxMsgs int, block bool) *bidFixture { + t.Helper() + f := &bidFixture{ + t: t, + tx: &clientmocks.TxClient{}, + release: make(chan struct{}), + captured: make(chan []sdk.Msg, 16), + } + f.tx.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + f.captured <- args.Get(1).([]sdk.Msg) + if block { + <-f.release + } + }). + Return(&sdk.TxResponse{}, nil) + f.batcher = newBidBatcher(f.tx, testBidLogger(), time.Second, maxMsgs) + return f +} + +func (f *bidFixture) waitCaptured() []sdk.Msg { + f.t.Helper() + select { + case msgs := <-f.captured: + return msgs + case <-time.After(twoSecondsTestTimeout): + f.t.Fatal("timed out waiting for broadcast") + return nil + } +} + +// --- parseMsgFailIndex --- + +func TestParseMsgFailIndex(t *testing.T) { + tests := []struct { + name string + errMsg string + expected int + }{ + {name: "valid_index", errMsg: "tx failed: message index: 2: insufficient funds", expected: 2}, + {name: "zero_index", errMsg: "message index: 0: bad", expected: 0}, + {name: "no_index", errMsg: "out of gas", expected: -1}, + {name: "extra_whitespace", errMsg: "message index: 7: bad msg", expected: 7}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.expected, parseMsgFailIndex(errors.New(tc.errMsg))) + }) + } +} + +// --- Flush / Done / MarkDone state machine --- + +func TestBidBatcher_idle_flush_succeeds(t *testing.T) { + f := newBidFixture(t, 10, false) + req := newBidReq() + + f.batcher.Enqueue(req) + require.True(t, f.batcher.Flush(context.Background())) + + msgs := f.waitCaptured() + require.Len(t, msgs, 1) + require.NoError(t, waitReply(t, req)) + waitDone(t, f.batcher) +} + +func TestBidBatcher_flush_when_empty_is_noop(t *testing.T) { + b := newBidBatcher(&clientmocks.TxClient{}, testBidLogger(), time.Second, 10) + require.False(t, b.Flush(context.Background())) +} + +func TestBidBatcher_flush_when_in_flight_is_noop(t *testing.T) { + f := newBidFixture(t, 10, true) + req1 := newBidReq() + req2 := newBidReq() + + f.batcher.Enqueue(req1) + require.True(t, f.batcher.Flush(context.Background())) + f.waitCaptured() + + // Now in-flight: second Flush must return false. + f.batcher.Enqueue(req2) + require.False(t, f.batcher.Flush(context.Background())) + + // Unblock and drain. + close(f.release) + waitDone(t, f.batcher) + require.NoError(t, waitReply(t, req1)) +} + +func TestBidBatcher_respects_max_msgs(t *testing.T) { + f := newBidFixture(t, 2, false) + reqs := make([]bidRequest, 5) + for i := range reqs { + reqs[i] = newBidReq() + f.batcher.Enqueue(reqs[i]) + } + + require.True(t, f.batcher.Flush(context.Background())) + msgs := f.waitCaptured() + require.Len(t, msgs, 2) + require.Equal(t, 3, f.batcher.Pending()) + + waitDone(t, f.batcher) + f.batcher.MarkDone() + + // Second flush sends next 2. + require.True(t, f.batcher.Flush(context.Background())) + msgs = f.waitCaptured() + require.Len(t, msgs, 2) +} + +func TestBidBatcher_all_replies_nil_on_success(t *testing.T) { + f := newBidFixture(t, 10, false) + reqs := make([]bidRequest, 3) + for i := range reqs { + reqs[i] = newBidReq() + f.batcher.Enqueue(reqs[i]) + } + f.batcher.Flush(context.Background()) + f.waitCaptured() + waitDone(t, f.batcher) + + for _, req := range reqs { + require.NoError(t, waitReply(t, req)) + } +} + +// --- Error handling --- + +func TestBidBatcher_unrecoverable_error_fails_all(t *testing.T) { + tx := &clientmocks.TxClient{} + broadcastErr := errors.New("out of gas") + tx.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything). + Return(nil, broadcastErr) + + b := newBidBatcher(tx, testBidLogger(), time.Second, 10) + reqs := []bidRequest{newBidReq(), newBidReq(), newBidReq()} + for _, r := range reqs { + b.Enqueue(r) + } + b.Flush(context.Background()) + waitDone(t, b) + + for _, req := range reqs { + err := waitReply(t, req) + require.ErrorIs(t, err, broadcastErr) + } +} + +func TestBidBatcher_smart_retry_removes_failing_msg(t *testing.T) { + tx := &clientmocks.TxClient{} + badErr := fmt.Errorf("tx failed: message index: 1: bad msg") + // First broadcast of 3 msgs fails at index 1. + tx.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything). + Return(nil, badErr).Once() + // Retry of remaining 2 msgs (index 0 and 2 from original) succeeds. + tx.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything). + Return(&sdk.TxResponse{}, nil).Once() + + b := newBidBatcher(tx, testBidLogger(), time.Second, 10) + req0 := newBidReq() + req1 := newBidReq() + req2 := newBidReq() + b.Enqueue(req0) + b.Enqueue(req1) + b.Enqueue(req2) + b.Flush(context.Background()) + waitDone(t, b) + + require.NoError(t, waitReply(t, req0)) + require.ErrorIs(t, waitReply(t, req1), badErr) + require.NoError(t, waitReply(t, req2)) +} + +func TestBidBatcher_smart_retry_cascade(t *testing.T) { + tx := &clientmocks.TxClient{} + err0 := fmt.Errorf("message index: 0: bad") + err1 := fmt.Errorf("message index: 0: bad") + // Broadcast 3 msgs: index 0 fails. + tx.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything). + Return(nil, err0).Once() + // Retry 2 remaining: index 0 (originally index 1) fails. + tx.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything). + Return(nil, err1).Once() + // Retry 1 remaining: succeeds. + tx.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything). + Return(&sdk.TxResponse{}, nil).Once() + + b := newBidBatcher(tx, testBidLogger(), time.Second, 10) + reqs := []bidRequest{newBidReq(), newBidReq(), newBidReq()} + for _, r := range reqs { + b.Enqueue(r) + } + b.Flush(context.Background()) + waitDone(t, b) + + require.ErrorIs(t, waitReply(t, reqs[0]), err0) + require.ErrorIs(t, waitReply(t, reqs[1]), err1) + require.NoError(t, waitReply(t, reqs[2])) +} + +func TestBidBatcher_context_cancel_stops_broadcast(t *testing.T) { + tx := &clientmocks.TxClient{} + started := make(chan struct{}) + tx.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { close(started) }). + Return(nil, context.Canceled) + + ctx, cancel := context.WithCancel(context.Background()) + b := newBidBatcher(tx, testBidLogger(), time.Second, 10) + req := newBidReq() + b.Enqueue(req) + b.Flush(ctx) + + <-started + cancel() + + waitDone(t, b) + require.ErrorIs(t, waitReply(t, req), context.Canceled) +} diff --git a/bidengine/config.go b/bidengine/config.go index e37b1ece..ffb53cb9 100644 --- a/bidengine/config.go +++ b/bidengine/config.go @@ -10,9 +10,10 @@ import ( // Config represents the configuration parameters for the bid engine. // It controls pricing, deposits, timeouts and provider capabilities and attributes type Config struct { - PricingStrategy BidPricingStrategy - Deposit sdk.Coin - BidTimeout time.Duration - Attributes atttypes.Attributes - MaxGroupVolumes int + PricingStrategy BidPricingStrategy + Deposit sdk.Coin + BidTimeout time.Duration + BidBatchMaxMsgs int + Attributes atttypes.Attributes + MaxGroupVolumes int } diff --git a/bidengine/order.go b/bidengine/order.go index 25ba1406..4c5e0a2f 100644 --- a/bidengine/order.go +++ b/bidengine/order.go @@ -53,6 +53,9 @@ type order struct { // reservationFulfilledNotify is the channel to notify when resources are reserved. reservationFulfilledNotify chan<- int + // bidSubmitCh is the service-level channel for submitting MsgCreateBid for batching. + bidSubmitCh chan<- bidRequest + // log is the logger instance log log.Logger @@ -124,6 +127,7 @@ func newOrderInternal(svc *service, oid mtypes.OrderID, cfg Config, pass Provide log: log, lc: lifecycle.New(), reservationFulfilledNotify: reservationFulfilledNotify, // Normally nil in production + bidSubmitCh: svc.bidSubmitCh, pass: pass, } @@ -182,8 +186,8 @@ func (o *order) run(checkForExistingBid bool) { storedGroupCh <-chan runner.Result // Channel for receiving cluster reservation result. clusterch <-chan runner.Result - // Channel for receiving bid creation transaction result. - bidch <-chan runner.Result + // Channel for receiving bid creation transaction result (reply from bidBatcher). + bidch <-chan error // Channel for receiving bid price calculation result. pricech <-chan runner.Result // Channel for receiving existing bid query result. @@ -430,20 +434,24 @@ loop: offer := mvbeta.ResourceOfferFromRU(reservation.GetAllocatedResources()) - // Begin submitting fulfillment + // Enqueue MsgCreateBid in the service-level bid batcher. msg = mvbeta.NewMsgCreateBid(mtypes.MakeBidID(o.orderID, o.session.Provider().Address()), price, deposit.Deposit{ Amount: o.cfg.Deposit, Sources: deposit.Sources{deposit.SourceBalance}, }, offer) - bidch = runner.Do(func() runner.Result { - return runner.NewResult(o.session.Client().Tx().BroadcastMsgs(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError(), aclient.WithPriority())) - }) + replyCh := make(chan error, 1) + select { + case o.bidSubmitCh <- bidRequest{msg: msg, replyCh: replyCh}: + bidch = replyCh + case <-o.lc.ShutdownRequest(): + break loop + } - case result := <-bidch: + case err := <-bidch: bidch = nil - if result.Error() != nil { + if err != nil { bidCounter.WithLabelValues(metricsutils.OpenLabel, metricsutils.FailLabel).Inc() - o.log.Error("bid failed", "err", result.Error()) + o.log.Error("bid failed", "err", err) break loop } diff --git a/bidengine/service.go b/bidengine/service.go index 55ebd081..74e9444f 100644 --- a/bidengine/service.go +++ b/bidengine/service.go @@ -3,6 +3,7 @@ package bidengine import ( "context" "errors" + "time" "github.com/boz/go-lifecycle" "github.com/prometheus/client_golang/prometheus" @@ -34,6 +35,8 @@ var ordersCounter = promauto.NewCounterVec(prometheus.CounterOpts{ // ErrNotRunning declares new error with message "not running" var ErrNotRunning = errors.New("not running") +const broadcastTimeout = 30 * time.Second + // StatusClient interface predefined with Status method type StatusClient interface { Status(context.Context) (*apclient.BidEngineStatus, error) @@ -78,21 +81,32 @@ func NewService( ctx, cancel := context.WithCancel(pctx) group, _ := errgroup.WithContext(ctx) + if cfg.BidBatchMaxMsgs <= 0 { + panic("BidBatchMaxMsgs must be > 0") + } + s := &service{ - session: session, - cluster: cluster, - bus: bus, - sub: sub, - statusch: make(chan chan<- *apclient.BidEngineStatus), - orders: make(map[string]*order), - drainch: make(chan *order), - ordersch: make(chan []mtypes.OrderID, 1000), - group: group, - cancel: cancel, - lc: lifecycle.New(), - cfg: cfg, - pass: providerAttrService, - waiter: waiter, + session: session, + cluster: cluster, + bus: bus, + sub: sub, + statusch: make(chan chan<- *apclient.BidEngineStatus), + orders: make(map[string]*order), + drainch: make(chan *order), + ordersch: make(chan []mtypes.OrderID, 1000), + bidSubmitCh: make(chan bidRequest, 100), + bidBatcher: newBidBatcher( + session.Client().Tx(), + session.Log().With("cmp", "bid-batcher"), + broadcastTimeout, + cfg.BidBatchMaxMsgs, + ), + group: group, + cancel: cancel, + lc: lifecycle.New(), + cfg: cfg, + pass: providerAttrService, + waiter: waiter, } go s.lc.WatchContext(pctx) @@ -128,6 +142,11 @@ type service struct { // ordersch receives new orders to process from the blockchain. ordersch chan []mtypes.OrderID + // bidSubmitCh receives MsgCreateBid requests from order goroutines. + bidSubmitCh chan bidRequest + // bidBatcher coalesces MsgCreateBid requests into batched transactions. + bidBatcher *bidBatcher + group *errgroup.Group // cancel holds the cancel function of the service context. cancel context.CancelFunc @@ -318,6 +337,12 @@ loop: ch <- &apclient.BidEngineStatus{ Orders: uint32(len(s.orders)), // nolint: gosec } + case req := <-s.bidSubmitCh: + s.bidBatcher.Enqueue(req) + s.bidBatcher.Flush(ctx) + case <-s.bidBatcher.Done(): + s.bidBatcher.MarkDone() + s.bidBatcher.Flush(ctx) case order := <-s.drainch: // child done key := mquery.OrderPath(order.orderID) diff --git a/cmd/provider-services/cmd/flags.go b/cmd/provider-services/cmd/flags.go index 57a58757..fcc0caf8 100644 --- a/cmd/provider-services/cmd/flags.go +++ b/cmd/provider-services/cmd/flags.go @@ -182,6 +182,11 @@ func addRunFlags(cmd *cobra.Command) error { return err } + cmd.Flags().Int(FlagBidBatchMaxMsgs, 10, fmt.Sprintf("max number of MsgCreateBid messages coalesced into a single broadcast. valid range [%d, %d]", batchMaxMsgsMin, batchMaxMsgsMax)) + if err := viper.BindPFlag(FlagBidBatchMaxMsgs, cmd.Flags().Lookup(FlagBidBatchMaxMsgs)); err != nil { + return err + } + cmd.Flags().Duration(FlagManifestTimeout, 5*time.Minute, "time after which bids are cancelled if no manifest is received") if err := viper.BindPFlag(FlagManifestTimeout, cmd.Flags().Lookup(FlagManifestTimeout)); err != nil { return err @@ -197,7 +202,7 @@ func addRunFlags(cmd *cobra.Command) error { return err } - cmd.Flags().Int(FlagWithdrawalBatchMaxMsgs, 50, fmt.Sprintf("max number of MsgWithdrawLease messages coalesced into a single broadcast. valid range [%d, %d]", withdrawalBatchMaxMsgsMin, withdrawalBatchMaxMsgsMax)) + cmd.Flags().Int(FlagWithdrawalBatchMaxMsgs, 50, fmt.Sprintf("max number of MsgWithdrawLease messages coalesced into a single broadcast. valid range [%d, %d]", batchMaxMsgsMin, batchMaxMsgsMax)) if err := viper.BindPFlag(FlagWithdrawalBatchMaxMsgs, cmd.Flags().Lookup(FlagWithdrawalBatchMaxMsgs)); err != nil { return err } diff --git a/cmd/provider-services/cmd/run.go b/cmd/provider-services/cmd/run.go index 2a35988e..6e7fb829 100644 --- a/cmd/provider-services/cmd/run.go +++ b/cmd/provider-services/cmd/run.go @@ -93,6 +93,7 @@ const ( FlagAuthPem = "auth-pem" FlagDeploymentRuntimeClass = "deployment-runtime-class" FlagBidTimeout = "bid-timeout" + FlagBidBatchMaxMsgs = "bid-batch-max-msgs" FlagManifestTimeout = "manifest-timeout" FlagMetricsListener = "metrics-listener" FlagWithdrawalPeriod = "withdrawal-period" @@ -133,10 +134,10 @@ const ( ) const ( - serviceIPOperator = "ip-operator" - serviceHostnameOperator = "hostname-operator" - withdrawalBatchMaxMsgsMin = 10 - withdrawalBatchMaxMsgsMax = 100 + serviceIPOperator = "ip-operator" + serviceHostnameOperator = "hostname-operator" + batchMaxMsgsMin = 10 + batchMaxMsgsMax = 100 ) var ( @@ -212,8 +213,12 @@ func RunCmd() *cobra.Command { return fmt.Errorf(`flag "%s" value must be > "%s"`, FlagWithdrawalPeriod, FlagLeaseFundsMonitorInterval) // nolint: err113 } - if maxMsgs := viper.GetInt(FlagWithdrawalBatchMaxMsgs); maxMsgs < withdrawalBatchMaxMsgsMin || maxMsgs > withdrawalBatchMaxMsgsMax { - return fmt.Errorf(`flag "%s" contains invalid value %d. expected range [%d, %d]`, FlagWithdrawalBatchMaxMsgs, maxMsgs, withdrawalBatchMaxMsgsMin, withdrawalBatchMaxMsgsMax) // nolint: err113 + if maxMsgs := viper.GetInt(FlagWithdrawalBatchMaxMsgs); maxMsgs < batchMaxMsgsMin || maxMsgs > batchMaxMsgsMax { + return fmt.Errorf(`flag "%s" contains invalid value %d. expected range [%d, %d]`, FlagWithdrawalBatchMaxMsgs, maxMsgs, batchMaxMsgsMin, batchMaxMsgsMax) // nolint: err113 + } + + if maxMsgs := viper.GetInt(FlagBidBatchMaxMsgs); maxMsgs < batchMaxMsgsMin || maxMsgs > batchMaxMsgsMax { + return fmt.Errorf(`flag "%s" contains invalid value %d. expected range [%d, %d]`, FlagBidBatchMaxMsgs, maxMsgs, batchMaxMsgsMin, batchMaxMsgsMax) // nolint: err113 } if viper.GetDuration(FlagMonitorRetryPeriod) < 4*time.Second { @@ -491,6 +496,9 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { blockedHostnames := viper.GetStringSlice(FlagDeploymentBlockedHostnames) deploymentRuntimeClass := viper.GetString(FlagDeploymentRuntimeClass) bidTimeout := viper.GetDuration(FlagBidTimeout) + + bidBatchMaxMsgs := viper.GetInt(FlagBidBatchMaxMsgs) + manifestTimeout := viper.GetDuration(FlagManifestTimeout) metricsListener := viper.GetString(FlagMetricsListener) providerConfig := viper.GetString(FlagProviderConfig) @@ -665,6 +673,7 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { config.DeploymentIngressStaticHosts = deploymentIngressStaticHosts config.DeploymentIngressDomain = deploymentIngressDomain config.BidTimeout = bidTimeout + config.BidBatchMaxMsgs = bidBatchMaxMsgs config.ManifestTimeout = manifestTimeout config.MonitorMaxRetries = monitorMaxRetries config.MonitorRetryPeriod = monitorRetryPeriod diff --git a/config.go b/config.go index 9c17e5a4..7fff75bd 100644 --- a/config.go +++ b/config.go @@ -20,6 +20,7 @@ type Config struct { BidPricingStrategy bidengine.BidPricingStrategy BidDeposit sdk.Coin BidTimeout time.Duration + BidBatchMaxMsgs int ManifestTimeout time.Duration BalanceCheckerCfg BalanceCheckerConfig Attributes attrtypes.Attributes @@ -33,6 +34,7 @@ func NewDefaultConfig() Config { return Config{ ClusterWaitReadyDuration: time.Second * 10, BidDeposit: mtypes.DefaultBidMinDeposit, + BidBatchMaxMsgs: 10, BalanceCheckerCfg: BalanceCheckerConfig{ LeaseFundsCheckInterval: 1 * time.Minute, WithdrawalPeriod: 24 * time.Hour, diff --git a/go.mod b/go.mod index 4eb128fe..1b2fde68 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/akash-network/provider -go 1.25.5 +go 1.25.9 require ( cosmossdk.io/errors v1.0.2 @@ -10,7 +10,7 @@ require ( github.com/blang/semver/v4 v4.0.0 github.com/boz/go-lifecycle v0.1.1 github.com/cometbft/cometbft v0.38.21 - github.com/cosmos/cosmos-sdk v0.53.5 + github.com/cosmos/cosmos-sdk v0.53.6 github.com/desertbit/timer v1.0.1 github.com/fsnotify/fsnotify v1.9.0 github.com/go-acme/lego/v4 v4.26.0 @@ -51,11 +51,11 @@ require ( k8s.io/client-go v0.34.1 k8s.io/code-generator v0.34.1 k8s.io/kubectl v0.33.3 - pkg.akt.dev/go v0.2.7 + pkg.akt.dev/go v0.2.11-rc2 pkg.akt.dev/go/cli v0.2.2 pkg.akt.dev/go/sdl v0.2.0 pkg.akt.dev/node v1.2.2 - pkg.akt.dev/node/v2 v2.0.0 + pkg.akt.dev/node/v2 v2.1.0-rc9 sigs.k8s.io/gateway-api v1.4.1 sigs.k8s.io/kind v0.30.0 sigs.k8s.io/structured-merge-diff/v6 v6.3.0 @@ -179,7 +179,7 @@ require ( github.com/cosmos/iavl v1.2.6 // indirect github.com/cosmos/ibc-go/v10 v10.5.0 // indirect github.com/cosmos/ics23/go v0.11.0 // indirect - github.com/cosmos/ledger-cosmos-go v0.16.0 // indirect + github.com/cosmos/ledger-cosmos-go v1.0.0 // indirect github.com/cosmos/rosetta v0.50.12 // indirect github.com/cosmos/rosetta-sdk-go v0.10.0 // indirect github.com/danieljoos/wincred v1.2.2 // indirect diff --git a/go.sum b/go.sum index 4c4b14aa..692b93ea 100644 --- a/go.sum +++ b/go.sum @@ -3883,16 +3883,16 @@ nhooyr.io/websocket v1.8.17 h1:KEVeLJkUywCKVsnLIDlD/5gtayKp8VoCkksHCGGfT9Y= nhooyr.io/websocket v1.8.17/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= pgregory.net/rapid v0.5.5 h1:jkgx1TjbQPD/feRoK+S/mXw9e1uj6WilpHrXJowi6oA= pgregory.net/rapid v0.5.5/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04= -pkg.akt.dev/go v0.2.7 h1:oQFhmjv7xUc9MLxsCmdnl52DL/VOu3r3shz51rcXnLs= -pkg.akt.dev/go v0.2.7/go.mod h1:x9Cku9yibLk4aGLTE/Yy7eEkkda0uOSRWmYwQeFw23o= +pkg.akt.dev/go v0.2.11-rc2 h1:IdNX8cwhSVrDLMchNczIwWuJ2IdmF+WSVUKO6XUlzx0= +pkg.akt.dev/go v0.2.11-rc2/go.mod h1:x9Cku9yibLk4aGLTE/Yy7eEkkda0uOSRWmYwQeFw23o= pkg.akt.dev/go/cli v0.2.2 h1:PWDAAeHkVtcZ9qE76yh4IhJ2J/42ekhwSyrGWLPGi/g= pkg.akt.dev/go/cli v0.2.2/go.mod h1:MHm9lU8hb+xQ8BX3b9c9S1pMyZKUob5tVjHXQ4T1uwU= pkg.akt.dev/go/sdl v0.2.0 h1:hY74GjN4itV92REf8HqGt1rQDtXsruzE/iIzd/FpUB8= pkg.akt.dev/go/sdl v0.2.0/go.mod h1:urd6091AWDy9YwFLRCsENuQ931qyRcg/RJBN9XCBs/E= pkg.akt.dev/node v1.2.2 h1:Xka/9sVaJTyGfyudZUc8A9UW8xC0FYcZoHvD6dm1iy0= pkg.akt.dev/node v1.2.2/go.mod h1:luWpw5dNEIMhEITMqOvjiohqIoLd/kkcjeg1aCF4EJA= -pkg.akt.dev/node/v2 v2.0.0 h1:U1K9Kce+s4qAD9VlRy22l6Mw8tUTa2rYp8SJ/78zjwM= -pkg.akt.dev/node/v2 v2.0.0/go.mod h1:srQsJA8F3SB4RkhluxLKA5roOkW2q0YeNYzEpSo7M74= +pkg.akt.dev/node/v2 v2.1.0-rc9 h1:KqYAA9QBYSi9wO4wPy9BaBa5d39eUBmmg4ISk2K3X0k= +pkg.akt.dev/node/v2 v2.1.0-rc9/go.mod h1:DKxgQLlxBL8QT+Y6KIQPnAmjrnJSiwBhvmO4fr8c7pw= pkg.akt.dev/specs v0.0.1 h1:OP0zil3Fr4kcCuybFqQ8LWgSlSP2Yn7306meWpu6/S4= pkg.akt.dev/specs v0.0.1/go.mod h1:tiFuJAqzn+lkz662lf9qaEdjdrrDr882r3YMDnWkbp4= pkg.akt.dev/testdata v0.0.1 h1:yHfqF0Uxf7Rg7WdwSggnyBWMxACtAg5VpBUVFXU+uvM= diff --git a/service.go b/service.go index 175ed211..56713b20 100644 --- a/service.go +++ b/service.go @@ -90,6 +90,7 @@ func NewService(ctx context.Context, PricingStrategy: cfg.BidPricingStrategy, Deposit: cfg.BidDeposit, BidTimeout: cfg.BidTimeout, + BidBatchMaxMsgs: cfg.BidBatchMaxMsgs, Attributes: cfg.Attributes, MaxGroupVolumes: cfg.MaxGroupVolumes, }) diff --git a/withdraw_batcher.go b/withdraw_batcher.go index 639d2f33..921fdff4 100644 --- a/withdraw_batcher.go +++ b/withdraw_batcher.go @@ -105,11 +105,11 @@ func (b *withdrawBatcher) Pending() int { func (b *withdrawBatcher) Flush(ctx context.Context) bool { b.enter() defer b.exit() - if b.inFlight { - b.log.Debug("batcher: flush skipped (in-flight)", "pending", len(b.pending)) + if len(b.pending) == 0 { return false } - if len(b.pending) == 0 { + if b.inFlight { + b.log.Debug("batcher: flush skipped (in-flight)", "pending", len(b.pending)) return false }