Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions node/rustchain_tx_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
-- Index for faster queries
CREATE INDEX IF NOT EXISTS idx_pending_from ON pending_transactions(from_addr);
CREATE INDEX IF NOT EXISTS idx_pending_nonce ON pending_transactions(from_addr, nonce);
CREATE INDEX IF NOT EXISTS idx_pending_admission ON pending_transactions(created_at, tx_hash);
CREATE INDEX IF NOT EXISTS idx_history_from ON transaction_history(from_addr);
CREATE INDEX IF NOT EXISTS idx_history_to ON transaction_history(to_addr);
CREATE INDEX IF NOT EXISTS idx_history_block ON transaction_history(block_height);
Expand Down Expand Up @@ -170,6 +171,7 @@ def _ensure_schema(self):
if "already exists" not in str(e):
logger.warning(f"Schema statement failed: {e}")

self._ensure_pending_created_at(cursor)
conn.commit()

def _recover_interrupted_balances_migration(self, cursor) -> None:
Expand All @@ -193,6 +195,29 @@ def _recover_interrupted_balances_migration(self, cursor) -> None:
cursor.execute("ALTER TABLE balances_old RENAME TO balances")
logger.warning("Recovered interrupted balances migration from balances_old")

def _ensure_pending_created_at(self, cursor) -> None:
"""Backfill admission-time ordering support for older pending tables."""
cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='pending_transactions'"
)
if not cursor.fetchone():
return
cursor.execute("PRAGMA table_info(pending_transactions)")
columns = [col[1] for col in cursor.fetchall()]
if "created_at" not in columns:
cursor.execute(
"ALTER TABLE pending_transactions ADD COLUMN created_at INTEGER NOT NULL DEFAULT 0"
)
if "timestamp" in columns:
cursor.execute(
"UPDATE pending_transactions SET created_at = timestamp WHERE created_at = 0"
)
logger.info("Added created_at column to pending_transactions table")
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_pending_admission "
"ON pending_transactions(created_at, tx_hash)"
)

@contextmanager
def _get_connection(self):
"""Get database connection with proper locking"""
Expand Down Expand Up @@ -490,13 +515,13 @@ def submit_transaction(self, tx: SignedTransaction) -> Tuple[bool, str]:
return False, f"Transaction already exists: {e}"

def get_pending_transactions(self, limit: int = 100) -> List[SignedTransaction]:
"""Get pending transactions ordered by nonce"""
"""Get pending transactions in deterministic admission order."""
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""SELECT * FROM pending_transactions
WHERE status = 'pending'
ORDER BY nonce ASC
ORDER BY created_at ASC, rowid ASC
LIMIT ?""",
(limit,)
)
Expand Down
195 changes: 195 additions & 0 deletions tests/test_tx_handler_pending_order.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
#!/usr/bin/env python3
# SPDX-License-Identifier: MIT
"""Regression tests for pending transaction ordering."""

import importlib
import os
import sqlite3
import sys
import types

NODE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "node"))
if NODE_DIR not in sys.path:
sys.path.insert(0, NODE_DIR)

mock = types.ModuleType("rustchain_crypto")


class FakeSignedTransaction:
def __init__(
self,
from_addr,
to_addr,
amount_urtc,
nonce,
timestamp,
memo,
signature,
public_key,
tx_hash,
):
self.from_addr = from_addr
self.to_addr = to_addr
self.amount_urtc = amount_urtc
self.nonce = nonce
self.timestamp = timestamp
self.memo = memo
self.signature = signature
self.public_key = public_key
self.tx_hash = tx_hash

def verify(self):
return True


mock.SignedTransaction = FakeSignedTransaction # type: ignore[attr-defined]
mock.Ed25519Signer = object # type: ignore[attr-defined]
mock.blake2b256_hex = lambda data: "00" * 32 # type: ignore[attr-defined]
mock.address_from_public_key = lambda data: data.hex() # type: ignore[attr-defined]
previous_crypto = sys.modules.get("rustchain_crypto")
sys.modules["rustchain_crypto"] = mock

rustchain_tx_handler = importlib.import_module("rustchain_tx_handler")
TransactionPool = rustchain_tx_handler.TransactionPool

if previous_crypto is None:
sys.modules.pop("rustchain_crypto", None)
else:
sys.modules["rustchain_crypto"] = previous_crypto


def _insert_pending(conn, tx_hash, from_addr, nonce, created_at):
conn.execute(
"""INSERT INTO pending_transactions
(tx_hash, from_addr, to_addr, amount_urtc, nonce, timestamp,
memo, signature, public_key, created_at, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending')""",
(
tx_hash,
from_addr,
"receiver",
1,
nonce,
created_at,
"",
"sig",
"00",
created_at,
),
)


def test_pending_transactions_use_fifo_order_across_wallets(tmp_path):
pool = TransactionPool(str(tmp_path / "tx.db"))

with sqlite3.connect(pool.db_path) as conn:
_insert_pending(conn, "b" * 64, "wallet-a", 9, 100)
_insert_pending(conn, "a" * 64, "wallet-b", 1, 200)

pending = pool.get_pending_transactions()

assert [tx.tx_hash for tx in pending] == ["b" * 64, "a" * 64]


def test_pending_transactions_preserve_insert_order_for_same_admission_time(tmp_path):
pool = TransactionPool(str(tmp_path / "tx.db"))

with sqlite3.connect(pool.db_path) as conn:
_insert_pending(conn, "d" * 64, "wallet-a", 1, 100)
_insert_pending(conn, "c" * 64, "wallet-b", 1, 100)

pending = pool.get_pending_transactions()

assert [tx.tx_hash for tx in pending] == ["d" * 64, "c" * 64]


def test_submit_transaction_preserves_fifo_when_clock_collides(tmp_path, monkeypatch):
pool = TransactionPool(str(tmp_path / "tx.db"))
monkeypatch.setattr(rustchain_tx_handler, "address_from_public_key", lambda data: data.hex())
monkeypatch.setattr(rustchain_tx_handler.time, "time", lambda: 100)

with sqlite3.connect(pool.db_path) as conn:
conn.execute(
"INSERT INTO balances (wallet, balance_urtc, wallet_nonce) VALUES (?, ?, ?)",
("aa", 10, 0),
)

first = FakeSignedTransaction(
from_addr="aa",
to_addr="bb",
amount_urtc=1,
nonce=1,
timestamp=100,
memo="",
signature="sig",
public_key="aa",
tx_hash="z" * 64,
)
second = FakeSignedTransaction(
from_addr="aa",
to_addr="cc",
amount_urtc=1,
nonce=2,
timestamp=101,
memo="",
signature="sig",
public_key="aa",
tx_hash="a" * 64,
)

assert pool.submit_transaction(first) == (True, "z" * 64)
assert pool.submit_transaction(second) == (True, "a" * 64)

pending = pool.get_pending_transactions()

assert [tx.tx_hash for tx in pending] == ["z" * 64, "a" * 64]


def test_legacy_pending_table_gets_created_at_migration(tmp_path):
db_path = tmp_path / "legacy.db"
with sqlite3.connect(db_path) as conn:
conn.execute(
"""CREATE TABLE balances (
wallet TEXT PRIMARY KEY,
balance_urtc INTEGER NOT NULL DEFAULT 0,
wallet_nonce INTEGER DEFAULT 0
)"""
)
conn.execute(
"""CREATE TABLE pending_transactions (
tx_hash TEXT PRIMARY KEY,
from_addr TEXT NOT NULL,
to_addr TEXT NOT NULL,
amount_urtc INTEGER NOT NULL,
nonce INTEGER NOT NULL,
timestamp INTEGER NOT NULL,
memo TEXT DEFAULT '',
signature TEXT NOT NULL,
public_key TEXT NOT NULL,
status TEXT DEFAULT 'pending'
)"""
)
conn.execute(
"""INSERT INTO pending_transactions
(tx_hash, from_addr, to_addr, amount_urtc, nonce, timestamp,
memo, signature, public_key, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending')""",
("b" * 64, "wallet-a", "receiver", 1, 9, 100, "", "sig", "00"),
)
conn.execute(
"""INSERT INTO pending_transactions
(tx_hash, from_addr, to_addr, amount_urtc, nonce, timestamp,
memo, signature, public_key, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending')""",
("a" * 64, "wallet-b", "receiver", 1, 1, 200, "", "sig", "00"),
)

pool = TransactionPool(str(db_path))

with sqlite3.connect(db_path) as conn:
columns = [row[1] for row in conn.execute("PRAGMA table_info(pending_transactions)")]
assert "created_at" in columns

pending = pool.get_pending_transactions()

assert [tx.tx_hash for tx in pending] == ["b" * 64, "a" * 64]
Loading