Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 67 additions & 58 deletions verifier/pkg/token/cctp/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"time"

"golang.org/x/sync/errgroup"

"github.com/smartcontractkit/chainlink-ccv/verifier/pkg/commit"
verifier "github.com/smartcontractkit/chainlink-ccv/verifier/pkg/vtypes"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
Expand Down Expand Up @@ -54,71 +56,78 @@ func NewVerifierWithConfig(
}
}

const workerPoolSize = 10

func (v *Verifier) VerifyMessages(
ctx context.Context,
tasks []verifier.VerificationTask,
) []verifier.VerificationResult {
results := make([]verifier.VerificationResult, 0, len(tasks))

// TODO: `attestationService.Fetch` is an IO-bound operation and can be parallelized. Large number of tasks
// may lead to performance bottlenecks. Consider using a worker pool or goroutines with a semaphore to limit
// concurrency.
for _, task := range tasks {
lggr := logger.With(v.lggr, "messageID", task.MessageID, "txHash", task.TxHash)
lggr.Infow("Verifying CCTP task")

// 1. Fetch attestation
attestation, err := v.attestationService.Fetch(ctx, task.TxHash, task.Message)
if err != nil {
lggr.Warnw("Failed to fetch attestation", "err", err)
verificationError := v.errorRetry(err, task)
results = append(results, verifier.VerificationResult{Error: &verificationError})
continue
}

if !attestation.IsReady() {
lggr.Debugw("Attestation not ready for message")
verificationError := v.attestationErrorRetry(
fmt.Errorf("attestation not ready for message ID: %s", task.MessageID),
task,
results := make([]verifier.VerificationResult, len(tasks))

var g errgroup.Group
g.SetLimit(workerPoolSize)

for i, task := range tasks {
g.Go(func() error {
lggr := logger.With(v.lggr, "messageID", task.MessageID, "txHash", task.TxHash)
lggr.Infow("Verifying CCTP task")

// 1. Fetch attestation
attestation, err := v.attestationService.Fetch(ctx, task.TxHash, task.Message)
if err != nil {
lggr.Warnw("Failed to fetch attestation", "err", err)
verificationError := v.errorRetry(err, task)
results[i] = verifier.VerificationResult{Error: &verificationError}
return nil
}

if !attestation.IsReady() {
lggr.Debugw("Attestation not ready for message")
verificationError := v.attestationErrorRetry(
fmt.Errorf("attestation not ready for message ID: %s", task.MessageID),
task,
)
results[i] = verifier.VerificationResult{Error: &verificationError}
return nil
}

verifierFormat, err := attestation.ToVerifierFormat()
if err != nil {
lggr.Errorw("Failed to decode attestation data", "err", err)
verificationError := v.errorRetry(err, task)
results[i] = verifier.VerificationResult{Error: &verificationError}
return nil
}

lggr.Infow("Attestation fetched and decoded successfully",
"status", attestation.status,
"attestation", attestation.attestation,
"encodedCCTPMessage", attestation.encodedCCTPMessage,
"verifierFormat", verifierFormat,
)
results = append(results, verifier.VerificationResult{Error: &verificationError})
continue
}

verifierFormat, err := attestation.ToVerifierFormat()
if err != nil {
lggr.Errorw("Failed to decode attestation data", "err", err)
verificationError := v.errorRetry(err, task)
results = append(results, verifier.VerificationResult{Error: &verificationError})
continue
}

lggr.Infow("Attestation fetched and decoded successfully",
"status", attestation.status,
"attestation", attestation.attestation,
"encodedCCTPMessage", attestation.encodedCCTPMessage,
"verifierFormat", verifierFormat,
)

// 2. Create VerifierNodeResult
result, err := commit.CreateVerifierNodeResult(
&task,
verifierFormat,
attestation.verifierVersion,
)
if err != nil {
lggr.Errorw("CreateVerifierNodeResult: Failed to create VerifierNodeResult", "err", err)
verificationError := v.errorRetry(err, task)
results = append(results, verifier.VerificationResult{Error: &verificationError})
continue
}

// 3. Return successful result
lggr.Infow("VerifierResults: Successfully verified message", "signature", result.Signature)
results = append(results, verifier.VerificationResult{Result: result})

// 2. Create VerifierNodeResult
result, err := commit.CreateVerifierNodeResult(
&task,
verifierFormat,
attestation.verifierVersion,
)
if err != nil {
lggr.Errorw("CreateVerifierNodeResult: Failed to create VerifierNodeResult", "err", err)
verificationError := v.errorRetry(err, task)
results[i] = verifier.VerificationResult{Error: &verificationError}
return nil
}

// 3. Return successful result
lggr.Infow("VerifierResults: Successfully verified message", "signature", result.Signature)
results[i] = verifier.VerificationResult{Result: result}
return nil
})
}

_ = g.Wait()

return results
}

Expand Down
Loading