Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__pycache__/
__pycache__/
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,27 @@ export MHC_DATASET_DIR="../hf-daily_max-nonwear=50"
```bash
python captionizer.py
```

## Export To TimeF

```python
from pathlib import Path

from captionizer import Captionizer
from exporters.timef_export import TimeFExportConfig, export_caption_result

# Prerequisite: a configured Captionizer instance (see the Usage section above)
captionizer = Captionizer(dataset, transformer, annotator)

result, _ = captionizer.run(max_rows=5)
root = export_caption_result(
result,
TimeFExportConfig(
output_root=Path("exports"),
dataset_id="mhc_caption_runs",
sampling_period=1,
timestamp_unit=1,
unit_sampling_rate="1 / minute",
unit_timestamp="minute",
time_column_name="time_minute",
),
)
print(root)
Comment on lines +21 to +40
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Example code references undefined captionizer variable.

The example imports Captionizer class (line 24) but uses a lowercase captionizer instance (line 27) that is never created. Users following this example will encounter a NameError.

Consider either:

  1. Adding the necessary setup (dataset, transformer, annotator, instantiation), or
  2. Adding a comment indicating prerequisite setup is required.
📝 Proposed fix with minimal setup context
 from pathlib import Path
 
 from captionizer import Captionizer
 from exporters.timef_export import TimeFExportConfig, export_caption_result
+
+# Setup (see Usage section for full details)
+# captionizer = Captionizer(dataset, transformer, annotator)
 
 result, _ = captionizer.run(max_rows=5)

Or provide a complete working example:

 from pathlib import Path
 
 from captionizer import Captionizer
 from exporters.timef_export import TimeFExportConfig, export_caption_result
+from mhc.dataset import MHCDataset
+from mhc.transformer import MHCTransformer
+from mhc.constants import MHC_CHANNEL_CONFIG
+from extractors.statistical import StatisticalExtractor
+from annotator import Annotator
+
+dataset = MHCDataset()
+annotator = Annotator([StatisticalExtractor(MHC_CHANNEL_CONFIG)])
+captionizer = Captionizer(dataset, MHCTransformer(), annotator)
 
 result, _ = captionizer.run(max_rows=5)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@README.md` around lines 21 - 40, The example uses an undefined lowercase
variable captionizer when calling captionizer.run; either instantiate a
Captionizer instance before use (e.g., create and configure a Captionizer object
and assign it to captionizer) or add a clear comment above the snippet stating
that a configured Captionizer instance must exist; update the README example to
reference the Captionizer symbol (Captionizer) properly so export_caption_result
and TimeFExportConfig are called with a real result from captionizer.run.

```
14 changes: 4 additions & 10 deletions annotator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from __future__ import annotations

from extractors import CaptionExtractor
from timef.schema import Annotation, AnnotationSpec, Sample, Signal
from runtime import Annotation, RuntimeRow


class Annotator:
Expand All @@ -19,14 +19,8 @@ def __init__(self, extractors: list[CaptionExtractor]):
raise ValueError(f"Duplicate extractor for caption_type={extractor.caption_type!r}.")
seen.add(extractor.caption_type)

def get_annotation_specs(self) -> list[AnnotationSpec]:
return [extractor.get_annotation_spec() for extractor in self.extractors]

def annotate(self, signals: list[Signal]) -> tuple[list[Sample], list[Annotation]]:
samples: list[Sample] = []
def annotate(self, row: RuntimeRow) -> list[Annotation]:
annotations: list[Annotation] = []
for extractor in self.extractors:
for s, a in extractor.extract(signals):
samples.append(s)
annotations.append(a)
return samples, annotations
annotations.extend(extractor.extract(row))
return annotations
35 changes: 10 additions & 25 deletions captionizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#
from __future__ import annotations

from timef.schema import DatasetManifest, CaptionResult
from runtime import CaptionResult
from transformer import Transformer
from annotator import Annotator
from reviewer import EvaluationResult, Reviewer
Expand All @@ -28,28 +28,17 @@ def __init__(
def run(
self, max_rows: int | None = None,
) -> tuple[CaptionResult, EvaluationResult | None]:
manifest = DatasetManifest()
for spec in self.transformer.get_signal_specs():
manifest.signal_specs[spec.id] = spec
for spec in self.annotator.get_annotation_specs():
manifest.annotation_specs[spec.id] = spec

result = CaptionResult(manifest=manifest)
result = CaptionResult()
failed_rows: list[int] = []

for i in range(len(self.dataset)):
if max_rows is not None and i >= max_rows:
break

try:
row = self.dataset[i]
signals = self.transformer.transform_row(row)
samples, annotations = self.annotator.annotate(signals)

for signal in signals:
result.signals[signal.id] = signal
result.samples.extend(samples)
result.annotations.extend(annotations)
row = self.transformer.transform_row(self.dataset[i])
row.annotations.extend(self.annotator.annotate(row))
result.rows.append(row)
except Exception:
failed_rows.append(i)

Expand Down Expand Up @@ -86,19 +75,15 @@ def run(
print(f"Dataset size: {len(dataset)}")

result, evaluation = captionizer.run(max_rows=5)
print(f"Signals: {len(result.signals)}")
print(f"Samples: {len(result.samples)}")
print(f"Annotations: {len(result.annotations)}")
print(f"Rows: {len(result.rows)}")
print(f"Annotations: {sum(len(row.annotations) for row in result.rows)}")
if evaluation:
print(f"Evaluation: {len(evaluation.scores)} scores, mean={evaluation.mean_score}")

shown = 0
for signals, samples, annotations in result.iter_rows():
active = sum(
s.metadata.get("has_any_data", True) for s in signals
)
if active >= 9:
plot_row(signals, samples, annotations)
for row in result.iter_rows():
if row.active_channel_count() >= 9:
plot_row(row)
shown += 1
if shown >= 4:
break
12 changes: 12 additions & 0 deletions exporters/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#
# SPDX-FileCopyrightText: 2026 Stanford University, ETH Zurich, and the project authors (see CONTRIBUTORS.md)
# SPDX-FileCopyrightText: 2026 This source file is part of the SensorTSLM open-source project.
#
# SPDX-License-Identifier: MIT
#
from exporters.timef_export import TimeFExportConfig, export_caption_result

# Public API of the exporters package: re-export the TimeF export entry points.
__all__ = [
    "TimeFExportConfig",
    "export_caption_result",
]
215 changes: 215 additions & 0 deletions exporters/timef_export.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
#
# SPDX-FileCopyrightText: 2026 Stanford University, ETH Zurich, and the project authors (see CONTRIBUTORS.md)
# SPDX-FileCopyrightText: 2026 This source file is part of the SensorTSLM open-source project.
#
# SPDX-License-Identifier: MIT
#
from __future__ import annotations

import json
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path

import pyarrow as pa
import pyarrow.parquet as pq

from runtime import CaptionResult
from timenet_timef import (
Annotation as PersistedAnnotation,
AnnotationSampleRef,
AnnotationSpec,
DatasetManifest,
Sample,
SampleSignalRef,
Signal,
SignalSpec,
TimeFWriter,
mark_validated,
validate_dataset,
)

# Stable numeric AnnotationSpec IDs, one per supported caption kind.
_SPEC_IDS = {
    "statistical": 0,
    "structural": 1,
    "semantic": 2,
}

# Question text persisted with each exported annotation, keyed by caption kind.
_QUESTIONS = {
    "statistical": "Describe summary statistics for the selected signal.",
    "structural": "Describe the structural event in the selected signal.",
    "semantic": "Describe the semantic event in the selected signal.",
}


@dataclass(frozen=True)
class TimeFExportConfig:
    """Immutable configuration for exporting a CaptionResult to a TimeF dataset."""

    # Root directory; the dataset is written to <output_root>/<dataset_id>/<version>.
    output_root: Path
    dataset_id: str
    # Export version string; defaults to a UTC timestamp when None.
    version: str | None = None
    source: str = "sensortslm"
    description: str = ""
    domains: tuple[str, ...] = ("health", "activity", "sleep")
    tags: tuple[str, ...] = ("sensortslm", "generated")
    # Seconds (in units of `unit_timestamp`) between consecutive samples; must be > 0.
    sampling_period: float = 60.0
    # Scale factor relating timestamps to `unit_timestamp`; must be > 0.
    timestamp_unit: float = 1.0
    unit_sampling_rate: str = "Hz"
    unit_timestamp: str = "s"
    # Name of the time-axis column in each signal parquet file; must be non-empty
    # and must not collide with any channel name.
    time_column_name: str = "time_s"
    # Optional per-channel value units, keyed by channel name.
    value_unit_map: dict[str, str] | None = None


def export_caption_result(result: CaptionResult, config: TimeFExportConfig) -> Path:
version = config.version or datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ")
root = Path(config.output_root) / config.dataset_id / version
if root.exists():
raise FileExistsError(f"Export target already exists: {root}")
if not result.rows:
raise ValueError("CaptionResult is empty")
if config.sampling_period <= 0:
raise ValueError("sampling_period must be > 0")
if config.timestamp_unit <= 0:
raise ValueError("timestamp_unit must be > 0")
if not config.time_column_name:
raise ValueError("time_column_name must be non-empty")

first_row = result.rows[0]
channel_names = tuple(first_row.channel_names)
signal_specs = _build_signal_specs(channel_names, config)
annotation_specs = [
AnnotationSpec(id=_SPEC_IDS["statistical"], task="statistical", domains=list(config.domains)),
AnnotationSpec(id=_SPEC_IDS["structural"], task="structural", domains=list(config.domains)),
AnnotationSpec(id=_SPEC_IDS["semantic"], task="semantic", domains=list(config.domains)),
]

writer = TimeFWriter(root)
samples: list[Sample] = []
signals: list[Signal] = []
annotations: list[PersistedAnnotation] = []
sampling_rate = config.timestamp_unit / config.sampling_period
annotation_id = 0

for sample_id, row in enumerate(result.rows):
_validate_row_shape(row, channel_names)
signal_file = f"sample-{sample_id}.parquet"
_write_signal_frame(root / "signals" / signal_file, config.sampling_period, config.time_column_name, row)
Comment on lines +85 to +95
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Stage the export before touching the final dataset path.

TimeFWriter(root) eagerly creates root/signals in timenet_timef/io.py Lines 76-78. Any later failure here—or at Lines 182-183—leaves a partial tree behind, and the next retry then hits Line 65's FileExistsError even though no valid dataset was produced.

Also applies to: 179-183

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@exporters/timef_export.py` around lines 85 - 95, TimeFWriter(root) eagerly
creates the final root/signals tree which can leave partial files on failure;
instead stage output to a temporary directory and only instantiate or move
TimeFWriter into the final path on success: write signals via
_write_signal_frame into a temp staging directory (or pass a staging path to
TimeFWriter), accumulate samples/signals/annotations there, and after all rows
and writes complete, atomically move or rename the staging directory to the
intended root (or then construct TimeFWriter(root) if it must create the tree).
Also ensure any temporary directory is removed on failure so subsequent retries
won't hit FileExistsError.

total_duration = float(row.values.shape[1] * config.sampling_period)

signal_refs: list[SampleSignalRef] = []
for channel_idx, channel_name in enumerate(channel_names):
signal_id = sample_id * len(channel_names) + channel_idx
signals.append(
Signal(
id=signal_id,
spec_id=channel_idx,
name=f"{row.row_id}:{channel_name}",
sampling_rate=sampling_rate,
total_duration=total_duration,
shard_file=signal_file,
row_group_id=0,
row_start=0,
row_count=row.values.shape[1],
)
)
signal_refs.append(
SampleSignalRef(
signal_id=signal_id,
sampling_rate=sampling_rate,
channels=[channel_name],
)
)

samples.append(
Sample(
id=sample_id,
dataset_id=config.dataset_id,
unit_timestamp=config.unit_timestamp,
windows=None,
signals=signal_refs,
)
)

for annotation in row.annotations:
spec_id = _SPEC_IDS.get(annotation.kind)
if spec_id is None:
raise ValueError(f"Unsupported annotation kind: {annotation.kind}")
reference = json.dumps(
{
"channel_names": [channel_names[idx] for idx in annotation.channel_idxs],
"window": list(annotation.window) if annotation.window is not None else None,
Comment on lines +136 to +139
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Validate annotation.channel_idxs before translating them to names.

runtime.types.Annotation does not bound-check these indexes. Negative indexes silently bind to the wrong channel, and an out-of-range index raises IndexError mid-export.

Suggested change
             if spec_id is None:
                 raise ValueError(f"Unsupported annotation kind: {annotation.kind}")
+            invalid_channel_idxs = [
+                idx for idx in annotation.channel_idxs if idx < 0 or idx >= len(channel_names)
+            ]
+            if invalid_channel_idxs:
+                raise ValueError(
+                    f"Annotation {annotation.kind!r} references invalid channel indexes: {invalid_channel_idxs}"
+                )
             reference = json.dumps(
                 {
                     "channel_names": [channel_names[idx] for idx in annotation.channel_idxs],
                     "window": list(annotation.window) if annotation.window is not None else None,
                     "kind": annotation.kind,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@exporters/timef_export.py` around lines 136 - 139, The code that builds
reference (the json.dumps block using channel_names and annotation.channel_idxs)
must validate annotation.channel_idxs before indexing: ensure each idx is an
integer and 0 <= idx < len(channel_names) (do not allow negative wrapping), and
handle violations with a clear error or by filtering them out consistently;
update the comprehension that produces "channel_names": [channel_names[idx] for
idx in annotation.channel_idxs] to first validate/map the indexes (or raise a
ValueError with the bad idx and annotation id/context) so an IndexError or
silent negative wrap cannot occur during export.

"kind": annotation.kind,
},
sort_keys=True,
)

label = None if annotation.label is None else str(annotation.label)
answer = annotation.text if label is None else None
rationale = annotation.text if label is not None and annotation.text else None

annotations.append(
PersistedAnnotation(
id=annotation_id,
dataset_id=config.dataset_id,
spec_id=spec_id,
samples=[AnnotationSampleRef(sample=sample_id, reference=reference)],
question=_QUESTIONS[annotation.kind],
answer=answer,
label=label,
rationale=rationale,
)
)
annotation_id += 1

manifest = DatasetManifest(
dataset_id=config.dataset_id,
version=version,
source=config.source,
description=config.description,
domains=list(config.domains),
tags=list(config.tags),
signals=signals,
signal_spec=signal_specs,
sensor_spec=[],
annotation_spec=annotation_specs,
sample_count=len(samples),
annotation_count=len(annotations),
validated=False,
)

writer.write_manifest(manifest)
writer.write_samples(samples)
writer.write_annotations(annotations, manifest)
validate_dataset(root)
mark_validated(root, validated=True)
return root


def _build_signal_specs(channel_names: tuple[str, ...], config: TimeFExportConfig) -> list[SignalSpec]:
    """Create one single-channel SignalSpec per channel, IDs assigned in channel order."""
    units = config.value_unit_map or {}
    specs: list[SignalSpec] = []
    for spec_id, name in enumerate(channel_names):
        specs.append(
            SignalSpec(
                id=spec_id,
                name=name,
                channels=[name],
                unit_sampling_rate=config.unit_sampling_rate,
                unit_timestamp=config.unit_timestamp,
                unit_value=units.get(name),
            )
        )
    return specs


def _validate_row_shape(row, channel_names: tuple[str, ...]) -> None:
if tuple(row.channel_names) != channel_names:
raise ValueError("All RuntimeRow objects must share the same channel_names order for export")
if row.values.shape[0] != len(channel_names):
raise ValueError("RuntimeRow values must have one row per channel")


def _write_signal_frame(path: Path, sampling_period: float, time_column_name: str, row) -> None:
time_axis = [idx * sampling_period for idx in range(row.values.shape[1])]
payload: dict[str, list[float]] = {time_column_name: time_axis}
for idx, channel_name in enumerate(row.channel_names):
payload[channel_name] = row.values[idx].astype(float).tolist()
path.parent.mkdir(parents=True, exist_ok=True)
Comment on lines +209 to +214
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Reject duplicate or colliding column names before building the parquet payload.

This payload is keyed by time_column_name and row.channel_names. Duplicate channel names—or a channel named exactly like the time column—overwrite earlier entries in the dict and silently drop signal data.

Suggested change
 def _write_signal_frame(path: Path, sampling_period: float, time_column_name: str, row) -> None:
+    channel_names = tuple(row.channel_names)
+    if len(set(channel_names)) != len(channel_names):
+        raise ValueError("RuntimeRow channel_names must be unique for export")
+    if time_column_name in channel_names:
+        raise ValueError("time_column_name must not collide with a channel name")
+
     time_axis = [idx * sampling_period for idx in range(row.values.shape[1])]
     payload: dict[str, list[float]] = {time_column_name: time_axis}
-    for idx, channel_name in enumerate(row.channel_names):
+    for idx, channel_name in enumerate(channel_names):
         payload[channel_name] = row.values[idx].astype(float).tolist()
     path.parent.mkdir(parents=True, exist_ok=True)
     pq.write_table(pa.table(payload), path)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@exporters/timef_export.py` around lines 209 - 214, In _write_signal_frame
ensure you detect and reject duplicate or colliding column names before building
payload: check row.channel_names for duplicates and also check if any channel
name equals the time_column_name, and if any conflict exists raise a clear
ValueError (including the offending names) instead of silently overwriting; do
this validation prior to creating payload and populating entries so callers get
an explicit error when channel names conflict with each other or with
time_column_name.

pq.write_table(pa.table(payload), path)
9 changes: 3 additions & 6 deletions extractors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from aggregators import MetricAggregator
from detectors import StructuralDetector
from timef.schema import Annotation, AnnotationSpec, Sample, SampleRef, SampleSignalRef, Signal
from runtime import Annotation, RuntimeRow

VALID_CAPTION_TYPES = ("statistical", "structural", "semantic")

Expand Down Expand Up @@ -51,16 +51,13 @@ def __init_subclass__(cls, **kwargs):
def __init__(self, config: ChannelConfig):
self.config = config

def get_annotation_spec(self) -> AnnotationSpec:
return AnnotationSpec(id=f"captioning:{self.caption_type}", task="captioning")

@staticmethod
def _seed(key: str) -> int:
return zlib.crc32(key.encode("utf-8")) & 0xFFFFFFFF

@abc.abstractmethod
def extract(self, signals: list[Signal]) -> list[tuple[Sample, Annotation]]:
"""Extract captions and return (Sample, Annotation) pairs."""
def extract(self, row: RuntimeRow) -> list[Annotation]:
"""Extract annotations for a row."""
...


Expand Down
4 changes: 2 additions & 2 deletions extractors/generative.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from extractors import CaptionExtractor, ChannelConfig
from models.base import BaseModel
from timef.schema import Annotation, Sample, Signal
from runtime import Annotation, RuntimeRow


class GenerativeExtractor(CaptionExtractor):
Expand All @@ -20,5 +20,5 @@ def __init__(self, config: ChannelConfig, model: BaseModel):
super().__init__(config)
self.model = model

def extract(self, signals: list[Signal]) -> list[tuple[Sample, Annotation]]:
def extract(self, row: RuntimeRow) -> list[Annotation]:
raise NotImplementedError
Loading
Loading