Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
71cb0b8
In Bed but not Asleep annotation
KarlDeck Apr 9, 2026
e457736
Refactor CrossChannelExtractor into driver with pluggable synthesizers
max-rosenblattl Apr 12, 2026
7596a1b
made stationary activity synthesizer
KarlDeck Apr 12, 2026
e4320e5
added totals
KarlDeck Apr 12, 2026
08af225
added cardio synthesizers with totals
KarlDeck Apr 12, 2026
48927f1
Merge remote-tracking branch 'origin/main' into KarlDeck/Sleep-Bundles
KarlDeck Apr 12, 2026
7596829
Merge remote-tracking branch 'origin/main' into KarlDeck/Sleep-Bundles
KarlDeck Apr 12, 2026
5e3842d
put static methods into parent
KarlDeck Apr 13, 2026
43e3711
made min duration for synthesizer visible to users. Addresses Coderabb…
KarlDeck Apr 13, 2026
9e525e0
addressed coderabbit comment #2
KarlDeck Apr 13, 2026
5158020
put variables into mhc/constants.py
KarlDeck Apr 13, 2026
3601edd
added HR delta
KarlDeck Apr 13, 2026
e7e5d0a
rephrased HR delta
KarlDeck Apr 13, 2026
fc3590a
solved duplication issue in templates/templates.json
KarlDeck Apr 13, 2026
52b8063
fixed --weekly issue
KarlDeck Apr 13, 2026
396e5b1
added 100 bpm threshold
KarlDeck Apr 13, 2026
05fb0ac
put _seed into util.py
KarlDeck Apr 13, 2026
0e09456
Merge remote-tracking branch 'origin/main' into KarlDeck/Sleep-Bundles
KarlDeck Apr 13, 2026
9f72309
rephrased the synthesizer outputs
KarlDeck Apr 14, 2026
8a0956e
rephrase 2
KarlDeck Apr 14, 2026
df174d1
split up _metrics_suffix to make it easier to read
KarlDeck Apr 14, 2026
3ee312b
split up _metrics_suffix to make it easier to read
KarlDeck Apr 14, 2026
e6ecd2a
small cleanup
KarlDeck Apr 14, 2026
5cb38ef
Refactored _metrics_suffix into sub functions and transferred into parent
KarlDeck Apr 14, 2026
56d2fad
minor fix
KarlDeck Apr 14, 2026
9e39ee3
added docstrings
KarlDeck Apr 14, 2026
5de955d
comment added
KarlDeck Apr 14, 2026
c2be771
transfer functions from init to _helper
KarlDeck Apr 16, 2026
2fe7c84
created _workout base for cardio, stationary and future workouts
KarlDeck Apr 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
__pycache__/
data/
.env
.claude/
docs/
sample_plot_*.png
.DS_Store
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ Use the interactive explorer to inspect one row at a time, switch signals, and s
Start it with:

```bash
python3 explorer.py
python3 explorer.py --min-wear-pct=50.0
```

Useful controls:
Expand Down
8 changes: 8 additions & 0 deletions captionizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ def run(
from mhc.dataset import MHCDataset
from mhc.transformer import MHCTransformer
from mhc.constants import MHC_CHANNEL_CONFIG
from extractors.cross_channel import CrossChannelExtractor
from synthesizers.cardio import CardioSynthesizer
from synthesizers.sleep import SleepSynthesizer
from synthesizers.stationary_activity import StationaryActivitySynthesizer
from extractors.statistical import StatisticalExtractor
from extractors.structural import StructuralExtractor
from models.local import LocalConfig, LocalModel
Expand All @@ -66,6 +70,10 @@ def run(
StatisticalExtractor(MHC_CHANNEL_CONFIG),
StructuralExtractor(MHC_CHANNEL_CONFIG),
SemanticExtractor(MHC_CHANNEL_CONFIG),
CrossChannelExtractor(
MHC_CHANNEL_CONFIG,
synthesizers=[SleepSynthesizer(), StationaryActivitySynthesizer(), CardioSynthesizer()],
),
])

captionizer = Captionizer(dataset, MHCTransformer(), annotator)
Expand Down
2 changes: 1 addition & 1 deletion detectors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

@dataclass
class DetectionResult:
event_type: Literal["trend", "spike", "drop"]
event_type: Literal["trend", "spike"]
start_minute: Optional[int] = None
end_minute: Optional[int] = None
direction: Optional[Literal["increasing", "decreasing"]] = None
Expand Down
190 changes: 36 additions & 154 deletions detectors/spike.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,191 +7,73 @@
from __future__ import annotations

import numpy as np
from scipy.signal import find_peaks, peak_widths
from scipy.signal import find_peaks

from detectors import DetectionResult, StructuralDetector


class SpikeDetector(StructuralDetector):
"""Detects spikes and drops.
"""
"""Detects peaks using a minimal SciPy-backed configuration."""

def __init__(
self,
filter_zeros: bool = False,
prominence_scale: float = 3.0,
min_height: float | None = None,
min_prominence: float = 0.0,
min_threshold: float | None = None,
min_distance: int = 1,
min_width: int = 1,
top_k: int | None = None,
smooth_window: int = 1,
drop_localization: str = "center",
max_cluster_peaks: int = 2,
soft_top_k_margin: float = 0.90,
) -> None:
super().__init__(filter_zeros=filter_zeros)
self.prominence_scale = prominence_scale
self.min_height = min_height
self.min_prominence = min_prominence
self.min_threshold = min_threshold
self.min_distance = max(1, min_distance)
self.min_width = max(1, min_width)
self.top_k = top_k
self.smooth_window = max(1, smooth_window)
self.drop_localization = drop_localization
self.max_cluster_peaks = max(1, max_cluster_peaks)
self.soft_top_k_margin = min(max(float(soft_top_k_margin), 0.0), 1.0)

def _detect(self, filtered_signal: np.ndarray, indices: np.ndarray) -> list[DetectionResult]:
smoothed_signal = self._smooth(filtered_signal)
prominence = self._prominence_threshold(filtered_signal) # threshold on raw/filtered, not smoothed
if prominence <= 0:
return []

results: list[DetectionResult] = []
seen_minutes: set[int] = set()

for peak_idx, peak_score, properties in self._select_peaks(smoothed_signal, prominence):
minute = self._localize_spike(peak_idx, properties, filtered_signal, indices)
if minute in seen_minutes:
continue
results.append(DetectionResult(event_type="spike", spike_minute=minute, score=peak_score))
seen_minutes.add(minute)

for peak_idx, peak_score, properties in self._select_drop_peaks(smoothed_signal, prominence):
minute = self._localize_drop(peak_idx, properties, smoothed_signal, indices)
if minute in seen_minutes:
continue
results.append(DetectionResult(event_type="drop", spike_minute=minute, score=peak_score))
seen_minutes.add(minute)

results.sort(key=lambda result: int(result.spike_minute))
return results

def _smooth(self, filtered_signal: np.ndarray) -> np.ndarray:
if self.smooth_window <= 1:
return filtered_signal

kernel = np.ones(self.smooth_window, dtype=float) / float(self.smooth_window)
return np.convolve(filtered_signal, kernel, mode="same")

def _select_peaks(
self,
smoothed_signal: np.ndarray,
prominence: float,
) -> list[tuple[int, float, dict[str, int]]]:
peaks, properties = find_peaks(
smoothed_signal,
prominence=prominence,
filtered_signal,
height=self.min_height,
prominence=self.min_prominence,
threshold=self.min_threshold,
distance=self.min_distance,
width=self.min_width,
)
if len(peaks) == 0:
return []

prominences = properties["prominences"].astype(float)
order = np.argsort(prominences)[::-1]
if self.top_k is not None:
order = order[: self.top_k]

left_ips = properties.get("left_ips", peaks).astype(float)
right_ips = properties.get("right_ips", peaks).astype(float)

selected: list[tuple[int, float, dict[str, int]]] = []
for idx in order:
peak_idx = int(peaks[idx])
selected.append(
(
peak_idx,
float(prominences[idx]),
{
"left_idx": int(np.floor(left_ips[idx])),
"right_idx": int(np.ceil(right_ips[idx])),
},
)
)
return selected
results = self._build_results(peaks, properties, indices)
results.sort(key=lambda result: int(result.spike_minute))
return results

def _select_drop_peaks(
def _build_results(
self,
smoothed_signal: np.ndarray,
prominence: float,
) -> list[tuple[int, float, dict[str, int]]]:
peaks, properties = find_peaks(
-smoothed_signal,
prominence=prominence,
distance=self.min_distance,
width=self.min_width,
plateau_size=(1, None),
)
peaks: np.ndarray,
properties: dict[str, np.ndarray],
indices: np.ndarray,
) -> list[DetectionResult]:
if len(peaks) == 0:
return []

prominences = properties["prominences"].astype(float)
left_edges = properties.get("left_edges", peaks).astype(int)
order = np.argsort(prominences)[::-1]
widths = properties.get("widths", np.ones_like(peaks, dtype=float)).astype(float)
scores = prominences + 0.25 * widths
ranked_indices = sorted(
range(len(peaks)),
key=lambda idx: (-scores[idx], int(indices[int(peaks[idx])])),
)
if self.top_k is not None:
order = order[: self.top_k]
ranked_indices = ranked_indices[:self.top_k]

selected: list[tuple[int, float, dict[str, int]]] = []
for idx in order:
peak_idx = int(peaks[idx])
left_edge_idx = int(left_edges[idx])
selected.append(
(
peak_idx,
float(prominences[idx]),
{
"left_edge_idx": left_edge_idx,
},
)
return [
DetectionResult(
event_type="spike",
spike_minute=int(indices[int(peaks[idx])]),
score=float(scores[idx]),
)
return selected

def _localize_drop(
self,
peak_idx: int,
properties: dict[str, int],
smoothed_signal: np.ndarray,
indices: np.ndarray,
) -> int:
if self.drop_localization == "left_edge":
return int(indices[properties["left_edge_idx"]])

if self.drop_localization == "left_ips":
try:
left_ips_arr = peak_widths(-smoothed_signal, [peak_idx], rel_height=1.0)[2]
left_idx = int(np.clip(round(float(left_ips_arr[0])), 0, len(indices) - 1))
return int(indices[left_idx])
except Exception:
return int(indices[peak_idx])

return int(indices[peak_idx])

def _localize_spike(
self,
peak_idx: int,
properties: dict[str, int],
filtered_signal: np.ndarray,
indices: np.ndarray,
) -> int:
left_idx = int(np.clip(properties.get("left_idx", peak_idx), 0, len(indices) - 1))
right_idx = int(np.clip(properties.get("right_idx", peak_idx), left_idx, len(indices) - 1))
window = filtered_signal[left_idx:right_idx + 1]
if len(window) == 0:
return int(indices[peak_idx])

max_height = float(np.max(window))
candidate_offsets = np.flatnonzero(window == max_height)
if len(candidate_offsets) == 0:
return int(indices[peak_idx])

candidate_indices = left_idx + candidate_offsets
best_idx = min(candidate_indices, key=lambda idx: abs(idx - peak_idx))
return int(indices[int(best_idx)])

def _prominence_threshold(self, filtered_signal: np.ndarray) -> float:
centered = filtered_signal - np.median(filtered_signal)
mad = float(np.median(np.abs(centered)))
if mad > 0:
scale = 1.4826 * mad
return max(self.min_prominence, self.prominence_scale * scale)

spread = float(np.percentile(filtered_signal, 95) - np.percentile(filtered_signal, 5))
if spread <= 1e-12:
return 0.0
return max(self.min_prominence, 0.5 * spread)
for idx in ranked_indices
]
18 changes: 17 additions & 1 deletion detectors/trend.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def _detect(self, filtered_signal: np.ndarray, indices: np.ndarray) -> list[Dete
if segment is not None:
segments.append(segment)

merged = self._merge_segments(segments)
merged = self._resolve_containment(self._merge_segments(segments))
return [
DetectionResult(
event_type="trend",
Expand Down Expand Up @@ -153,6 +153,22 @@ def _classify_window(
score=effect * r2,
)

@staticmethod
def _resolve_containment(segments: list[_TrendSegment]) -> list[_TrendSegment]:
drop: set[int] = set()
for i, outer in enumerate(segments):
if i in drop:
continue
for j, inner in enumerate(segments):
if j <= i or j in drop or inner.direction == outer.direction:
continue
if outer.start_minute <= inner.start_minute and inner.end_minute <= outer.end_minute:
drop.add(j)
elif inner.start_minute <= outer.start_minute and outer.end_minute <= inner.end_minute:
drop.add(i)
break
return [s for i, s in enumerate(segments) if i not in drop]

def _merge_segments(self, segments: list[_TrendSegment]) -> list[_TrendSegment]:
if not segments:
return []
Expand Down
Loading