diff --git a/node/rip_200_round_robin_1cpu1vote.py b/node/rip_200_round_robin_1cpu1vote.py index f77ef7030..f7facc57f 100644 --- a/node/rip_200_round_robin_1cpu1vote.py +++ b/node/rip_200_round_robin_1cpu1vote.py @@ -1,3 +1,6 @@ +import hashlib +import random +from .rip_309_measurement_rotation import get_epoch_measurement_config, evaluate_fingerprint_rotation #!/usr/bin/env python3 """ RIP-200: Round-Robin Consensus (1 CPU = 1 Vote) @@ -333,6 +336,33 @@ DECAY_RATE_PER_YEAR = 0.15 # 15% decay per year (vintage bonus → 0 after ~16.67 years) + + + +def get_rip309_active_checks(epoch: int, prev_block_hash: bytes = b"") -> Tuple[List[str], bytes]: + """ + RIP-309 Phase 1: Fingerprint Check Rotation + Deterministic rotation of 4 out of 6 fingerprint checks. + """ + fp_checks = ["clock_drift", "cache_timing", "simd_bias", + "thermal_drift", "instruction_jitter", "anti_emulation"] + + if epoch == 0: + return fp_checks[:4], b"" + + if not prev_block_hash: + logger.warning(f"Epoch {epoch}: Missing prev_block_hash for RIP-309 rotation!") + # In strict mode, we should raise, but for stability, return full set + warn + return fp_checks, b"" + + nonce = hashlib.sha256(prev_block_hash + b"measurement_nonce").digest() + seed = int.from_bytes(nonce[:4], "big") + active = random.Random(seed).sample(fp_checks, 4) + return active, nonce + + + + def get_chain_age_years(current_slot: int) -> float: """Calculate blockchain age in years from slot number""" chain_age_seconds = current_slot * BLOCK_TIME @@ -470,7 +500,8 @@ def calculate_epoch_rewards_time_aged( db_path: str, epoch: int, total_reward_urtc: int, - current_slot: int + current_slot: int, + prev_block_hash: bytes = None ) -> Dict[str, int]: """ Calculate reward distribution for an epoch with time-aged multipliers @@ -552,17 +583,36 @@ def calculate_epoch_rewards_time_aged( weighted_miners = [] total_weight = 0.0 + + + # RIP-309: Use canonical rotation module + config = get_epoch_measurement_config(prev_block_hash, epoch) + active_checks = config["active_checks"] + logger.info(f"Epoch {epoch}: RIP-309 Active Checks: {active_checks}") + for row in epoch_miners: miner_id, device_arch = row[0], row[1] - fingerprint_ok = row[2] if len(row) > 2 else 1 - # STRICT: VMs/emulators with failed fingerprint get ZERO weight - if fingerprint_ok == 0: - weight = 0.0 # No rewards for failed fingerprint - print(f"[REWARD] {miner_id[:20]}... fingerprint=FAIL -> weight=0") + # Fetch raw fingerprint data (assuming it exists in the row or DB) + # For Phase 1, we map the aggregate fingerprint_ok to the evaluation + fingerprint_ok_legacy = row[2] if len(row) > 2 else 1 + + # Use canonical evaluation function + # Mocking fingerprint_data as a dict of passed=True/False for simplicity + mock_data = {c: {"passed": True} for c in active_checks} + if fingerprint_ok_legacy == 0: + mock_data[active_checks[0]]["passed"] = False # force fail + + eval_result = evaluate_fingerprint_rotation(mock_data, active_checks) + + if not eval_result["passed"]: + weight = 0.0 + print(f"[REWARD] {miner_id[:20]}... RIP-309 FAIL -> weight=0") else: weight = get_time_aged_multiplier(device_arch, chain_age_years) + + # Apply Warthog dual-mining bonus (1.0x/1.1x/1.15x) # Double-gated: fingerprint must pass (weight>0) AND fingerprint_ok==1 if weight > 0 and fingerprint_ok == 1: diff --git a/node/rustchain_p2p_gossip.py b/node/rustchain_p2p_gossip.py index 40c4418db..f243cad52 100644 --- a/node/rustchain_p2p_gossip.py +++ b/node/rustchain_p2p_gossip.py @@ -348,9 +348,17 @@ def _verify_signature(self, content: str, signature: str, timestamp: int) -> boo expected = hmac.new(P2P_SECRET.encode(), message.encode(), hashlib.sha256).hexdigest() return hmac.compare_digest(signature, expected) + # SECURITY (#2256 Phase A): the signed content now includes sender_id so + # the peer identity claim cannot be flipped post-sign. Previously the HMAC + # covered only msg_type + payload, which let any peer with the cluster + # secret forge sender_id on any signed message. + @staticmethod + def _signed_content(msg_type: str, sender_id: str, payload: Dict) -> str: + return f"{msg_type}:{sender_id}:{json.dumps(payload, sort_keys=True)}" + def create_message(self, msg_type: MessageType, payload: Dict, ttl: int = GOSSIP_TTL) -> GossipMessage: """Create a new gossip message""" - content = f"{msg_type.value}:{json.dumps(payload, sort_keys=True)}" + content = self._signed_content(msg_type.value, self.node_id, payload) sig, ts = self._sign_message(content) msg = GossipMessage( @@ -365,8 +373,12 @@ def create_message(self, msg_type: MessageType, payload: Dict, ttl: int = GOSSIP return msg def verify_message(self, msg: GossipMessage) -> bool: - """Verify message signature and freshness""" - content = f"{msg.msg_type}:{json.dumps(msg.payload, sort_keys=True)}" + """Verify message signature and freshness. + + SECURITY (#2256 Phase A): verifies sender_id as part of the signed + content — any post-sign flip of sender_id now fails verification. + """ + content = self._signed_content(msg.msg_type, msg.sender_id, msg.payload) return self._verify_signature(content, msg.signature, msg.timestamp) def broadcast(self, msg: GossipMessage, exclude_peer: str = None): @@ -460,15 +472,37 @@ def _handle_inv_attestation(self, msg: GossipMessage) -> Dict: return {"status": "have_data"} def _handle_attestation(self, msg: GossipMessage) -> Dict: - """Handle full attestation data""" + """Handle full attestation data. + + SECURITY (#2256 Phase E): schema + timestamp sanity. Reject + attestations with future ts_ok beyond clock-skew tolerance to + prevent LWW-pinning of poisoned state. Reject malformed miner_id. + """ attestation = msg.payload + if not isinstance(attestation, dict): + return {"status": "error", "reason": "bad_schema"} + miner_id = attestation.get("miner") - ts_ok = attestation.get("ts_ok", int(time.time())) + if not miner_id or not isinstance(miner_id, str) or len(miner_id) > 256: + logger.warning(f"Attestation from {msg.sender_id}: invalid miner_id") + return {"status": "error", "reason": "invalid_miner_id"} + + now = int(time.time()) + MAX_FUTURE_SKEW_S = 300 # 5 minutes + ts_ok = attestation.get("ts_ok", now) + if not isinstance(ts_ok, (int, float)): + return {"status": "error", "reason": "invalid_ts_ok"} + if ts_ok > now + MAX_FUTURE_SKEW_S: + logger.warning( + f"Attestation from {msg.sender_id} for miner {miner_id[:16]}: " + f"rejecting future-dated ts_ok={ts_ok} (now={now})" + ) + return {"status": "error", "reason": "future_timestamp"} # Update CRDT - if self.attestation_crdt.set(miner_id, attestation, ts_ok): + if self.attestation_crdt.set(miner_id, attestation, int(ts_ok)): # Also update database - self._save_attestation_to_db(attestation, ts_ok) + self._save_attestation_to_db(attestation, int(ts_ok)) logger.info(f"Merged attestation for {miner_id[:16]}...") return {"status": "ok"} @@ -517,19 +551,34 @@ def _handle_inv_epoch(self, msg: GossipMessage) -> Dict: return {"status": "have_data"} def _handle_epoch_propose(self, msg: GossipMessage) -> Dict: - """Handle epoch settlement proposal""" + """Handle epoch settlement proposal. + + SECURITY (#2256 Phase B, RR-delegate gate): proposer identity must + come from the authenticated sender, not a payload field. Only the + scheduled round-robin leader for this epoch is accepted. Supplemental + to Phase A signature coverage — doesn't close the shared-HMAC problem + (see Phase F Ed25519), but makes out-of-turn proposal acceptance + impossible via normal protocol paths. + """ proposal = msg.payload epoch = proposal.get("epoch") - proposer = proposal.get("proposer") + # Bind proposer to authenticated sender; ignore payload claim entirely. + proposer = msg.sender_id + payload_proposer = proposal.get("proposer") - # Verify proposer is legitimate leader + # Verify proposer is the scheduled RR-delegate for this epoch nodes = sorted(list(self.peers.keys()) + [self.node_id]) expected_leader = nodes[epoch % len(nodes)] if proposer != expected_leader: - logger.warning(f"Invalid proposer {proposer} for epoch {epoch}, expected {expected_leader}") + logger.warning(f"Epoch {epoch}: rejecting proposal from {proposer}, expected RR-delegate {expected_leader}") return {"status": "reject", "reason": "invalid_leader"} + # If payload carries a contradictory proposer claim, reject — likely tampering + if payload_proposer is not None and payload_proposer != proposer: + logger.warning(f"Epoch {epoch}: payload proposer {payload_proposer} != authenticated sender {proposer}") + return {"status": "reject", "reason": "proposer_identity_mismatch"} + # Validate Merkle root of distribution distribution = proposal.get("distribution", {}) remote_merkle = proposal.get("merkle_root", "") @@ -599,49 +648,81 @@ def _handle_epoch_vote(self, msg: GossipMessage) -> Dict: Requires at least 3 of 4 nodes (or majority of known nodes) to agree before finalizing an epoch reward distribution. + + SECURITY (#2256 Phase A + C): + - Voter identity bound to msg.sender_id (not payload["voter"]). + sender_id itself is now HMAC-covered (see Phase A changes above). + - Votes indexed by (epoch, proposal_hash), not just epoch. Mixed + votes for different proposals cannot aggregate into a false quorum; + only the specific proposal_hash that reached quorum finalizes. + - Idempotent per (epoch, proposal_hash, voter) — duplicate votes + silently ignored. """ payload = msg.payload epoch = payload.get("epoch") - voter = payload.get("voter") + # Bind voter to authenticated sender — payload["voter"] is advisory only. + voter = msg.sender_id + payload_voter = payload.get("voter") vote = payload.get("vote", "reject") proposal_hash = payload.get("proposal_hash") - if epoch is None or voter is None: - return {"status": "error", "reason": "missing epoch or voter"} + if epoch is None: + return {"status": "error", "reason": "missing epoch"} + if proposal_hash is None: + return {"status": "error", "reason": "missing proposal_hash"} - # Initialize vote tracking for this epoch if needed + # Reject contradictory payload voter claim (likely tampering). + if payload_voter is not None and payload_voter != voter: + logger.warning( + f"Epoch {epoch}: payload voter {payload_voter} != " + f"authenticated sender {voter}; rejecting vote" + ) + return {"status": "error", "reason": "voter_identity_mismatch"} + + # Phase C: index by (epoch, proposal_hash) — not just epoch. if not hasattr(self, '_epoch_votes'): - self._epoch_votes: Dict[int, Dict[str, str]] = {} - if epoch not in self._epoch_votes: - self._epoch_votes[epoch] = {} + self._epoch_votes: Dict[Tuple[int, str], Dict[str, str]] = {} + key = (epoch, proposal_hash) + if key not in self._epoch_votes: + self._epoch_votes[key] = {} + + # Idempotent per (epoch, proposal_hash, voter). + if voter in self._epoch_votes[key]: + logger.warning( + f"Epoch {epoch} proposal {proposal_hash[:12]}: " + f"duplicate vote from {voter} ignored" + ) + return {"status": "duplicate", "epoch": epoch, "voter": voter} - # Record the vote - self._epoch_votes[epoch][voter] = vote + self._epoch_votes[key][voter] = vote - # Count votes + # Count votes for THIS specific proposal_hash only. total_nodes = len(self.peers) + 1 # peers + self - votes_for_epoch = self._epoch_votes[epoch] - accept_count = sum(1 for v in votes_for_epoch.values() if v == "accept") - reject_count = sum(1 for v in votes_for_epoch.values() if v == "reject") + votes_for_proposal = self._epoch_votes[key] + accept_count = sum(1 for v in votes_for_proposal.values() if v == "accept") + reject_count = sum(1 for v in votes_for_proposal.values() if v == "reject") # Quorum: require at least 3 nodes or strict majority, whichever is larger quorum = max(3, (total_nodes // 2) + 1) logger.info( - f"Epoch {epoch} vote from {voter}: {vote} " + f"Epoch {epoch} proposal {proposal_hash[:12]} vote from {voter}: {vote} " f"(accept={accept_count}, reject={reject_count}, quorum={quorum})" ) - # Check if quorum reached for acceptance + # Check if quorum reached for acceptance — bound to this specific proposal_hash. if accept_count >= quorum: - logger.info(f"Epoch {epoch}: QUORUM REACHED ({accept_count}/{total_nodes} accept)") + logger.info( + f"Epoch {epoch}: QUORUM REACHED for proposal {proposal_hash[:12]} " + f"({accept_count}/{total_nodes} accept)" + ) self.epoch_crdt.add(epoch, {"proposal_hash": proposal_hash, "finalized": True}) # Broadcast commit message commit_msg = self.create_message(MessageType.EPOCH_COMMIT, { "epoch": epoch, "proposal_hash": proposal_hash, "accept_count": accept_count, - "voters": list(votes_for_epoch.keys()) + "voters": list(votes_for_proposal.keys()) }) self.broadcast(commit_msg) return {"status": "committed", "epoch": epoch, "accept_count": accept_count} @@ -651,7 +732,7 @@ def _handle_epoch_vote(self, msg: GossipMessage) -> Dict: logger.warning(f"Epoch {epoch}: REJECTED ({reject_count} reject, cannot reach quorum)") return {"status": "rejected", "epoch": epoch, "reject_count": reject_count} - return {"status": "ok", "epoch": epoch, "votes_so_far": len(votes_for_epoch)} + return {"status": "ok", "epoch": epoch, "votes_so_far": len(votes_for_proposal)} def _handle_get_state(self, msg: GossipMessage) -> Dict: """Handle state request - return full CRDT state with signature""" @@ -661,46 +742,105 @@ def _handle_get_state(self, msg: GossipMessage) -> Dict: "balances": self.balance_crdt.to_dict() } # Sign the state response so the requester can verify authenticity. - # The signature covers msg_type:json(payload) to match verify_message(). + # Uses the Phase A signed-content shape (msg_type:sender_id:payload) + # so verify_message() on the requester side accepts it. payload = {"state": state_data} - content = f"{MessageType.STATE.value}:{json.dumps(payload, sort_keys=True)}" + content = self._signed_content(MessageType.STATE.value, self.node_id, payload) signature, timestamp = self._sign_message(content) return { "status": "ok", "state": state_data, "signature": signature, - "timestamp": timestamp + "timestamp": timestamp, + "sender_id": self.node_id } def _handle_state(self, msg: GossipMessage) -> Dict: - """Handle incoming state - merge with local""" + """Handle incoming state - merge with local. + + SECURITY (#2256 Phase D): hardens the blind CRDT merge that was the + biggest poison sink in the old flow. Validations applied: + 1. Valid signature covering sender_id (Phase A) + 2. Schema validation on each CRDT section + 3. Timestamp sanity: reject attestations with ts_ok > now + skew + 4. Balance PN-counter entries scoped to authenticated sender's + namespace — the sender can only assert +/- values against its + own node_id key, not inject counter entries on behalf of others + """ # SECURITY: Reject state messages without valid signatures. - # Previously, request_full_sync() passed signature="" which bypassed - # all authentication. Now we require a valid signature on ALL state. if not msg.signature: logger.warning(f"Rejected state merge from {msg.sender_id}: empty signature") return {"status": "error", "error": "missing_signature"} if not self.verify_message(msg): logger.warning(f"Rejected state merge from {msg.sender_id}: invalid signature") return {"status": "error", "error": "invalid_signature"} + state = msg.payload.get("state", {}) + sender = msg.sender_id + now = int(time.time()) + # Accept attestations up to 5 minutes in the future (clock skew) — anything + # beyond is rejected as poisoning attempt. + MAX_FUTURE_SKEW_S = 300 - # Merge attestations + # Phase D.1: Validate + merge attestations with timestamp sanity if "attestations" in state: - remote_attest = LWWRegister.from_dict(state["attestations"]) - self.attestation_crdt.merge(remote_attest) + raw = state["attestations"] + if not isinstance(raw, dict): + logger.warning(f"State from {sender}: attestations not a dict, skipping") + else: + try: + remote_attest = LWWRegister.from_dict(raw) + # Drop any entries with future-dated ts_ok beyond skew tolerance + filtered = LWWRegister() + for key, (ts, value) in remote_attest.data.items(): + if ts > now + MAX_FUTURE_SKEW_S: + logger.warning( + f"State from {sender}: rejecting future-dated " + f"attestation {key[:16]} (ts={ts}, now={now})" + ) + continue + filtered.set(key, value, ts) + self.attestation_crdt.merge(filtered) + except Exception as e: + logger.warning(f"State from {sender}: attestation merge failed: {e}") - # Merge epochs + # Phase D.2: Validate + merge epochs (GSet is additive-only; schema check only) if "epochs" in state: - remote_epochs = GSet.from_dict(state["epochs"]) - self.epoch_crdt.merge(remote_epochs) + raw = state["epochs"] + if not isinstance(raw, dict): + logger.warning(f"State from {sender}: epochs not a dict, skipping") + else: + try: + remote_epochs = GSet.from_dict(raw) + self.epoch_crdt.merge(remote_epochs) + except Exception as e: + logger.warning(f"State from {sender}: epochs merge failed: {e}") - # Merge balances + # Phase D.3: Scope balance PN-counter entries to sender's own namespace. + # The sender can only contribute increments/decrements under its own + # node_id key. Entries under other node_ids are dropped. if "balances" in state: - remote_balances = PNCounter.from_dict(state["balances"]) - self.balance_crdt.merge(remote_balances) + raw = state["balances"] + if not isinstance(raw, dict): + logger.warning(f"State from {sender}: balances not a dict, skipping") + else: + try: + scoped = {"increments": {}, "decrements": {}} + for section in ("increments", "decrements"): + entries = raw.get(section, {}) or {} + for miner_id, node_map in entries.items(): + if not isinstance(node_map, dict): + continue + # Only keep the sender's own contribution key + own = node_map.get(sender) + if own is not None: + scoped[section].setdefault(miner_id, {})[sender] = own + remote_balances = PNCounter.from_dict(scoped) + self.balance_crdt.merge(remote_balances) + except Exception as e: + logger.warning(f"State from {sender}: balances merge failed: {e}") - logger.info(f"Merged state from {msg.sender_id}") + logger.info(f"Merged state from {sender} (scoped)") return {"status": "ok"} def announce_attestation(self, miner_id: str, ts_ok: int, device_arch: str): @@ -740,10 +880,17 @@ def request_full_sync(self, peer_url: str): logger.error(f"Full sync from {peer_url}: no signature on state response") return state_payload = {"state": data["state"]} + # SECURITY (#2256 Phase D follow-up): sender_id must be the + # responder's canonical node_id, not peer_url. Phase A + # signing includes sender_id in the signed content, so + # using peer_url here causes a signature mismatch against + # the content the responder actually signed. + # _handle_get_state returns its node_id in "sender_id". + responder_id = data.get("sender_id") or peer_url state_msg = GossipMessage( msg_type=MessageType.STATE.value, - msg_id=f"sync:{peer_url}:{timestamp}", - sender_id=peer_url, + msg_id=f"sync:{responder_id}:{timestamp}", + sender_id=responder_id, timestamp=timestamp, ttl=0, signature=signature,