diff --git a/cmd/provider-services/cmd/flags.go b/cmd/provider-services/cmd/flags.go index 8fff9a03..287aed05 100644 --- a/cmd/provider-services/cmd/flags.go +++ b/cmd/provider-services/cmd/flags.go @@ -226,7 +226,7 @@ func addRunFlags(cmd *cobra.Command) error { return err } - cmd.Flags().Duration(FlagTxBroadcastTimeout, 30*time.Second, "tx broadcast timeout. defaults to 30s") + cmd.Flags().Duration(FlagTxBroadcastTimeout, 12*time.Second, "tx broadcast timeout") if err := viper.BindPFlag(FlagTxBroadcastTimeout, cmd.Flags().Lookup(FlagTxBroadcastTimeout)); err != nil { return err } diff --git a/cmd/provider-services/cmd/run.go b/cmd/provider-services/cmd/run.go index 519e2752..5d88aa95 100644 --- a/cmd/provider-services/cmd/run.go +++ b/cmd/provider-services/cmd/run.go @@ -481,6 +481,7 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { deploymentRuntimeClass := viper.GetString(FlagDeploymentRuntimeClass) bidTimeout := viper.GetDuration(FlagBidTimeout) manifestTimeout := viper.GetDuration(FlagManifestTimeout) + broadcastTimeout := viper.GetDuration(FlagTxBroadcastTimeout) metricsListener := viper.GetString(FlagMetricsListener) providerConfig := viper.GetString(FlagProviderConfig) cachedResultMaxAge := viper.GetDuration(FlagCachedResultMaxAge) @@ -655,6 +656,11 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error { config.DeploymentIngressDomain = deploymentIngressDomain config.BidTimeout = bidTimeout config.ManifestTimeout = manifestTimeout + if broadcastTimeout <= 0 { + logger.Warn("tx-broadcast-timeout must be positive, using default", "invalid", broadcastTimeout, "default", 12*time.Second) + broadcastTimeout = 12 * time.Second + } + config.BroadcastTimeout = broadcastTimeout config.MonitorMaxRetries = monitorMaxRetries config.MonitorRetryPeriod = monitorRetryPeriod config.MonitorRetryPeriodJitter = monitorRetryPeriodJitter diff --git a/config.go b/config.go index e2248d27..cf938c51 100644 --- a/config.go +++ b/config.go @@ -21,6 +21,7 @@ type Config struct { BidDeposit sdk.Coin BidTimeout time.Duration ManifestTimeout time.Duration + BroadcastTimeout time.Duration BalanceCheckerCfg BalanceCheckerConfig Attributes attrtypes.Attributes MaxGroupVolumes int @@ -33,6 +34,7 @@ func NewDefaultConfig() Config { return Config{ ClusterWaitReadyDuration: time.Second * 10, BidDeposit: mtypes.DefaultBidMinDeposit, + BroadcastTimeout: 12 * time.Second, BalanceCheckerCfg: BalanceCheckerConfig{ LeaseFundsCheckInterval: 1 * time.Minute, WithdrawalPeriod: 24 * time.Hour, diff --git a/manifest/config.go b/manifest/config.go index b909aef8..4cee79d4 100644 --- a/manifest/config.go +++ b/manifest/config.go @@ -5,6 +5,7 @@ import "time" type ServiceConfig struct { HTTPServicesRequireAtLeastOneHost bool ManifestTimeout time.Duration + BroadcastTimeout time.Duration RPCQueryTimeout time.Duration CachedResultMaxAge time.Duration } diff --git a/manifest/service.go b/manifest/service.go index 4021a252..59a1b56c 100644 --- a/manifest/service.go +++ b/manifest/service.go @@ -351,7 +351,7 @@ func (s *service) handleLease(ev event.LeaseWon, isNew bool) { if isNew && s.config.ManifestTimeout > time.Duration(0) { // Create watchdog if it does not exist AND a manifest has not been received yet if watchdog := s.watchdogs[ev.LeaseID.DeploymentID()]; watchdog == nil { - watchdog = newWatchdog(s.session, s.lc.ShuttingDown(), s.watchdogch, ev.LeaseID, s.config.ManifestTimeout) + watchdog = newWatchdog(s.session, s.lc.ShuttingDown(), s.watchdogch, ev.LeaseID, s.config.ManifestTimeout, s.config.BroadcastTimeout) s.watchdogs[ev.LeaseID.DeploymentID()] = watchdog } } diff --git a/manifest/watchdog.go b/manifest/watchdog.go index f961bab5..7ed1f681 100644 --- a/manifest/watchdog.go +++ b/manifest/watchdog.go @@ -20,29 +20,25 @@ import ( ) type watchdog struct { - leaseID mtypes.LeaseID - timeout time.Duration - lc lifecycle.Lifecycle - sess session.Session - ctx context.Context - log log.Logger + leaseID mtypes.LeaseID + timeout time.Duration + broadcastTimeout time.Duration + lc lifecycle.Lifecycle + sess session.Session + log log.Logger } -func newWatchdog(sess session.Session, parent <-chan struct{}, done chan<- dtypes.DeploymentID, leaseID mtypes.LeaseID, timeout time.Duration) *watchdog { - ctx, cancel := context.WithCancel(context.Background()) +func newWatchdog(sess session.Session, parent <-chan struct{}, done chan<- dtypes.DeploymentID, leaseID mtypes.LeaseID, timeout, broadcastTimeout time.Duration) *watchdog { result := &watchdog{ - leaseID: leaseID, - timeout: timeout, - lc: lifecycle.New(), - sess: sess, - ctx: ctx, - log: sess.Log().With("leaseID", leaseID), + leaseID: leaseID, + timeout: timeout, + broadcastTimeout: broadcastTimeout, + lc: lifecycle.New(), + sess: sess, + log: sess.Log().With("leaseID", leaseID), } - go func() { - result.lc.WatchChannel(parent) - cancel() - }() + go result.lc.WatchChannel(parent) go func() { <-result.lc.Done() @@ -54,10 +50,17 @@ func newWatchdog(sess session.Session, parent <-chan struct{}, done chan<- dtype return result } +// stop signals the watchdog to exit without closing the bid. +// Called when: (1) manifest received within the timeout, (2) manifest manager is stopped. func (wd *watchdog) stop() { wd.lc.ShutdownAsync(nil) } +// run waits for the manifest timeout, then broadcasts MsgCloseBid. +// +// Rule: once the broadcast is started, we commit to it - it must complete regardless of +// any concurrent stop() or parent shutdown. This prevents leaving an open bid on-chain. +// broadcastTimeout bounds how long we wait for the RPC response to prevent a permanent hang. func (wd *watchdog) run() { defer wd.lc.ShutdownCompleted() @@ -71,21 +74,32 @@ func (wd *watchdog) run() { wd.log.Info("watchdog closing bid") runch = runner.Do(func() runner.Result { + ctx, cancel := context.WithTimeout(context.Background(), wd.broadcastTimeout) + defer cancel() + msg := &mvbeta.MsgCloseBid{ ID: mtypes.MakeBidID(wd.leaseID.OrderID(), wd.sess.Provider().Address()), Reason: mtypes.LeaseClosedReasonManifestTimeout, } - - return runner.NewResult(wd.sess.Client().Tx().BroadcastMsgs(wd.ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError())) + return runner.NewResult(wd.sess.Client().Tx().BroadcastMsgs(ctx, []sdk.Msg{msg}, aclient.WithResultCodeAsError())) }) case err = <-wd.lc.ShutdownRequest(): + // Manifest received or parent shutdown before timeout - exit without closing the bid. } - wd.lc.ShutdownInitiated(err) - if runch != nil { - result := <-runch - if err := result.Error(); err != nil { - wd.log.Error("failed closing bid", "err", err) + // ShutdownRequest may arrive while we wait for the broadcast result. + // Consume it to unblock the sender, but keep looping until runch delivers. + // The broadcast context is independent and bounded by broadcastTimeout. + for runch != nil { + select { + case result := <-runch: + if err := result.Error(); err != nil { + wd.log.Error("failed closing bid", "err", err) + } + runch = nil + case err = <-wd.lc.ShutdownRequest(): + wd.log.Info("watchdog shutdown requested, waiting for bid close tx to complete") } } + wd.lc.ShutdownInitiated(err) } diff --git a/manifest/watchdog_test.go b/manifest/watchdog_test.go index 34191f18..7a10c807 100644 --- a/manifest/watchdog_test.go +++ b/manifest/watchdog_test.go @@ -1,6 +1,7 @@ package manifest import ( + "context" "testing" "time" @@ -10,6 +11,7 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" clientmocks "pkg.akt.dev/go/mocks/node/client" + aclient "pkg.akt.dev/go/node/client/v1beta3" dtypes "pkg.akt.dev/go/node/deployment/v1" mtypes "pkg.akt.dev/go/node/market/v1" mvbeta "pkg.akt.dev/go/node/market/v1beta5" @@ -29,6 +31,14 @@ type watchdogTestScaffold struct { } func makeWatchdogTestScaffold(t *testing.T, timeout time.Duration) (*watchdog, *watchdogTestScaffold) { + return makeWatchdogTestScaffoldFull(t, timeout, 30*time.Second, nil) +} + +func makeWatchdogTestScaffoldWithBlocking(t *testing.T, timeout time.Duration, blockUntilRelease <-chan struct{}) (*watchdog, *watchdogTestScaffold) { + return makeWatchdogTestScaffoldFull(t, timeout, 30*time.Second, blockUntilRelease) +} + +func makeWatchdogTestScaffoldFull(t *testing.T, timeout, broadcastTimeout time.Duration, blockUntilRelease <-chan struct{}) (*watchdog, *watchdogTestScaffold) { scaffold := &watchdogTestScaffold{} scaffold.parentCh = make(chan struct{}) scaffold.doneCh = make(chan dtypes.DeploymentID, 1) @@ -38,9 +48,19 @@ func makeWatchdogTestScaffold(t *testing.T, timeout time.Duration) (*watchdog, * scaffold.broadcasts = make(chan []sdk.Msg, 1) txClientMock := &clientmocks.TxClient{} - txClientMock.On("BroadcastMsgs", mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { - scaffold.broadcasts <- args.Get(1).([]sdk.Msg) - }).Return(&sdk.Result{}, nil) + txClientMock.EXPECT(). + BroadcastMsgs(mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(ctx context.Context, msgs []sdk.Msg, _ ...aclient.BroadcastOption) (any, error) { + if blockUntilRelease != nil { + select { + case <-blockUntilRelease: + case <-ctx.Done(): + return nil, ctx.Err() + } + } + scaffold.broadcasts <- msgs + return &sdk.Result{}, nil + }) scaffold.client = &clientmocks.Client{} scaffold.client.On("Tx").Return(txClientMock) @@ -48,7 +68,7 @@ func makeWatchdogTestScaffold(t *testing.T, timeout time.Duration) (*watchdog, * require.NotNil(t, sess.Client()) - wd := newWatchdog(sess, scaffold.parentCh, scaffold.doneCh, scaffold.leaseID, timeout) + wd := newWatchdog(sess, scaffold.parentCh, scaffold.doneCh, scaffold.leaseID, timeout, broadcastTimeout) return wd, scaffold } @@ -72,6 +92,7 @@ func TestWatchdogTimeout(t *testing.T) { msg := msgs[0].(*mvbeta.MsgCloseBid) require.Equal(t, scaffold.leaseID, msg.ID.LeaseID()) + require.Equal(t, mtypes.LeaseClosedReasonManifestTimeout, msg.Reason) deploymentID := testutil.ChannelWaitForValue(t, scaffold.doneCh) require.Equal(t, deploymentID, scaffold.leaseID.DeploymentID()) @@ -121,3 +142,48 @@ func TestWatchdogStopsOnParent(t *testing.T) { deploymentID := testutil.ChannelWaitForValue(t, scaffold.doneCh) require.Equal(t, deploymentID, scaffold.leaseID.DeploymentID()) } + +func TestWatchdogBroadcastTimeout(t *testing.T) { + // Mock blocks forever; broadcast context expires after 10ms → watchdog exits cleanly. + neverRelease := make(chan struct{}) + wd, scaffold := makeWatchdogTestScaffoldFull(t, 100*time.Millisecond, 10*time.Millisecond, neverRelease) + + select { + case <-wd.lc.Done(): + case <-time.After(5 * time.Second): + t.Fatal("watchdog hung after broadcast timeout") + } + + select { + case <-scaffold.broadcasts: + t.Fatal("broadcast should not have completed") + default: + } + + deploymentID := testutil.ChannelWaitForValue(t, scaffold.doneCh) + require.Equal(t, deploymentID, scaffold.leaseID.DeploymentID()) +} + +func TestWatchdogStopWhileWaitingForBroadcast(t *testing.T) { + releaseCh := make(chan struct{}) + wd, scaffold := makeWatchdogTestScaffoldWithBlocking(t, 100*time.Millisecond, releaseCh) + + <-time.After(200 * time.Millisecond) + wd.stop() + + close(releaseCh) + + select { + case <-wd.lc.Done(): + case <-time.After(5 * time.Second): + t.Fatal("deadlock: watchdog did not complete after broadcast") + } + + deploymentID := testutil.ChannelWaitForValue(t, scaffold.doneCh) + require.Equal(t, deploymentID, scaffold.leaseID.DeploymentID()) + + broadcasts := testutil.ChannelWaitForValue(t, scaffold.broadcasts) + msgs := broadcasts.([]sdk.Msg) + require.Len(t, msgs, 1) + require.Equal(t, mtypes.LeaseClosedReasonManifestTimeout, msgs[0].(*mvbeta.MsgCloseBid).Reason) +} diff --git a/service.go b/service.go index 175ed211..c0a3a1aa 100644 --- a/service.go +++ b/service.go @@ -105,6 +105,7 @@ func NewService(ctx context.Context, manifestConfig := manifest.ServiceConfig{ HTTPServicesRequireAtLeastOneHost: !cfg.DeploymentIngressStaticHosts, ManifestTimeout: cfg.ManifestTimeout, + BroadcastTimeout: cfg.BroadcastTimeout, RPCQueryTimeout: cfg.RPCQueryTimeout, CachedResultMaxAge: cfg.CachedResultMaxAge, }