Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
v0.22.1

- Fix `tile_functions_parasail` D/I CIGAR inversion: `nw_trace_scan_profile_16`
returns a CIGAR from the profile's perspective, so insertion and deletion ops
were swapped relative to SAM convention; normalize before parsing
- Fix `cigar_to_subs` bounds guard: skip X ops that would index past the end of
the reference (triggered when query sequences are longer than the reference)
- Add insertion and deletion generation to `sequence_pileup.py` benchmark
(Poisson-sampled ~3 insertions and ~3 deletions per sequence in addition to SNPs)

v0.22.0

- Add parasail alignment backend to pileup tile functions (`tile_functions_parasail`,
Expand Down
34 changes: 24 additions & 10 deletions clodius/tiles/pileup.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,18 @@ def cigar_to_subs(cigar_bytes: bytes, ref: str, query: str) -> tuple:
qry_pos += length
elif op == "X":
for k in range(length):
subs.append(
{
"pos": ref_pos + k,
"type": "X",
"length": 1,
"base": ref[ref_pos + k],
"variant": query[qry_pos + k],
}
)
rp = ref_pos + k
qp = qry_pos + k
if rp < len(ref) and qp < len(query):
subs.append(
{
"pos": rp,
"type": "X",
"length": 1,
"base": ref[rp],
"variant": query[qp],
}
)
ref_pos += length
qry_pos += length
elif op == "I":
Expand Down Expand Up @@ -298,7 +301,18 @@ def tileset_info():
for refseq in refseqs:
profile = profiles[refseq["id"]]
result = parasail.nw_trace_scan_profile_16(profile, seq, OPEN, EXTEND)
start, end, subs = cigar_to_subs(result.cigar.decode, refseq["seq"], seq)
# nw_trace_scan_profile builds the profile from the reference, so
# the returned CIGAR describes the alignment from the *profile's*
# perspective: its D/I ops are swapped relative to SAM convention.
# Normalise to SAM (I = extra base in query, D = missing base in
# query) before passing to cigar_to_subs.
cigar = (
result.cigar.decode
.replace(b"D", b"\x00")
.replace(b"I", b"D")
.replace(b"\x00", b"I")
)
start, end, subs = cigar_to_subs(cigar, refseq["seq"], seq)

chr_offset = calc_chr_offset(chromsizes, refseq["id"])

Expand Down
72 changes: 55 additions & 17 deletions examples/sequence_pileup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
Sequence pileup benchmark:
1. Generate a random 250nt base sequence.
2. Generate 20 000 mutated sequences (~30 SNPs each from the base).
2. Generate 50 000 mutated sequences (~30 SNPs, ~3 deletions, ~3 insertions each).
3. Build a pileup tile with parasail (profile-based, one profile per reference).
"""

Expand Down Expand Up @@ -69,20 +69,47 @@ def save_result(result: dict, prefix: str) -> None:
SEQ_LEN = 250
N_SEQS = 50_000
AVG_MUTATIONS = 30
AVG_INSERTIONS = 3
AVG_DELETIONS = 3


def random_sequence(length: int, rng: random.Random) -> str:
return "".join(rng.choices(BASES, k=length))


def mutate_sequence(seq: str, avg_mutations: int, rng: random.Random) -> str:
"""Return a copy of *seq* with Poisson(avg_mutations) random SNPs."""
n_muts = np.random.poisson(avg_mutations)
positions = rng.sample(range(len(seq)), min(n_muts, len(seq)))
def mutate_sequence(
seq: str,
avg_mutations: int,
avg_insertions: int,
avg_deletions: int,
rng: random.Random,
) -> str:
"""Return a copy of *seq* with Poisson SNPs, deletions, and insertions."""
seq_list = list(seq)
for pos in positions:

# SNPs
n_muts = np.random.poisson(avg_mutations)
snp_positions = rng.sample(range(len(seq_list)), min(n_muts, len(seq_list)))
for pos in snp_positions:
alt_bases = [b for b in BASES if b != seq_list[pos]]
seq_list[pos] = rng.choice(alt_bases)

# Deletions — remove positions high-to-low so earlier indices stay valid
n_dels = np.random.poisson(avg_deletions)
if n_dels > 0:
del_positions = sorted(
rng.sample(range(len(seq_list)), min(n_dels, len(seq_list))),
reverse=True,
)
for pos in del_positions:
del seq_list[pos]

# Insertions — insert random bases at random positions
n_ins = np.random.poisson(avg_insertions)
for _ in range(n_ins):
pos = rng.randint(0, len(seq_list))
seq_list.insert(pos, rng.choice(BASES))

return "".join(seq_list)


Expand All @@ -91,13 +118,15 @@ def mutate_sequence(seq: str, avg_mutations: int, rng: random.Random) -> str:

print("Generating sequences …")
base_seq = random_sequence(SEQ_LEN, rng)
mutated_seqs = [mutate_sequence(base_seq, AVG_MUTATIONS, rng) for _ in range(N_SEQS)]
actual_avg = (
sum(sum(a != b for a, b in zip(base_seq, s)) for s in mutated_seqs) / N_SEQS
)
print(f" Base sequence length : {SEQ_LEN} nt")
print(f" Number of sequences : {N_SEQS}")
print(f" Actual mean SNPs/seq : {actual_avg:.1f}")
mutated_seqs = [
mutate_sequence(base_seq, AVG_MUTATIONS, AVG_INSERTIONS, AVG_DELETIONS, rng)
for _ in range(N_SEQS)
]
seq_lengths = [len(s) for s in mutated_seqs]
print(f" Base sequence length : {SEQ_LEN} nt")
print(f" Number of sequences : {N_SEQS}")
print(f" Mutated seq length range : {min(seq_lengths)}–{max(seq_lengths)} nt")
print(f" Mutated seq mean length : {sum(seq_lengths)/N_SEQS:.1f} nt")


# ---------------------------------------------------------------------------
Expand All @@ -112,9 +141,18 @@ def mutate_sequence(seq: str, avg_mutations: int, rng: random.Random) -> str:
print(f" Time : {t_parasail:.2f} s ({t_parasail / N_SEQS * 1000:.2f} ms/seq)")
save_result(result_parasail, "pileup_parasail")

# Aggregate substitution counts
ps_total_subs = sum(len(r["substitutions"]) for r in tile_parasail)
print(f"\n Total substitution entries : {ps_total_subs}")
print(f" Per-read average : {ps_total_subs / N_SEQS:.2f}")
# Aggregate alignment event counts
ps_total_snps = sum(
sum(1 for e in r["substitutions"] if e["type"] == "X") for r in tile_parasail
)
ps_total_ins = sum(
sum(1 for e in r["substitutions"] if e["type"] == "I") for r in tile_parasail
)
ps_total_dels = sum(
sum(1 for e in r["substitutions"] if e["type"] == "D") for r in tile_parasail
)
print(f"\n SNPs — total: {ps_total_snps:>8,} per-read avg: {ps_total_snps / N_SEQS:.2f}")
print(f" INS — total: {ps_total_ins:>8,} per-read avg: {ps_total_ins / N_SEQS:.2f}")
print(f" DEL — total: {ps_total_dels:>8,} per-read avg: {ps_total_dels / N_SEQS:.2f}")

print("\nDone.")
49 changes: 49 additions & 0 deletions test/tiles/pileup_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,16 @@ def test_multiple_mismatches(self):
mismatch_positions = [s["pos"] for s in subs if s["type"] == "X"]
assert mismatch_positions == [0, 4]

def test_x_op_clipped_at_ref_boundary(self):
"""X ops that would index past the end of the reference are silently skipped."""
# CIGAR "3=3X": ref "ACGTA" (len 5); after 3= ref_pos=3; the 3X would
# try ref[3], ref[4], ref[5] — the last is out of bounds and must be skipped.
start, end, subs = cigar_to_subs(b"3=3X", "ACGTA", "ACGXYZ")
x_subs = [s for s in subs if s["type"] == "X"]
assert len(x_subs) == 2
assert x_subs[0]["pos"] == 3
assert x_subs[1]["pos"] == 4


class TestGetPileupAlignmentDataParasail:
@pytest.fixture(autouse=True)
Expand Down Expand Up @@ -248,3 +258,42 @@ def test_read_ids(self):
tile = result["tiles"]["x.0.0"]
assert tile[0]["id"] == "r0_ref"
assert tile[1]["id"] == "r1_ref"

def test_insertion_detected(self):
"""A sequence longer than the reference produces an I event."""
seq_with_ins = self.REF[:5] + "T" + self.REF[5:]
result = get_pileup_alignment_data(self.REF, [seq_with_ins], method="parasail")
entry = result["tiles"]["x.0.0"][0]
assert any(s["type"] == "I" for s in entry["substitutions"])

def test_deletion_detected(self):
"""A sequence shorter than the reference produces a D event."""
seq_with_del = self.REF[:5] + self.REF[6:]
result = get_pileup_alignment_data(self.REF, [seq_with_del], method="parasail")
entry = result["tiles"]["x.0.0"][0]
assert any(s["type"] == "D" for s in entry["substitutions"])

def test_mixed_indels_and_snps(self):
"""A sequence with an insertion, a deletion, and a SNP produces all three event types."""
seq = list(self.REF)
seq[50] = "A" if seq[50] != "A" else "C" # SNP
del seq[30] # deletion → len 59
seq.insert(10, "A") # insertion → len 60
result = get_pileup_alignment_data(self.REF, ["".join(seq)], method="parasail")
entry = result["tiles"]["x.0.0"][0]
types = {s["type"] for s in entry["substitutions"]}
assert "I" in types
assert "D" in types
assert "X" in types

def test_variable_length_sequences_align_without_error(self):
"""Batch of sequences with different lengths all align and produce the correct event type."""
seqs = [
self.REF[:5] + "T" + self.REF[5:], # one insertion → len 61
self.REF[:5] + self.REF[6:], # one deletion → len 59
]
result = get_pileup_alignment_data(self.REF, seqs, method="parasail")
tile = result["tiles"]["x.0.0"]
assert len(tile) == 2
assert any(s["type"] == "I" for s in tile[0]["substitutions"])
assert any(s["type"] == "D" for s in tile[1]["substitutions"])
Loading