Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 13 additions & 12 deletions crates/bifrost/src/appender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use restate_types::live::Live;
use restate_types::logs::metadata::SegmentIndex;
use restate_types::logs::{LogId, Lsn, Record};
use restate_types::storage::StorageEncode;
use restate_util_time::DurationExt;

use crate::bifrost::{BifrostInner, ErrorRecoveryStrategy, PreferenceControl};
use crate::loglet::AppendError;
Expand Down Expand Up @@ -230,8 +231,8 @@ impl<T: StorageEncode> Appender<T> {
attempt,
segment_index = %loglet.segment_index(),
error_recovery_strategy = %self.error_recovery_strategy,
"Failed to append this batch. Since underlying error is retryable, will retry in {:?}",
retry_dur
"Failed to append this batch. Since underlying error is retryable, will retry in {}",
retry_dur.friendly()
);
tokio::time::sleep(retry_dur).await;
}
Expand Down Expand Up @@ -370,15 +371,15 @@ impl<T: StorageEncode> Appender<T> {
info!(
open_segment = %loglet.segment_index(),
%error_recovery_strategy,
"New segment detected after {:?}",
total_dur
"New segment detected after {}",
total_dur.friendly()
);
} else {
debug!(
open_segment = %loglet.segment_index(),
%error_recovery_strategy,
"New segment detected after {:?}",
total_dur
"New segment detected after {}",
total_dur.friendly()
);
}
return Ok(loglet);
Expand All @@ -397,8 +398,8 @@ impl<T: StorageEncode> Appender<T> {
let admin = BifrostAdmin::new(bifrost_inner);
info!(
%sealed_segment,
"[Auto Recovery] Attempting to extend the chain to recover log availability with a new configuration. It has been {:?} since encountering the sealed loglet",
start.elapsed(),
"[Auto Recovery] Attempting to extend the chain to recover log availability with a new configuration. It has been {} since encountering the sealed loglet",
start.elapsed().friendly(),
);
if let Err(err) = admin
.seal_and_auto_extend_chain(log_id, Some(sealed_segment))
Expand Down Expand Up @@ -435,15 +436,15 @@ impl<T: StorageEncode> Appender<T> {
info!(
%log_metadata_version,
%error_recovery_strategy,
"In holding pattern, still waiting for log reconfiguration to complete. Elapsed={:?}",
start.elapsed(),
"In holding pattern, still waiting for log reconfiguration to complete. Elapsed={}",
start.elapsed().friendly(),
);
} else {
debug!(
%log_metadata_version,
%error_recovery_strategy,
"In holding pattern, waiting for log reconfiguration to complete. Elapsed={:?}",
start.elapsed(),
"In holding pattern, waiting for log reconfiguration to complete. Elapsed={}",
start.elapsed().friendly(),
);
}
}
Expand Down
7 changes: 4 additions & 3 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use futures::stream::BoxStream;
use restate_util_time::DurationExt;
use tokio::sync::Mutex;
use tokio::time::Instant;
use tracing::{debug, instrument, trace};
Expand Down Expand Up @@ -404,7 +405,7 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
// Ensure that only one seal operation is in progress at a time.
let start = Instant::now();
let _guard = self.seal_in_progress.lock().await;
trace!("seal() lock acquired after {:?}", start.elapsed());
trace!("seal() lock acquired after {}", start.elapsed().friendly());

if self.known_global_tail.get().is_sealed() {
return Ok(());
Expand All @@ -426,8 +427,8 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
// not be up-to-date at the time we sealed. We want that to happen by find_tail() after
// it consults the sequencer, or by running a full find-tail algorithm directly on log
// servers.
debug!(loglet_id=%self.my_params.loglet_id, "seal() has completed successfully in {:?}",
start.elapsed());
debug!(loglet_id=%self.my_params.loglet_id, "seal() has completed successfully in {}",
start.elapsed().friendly());
Ok(())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ impl<T: TransportConnect> SequencerAppender<T> {

trace!(
wave = %self.current_wave,
"Append succeeded in {:?}, status {}",
start.elapsed(),
"Append succeeded in {}, status {}",
start.elapsed().friendly(),
self.nodeset_status
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@
use std::sync::Weak;
use std::time::Duration;

use restate_types::retries::with_jitter;
use tokio::time::Instant;
use tracing::instrument;
use tracing::{debug, trace};

use restate_core::network::TransportConnect;
use restate_types::logs::LogletId;
use restate_types::retries::with_jitter;
use restate_util_time::DurationExt;

use crate::loglet::OperationError;
use crate::providers::replicated_loglet::loglet::{FindTailFlags, ReplicatedLoglet};
Expand Down Expand Up @@ -69,8 +70,8 @@ impl PeriodicTailChecker {
known_global_tail = %tail.offset(),
is_sequencer = ?loglet.is_sequencer_local(),
is_sealed = ?tail.is_sealed(),
"Successfully determined the tail status of the loglet, took={:?}",
start.elapsed(),
"Successfully determined the tail status of the loglet, took={}",
start.elapsed().friendly(),
);
}
Err(OperationError::Shutdown(_)) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use restate_core::{Metadata, ShutdownError, TaskCenterFutureExt};
use restate_types::logs::{KeyFilter, LogletOffset, RecordCache, SequenceNumber, TailOffsetWatch};
use restate_types::net::log_server::{GetDigest, LogServerRequestHeader};
use restate_types::replicated_loglet::{LogNodeSetExt, ReplicatedLogletParams};
use restate_util_time::DurationExt;

use crate::providers::replicated_loglet::read_path::ReadStreamTask;

Expand Down Expand Up @@ -193,7 +194,7 @@ impl<T: TransportConnect> RepairTail<T> {
loglet_id = %self.my_params.loglet_id,
start_offset = %self.digests.start_offset(),
target_tail = %self.digests.target_tail(),
elapsed = ?start.elapsed(),
elapsed = %start.elapsed().friendly(),
"Digest phase completed."
);

Expand All @@ -202,7 +203,7 @@ impl<T: TransportConnect> RepairTail<T> {
loglet_id = %self.my_params.loglet_id,
known_global_tail = %self.known_global_tail.latest_offset(),
target_tail = %self.digests.target_tail(),
elapsed = ?start.elapsed(),
elapsed = %start.elapsed().friendly(),
"Repair task completed, no records required repairing"
);
return RepairTailResult::Completed;
Expand All @@ -217,7 +218,7 @@ impl<T: TransportConnect> RepairTail<T> {
nodeset = %self.my_params.nodeset,
nodes_responded = %self.digests.known_nodes(),
replication = %self.my_params.replication,
elapsed = ?start.elapsed(),
elapsed = %start.elapsed().friendly(),
"Couldn't repair the tail! We have records to repair but not enough writeable nodes \
have responded to our digest request. We'll not be able to re-replicate the missing records until \
they are online and responsive",
Expand All @@ -236,7 +237,7 @@ impl<T: TransportConnect> RepairTail<T> {
nodeset = %self.my_params.nodeset,
nodes_responded = %self.digests.known_nodes(),
replication = %self.my_params.replication,
elapsed = ?start.elapsed(),
elapsed = %start.elapsed().friendly(),
"Couldn't repair the tail! We have records to repair **and** enough nodes to repair, but \
we couldn't find any node with copies for the oldest record within the repair range. \
We'll not be able to re-replicate the missing records until we can read back this \
Expand Down Expand Up @@ -308,7 +309,7 @@ impl<T: TransportConnect> RepairTail<T> {
info!(
loglet_id = %self.my_params.loglet_id,
known_global_tail = %self.known_global_tail.latest_offset(),
elapsed = ?start.elapsed(),
elapsed = %start.elapsed().friendly(),
"Repair task completed, {} record(s) have been repaired",
self.digests.num_fixups(),
);
Expand All @@ -320,7 +321,7 @@ impl<T: TransportConnect> RepairTail<T> {
nodeset = %self.my_params.nodeset,
nodes_responded = %self.digests.known_nodes(),
replication = %self.my_params.replication,
elapsed = ?start.elapsed(),
elapsed = %start.elapsed().friendly(),
"Failed to repair the tail. The under-replicated region is from {} to {} [inclusive]. \
{} records have been repaired during the process",
self.digests.start_offset(),
Expand Down
10 changes: 5 additions & 5 deletions crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use restate_types::logs::{LogletOffset, SequenceNumber, TailOffsetWatch};
use restate_types::net::log_server::{LogServerRequestHeader, Seal, Sealed, Status};
use restate_types::replicated_loglet::{LogNodeSetExt, ReplicatedLogletParams};
use restate_types::replication::{DecoratedNodeSet, NodeSetChecker};
use restate_util_time::DurationExt;

use crate::providers::replicated_loglet::error::{NodeSealStatus, ReplicatedLogletError};
use crate::providers::replicated_loglet::tasks::util::{Attempts, RunOnSingleNode};
Expand Down Expand Up @@ -136,9 +137,8 @@ impl SealTask {
replication = %my_params.replication,
%max_local_tail,
global_tail = %known_global_tail.latest_offset(),
"Seal task completed on f-majority of nodes in {:?}. Nodeset status {}",
start.elapsed(),
nodeset_status,
"Seal task completed on f-majority of nodes in {}. Nodeset status {nodeset_status}",
start.elapsed().friendly(),
);
// note that we drop the rest of the seal requests after return
return Ok(max_local_tail);
Expand All @@ -157,8 +157,8 @@ fn on_seal_response(peer: PlainNodeId, msg: Sealed) -> Disposition<Sealed> {
return Disposition::Return(msg);
}
trace!(
"Seal request failed on node {}, status is {:?}",
peer, msg.header.status
"Seal request failed on node {peer}, status is {}",
msg.header.status
);
Disposition::Abort
}
10 changes: 7 additions & 3 deletions crates/core/src/task_center.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use restate_types::health::{Health, NodeStatus};
use restate_types::identifiers::PartitionId;
use restate_types::net::listener::AddressBook;
use restate_types::{GenerationalNodeId, NodeId};
use restate_util_time::DurationExt;

const EXIT_CODE_FAILURE: i32 = 1;

Expand Down Expand Up @@ -1048,11 +1049,14 @@ impl TaskCenterInner {

if shutdown_result.is_err() {
warn!(
"Timeout waiting for graceful shutdown. Shutdown took {:?}",
start.elapsed()
"Timeout waiting for graceful shutdown. Shutdown took {}",
start.elapsed().friendly()
);
} else {
info!("Restate has gracefully shutdown in {:?}", start.elapsed());
info!(
"Restate has gracefully shutdown in {}",
start.elapsed().friendly()
);
};
self.root_task_context.cancel();
self.global_cancel_token.cancel();
Expand Down
2 changes: 2 additions & 0 deletions crates/futures-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ publish = false
[dependencies]
restate-workspace-hack = { workspace = true }

restate-util-time = { workspace = true }

anyhow = { workspace = true }
async-channel = { workspace = true }
futures = { workspace = true }
Expand Down
42 changes: 23 additions & 19 deletions crates/futures-util/src/overdue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use pin_project_lite::pin_project;
use tokio::time::{Instant, Sleep};
use tracing::{Level, debug, error, info, trace, warn};

use restate_util_time::DurationExt;

const MAX_REPEAT_DURATION: Duration = const { Duration::from_secs(30) };

/// Adds the ability to override task-center for a future and all its children
Expand Down Expand Up @@ -199,21 +201,21 @@ where

/// Emits `message` as a tracing event at the requested `level`.
///
/// The event text is rendered as `[{label}] {message}`, and `elapsed` is attached to the
/// event as a field named `elapsed`.
// NOTE(review): every `Level` arm below appears twice — first recording `elapsed` with
// `?elapsed`, then with `%elapsed.friendly()`. This looks like the removed and added sides
// of a diff rendered together; confirm which set of arms is the intended final state.
fn log_message<M: Display>(message: &M, level: Level, elapsed: Duration, label: &str) {
match level {
Level::ERROR => error!(?elapsed, "[{label}] {message}"),
Level::WARN => warn!(?elapsed, "[{label}] {message}"),
Level::INFO => info!(?elapsed, "[{label}] {message}"),
Level::DEBUG => debug!(?elapsed, "[{label}] {message}"),
Level::TRACE => trace!(?elapsed, "[{label}] {message}"),
Level::ERROR => error!(elapsed = %elapsed.friendly(), "[{label}] {message}"),
Level::WARN => warn!(elapsed = %elapsed.friendly(), "[{label}] {message}"),
Level::INFO => info!(elapsed = %elapsed.friendly(), "[{label}] {message}"),
Level::DEBUG => debug!(elapsed = %elapsed.friendly(), "[{label}] {message}"),
Level::TRACE => trace!(elapsed = %elapsed.friendly(), "[{label}] {message}"),
}
}

/// Emits the completion event for `message` at the requested `level`.
///
/// The event text is rendered as `[completed] {message}`, and `elapsed` is attached to the
/// event as a field named `elapsed`.
// NOTE(review): every `Level` arm below appears twice — first recording `elapsed` with
// `?elapsed`, then with `%elapsed.friendly()`. This looks like the removed and added sides
// of a diff rendered together; confirm which set of arms is the intended final state.
fn log_completion<M: Display>(message: &M, level: Level, elapsed: Duration) {
match level {
Level::ERROR => error!(?elapsed, "[completed] {message}"),
Level::WARN => warn!(?elapsed, "[completed] {message}"),
Level::INFO => info!(?elapsed, "[completed] {message}"),
Level::DEBUG => debug!(?elapsed, "[completed] {message}"),
Level::TRACE => trace!(?elapsed, "[completed] {message}"),
Level::ERROR => error!(elapsed = %elapsed.friendly(), "[completed] {message}"),
Level::WARN => warn!(elapsed = %elapsed.friendly(), "[completed] {message}"),
Level::INFO => info!(elapsed = %elapsed.friendly(), "[completed] {message}"),
Level::DEBUG => debug!(elapsed = %elapsed.friendly(), "[completed] {message}"),
Level::TRACE => trace!(elapsed = %elapsed.friendly(), "[completed] {message}"),
}
}

Expand Down Expand Up @@ -256,7 +258,7 @@ mod tests {
);
future.await;
assert!(logs_contain("[slow] sleep operation elapsed=500ms"));
assert!(logs_contain("[slow] sleep operation elapsed=1.5s"));
assert!(logs_contain("[slow] sleep operation elapsed=1s 500ms"));
assert!(logs_contain("[completed] sleep operation elapsed=2s"));
logs_assert(|lines: &[&str]| {
match lines
Expand All @@ -281,14 +283,14 @@ mod tests {

assert!(logs_contain("[slow] sleep operation elapsed=500ms"));
// 1s later
assert!(logs_contain("[slow] sleep operation elapsed=1.5s"));
assert!(logs_contain("[slow] sleep operation elapsed=1s 500ms"));
// 3.2 (the overdue point) is closer than 1.5+2=3.5, so we should see overdue sooner than 4.5 elapsed time
assert!(logs_contain("[overdue] sleep operation elapsed=3.2s"));
assert!(logs_contain("[overdue] sleep operation elapsed=3s 200ms"));
// we use the next (unused) duration from the previous run (2s)
// we expect that 3.2s+2s = 5.2s is our next notification point
assert!(logs_contain("[overdue] sleep operation elapsed=5.2s"));
assert!(logs_contain("[overdue] sleep operation elapsed=5s 200ms"));
// back to normal, next point is 4s after 5.2s = 9.2s
assert!(logs_contain("[overdue] sleep operation elapsed=9.2s"));
assert!(logs_contain("[overdue] sleep operation elapsed=9s 200ms"));
// operation finishes before the next tick which is after 8s (9.2+8=17.2s)
assert!(logs_contain("[completed] sleep operation elapsed=10s"));
logs_assert(|lines: &[&str]| {
Expand Down Expand Up @@ -316,18 +318,20 @@ mod tests {
future.await;
assert!(logs_contain("[slow] sleep operation elapsed=500ms"));
// 1s
assert!(logs_contain("[slow] sleep operation elapsed=1.5s"));
assert!(logs_contain("[slow] sleep operation elapsed=1s 500ms"));
// 2s
assert!(logs_contain("[slow] sleep operation elapsed=3.5s"));
assert!(logs_contain("[slow] sleep operation elapsed=3s 500ms"));
// 4s
assert!(logs_contain("[slow] sleep operation elapsed=7.5s"));
assert!(logs_contain("[slow] sleep operation elapsed=7s 500ms"));
// overdue at 10s
assert!(logs_contain("[overdue] sleep operation elapsed=10s"));
// 8s
assert!(logs_contain("[overdue] sleep operation elapsed=18s"));
// 16s
assert!(logs_contain("[overdue] sleep operation elapsed=34s"));
assert!(logs_contain("[completed] sleep operation elapsed=35.9s"));
assert!(logs_contain(
"[completed] sleep operation elapsed=35s 900ms"
));
logs_assert(|lines: &[&str]| {
match lines
.iter()
Expand Down
6 changes: 4 additions & 2 deletions crates/rocksdb/src/rock_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use rocksdb::{CompactOptions, ExportImportFilesMetaData};
use tokio::time::Instant;
use tracing::{debug, info, trace, warn};

use restate_util_time::DurationExt;

use crate::DbSpec;
use crate::RawRocksDb;
use crate::RocksError;
Expand Down Expand Up @@ -239,9 +241,9 @@ impl RocksAccess {
} else {
info!(
db = %self.name(),
"{} column families flushed in {:?}",
"{} column families flushed in {}",
cfs_to_flush.len(),
start.elapsed(),
start.elapsed().friendly(),
);
}
}
Expand Down
Loading
Loading