-
Notifications
You must be signed in to change notification settings - Fork 6
Timed infinite loop fix #80
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
60c123d
e6fb7df
66fcd74
b4b0bc2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -3,16 +3,16 @@ | |||||||||
| //! To apply this to a given (receiving) actor: | ||||||||||
| //! * Use [`TimedContext<Self::Message>`] as [`Actor::Context`] associated type. | ||||||||||
| //! * Such actors cannot be spawned unless wrapped, making it impossible to forget wrapping it. | ||||||||||
| //! * Wrapped actor's `Error` must implement [`From<SendError>`]. | ||||||||||
| //! * Wrap the actor in [`Timed`] before spawning. | ||||||||||
| //! | ||||||||||
| //! The wrapped actor will accept [`TimedMessage<M>`] with convenience conversion from `M`. | ||||||||||
| //! [`RecipientExt`] becomes available for [`Recipient<TimedMessage<M>>`]s which provides methods like | ||||||||||
| //! `send_delayed()`, `send_recurring()`. | ||||||||||
| //! | ||||||||||
| //! Once accepted by the actor, delayed and recurring messages do not occupy place in actor's | ||||||||||
| //! channel inbox, they are placed to internal queue instead. Due to the design, delayed and | ||||||||||
| //! recurring messages have always lower priority than instant messages when the actor is | ||||||||||
| //! saturated. | ||||||||||
| //! channel inbox, they are placed to internal queue instead. When delayed/recurring message become | ||||||||||
| //! due, they go through the actor's regular inboxes (subject to prioritization). | ||||||||||
| //! | ||||||||||
| //! See `delay_actor.rs` example for usage. | ||||||||||
|
|
||||||||||
|
|
@@ -24,11 +24,11 @@ use std::{ | |||||||||
| time::{Duration, Instant}, | ||||||||||
| }; | ||||||||||
|
|
||||||||||
| /// A message that can be delivered now, at certain time and optionally repeatedly. | ||||||||||
| /// A message that can be enqueued now, at certain time and optionally repeatedly. | ||||||||||
| pub enum TimedMessage<M> { | ||||||||||
| Instant { message: M }, | ||||||||||
| Delayed { message: M, fire_at: Instant }, | ||||||||||
| Recurring { factory: Box<dyn FnMut() -> M + Send>, fire_at: Instant, interval: Duration }, | ||||||||||
| Delayed { message: M, enqueue_at: Instant }, | ||||||||||
| Recurring { factory: Box<dyn FnMut() -> M + Send>, enqueue_at: Instant, interval: Duration }, | ||||||||||
| } | ||||||||||
|
|
||||||||||
| /// This implementation allows sending direct unwrapped messages to wrapped actors. | ||||||||||
|
|
@@ -43,19 +43,19 @@ pub trait RecipientExt<M> { | |||||||||
| /// Send a `message` now. Convenience to wrap message in [`TimedMessage::Instant`]. | ||||||||||
| fn send_now(&self, message: M) -> Result<(), SendError>; | ||||||||||
|
|
||||||||||
| /// Send a `message` to be delivered later at a certain instant. | ||||||||||
| fn send_timed(&self, message: M, fire_at: Instant) -> Result<(), SendError>; | ||||||||||
| /// Send a `message` to be enqueued later at a certain instant. | ||||||||||
| fn send_timed(&self, message: M, enqueue_at: Instant) -> Result<(), SendError>; | ||||||||||
|
|
||||||||||
| /// Send a `message` to be delivered later after some time from now. | ||||||||||
| /// Send a `message` to be enqueued later after some time from now. | ||||||||||
| fn send_delayed(&self, message: M, delay: Duration) -> Result<(), SendError> { | ||||||||||
| self.send_timed(message, Instant::now() + delay) | ||||||||||
| } | ||||||||||
|
|
||||||||||
| /// Schedule sending of message at `fire_at` plus at regular `interval`s from that point on. | ||||||||||
| /// Schedule sending of message at `enqueue_at` plus at regular `interval`s from that point on. | ||||||||||
| fn send_recurring( | ||||||||||
| &self, | ||||||||||
| factory: impl FnMut() -> M + Send + 'static, | ||||||||||
| fire_at: Instant, | ||||||||||
| enqueue_at: Instant, | ||||||||||
| interval: Duration, | ||||||||||
| ) -> Result<(), SendError>; | ||||||||||
| } | ||||||||||
|
|
@@ -65,17 +65,17 @@ impl<M> RecipientExt<M> for Recipient<TimedMessage<M>> { | |||||||||
| self.send(TimedMessage::Instant { message }) | ||||||||||
| } | ||||||||||
|
|
||||||||||
| fn send_timed(&self, message: M, fire_at: Instant) -> Result<(), SendError> { | ||||||||||
| self.send(TimedMessage::Delayed { message, fire_at }) | ||||||||||
| fn send_timed(&self, message: M, enqueue_at: Instant) -> Result<(), SendError> { | ||||||||||
| self.send(TimedMessage::Delayed { message, enqueue_at }) | ||||||||||
| } | ||||||||||
|
|
||||||||||
| fn send_recurring( | ||||||||||
| &self, | ||||||||||
| factory: impl FnMut() -> M + Send + 'static, | ||||||||||
| fire_at: Instant, | ||||||||||
| enqueue_at: Instant, | ||||||||||
| interval: Duration, | ||||||||||
| ) -> Result<(), SendError> { | ||||||||||
| self.send(TimedMessage::Recurring { factory: Box::new(factory), fire_at, interval }) | ||||||||||
| self.send(TimedMessage::Recurring { factory: Box::new(factory), enqueue_at, interval }) | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
|
|
@@ -111,50 +111,52 @@ pub struct Timed<A: Actor> { | |||||||||
| queue: BinaryHeap<QueueItem<A::Message>>, | ||||||||||
| } | ||||||||||
|
|
||||||||||
| impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Timed<A> { | ||||||||||
| impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Timed<A> | ||||||||||
| where | ||||||||||
| <A as Actor>::Error: From<SendError>, | ||||||||||
| { | ||||||||||
| pub fn new(inner: A) -> Self { | ||||||||||
| Self { inner, queue: Default::default() } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| /// Process any pending messages in the internal queue, calling wrapped actor's `handle()`. | ||||||||||
| fn process_queue(&mut self, context: &mut <Self as Actor>::Context) -> Result<(), A::Error> { | ||||||||||
| // Handle all messages that should have been handled by now. | ||||||||||
| let now = Instant::now(); | ||||||||||
| while self.queue.peek().map(|m| m.fire_at <= now).unwrap_or(false) { | ||||||||||
| fn process_queue(&mut self, context: &mut <Self as Actor>::Context) -> Result<(), SendError> { | ||||||||||
| // If the message on top of the queue is due, send it to the regular actor queue. | ||||||||||
| // No problem if there are multiple such messages, it's handle() will call process_queue() | ||||||||||
| // again. | ||||||||||
| if self.queue.peek().map(|m| m.enqueue_at <= Instant::now()).unwrap_or(false) { | ||||||||||
| let item = self.queue.pop().expect("heap is non-empty, we have just peeked"); | ||||||||||
|
|
||||||||||
| let message = match item.payload { | ||||||||||
| Payload::Delayed { message } => message, | ||||||||||
| Payload::Recurring { mut factory, interval } => { | ||||||||||
| let message = factory(); | ||||||||||
| self.queue.push(QueueItem { | ||||||||||
| fire_at: item.fire_at + interval, | ||||||||||
| enqueue_at: item.enqueue_at + interval, | ||||||||||
| payload: Payload::Recurring { factory, interval }, | ||||||||||
| }); | ||||||||||
| message | ||||||||||
| }, | ||||||||||
| }; | ||||||||||
|
|
||||||||||
| // Let inner actor do its job. | ||||||||||
| // | ||||||||||
| // Alternatively, we could send an `Instant` message to ourselves. | ||||||||||
| // - The advantage would be that it would go into the queue with proper priority. But it | ||||||||||
| // is unclear what should be handled first: normal-priority message that should have | ||||||||||
| // been processed a while ago, or a high-priority message that was delivered now. | ||||||||||
| // - Disadvantage is we could easily overflow the queue if many messages fire at once. | ||||||||||
| self.inner.handle(&mut TimedContext::from_context(context), message)?; | ||||||||||
| // Enqueue an immediate message to process. Alternative would be to call inner handle(), | ||||||||||
| // but we don't want to effectively call child handle() twice in the parent handle(). | ||||||||||
| context.myself.send_now(message)?; | ||||||||||
| } | ||||||||||
|
|
||||||||||
| Ok(()) | ||||||||||
| } | ||||||||||
|
|
||||||||||
| fn schedule_timeout(&self, context: &mut <Self as Actor>::Context) { | ||||||||||
| // Schedule next timeout if the queue is not empty. | ||||||||||
| context.set_deadline(self.queue.peek().map(|earliest| earliest.fire_at)); | ||||||||||
| context.set_deadline(self.queue.peek().map(|earliest| earliest.enqueue_at)); | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Actor for Timed<A> { | ||||||||||
| impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Actor for Timed<A> | ||||||||||
| where | ||||||||||
| <A as Actor>::Error: From<SendError>, | ||||||||||
| { | ||||||||||
| type Context = Context<Self::Message>; | ||||||||||
| type Error = A::Error; | ||||||||||
| type Message = TimedMessage<M>; | ||||||||||
|
|
@@ -171,12 +173,14 @@ impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Actor | |||||||||
| TimedMessage::Instant { message } => { | ||||||||||
| self.inner.handle(&mut TimedContext::from_context(context), message)?; | ||||||||||
| }, | ||||||||||
| TimedMessage::Delayed { message, fire_at } => { | ||||||||||
| self.queue.push(QueueItem { fire_at, payload: Payload::Delayed { message } }); | ||||||||||
| TimedMessage::Delayed { message, enqueue_at } => { | ||||||||||
| self.queue.push(QueueItem { enqueue_at, payload: Payload::Delayed { message } }); | ||||||||||
| }, | ||||||||||
| TimedMessage::Recurring { factory, fire_at, interval } => { | ||||||||||
| self.queue | ||||||||||
| .push(QueueItem { fire_at, payload: Payload::Recurring { factory, interval } }); | ||||||||||
| TimedMessage::Recurring { factory, enqueue_at, interval } => { | ||||||||||
| self.queue.push(QueueItem { | ||||||||||
| enqueue_at, | ||||||||||
| payload: Payload::Recurring { factory, interval }, | ||||||||||
| }); | ||||||||||
| }, | ||||||||||
| }; | ||||||||||
|
|
||||||||||
|
|
@@ -195,14 +199,11 @@ impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Actor | |||||||||
|
|
||||||||||
| fn priority(message: &Self::Message) -> Priority { | ||||||||||
| match message { | ||||||||||
| // Use underlying message priority if we can reference it. | ||||||||||
| TimedMessage::Instant { message } | TimedMessage::Delayed { message, .. } => { | ||||||||||
| A::priority(message) | ||||||||||
| }, | ||||||||||
| // Recurring message is only received once, the recurring instances go through the | ||||||||||
| // internal queue (and not actor's channel). Assign high priority to the request to | ||||||||||
| // set-up the recurrent sending. | ||||||||||
| TimedMessage::Recurring { .. } => Priority::High, | ||||||||||
| // Use underlying message priority for instant messages. | ||||||||||
| TimedMessage::Instant { message } => A::priority(message), | ||||||||||
| // Recurring and Delayed messages are only added to the queue when handled, and then go | ||||||||||
| // through actors priority inboxes again when actually enqueued. | ||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm having trouble visualizing the message flow based on this comment.
In my mind, "handled" means the actor has run the Is it possible to be more specific in the language here about the flow of the message from when
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree. What about
Suggested change
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @goodhoko you suggestion reads very good to me using it (with very small tweaks) |
||||||||||
| TimedMessage::Recurring { .. } | TimedMessage::Delayed { .. } => Priority::High, | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
|
|
@@ -236,13 +237,13 @@ impl<A: Actor> Deref for Timed<A> { | |||||||||
|
|
||||||||||
| /// Implementation detail, element of message queue ordered by time to fire at. | ||||||||||
| struct QueueItem<M> { | ||||||||||
| fire_at: Instant, | ||||||||||
| enqueue_at: Instant, | ||||||||||
| payload: Payload<M>, | ||||||||||
| } | ||||||||||
|
|
||||||||||
| impl<M> PartialEq for QueueItem<M> { | ||||||||||
| fn eq(&self, other: &Self) -> bool { | ||||||||||
| self.fire_at == other.fire_at | ||||||||||
| self.enqueue_at == other.enqueue_at | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
|
|
@@ -257,8 +258,8 @@ impl<M> PartialOrd for QueueItem<M> { | |||||||||
|
|
||||||||||
| impl<M> Ord for QueueItem<M> { | ||||||||||
| fn cmp(&self, other: &Self) -> Ordering { | ||||||||||
| // Reverse because [BinaryHeap] is a *max* heap, but we want pop() to return lowest `fire_at`. | ||||||||||
| self.fire_at.cmp(&other.fire_at).reverse() | ||||||||||
| // Reverse because [BinaryHeap] is a *max* heap, but we want pop() to return lowest `enqueue_at`. | ||||||||||
| self.enqueue_at.cmp(&other.enqueue_at).reverse() | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
|
|
@@ -277,19 +278,20 @@ mod tests { | |||||||||
| }; | ||||||||||
|
|
||||||||||
| struct TimedTestActor { | ||||||||||
| recurring_message_sleep: Duration, | ||||||||||
| received: Arc<Mutex<Vec<usize>>>, | ||||||||||
| } | ||||||||||
|
|
||||||||||
| impl Actor for TimedTestActor { | ||||||||||
| type Context = TimedContext<Self::Message>; | ||||||||||
| type Error = (); | ||||||||||
| type Error = SendError; | ||||||||||
| type Message = usize; | ||||||||||
|
|
||||||||||
| fn name() -> &'static str { | ||||||||||
| "TimedTestActor" | ||||||||||
| } | ||||||||||
|
|
||||||||||
| fn handle(&mut self, context: &mut Self::Context, message: usize) -> Result<(), ()> { | ||||||||||
| fn handle(&mut self, context: &mut Self::Context, message: usize) -> Result<(), SendError> { | ||||||||||
| { | ||||||||||
| let mut guard = self.received.lock().unwrap(); | ||||||||||
| guard.push(message); | ||||||||||
|
|
@@ -301,38 +303,78 @@ mod tests { | |||||||||
| context.myself.send_now(3).unwrap(); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| Ok(()) | ||||||||||
| } | ||||||||||
| // Message 2 is a recurring one, sleep based on a paremeter. | ||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
| if message == 2 { | ||||||||||
| thread::sleep(self.recurring_message_sleep); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| fn started(&mut self, context: &mut Self::Context) { | ||||||||||
| context | ||||||||||
| .myself | ||||||||||
| .send_recurring( | ||||||||||
| || 2, | ||||||||||
| Instant::now() + Duration::from_millis(50), | ||||||||||
| Duration::from_millis(100), | ||||||||||
| ) | ||||||||||
| .unwrap() | ||||||||||
| Ok(()) | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| /// Tests that recurring messages still get in for actors that have one "tick" message type that | ||||||||||
| /// does `block_for_some_time(); myself.send_now(Tick);` in its handle(). | ||||||||||
| #[test] | ||||||||||
| fn recurring_messages_for_busy_actors() { | ||||||||||
| fn recurring_messages_for_self_looping_actors() { | ||||||||||
| let received = Arc::new(Mutex::new(Vec::new())); | ||||||||||
|
|
||||||||||
| let mut system = System::new("timed test"); | ||||||||||
| let address = | ||||||||||
| system.spawn(Timed::new(TimedTestActor { received: Arc::clone(&received) })).unwrap(); | ||||||||||
| let address = system | ||||||||||
| .spawn(Timed::new(TimedTestActor { | ||||||||||
| recurring_message_sleep: Duration::ZERO, | ||||||||||
| received: Arc::clone(&received), | ||||||||||
| })) | ||||||||||
| .unwrap(); | ||||||||||
| address | ||||||||||
| .send_recurring( | ||||||||||
| || 2, | ||||||||||
| Instant::now() + Duration::from_millis(50), | ||||||||||
| Duration::from_millis(100), | ||||||||||
| ) | ||||||||||
| .unwrap(); | ||||||||||
|
|
||||||||||
| address.send_now(1).unwrap(); | ||||||||||
| thread::sleep(Duration::from_millis(225)); | ||||||||||
| system.shutdown().unwrap(); | ||||||||||
|
|
||||||||||
| // The order of messages should be: | ||||||||||
| // 1 (initial message), | ||||||||||
| // 2 (first recurring scheduled message), | ||||||||||
| // 3 (first self-sent message), | ||||||||||
| // 2 (second recurring message) | ||||||||||
| // 3 (second self-sent message) | ||||||||||
| assert_eq!(*received.lock().unwrap(), vec![1, 2, 3, 2, 3]); | ||||||||||
| // The timeline (order of messages received) is: | ||||||||||
| // at 0 ms: 1 (initial message, takes 100 ms to handle), | ||||||||||
| // at 100 ms: 3 (first self-sent message, 100 ms to handle), | ||||||||||
| // at 200 ms: 2 (first recurring scheduled message, delivered 150 ms late), | ||||||||||
| // at 200 ms: 3 (second self-sent message, 100 ms to handle) | ||||||||||
| // at 225 ms: (control message to shut down the actor sent) | ||||||||||
| // at 300 ms: (control signal to shut down finally delivered to the actor) | ||||||||||
| assert_eq!(*received.lock().unwrap(), vec![1, 3, 2, 3]); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| /// Test that actors with recurring messages that take longer to handle than what the recurring | ||||||||||
| /// delay is still get other and control messages. | ||||||||||
| #[test] | ||||||||||
| fn recurring_messages_handled_slower_than_generated() { | ||||||||||
| let received = Arc::new(Mutex::new(Vec::new())); | ||||||||||
|
|
||||||||||
| let mut system = System::new("timed test"); | ||||||||||
| let address = system | ||||||||||
| .spawn(Timed::new(TimedTestActor { | ||||||||||
| recurring_message_sleep: Duration::from_millis(100), | ||||||||||
| received: Arc::clone(&received), | ||||||||||
| })) | ||||||||||
| .unwrap(); | ||||||||||
| address.send_recurring(|| 2, Instant::now(), Duration::from_millis(10)).unwrap(); | ||||||||||
|
|
||||||||||
| thread::sleep(Duration::from_millis(150)); | ||||||||||
| address.send_now(4).unwrap(); | ||||||||||
| thread::sleep(Duration::from_millis(125)); | ||||||||||
| system.shutdown().unwrap(); | ||||||||||
|
|
||||||||||
| // The timeline (order of messages received) is: | ||||||||||
| // at 0 ms: 2 (first recurring message, 100 ms to handle) | ||||||||||
| // at 100 ms: 2 (second recurring message, 90 ms late, 100 ms to handle) | ||||||||||
| // at 150 ms: (message "4" sent to the actor from the main thread) | ||||||||||
| // at 200 ms: 4 (actor wakes up, processes message 4 that was sent before the recurring one) | ||||||||||
| // at 200 ms: 2 (third recurring message, 180 ms late, 100 ms to handle) | ||||||||||
| // at 275 ms: (control message to shut down actor sent) | ||||||||||
| // at 300 ms: (control message to shut down received at highest priority) | ||||||||||
| assert_eq!(*received.lock().unwrap(), vec![2, 2, 4, 2]); | ||||||||||
| } | ||||||||||
| } | ||||||||||
Uh oh!
There was an error while loading. Please reload this page.