Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
__pycache__/
data/
.env
.claude/
docs/
Expand Down
2 changes: 1 addition & 1 deletion detectors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

@dataclass
class DetectionResult:
event_type: Literal["trend", "spike", "drop"]
event_type: Literal["trend", "spike"]
start_minute: Optional[int] = None
end_minute: Optional[int] = None
Comment thread
KarlDeck marked this conversation as resolved.
direction: Optional[Literal["increasing", "decreasing"]] = None
Expand Down
243 changes: 115 additions & 128 deletions detectors/spike.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,163 +7,162 @@
from __future__ import annotations

import numpy as np
from scipy.signal import find_peaks, peak_widths
from scipy.signal import find_peaks

from detectors import DetectionResult, StructuralDetector


class SpikeDetector(StructuralDetector):
"""Detects spikes and drops.
"""
"""Detects upward point anomalies using a local-baseline heuristic."""

def __init__(
    self,
    filter_zeros: bool = False,
    prominence_scale: float = 3.0,
    min_prominence: float = 0.0,
    min_distance: int = 1,
    min_width: int = 1,
    top_k: int | None = None,
    smooth_window: int = 1,
    drop_localization: str = "center",
    max_cluster_peaks: int = 2,
    soft_top_k_margin: float = 0.90,
) -> None:
    """Configure the spike detector.

    Args:
        filter_zeros: Forwarded to the ``StructuralDetector`` base class.
        prominence_scale: Multiplier applied to the robust scale estimate
            when deriving a prominence threshold.
        min_prominence: Absolute floor passed to peak finding as the
            minimum required prominence.
        min_width: Minimum peak width (in samples) for peak finding;
            clamped to at least 1.
        min_distance: Minimum spacing between peaks; clamped to at least 1.
            NOTE(review): per the discussion in this diff, ``min_distance``
            does not appear to be forwarded to ``find_peaks`` in
            ``_collect_candidates`` — it only feeds ``duplicate_radius``
            below. Confirm whether spacing should still be enforced there.
        top_k: Optional cap on the number of reported spikes; ``None``
            disables the cap.
        smooth_window: Moving-average window used by ``_smooth``; clamped
            to at least 1 (1 disables smoothing).
        drop_localization: Strategy name for localizing drops
            ("center" / "left_edge" / "left_ips").
            NOTE(review): drop detection looks removed elsewhere in this
            diff — confirm whether this parameter is still used.
        max_cluster_peaks: Maximum number of peaks kept per duplicate
            cluster; clamped to at least 1.
        soft_top_k_margin: Fraction of the top-k boundary score a candidate
            may fall below while still being kept; clamped into [0, 1].
    """
    super().__init__(filter_zeros=filter_zeros)
    self.prominence_scale = prominence_scale
    self.min_prominence = min_prominence
    self.min_distance = max(1, min_distance)  # clamp: spacing of 0 is meaningless
    self.min_width = max(1, min_width)
    self.top_k = top_k
    self.smooth_window = max(1, smooth_window)
    self.drop_localization = drop_localization
    self.max_cluster_peaks = max(1, max_cluster_peaks)
    # Clamp the soft margin into [0, 1] so the top-k override cutoff never
    # exceeds the boundary score.
    self.soft_top_k_margin = min(max(float(soft_top_k_margin), 0.0), 1.0)
    # Radius (in minutes) used to merge near-duplicate detections: roughly
    # min_distance / 5, but always between 1 and 3.
    self.duplicate_radius = max(1, min(3, self.min_distance // 5 if self.min_distance > 1 else 1))

def _detect(self, filtered_signal: np.ndarray, indices: np.ndarray) -> list[DetectionResult]:
smoothed_signal = self._smooth(filtered_signal)
prominence = self._prominence_threshold(filtered_signal) # threshold on raw/filtered, not smoothed
if prominence <= 0:
return []

results: list[DetectionResult] = []
seen_minutes: set[int] = set()

for peak_idx, peak_score, properties in self._select_peaks(smoothed_signal, prominence):
minute = self._localize_spike(peak_idx, properties, filtered_signal, indices)
if minute in seen_minutes:
continue
results.append(DetectionResult(event_type="spike", spike_minute=minute, score=peak_score))
seen_minutes.add(minute)

for peak_idx, peak_score, properties in self._select_drop_peaks(smoothed_signal, prominence):
minute = self._localize_drop(peak_idx, properties, smoothed_signal, indices)
if minute in seen_minutes:
continue
results.append(DetectionResult(event_type="drop", spike_minute=minute, score=peak_score))
seen_minutes.add(minute)

candidates = self._collect_candidates(filtered_signal, indices)
results = self._select_results(candidates)
results.sort(key=lambda result: int(result.spike_minute))
return results

def _smooth(self, filtered_signal: np.ndarray) -> np.ndarray:
if self.smooth_window <= 1:
return filtered_signal

kernel = np.ones(self.smooth_window, dtype=float) / float(self.smooth_window)
return np.convolve(filtered_signal, kernel, mode="same")

def _select_peaks(
def _collect_candidates(
self,
smoothed_signal: np.ndarray,
prominence: float,
) -> list[tuple[int, float, dict[str, int]]]:
filtered_signal: np.ndarray,
indices: np.ndarray,
) -> list[dict[str, float | int]]:
peaks, properties = find_peaks(
smoothed_signal,
prominence=prominence,
distance=self.min_distance,
filtered_signal,
prominence=self.min_prominence,
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SpikeDetector.min_distance is no longer applied in the find_peaks call (no distance=... argument). This makes the min_distance parameter ineffective and can substantially change detection behavior given the configs set min_distance (e.g., 15/20/30). Pass distance=self.min_distance to find_peaks (or remove the parameter if intentional) so spacing constraints are enforced consistently.

Suggested change
prominence=self.min_prominence,
prominence=self.min_prominence,
distance=self.min_distance,

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe solve by rename

width=self.min_width,
)
if len(peaks) == 0:
return []
Comment on lines 48 to 54
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🌐 Web query:

scipy find_peaks distance parameter behavior

💡 Result:

In scipy.signal.find_peaks, the distance parameter enforces a minimum horizontal spacing (in samples) between returned peak indices:

  • distance is the required minimal horizontal distance (>= 1) in samples between neighbouring peaks. If candidate peaks are closer than distance, the smaller peaks are removed first until all remaining peaks satisfy the spacing constraint. [1]
  • Conceptually, SciPy’s implementation keeps the taller peaks preferentially and suppresses nearby smaller ones (i.e., it effectively processes peaks from high to low and removes neighbors within the forbidden window). [2]
  • Guarantee is on peak indices being at least distance samples apart; however, for flat/plateau peaks, the edges of plateaus may end up closer than distance even though the returned peak indices respect it. [1]

Sources: SciPy docs for find_peaks [1] and an explanation/derivation based on the SciPy implementation [2].


🏁 Script executed:

find . -name "spike.py" -path "*/detectors/*" | head -5

Repository: StanfordBDHG/SensorTSLM

Length of output: 87


🏁 Script executed:

cat -n ./detectors/spike.py | sed -n '1,60p'

Repository: StanfordBDHG/SensorTSLM

Length of output: 2749


🏁 Script executed:

cat -n ./detectors/spike.py | sed -n '43,100p'

Repository: StanfordBDHG/SensorTSLM

Length of output: 3171


🏁 Script executed:

cat -n ./detectors/spike.py | sed -n '100,180p'

Repository: StanfordBDHG/SensorTSLM

Length of output: 4015


min_distance parameter is not applied during peak finding.

The min_distance attribute is stored in the constructor (line 30) but is not passed to find_peaks() (line 48). This means the minimum spacing constraint is not enforced during candidate collection. The duplicate_radius clustering in _select_results handles deduplication of collected candidates but doesn't replace the distance parameter's role in actively suppressing weaker peaks that fall within the minimum distance during the initial peak detection phase.

Pass distance=self.min_distance to enforce spacing during peak finding:

🔧 Proposed fix
        peaks, properties = find_peaks(
            filtered_signal,
            prominence=self.min_prominence,
            width=self.min_width,
+            distance=self.min_distance,
        )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@detectors/spike.py` around lines 48 - 54, The peak-finding call does not
apply the stored min_distance constraint, so update the find_peaks invocation
that processes filtered_signal (where peaks, properties = find_peaks(...)) to
include distance=self.min_distance so spacing is enforced during candidate
collection; keep the existing duplicate_radius logic in _select_results for
clustering but ensure initial suppression uses the min_distance attribute.


prominences = properties["prominences"].astype(float)
order = np.argsort(prominences)[::-1]
if self.top_k is not None:
order = order[: self.top_k]

left_ips = properties.get("left_ips", peaks).astype(float)
right_ips = properties.get("right_ips", peaks).astype(float)

selected: list[tuple[int, float, dict[str, int]]] = []
for idx in order:
peak_idx = int(peaks[idx])
selected.append(
(
peak_idx,
float(prominences[idx]),
{
"left_idx": int(np.floor(left_ips[idx])),
"right_idx": int(np.ceil(right_ips[idx])),
},
)
left_bases = properties.get("left_bases", peaks).astype(int)
right_bases = properties.get("right_bases", peaks).astype(int)
widths = properties.get("widths", np.ones_like(peaks, dtype=float)).astype(float)

candidates: list[dict[str, float | int]] = []
for idx, peak in enumerate(peaks):
peak_idx = int(peak)
minute = self._localize_spike(
peak_idx,
{
"left_idx": int(np.floor(left_ips[idx])),
"right_idx": int(np.ceil(right_ips[idx])),
},
filtered_signal,
indices,
)
return selected

def _select_drop_peaks(
self,
smoothed_signal: np.ndarray,
prominence: float,
) -> list[tuple[int, float, dict[str, int]]]:
peaks, properties = find_peaks(
-smoothed_signal,
prominence=prominence,
distance=self.min_distance,
width=self.min_width,
plateau_size=(1, None),
)
if len(peaks) == 0:
return []

prominences = properties["prominences"].astype(float)
left_edges = properties.get("left_edges", peaks).astype(int)
order = np.argsort(prominences)[::-1]
if self.top_k is not None:
order = order[: self.top_k]

selected: list[tuple[int, float, dict[str, int]]] = []
for idx in order:
peak_idx = int(peaks[idx])
left_edge_idx = int(left_edges[idx])
selected.append(
(
peak_idx,
float(prominences[idx]),
{
"left_edge_idx": left_edge_idx,
},
)
left_base_value = float(filtered_signal[int(np.clip(left_bases[idx], 0, len(filtered_signal) - 1))])
right_base_value = float(filtered_signal[int(np.clip(right_bases[idx], 0, len(filtered_signal) - 1))])
baseline_value = max(left_base_value, right_base_value)
local_jump = max(0.0, float(filtered_signal[peak_idx]) - baseline_value)
relative_jump = local_jump / max(abs(baseline_value), 1.0)
candidates.append(
{
"minute": minute,
"prominence": float(prominences[idx]),
"width": float(widths[idx]),
"local_jump": local_jump,
"relative_jump": relative_jump,
}
)
return selected
return candidates

def _localize_drop(
self,
peak_idx: int,
properties: dict[str, int],
smoothed_signal: np.ndarray,
indices: np.ndarray,
) -> int:
if self.drop_localization == "left_edge":
return int(indices[properties["left_edge_idx"]])
def _select_results(self, candidates: list[dict[str, float | int]]) -> list[DetectionResult]:
if not candidates:
return []

if self.drop_localization == "left_ips":
try:
left_ips_arr = peak_widths(-smoothed_signal, [peak_idx], rel_height=1.0)[2]
left_idx = int(np.clip(round(float(left_ips_arr[0])), 0, len(indices) - 1))
return int(indices[left_idx])
except Exception:
return int(indices[peak_idx])
minutes = np.asarray([float(candidate["minute"]) for candidate in candidates], dtype=float)
prominences = np.asarray([float(candidate["prominence"]) for candidate in candidates], dtype=float)
widths = np.asarray([float(candidate["width"]) for candidate in candidates], dtype=float)
local_jumps = np.asarray([float(candidate["local_jump"]) for candidate in candidates], dtype=float)
relative_jumps = np.asarray([float(candidate["relative_jump"]) for candidate in candidates], dtype=float)

jump_cutoff = self._local_jump_threshold(local_jumps)
scores = local_jumps + 0.25 * widths + 0.10 * prominences

duplicate_clusters = self._cluster_ids(minutes, radius=self.duplicate_radius)
accepted: set[int] = set()
cluster_members: dict[int, list[int]] = {}
for idx, cluster_id in enumerate(duplicate_clusters):
cluster_members.setdefault(int(cluster_id), []).append(idx)

for cluster_id in sorted(cluster_members):
members = cluster_members[cluster_id]
qualifying = [
idx for idx in members
if local_jumps[idx] >= jump_cutoff - 1e-12 or relative_jumps[idx] >= 1.0 - 1e-12
]
if not qualifying:
continue

return int(indices[peak_idx])
qualifying.sort(key=lambda idx: (-scores[idx], -local_jumps[idx], -widths[idx], minutes[idx]))
keep_n = min(self.max_cluster_peaks, len(qualifying))
boundary_score = float(scores[qualifying[keep_n - 1]])
accepted.update(idx for idx in qualifying if scores[idx] >= boundary_score - 1e-12)

if self.top_k is not None and len(accepted) > self.top_k:
accepted_list = sorted(accepted, key=lambda idx: (-scores[idx], -local_jumps[idx], -widths[idx], minutes[idx]))
boundary_score = float(scores[accepted_list[self.top_k - 1]])
override_cutoff = boundary_score * self.soft_top_k_margin
accepted = {idx for idx in accepted_list if scores[idx] >= override_cutoff - 1e-12}

return [
DetectionResult(
event_type="spike",
spike_minute=int(candidates[idx]["minute"]),
score=float(scores[idx]),
)
for idx in accepted
]

@staticmethod
def _cluster_ids(minutes: np.ndarray, radius: int) -> np.ndarray:
if len(minutes) == 0:
return np.zeros(0, dtype=int)
order = np.argsort(minutes)
cluster_ids = np.zeros(len(minutes), dtype=int)
cluster = 0
prev_minute = float(minutes[order[0]])
cluster_ids[order[0]] = cluster
for idx in order[1:]:
minute = float(minutes[idx])
if minute - prev_minute > radius:
cluster += 1
cluster_ids[idx] = cluster
prev_minute = minute
return cluster_ids

@staticmethod
def _local_jump_threshold(local_jumps: np.ndarray) -> float:
if len(local_jumps) == 0:
return 0.0
q25, q50, q75 = np.percentile(local_jumps, [25, 50, 75])
iqr = float(q75 - q25)
return max(0.0, max(float(q25), float(q50 - 0.25 * iqr)))

@staticmethod
def _localize_spike(
self,
peak_idx: int,
properties: dict[str, int],
filtered_signal: np.ndarray,
Expand All @@ -183,15 +182,3 @@ def _localize_spike(
candidate_indices = left_idx + candidate_offsets
best_idx = min(candidate_indices, key=lambda idx: abs(idx - peak_idx))
return int(indices[int(best_idx)])

def _prominence_threshold(self, filtered_signal: np.ndarray) -> float:
centered = filtered_signal - np.median(filtered_signal)
mad = float(np.median(np.abs(centered)))
if mad > 0:
scale = 1.4826 * mad
return max(self.min_prominence, self.prominence_scale * scale)

spread = float(np.percentile(filtered_signal, 95) - np.percentile(filtered_signal, 5))
if spread <= 1e-12:
return 0.0
return max(self.min_prominence, 0.5 * spread)
Loading
Loading