Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 260 additions & 0 deletions node/data_custody.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
# SPDX-License-Identifier: MIT
"""Deterministic data custody challenges for availability validators."""

import hashlib
import hmac
import json
from dataclasses import dataclass
from typing import Dict, List, Optional


DEFAULT_SAMPLE_COUNT = 16
DEFAULT_SAMPLE_SIZE = 32
MAX_SAMPLE_COUNT = 256
MAX_SAMPLE_SIZE = 4096


def _sha256_hex(data: bytes) -> str:
return hashlib.sha256(data).hexdigest()


def _canonical_json(data: Dict) -> bytes:
return json.dumps(data, sort_keys=True, separators=(",", ":")).encode()


def _derive_seed(
    piece_id: str,
    piece_size: int,
    epoch: int,
    validator_id: str,
    seed: Optional[str],
) -> bytes:
    """Return the seed bytes used to derive sample offsets.

    When ``seed`` is empty or ``None`` the seed is the canonical JSON encoding
    of the challenge parameters. Otherwise the caller-supplied ``seed`` is
    used on its own: decoded as hex when it looks like hex, else encoded as
    text.
    """
    if not seed:
        derived_from = {
            "epoch": epoch,
            "piece_id": piece_id,
            "piece_size": piece_size,
            "validator_id": validator_id,
        }
        return _canonical_json(derived_from)

    if _looks_like_hex(seed):
        return bytes.fromhex(seed)
    return seed.encode()


def _looks_like_hex(value: str) -> bool:
if len(value) % 2:
return False
try:
bytes.fromhex(value)
except ValueError:
return False
return True


def _validate_challenge_params(
piece_id: str,
piece_size: int,
epoch: int,
validator_id: str,
sample_count: int,
sample_size: int,
) -> None:
if not isinstance(piece_id, str) or not piece_id:
raise ValueError("piece_id is required")
if not isinstance(validator_id, str) or not validator_id:
raise ValueError("validator_id is required")
if not isinstance(epoch, int) or isinstance(epoch, bool) or epoch < 0:
raise ValueError("epoch must be a non-negative integer")
if not isinstance(piece_size, int) or isinstance(piece_size, bool) or piece_size <= 0:
raise ValueError("piece_size must be a positive integer")
if not isinstance(sample_count, int) or not 1 <= sample_count <= MAX_SAMPLE_COUNT:
raise ValueError("sample_count out of range")
if not isinstance(sample_size, int) or not 1 <= sample_size <= MAX_SAMPLE_SIZE:
raise ValueError("sample_size out of range")
if sample_size > piece_size:
raise ValueError("sample_size cannot exceed piece_size")


@dataclass(frozen=True)
class CustodyChallenge:
    """Immutable description of which samples of a piece must be proven."""

    piece_id: str
    piece_size: int
    epoch: int
    validator_id: str
    # Start positions of the challenged samples within the piece.
    sample_offsets: List[int]
    sample_size: int = DEFAULT_SAMPLE_SIZE

    @property
    def challenge_hash(self) -> str:
        """SHA-256 hex digest of the canonical JSON of every other field."""
        unhashed_fields = self.to_dict(include_hash=False)
        return _sha256_hex(_canonical_json(unhashed_fields))

    def to_dict(self, include_hash: bool = True) -> Dict:
        """Return the challenge as a plain dict.

        ``sample_offsets`` is copied so the result can be mutated freely.
        ``challenge_hash`` is appended only when ``include_hash`` is true.
        """
        payload: Dict = dict(
            piece_id=self.piece_id,
            piece_size=self.piece_size,
            epoch=self.epoch,
            validator_id=self.validator_id,
            sample_offsets=list(self.sample_offsets),
            sample_size=self.sample_size,
        )
        if not include_hash:
            return payload
        payload["challenge_hash"] = self.challenge_hash
        return payload


@dataclass(frozen=True)
class CustodyProof:
    """A validator's response to a custody challenge."""

    challenge_hash: str
    piece_id: str
    validator_id: str
    # Maps the decimal string of each sample offset to that sample's hash.
    sample_hashes: Dict[str, str]
    piece_hash: Optional[str] = None

    def to_dict(self) -> Dict:
        """Return the proof as a plain dict.

        ``sample_hashes`` is copied, and ``piece_hash`` is included only when
        it is not ``None``.
        """
        payload: Dict = dict(
            challenge_hash=self.challenge_hash,
            piece_id=self.piece_id,
            validator_id=self.validator_id,
            sample_hashes=dict(self.sample_hashes),
        )
        if self.piece_hash is None:
            return payload
        payload["piece_hash"] = self.piece_hash
        return payload


@dataclass(frozen=True)
class CustodyVerificationResult:
    """Outcome of checking a custody proof against a challenge."""

    valid: bool
    slashable: bool
    reason: str
    checked_samples: int
    failed_offsets: List[int]

    def to_dict(self) -> Dict:
        """Return the result as a plain dict with ``failed_offsets`` copied."""
        return dict(
            valid=self.valid,
            slashable=self.slashable,
            reason=self.reason,
            checked_samples=self.checked_samples,
            failed_offsets=list(self.failed_offsets),
        )


def build_custody_challenge(
    piece_id: str,
    piece_size: int,
    epoch: int,
    validator_id: str,
    sample_count: int = DEFAULT_SAMPLE_COUNT,
    sample_size: int = DEFAULT_SAMPLE_SIZE,
    seed: Optional[str] = None,
) -> CustodyChallenge:
    """Build a challenge whose sample offsets are derived deterministically.

    Offset ``i`` is the first 8 bytes of ``SHA-256(seed_material || i)``,
    read as a big-endian integer and reduced modulo the number of positions
    at which a full ``sample_size`` window fits inside the piece. Offsets
    may repeat.

    Raises:
        ValueError: If any parameter fails ``_validate_challenge_params``.
    """
    _validate_challenge_params(
        piece_id, piece_size, epoch, validator_id, sample_count, sample_size
    )

    seed_material = _derive_seed(piece_id, piece_size, epoch, validator_id, seed)
    # Number of valid start positions: 0 .. piece_size - sample_size inclusive.
    position_count = piece_size - sample_size + 1

    offsets = [
        int.from_bytes(
            hashlib.sha256(seed_material + index.to_bytes(8, "big")).digest()[:8],
            "big",
        )
        % position_count
        for index in range(sample_count)
    ]

    return CustodyChallenge(
        piece_id=piece_id,
        piece_size=piece_size,
        epoch=epoch,
        validator_id=validator_id,
        sample_offsets=offsets,
        sample_size=sample_size,
    )


def create_custody_proof(data: bytes, challenge: CustodyChallenge) -> CustodyProof:
"""Hash the challenged data samples for a validator custody response."""
if not isinstance(data, bytes):
raise TypeError("data must be bytes")
if len(data) != challenge.piece_size:
raise ValueError("data length does not match challenge piece_size")

sample_hashes = {
str(offset): _sha256_hex(data[offset:offset + challenge.sample_size])
for offset in challenge.sample_offsets
}
return CustodyProof(
challenge_hash=challenge.challenge_hash,
piece_id=challenge.piece_id,
validator_id=challenge.validator_id,
sample_hashes=sample_hashes,
piece_hash=_sha256_hex(data),
)


def _sample_hash_matches(observed: object, expected: str) -> bool:
    """Compare a prover-supplied sample hash with the expected hex digest.

    ``hmac.compare_digest`` raises ``TypeError`` for non-``str`` values and
    for strings containing non-ASCII characters. Such values can never equal
    a hex digest, so they are reported as a mismatch rather than letting the
    exception escape.
    """
    if not isinstance(observed, str) or not observed.isascii():
        return False
    return hmac.compare_digest(observed, expected)


def verify_custody_proof(
    data: bytes,
    challenge: CustodyChallenge,
    proof: CustodyProof,
) -> CustodyVerificationResult:
    """Verify sampled custody evidence and flag failures as slashable.

    The proof's ``challenge_hash``, ``piece_id`` and ``validator_id`` are
    compared with the challenge first, in that order; the first mismatch is
    returned with ``checked_samples=0``. Each challenged sample of ``data``
    is then hashed and compared with the proof's entry for that offset. A
    missing, non-string, non-ASCII or unequal entry counts as a failure.

    Args:
        data: The full piece the verifier holds.
        challenge: The challenge the proof answers.
        proof: The validator's response.

    Returns:
        A result with ``valid=True`` and ``reason="ok"`` when every sample
        matches; otherwise ``valid=False``, ``slashable=True`` and a reason
        naming the first failed check.

    Raises:
        TypeError: If ``data`` is not ``bytes``.
        ValueError: If ``len(data)`` differs from ``challenge.piece_size``.
    """
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes")
    if len(data) != challenge.piece_size:
        raise ValueError("data length does not match challenge piece_size")

    # NOTE(review): proof.piece_hash is not examined here — confirm whether
    # that is intentional.
    identity_checks = (
        ("challenge_hash_mismatch", proof.challenge_hash, challenge.challenge_hash),
        ("piece_id_mismatch", proof.piece_id, challenge.piece_id),
        ("validator_id_mismatch", proof.validator_id, challenge.validator_id),
    )
    for reason, claimed, expected in identity_checks:
        if claimed != expected:
            return CustodyVerificationResult(
                valid=False,
                slashable=True,
                reason=reason,
                checked_samples=0,
                failed_offsets=[],
            )

    sample_size = challenge.sample_size
    failed_offsets = [
        offset
        for offset in challenge.sample_offsets
        if not _sample_hash_matches(
            proof.sample_hashes.get(str(offset)),
            _sha256_hex(data[offset:offset + sample_size]),
        )
    ]

    checked = len(challenge.sample_offsets)
    if failed_offsets:
        return CustodyVerificationResult(
            valid=False,
            slashable=True,
            reason="sample_hash_mismatch",
            checked_samples=checked,
            failed_offsets=failed_offsets,
        )

    return CustodyVerificationResult(
        valid=True,
        slashable=False,
        reason="ok",
        checked_samples=checked,
        failed_offsets=[],
    )
126 changes: 126 additions & 0 deletions node/tests/test_data_custody.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# SPDX-License-Identifier: MIT

import pytest

from node.data_custody import (
build_custody_challenge,
create_custody_proof,
verify_custody_proof,
)


def test_challenge_offsets_are_deterministic_for_validator_epoch():
    """Identical inputs must yield identical offsets and challenge hash."""
    params = dict(
        piece_id="piece-a",
        piece_size=1024,
        epoch=7,
        validator_id="validator-1",
        sample_count=8,
        sample_size=16,
    )

    first = build_custody_challenge(**params)
    second = build_custody_challenge(**params)

    assert first.sample_offsets == second.sample_offsets
    assert first.challenge_hash == second.challenge_hash
    # 1008 == piece_size - sample_size: the last offset where a sample fits.
    assert all(0 <= offset <= 1008 for offset in first.sample_offsets)


def test_valid_custody_proof_verifies_all_challenged_samples():
    """An honest proof over the same data verifies with no failed offsets."""
    piece = bytes(range(256)) * 4
    challenge = build_custody_challenge(
        "piece-a",
        len(piece),
        9,
        "validator-1",
        sample_count=10,
        sample_size=24,
    )

    outcome = verify_custody_proof(
        piece, challenge, create_custody_proof(piece, challenge)
    )

    assert outcome.valid is True
    assert outcome.slashable is False
    assert outcome.checked_samples == 10
    assert outcome.failed_offsets == []


def test_missing_sample_hash_is_slashable_custody_failure():
    """A proof that omits one challenged sample hash fails and is slashable."""
    piece = b"availability-piece" * 64
    challenge = build_custody_challenge(
        piece_id="piece-a",
        piece_size=len(piece),
        epoch=11,
        validator_id="validator-1",
        sample_count=6,
        sample_size=32,
    )
    honest = create_custody_proof(piece, challenge)

    removed_offset = challenge.sample_offsets[0]
    remaining_hashes = {
        key: digest
        for key, digest in honest.sample_hashes.items()
        if key != str(removed_offset)
    }
    incomplete_proof = type(honest)(
        challenge_hash=honest.challenge_hash,
        piece_id=honest.piece_id,
        validator_id=honest.validator_id,
        sample_hashes=remaining_hashes,
        piece_hash=honest.piece_hash,
    )

    outcome = verify_custody_proof(piece, challenge, incomplete_proof)

    assert outcome.valid is False
    assert outcome.slashable is True
    assert outcome.reason == "sample_hash_mismatch"
    assert removed_offset in outcome.failed_offsets


def test_tampered_sample_hash_is_slashable_custody_failure():
    """A proof with one overwritten sample hash fails and is slashable."""
    piece = b"availability-piece" * 64
    challenge = build_custody_challenge(
        piece_id="piece-a",
        piece_size=len(piece),
        epoch=12,
        validator_id="validator-1",
        sample_count=6,
        sample_size=32,
    )
    honest = create_custody_proof(piece, challenge)

    tampered_offset = challenge.sample_offsets[-1]
    forged_hashes = dict(honest.sample_hashes)
    forged_hashes[str(tampered_offset)] = "00" * 32
    tampered_proof = type(honest)(
        challenge_hash=honest.challenge_hash,
        piece_id=honest.piece_id,
        validator_id=honest.validator_id,
        sample_hashes=forged_hashes,
        piece_hash=honest.piece_hash,
    )

    outcome = verify_custody_proof(piece, challenge, tampered_proof)

    assert outcome.valid is False
    assert outcome.slashable is True
    assert outcome.reason == "sample_hash_mismatch"
    assert tampered_offset in outcome.failed_offsets


def test_challenge_rejects_impossible_sample_size():
    """A sample larger than the piece itself is rejected at build time."""
    too_small_piece = dict(
        piece_id="piece-a",
        piece_size=16,
        epoch=1,
        validator_id="validator-1",
        sample_count=1,
        sample_size=32,
    )

    with pytest.raises(ValueError, match="sample_size cannot exceed piece_size"):
        build_custody_challenge(**too_small_piece)
Loading