Skip to content
89 changes: 87 additions & 2 deletions Python/pyxdf/pyxdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import xml.etree.ElementTree as ET
from collections import OrderedDict, defaultdict
import logging

from scipy.interpolate import interp1d
import numpy as np

__all__ = ['load_xdf']
Expand All @@ -26,6 +26,7 @@ def load_xdf(filename,
synchronize_clocks=True,
handle_clock_resets=True,
dejitter_timestamps=True,
sync_timestamps=False,
jitter_break_threshold_seconds=1,
jitter_break_threshold_samples=500,
clock_reset_threshold_seconds=5,
Expand Down Expand Up @@ -55,6 +56,16 @@ def load_xdf(filename,
dejitter_timestamps : Whether to perform jitter removal for regularly
sampled streams. (default: true)

sync_timestamps: {bool str}
sync timestamps of all streams sample-wise with the stream to the
highest effective sampling rate

False -> no syncing
True -> linear syncing
str:<'linear’, ‘nearest’, ‘zero’, ‘slinear’, ‘quadratic’, ‘cubic’,
‘previous’, ‘next’> for method inherited from
scipy.interpolate.interp1d

on_chunk : Function that is called for each chunk of data as it is
being retrieved from the file; the function is allowed to modify
the data (for example, sub-sample it). The four input arguments
Expand Down Expand Up @@ -372,7 +383,14 @@ def __init__(self, xml):
stream['info']['effective_srate'] = tmp.effective_srate
stream['time_series'] = tmp.time_series
stream['time_stamps'] = tmp.time_stamps


# sync sampling with the fastest timeseries by interpolation / shifting
if sync_timestamps:
if type(sync_timestamps) is not str:
sync_timestamps = 'linear'
logger.warning('sync_timestamps defaults to "linear"')
streams = _sync_timestamps(streams, kind=sync_timestamps)

streams = [s for s in streams.values()]
sort_data = [s['info']['name'][0] for s in streams]
streams = [x for _, x in sorted(zip(sort_data, streams))]
Expand Down Expand Up @@ -553,6 +571,73 @@ def _jitter_removal(streams,
return streams


def _sync_timestamps(streams, kind='linear'):
srate_key = 'effective_srate'

snames = [stream['info']['source_id'] for stream in streams.values()]
stamps = [stream['time_stamps'] for stream in streams.values()]
srates = [stream['info'][srate_key] for stream in streams.values()]
max_fs = max(srates, default=0)

if max_fs == 0: #either no valid stream or all streams are async
return streams
if srates.count(max_fs) > 1:
# highly unlikely, with floating point precision and sampling noise
# but anyways: better safe than sorry
logger.warning('I found at least two streams with identical effective '
'srate. I select one at random for syncing timestamps.')

fastest_stream = snames[srates.index(max_fs)]
new_timestamps = stamps[srates.index(max_fs)]

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Streams are not guaranteed to start and stop at the same time. i.e. the first sample for the fastest stream might not come until several minutes into a recording. What do do then?
I think we can extrapolate timestamps (but not data!) assuming a constant interval.
So maybe for each stream we need to create a new timestamp vector that uses new_timestamps for the overlapping periods, and then extrapolates timestamps for the non-overlapping periods assuming even intervals.

For data processing streams that require all sources to have the same number of samples, we could then have a separate option to trim all streams to smallest overlapping regions.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good points. I implemented that new_timestamps is now based on an extrapolation to the earliest and latest timestamp of any stream using the timestamps of the fastest stream as seed. An alternative would have been to generate a completely new timestamp vector based only on start and endpoint and an arbitrary sampling rate. But i did want to reduce the amount of interpolation necessary, and not implement downsampling. Additionally, i added a function to limit all streams to overlapping time-periods. The latter is supposed to work independently of whether timestamps have been synced before.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I expanded interpolation to also deal with integer values. If a channels format is any integer type, interpolation now enforces integers as output with np.around. I also improved the coumentation, added unit tests to be run with pytest, and fixed a couple of bugs detected with the tests. Feel more or less finished, and am open for feedback.


for stream in streams.values():
channel_format = stream['info']['channel_format'][0]

if stream['info']['source_id'] == fastest_stream:
# no need to interpolate the fastest stream
continue
if ( (channel_format == 'string') and
(stream['info'][srate_key]== 0) ):
# you can't really interpolate strings; and streams with srate=0
# don't have a real sampling rate. One approach to sync them is to
# shift their events to the nearest timestamp of the fastest
# timeseries
shifted_x = []
for x in stream['time_stamps']:
argmin = (abs(new_timestamps-x)).argmin()
shifted_x.append(new_timestamps[argmin])

shifted_y = []
for x in new_timestamps:
try:
idx = shifted_x.index(x)
y = stream['time_series'][idx]
shifted_y.append(y)
except ValueError:
shifted_y.append([''])

stream['time_series'] = np.asanyarray((shifted_y))
stream['time_stamps'] = new_timestamps

elif channel_format == 'float32':
#a float32 can be interpolated with interp1d
# bounds_error=False replaces everything outside of interpolation
# zone with NaNs
y = stream['time_series']
x = stream['time_stamps']
f = interp1d(x, y, kind=kind, axis=0,
assume_sorted=True, #speed up
bounds_error=False)

stream['time_series'] = f(new_timestamps)
stream['time_stamps'] = new_timestamps
stream['info']['effective_srate'] = max_fs
else:
raise NotImplementedError("Don't know how to sync sampling for " +
'channel_format={}'.format(
channel_format))
return streams

# noinspection PyTypeChecker
def _robust_fit(A, y, rho=1, iters=1000):
"""Perform a robust linear regression using the Huber loss function.
Expand Down