diff --git a/Cargo.lock b/Cargo.lock index e2ff2268f1..0e50741380 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7629,6 +7629,7 @@ dependencies = [ "pin-project", "pin-project-lite", "restate-test-util", + "restate-util-time", "restate-workspace-hack", "test-log", "thiserror 2.0.18", diff --git a/crates/bifrost/src/appender.rs b/crates/bifrost/src/appender.rs index 1182046a1a..3e5a6d5045 100644 --- a/crates/bifrost/src/appender.rs +++ b/crates/bifrost/src/appender.rs @@ -24,6 +24,7 @@ use restate_types::live::Live; use restate_types::logs::metadata::SegmentIndex; use restate_types::logs::{LogId, Lsn, Record}; use restate_types::storage::StorageEncode; +use restate_util_time::DurationExt; use crate::bifrost::{BifrostInner, ErrorRecoveryStrategy, PreferenceControl}; use crate::loglet::AppendError; @@ -230,8 +231,8 @@ impl Appender { attempt, segment_index = %loglet.segment_index(), error_recovery_strategy = %self.error_recovery_strategy, - "Failed to append this batch. Since underlying error is retryable, will retry in {:?}", - retry_dur + "Failed to append this batch. Since underlying error is retryable, will retry in {}", + retry_dur.friendly() ); tokio::time::sleep(retry_dur).await; } @@ -370,15 +371,15 @@ impl Appender { info!( open_segment = %loglet.segment_index(), %error_recovery_strategy, - "New segment detected after {:?}", - total_dur + "New segment detected after {}", + total_dur.friendly() ); } else { debug!( open_segment = %loglet.segment_index(), %error_recovery_strategy, - "New segment detected after {:?}", - total_dur + "New segment detected after {}", + total_dur.friendly() ); } return Ok(loglet); @@ -397,8 +398,8 @@ impl Appender { let admin = BifrostAdmin::new(bifrost_inner); info!( %sealed_segment, - "[Auto Recovery] Attempting to extend the chain to recover log availability with a new configuration. It has been {:?} since encountering the sealed loglet", - start.elapsed(), + "[Auto Recovery] Attempting to extend the chain to recover log availability with a new configuration. It has been {} since encountering the sealed loglet", + start.elapsed().friendly(), ); if let Err(err) = admin .seal_and_auto_extend_chain(log_id, Some(sealed_segment)) @@ -435,15 +436,15 @@ impl Appender { info!( %log_metadata_version, %error_recovery_strategy, - "In holding pattern, still waiting for log reconfiguration to complete. Elapsed={:?}", - start.elapsed(), + "In holding pattern, still waiting for log reconfiguration to complete. Elapsed={}", + start.elapsed().friendly(), ); } else { debug!( %log_metadata_version, %error_recovery_strategy, - "In holding pattern, waiting for log reconfiguration to complete. Elapsed={:?}", - start.elapsed(), + "In holding pattern, waiting for log reconfiguration to complete. Elapsed={}", + start.elapsed().friendly(), ); } } diff --git a/crates/bifrost/src/providers/replicated_loglet/loglet.rs b/crates/bifrost/src/providers/replicated_loglet/loglet.rs index 791f79a5f2..73596925f6 100644 --- a/crates/bifrost/src/providers/replicated_loglet/loglet.rs +++ b/crates/bifrost/src/providers/replicated_loglet/loglet.rs @@ -13,6 +13,7 @@ use std::sync::Arc; use async_trait::async_trait; use futures::stream::BoxStream; +use restate_util_time::DurationExt; use tokio::sync::Mutex; use tokio::time::Instant; use tracing::{debug, instrument, trace}; @@ -404,7 +405,7 @@ impl Loglet for ReplicatedLoglet { // Ensure that only one seal operation is in progress at a time. let start = Instant::now(); let _guard = self.seal_in_progress.lock().await; - trace!("seal() lock acquired after {:?}", start.elapsed()); + trace!("seal() lock acquired after {}", start.elapsed().friendly()); if self.known_global_tail.get().is_sealed() { return Ok(()); @@ -426,8 +427,8 @@ impl Loglet for ReplicatedLoglet { // not be up-to-date at the time we sealed. We want that to happen by find_tail() after // it consults the sequencer, or by running a full find-tail algorithm directly on log // servers. - debug!(loglet_id=%self.my_params.loglet_id, "seal() has completed successfully in {:?}", - start.elapsed()); + debug!(loglet_id=%self.my_params.loglet_id, "seal() has completed successfully in {}", + start.elapsed().friendly()); Ok(()) } } diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs index e917736873..1132f903f1 100644 --- a/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs +++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/appender.rs @@ -248,8 +248,8 @@ impl SequencerAppender { trace!( wave = %self.current_wave, - "Append succeeded in {:?}, status {}", - start.elapsed(), + "Append succeeded in {}, status {}", + start.elapsed().friendly(), self.nodeset_status ); } diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/periodic_tail_checker.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/periodic_tail_checker.rs index 0d54fd724d..2ada9f5e32 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/periodic_tail_checker.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/periodic_tail_checker.rs @@ -11,13 +11,14 @@ use std::sync::Weak; use std::time::Duration; -use restate_types::retries::with_jitter; use tokio::time::Instant; use tracing::instrument; use tracing::{debug, trace}; use restate_core::network::TransportConnect; use restate_types::logs::LogletId; +use restate_types::retries::with_jitter; +use restate_util_time::DurationExt; use crate::loglet::OperationError; use crate::providers::replicated_loglet::loglet::{FindTailFlags, ReplicatedLoglet}; @@ -69,8 +70,8 @@ impl PeriodicTailChecker { known_global_tail = %tail.offset(), is_sequencer = ?loglet.is_sequencer_local(), is_sealed = ?tail.is_sealed(), - "Successfully determined the tail status of the loglet, took={:?}", - start.elapsed(), + "Successfully determined the tail status of the loglet, took={}", + start.elapsed().friendly(), ); } Err(OperationError::Shutdown(_)) => { diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/repair_tail.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/repair_tail.rs index 052a170bb8..fe356acced 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/repair_tail.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/repair_tail.rs @@ -17,6 +17,7 @@ use restate_core::{Metadata, ShutdownError, TaskCenterFutureExt}; use restate_types::logs::{KeyFilter, LogletOffset, RecordCache, SequenceNumber, TailOffsetWatch}; use restate_types::net::log_server::{GetDigest, LogServerRequestHeader}; use restate_types::replicated_loglet::{LogNodeSetExt, ReplicatedLogletParams}; +use restate_util_time::DurationExt; use crate::providers::replicated_loglet::read_path::ReadStreamTask; @@ -193,7 +194,7 @@ impl RepairTail { loglet_id = %self.my_params.loglet_id, start_offset = %self.digests.start_offset(), target_tail = %self.digests.target_tail(), - elapsed = ?start.elapsed(), + elapsed = %start.elapsed().friendly(), "Digest phase completed." ); @@ -202,7 +203,7 @@ impl RepairTail { loglet_id = %self.my_params.loglet_id, known_global_tail = %self.known_global_tail.latest_offset(), target_tail = %self.digests.target_tail(), - elapsed = ?start.elapsed(), + elapsed = %start.elapsed().friendly(), "Repair task completed, no records required repairing" ); return RepairTailResult::Completed; @@ -217,7 +218,7 @@ impl RepairTail { nodeset = %self.my_params.nodeset, nodes_responded = %self.digests.known_nodes(), replication = %self.my_params.replication, - elapsed = ?start.elapsed(), + elapsed = %start.elapsed().friendly(), "Couldn't repair the tail! We have records to repair but not enough writeable nodes \ have responded to our digest request. We'll not be able to re-replicate the missing records until \ they are online and responsive", @@ -236,7 +237,7 @@ impl RepairTail { nodeset = %self.my_params.nodeset, nodes_responded = %self.digests.known_nodes(), replication = %self.my_params.replication, - elapsed = ?start.elapsed(), + elapsed = %start.elapsed().friendly(), "Couldn't repair the tail! We have records to repair **and** enough nodes to repair, but \ we couldn't find any node with copies for the oldest record within the repair range. \ We'll not be able to re-replicate the missing records until we can read back this \ @@ -308,7 +309,7 @@ impl RepairTail { info!( loglet_id = %self.my_params.loglet_id, known_global_tail = %self.known_global_tail.latest_offset(), - elapsed = ?start.elapsed(), + elapsed = %start.elapsed().friendly(), "Repair task completed, {} record(s) have been repaired", self.digests.num_fixups(), ); @@ -320,7 +321,7 @@ impl RepairTail { nodeset = %self.my_params.nodeset, nodes_responded = %self.digests.known_nodes(), replication = %self.my_params.replication, - elapsed = ?start.elapsed(), + elapsed = %start.elapsed().friendly(), "Failed to repair the tail. The under-replicated region is from {} to {} [inclusive]. \ {} records have been repaired during the process", self.digests.start_offset(), diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs index f2e27e14f9..5f05d3b564 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs @@ -21,6 +21,7 @@ use restate_types::logs::{LogletOffset, SequenceNumber, TailOffsetWatch}; use restate_types::net::log_server::{LogServerRequestHeader, Seal, Sealed, Status}; use restate_types::replicated_loglet::{LogNodeSetExt, ReplicatedLogletParams}; use restate_types::replication::{DecoratedNodeSet, NodeSetChecker}; +use restate_util_time::DurationExt; use crate::providers::replicated_loglet::error::{NodeSealStatus, ReplicatedLogletError}; use crate::providers::replicated_loglet::tasks::util::{Attempts, RunOnSingleNode}; @@ -136,9 +137,8 @@ impl SealTask { replication = %my_params.replication, %max_local_tail, global_tail = %known_global_tail.latest_offset(), - "Seal task completed on f-majority of nodes in {:?}. Nodeset status {}", - start.elapsed(), - nodeset_status, + "Seal task completed on f-majority of nodes in {}. Nodeset status {nodeset_status}", + start.elapsed().friendly(), ); // note that we drop the rest of the seal requests after return return Ok(max_local_tail); @@ -157,8 +157,8 @@ fn on_seal_response(peer: PlainNodeId, msg: Sealed) -> Disposition { return Disposition::Return(msg); } trace!( - "Seal request failed on node {}, status is {:?}", - peer, msg.header.status + "Seal request failed on node {peer}, status is {}", + msg.header.status ); Disposition::Abort } diff --git a/crates/core/src/task_center.rs b/crates/core/src/task_center.rs index 84cc74d1d8..4e726ab2ec 100644 --- a/crates/core/src/task_center.rs +++ b/crates/core/src/task_center.rs @@ -52,6 +52,7 @@ use restate_types::health::{Health, NodeStatus}; use restate_types::identifiers::PartitionId; use restate_types::net::listener::AddressBook; use restate_types::{GenerationalNodeId, NodeId}; +use restate_util_time::DurationExt; const EXIT_CODE_FAILURE: i32 = 1; @@ -1048,11 +1049,14 @@ impl TaskCenterInner { if shutdown_result.is_err() { warn!( - "Timeout waiting for graceful shutdown. Shutdown took {:?}", - start.elapsed() + "Timeout waiting for graceful shutdown. Shutdown took {}", + start.elapsed().friendly() ); } else { - info!("Restate has gracefully shutdown in {:?}", start.elapsed()); + info!( + "Restate has gracefully shutdown in {}", + start.elapsed().friendly() + ); }; self.root_task_context.cancel(); self.global_cancel_token.cancel(); diff --git a/crates/futures-util/Cargo.toml b/crates/futures-util/Cargo.toml index 77ea475319..a0ad0ec3f5 100644 --- a/crates/futures-util/Cargo.toml +++ b/crates/futures-util/Cargo.toml @@ -10,6 +10,8 @@ publish = false [dependencies] restate-workspace-hack = { workspace = true } +restate-util-time = { workspace = true } + anyhow = { workspace = true } async-channel = { workspace = true } futures = { workspace = true } diff --git a/crates/futures-util/src/overdue.rs b/crates/futures-util/src/overdue.rs index 4df65b9c00..8e24e42f27 100644 --- a/crates/futures-util/src/overdue.rs +++ b/crates/futures-util/src/overdue.rs @@ -18,6 +18,8 @@ use pin_project_lite::pin_project; use tokio::time::{Instant, Sleep}; use tracing::{Level, debug, error, info, trace, warn}; +use restate_util_time::DurationExt; + const MAX_REPEAT_DURATION: Duration = const { Duration::from_secs(30) }; /// Adds the ability to override task-center for a future and all its children @@ -199,21 +201,21 @@ where fn log_message(message: &M, level: Level, elapsed: Duration, label: &str) { match level { - Level::ERROR => error!(?elapsed, "[{label}] {message}"), - Level::WARN => warn!(?elapsed, "[{label}] {message}"), - Level::INFO => info!(?elapsed, "[{label}] {message}"), - Level::DEBUG => debug!(?elapsed, "[{label}] {message}"), - Level::TRACE => trace!(?elapsed, "[{label}] {message}"), + Level::ERROR => error!(elapsed = %elapsed.friendly(), "[{label}] {message}"), + Level::WARN => warn!(elapsed = %elapsed.friendly(), "[{label}] {message}"), + Level::INFO => info!(elapsed = %elapsed.friendly(), "[{label}] {message}"), + Level::DEBUG => debug!(elapsed = %elapsed.friendly(), "[{label}] {message}"), + Level::TRACE => trace!(elapsed = %elapsed.friendly(), "[{label}] {message}"), } } fn log_completion(message: &M, level: Level, elapsed: Duration) { match level { - Level::ERROR => error!(?elapsed, "[completed] {message}"), - Level::WARN => warn!(?elapsed, "[completed] {message}"), - Level::INFO => info!(?elapsed, "[completed] {message}"), - Level::DEBUG => debug!(?elapsed, "[completed] {message}"), - Level::TRACE => trace!(?elapsed, "[completed] {message}"), + Level::ERROR => error!(elapsed = %elapsed.friendly(), "[completed] {message}"), + Level::WARN => warn!(elapsed = %elapsed.friendly(), "[completed] {message}"), + Level::INFO => info!(elapsed = %elapsed.friendly(), "[completed] {message}"), + Level::DEBUG => debug!(elapsed = %elapsed.friendly(), "[completed] {message}"), + Level::TRACE => trace!(elapsed = %elapsed.friendly(), "[completed] {message}"), } } @@ -256,7 +258,7 @@ mod tests { ); future.await; assert!(logs_contain("[slow] sleep operation elapsed=500ms")); - assert!(logs_contain("[slow] sleep operation elapsed=1.5s")); + assert!(logs_contain("[slow] sleep operation elapsed=1s 500ms")); assert!(logs_contain("[completed] sleep operation elapsed=2s")); logs_assert(|lines: &[&str]| { match lines @@ -281,14 +283,14 @@ mod tests { assert!(logs_contain("[slow] sleep operation elapsed=500ms")); // 1s later - assert!(logs_contain("[slow] sleep operation elapsed=1.5s")); + assert!(logs_contain("[slow] sleep operation elapsed=1s 500ms")); // 3.2 (the overdue point) is closer than 1.5+2=3.5, so we should see overdue sooner than 4.5 elapsed time - assert!(logs_contain("[overdue] sleep operation elapsed=3.2s")); + assert!(logs_contain("[overdue] sleep operation elapsed=3s 200ms")); // we use the next (unused) duration from the previous run (2s) // we expect that 3.2s+2s = 5.2s is our next notification point - assert!(logs_contain("[overdue] sleep operation elapsed=5.2s")); + assert!(logs_contain("[overdue] sleep operation elapsed=5s 200ms")); // back to normal, next point is 4s after 5.2s = 9.2s - assert!(logs_contain("[overdue] sleep operation elapsed=9.2s")); + assert!(logs_contain("[overdue] sleep operation elapsed=9s 200ms")); // operation finishes before the next tick which is after 8s (9.2+8=17.2s) assert!(logs_contain("[completed] sleep operation elapsed=10s")); logs_assert(|lines: &[&str]| { @@ -316,18 +318,20 @@ mod tests { future.await; assert!(logs_contain("[slow] sleep operation elapsed=500ms")); // 1s - assert!(logs_contain("[slow] sleep operation elapsed=1.5s")); + assert!(logs_contain("[slow] sleep operation elapsed=1s 500ms")); // 2s - assert!(logs_contain("[slow] sleep operation elapsed=3.5s")); + assert!(logs_contain("[slow] sleep operation elapsed=3s 500ms")); // 4s - assert!(logs_contain("[slow] sleep operation elapsed=7.5s")); + assert!(logs_contain("[slow] sleep operation elapsed=7s 500ms")); // over due at 10s assert!(logs_contain("[overdue] sleep operation elapsed=10s")); // 8s assert!(logs_contain("[overdue] sleep operation elapsed=18s")); // 16s assert!(logs_contain("[overdue] sleep operation elapsed=34s")); - assert!(logs_contain("[completed] sleep operation elapsed=35.9s")); + assert!(logs_contain( + "[completed] sleep operation elapsed=35s 900ms" + )); logs_assert(|lines: &[&str]| { match lines .iter() diff --git a/crates/rocksdb/src/rock_access.rs b/crates/rocksdb/src/rock_access.rs index 024c58714d..39a26aa857 100644 --- a/crates/rocksdb/src/rock_access.rs +++ b/crates/rocksdb/src/rock_access.rs @@ -20,6 +20,8 @@ use rocksdb::{CompactOptions, ExportImportFilesMetaData}; use tokio::time::Instant; use tracing::{debug, info, trace, warn}; +use restate_util_time::DurationExt; + use crate::DbSpec; use crate::RawRocksDb; use crate::RocksError; @@ -239,9 +241,9 @@ impl RocksAccess { } else { info!( db = %self.name(), - "{} column families flushed in {:?}", + "{} column families flushed in {}", cfs_to_flush.len(), - start.elapsed(), + start.elapsed().friendly(), ); } } diff --git a/util/time/src/duration.rs b/util/time/src/duration.rs index 49522a2124..78e408e229 100644 --- a/util/time/src/duration.rs +++ b/util/time/src/duration.rs @@ -47,17 +47,20 @@ pub trait DurationExt { } } -/// Displays a time span with 'days' as the maximum unit. +/// Displays a time span with 'days' as the maximum unit and milli-seconds as minimum. pub struct Days; -/// Displays a time span with 'seconds' as the maximum unit. +/// Displays a time span with 'seconds' as the maximum unit and micro-seconds as minimum. pub struct Seconds; /// Displays a time span in 'HH::MM::SS[.fff]' format. pub struct Hms; /// Displays a time span in ISO 8601 format. pub struct Iso8601; +/// Displays a time span with full resolution from days to nanos. +pub struct Full; mod private { pub trait Sealed {} + impl Sealed for super::Full {} impl Sealed for super::Days {} impl Sealed for super::Seconds {} impl Sealed for super::Hms {} @@ -69,6 +72,12 @@ pub trait Style: private::Sealed { fn print_span(span: &Span, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result; } +impl Style for Full { + fn print_span(span: &Span, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + print_inner(span, f) + } +} + impl Style for Days { fn print_span(span: &Span, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { print_inner(span, f) @@ -121,6 +130,8 @@ impl DurationExt for StdDuration { /// The default display behaves the same as `to_days_span()`. But you can also use the following /// conversions to customize the display behaviour: /// +/// - Use [`FriendlyDuration::to_full_span`] to display a duration as a span with its maximum unit +/// set to days and minimum to nano-seconds. /// - Use [`FriendlyDuration::to_days_span`] to display a duration as a span with its maximum unit /// set to days. /// - Use [`FriendlyDuration::to_seconds_span`] to display a duration as a span with its maximum @@ -287,6 +298,11 @@ impl Duration { self.0.is_zero() } + /// Returns a span with full resolution. + pub fn to_full_span(self) -> TimeSpan { + TimeSpan::::new(Span::try_from(self.0).unwrap()) + } + /// Returns a span with its maximum unit set to days. pub fn to_days_span(self) -> TimeSpan { TimeSpan::::new(Span::try_from(self.0).unwrap()) @@ -422,12 +438,27 @@ impl TimeSpan { } } +impl TimeSpan { + fn new(span: Span) -> TimeSpan { + let span = span + .round( + SpanRound::new() + .largest(jiff::Unit::Day) + .smallest(jiff::Unit::Nanosecond) + .days_are_24_hours(), + ) + .unwrap(); + TimeSpan(span, PhantomData) + } +} + impl TimeSpan { fn new(span: Span) -> TimeSpan { let span = span .round( SpanRound::new() .largest(jiff::Unit::Second) + .smallest(jiff::Unit::Microsecond) .days_are_24_hours(), ) .unwrap(); @@ -441,6 +472,7 @@ impl TimeSpan { .round( SpanRound::new() .largest(jiff::Unit::Day) + .smallest(jiff::Unit::Millisecond) .days_are_24_hours(), ) .unwrap(); @@ -454,6 +486,7 @@ impl TimeSpan { .round( SpanRound::new() .largest(jiff::Unit::Hour) + .smallest(jiff::Unit::Microsecond) .days_are_24_hours(), ) .unwrap(); @@ -683,7 +716,7 @@ mod tests { #[test] fn friendly_conversion() { let friendly = StdDuration::from_nanos(22).friendly(); - assert_eq!("22ns", friendly.to_days_span().to_string()); + assert_eq!("22ns", friendly.to_full_span().to_string()); } #[test]