diff --git a/node/rustchain_tx_handler.py b/node/rustchain_tx_handler.py index f4da4b78f..77a026a95 100644 --- a/node/rustchain_tx_handler.py +++ b/node/rustchain_tx_handler.py @@ -84,6 +84,7 @@ -- Index for faster queries CREATE INDEX IF NOT EXISTS idx_pending_from ON pending_transactions(from_addr); CREATE INDEX IF NOT EXISTS idx_pending_nonce ON pending_transactions(from_addr, nonce); +CREATE INDEX IF NOT EXISTS idx_pending_admission ON pending_transactions(created_at, tx_hash); CREATE INDEX IF NOT EXISTS idx_history_from ON transaction_history(from_addr); CREATE INDEX IF NOT EXISTS idx_history_to ON transaction_history(to_addr); CREATE INDEX IF NOT EXISTS idx_history_block ON transaction_history(block_height); @@ -170,6 +171,7 @@ def _ensure_schema(self): if "already exists" not in str(e): logger.warning(f"Schema statement failed: {e}") + self._ensure_pending_created_at(cursor) conn.commit() def _recover_interrupted_balances_migration(self, cursor) -> None: @@ -193,6 +195,29 @@ def _recover_interrupted_balances_migration(self, cursor) -> None: cursor.execute("ALTER TABLE balances_old RENAME TO balances") logger.warning("Recovered interrupted balances migration from balances_old") + def _ensure_pending_created_at(self, cursor) -> None: + """Backfill admission-time ordering support for older pending tables.""" + cursor.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='pending_transactions'" + ) + if not cursor.fetchone(): + return + cursor.execute("PRAGMA table_info(pending_transactions)") + columns = [col[1] for col in cursor.fetchall()] + if "created_at" not in columns: + cursor.execute( + "ALTER TABLE pending_transactions ADD COLUMN created_at INTEGER NOT NULL DEFAULT 0" + ) + if "timestamp" in columns: + cursor.execute( + "UPDATE pending_transactions SET created_at = timestamp WHERE created_at = 0" + ) + logger.info("Added created_at column to pending_transactions table") + cursor.execute( + "CREATE INDEX IF NOT EXISTS idx_pending_admission " + "ON pending_transactions(created_at, tx_hash)" + ) + @contextmanager def _get_connection(self): """Get database connection with proper locking""" @@ -490,13 +515,13 @@ def submit_transaction(self, tx: SignedTransaction) -> Tuple[bool, str]: return False, f"Transaction already exists: {e}" def get_pending_transactions(self, limit: int = 100) -> List[SignedTransaction]: - """Get pending transactions ordered by nonce""" + """Get pending transactions in deterministic admission order.""" with self._get_connection() as conn: cursor = conn.cursor() cursor.execute( """SELECT * FROM pending_transactions WHERE status = 'pending' - ORDER BY nonce ASC + ORDER BY created_at ASC, rowid ASC LIMIT ?""", (limit,) ) diff --git a/tests/test_tx_handler_pending_order.py b/tests/test_tx_handler_pending_order.py new file mode 100644 index 000000000..fa085b98d --- /dev/null +++ b/tests/test_tx_handler_pending_order.py @@ -0,0 +1,195 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: MIT +"""Regression tests for pending transaction ordering.""" + +import importlib +import os +import sqlite3 +import sys +import types + +NODE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "node")) +if NODE_DIR not in sys.path: + sys.path.insert(0, NODE_DIR) + +mock = types.ModuleType("rustchain_crypto") + + +class FakeSignedTransaction: + def __init__( + self, + from_addr, + to_addr, + amount_urtc, + nonce, + timestamp, + memo, + signature, + public_key, + tx_hash, + ): + self.from_addr = from_addr + self.to_addr = to_addr + self.amount_urtc = amount_urtc + self.nonce = nonce + self.timestamp = timestamp + self.memo = memo + self.signature = signature + self.public_key = public_key + self.tx_hash = tx_hash + + def verify(self): + return True + + +mock.SignedTransaction = FakeSignedTransaction # type: ignore[attr-defined] +mock.Ed25519Signer = object # type: ignore[attr-defined] +mock.blake2b256_hex = lambda data: "00" * 32 # type: ignore[attr-defined] +mock.address_from_public_key = lambda data: data.hex() # type: ignore[attr-defined] +previous_crypto = sys.modules.get("rustchain_crypto") +sys.modules["rustchain_crypto"] = mock + +rustchain_tx_handler = importlib.import_module("rustchain_tx_handler") +TransactionPool = rustchain_tx_handler.TransactionPool + +if previous_crypto is None: + sys.modules.pop("rustchain_crypto", None) +else: + sys.modules["rustchain_crypto"] = previous_crypto + + +def _insert_pending(conn, tx_hash, from_addr, nonce, created_at): + conn.execute( + """INSERT INTO pending_transactions + (tx_hash, from_addr, to_addr, amount_urtc, nonce, timestamp, + memo, signature, public_key, created_at, status) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending')""", + ( + tx_hash, + from_addr, + "receiver", + 1, + nonce, + created_at, + "", + "sig", + "00", + created_at, + ), + ) + + +def test_pending_transactions_use_fifo_order_across_wallets(tmp_path): + pool = TransactionPool(str(tmp_path / "tx.db")) + + with sqlite3.connect(pool.db_path) as conn: + _insert_pending(conn, "b" * 64, "wallet-a", 9, 100) + _insert_pending(conn, "a" * 64, "wallet-b", 1, 200) + + pending = pool.get_pending_transactions() + + assert [tx.tx_hash for tx in pending] == ["b" * 64, "a" * 64] + + +def test_pending_transactions_preserve_insert_order_for_same_admission_time(tmp_path): + pool = TransactionPool(str(tmp_path / "tx.db")) + + with sqlite3.connect(pool.db_path) as conn: + _insert_pending(conn, "d" * 64, "wallet-a", 1, 100) + _insert_pending(conn, "c" * 64, "wallet-b", 1, 100) + + pending = pool.get_pending_transactions() + + assert [tx.tx_hash for tx in pending] == ["d" * 64, "c" * 64] + + +def test_submit_transaction_preserves_fifo_when_clock_collides(tmp_path, monkeypatch): + pool = TransactionPool(str(tmp_path / "tx.db")) + monkeypatch.setattr(rustchain_tx_handler, "address_from_public_key", lambda data: data.hex()) + monkeypatch.setattr(rustchain_tx_handler.time, "time", lambda: 100) + + with sqlite3.connect(pool.db_path) as conn: + conn.execute( + "INSERT INTO balances (wallet, balance_urtc, wallet_nonce) VALUES (?, ?, ?)", + ("aa", 10, 0), + ) + + first = FakeSignedTransaction( + from_addr="aa", + to_addr="bb", + amount_urtc=1, + nonce=1, + timestamp=100, + memo="", + signature="sig", + public_key="aa", + tx_hash="z" * 64, + ) + second = FakeSignedTransaction( + from_addr="aa", + to_addr="cc", + amount_urtc=1, + nonce=2, + timestamp=101, + memo="", + signature="sig", + public_key="aa", + tx_hash="a" * 64, + ) + + assert pool.submit_transaction(first) == (True, "z" * 64) + assert pool.submit_transaction(second) == (True, "a" * 64) + + pending = pool.get_pending_transactions() + + assert [tx.tx_hash for tx in pending] == ["z" * 64, "a" * 64] + + +def test_legacy_pending_table_gets_created_at_migration(tmp_path): + db_path = tmp_path / "legacy.db" + with sqlite3.connect(db_path) as conn: + conn.execute( + """CREATE TABLE balances ( + wallet TEXT PRIMARY KEY, + balance_urtc INTEGER NOT NULL DEFAULT 0, + wallet_nonce INTEGER DEFAULT 0 + )""" + ) + conn.execute( + """CREATE TABLE pending_transactions ( + tx_hash TEXT PRIMARY KEY, + from_addr TEXT NOT NULL, + to_addr TEXT NOT NULL, + amount_urtc INTEGER NOT NULL, + nonce INTEGER NOT NULL, + timestamp INTEGER NOT NULL, + memo TEXT DEFAULT '', + signature TEXT NOT NULL, + public_key TEXT NOT NULL, + status TEXT DEFAULT 'pending' + )""" + ) + conn.execute( + """INSERT INTO pending_transactions + (tx_hash, from_addr, to_addr, amount_urtc, nonce, timestamp, + memo, signature, public_key, status) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending')""", + ("b" * 64, "wallet-a", "receiver", 1, 9, 100, "", "sig", "00"), + ) + conn.execute( + """INSERT INTO pending_transactions + (tx_hash, from_addr, to_addr, amount_urtc, nonce, timestamp, + memo, signature, public_key, status) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending')""", + ("a" * 64, "wallet-b", "receiver", 1, 1, 200, "", "sig", "00"), + ) + + pool = TransactionPool(str(db_path)) + + with sqlite3.connect(db_path) as conn: + columns = [row[1] for row in conn.execute("PRAGMA table_info(pending_transactions)")] + assert "created_at" in columns + + pending = pool.get_pending_transactions() + + assert [tx.tx_hash for tx in pending] == ["b" * 64, "a" * 64]