Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions docs/release-notes/release-notes-0.8.2.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,14 @@

## Performance Improvements

- [PR#2183](https://github.com/lightninglabs/taproot-assets/pull/2183)
dramatically improves the performance of MS-SMT proof verification.

- [PR#2184](https://github.com/lightninglabs/taproot-assets/pull/2184)
dramatically improves the performance of universe federation proof push.

- [PR#2183](https://github.com/lightninglabs/taproot-assets/pull/2183)
dramatically improves the performance of MS-SMT proof verification.
- [PR#2194](https://github.com/lightninglabs/taproot-assets/pull/2194)
significantly improves concurrent universe proof ingest on Postgres.

## Deprecations

Expand Down
99 changes: 99 additions & 0 deletions mssmt/tree_prop_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package mssmt_test

import (
"context"
"testing"

"github.com/lightninglabs/taproot-assets/mssmt"
"github.com/stretchr/testify/require"
"pgregory.net/rapid"
)

// testInsertLastWriteWins asserts that insertion is last-write-wins per
// key: applying a sequence of inserts that reuses keys produces the
// same tree as inserting only the final leaf observed for each key.
// This is the invariant that allows coalescing consecutive updates to
// the same key into a single insert of the latest value.
func testInsertLastWriteWins(t *rapid.T) {
ctx := context.Background()

// Draw a small pool of keys so the insertion sequence is likely
// to hit the same key several times. Duplicate draws within the
// pool are harmless; they only shrink the effective pool.
numKeys := rapid.IntRange(1, 8).Draw(t, "num_keys")
keys := make([][hashSize]byte, numKeys)
for i := range keys {
keyBytes := rapid.SliceOfN(
rapid.Byte(), hashSize, hashSize,
).Draw(t, "key")
copy(keys[i][:], keyBytes)
}

// Draw the insertion sequence. Sums are bounded well below the
// point where the tree's uint64 sum overflow check could
// trigger.
numInserts := rapid.IntRange(1, 32).Draw(t, "num_inserts")
sequence := make([]treeLeaf, numInserts)
for i := range sequence {
keyIdx := rapid.IntRange(0, numKeys-1).Draw(t, "key_idx")
value := rapid.SliceOfN(rapid.Byte(), 1, 64).Draw(t, "value")
sum := rapid.Uint64Range(0, 1<<32).Draw(t, "sum")

sequence[i] = treeLeaf{
key: keys[keyIdx],
leaf: mssmt.NewLeafNode(value, sum),
}
}

// Apply the full sequence in order.
full := mssmt.NewCompactedTree(mssmt.NewDefaultStore())
for _, item := range sequence {
_, err := full.Insert(ctx, item.key, item.leaf)
require.NoError(t, err)
}

// Reduce the sequence to the final leaf per key, keeping the
// order of each key's first occurrence, and apply only those.
finalLeaves := make(map[[hashSize]byte]*mssmt.LeafNode)
var keyOrder [][hashSize]byte
for _, item := range sequence {
if _, ok := finalLeaves[item.key]; !ok {
keyOrder = append(keyOrder, item.key)
}
finalLeaves[item.key] = item.leaf
}

coalesced := mssmt.NewCompactedTree(mssmt.NewDefaultStore())
for _, key := range keyOrder {
_, err := coalesced.Insert(ctx, key, finalLeaves[key])
require.NoError(t, err)
}

fullRoot, err := full.Root(ctx)
require.NoError(t, err)
coalescedRoot, err := coalesced.Root(ctx)
require.NoError(t, err)

require.True(
t, mssmt.IsEqualNode(fullRoot, coalescedRoot),
"full root %v != coalesced root %v", fullRoot, coalescedRoot,
)

// Each key's final leaf must carry a valid inclusion proof in
// the fully-inserted tree.
for _, key := range keyOrder {
proof, err := full.MerkleProof(ctx, key)
require.NoError(t, err)
require.True(t, mssmt.VerifyMerkleProof(
key, finalLeaves[key], proof, fullRoot,
))
}
}

// TestInsertLastWriteWins runs the last-write-wins insertion property
// against the compacted tree.
func TestInsertLastWriteWins(t *testing.T) {
t.Parallel()

rapid.Check(t, testInsertLastWriteWins)
}
9 changes: 9 additions & 0 deletions tapcfg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,15 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
return nil, fmt.Errorf("create multiverse store: %w", err)
}

// The multiverse trees are derived from the universe roots; repair any
// entries that diverged, for example because the daemon stopped
// between a proof insert committing and its multiverse update being
// written.
err = multiverse.ReconcileMultiverse(context.Background())
if err != nil {
return nil, fmt.Errorf("reconcile multiverse: %w", err)
}

uniStatsDB := tapdb.NewTransactionExecutor(
db, func(tx *sql.Tx) tapdb.UniverseStatsStore {
return db.WithTx(tx)
Expand Down
19 changes: 18 additions & 1 deletion tapdb/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,13 +322,30 @@ type BaseDB struct {
*sqlc.Queries
}

// TxIsolationOverrider is an optional interface that TxOptions can implement
// to override the default serializable isolation level a transaction is
// started with. The override is only honored on the Postgres backend;
// SQLite's driver only supports its default isolation.
type TxIsolationOverrider interface {
// TxIsolation returns the isolation level to start the transaction
// with.
TxIsolation() sql.IsolationLevel
}

// BeginTx wraps the normal sql specific BeginTx method with the TxOptions
// interface. This interface is then mapped to the concrete sql tx options
// struct.
func (s *BaseDB) BeginTx(ctx context.Context, opts TxOptions) (*sql.Tx, error) {
isolation := sql.LevelSerializable
if o, ok := opts.(TxIsolationOverrider); ok &&
s.Backend() == sqlc.BackendTypePostgres {

isolation = o.TxIsolation()
}

sqlOptions := sql.TxOptions{
ReadOnly: opts.ReadOnly(),
Isolation: sql.LevelSerializable,
Isolation: isolation,
}
return s.DB.BeginTx(ctx, &sqlOptions)
}
Expand Down
122 changes: 86 additions & 36 deletions tapdb/multiverse.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"database/sql"
"errors"
"fmt"
"sync"
"time"

"github.com/btcsuite/btcd/btcec/v2"
Expand Down Expand Up @@ -132,6 +133,17 @@ type MultiverseStore struct {

leafKeysCache *universeLeafPageCache

// rootCoalescer batches all writes to the shared multiverse
// trees, so proof insert transactions never touch rows that are
// contended across universes.
rootCoalescer *multiverseRootCoalescer

// multiverseWriteMu serializes all multiverse writes in this
// process: the coalescer's flushes and the deletion paths. This
// mutual exclusion is what makes it safe for flushes to run
// below serializable isolation.
multiverseWriteMu sync.Mutex

// transferProofDistributor is an event distributor that will be used to
// notify subscribers about new proof leaves that are added to the
// multiverse. This is used to notify the custodian about new incoming
Expand All @@ -149,7 +161,7 @@ func NewMultiverseStore(db BatchedMultiverse,
return nil, fmt.Errorf("parse max proof cache size: %w", err)
}

return &MultiverseStore{
store := &MultiverseStore{
db: db,
cfg: cfg,
syncerCache: newSyncerRootNodeCache(
Expand All @@ -165,7 +177,12 @@ func NewMultiverseStore(db BatchedMultiverse,
cfg.Caches.LeavesPerUniverse,
),
transferProofDistributor: fn.NewEventDistributor[proof.Blob](),
}, nil
}
store.rootCoalescer = newMultiverseRootCoalescer(
db, &store.multiverseWriteMu,
)

return store, nil
}

// namespaceForProof returns the multiverse namespace used for the given proof
Expand Down Expand Up @@ -790,10 +807,8 @@ func (b *MultiverseStore) UpsertProofLeaf(ctx context.Context,
metaReveal *proof.MetaReveal) (*universe.Proof, error) {

var (
writeTx BaseMultiverseOptions
uniProof *universe.Proof
multiverseRoot mssmt.Node
multiverseProof *mssmt.Proof
writeTx BaseMultiverseOptions
uniProof *universe.Proof
)

execTxFunc := func(dbTx BaseMultiverseStore) error {
Expand All @@ -813,26 +828,29 @@ func (b *MultiverseStore) UpsertProofLeaf(ctx context.Context,
return fmt.Errorf("failed universe upsert: %w", err)
}

// Now, attempt to insert the universe root into the main
// multiverse tree.
//
// nolint:lll
multiverseRoot, multiverseProof, err = upsertMultiverseLeafEntry(
ctx, dbTx, id, uniProof.UniverseRoot,
)
if err != nil {
return fmt.Errorf("failed multiverse upsert: %w", err)
}

return nil
}
dbErr := b.db.ExecTx(ctx, &writeTx, execTxFunc)
if dbErr != nil {
return nil, dbErr
}

// Now reflect the universe's new root in the shared multiverse tree,
// through the root coalescer rather than the transaction above: the
// multiverse rows are contended by every insert into every universe,
// so writing them under the insert's own transaction would serialize
// ingest across universes. If this fails, the universe leaf above
// remains committed, and the universe's multiverse entry is healed by
// its next successful update.
multiverseRoot, multiverseProof, err := b.rootCoalescer.updateRoot(
ctx, id, uniProof.UniverseRoot,
)
if err != nil {
return nil, fmt.Errorf("failed multiverse upsert: %w", err)
}

// Populate the multiverse fields in the proof object now that the
// transaction is complete.
// update is complete.
uniProof.MultiverseRoot = multiverseRoot
uniProof.MultiverseInclusionProof = multiverseProof

Expand Down Expand Up @@ -872,9 +890,24 @@ func (b *MultiverseStore) UpsertProofLeafBatch(ctx context.Context,
writeTx BaseMultiverseOptions
uniProofs []*universe.Proof
)
// Track the final universe root per universe, so the shared
// multiverse tree is updated once per universe with its latest root,
// rather than once per item.
var (
finalRoots map[universeIDKey]universeRootUpdate
updateOrder []universeIDKey
)

dbErr := b.db.ExecTx(
ctx, &writeTx, func(store BaseMultiverseStore) error {
uniProofs = make([]*universe.Proof, len(items))

finalRoots = make(
map[universeIDKey]universeRootUpdate,
len(items),
)
updateOrder = nil

for idx := range items {
item := items[idx]

Expand Down Expand Up @@ -902,24 +935,14 @@ func (b *MultiverseStore) UpsertProofLeafBatch(ctx context.Context,
}
uniProofs[idx] = uniProof

// Next we'll, attempt to insert the universe
// root into the main multiverse tree.
//
//nolint:lll
multiRoot, multiProof, err := upsertMultiverseLeafEntry(
ctx, store, item.ID,
uniProof.UniverseRoot,
)
if err != nil {
return fmt.Errorf("failed multiverse "+
"upsert for item %d: %w",
idx, err)
key := item.ID.String()
if _, ok := finalRoots[key]; !ok {
updateOrder = append(updateOrder, key)
}
finalRoots[key] = universeRootUpdate{
id: item.ID,
root: uniProof.UniverseRoot,
}

// Update the proof object with multiverse
// details.
uniProofs[idx].MultiverseRoot = multiRoot
uniProofs[idx].MultiverseInclusionProof = multiProof //nolint:lll
}

return nil
Expand All @@ -929,6 +952,22 @@ func (b *MultiverseStore) UpsertProofLeafBatch(ctx context.Context,
return dbErr
}

// Now reflect each universe's final root in the shared multiverse
// tree, through the root coalescer rather than the transaction above:
// the multiverse rows are contended by every insert into every
// universe, so writing them under the batch's own transaction would
// collide with concurrent inserts. If this fails, the universe leaves
// above remain committed, and each universe's multiverse entry is
// healed by its next successful update.
updates := make([]universeRootUpdate, 0, len(updateOrder))
for _, key := range updateOrder {
updates = append(updates, finalRoots[key])
}
err := b.rootCoalescer.updateRoots(ctx, updates)
if err != nil {
return fmt.Errorf("failed multiverse upsert: %w", err)
}

// TODO(roasbeef): want to write thru but then need db query again?

b.rootNodeCache.wipeCache()
Expand Down Expand Up @@ -978,6 +1017,10 @@ func (b *MultiverseStore) DeleteUniverse(ctx context.Context,

var writeTx BaseUniverseStoreOptions

// Deleting touches the shared multiverse tree, so take the
// multiverse write lock to stay mutually exclusive with the root
// coalescer's flushes.
b.multiverseWriteMu.Lock()
dbErr := b.db.ExecTx(ctx, &writeTx, func(tx BaseMultiverseStore) error {
multiverseNS, err := namespaceForProof(id.ProofType)
if err != nil {
Expand All @@ -996,6 +1039,7 @@ func (b *MultiverseStore) DeleteUniverse(ctx context.Context,

return deleteUniverseTree(ctx, tx, id)
})
b.multiverseWriteMu.Unlock()
if dbErr != nil {
return "", dbErr
}
Expand All @@ -1019,6 +1063,12 @@ func (b *MultiverseStore) DeleteProofLeaf(ctx context.Context,

var writeTx BaseMultiverseOptions

// Deleting touches the shared multiverse tree, so take the
// multiverse write lock to stay mutually exclusive with the root
// coalescer's flushes.
b.multiverseWriteMu.Lock()
defer b.multiverseWriteMu.Unlock()

dbErr := b.db.ExecTx(
ctx, &writeTx, func(tx BaseMultiverseStore) error {
namespace := id.String()
Expand Down Expand Up @@ -1060,7 +1110,7 @@ func (b *MultiverseStore) DeleteProofLeaf(ctx context.Context,

// Otherwise, update the multiverse entry with the
// new universe root.
_, _, err = upsertMultiverseLeafEntry(
err = upsertMultiverseLeafEntry(
ctx, tx, id, newRoot,
)
if err != nil {
Expand Down
Loading
Loading