Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions pkg/transmitter/dual_contract_transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
stderrors "errors"
"fmt"
"math/big"
"strings"
"sync"

Expand All @@ -19,6 +20,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-evm/pkg/keys"
"github.com/smartcontractkit/chainlink-evm/pkg/logpoller"
"github.com/smartcontractkit/chainlink-evm/pkg/txm"
"github.com/smartcontractkit/chainlink-evm/pkg/txmgr"
)

Expand All @@ -37,6 +39,7 @@ type dualContractTransmitter struct {
lp logpoller.LogPoller
lggr logger.Logger
ks keys.Locker
lifecycleMetrics txm.Metrics
// Options
transmitterOptions *transmitterOps
}
Expand All @@ -58,13 +61,16 @@ func NewOCRDualContractTransmitter(
lp logpoller.LogPoller,
lggr logger.Logger,
ethKeystore keys.Locker,
chainID *big.Int,
opts ...OCRTransmitterOption,
) (*dualContractTransmitter, error) {
transmitted, ok := contractABI.Events["Transmitted"]
if !ok {
return nil, errors.New("invalid ABI, missing transmitted")
}

lifecycleMetrics := txm.NewTxmMetrics(lggr, chainID)

newContractTransmitter := &dualContractTransmitter{
contractAddress: address,
contractABI: contractABI,
Expand All @@ -75,6 +81,7 @@ func NewOCRDualContractTransmitter(
contractReader: caller,
lggr: logger.Named(lggr, "OCR2DualContractTransmitter"),
ks: ethKeystore,
lifecycleMetrics: lifecycleMetrics,
transmitterOptions: &transmitterOps{
reportToEvmTxMeta: reportToEvmTxMetaNoop,
excludeSigs: false,
Expand Down Expand Up @@ -126,8 +133,7 @@ func (oc *dualContractTransmitter) Transmit(ctx context.Context, reportCtx ocrty
transactionLifecycleID := generateTransactionLifecycleIDForOCR2(reportCtx.ReportTimestamp)
txMeta.TransactionLifecycleID = &transactionLifecycleID
oc.lggr.Infow("Transmitting report", "configDigest", "0x"+reportCtx.ReportTimestamp.ConfigDigest.Hex(), "epoch", reportCtx.ReportTimestamp.Epoch, "round", reportCtx.ReportTimestamp.Round, "contractAddress",
oc.contractAddress, "txMeta", txMeta, "transactionLifecycleID", transactionLifecycleID)

oc.contractAddress, "txMeta", txMeta, "transactionLifecycleID", transactionLifecycleID)

// Primary transmission
payload, err := oc.contractABI.Pack("transmit", rawReportCtx, []byte(report), rs, ss, vs)
Expand All @@ -138,6 +144,7 @@ func (oc *dualContractTransmitter) Transmit(ctx context.Context, reportCtx ocrty
transactionErr := errors.Wrap(oc.transmitter.CreateEthTransaction(ctx, oc.contractAddress, payload, txMeta), "failed to send primary Eth transaction")
if transactionErr != nil {
oc.lggr.Errorw("Failed to create primary Eth transaction", "error", transactionErr, "transactionLifecycleID", transactionLifecycleID)
oc.lifecycleMetrics.IncrementLifecycleFailure(ctx, txm.StageCreatePrimary)
} else {
oc.lggr.Debugw("Created primary transaction", "transactionLifecycleID", transactionLifecycleID)
}
Expand All @@ -151,6 +158,7 @@ func (oc *dualContractTransmitter) Transmit(ctx context.Context, reportCtx ocrty
err = errors.Wrap(oc.transmitter.CreateSecondaryEthTransaction(ctx, secondaryPayload, txMeta), "failed to send secondary Eth transaction")
if err != nil {
oc.lggr.Errorw("Failed to create secondary Eth transaction", "error", err, "transactionLifecycleID", transactionLifecycleID)
oc.lifecycleMetrics.IncrementLifecycleFailure(ctx, txm.StageCreate)
} else {
oc.lggr.Debugw("Created secondary transaction", "transactionLifecycleID", transactionLifecycleID)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/transmitter/dual_contract_transmitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestDualContractTransmitter(t *testing.T) {
reportToEvmTxMeta := func(b []byte) (*txmgr.TxMeta, error) {
return &txmgr.TxMeta{}, nil
}
ot, err := NewOCRDualContractTransmitter(ctx, gethcommon.Address{}, c, contractABI, &mockDualTransmitter{}, lp, lggr, &keystest.FakeChainStore{}, WithReportToEthMetadata(reportToEvmTxMeta))
ot, err := NewOCRDualContractTransmitter(ctx, gethcommon.Address{}, c, contractABI, &mockDualTransmitter{}, lp, lggr, &keystest.FakeChainStore{}, testutils.FixtureChainID, WithReportToEthMetadata(reportToEvmTxMeta))
require.NoError(t, err)
digest, epoch, err := ot.LatestConfigDigestAndEpoch(testutils.Context(t))
require.NoError(t, err)
Expand Down Expand Up @@ -175,6 +175,7 @@ func createDualContractTransmitter(ctx context.Context, t *testing.T, transmitte
lp,
logger.Test(t),
&keystest.FakeChainStore{},
testutils.FixtureChainID,
ops...,
)
require.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions pkg/transmitter/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func newOnChainDualContractTransmitter(ctx context.Context, lggr logger.Logger,
chain.LogPoller(),
lggr,
ethKeystore,
chain.ID(),
ocrTransmitterOpts...,
)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/txm/clientwrappers/dualbroadcast/meta_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,12 @@ type MetaClient struct {
customURL *url.URL
chainID *big.Int
metrics *MetaMetrics
lifecycleMetrics txm.Metrics
txStore MetaClientTxStore
auctionRequestTimeout time.Duration
}

func NewMetaClient(lggr logger.Logger, c MetaClientRPC, ks MetaClientKeystore, customURL *url.URL, chainID *big.Int, txStore MetaClientTxStore, auctionRequestTimeout *time.Duration) (*MetaClient, error) {
func NewMetaClient(lggr logger.Logger, c MetaClientRPC, ks MetaClientKeystore, customURL *url.URL, chainID *big.Int, txStore MetaClientTxStore, auctionRequestTimeout *time.Duration, lifecycleMetrics txm.Metrics) (*MetaClient, error) {
metrics, err := NewMetaMetrics(chainID.String(), lggr)
if err != nil {
return nil, fmt.Errorf("failed to create Meta metrics: %w", err)
Expand All @@ -164,6 +165,7 @@ func NewMetaClient(lggr logger.Logger, c MetaClientRPC, ks MetaClientKeystore, c
customURL: customURL,
chainID: chainID,
metrics: metrics,
lifecycleMetrics: lifecycleMetrics,
txStore: txStore,
auctionRequestTimeout: t,
}, nil
Expand Down Expand Up @@ -196,6 +198,7 @@ func (a *MetaClient) SendTransaction(ctx context.Context, tx *types.Transaction,
if err != nil {
a.metrics.RecordSendRequestError(ctx)
a.metrics.emitAtlasError(ctx, "send_request", a.customURL, err, tx)
a.lifecycleMetrics.IncrementLifecycleFailure(ctx, txm.StageAuction)
return fmt.Errorf("error sending request for transactionID(%d): %w", tx.ID, errors.Join(err, ErrAuction))
Comment on lines 200 to 202
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new StageAuction lifecycle-failure increment isn’t covered by existing tests for MetaClient behavior. Consider adding a unit test for MetaClient.SendTransaction error paths that asserts the injected lifecycleMetrics is invoked (now that it’s a txm.Metrics interface).

Copilot uses AI. Check for mistakes.
}
// Send Metacall
Expand Down
15 changes: 13 additions & 2 deletions pkg/txm/clientwrappers/dualbroadcast/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,18 @@ import (
"github.com/smartcontractkit/chainlink-evm/pkg/txm/clientwrappers"
)

func SelectClient(lggr logger.Logger, client client.Client, keyStore keys.ChainStore, url *url.URL, chainID *big.Int, txStore txm.TxStore, readRequestsToMultipleNodes bool, bundles *bool, auctionRequestTimeout *time.Duration) (txm.Client, txm.ErrorHandler, error) {
func SelectClient(
lggr logger.Logger,
client client.Client,
keyStore keys.ChainStore,
url *url.URL,
chainID *big.Int,
txStore txm.TxStore,
readRequestsToMultipleNodes bool,
bundles *bool,
auctionRequestTimeout *time.Duration,
lifecycleMetrics txm.Metrics,
) (txm.Client, txm.ErrorHandler, error) {
chainClient, err := clientwrappers.NewChainClient(lggr, client, readRequestsToMultipleNodes)
if err != nil {
return nil, nil, err
Expand All @@ -25,7 +36,7 @@ func SelectClient(lggr logger.Logger, client client.Client, keyStore keys.ChainS
case strings.Contains(urlString, "flashbots"):
return NewFlashbotsClient(lggr, chainClient, keyStore, url, txStore, bundles), nil, nil
default:
mc, err := NewMetaClient(lggr, chainClient, keyStore, url, chainID, txStore, auctionRequestTimeout)
mc, err := NewMetaClient(lggr, chainClient, keyStore, url, chainID, txStore, auctionRequestTimeout, lifecycleMetrics)
if err != nil {
return nil, nil, err
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/txm/integration-tests/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,10 @@ func setupTestnetTXM(
errorHandler = dualbroadcast.NewErrorHandler()
}

metrics := txm.NewTxmMetrics(lggr, chainID)

// TXM
txm := txm.NewTxm(lggr, chainID, client, ab, store, stuckTxDetector, txmConfig, keystore, errorHandler)
txm := txm.NewTxm(lggr, chainID, client, ab, store, stuckTxDetector, txmConfig, keystore, errorHandler, metrics)
require.NotNil(t, txm)
servicetest.Run(t, txm)
return txm, store, client
Expand Down Expand Up @@ -182,8 +184,10 @@ func setupDevnetTXM(
errorHandler = dualbroadcast.NewErrorHandler()
}

metrics := txm.NewTxmMetrics(lggr, chainID)

// TXM
txm := txm.NewTxm(lggr, chainID, client, ab, store, stuckTxDetector, txmConfig, keystore, errorHandler)
txm := txm.NewTxm(lggr, chainID, client, ab, store, stuckTxDetector, txmConfig, keystore, errorHandler, metrics)
require.NotNil(t, txm)
servicetest.Run(t, txm)

Expand Down
98 changes: 90 additions & 8 deletions pkg/txm/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package txm

import (
"context"
"errors"
"fmt"
"math/big"
"strconv"
Expand All @@ -10,10 +11,12 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/metrics"
"github.com/smartcontractkit/chainlink-evm/pkg/txm/types"
svrv1 "github.com/smartcontractkit/chainlink-protos/svr/v1"
Expand Down Expand Up @@ -46,6 +49,19 @@ var (
}, []string{"chainID", "address"})
)

// Metrics is the metrics contract for the TXMv2 transaction lifecycle.
type Metrics interface {
IncrementLifecycleFailure(context.Context, LifecycleFailureStage)
IncrementNumBroadcastedTxs(context.Context)
IncrementNumConfirmedTxs(context.Context, int)
IncrementNumNonceGaps(context.Context)
ReachedMaxAttempts(context.Context, bool)
RecordTimeUntilTxConfirmed(context.Context, float64)
SetRPCNonce(context.Context, common.Address, uint64)
EmitTxMessage(context.Context, common.Hash, common.Address, *types.Transaction) error
}

// txmMetrics is the default metrics recorder for the TXMv2 transaction lifecycle.
type txmMetrics struct {
metrics.Labeler
chainID *big.Int
Expand All @@ -55,37 +71,51 @@ type txmMetrics struct {
reachedMaxAttempts metric.Int64Gauge
timeUntilTxConfirmed metric.Float64Histogram
rpcNonce metric.Int64Gauge
lifecycleFailure metric.Int64Counter
}

func NewTxmMetrics(chainID *big.Int) (*txmMetrics, error) {
func NewTxmMetrics(lggr logger.Logger, chainID *big.Int) Metrics {
var initErr error
numBroadcastedTxs, err := beholder.GetMeter().Int64Counter("txm_num_broadcasted_transactions")
if err != nil {
return nil, fmt.Errorf("failed to register broadcasted txs number: %w", err)
initErr = errors.Join(initErr, fmt.Errorf("txm_num_broadcasted_transactions: %w", err))
}

numConfirmedTxs, err := beholder.GetMeter().Int64Counter("txm_num_confirmed_transactions")
if err != nil {
return nil, fmt.Errorf("failed to register confirmed txs number: %w", err)
initErr = errors.Join(initErr, fmt.Errorf("txm_num_confirmed_transactions: %w", err))
}

numNonceGaps, err := beholder.GetMeter().Int64Counter("txm_num_nonce_gaps")
if err != nil {
return nil, fmt.Errorf("failed to register nonce gaps number: %w", err)
initErr = errors.Join(initErr, fmt.Errorf("txm_num_nonce_gaps: %w", err))
}

timeUntilTxConfirmed, err := beholder.GetMeter().Float64Histogram("txm_time_until_tx_confirmed")
if err != nil {
return nil, fmt.Errorf("failed to register time until tx confirmed: %w", err)
initErr = errors.Join(initErr, fmt.Errorf("txm_time_until_tx_confirmed: %w", err))
}

reachedMaxAttempts, err := beholder.GetMeter().Int64Gauge("txm_reached_max_attempts")
if err != nil {
return nil, fmt.Errorf("failed to register max attempts indicator: %w", err)
initErr = errors.Join(initErr, fmt.Errorf("txm_reached_max_attempts: %w", err))
}

rpcNonce, err := beholder.GetMeter().Int64Gauge("txm_rpc_nonce")
if err != nil {
return nil, fmt.Errorf("failed to register rpc nonce gauge: %w", err)
initErr = errors.Join(initErr, fmt.Errorf("txm_rpc_nonce: %w", err))
}

lifecycleFailure, err := beholder.GetMeter().Int64Counter("txm_transaction_lifecycle_failure_total")
if err != nil {
initErr = errors.Join(initErr, fmt.Errorf("txm_transaction_lifecycle_failure_total: %w", err))
}

// It is very unlikely that another metric will be initialized correctly if there is even a single failure,
// so it's safer if we use a NOOP metric struct for everything and log an error instead.
if initErr != nil {
lggr.Errorw("Failed to initialize TXM metrics; using noop metrics", "err", initErr)
return NewNoopTxmMetrics()
}

return &txmMetrics{
Expand All @@ -97,7 +127,37 @@ func NewTxmMetrics(chainID *big.Int) (*txmMetrics, error) {
reachedMaxAttempts: reachedMaxAttempts,
timeUntilTxConfirmed: timeUntilTxConfirmed,
rpcNonce: rpcNonce,
}, nil
lifecycleFailure: lifecycleFailure,
}
}

func NewNoopTxmMetrics() Metrics {
return noopTxmMetrics{}
}

// LifecycleFailureStage represents a stage in the transaction lifecycle where a failure can occur.
type LifecycleFailureStage string

const (
StageCreate LifecycleFailureStage = "create"
StageInFlightSubset LifecycleFailureStage = "in_flight_subset"
StageMaxInFlight LifecycleFailureStage = "max_in_flight"
StageBroadcast LifecycleFailureStage = "broadcast"
StageNonceAt LifecycleFailureStage = "nonce_at"

// SVR-specific stages.
StageCreatePrimary LifecycleFailureStage = "create_primary"
StageAuction LifecycleFailureStage = "auction"
)

// IncrementLifecycleFailure increments the lifecycle failure counter for the given stage.
func (m *txmMetrics) IncrementLifecycleFailure(ctx context.Context, stage LifecycleFailureStage) {
m.lifecycleFailure.Add(ctx, 1,
metric.WithAttributes(
attribute.String("chainID", m.chainID.String()),
attribute.String("stage", string(stage)),
),
)
}

func (m *txmMetrics) IncrementNumBroadcastedTxs(ctx context.Context) {
Expand Down Expand Up @@ -173,3 +233,25 @@ func (m *txmMetrics) EmitTxMessage(ctx context.Context, txHash common.Hash, from
"beholder_data_schema", "/beholder-tx-message/versions/2",
)
}

// noopTxmMetrics is a no-op implementation of the Metrics interface.
// It allows the app to run without being blocked in case of metrics initialization errors.
type noopTxmMetrics struct{}

func (noopTxmMetrics) IncrementLifecycleFailure(context.Context, LifecycleFailureStage) {}

func (noopTxmMetrics) IncrementNumBroadcastedTxs(context.Context) {}

func (noopTxmMetrics) IncrementNumConfirmedTxs(context.Context, int) {}

func (noopTxmMetrics) IncrementNumNonceGaps(context.Context) {}

func (noopTxmMetrics) ReachedMaxAttempts(context.Context, bool) {}

func (noopTxmMetrics) RecordTimeUntilTxConfirmed(context.Context, float64) {}

func (noopTxmMetrics) SetRPCNonce(context.Context, common.Address, uint64) {}

func (noopTxmMetrics) EmitTxMessage(context.Context, common.Hash, common.Address, *types.Transaction) error {
return nil
}
Loading
Loading