From 738233e5bbcef53ca4cd856d0c214465fc47dfe5 Mon Sep 17 00:00:00 2001 From: Artem Shcherbatiuk Date: Fri, 24 Apr 2026 13:15:30 +0200 Subject: [PATCH 1/5] chore: wip --- bidengine/bid_batcher.go | 169 +++++++++++++++++++++++++++++ bidengine/config.go | 11 +- bidengine/order.go | 26 +++-- bidengine/service.go | 53 ++++++--- cmd/provider-services/cmd/flags.go | 5 + cmd/provider-services/cmd/run.go | 6 + config.go | 1 + service.go | 1 + 8 files changed, 244 insertions(+), 28 deletions(-) create mode 100644 bidengine/bid_batcher.go diff --git a/bidengine/bid_batcher.go b/bidengine/bid_batcher.go new file mode 100644 index 00000000..1a15f839 --- /dev/null +++ b/bidengine/bid_batcher.go @@ -0,0 +1,169 @@ +package bidengine + +import ( + "context" + "fmt" + "regexp" + "slices" + "strconv" + "time" + + "cosmossdk.io/log" + sdk "github.com/cosmos/cosmos-sdk/types" + + aclient "pkg.akt.dev/go/node/client/v1beta3" +) + +// bidRequest is a single MsgCreateBid submission with a reply channel. +type bidRequest struct { + msg sdk.Msg + replyCh chan error +} + +// reMsgIndex matches the Cosmos SDK error format "message index: N" to identify +// which message in a multi-msg tx failed. +var reMsgIndex = regexp.MustCompile(`message index:\s*(\d+)`) + +// parseMsgFailIndex extracts the 0-based failing message index from a Cosmos SDK +// tx error. Returns -1 if the error does not identify a specific message. +func parseMsgFailIndex(err error) int { + m := reMsgIndex.FindStringSubmatch(err.Error()) + if m == nil { + return -1 + } + idx, e := strconv.Atoi(m[1]) + if e != nil { + return -1 + } + return idx +} + +// bidBatcher coalesces MsgCreateBid requests into single multi-msg transactions +// using opportunistic in-flight batching. +// +// On broadcast failure, it parses the Cosmos SDK error to find which message +// failed, fans the error to that caller, removes it, and retries the remaining +// messages. This continues until all messages are resolved individually. +// +// Not safe for concurrent use. All methods must be called from service.run(). +type bidBatcher struct { + tx aclient.TxClient + log log.Logger + timeout time.Duration + maxMsgs int + + pending []bidRequest + inFlight bool + doneCh chan struct{} +} + +func newBidBatcher(tx aclient.TxClient, logger log.Logger, timeout time.Duration, maxMsgs int) *bidBatcher { + if maxMsgs < 1 { + panic(fmt.Sprintf("bidBatcher: maxMsgs must be >= 1, got %d", maxMsgs)) + } + return &bidBatcher{ + tx: tx, + log: logger, + timeout: timeout, + maxMsgs: maxMsgs, + doneCh: make(chan struct{}, 1), + } +} + +func (b *bidBatcher) InFlight() bool { + return b.inFlight +} + +func (b *bidBatcher) Pending() int { + return len(b.pending) +} + +func (b *bidBatcher) Enqueue(req bidRequest) { + b.pending = append(b.pending, req) + b.log.Debug("bid batcher: enqueue", "pending", len(b.pending), "inFlight", b.inFlight) +} + +// Flush starts a broadcast with up to maxMsgs pending requests when idle. +// Returns true if a broadcast was started. +func (b *bidBatcher) Flush(ctx context.Context) bool { + if b.inFlight { + b.log.Debug("bid batcher: flush skipped (in-flight)", "pending", len(b.pending)) + return false + } + if len(b.pending) == 0 { + return false + } + + n := len(b.pending) + if n > b.maxMsgs { + n = b.maxMsgs + } + + batch := make([]bidRequest, n) + copy(batch, b.pending[:n]) + b.pending = b.pending[n:] + b.inFlight = true + + b.log.Info("bid batcher: flush", "batch", n, "remaining", len(b.pending), "maxMsgs", b.maxMsgs) + + go func() { + b.broadcastWithRetry(ctx, batch) + select { + case <-ctx.Done(): + case b.doneCh <- struct{}{}: + } + }() + + return true +} + +// Done returns a channel that signals when the current batch is fully resolved. +// Call MarkDone after receiving, then Flush to start the next batch. +func (b *bidBatcher) Done() <-chan struct{} { + return b.doneCh +} + +// MarkDone clears the in-flight flag. Must be called after receiving from Done(). +func (b *bidBatcher) MarkDone() { + b.inFlight = false +} + +// broadcastWithRetry broadcasts remaining requests, removing individual failures +// by parsing the Cosmos SDK "message index: N" error. Each resolved request +// receives its own error or nil via its replyCh. +func (b *bidBatcher) broadcastWithRetry(ctx context.Context, batch []bidRequest) { + remaining := batch + + for len(remaining) > 0 { + msgs := make([]sdk.Msg, len(remaining)) + for i, req := range remaining { + msgs[i] = req.msg + } + + broadcastCtx, cancel := context.WithTimeout(ctx, b.timeout) + _, err := b.tx.BroadcastMsgs(broadcastCtx, msgs, aclient.WithResultCodeAsError(), aclient.WithPriority()) + cancel() + + if err == nil { + b.log.Info("bid batcher: batch succeeded", "count", len(remaining)) + for _, req := range remaining { + req.replyCh <- nil + } + return + } + + idx := parseMsgFailIndex(err) + if idx < 0 || idx >= len(remaining) { + // Error is not message-specific (e.g. network/sequence error): fail all. + b.log.Error("bid batcher: unrecoverable batch error", "err", err, "remaining", len(remaining)) + for _, req := range remaining { + req.replyCh <- err + } + return + } + + b.log.Error("bid batcher: message failed, retrying remainder", "idx", idx, "err", err, "remaining", len(remaining)-1) + remaining[idx].replyCh <- err + remaining = slices.Delete(remaining, idx, idx+1) + } +} diff --git a/bidengine/config.go b/bidengine/config.go index e37b1ece..ffb53cb9 100644 --- a/bidengine/config.go +++ b/bidengine/config.go @@ -10,9 +10,10 @@ import ( // Config represents the configuration parameters for the bid engine. // It controls pricing, deposits, timeouts and provider capabilities and attributes type Config struct { - PricingStrategy BidPricingStrategy - Deposit sdk.Coin - BidTimeout time.Duration - Attributes atttypes.Attributes - MaxGroupVolumes int + PricingStrategy BidPricingStrategy + Deposit sdk.Coin + BidTimeout time.Duration + BidBatchMaxMsgs int + Attributes atttypes.Attributes + MaxGroupVolumes int } diff --git a/bidengine/order.go b/bidengine/order.go index e3f1bd1c..eddffb16 100644 --- a/bidengine/order.go +++ b/bidengine/order.go @@ -53,6 +53,9 @@ type order struct { // reservationFulfilledNotify is the channel to notify when resources are reserved. reservationFulfilledNotify chan<- int + // bidSubmitCh is the service-level channel for submitting MsgCreateBid for batching. + bidSubmitCh chan<- bidRequest + // log is the logger instance log log.Logger @@ -124,6 +127,7 @@ func newOrderInternal(svc *service, oid mtypes.OrderID, cfg Config, pass Provide log: log, lc: lifecycle.New(), reservationFulfilledNotify: reservationFulfilledNotify, // Normally nil in production + bidSubmitCh: svc.bidSubmitCh, pass: pass, } @@ -182,8 +186,8 @@ func (o *order) run(checkForExistingBid bool) { storedGroupCh <-chan runner.Result // Channel for receiving cluster reservation result. clusterch <-chan runner.Result - // Channel for receiving bid creation transaction result. - bidch <-chan runner.Result + // Channel for receiving bid creation transaction result (reply from bidBatcher). + bidch <-chan error // Channel for receiving bid price calculation result. pricech <-chan runner.Result // Channel for receiving existing bid query result. @@ -430,20 +434,24 @@ loop: offer := mvbeta.ResourceOfferFromRU(reservation.GetAllocatedResources()) - // Begin submitting fulfillment + // Enqueue MsgCreateBid in the service-level bid batcher. msg = mvbeta.NewMsgCreateBid(mtypes.MakeBidID(o.orderID, o.session.Provider().Address()), price, deposit.Deposit{ Amount: o.cfg.Deposit, Sources: deposit.Sources{deposit.SourceBalance}, }, offer) - bidch = runner.Do(func() runner.Result { - return runner.NewResult(o.session.Client().Tx().BroadcastMsgs(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError(), aclient.WithPriority())) - }) + replyCh := make(chan error, 1) + select { + case o.bidSubmitCh <- bidRequest{msg: msg, replyCh: replyCh}: + bidch = replyCh + case <-o.lc.ShutdownRequest(): + break loop + } - case result := <-bidch: + case err := <-bidch: bidch = nil - if result.Error() != nil { + if err != nil { bidCounter.WithLabelValues(metricsutils.OpenLabel, metricsutils.FailLabel).Inc() - o.log.Error("bid failed", "error", result.Error()) + o.log.Error("bid failed", "error", err) break loop } diff --git a/bidengine/service.go b/bidengine/service.go index c0bc06ba..991238e1 100644 --- a/bidengine/service.go +++ b/bidengine/service.go @@ -3,6 +3,7 @@ package bidengine import ( "context" "errors" + "time" "github.com/boz/go-lifecycle" "github.com/prometheus/client_golang/prometheus" @@ -78,21 +79,34 @@ func NewService( ctx, cancel := context.WithCancel(pctx) group, _ := errgroup.WithContext(ctx) + const bidBroadcastTimeout = 30 * time.Second + + if cfg.BidBatchMaxMsgs <= 0 { + cfg.BidBatchMaxMsgs = 10 + } + s := &service{ - session: session, - cluster: cluster, - bus: bus, - sub: sub, - statusch: make(chan chan<- *apclient.BidEngineStatus), - orders: make(map[string]*order), - drainch: make(chan *order), - ordersch: make(chan []mtypes.OrderID, 1000), - group: group, - cancel: cancel, - lc: lifecycle.New(), - cfg: cfg, - pass: providerAttrService, - waiter: waiter, + session: session, + cluster: cluster, + bus: bus, + sub: sub, + statusch: make(chan chan<- *apclient.BidEngineStatus), + orders: make(map[string]*order), + drainch: make(chan *order), + ordersch: make(chan []mtypes.OrderID, 1000), + bidSubmitCh: make(chan bidRequest, 100), + bidBatcher: newBidBatcher( + session.Client().Tx(), + session.Log().With("cmp", "bid-batcher"), + bidBroadcastTimeout, + cfg.BidBatchMaxMsgs, + ), + group: group, + cancel: cancel, + lc: lifecycle.New(), + cfg: cfg, + pass: providerAttrService, + waiter: waiter, } go s.lc.WatchContext(pctx) @@ -128,6 +142,11 @@ type service struct { // ordersch receives new orders to process from the blockchain. ordersch chan []mtypes.OrderID + // bidSubmitCh receives MsgCreateBid requests from order goroutines. + bidSubmitCh chan bidRequest + // bidBatcher coalesces MsgCreateBid requests into batched transactions. + bidBatcher *bidBatcher + group *errgroup.Group // cancel holds the cancel function of the service context. cancel context.CancelFunc @@ -318,6 +337,12 @@ loop: ch <- &apclient.BidEngineStatus{ Orders: uint32(len(s.orders)), // nolint: gosec } + case req := <-s.bidSubmitCh: + s.bidBatcher.Enqueue(req) + s.bidBatcher.Flush(ctx) + case <-s.bidBatcher.Done(): + s.bidBatcher.MarkDone() + s.bidBatcher.Flush(ctx) case order := <-s.drainch: // child done key := mquery.OrderPath(order.orderID) diff --git a/cmd/provider-services/cmd/flags.go b/cmd/provider-services/cmd/flags.go index a03adad5..ae27f7f6 100644 --- a/cmd/provider-services/cmd/flags.go +++ b/cmd/provider-services/cmd/flags.go @@ -181,6 +181,11 @@ func addRunFlags(cmd *cobra.Command) error { return err } + cmd.Flags().Int(FlagBidBatchMaxMsgs, 10, "max number of MsgCreateBid messages coalesced into a single broadcast. valid range [1, 50]") + if err := viper.BindPFlag(FlagBidBatchMaxMsgs, cmd.Flags().Lookup(FlagBidBatchMaxMsgs)); err != nil { + return err + } + cmd.Flags().Duration(FlagManifestTimeout, 5*time.Minute, "time after which bids are cancelled if no manifest is received") if err := viper.BindPFlag(FlagManifestTimeout, cmd.Flags().Lookup(FlagManifestTimeout)); err != nil { return err diff --git a/cmd/provider-services/cmd/run.go b/cmd/provider-services/cmd/run.go index a77d4659..5aaad63a 100644 --- a/cmd/provider-services/cmd/run.go +++ b/cmd/provider-services/cmd/run.go @@ -93,6 +93,7 @@ const ( FlagAuthPem = "auth-pem" FlagDeploymentRuntimeClass = "deployment-runtime-class" FlagBidTimeout = "bid-timeout" + FlagBidBatchMaxMsgs = "bid-batch-max-msgs" FlagManifestTimeout = "manifest-timeout" FlagMetricsListener = "metrics-listener" FlagWithdrawalPeriod = "withdrawal-period" @@ -489,6 +490,10 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { blockedHostnames := viper.GetStringSlice(FlagDeploymentBlockedHostnames) deploymentRuntimeClass := viper.GetString(FlagDeploymentRuntimeClass) bidTimeout := viper.GetDuration(FlagBidTimeout) + bidBatchMaxMsgs := viper.GetInt(FlagBidBatchMaxMsgs) + if bidBatchMaxMsgs < 1 || bidBatchMaxMsgs > 50 { + return fmt.Errorf(`flag "%s" contains invalid value %d. expected range [1, 50]`, FlagBidBatchMaxMsgs, bidBatchMaxMsgs) // nolint: err113 + } manifestTimeout := viper.GetDuration(FlagManifestTimeout) metricsListener := viper.GetString(FlagMetricsListener) providerConfig := viper.GetString(FlagProviderConfig) @@ -663,6 +668,7 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { config.DeploymentIngressStaticHosts = deploymentIngressStaticHosts config.DeploymentIngressDomain = deploymentIngressDomain config.BidTimeout = bidTimeout + config.BidBatchMaxMsgs = bidBatchMaxMsgs config.ManifestTimeout = manifestTimeout config.MonitorMaxRetries = monitorMaxRetries config.MonitorRetryPeriod = monitorRetryPeriod diff --git a/config.go b/config.go index 9c17e5a4..1a5e205b 100644 --- a/config.go +++ b/config.go @@ -20,6 +20,7 @@ type Config struct { BidPricingStrategy bidengine.BidPricingStrategy BidDeposit sdk.Coin BidTimeout time.Duration + BidBatchMaxMsgs int ManifestTimeout time.Duration BalanceCheckerCfg BalanceCheckerConfig Attributes attrtypes.Attributes diff --git a/service.go b/service.go index 175ed211..56713b20 100644 --- a/service.go +++ b/service.go @@ -90,6 +90,7 @@ func NewService(ctx context.Context, PricingStrategy: cfg.BidPricingStrategy, Deposit: cfg.BidDeposit, BidTimeout: cfg.BidTimeout, + BidBatchMaxMsgs: cfg.BidBatchMaxMsgs, Attributes: cfg.Attributes, MaxGroupVolumes: cfg.MaxGroupVolumes, }) From bb9bada25a97778c1bd7dddc692c9e7bbe77cf6e Mon Sep 17 00:00:00 2001 From: Artem Shcherbatiuk Date: Mon, 27 Apr 2026 16:29:53 +0200 Subject: [PATCH 2/5] feat: createBid batching --- bidengine/bid_batcher_test.go | 280 +++++++++++++++++++++++++++++++ cmd/provider-services/cmd/run.go | 81 ++++----- 2 files changed, 322 insertions(+), 39 deletions(-) create mode 100644 bidengine/bid_batcher_test.go diff --git a/bidengine/bid_batcher_test.go b/bidengine/bid_batcher_test.go new file mode 100644 index 00000000..d673314b --- /dev/null +++ b/bidengine/bid_batcher_test.go @@ -0,0 +1,280 @@ +package bidengine + +import ( + "context" + "errors" + "fmt" + "io" + "testing" + "time" + + "cosmossdk.io/log" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + sdk "github.com/cosmos/cosmos-sdk/types" + + clientmocks "pkg.akt.dev/go/mocks/node/client" +) + +func testBidLogger() log.Logger { return log.NewLogger(io.Discard) } + +func newBidReq() bidRequest { + return bidRequest{ + msg: &sdk.TxResponse{}, // placeholder non-nil msg + replyCh: make(chan error, 1), + } +} + +func waitReply(t *testing.T, req bidRequest) error { + t.Helper() + select { + case err := <-req.replyCh: + return err + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for reply") + return nil + } +} + +func waitDone(t *testing.T, b *bidBatcher) { + t.Helper() + select { + case <-b.Done(): + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for batcher done") + } +} + +// bidFixture sets up a bidBatcher with a controllable mock TxClient. +type bidFixture struct { + t *testing.T + tx *clientmocks.TxClient + batcher *bidBatcher + release chan struct{} + captured chan []sdk.Msg +} + +func newBidFixture(t *testing.T, maxMsgs int, block bool) *bidFixture { + t.Helper() + f := &bidFixture{ + t: t, + tx: &clientmocks.TxClient{}, + release: make(chan struct{}), + captured: make(chan []sdk.Msg, 16), + } + f.tx.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + f.captured <- args.Get(1).([]sdk.Msg) + if block { + <-f.release + } + }). + Return(&sdk.TxResponse{}, nil) + f.batcher = newBidBatcher(f.tx, testBidLogger(), time.Second, maxMsgs) + return f +} + +func (f *bidFixture) waitCaptured() []sdk.Msg { + f.t.Helper() + select { + case msgs := <-f.captured: + return msgs + case <-time.After(2 * time.Second): + f.t.Fatal("timed out waiting for broadcast") + return nil + } +} + +// --- parseMsgFailIndex --- + +func TestParseMsgFailIndex_valid(t *testing.T) { + err := errors.New("tx failed: message index: 2: insufficient funds") + require.Equal(t, 2, parseMsgFailIndex(err)) +} + +func TestParseMsgFailIndex_zero(t *testing.T) { + err := errors.New("message index: 0: bad") + require.Equal(t, 0, parseMsgFailIndex(err)) +} + +func TestParseMsgFailIndex_no_index(t *testing.T) { + err := errors.New("out of gas") + require.Equal(t, -1, parseMsgFailIndex(err)) +} + +func TestParseMsgFailIndex_whitespace(t *testing.T) { + err := errors.New("message index: 7: bad msg") + require.Equal(t, 7, parseMsgFailIndex(err)) +} + +// --- Flush / Done / MarkDone state machine --- + +func TestBidBatcher_idle_flush_succeeds(t *testing.T) { + f := newBidFixture(t, 10, false) + req := newBidReq() + + f.batcher.Enqueue(req) + require.True(t, f.batcher.Flush(context.Background())) + + msgs := f.waitCaptured() + require.Len(t, msgs, 1) + require.NoError(t, waitReply(t, req)) + waitDone(t, f.batcher) +} + +func TestBidBatcher_flush_when_empty_is_noop(t *testing.T) { + b := newBidBatcher(&clientmocks.TxClient{}, testBidLogger(), time.Second, 10) + require.False(t, b.Flush(context.Background())) +} + +func TestBidBatcher_flush_when_in_flight_is_noop(t *testing.T) { + f := newBidFixture(t, 10, true) + req1 := newBidReq() + req2 := newBidReq() + + f.batcher.Enqueue(req1) + require.True(t, f.batcher.Flush(context.Background())) + f.waitCaptured() + + // Now in-flight: second Flush must return false. + f.batcher.Enqueue(req2) + require.False(t, f.batcher.Flush(context.Background())) + + // Unblock and drain. + close(f.release) + waitDone(t, f.batcher) + require.NoError(t, waitReply(t, req1)) +} + +func TestBidBatcher_respects_max_msgs(t *testing.T) { + f := newBidFixture(t, 2, false) + reqs := make([]bidRequest, 5) + for i := range reqs { + reqs[i] = newBidReq() + f.batcher.Enqueue(reqs[i]) + } + + require.True(t, f.batcher.Flush(context.Background())) + msgs := f.waitCaptured() + require.Len(t, msgs, 2) + require.Equal(t, 3, f.batcher.Pending()) + + waitDone(t, f.batcher) + f.batcher.MarkDone() + + // Second flush sends next 2. + require.True(t, f.batcher.Flush(context.Background())) + msgs = f.waitCaptured() + require.Len(t, msgs, 2) +} + +func TestBidBatcher_all_replies_nil_on_success(t *testing.T) { + f := newBidFixture(t, 10, false) + reqs := make([]bidRequest, 3) + for i := range reqs { + reqs[i] = newBidReq() + f.batcher.Enqueue(reqs[i]) + } + f.batcher.Flush(context.Background()) + f.waitCaptured() + waitDone(t, f.batcher) + + for _, req := range reqs { + require.NoError(t, waitReply(t, req)) + } +} + +// --- Error handling --- + +func TestBidBatcher_unrecoverable_error_fails_all(t *testing.T) { + tx := &clientmocks.TxClient{} + broadcastErr := errors.New("out of gas") + tx.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything). + Return(nil, broadcastErr) + + b := newBidBatcher(tx, testBidLogger(), time.Second, 10) + reqs := []bidRequest{newBidReq(), newBidReq(), newBidReq()} + for _, r := range reqs { + b.Enqueue(r) + } + b.Flush(context.Background()) + waitDone(t, b) + + for _, req := range reqs { + err := waitReply(t, req) + require.ErrorIs(t, err, broadcastErr) + } +} + +func TestBidBatcher_smart_retry_removes_failing_msg(t *testing.T) { + tx := &clientmocks.TxClient{} + badErr := fmt.Errorf("tx failed: message index: 1: bad msg") + // First broadcast of 3 msgs fails at index 1. + tx.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything). + Return(nil, badErr).Once() + // Retry of remaining 2 msgs (index 0 and 2 from original) succeeds. + tx.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything). + Return(&sdk.TxResponse{}, nil).Once() + + b := newBidBatcher(tx, testBidLogger(), time.Second, 10) + req0 := newBidReq() + req1 := newBidReq() + req2 := newBidReq() + b.Enqueue(req0) + b.Enqueue(req1) + b.Enqueue(req2) + b.Flush(context.Background()) + waitDone(t, b) + + require.NoError(t, waitReply(t, req0)) + require.ErrorIs(t, waitReply(t, req1), badErr) + require.NoError(t, waitReply(t, req2)) +} + +func TestBidBatcher_smart_retry_cascade(t *testing.T) { + tx := &clientmocks.TxClient{} + err0 := fmt.Errorf("message index: 0: bad") + err1 := fmt.Errorf("message index: 0: bad") + // Broadcast 3 msgs: index 0 fails. + tx.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything). + Return(nil, err0).Once() + // Retry 2 remaining: index 0 (originally index 1) fails. + tx.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything). + Return(nil, err1).Once() + // Retry 1 remaining: succeeds. + tx.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything). + Return(&sdk.TxResponse{}, nil).Once() + + b := newBidBatcher(tx, testBidLogger(), time.Second, 10) + reqs := []bidRequest{newBidReq(), newBidReq(), newBidReq()} + for _, r := range reqs { + b.Enqueue(r) + } + b.Flush(context.Background()) + waitDone(t, b) + + require.ErrorIs(t, waitReply(t, reqs[0]), err0) + require.ErrorIs(t, waitReply(t, reqs[1]), err1) + require.NoError(t, waitReply(t, reqs[2])) +} + +func TestBidBatcher_context_cancel_stops_broadcast(t *testing.T) { + tx := &clientmocks.TxClient{} + started := make(chan struct{}) + tx.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { close(started) }). + Return(nil, context.Canceled) + + ctx, cancel := context.WithCancel(context.Background()) + b := newBidBatcher(tx, testBidLogger(), time.Second, 10) + req := newBidReq() + b.Enqueue(req) + b.Flush(ctx) + + <-started + cancel() + + waitDone(t, b) + require.ErrorIs(t, waitReply(t, req), context.Canceled) +} diff --git a/cmd/provider-services/cmd/run.go b/cmd/provider-services/cmd/run.go index 5aaad63a..60f6f3fc 100644 --- a/cmd/provider-services/cmd/run.go +++ b/cmd/provider-services/cmd/run.go @@ -94,43 +94,46 @@ const ( FlagDeploymentRuntimeClass = "deployment-runtime-class" FlagBidTimeout = "bid-timeout" FlagBidBatchMaxMsgs = "bid-batch-max-msgs" - FlagManifestTimeout = "manifest-timeout" - FlagMetricsListener = "metrics-listener" - FlagWithdrawalPeriod = "withdrawal-period" - FlagWithdrawalBatchMaxMsgs = "withdrawal-batch-max-msgs" - FlagLeaseFundsMonitorInterval = "lease-funds-monitor-interval" - FlagMinimumBalance = "minimum-balance" - FlagProviderConfig = "provider-config" - FlagCachedResultMaxAge = "cached-result-max-age" - FlagRPCQueryTimeout = "rpc-query-timeout" - FlagBidPriceIPScale = "bid-price-ip-scale" - FlagEnableIPOperator = "ip-operator" - FlagTxBroadcastTimeout = "tx-broadcast-timeout" - FlagMonitorMaxRetries = "monitor-max-retries" - FlagMonitorRetryPeriod = "monitor-retry-period" - FlagMonitorRetryPeriodJitter = "monitor-retry-period-jitter" - FlagMonitorHealthcheckPeriod = "monitor-healthcheck-period" - FlagMonitorHealthcheckPeriodJitter = "monitor-healthcheck-period-jitter" - FlagPersistentConfigBackend = "persistent-config-backend" - FlagPersistentConfigPath = "persistent-config-path" - FlagGatewayTLSCert = "gateway-tls-cert" - FlagGatewayTLSKey = "gateway-tls-key" - FlagCertIssuerEnabled = "cert-issuer-enabled" - FlagCertIssuerKID = "cert-issuer-kid" - FlagCertIssuerHMAC = "cert-issuer-hmac" - FlagCertIssuerStorageDir = "cert-issuer-storage-dir" - FlagCertIssuerCADirURL = "cert-issuer-ca-dir-url" - FlagCertIssuerHTTPChallengePort = "cert-issuer-http-challenge-port" - FlagCertIssuerTLSChallengePort = "cert-issuer-tls-challenge-port" - FlagCertIssuerDNSProviders = "cert-issuer-dns-providers" - FlagCertIssuerDNSResolvers = "cert-issuer-dns-resolvers" - FlagCertIssuerEmail = "cert-issuer-email" - FlagMigrationsEnabled = "migrations-enabled" - FlagMigrationsStatePath = "migrations-state-path" - FlagIngressMode = "ingress-mode" - FlagGatewayName = "gateway-name" - FlagGatewayNamespace = "gateway-namespace" - FlagGatewayProvider = "gateway-provider" + + bidBatchMaxMsgsMin = 1 + bidBatchMaxMsgsMax = 50 + FlagManifestTimeout = "manifest-timeout" + FlagMetricsListener = "metrics-listener" + FlagWithdrawalPeriod = "withdrawal-period" + FlagWithdrawalBatchMaxMsgs = "withdrawal-batch-max-msgs" + FlagLeaseFundsMonitorInterval = "lease-funds-monitor-interval" + FlagMinimumBalance = "minimum-balance" + FlagProviderConfig = "provider-config" + FlagCachedResultMaxAge = "cached-result-max-age" + FlagRPCQueryTimeout = "rpc-query-timeout" + FlagBidPriceIPScale = "bid-price-ip-scale" + FlagEnableIPOperator = "ip-operator" + FlagTxBroadcastTimeout = "tx-broadcast-timeout" + FlagMonitorMaxRetries = "monitor-max-retries" + FlagMonitorRetryPeriod = "monitor-retry-period" + FlagMonitorRetryPeriodJitter = "monitor-retry-period-jitter" + FlagMonitorHealthcheckPeriod = "monitor-healthcheck-period" + FlagMonitorHealthcheckPeriodJitter = "monitor-healthcheck-period-jitter" + FlagPersistentConfigBackend = "persistent-config-backend" + FlagPersistentConfigPath = "persistent-config-path" + FlagGatewayTLSCert = "gateway-tls-cert" + FlagGatewayTLSKey = "gateway-tls-key" + FlagCertIssuerEnabled = "cert-issuer-enabled" + FlagCertIssuerKID = "cert-issuer-kid" + FlagCertIssuerHMAC = "cert-issuer-hmac" + FlagCertIssuerStorageDir = "cert-issuer-storage-dir" + FlagCertIssuerCADirURL = "cert-issuer-ca-dir-url" + FlagCertIssuerHTTPChallengePort = "cert-issuer-http-challenge-port" + FlagCertIssuerTLSChallengePort = "cert-issuer-tls-challenge-port" + FlagCertIssuerDNSProviders = "cert-issuer-dns-providers" + FlagCertIssuerDNSResolvers = "cert-issuer-dns-resolvers" + FlagCertIssuerEmail = "cert-issuer-email" + FlagMigrationsEnabled = "migrations-enabled" + FlagMigrationsStatePath = "migrations-state-path" + FlagIngressMode = "ingress-mode" + FlagGatewayName = "gateway-name" + FlagGatewayNamespace = "gateway-namespace" + FlagGatewayProvider = "gateway-provider" ) const ( @@ -491,8 +494,8 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { deploymentRuntimeClass := viper.GetString(FlagDeploymentRuntimeClass) bidTimeout := viper.GetDuration(FlagBidTimeout) bidBatchMaxMsgs := viper.GetInt(FlagBidBatchMaxMsgs) - if bidBatchMaxMsgs < 1 || bidBatchMaxMsgs > 50 { - return fmt.Errorf(`flag "%s" contains invalid value %d. expected range [1, 50]`, FlagBidBatchMaxMsgs, bidBatchMaxMsgs) // nolint: err113 + if bidBatchMaxMsgs < bidBatchMaxMsgsMin || bidBatchMaxMsgs > bidBatchMaxMsgsMax { + return fmt.Errorf(`flag "%s" contains invalid value %d. expected range [%d, %d]`, FlagBidBatchMaxMsgs, bidBatchMaxMsgs, bidBatchMaxMsgsMin, bidBatchMaxMsgsMax) // nolint: err113 } manifestTimeout := viper.GetDuration(FlagManifestTimeout) metricsListener := viper.GetString(FlagMetricsListener) From 4cff6e255b459db7b729155979dee74e4604162d Mon Sep 17 00:00:00 2001 From: Artem Shcherbatiuk Date: Tue, 28 Apr 2026 16:22:19 +0200 Subject: [PATCH 3/5] chore: better config handling --- bidengine/bid_batcher.go | 55 +++++++++++------ bidengine/bid_batcher_test.go | 41 +++++++------ bidengine/service.go | 2 +- cmd/provider-services/cmd/flags.go | 4 +- cmd/provider-services/cmd/run.go | 98 +++++++++++++++--------------- config.go | 1 + 6 files changed, 111 insertions(+), 90 deletions(-) diff --git a/bidengine/bid_batcher.go b/bidengine/bid_batcher.go index 1a15f839..336ab7b5 100644 --- a/bidengine/bid_batcher.go +++ b/bidengine/bid_batcher.go @@ -6,6 +6,7 @@ import ( "regexp" "slices" "strconv" + "sync/atomic" "time" "cosmossdk.io/log" @@ -45,18 +46,31 @@ func parseMsgFailIndex(err error) int { // failed, fans the error to that caller, removes it, and retries the remaining // messages. This continues until all messages are resolved individually. // -// Not safe for concurrent use. All methods must be called from service.run(). +// Not safe for concurrent use. All public methods must be called from a single +// goroutine. Concurrent calls panic to surface developer mistakes early. type bidBatcher struct { tx aclient.TxClient log log.Logger timeout time.Duration maxMsgs int + inUse atomic.Bool + pending []bidRequest inFlight bool doneCh chan struct{} } +func (b *bidBatcher) enter() { + if b.inUse.Swap(true) { + panic("bidBatcher: concurrent use detected") + } +} + +func (b *bidBatcher) exit() { + b.inUse.Store(false) +} + func newBidBatcher(tx aclient.TxClient, logger log.Logger, timeout time.Duration, maxMsgs int) *bidBatcher { if maxMsgs < 1 { panic(fmt.Sprintf("bidBatcher: maxMsgs must be >= 1, got %d", maxMsgs)) @@ -71,14 +85,20 @@ func newBidBatcher(tx aclient.TxClient, logger log.Logger, timeout time.Duration } func (b *bidBatcher) InFlight() bool { + b.enter() + defer b.exit() return b.inFlight } func (b *bidBatcher) Pending() int { + b.enter() + defer b.exit() return len(b.pending) } func (b *bidBatcher) Enqueue(req bidRequest) { + b.enter() + defer b.exit() b.pending = append(b.pending, req) b.log.Debug("bid batcher: enqueue", "pending", len(b.pending), "inFlight", b.inFlight) } @@ -86,6 +106,8 @@ func (b *bidBatcher) Enqueue(req bidRequest) { // Flush starts a broadcast with up to maxMsgs pending requests when idle. // Returns true if a broadcast was started. func (b *bidBatcher) Flush(ctx context.Context) bool { + b.enter() + defer b.exit() if b.inFlight { b.log.Debug("bid batcher: flush skipped (in-flight)", "pending", len(b.pending)) return false @@ -94,10 +116,7 @@ func (b *bidBatcher) Flush(ctx context.Context) bool { return false } - n := len(b.pending) - if n > b.maxMsgs { - n = b.maxMsgs - } + n := min(len(b.pending), b.maxMsgs) batch := make([]bidRequest, n) copy(batch, b.pending[:n]) @@ -125,6 +144,8 @@ func (b *bidBatcher) Done() <-chan struct{} { // MarkDone clears the in-flight flag. Must be called after receiving from Done(). func (b *bidBatcher) MarkDone() { + b.enter() + defer b.exit() b.inFlight = false } @@ -132,11 +153,9 @@ func (b *bidBatcher) MarkDone() { // by parsing the Cosmos SDK "message index: N" error. Each resolved request // receives its own error or nil via its replyCh. func (b *bidBatcher) broadcastWithRetry(ctx context.Context, batch []bidRequest) { - remaining := batch - - for len(remaining) > 0 { - msgs := make([]sdk.Msg, len(remaining)) - for i, req := range remaining { + for len(batch) > 0 { + msgs := make([]sdk.Msg, len(batch)) + for i, req := range batch { msgs[i] = req.msg } @@ -145,25 +164,25 @@ func (b *bidBatcher) broadcastWithRetry(ctx context.Context, batch []bidRequest) cancel() if err == nil { - b.log.Info("bid batcher: batch succeeded", "count", len(remaining)) - for _, req := range remaining { + b.log.Info("bid batcher: batch succeeded", "count", len(batch)) + for _, req := range batch { req.replyCh <- nil } return } idx := parseMsgFailIndex(err) - if idx < 0 || idx >= len(remaining) { + if idx < 0 || idx >= len(batch) { // Error is not message-specific (e.g. network/sequence error): fail all. - b.log.Error("bid batcher: unrecoverable batch error", "err", err, "remaining", len(remaining)) - for _, req := range remaining { + b.log.Error("bid batcher: unrecoverable batch error", "err", err, "remaining", len(batch)) + for _, req := range batch { req.replyCh <- err } return } - b.log.Error("bid batcher: message failed, retrying remainder", "idx", idx, "err", err, "remaining", len(remaining)-1) - remaining[idx].replyCh <- err - remaining = slices.Delete(remaining, idx, idx+1) + b.log.Error("bid batcher: message failed, retrying remainder", "idx", idx, "err", err, "remaining", len(batch)-1) + batch[idx].replyCh <- err + batch = slices.Delete(batch, idx, idx+1) } } diff --git a/bidengine/bid_batcher_test.go b/bidengine/bid_batcher_test.go index d673314b..133b1ca6 100644 --- a/bidengine/bid_batcher_test.go +++ b/bidengine/bid_batcher_test.go @@ -17,6 +17,8 @@ import ( clientmocks "pkg.akt.dev/go/mocks/node/client" ) +const twoSecondsTestTimeout = 2 * time.Second + func testBidLogger() log.Logger { return log.NewLogger(io.Discard) } func newBidReq() bidRequest { @@ -31,7 +33,7 @@ func waitReply(t *testing.T, req bidRequest) error { select { case err := <-req.replyCh: return err - case <-time.After(2 * time.Second): + case <-time.After(twoSecondsTestTimeout): t.Fatal("timed out waiting for reply") return nil } @@ -41,7 +43,7 @@ func waitDone(t *testing.T, b *bidBatcher) { t.Helper() select { case <-b.Done(): - case <-time.After(2 * time.Second): + case <-time.After(twoSecondsTestTimeout): t.Fatal("timed out waiting for batcher done") } } @@ -80,7 +82,7 @@ func (f *bidFixture) waitCaptured() []sdk.Msg { select { case msgs := <-f.captured: return msgs - case <-time.After(2 * time.Second): + case <-time.After(twoSecondsTestTimeout): f.t.Fatal("timed out waiting for broadcast") return nil } @@ -88,24 +90,23 @@ func (f *bidFixture) waitCaptured() []sdk.Msg { // --- parseMsgFailIndex --- -func TestParseMsgFailIndex_valid(t *testing.T) { - err := errors.New("tx failed: message index: 2: insufficient funds") - require.Equal(t, 2, parseMsgFailIndex(err)) -} - -func TestParseMsgFailIndex_zero(t *testing.T) { - err := errors.New("message index: 0: bad") - require.Equal(t, 0, parseMsgFailIndex(err)) -} - -func TestParseMsgFailIndex_no_index(t *testing.T) { - err := errors.New("out of gas") - require.Equal(t, -1, parseMsgFailIndex(err)) -} +func TestParseMsgFailIndex(t *testing.T) { + tests := []struct { + name string + errMsg string + expected int + }{ + {name: "valid_index", errMsg: "tx failed: message index: 2: insufficient funds", expected: 2}, + {name: "zero_index", errMsg: "message index: 0: bad", expected: 0}, + {name: "no_index", errMsg: "out of gas", expected: -1}, + {name: "extra_whitespace", errMsg: "message index: 7: bad msg", expected: 7}, + } -func TestParseMsgFailIndex_whitespace(t *testing.T) { - err := errors.New("message index: 7: bad msg") - require.Equal(t, 7, parseMsgFailIndex(err)) + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.expected, parseMsgFailIndex(errors.New(tc.errMsg))) + }) + } } // --- Flush / Done / MarkDone state machine --- diff --git a/bidengine/service.go b/bidengine/service.go index fbdff046..f0f8dffe 100644 --- a/bidengine/service.go +++ b/bidengine/service.go @@ -82,7 +82,7 @@ func NewService( const bidBroadcastTimeout = 30 * time.Second if cfg.BidBatchMaxMsgs <= 0 { - cfg.BidBatchMaxMsgs = 10 + panic("BidBatchMaxMsgs must be > 0") } s := &service{ diff --git a/cmd/provider-services/cmd/flags.go b/cmd/provider-services/cmd/flags.go index beb528ae..fcc0caf8 100644 --- a/cmd/provider-services/cmd/flags.go +++ b/cmd/provider-services/cmd/flags.go @@ -182,7 +182,7 @@ func addRunFlags(cmd *cobra.Command) error { return err } - cmd.Flags().Int(FlagBidBatchMaxMsgs, 10, "max number of MsgCreateBid messages coalesced into a single broadcast. valid range [1, 50]") + cmd.Flags().Int(FlagBidBatchMaxMsgs, 10, fmt.Sprintf("max number of MsgCreateBid messages coalesced into a single broadcast. valid range [%d, %d]", batchMaxMsgsMin, batchMaxMsgsMax)) if err := viper.BindPFlag(FlagBidBatchMaxMsgs, cmd.Flags().Lookup(FlagBidBatchMaxMsgs)); err != nil { return err } @@ -202,7 +202,7 @@ func addRunFlags(cmd *cobra.Command) error { return err } - cmd.Flags().Int(FlagWithdrawalBatchMaxMsgs, 50, fmt.Sprintf("max number of MsgWithdrawLease messages coalesced into a single broadcast. valid range [%d, %d]", withdrawalBatchMaxMsgsMin, withdrawalBatchMaxMsgsMax)) + cmd.Flags().Int(FlagWithdrawalBatchMaxMsgs, 50, fmt.Sprintf("max number of MsgWithdrawLease messages coalesced into a single broadcast. valid range [%d, %d]", batchMaxMsgsMin, batchMaxMsgsMax)) if err := viper.BindPFlag(FlagWithdrawalBatchMaxMsgs, cmd.Flags().Lookup(FlagWithdrawalBatchMaxMsgs)); err != nil { return err } diff --git a/cmd/provider-services/cmd/run.go b/cmd/provider-services/cmd/run.go index faeedf43..6e7fb829 100644 --- a/cmd/provider-services/cmd/run.go +++ b/cmd/provider-services/cmd/run.go @@ -94,53 +94,50 @@ const ( FlagDeploymentRuntimeClass = "deployment-runtime-class" FlagBidTimeout = "bid-timeout" FlagBidBatchMaxMsgs = "bid-batch-max-msgs" - - bidBatchMaxMsgsMin = 1 - bidBatchMaxMsgsMax = 50 - FlagManifestTimeout = "manifest-timeout" - FlagMetricsListener = "metrics-listener" - FlagWithdrawalPeriod = "withdrawal-period" - FlagWithdrawalBatchMaxMsgs = "withdrawal-batch-max-msgs" - FlagLeaseFundsMonitorInterval = "lease-funds-monitor-interval" - FlagMinimumBalance = "minimum-balance" - FlagProviderConfig = "provider-config" - FlagCachedResultMaxAge = "cached-result-max-age" - FlagRPCQueryTimeout = "rpc-query-timeout" - FlagBidPriceIPScale = "bid-price-ip-scale" - FlagEnableIPOperator = "ip-operator" - FlagTxBroadcastTimeout = "tx-broadcast-timeout" - FlagMonitorMaxRetries = "monitor-max-retries" - FlagMonitorRetryPeriod = "monitor-retry-period" - FlagMonitorRetryPeriodJitter = "monitor-retry-period-jitter" - FlagMonitorHealthcheckPeriod = "monitor-healthcheck-period" - FlagMonitorHealthcheckPeriodJitter = "monitor-healthcheck-period-jitter" - FlagPersistentConfigBackend = "persistent-config-backend" - FlagPersistentConfigPath = "persistent-config-path" - FlagGatewayTLSCert = "gateway-tls-cert" - FlagGatewayTLSKey = "gateway-tls-key" - FlagCertIssuerEnabled = "cert-issuer-enabled" - FlagCertIssuerKID = "cert-issuer-kid" - FlagCertIssuerHMAC = "cert-issuer-hmac" - FlagCertIssuerStorageDir = "cert-issuer-storage-dir" - FlagCertIssuerCADirURL = "cert-issuer-ca-dir-url" - FlagCertIssuerHTTPChallengePort = "cert-issuer-http-challenge-port" - FlagCertIssuerTLSChallengePort = "cert-issuer-tls-challenge-port" - FlagCertIssuerDNSProviders = "cert-issuer-dns-providers" - FlagCertIssuerDNSResolvers = "cert-issuer-dns-resolvers" - FlagCertIssuerEmail = "cert-issuer-email" - FlagMigrationsEnabled = "migrations-enabled" - FlagMigrationsStatePath = "migrations-state-path" - FlagIngressMode = "ingress-mode" - FlagGatewayName = "gateway-name" - FlagGatewayNamespace = "gateway-namespace" - FlagGatewayProvider = "gateway-provider" + FlagManifestTimeout = "manifest-timeout" + FlagMetricsListener = "metrics-listener" + FlagWithdrawalPeriod = "withdrawal-period" + FlagWithdrawalBatchMaxMsgs = "withdrawal-batch-max-msgs" + FlagLeaseFundsMonitorInterval = "lease-funds-monitor-interval" + FlagMinimumBalance = "minimum-balance" + FlagProviderConfig = "provider-config" + FlagCachedResultMaxAge = "cached-result-max-age" + FlagRPCQueryTimeout = "rpc-query-timeout" + FlagBidPriceIPScale = "bid-price-ip-scale" + FlagEnableIPOperator = "ip-operator" + FlagTxBroadcastTimeout = "tx-broadcast-timeout" + FlagMonitorMaxRetries = "monitor-max-retries" + FlagMonitorRetryPeriod = "monitor-retry-period" + FlagMonitorRetryPeriodJitter = "monitor-retry-period-jitter" + FlagMonitorHealthcheckPeriod = "monitor-healthcheck-period" + FlagMonitorHealthcheckPeriodJitter = "monitor-healthcheck-period-jitter" + FlagPersistentConfigBackend = "persistent-config-backend" + FlagPersistentConfigPath = "persistent-config-path" + FlagGatewayTLSCert = "gateway-tls-cert" + FlagGatewayTLSKey = "gateway-tls-key" + FlagCertIssuerEnabled = "cert-issuer-enabled" + FlagCertIssuerKID = "cert-issuer-kid" + FlagCertIssuerHMAC = "cert-issuer-hmac" + FlagCertIssuerStorageDir = "cert-issuer-storage-dir" + FlagCertIssuerCADirURL = "cert-issuer-ca-dir-url" + FlagCertIssuerHTTPChallengePort = "cert-issuer-http-challenge-port" + FlagCertIssuerTLSChallengePort = "cert-issuer-tls-challenge-port" + FlagCertIssuerDNSProviders = "cert-issuer-dns-providers" + FlagCertIssuerDNSResolvers = "cert-issuer-dns-resolvers" + FlagCertIssuerEmail = "cert-issuer-email" + FlagMigrationsEnabled = "migrations-enabled" + FlagMigrationsStatePath = "migrations-state-path" + FlagIngressMode = "ingress-mode" + FlagGatewayName = "gateway-name" + FlagGatewayNamespace = "gateway-namespace" + FlagGatewayProvider = "gateway-provider" ) const ( - serviceIPOperator = "ip-operator" - serviceHostnameOperator = "hostname-operator" - withdrawalBatchMaxMsgsMin = 10 - withdrawalBatchMaxMsgsMax = 100 + serviceIPOperator = "ip-operator" + serviceHostnameOperator = "hostname-operator" + batchMaxMsgsMin = 10 + batchMaxMsgsMax = 100 ) var ( @@ -216,8 +213,12 @@ func RunCmd() *cobra.Command { return fmt.Errorf(`flag "%s" value must be > "%s"`, FlagWithdrawalPeriod, FlagLeaseFundsMonitorInterval) // nolint: err113 } - if maxMsgs := viper.GetInt(FlagWithdrawalBatchMaxMsgs); maxMsgs < withdrawalBatchMaxMsgsMin || maxMsgs > withdrawalBatchMaxMsgsMax { - return fmt.Errorf(`flag "%s" contains invalid value %d. expected range [%d, %d]`, FlagWithdrawalBatchMaxMsgs, maxMsgs, withdrawalBatchMaxMsgsMin, withdrawalBatchMaxMsgsMax) // nolint: err113 + if maxMsgs := viper.GetInt(FlagWithdrawalBatchMaxMsgs); maxMsgs < batchMaxMsgsMin || maxMsgs > batchMaxMsgsMax { + return fmt.Errorf(`flag "%s" contains invalid value %d. expected range [%d, %d]`, FlagWithdrawalBatchMaxMsgs, maxMsgs, batchMaxMsgsMin, batchMaxMsgsMax) // nolint: err113 + } + + if maxMsgs := viper.GetInt(FlagBidBatchMaxMsgs); maxMsgs < batchMaxMsgsMin || maxMsgs > batchMaxMsgsMax { + return fmt.Errorf(`flag "%s" contains invalid value %d. expected range [%d, %d]`, FlagBidBatchMaxMsgs, maxMsgs, batchMaxMsgsMin, batchMaxMsgsMax) // nolint: err113 } if viper.GetDuration(FlagMonitorRetryPeriod) < 4*time.Second { @@ -495,10 +496,9 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { blockedHostnames := viper.GetStringSlice(FlagDeploymentBlockedHostnames) deploymentRuntimeClass := viper.GetString(FlagDeploymentRuntimeClass) bidTimeout := viper.GetDuration(FlagBidTimeout) + bidBatchMaxMsgs := viper.GetInt(FlagBidBatchMaxMsgs) - if bidBatchMaxMsgs < bidBatchMaxMsgsMin || bidBatchMaxMsgs > bidBatchMaxMsgsMax { - return fmt.Errorf(`flag "%s" contains invalid value %d. expected range [%d, %d]`, FlagBidBatchMaxMsgs, bidBatchMaxMsgs, bidBatchMaxMsgsMin, bidBatchMaxMsgsMax) // nolint: err113 - } + manifestTimeout := viper.GetDuration(FlagManifestTimeout) metricsListener := viper.GetString(FlagMetricsListener) providerConfig := viper.GetString(FlagProviderConfig) diff --git a/config.go b/config.go index 1a5e205b..7fff75bd 100644 --- a/config.go +++ b/config.go @@ -34,6 +34,7 @@ func NewDefaultConfig() Config { return Config{ ClusterWaitReadyDuration: time.Second * 10, BidDeposit: mtypes.DefaultBidMinDeposit, + BidBatchMaxMsgs: 10, BalanceCheckerCfg: BalanceCheckerConfig{ LeaseFundsCheckInterval: 1 * time.Minute, WithdrawalPeriod: 24 * time.Hour, From 462af286a91dde079be33e36b4a903c78436b6a0 Mon Sep 17 00:00:00 2001 From: Artem Shcherbatiuk Date: Tue, 28 Apr 2026 16:57:49 +0200 Subject: [PATCH 4/5] chore: refactor --- bidengine/bid_batcher.go | 6 +++--- bidengine/service.go | 6 +++--- withdraw_batcher.go | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/bidengine/bid_batcher.go b/bidengine/bid_batcher.go index 336ab7b5..6c55ecfc 100644 --- a/bidengine/bid_batcher.go +++ b/bidengine/bid_batcher.go @@ -108,11 +108,11 @@ func (b *bidBatcher) Enqueue(req bidRequest) { func (b *bidBatcher) Flush(ctx context.Context) bool { b.enter() defer b.exit() - if b.inFlight { - b.log.Debug("bid batcher: flush skipped (in-flight)", "pending", len(b.pending)) + if len(b.pending) == 0 { return false } - if len(b.pending) == 0 { + if b.inFlight { + b.log.Debug("bid batcher: flush skipped (in-flight)", "pending", len(b.pending)) return false } diff --git a/bidengine/service.go b/bidengine/service.go index f0f8dffe..74e9444f 100644 --- a/bidengine/service.go +++ b/bidengine/service.go @@ -35,6 +35,8 @@ var ordersCounter = promauto.NewCounterVec(prometheus.CounterOpts{ // ErrNotRunning declares new error with message "not running" var ErrNotRunning = errors.New("not running") +const broadcastTimeout = 30 * time.Second + // StatusClient interface predefined with Status method type StatusClient interface { Status(context.Context) (*apclient.BidEngineStatus, error) @@ -79,8 +81,6 @@ func NewService( ctx, cancel := context.WithCancel(pctx) group, _ := errgroup.WithContext(ctx) - const bidBroadcastTimeout = 30 * time.Second - if cfg.BidBatchMaxMsgs <= 0 { panic("BidBatchMaxMsgs must be > 0") } @@ -98,7 +98,7 @@ func NewService( bidBatcher: newBidBatcher( session.Client().Tx(), session.Log().With("cmp", "bid-batcher"), - bidBroadcastTimeout, + broadcastTimeout, cfg.BidBatchMaxMsgs, ), group: group, diff --git a/withdraw_batcher.go b/withdraw_batcher.go index 639d2f33..921fdff4 100644 --- a/withdraw_batcher.go +++ b/withdraw_batcher.go @@ -105,11 +105,11 @@ func (b *withdrawBatcher) Pending() int { func (b *withdrawBatcher) Flush(ctx context.Context) bool { b.enter() defer b.exit() - if b.inFlight { - b.log.Debug("batcher: flush skipped (in-flight)", "pending", len(b.pending)) + if len(b.pending) == 0 { return false } - if len(b.pending) == 0 { + if b.inFlight { + b.log.Debug("batcher: flush skipped (in-flight)", "pending", len(b.pending)) return false } From cf8b87744565a2d08c0eac3a98cf77e4855a8350 Mon Sep 17 00:00:00 2001 From: Artem Shcherbatiuk Date: Tue, 28 Apr 2026 19:30:18 +0200 Subject: [PATCH 5/5] chore: used chain-sdk RC --- go.mod | 10 +++++----- go.sum | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index 4eb128fe..1b2fde68 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/akash-network/provider -go 1.25.5 +go 1.25.9 require ( cosmossdk.io/errors v1.0.2 @@ -10,7 +10,7 @@ require ( github.com/blang/semver/v4 v4.0.0 github.com/boz/go-lifecycle v0.1.1 github.com/cometbft/cometbft v0.38.21 - github.com/cosmos/cosmos-sdk v0.53.5 + github.com/cosmos/cosmos-sdk v0.53.6 github.com/desertbit/timer v1.0.1 github.com/fsnotify/fsnotify v1.9.0 github.com/go-acme/lego/v4 v4.26.0 @@ -51,11 +51,11 @@ require ( k8s.io/client-go v0.34.1 k8s.io/code-generator v0.34.1 k8s.io/kubectl v0.33.3 - pkg.akt.dev/go v0.2.7 + pkg.akt.dev/go v0.2.11-rc2 pkg.akt.dev/go/cli v0.2.2 pkg.akt.dev/go/sdl v0.2.0 pkg.akt.dev/node v1.2.2 - pkg.akt.dev/node/v2 v2.0.0 + pkg.akt.dev/node/v2 v2.1.0-rc9 sigs.k8s.io/gateway-api v1.4.1 sigs.k8s.io/kind v0.30.0 sigs.k8s.io/structured-merge-diff/v6 v6.3.0 @@ -179,7 +179,7 @@ require ( github.com/cosmos/iavl v1.2.6 // indirect github.com/cosmos/ibc-go/v10 v10.5.0 // indirect github.com/cosmos/ics23/go v0.11.0 // indirect - github.com/cosmos/ledger-cosmos-go v0.16.0 // indirect + github.com/cosmos/ledger-cosmos-go v1.0.0 // indirect github.com/cosmos/rosetta v0.50.12 // indirect github.com/cosmos/rosetta-sdk-go v0.10.0 // indirect github.com/danieljoos/wincred v1.2.2 // indirect diff --git a/go.sum b/go.sum index 4c4b14aa..692b93ea 100644 --- a/go.sum +++ b/go.sum @@ -3883,16 +3883,16 @@ nhooyr.io/websocket v1.8.17 h1:KEVeLJkUywCKVsnLIDlD/5gtayKp8VoCkksHCGGfT9Y= nhooyr.io/websocket v1.8.17/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= pgregory.net/rapid v0.5.5 h1:jkgx1TjbQPD/feRoK+S/mXw9e1uj6WilpHrXJowi6oA= pgregory.net/rapid v0.5.5/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04= -pkg.akt.dev/go v0.2.7 h1:oQFhmjv7xUc9MLxsCmdnl52DL/VOu3r3shz51rcXnLs= -pkg.akt.dev/go v0.2.7/go.mod h1:x9Cku9yibLk4aGLTE/Yy7eEkkda0uOSRWmYwQeFw23o= +pkg.akt.dev/go v0.2.11-rc2 h1:IdNX8cwhSVrDLMchNczIwWuJ2IdmF+WSVUKO6XUlzx0= +pkg.akt.dev/go v0.2.11-rc2/go.mod h1:x9Cku9yibLk4aGLTE/Yy7eEkkda0uOSRWmYwQeFw23o= pkg.akt.dev/go/cli v0.2.2 h1:PWDAAeHkVtcZ9qE76yh4IhJ2J/42ekhwSyrGWLPGi/g= pkg.akt.dev/go/cli v0.2.2/go.mod h1:MHm9lU8hb+xQ8BX3b9c9S1pMyZKUob5tVjHXQ4T1uwU= pkg.akt.dev/go/sdl v0.2.0 h1:hY74GjN4itV92REf8HqGt1rQDtXsruzE/iIzd/FpUB8= pkg.akt.dev/go/sdl v0.2.0/go.mod h1:urd6091AWDy9YwFLRCsENuQ931qyRcg/RJBN9XCBs/E= pkg.akt.dev/node v1.2.2 h1:Xka/9sVaJTyGfyudZUc8A9UW8xC0FYcZoHvD6dm1iy0= pkg.akt.dev/node v1.2.2/go.mod h1:luWpw5dNEIMhEITMqOvjiohqIoLd/kkcjeg1aCF4EJA= -pkg.akt.dev/node/v2 v2.0.0 h1:U1K9Kce+s4qAD9VlRy22l6Mw8tUTa2rYp8SJ/78zjwM= -pkg.akt.dev/node/v2 v2.0.0/go.mod h1:srQsJA8F3SB4RkhluxLKA5roOkW2q0YeNYzEpSo7M74= +pkg.akt.dev/node/v2 v2.1.0-rc9 h1:KqYAA9QBYSi9wO4wPy9BaBa5d39eUBmmg4ISk2K3X0k= +pkg.akt.dev/node/v2 v2.1.0-rc9/go.mod h1:DKxgQLlxBL8QT+Y6KIQPnAmjrnJSiwBhvmO4fr8c7pw= pkg.akt.dev/specs v0.0.1 h1:OP0zil3Fr4kcCuybFqQ8LWgSlSP2Yn7306meWpu6/S4= pkg.akt.dev/specs v0.0.1/go.mod h1:tiFuJAqzn+lkz662lf9qaEdjdrrDr882r3YMDnWkbp4= pkg.akt.dev/testdata v0.0.1 h1:yHfqF0Uxf7Rg7WdwSggnyBWMxACtAg5VpBUVFXU+uvM=