Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion src/sink.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

Expand All @@ -12,6 +12,28 @@ use crate::stream::{OutputStreamHandle, PlayError};
use crate::{queue, source::Done, Sample, Source};
use cpal::FromSample;

#[derive(Debug)]
pub struct AtomicF64 {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice solution to not having an atomicf64!

storage: AtomicU64,
}

impl AtomicF64 {
    /// Creates a new atomic float initialised to `value`.
    ///
    /// The float is kept as its raw bit pattern inside an `AtomicU64`.
    pub fn new(value: f64) -> Self {
        let storage = AtomicU64::new(value.to_bits());
        Self { storage }
    }

    /// Replaces the stored float with `value`, using the given memory `ordering`.
    pub fn store(&self, value: f64, ordering: Ordering) {
        self.storage.store(value.to_bits(), ordering);
    }

    /// Reads the stored float, using the given memory `ordering`.
    pub fn load(&self, ordering: Ordering) -> f64 {
        let bits = self.storage.load(ordering);
        f64::from_bits(bits)
    }
}

/// Handle to a device that outputs sounds.
///
/// Dropping the `Sink` stops all sounds. You can use `detach` if you want the sounds to continue
Expand All @@ -22,6 +44,7 @@ pub struct Sink {

controls: Arc<Controls>,
sound_count: Arc<AtomicUsize>,
position: Arc<AtomicF64>,

detached: bool,
}
Expand Down Expand Up @@ -92,6 +115,7 @@ impl Sink {
seek: Mutex::new(None),
}),
sound_count: Arc::new(AtomicUsize::new(0)),
position: Arc::new(AtomicF64::new(0.0)),
detached: false,
};
(sink, queue_rx)
Expand Down Expand Up @@ -119,6 +143,7 @@ impl Sink {

let source = source
.speed(1.0)
.trackable(self.position.clone())
.pausable(false)
.amplify(1.0)
.skippable()
Expand All @@ -140,6 +165,7 @@ impl Sink {
amp.inner_mut()
.set_paused(controls.pause.load(Ordering::SeqCst));
amp.inner_mut()
.inner_mut()
.inner_mut()
.set_factor(*controls.speed.lock().unwrap());
if let Some(seek) = controls.seek.lock().unwrap().take() {
Expand Down Expand Up @@ -309,6 +335,12 @@ impl Sink {
pub fn len(&self) -> usize {
self.sound_count.load(Ordering::Relaxed)
}

/// Returns the position of the sound that's being played.
#[inline]
pub fn get_pos(&self) -> f64 {
self.position.load(Ordering::Relaxed)
}
}

impl Drop for Sink {
Expand Down
11 changes: 11 additions & 0 deletions src/source/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
//! Sources of sound and various filters.

use std::sync::Arc;
use std::time::Duration;

use cpal::FromSample;

use crate::sink::AtomicF64;
use crate::Sample;

pub use self::amplify::Amplify;
Expand All @@ -21,6 +23,7 @@ pub use self::from_iter::{from_iter, FromIter};
pub use self::mix::Mix;
pub use self::pausable::Pausable;
pub use self::periodic::PeriodicAccess;
pub use self::position::TrackPosition;
pub use self::repeat::Repeat;
pub use self::samples_converter::SamplesConverter;
pub use self::sine::SineWave;
Expand Down Expand Up @@ -48,6 +51,7 @@ mod from_iter;
mod mix;
mod pausable;
mod periodic;
mod position;
mod repeat;
mod samples_converter;
mod sine;
Expand Down Expand Up @@ -333,6 +337,13 @@ where
skippable::skippable(self)
}

/// Wraps this source so that its playback position is tracked.
///
/// The returned [`TrackPosition`] writes the elapsed position, in seconds,
/// into `position` every time a sample is pulled from it and after every
/// successful seek.
#[inline]
fn trackable(self, position: Arc<AtomicF64>) -> TrackPosition<Self>
where
    Self: Sized,
{
    position::trackable(self, position)
}

/// Applies a low-pass filter to the source.
/// **Warning**: Probably buggy.
#[inline]
Expand Down
180 changes: 180 additions & 0 deletions src/source/position.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
use std::{
sync::{atomic::Ordering, Arc},
time::Duration,
};

use crate::{sink::AtomicF64, Sample, Source};

use super::SeekError;

/// Wraps `source` in a [`TrackPosition`] that reports into `position`.
///
/// Internal constructor behind `Source::trackable`. Every counter starts at
/// zero and no frame length is cached yet; the frame parameters are captured
/// on the first call to `next`.
pub fn trackable<I>(source: I, position: Arc<AtomicF64>) -> TrackPosition<I> {
    TrackPosition {
        position,
        input: source,
        offset_duration: 0.0,
        samples_counted: 0,
        current_frame_len: None,
        current_frame_channels: 0,
        current_frame_sample_rate: 0,
    }
}

/// Source adapter that keeps a shared position value (in seconds) up to date
/// while samples are pulled through it.
#[derive(Debug)]
pub struct TrackPosition<I> {
/// The wrapped source that samples are pulled from.
input: I,
/// Samples yielded since the current frame started or since the last
/// successful seek, whichever happened last.
samples_counted: usize,
/// Seconds accumulated from completed frames, or the target of the last
/// successful seek plus the frames completed since then.
offset_duration: f64,
/// Shared cell the current position is written to.
position: Arc<AtomicF64>,
/// Sample rate captured by `set_current_frame` for the frame being counted.
current_frame_sample_rate: u32,
/// Channel count captured by `set_current_frame` for the frame being counted.
current_frame_channels: u16,
/// Frame length captured by `set_current_frame`; `None` before the first
/// capture or when the input reports no frame length.
current_frame_len: Option<usize>,
}

impl<I> TrackPosition<I> {
    /// Borrows the wrapped source.
    #[inline]
    pub fn inner(&self) -> &I {
        let Self { input, .. } = self;
        input
    }

    /// Mutably borrows the wrapped source.
    #[inline]
    pub fn inner_mut(&mut self) -> &mut I {
        let Self { input, .. } = self;
        input
    }

    /// Consumes the tracker and hands back the wrapped source.
    #[inline]
    pub fn into_inner(self) -> I {
        let Self { input, .. } = self;
        input
    }
}

impl<I> TrackPosition<I>
where
    I: Source,
    I::Item: Sample,
{
    /// Computes the current position in seconds.
    ///
    /// The samples counted so far are converted to time with the sample rate
    /// and channel count the input reports right now, then added to
    /// `offset_duration`.
    #[inline]
    fn get_pos(&self) -> f64 {
        let counted = self.samples_counted as f64;
        let rate = self.input.sample_rate() as f64;
        let channels = self.input.channels() as f64;
        counted / rate / channels + self.offset_duration
    }

    /// Caches the input's current frame length, sample rate and channel count
    /// so they remain available once the input has moved on to another frame.
    #[inline]
    fn set_current_frame(&mut self) {
        // The `Source` impl of this wrapper forwards these three getters to the
        // input, so reading the input directly yields the same values.
        self.current_frame_len = self.input.current_frame_len();
        self.current_frame_sample_rate = self.input.sample_rate();
        self.current_frame_channels = self.input.channels();
    }
}

impl<I> Iterator for TrackPosition<I>
where
    I: Source,
    I::Item: Sample,
{
    type Item = I::Item;

    /// Pulls the next sample from the input and publishes the updated position.
    #[inline]
    fn next(&mut self) -> Option<I::Item> {
        // Capture the frame parameters before the first sample is pulled.
        // NOTE(review): when the input reports no frame length the cached value
        // stays `None`, so this capture is repeated on every call.
        if self.current_frame_len.is_none() {
            self.set_current_frame();
        }

        let sample = self.input.next();
        if sample.is_some() {
            self.samples_counted += 1;

            // Once the count matches the frame length reported by the source,
            // fold the finished frame into `offset_duration` using the values
            // cached for that frame, then start counting the next frame.
            let frame_finished = self.current_frame_len() == Some(self.samples_counted);
            if frame_finished {
                let frame_seconds = self.samples_counted as f64
                    / self.current_frame_sample_rate as f64
                    / self.current_frame_channels as f64;
                self.offset_duration += frame_seconds;

                self.samples_counted = 0;
                self.set_current_frame();
            }
        }

        // Published on every call, including the one that returns `None`.
        let seconds = self.get_pos();
        self.position.store(seconds, Ordering::Relaxed);
        sample
    }

    /// Forwards the size hint of the input unchanged.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        Iterator::size_hint(&self.input)
    }
}

impl<I> Source for TrackPosition<I>
where
I: Source,
I::Item: Sample,
{
#[inline]
fn current_frame_len(&self) -> Option<usize> {
self.input.current_frame_len()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after current_frame_len the sample rate may change.
One way to deal with that is to:

  • collect samples until the frame rate changes
  • convert the collected samples using the old frame-rate and add them to a duration
  • change get_pos() to return the newly collected samples * new frame rate + duration

Feel free to experiment with other approaches to find what fits best. Performance matters but as always premature optimization is the root of all evil. An if statement that fires every frame_len in next() might very well be pulled up by the optimizer and merged with the iterator in some more optimal fashion. Better to measure the impact before spending time on optimizing.

@Gusted Gusted Jun 10, 2024

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you happen to know which decoder currently handle this correctly? I've tried with FLAC using https://github.com/ietf-wg-cellar/flac-test-files/blob/main/uncommon/01%20-%20changing%20samplerate.flac with the symphonia decoder and after switching to the next sample rate it stops working and with claxon it does work, but doesn't seem to record the new sample rate (just reports the 32 kHz all the way through) and it ends up 'pitching' the audio. (FWIW, VLC also borks on this; only mpv seems to somewhat handle it, but it has an incorrect position after switching the sample rate and an incorrect total duration.) Shouldn't be hard to handle these cases correctly, but at least the FLAC decoder is stopping me from testing this and I don't know of any other format that supports variable sample rate 😅

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've gone ahead and opened PRs against claxon and symphonia to correctly handle such cases. ruuda/claxon#35 & pdeljanov/Symphonia#287. For the claxon support a single line would need to be added to rodio once the PR is merged:

diff --git a/src/decoder/flac.rs b/src/decoder/flac.rs
index 9dbc0c4..b3b69dd 100644
--- a/src/decoder/flac.rs
+++ b/src/decoder/flac.rs
@@ -118,6 +118,7 @@ where
             let buffer = mem::take(&mut self.current_block);
             match self.reader.blocks().read_next_or_eof(buffer) {
                 Ok(Some(block)) => {
+                    self.sample_rate = block.sample_rate();
                     self.current_block_channel_len = (block.len() / block.channels()) as usize;
                     self.current_block = block.into_buffer();
                 }

@yara-blue yara-blue Jun 10, 2024

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you happen to know which decoder currently handle this correctly?

On the rodio side at least the symphonia and vorbis code takes care to handle this. I had no idea it was broken in symphonia & claxon (rodio's test suite is kinda sparse atm). Like us they are desperate for maintainers/contributors :).

I've gone ahead and opened PRs against claxon and symphonia

nice! thanks a lot for that 👍

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once the decoders are fixed 0e6d5a1 should make the position tracking work correctly with them. Doing a few local tests doesn't hint at performance problems (but only a profiler would be able to confirm that) and tracking the position of https://github.com/ietf-wg-cellar/flac-test-files/blob/main/uncommon/01%20-%20changing%20samplerate.flac works flawlessly.

}

#[inline]
fn channels(&self) -> u16 {
self.input.channels()
}

#[inline]
fn sample_rate(&self) -> u32 {
self.input.sample_rate()
}

#[inline]
fn total_duration(&self) -> Option<Duration> {
self.input.total_duration()
}

/// Seeks the wrapped source to `pos` and, on success, restarts position
/// tracking from there.
///
/// If the wrapped source fails to seek, its error is returned unchanged and
/// the tracking state is left untouched.
#[inline]
fn try_seek(&mut self, pos: Duration) -> Result<(), SeekError> {
    self.input.try_seek(pos)?;

    self.offset_duration = pos.as_secs_f64();
    // This assumes that the seek implementation of the codec always
    // starts again at the beginning of a frame. Which is the case with
    // symphonia.
    self.samples_counted = 0;
    // The frame we landed in may have a different sample rate, channel count
    // or length than the one cached before the seek; refresh the cache so the
    // next frame-end accumulation does not use stale values.
    self.set_current_frame();
    self.position.store(self.get_pos(), Ordering::Relaxed);
    Ok(())
}
}

#[cfg(test)]
mod tests {
    use std::sync::atomic::Ordering;
    use std::sync::Arc;
    use std::time::Duration;

    use crate::buffer::SamplesBuffer;
    use crate::sink::AtomicF64;
    use crate::source::Source;

    /// Checks the shared position after pulling samples and after a seek.
    #[test]
    fn test_position() {
        // The buffer is built with one channel at a sample rate of 1, so every
        // sample pulled advances the position by one second.
        let position = Arc::new(AtomicF64::new(0.0));
        let current = || position.load(Ordering::Relaxed);

        let buffer = SamplesBuffer::new(1, 1, vec![10i16, -10, 10, -10, 20, -20]);
        let mut source = buffer.trackable(Arc::clone(&position));
        assert_eq!(current(), 0.0);

        source.next();
        assert_eq!(current(), 1.0);

        source.next();
        assert_eq!(current(), 2.0);

        // Seeking back republishes the position as the seek target.
        assert!(source.try_seek(Duration::from_secs(1)).is_ok());
        assert_eq!(current(), 1.0);
    }
}
6 changes: 6 additions & 0 deletions src/spatial_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,10 @@ impl SpatialSink {
pub fn try_seek(&self, pos: Duration) -> Result<(), SeekError> {
self.sink.try_seek(pos)
}

/// Returns the playback position of the sound that's being played, as
/// reported by the underlying sink.
#[inline]
pub fn get_pos(&self) -> f64 {
    let sink = &self.sink;
    sink.get_pos()
}
}