Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions crates/farmer/ab-farmer-rpc-primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ impl Encode for FarmerAppInfo {
.saturating_add(Encode::size_hint(&self.protocol_info))
}

fn encode_to<O: Output + ?Sized>(&self, dest: &mut O) {
fn encode_to<O>(&self, dest: &mut O)
where
O: Output + ?Sized,
{
Encode::encode_to(&self.genesis_root, dest);
Encode::encode_to(
&self
Expand All @@ -71,7 +74,10 @@ impl Encode for FarmerAppInfo {
impl EncodeLike for FarmerAppInfo {}

impl Decode for FarmerAppInfo {
fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
where
I: Input,
{
Ok(FarmerAppInfo {
genesis_root: BlockRoot::decode(input)
.map_err(|error| error.chain("Could not decode `FarmerAppInfo::genesis_root`"))?,
Expand Down
10 changes: 8 additions & 2 deletions crates/farmer/ab-farmer/src/cluster/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ impl Encode for ClusterCacheId {
}

#[inline]
fn encode_to<O: Output + ?Sized>(&self, dest: &mut O) {
fn encode_to<O>(&self, dest: &mut O)
where
O: Output + ?Sized,
{
match self {
ClusterCacheId::Ulid(ulid) => {
dest.push_byte(0);
Expand All @@ -60,7 +63,10 @@ impl EncodeLike for ClusterCacheId {}

impl Decode for ClusterCacheId {
#[inline]
fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
fn decode<I>(input: &mut I) -> Result<Self, parity_scale_codec::Error>
where
I: Input,
{
match input.read_byte().map_err(|e| {
e.chain("Could not decode `ClusterCacheId`, failed to read variant byte")
})? {
Expand Down
176 changes: 3 additions & 173 deletions crates/farmer/ab-farmer/src/cluster/controller/stream_map.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
//! A stream map that keeps track of futures that are currently being processed for each `Index`.

#[cfg(test)]
mod tests;

use futures::stream::FusedStream;
use futures::{FutureExt, Stream, StreamExt};
use std::collections::hash_map::Entry;
Expand Down Expand Up @@ -110,176 +113,3 @@ where
self.in_progress.is_empty() && self.queue.is_empty()
}
}

#[cfg(test)]
mod tests {
use crate::cluster::controller::stream_map::StreamMap;
// TODO: Not supported on Miri on macOS yet: https://github.com/rust-lang/miri/issues/4007
#[cfg(not(all(miri, target_os = "macos")))]
// TODO: Not supported on Miri on Windows yet: https://github.com/rust-lang/miri/issues/1719
#[cfg(not(all(miri, target_os = "windows")))]
use futures::StreamExt;
use futures::stream::FusedStream;
use std::task::Context;

fn assert_is_terminated<'a, R: 'a>(stream_map: &StreamMap<'a, u16, R>) {
assert!(stream_map.in_progress.is_empty());
assert!(stream_map.queue.is_empty());
assert!(stream_map.is_terminated());
}

#[test]
fn test_stream_map_default() {
let stream_map = StreamMap::<u16, ()>::default();
assert_is_terminated(&stream_map);
}

#[test]
fn test_stream_map_push() {
let mut stream_map = StreamMap::default();

let index = 1;
let fut = Box::pin(async {});
stream_map.push(index, fut);
assert!(stream_map.queue.is_empty());
assert!(stream_map.in_progress.contains_key(&index));
assert!(!stream_map.is_terminated());
}

#[test]
fn test_stream_map_add_if_not_in_progress() {
let mut stream_map = StreamMap::default();

let index = 1;
let fut1 = Box::pin(async {});
let fut2 = Box::pin(async {});
assert!(stream_map.add_if_not_in_progress(index, fut1));
assert!(!stream_map.add_if_not_in_progress(index, fut2));
}

#[test]
fn test_stream_map_poll_next_entry() {
let mut stream_map = StreamMap::default();

let fut = Box::pin(async {});
stream_map.push(0, fut);

let mut cx = Context::from_waker(futures::task::noop_waker_ref());
let poll_result = stream_map.poll_next_entry(&mut cx);
assert!(poll_result.is_ready());
assert_is_terminated(&stream_map);
}

// TODO: Not supported on Miri on macOS yet: https://github.com/rust-lang/miri/issues/4007
#[cfg(not(all(miri, target_os = "macos")))]
// TODO: Not supported on Miri on Windows yet: https://github.com/rust-lang/miri/issues/1719
#[cfg(not(all(miri, target_os = "windows")))]
#[tokio::test]
async fn test_stream_map_stream() {
let mut stream_map = StreamMap::default();

let fut00 = Box::pin(async { 0x00 });
stream_map.push(0, fut00);

let next_item = stream_map.next().await;
assert_eq!(next_item, Some((0, 0x00)));
assert_is_terminated(&stream_map);

let fut11 = Box::pin(async { 0x11u8 });
let fut12 = Box::pin(async { 0x12u8 });
let fut13 = Box::pin(async { 0x13u8 });
let fut21 = Box::pin(async {
// Yield the current task three times to ensure that fut22 is polled last.
for _ in 0..3u8 {
tokio::task::yield_now().await;
}
0x21u8
});
let fut22 = Box::pin(async { 0x22u8 });

// Push 2 futs into the same farm index 1, expect fut11 to be polled first,
// fut12 should push into the in_progress queue and wait for fut11 to finish
stream_map.push(1, fut11);
stream_map.push(1, fut12);
assert!(!stream_map.is_terminated());
assert_eq!(stream_map.in_progress.len(), 1);
assert!(stream_map.in_progress.contains_key(&1));
assert_eq!(stream_map.queue.len(), 1);

// Push fut22 into farm index 2, we have 2 in progress futures now
stream_map.push(2, fut21);
assert_eq!(stream_map.in_progress.len(), 2);
assert!(stream_map.in_progress.contains_key(&2));
assert_eq!(stream_map.queue.len(), 1);

// Push fut22 into farm index 2, in-progress queue length should not change,
// but the queue should have 2 entries now
stream_map.push(2, fut22);
assert_eq!(stream_map.in_progress.len(), 2);
assert_eq!(stream_map.queue.len(), 2);
assert_eq!(stream_map.queue[&2].len(), 1);

// Push fut13 into farm index 1, fut13 should be polled after fut11 and fut12
stream_map.push(1, fut13);
assert!(!stream_map.is_terminated());
assert!(stream_map.in_progress.contains_key(&1));
assert_eq!(stream_map.in_progress.len(), 2);
assert_eq!(stream_map.queue[&1].len(), 2);

// Poll the next item in the stream, fut11 should be polled first,
// fut12 should be pushed into the in-progress queue
let next_item = stream_map.next().await;
assert!(!stream_map.is_terminated());
assert_eq!(next_item.unwrap(), (1, 0x11));
assert!(stream_map.in_progress.contains_key(&1));
assert!(stream_map.in_progress.contains_key(&2));
assert_eq!(stream_map.in_progress.len(), 2);
assert_eq!(stream_map.queue[&1].len(), 1);

// Here, fut12 and fut 13 should be polled before fut21 because fut21 has a yield point.
// Fut13 should be pushed into the in_progress queue.
// There are no more futures waiting to be polled in farm index 1, so the farm index 1
// should be removed from the queue map.
let next_item = stream_map.next().await;
assert!(!stream_map.is_terminated());
assert_eq!(next_item.unwrap(), (1, 0x12));
assert_eq!(stream_map.in_progress.len(), 2);
assert!(stream_map.in_progress.contains_key(&1));
assert!(stream_map.in_progress.contains_key(&2));
assert!(!stream_map.queue.contains_key(&1));

// Poll the next item in the stream, fut13 should be polled next.
// For now, all futures in farm index 1 have been polled, so farm index 1 should be removed
// from the in-progress queue.
let next_item = stream_map.next().await;
assert!(!stream_map.is_terminated());
assert_eq!(next_item.unwrap(), (1, 0x13));
assert_eq!(stream_map.in_progress.len(), 1);
assert!(!stream_map.in_progress.contains_key(&1));
assert!(stream_map.in_progress.contains_key(&2));
assert!(!stream_map.queue.contains_key(&1));
assert_eq!(stream_map.queue[&2].len(), 1);

// We hope futures with the same index are polled in the order they are pushed,
// so fut21 should be polled next.
// fut22 should be pushed into the in-progress queue.
// There are no more futures waiting to be polled in farm index 2, so the farm index 2
// should be removed from the queue map.
let next_item = stream_map.next().await;
assert!(!stream_map.is_terminated());
assert_eq!(next_item.unwrap(), (2, 0x21));
assert_eq!(stream_map.in_progress.len(), 1);
assert!(!stream_map.in_progress.contains_key(&1));
assert!(stream_map.in_progress.contains_key(&2));
assert!(!stream_map.queue.contains_key(&1));
assert!(!stream_map.queue.contains_key(&2));

// Poll the next item in the stream, fut22 should be polled next.
// For now, all futures in farm index 2 have been polled, so farm index 2 should be removed
// from the in-progress queue.
// Finally, the stream should be terminated.
let next_item = stream_map.next().await;
assert_eq!(next_item, Some((2, 0x22)));
assert_is_terminated(&stream_map);
}
}
Loading
Loading