Commits (19)
a9791b1
fix: coordinated ShutdownInitiated after bid close tx broadcast is fi…
vertex451 Mar 10, 2026
15977fa
fix: test watchdog stop while waiting for broadcast
vertex451 Mar 10, 2026
09dbce0
chore: reverted original log.Info for lease won
vertex451 Mar 11, 2026
b1a78b6
chore: commit to the broadcast once started
vertex451 Mar 12, 2026
85f8226
fix: added context.WithTimeout to the broadcast message
vertex451 Mar 13, 2026
3514dd1
Merge branch 'main' of github.com:akash-network/provider into artem/f…
vertex451 Mar 13, 2026
d4c2e18
fix: added default BroadcastTimeout
vertex451 Mar 13, 2026
914b5b6
fix: set cancel timeout to 12s
vertex451 Mar 13, 2026
baee021
Merge branch 'main' into artem/fix-bid-close-if-no-manfiest
troian Mar 20, 2026
3388a8e
Merge branch 'main' into artem/fix-bid-close-if-no-manfiest
Zblocker64 Mar 20, 2026
5643880
Merge branch 'main' into artem/fix-bid-close-if-no-manfiest
Zblocker64 Mar 20, 2026
199c391
fix: added broadcastTimeout validation
vertex451 Mar 20, 2026
a252e0c
Merge branch 'main' into artem/fix-bid-close-if-no-manfiest
vertex451 Mar 23, 2026
87728f3
Merge branch 'main' into artem/fix-bid-close-if-no-manfiest
Zblocker64 Mar 24, 2026
e404abd
Merge branch 'main' into artem/fix-bid-close-if-no-manfiest
Zblocker64 Mar 24, 2026
1bf9bf0
Merge branch 'main' into artem/fix-bid-close-if-no-manfiest
Zblocker64 Mar 25, 2026
a9a2bc4
Merge branch 'main' into artem/fix-bid-close-if-no-manfiest
vertex451 Mar 27, 2026
f4a01fa
Merge branch 'main' into artem/fix-bid-close-if-no-manfiest
Zblocker64 Apr 8, 2026
0cc9eb7
Merge branch 'main' into artem/fix-bid-close-if-no-manfiest
Zblocker64 Apr 15, 2026
2 changes: 2 additions & 0 deletions cmd/provider-services/cmd/run.go
@@ -474,6 +474,7 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error {
deploymentRuntimeClass := viper.GetString(FlagDeploymentRuntimeClass)
bidTimeout := viper.GetDuration(FlagBidTimeout)
manifestTimeout := viper.GetDuration(FlagManifestTimeout)
broadcastTimeout := viper.GetDuration(FlagTxBroadcastTimeout)
metricsListener := viper.GetString(FlagMetricsListener)
providerConfig := viper.GetString(FlagProviderConfig)
cachedResultMaxAge := viper.GetDuration(FlagCachedResultMaxAge)
@@ -576,6 +577,7 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error {
config.DeploymentIngressDomain = deploymentIngressDomain
config.BidTimeout = bidTimeout
config.ManifestTimeout = manifestTimeout
config.BroadcastTimeout = broadcastTimeout
config.MonitorMaxRetries = monitorMaxRetries
config.MonitorRetryPeriod = monitorRetryPeriod
config.MonitorRetryPeriodJitter = monitorRetryPeriodJitter
1 change: 1 addition & 0 deletions config.go
@@ -21,6 +21,7 @@ type Config struct {
BidDeposit sdk.Coin
BidTimeout time.Duration
ManifestTimeout time.Duration
BroadcastTimeout time.Duration
BalanceCheckerCfg BalanceCheckerConfig
Attributes attrtypes.Attributes
MaxGroupVolumes int
2 changes: 1 addition & 1 deletion gateway/rest/router.go
@@ -110,7 +110,7 @@ func newRouter(log log.Logger, addr sdk.Address, pclient provider.Client, ctxCon
authorizeProviderMiddleware,
requireOwner,
)

hostnameRouter := authedRouter.PathPrefix(apclient.HostnamePrefix).Subrouter()
hostnameRouter.HandleFunc(apclient.MigratePathPrefix,
migrateHandler(log, pclient.Hostname(), pclient.ClusterService())).
1 change: 1 addition & 0 deletions manifest/config.go
@@ -5,6 +5,7 @@ import "time"
type ServiceConfig struct {
HTTPServicesRequireAtLeastOneHost bool
ManifestTimeout time.Duration
BroadcastTimeout time.Duration
RPCQueryTimeout time.Duration
CachedResultMaxAge time.Duration
}
2 changes: 1 addition & 1 deletion manifest/service.go
@@ -351,7 +351,7 @@ func (s *service) handleLease(ev event.LeaseWon, isNew bool) {
if isNew && s.config.ManifestTimeout > time.Duration(0) {
// Create watchdog if it does not exist AND a manifest has not been received yet
if watchdog := s.watchdogs[ev.LeaseID.DeploymentID()]; watchdog == nil {
watchdog = newWatchdog(s.session, s.lc.ShuttingDown(), s.watchdogch, ev.LeaseID, s.config.ManifestTimeout)
watchdog = newWatchdog(s.session, s.lc.ShuttingDown(), s.watchdogch, ev.LeaseID, s.config.ManifestTimeout, s.config.BroadcastTimeout)
Member:

If the provider service restarts, does it start counting from 0, or does it take into account when the lease was created?
vertex451 (Contributor, Author) Apr 10, 2026:
No, the lease creation time is not taken into account.
I could improve the solution by taking Lease.CreatedAt and computing the remaining time by converting currentBlock to a time (the same way we did in isStaleBid).

I am not doing this here, since we discussed refactoring the serial broadcaster.
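For illustration, a minimal sketch of the improvement discussed above (hypothetical, not part of this PR): derive the elapsed time from the lease's creation block height instead of restarting the timeout from zero after a provider restart. It assumes the creation height and an average block interval are available, mirroring the block-to-time conversion used in isStaleBid.

package manifest

import "time"

// remainingManifestTimeout is a hypothetical helper: given the lease's creation
// block height and the current height, estimate how much of the manifest
// timeout is left after a provider restart. createdAtHeight, currentHeight,
// and avgBlockTime are assumed inputs, not fields this PR adds.
func remainingManifestTimeout(createdAtHeight, currentHeight int64, avgBlockTime, manifestTimeout time.Duration) time.Duration {
	// time.Duration(blocks) is a dimensionless count here; multiplying by
	// avgBlockTime yields the estimated elapsed wall-clock time.
	elapsed := time.Duration(currentHeight-createdAtHeight) * avgBlockTime
	if remaining := manifestTimeout - elapsed; remaining > 0 {
		return remaining
	}
	return 0 // timeout already elapsed; the watchdog would close the bid immediately
}

The watchdog could then be constructed with this remaining value in place of s.config.ManifestTimeout.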

s.watchdogs[ev.LeaseID.DeploymentID()] = watchdog
}
}
63 changes: 38 additions & 25 deletions manifest/watchdog.go
@@ -20,29 +20,25 @@ import (
)

type watchdog struct {
leaseID mtypes.LeaseID
timeout time.Duration
lc lifecycle.Lifecycle
sess session.Session
ctx context.Context
log log.Logger
leaseID mtypes.LeaseID
timeout time.Duration
broadcastTimeout time.Duration
lc lifecycle.Lifecycle
sess session.Session
log log.Logger
}

func newWatchdog(sess session.Session, parent <-chan struct{}, done chan<- dtypes.DeploymentID, leaseID mtypes.LeaseID, timeout time.Duration) *watchdog {
ctx, cancel := context.WithCancel(context.Background())
func newWatchdog(sess session.Session, parent <-chan struct{}, done chan<- dtypes.DeploymentID, leaseID mtypes.LeaseID, timeout, broadcastTimeout time.Duration) *watchdog {
result := &watchdog{
leaseID: leaseID,
timeout: timeout,
lc: lifecycle.New(),
sess: sess,
ctx: ctx,
log: sess.Log().With("leaseID", leaseID),
leaseID: leaseID,
timeout: timeout,
broadcastTimeout: broadcastTimeout,
lc: lifecycle.New(),
sess: sess,
log: sess.Log().With("leaseID", leaseID),
}

go func() {
result.lc.WatchChannel(parent)
cancel()
}()
go result.lc.WatchChannel(parent)

go func() {
<-result.lc.Done()
@@ -54,10 +50,17 @@ func newWatchdog(sess session.Session, parent <-chan struct{}, done chan<- dtype
return result
}

// stop signals the watchdog to exit without closing the bid.
// Called when: (1) manifest received within the timeout, (2) manifest manager is stopped.
func (wd *watchdog) stop() {
wd.lc.ShutdownAsync(nil)
}

// run waits for the manifest timeout, then broadcasts MsgCloseBid.
//
// Rule: once the broadcast is started, we commit to it - it must complete regardless of
// any concurrent stop() or parent shutdown. This prevents leaving an open bid on-chain.
// broadcastTimeout bounds how long we wait for the RPC response to prevent a permanent hang.
func (wd *watchdog) run() {
defer wd.lc.ShutdownCompleted()

@@ -71,21 +74,31 @@ func (wd *watchdog) run() {
wd.log.Info("watchdog closing bid")

runch = runner.Do(func() runner.Result {
ctx, cancel := context.WithTimeout(context.Background(), wd.broadcastTimeout)
defer cancel()

msg := &mvbeta.MsgCloseBid{
ID: mtypes.MakeBidID(wd.leaseID.OrderID(), wd.sess.Provider().Address()),
Reason: mtypes.LeaseClosedReasonManifestTimeout,
}

return runner.NewResult(wd.sess.Client().Tx().BroadcastMsgs(wd.ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError()))
return runner.NewResult(wd.sess.Client().Tx().BroadcastMsgs(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError()))
})
case err = <-wd.lc.ShutdownRequest():
// Manifest received or parent shutdown before timeout - exit without closing the bid.
}

wd.lc.ShutdownInitiated(err)
if runch != nil {
result := <-runch
if err := result.Error(); err != nil {
wd.log.Error("failed closing bid", "err", err)
// ShutdownRequest may arrive while we wait for the broadcast result.
// Consume it to unblock the sender, but keep looping until runch delivers.
// The broadcast context is independent and bounded by broadcastTimeout.
for runch != nil {
select {
case result := <-runch:
if err := result.Error(); err != nil {
wd.log.Error("failed closing bid", "err", err)
}
runch = nil
case err = <-wd.lc.ShutdownRequest():
}
}
wd.lc.ShutdownInitiated(err)
}
74 changes: 70 additions & 4 deletions manifest/watchdog_test.go
@@ -1,6 +1,7 @@
package manifest

import (
"context"
"testing"
"time"

@@ -10,6 +11,7 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"

clientmocks "pkg.akt.dev/go/mocks/node/client"
aclient "pkg.akt.dev/go/node/client/v1beta3"
dtypes "pkg.akt.dev/go/node/deployment/v1"
mtypes "pkg.akt.dev/go/node/market/v1"
mvbeta "pkg.akt.dev/go/node/market/v1beta5"
@@ -29,6 +31,14 @@ type watchdogTestScaffold struct {
}

func makeWatchdogTestScaffold(t *testing.T, timeout time.Duration) (*watchdog, *watchdogTestScaffold) {
return makeWatchdogTestScaffoldFull(t, timeout, 30*time.Second, nil)
}

func makeWatchdogTestScaffoldWithBlocking(t *testing.T, timeout time.Duration, blockUntilRelease <-chan struct{}) (*watchdog, *watchdogTestScaffold) {
return makeWatchdogTestScaffoldFull(t, timeout, 30*time.Second, blockUntilRelease)
}

func makeWatchdogTestScaffoldFull(t *testing.T, timeout, broadcastTimeout time.Duration, blockUntilRelease <-chan struct{}) (*watchdog, *watchdogTestScaffold) {
scaffold := &watchdogTestScaffold{}
scaffold.parentCh = make(chan struct{})
scaffold.doneCh = make(chan dtypes.DeploymentID, 1)
@@ -38,17 +48,27 @@ func makeWatchdogTestScaffold(t *testing.T, timeout time.Duration) (*watchdog, *
scaffold.broadcasts = make(chan []sdk.Msg, 1)

txClientMock := &clientmocks.TxClient{}
txClientMock.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
scaffold.broadcasts <- args.Get(1).([]sdk.Msg)
}).Return(&sdk.Result{}, nil)
txClientMock.EXPECT().
BroadcastMsgs(mock.Anything, mock.Anything, mock.Anything).
RunAndReturn(func(ctx context.Context, msgs []sdk.Msg, _ ...aclient.BroadcastOption) (any, error) {
if blockUntilRelease != nil {
select {
case <-blockUntilRelease:
case <-ctx.Done():
return nil, ctx.Err()
}
}
scaffold.broadcasts <- msgs
return &sdk.Result{}, nil
})

scaffold.client = &clientmocks.Client{}
scaffold.client.On("Tx").Return(txClientMock)
sess := session.New(testutil.Logger(t), scaffold.client, &scaffold.provider, -1)

require.NotNil(t, sess.Client())

wd := newWatchdog(sess, scaffold.parentCh, scaffold.doneCh, scaffold.leaseID, timeout)
wd := newWatchdog(sess, scaffold.parentCh, scaffold.doneCh, scaffold.leaseID, timeout, broadcastTimeout)

return wd, scaffold
}
@@ -72,6 +92,7 @@ func TestWatchdogTimeout(t *testing.T) {

msg := msgs[0].(*mvbeta.MsgCloseBid)
require.Equal(t, scaffold.leaseID, msg.ID.LeaseID())
require.Equal(t, mtypes.LeaseClosedReasonManifestTimeout, msg.Reason)

deploymentID := testutil.ChannelWaitForValue(t, scaffold.doneCh)
require.Equal(t, deploymentID, scaffold.leaseID.DeploymentID())
@@ -121,3 +142,48 @@ func TestWatchdogStopsOnParent(t *testing.T) {
deploymentID := testutil.ChannelWaitForValue(t, scaffold.doneCh)
require.Equal(t, deploymentID, scaffold.leaseID.DeploymentID())
}

func TestWatchdogBroadcastTimeout(t *testing.T) {
// Mock blocks forever; broadcast context expires after 10ms → watchdog exits cleanly.
neverRelease := make(chan struct{})
wd, scaffold := makeWatchdogTestScaffoldFull(t, 100*time.Millisecond, 10*time.Millisecond, neverRelease)

select {
case <-wd.lc.Done():
case <-time.After(5 * time.Second):
t.Fatal("watchdog hung after broadcast timeout")
}

select {
case <-scaffold.broadcasts:
t.Fatal("broadcast should not have completed")
default:
}

deploymentID := testutil.ChannelWaitForValue(t, scaffold.doneCh)
require.Equal(t, deploymentID, scaffold.leaseID.DeploymentID())
}

func TestWatchdogStopWhileWaitingForBroadcast(t *testing.T) {
releaseCh := make(chan struct{})
wd, scaffold := makeWatchdogTestScaffoldWithBlocking(t, 100*time.Millisecond, releaseCh)

<-time.After(200 * time.Millisecond)
wd.stop()

close(releaseCh)

select {
case <-wd.lc.Done():
case <-time.After(5 * time.Second):
t.Fatal("deadlock: watchdog did not complete after broadcast")
}

deploymentID := testutil.ChannelWaitForValue(t, scaffold.doneCh)
require.Equal(t, deploymentID, scaffold.leaseID.DeploymentID())

broadcasts := testutil.ChannelWaitForValue(t, scaffold.broadcasts)
msgs := broadcasts.([]sdk.Msg)
require.Len(t, msgs, 1)
require.Equal(t, mtypes.LeaseClosedReasonManifestTimeout, msgs[0].(*mvbeta.MsgCloseBid).Reason)
}
1 change: 1 addition & 0 deletions service.go
@@ -105,6 +105,7 @@ func NewService(ctx context.Context,
manifestConfig := manifest.ServiceConfig{
HTTPServicesRequireAtLeastOneHost: !cfg.DeploymentIngressStaticHosts,
ManifestTimeout: cfg.ManifestTimeout,
BroadcastTimeout: cfg.BroadcastTimeout,
RPCQueryTimeout: cfg.RPCQueryTimeout,
CachedResultMaxAge: cfg.CachedResultMaxAge,
}