Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions node/rustchain_tx_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
-- Index for faster queries
CREATE INDEX IF NOT EXISTS idx_pending_from ON pending_transactions(from_addr);
CREATE INDEX IF NOT EXISTS idx_pending_nonce ON pending_transactions(from_addr, nonce);
CREATE INDEX IF NOT EXISTS idx_pending_admission ON pending_transactions(created_at, tx_hash);
CREATE INDEX IF NOT EXISTS idx_history_from ON transaction_history(from_addr);
CREATE INDEX IF NOT EXISTS idx_history_to ON transaction_history(to_addr);
CREATE INDEX IF NOT EXISTS idx_history_block ON transaction_history(block_height);
Expand Down Expand Up @@ -170,6 +171,7 @@ def _ensure_schema(self):
if "already exists" not in str(e):
logger.warning(f"Schema statement failed: {e}")

self._ensure_pending_created_at(cursor)
conn.commit()

def _recover_interrupted_balances_migration(self, cursor) -> None:
Expand All @@ -193,6 +195,29 @@ def _recover_interrupted_balances_migration(self, cursor) -> None:
cursor.execute("ALTER TABLE balances_old RENAME TO balances")
logger.warning("Recovered interrupted balances migration from balances_old")

def _ensure_pending_created_at(self, cursor) -> None:
"""Backfill admission-time ordering support for older pending tables."""
cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='pending_transactions'"
)
if not cursor.fetchone():
return
cursor.execute("PRAGMA table_info(pending_transactions)")
columns = [col[1] for col in cursor.fetchall()]
if "created_at" not in columns:
cursor.execute(
"ALTER TABLE pending_transactions ADD COLUMN created_at INTEGER NOT NULL DEFAULT 0"
)
if "timestamp" in columns:
cursor.execute(
"UPDATE pending_transactions SET created_at = timestamp WHERE created_at = 0"
)
logger.info("Added created_at column to pending_transactions table")
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_pending_admission "
"ON pending_transactions(created_at, tx_hash)"
)

@contextmanager
def _get_connection(self):
"""Get database connection with proper locking"""
Expand Down Expand Up @@ -490,13 +515,13 @@ def submit_transaction(self, tx: SignedTransaction) -> Tuple[bool, str]:
return False, f"Transaction already exists: {e}"

def get_pending_transactions(self, limit: int = 100) -> List[SignedTransaction]:
"""Get pending transactions ordered by nonce"""
"""Get pending transactions in deterministic admission order."""
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""SELECT * FROM pending_transactions
WHERE status = 'pending'
ORDER BY nonce ASC
ORDER BY created_at ASC, tx_hash ASC
LIMIT ?""",
(limit,)
)
Expand Down
149 changes: 149 additions & 0 deletions tests/test_tx_handler_pending_order.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
#!/usr/bin/env python3
# SPDX-License-Identifier: MIT
"""Regression tests for pending transaction ordering."""

import importlib
import os
import sqlite3
import sys
import types

NODE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "node"))
if NODE_DIR not in sys.path:
sys.path.insert(0, NODE_DIR)

mock = types.ModuleType("rustchain_crypto")


class FakeSignedTransaction:
def __init__(
self,
from_addr,
to_addr,
amount_urtc,
nonce,
timestamp,
memo,
signature,
public_key,
tx_hash,
):
self.from_addr = from_addr
self.to_addr = to_addr
self.amount_urtc = amount_urtc
self.nonce = nonce
self.timestamp = timestamp
self.memo = memo
self.signature = signature
self.public_key = public_key
self.tx_hash = tx_hash


mock.SignedTransaction = FakeSignedTransaction # type: ignore[attr-defined]
mock.Ed25519Signer = object # type: ignore[attr-defined]
mock.blake2b256_hex = lambda data: "00" * 32 # type: ignore[attr-defined]
mock.address_from_public_key = lambda data: "addr-from-pub" # type: ignore[attr-defined]
previous_crypto = sys.modules.get("rustchain_crypto")
sys.modules["rustchain_crypto"] = mock

TransactionPool = importlib.import_module("rustchain_tx_handler").TransactionPool

if previous_crypto is None:
sys.modules.pop("rustchain_crypto", None)
else:
sys.modules["rustchain_crypto"] = previous_crypto


def _insert_pending(conn, tx_hash, from_addr, nonce, created_at):
conn.execute(
"""INSERT INTO pending_transactions
(tx_hash, from_addr, to_addr, amount_urtc, nonce, timestamp,
memo, signature, public_key, created_at, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending')""",
(
tx_hash,
from_addr,
"receiver",
1,
nonce,
created_at,
"",
"sig",
"00",
created_at,
),
)


def test_pending_transactions_use_fifo_order_across_wallets(tmp_path):
pool = TransactionPool(str(tmp_path / "tx.db"))

with sqlite3.connect(pool.db_path) as conn:
_insert_pending(conn, "b" * 64, "wallet-a", 9, 100)
_insert_pending(conn, "a" * 64, "wallet-b", 1, 200)

pending = pool.get_pending_transactions()

assert [tx.tx_hash for tx in pending] == ["b" * 64, "a" * 64]


def test_pending_transactions_use_hash_tiebreaker_for_same_admission_time(tmp_path):
pool = TransactionPool(str(tmp_path / "tx.db"))

with sqlite3.connect(pool.db_path) as conn:
_insert_pending(conn, "d" * 64, "wallet-a", 1, 100)
_insert_pending(conn, "c" * 64, "wallet-b", 1, 100)

pending = pool.get_pending_transactions()

assert [tx.tx_hash for tx in pending] == ["c" * 64, "d" * 64]


def test_legacy_pending_table_gets_created_at_migration(tmp_path):
db_path = tmp_path / "legacy.db"
with sqlite3.connect(db_path) as conn:
conn.execute(
"""CREATE TABLE balances (
wallet TEXT PRIMARY KEY,
balance_urtc INTEGER NOT NULL DEFAULT 0,
wallet_nonce INTEGER DEFAULT 0
)"""
)
conn.execute(
"""CREATE TABLE pending_transactions (
tx_hash TEXT PRIMARY KEY,
from_addr TEXT NOT NULL,
to_addr TEXT NOT NULL,
amount_urtc INTEGER NOT NULL,
nonce INTEGER NOT NULL,
timestamp INTEGER NOT NULL,
memo TEXT DEFAULT '',
signature TEXT NOT NULL,
public_key TEXT NOT NULL,
status TEXT DEFAULT 'pending'
)"""
)
conn.execute(
"""INSERT INTO pending_transactions
(tx_hash, from_addr, to_addr, amount_urtc, nonce, timestamp,
memo, signature, public_key, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending')""",
("b" * 64, "wallet-a", "receiver", 1, 9, 100, "", "sig", "00"),
)
conn.execute(
"""INSERT INTO pending_transactions
(tx_hash, from_addr, to_addr, amount_urtc, nonce, timestamp,
memo, signature, public_key, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending')""",
("a" * 64, "wallet-b", "receiver", 1, 1, 200, "", "sig", "00"),
)

pool = TransactionPool(str(db_path))

with sqlite3.connect(db_path) as conn:
columns = [row[1] for row in conn.execute("PRAGMA table_info(pending_transactions)")]
assert "created_at" in columns

pending = pool.get_pending_transactions()

assert [tx.tx_hash for tx in pending] == ["b" * 64, "a" * 64]
Loading