diff --git a/doc/developer/platform/compute-layer-q-n-a.md b/doc/developer/platform/compute-layer-q-n-a.md index 01ff0338fb61f..5b188a9db760c 100644 --- a/doc/developer/platform/compute-layer-q-n-a.md +++ b/doc/developer/platform/compute-layer-q-n-a.md @@ -18,7 +18,7 @@ This figure illustrates the compute networking stack, as well as the flow of com ### What is the set of commands that the COMPUTE layer responds to and what do these commands do at a high level? -The set of available commands can be seen in `ComputeCommand` at [`src/compute-client/src/command.rs`](/src/compute-client/src/command.rs). At a high level, there are commands to: (a) initialize a COMPUTE replica, (b) communicate to a replica that all command reconciliation during initialization has been completed, (c) create dataflows, (d) allow compactions of COMPUTE-managed collections to take place, (e) peek at arrangements, (f) cancel peeks. The types of responses to the commands are listed in `ComputeResponse` at [`src/compute-client/src/response.rs`](/src/compute-client/src/response.rs). +The set of available commands can be seen in `ComputeCommand` at [`src/compute-client/src/command.rs`](/src/compute-client/src/command.rs). At a high level, there are commands to: (a) initialize a COMPUTE replica, (b) communicate to a replica that all command reconciliation during initialization has been completed, (c) create dataflows, (d) allow compactions of COMPUTE-managed collections to take place, (e) peek at arrangements, (f) cancel peeks. The types of responses to the commands are listed in `ComputeResponse` at [`src/compute-client/src/response.rs`](/src/compute-client/src/response.rs). ### What is the lifecycle of networking threads in a `computed` process? Do they farm out work to other threads? diff --git a/src/adapter-types/src/compaction.rs b/src/adapter-types/src/compaction.rs index 8387c7de03418..f734bc0f158e5 100644 --- a/src/adapter-types/src/compaction.rs +++ b/src/adapter-types/src/compaction.rs @@ -65,7 +65,7 @@ impl CompactionWindow { } } -impl From for ReadPolicy { +impl From for ReadPolicy { fn from(value: CompactionWindow) -> Self { let time = match value { CompactionWindow::Default => DEFAULT_LOGICAL_COMPACTION_WINDOW_TS, diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 3cb34187f5271..0f2f0fee6105e 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -3493,7 +3493,7 @@ impl Coordinator { /// This method expects all storage collections and dataflow plans to be available, so it must /// run after [`Coordinator::bootstrap_storage_collections`] and /// [`Coordinator::bootstrap_dataflow_plans`]. - async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap> { + async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap { let mut catalog_ids = Vec::new(); let mut dataflows = Vec::new(); let mut read_policies = BTreeMap::new(); diff --git a/src/adapter/src/coord/peek.rs b/src/adapter/src/coord/peek.rs index ef3392878549c..de20ace687a3d 100644 --- a/src/adapter/src/coord/peek.rs +++ b/src/adapter/src/coord/peek.rs @@ -90,11 +90,7 @@ pub enum PeekResponseUnary { #[derive(Clone, Debug)] pub struct PeekDataflowPlan { - pub(crate) desc: DataflowDescription< - mz_compute_types::plan::Plan, - (), - mz_repr::Timestamp, - >, + pub(crate) desc: DataflowDescription, pub(crate) id: GlobalId, key: Vec, permutation: Vec, @@ -103,11 +99,7 @@ pub struct PeekDataflowPlan { impl PeekDataflowPlan { pub fn new( - desc: DataflowDescription< - mz_compute_types::plan::Plan, - (), - mz_repr::Timestamp, - >, + desc: DataflowDescription, id: GlobalId, typ: &SqlRelationType, ) -> Self { diff --git a/src/adapter/src/coord/read_policy.rs b/src/adapter/src/coord/read_policy.rs index 6572367ff2502..29f9750966fc0 100644 --- a/src/adapter/src/coord/read_policy.rs +++ b/src/adapter/src/coord/read_policy.rs @@ -42,8 +42,8 @@ use crate::util::ResultExt; /// relinquishes the associated read capabilities. #[derive(Debug, Default, Clone)] pub struct ReadHolds { - pub storage_holds: BTreeMap>, - pub compute_holds: BTreeMap<(ComputeInstanceId, GlobalId), ReadHold>, + pub storage_holds: BTreeMap, + pub compute_holds: BTreeMap<(ComputeInstanceId, GlobalId), ReadHold>, } impl ReadHolds { @@ -308,10 +308,7 @@ impl crate::coord::Coordinator { } } - pub(crate) fn update_storage_read_policies( - &self, - policies: Vec<(CatalogItemId, ReadPolicy)>, - ) { + pub(crate) fn update_storage_read_policies(&self, policies: Vec<(CatalogItemId, ReadPolicy)>) { let policies = policies .into_iter() .map(|(item_id, policy)| { @@ -330,7 +327,7 @@ impl crate::coord::Coordinator { pub(crate) fn update_compute_read_policies( &self, - mut policies: Vec<(ComputeInstanceId, CatalogItemId, ReadPolicy)>, + mut policies: Vec<(ComputeInstanceId, CatalogItemId, ReadPolicy)>, ) { policies.sort_by_key(|&(cluster_id, _, _)| cluster_id); for (cluster_id, group) in &policies @@ -357,7 +354,7 @@ impl crate::coord::Coordinator { &self, compute_instance: ComputeInstanceId, item_id: CatalogItemId, - base_policy: ReadPolicy, + base_policy: ReadPolicy, ) { self.update_compute_read_policies(vec![(compute_instance, item_id, base_policy)]) } diff --git a/src/compute-client/src/as_of_selection.rs b/src/compute-client/src/as_of_selection.rs index 531ad2b1337b2..5583e28eaf80c 100644 --- a/src/compute-client/src/as_of_selection.rs +++ b/src/compute-client/src/as_of_selection.rs @@ -100,11 +100,11 @@ use tracing::{info, warn}; /// with the compute controller. pub fn run( dataflows: &mut [DataflowDescription], - read_policies: &BTreeMap>, + read_policies: &BTreeMap, storage_collections: &dyn StorageCollections, current_time: Timestamp, read_only_mode: bool, -) -> BTreeMap> { +) -> BTreeMap { // Get read holds for the storage inputs of the dataflows. // This ensures that storage frontiers don't advance past the selected as-ofs. let mut storage_read_holds = BTreeMap::new(); @@ -321,7 +321,7 @@ impl Constraint<'_> { struct Collection<'a> { storage_inputs: Vec, compute_inputs: Vec, - read_policy: Option<&'a ReadPolicy>, + read_policy: Option<&'a ReadPolicy>, /// The currently known as-of bounds. /// /// Shared between collections exported by the same dataflow. @@ -342,7 +342,7 @@ impl<'a> Context<'a> { fn new( dataflows: &[DataflowDescription], storage_collections: &'a dyn StorageCollections, - read_policies: &'a BTreeMap>, + read_policies: &'a BTreeMap, current_time: Timestamp, ) -> Self { // Construct initial collection state for each dataflow export. Dataflows might have their @@ -427,7 +427,7 @@ impl<'a> Context<'a> { /// not be able to hydrate successfully. fn apply_upstream_storage_constraints( &self, - storage_read_holds: &BTreeMap>, + storage_read_holds: &BTreeMap, ) { // Apply direct constraints from storage inputs. for (id, collection) in &self.collections { @@ -1027,14 +1027,14 @@ mod tests { unimplemented!() } - fn set_read_policies(&self, _policies: Vec<(GlobalId, ReadPolicy)>) { + fn set_read_policies(&self, _policies: Vec<(GlobalId, ReadPolicy)>) { unimplemented!() } fn acquire_read_holds( &self, desired_holds: Vec, - ) -> Result>, CollectionMissing> { + ) -> Result, CollectionMissing> { let mut holds = Vec::with_capacity(desired_holds.len()); for id in desired_holds { let (read, _write) = self.0.get(&id).ok_or(CollectionMissing(id))?; diff --git a/src/compute-client/src/controller.rs b/src/compute-client/src/controller.rs index f751683220530..86b6f1e6cdd13 100644 --- a/src/compute-client/src/controller.rs +++ b/src/compute-client/src/controller.rs @@ -943,7 +943,7 @@ impl ComputeController { pub fn set_read_policy( &self, instance_id: ComputeInstanceId, - policies: Vec<(GlobalId, ReadPolicy)>, + policies: Vec<(GlobalId, ReadPolicy)>, ) -> Result<(), ReadPolicyError> { use ReadPolicyError::*; @@ -968,7 +968,7 @@ impl ComputeController { &self, instance_id: ComputeInstanceId, collection_id: GlobalId, - ) -> Result, CollectionUpdateError> { + ) -> Result { let read_hold = self .instance(instance_id)? .acquire_read_hold(collection_id)?; @@ -1114,10 +1114,7 @@ impl InstanceState { } /// Acquires a [`ReadHold`] for the identified compute collection. - pub fn acquire_read_hold( - &self, - id: GlobalId, - ) -> Result, CollectionMissing> { + pub fn acquire_read_hold(&self, id: GlobalId) -> Result { // We acquire read holds at the earliest possible time rather than returning a copy // of the implied read hold. This is so that in `create_dataflow` we can acquire read holds // on compute dependencies at frontiers that are held back by other read holds the caller diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs index 94541088ea562..bd11be4f00308 100644 --- a/src/compute-client/src/controller/instance.rs +++ b/src/compute-client/src/controller/instance.rs @@ -208,7 +208,7 @@ pub(super) struct Instance { /// /// Copies of this sender are given to [`ReadHold`]s that are created in /// [`CollectionState::new`]. - read_hold_tx: read_holds::ChangeTx, + read_hold_tx: read_holds::ChangeTx, /// A sender for responses from replicas. replica_tx: mz_ore::channel::InstrumentedUnboundedSender, /// A receiver for responses from replicas. @@ -277,9 +277,9 @@ impl Instance { id: GlobalId, as_of: Antichain, shared: SharedCollectionState, - storage_dependencies: BTreeMap>, - compute_dependencies: BTreeMap>, - replica_input_read_holds: Vec>, + storage_dependencies: BTreeMap, + compute_dependencies: BTreeMap, + replica_input_read_holds: Vec, write_only: bool, storage_sink: bool, initial_as_of: Option>, @@ -885,7 +885,7 @@ impl Instance { dyncfg: Arc, command_rx: mpsc::UnboundedReceiver, response_tx: mpsc::UnboundedSender, - read_hold_tx: read_holds::ChangeTx, + read_hold_tx: read_holds::ChangeTx, introspection_tx: mpsc::UnboundedSender, read_only: bool, ) -> Self { @@ -1269,7 +1269,7 @@ impl Instance { pub fn create_dataflow( &mut self, dataflow: DataflowDescription, - import_read_holds: Vec>, + import_read_holds: Vec, mut shared_collection_state: BTreeMap, target_replica: Option, ) -> Result<(), DataflowCreationError> { @@ -1628,7 +1628,7 @@ impl Instance { result_desc: RelationDesc, finishing: RowSetFinishing, map_filter_project: mz_expr::SafeMfpPlan, - mut read_hold: ReadHold, + mut read_hold: ReadHold, target_replica: Option, peek_response_tx: oneshot::Sender, ) -> Result<(), PeekError> { @@ -1724,7 +1724,7 @@ impl Instance { #[mz_ore::instrument(level = "debug")] pub fn set_read_policy( &mut self, - policies: Vec<(GlobalId, ReadPolicy)>, + policies: Vec<(GlobalId, ReadPolicy)>, ) -> Result<(), ReadPolicyError> { // Do error checking upfront, to avoid introducing inconsistencies between a collection's // `implied_capability` and `read_capabilities`. @@ -2327,10 +2327,7 @@ impl Instance { /// /// This mirrors the logic used by the controller-side `InstanceState::acquire_read_hold`, /// but executes on the instance task itself. - pub(super) fn acquire_read_hold( - &self, - id: GlobalId, - ) -> Result, CollectionMissing> { + pub(super) fn acquire_read_hold(&self, id: GlobalId) -> Result { // Similarly to InstanceState::acquire_read_hold and StorageCollections::acquire_read_holds, // we acquire read holds at the earliest possible time rather than returning a copy // of the implied read hold. This is so that dependents can acquire read holds on @@ -2400,7 +2397,7 @@ struct CollectionState { /// `read_policy`. It also ensures that read holds on the collection's dependencies are kept at /// some time not greater than the collection's `write_frontier`, guaranteeing that the /// collection's next outputs can always be computed without skipping times. - implied_read_hold: ReadHold, + implied_read_hold: ReadHold, /// A read hold held to enable dataflow warmup. /// /// Dataflow warmup is an optimization that allows dataflows to immediately start hydrating @@ -2408,20 +2405,20 @@ struct CollectionState { /// By installing a read capability derived from the write frontiers of the collection's /// inputs, we ensure that the as-of of new dataflows installed for the collection is at a time /// that is immediately available, so hydration can begin immediately too. - warmup_read_hold: ReadHold, + warmup_read_hold: ReadHold, /// The policy to use to downgrade `self.implied_read_hold`. /// /// If `None`, the collection is a write-only collection (i.e. a sink). For write-only /// collections, the `implied_read_hold` is only required for maintaining read holds on the /// inputs, so we can immediately downgrade it to the `write_frontier`. - read_policy: Option>, + read_policy: Option, /// Storage identifiers on which this collection depends, and read holds this collection /// requires on them. - storage_dependencies: BTreeMap>, + storage_dependencies: BTreeMap, /// Compute identifiers on which this collection depends, and read holds this collection /// requires on them. - compute_dependencies: BTreeMap>, + compute_dependencies: BTreeMap, /// Introspection state associated with this collection. introspection: CollectionIntrospection, @@ -2450,9 +2447,9 @@ impl CollectionState { collection_id: GlobalId, as_of: Antichain, shared: SharedCollectionState, - storage_dependencies: BTreeMap>, - compute_dependencies: BTreeMap>, - read_hold_tx: read_holds::ChangeTx, + storage_dependencies: BTreeMap, + compute_dependencies: BTreeMap, + read_hold_tx: read_holds::ChangeTx, introspection: CollectionIntrospection, ) -> Self { // A collection is not readable before the `as_of`. @@ -2505,7 +2502,7 @@ impl CollectionState { fn new_log_collection( id: GlobalId, shared: SharedCollectionState, - read_hold_tx: read_holds::ChangeTx, + read_hold_tx: read_holds::ChangeTx, introspection_tx: mpsc::UnboundedSender, ) -> Self { let since = Antichain::from_elem(Timestamp::MIN); @@ -2943,7 +2940,7 @@ struct PendingPeek { /// Used to track peek durations. requested_at: Instant, /// The read hold installed to serve this peek. - read_hold: ReadHold, + read_hold: ReadHold, /// The channel to send peek results. peek_response_tx: oneshot::Sender, /// An optional limit of the peek's result size. @@ -3014,7 +3011,7 @@ impl ReplicaState { &mut self, id: GlobalId, as_of: Antichain, - input_read_holds: Vec>, + input_read_holds: Vec, ) { let metrics = self.metrics.for_collection(id); let introspection = ReplicaCollectionIntrospection::new( @@ -3114,7 +3111,7 @@ struct ReplicaCollectionState { /// These read holds are kept to ensure that the replica is able to read from storage inputs at /// all times it hasn't read yet. We only need to install read holds for storage inputs since /// compaction of compute inputs is implicitly held back by Timely/DD. - input_read_holds: Vec>, + input_read_holds: Vec, /// Maximum frontier wallclock lag since the last `WallclockLagHistory` introspection update. /// @@ -3127,7 +3124,7 @@ impl ReplicaCollectionState { metrics: Option, as_of: Antichain, introspection: ReplicaCollectionIntrospection, - input_read_holds: Vec>, + input_read_holds: Vec, ) -> Self { Self { write_frontier: as_of.clone(), diff --git a/src/compute-client/src/controller/instance_client.rs b/src/compute-client/src/controller/instance_client.rs index 37bc059b93c1e..cf7a21a59c33d 100644 --- a/src/compute-client/src/controller/instance_client.rs +++ b/src/compute-client/src/controller/instance_client.rs @@ -101,11 +101,11 @@ pub struct InstanceClient { command_tx: mpsc::UnboundedSender, /// A sender for read hold changes for collections installed on the instance. #[derivative(Debug = "ignore")] - read_hold_tx: read_holds::ChangeTx, + read_hold_tx: read_holds::ChangeTx, } impl InstanceClient { - pub(super) fn read_hold_tx(&self) -> read_holds::ChangeTx { + pub(super) fn read_hold_tx(&self) -> read_holds::ChangeTx { Arc::clone(&self.read_hold_tx) } @@ -163,7 +163,7 @@ impl InstanceClient { ) -> Self { let (command_tx, command_rx) = mpsc::unbounded_channel(); - let read_hold_tx: read_holds::ChangeTx<_> = { + let read_hold_tx: read_holds::ChangeTx = { let command_tx = command_tx.clone(); Arc::new(move |id, change: ChangeBatch<_>| { let cmd: Command = { @@ -205,8 +205,7 @@ impl InstanceClient { pub async fn acquire_read_holds_and_collection_write_frontiers( &self, ids: Vec, - ) -> Result, Antichain)>, AcquireReadHoldsError> - { + ) -> Result)>, AcquireReadHoldsError> { self.call_sync(move |i| { let mut result = Vec::new(); for id in ids.into_iter() { @@ -233,7 +232,7 @@ impl InstanceClient { result_desc: RelationDesc, finishing: RowSetFinishing, map_filter_project: mz_expr::SafeMfpPlan, - target_read_hold: ReadHold, + target_read_hold: ReadHold, target_replica: Option, peek_response_tx: oneshot::Sender, ) -> Result<(), PeekError> { diff --git a/src/compute-client/src/protocol/command.rs b/src/compute-client/src/protocol/command.rs index 59b13ee6ff4a4..67b951edeee87 100644 --- a/src/compute-client/src/protocol/command.rs +++ b/src/compute-client/src/protocol/command.rs @@ -18,7 +18,7 @@ use mz_dyncfg::ConfigUpdates; use mz_expr::RowSetFinishing; use mz_ore::tracing::OpenTelemetryContext; use mz_persist_types::PersistLocation; -use mz_repr::{GlobalId, RelationDesc, Row}; +use mz_repr::{GlobalId, RelationDesc, Row, Timestamp}; use mz_service::params::GrpcClientParameters; use mz_storage_types::controller::CollectionMetadata; use mz_tracing::params::TracingParameters; @@ -35,7 +35,7 @@ use crate::logging::LoggingConfig; /// /// [Protocol Stages]: super#protocol-stages #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub enum ComputeCommand { +pub enum ComputeCommand { /// `Hello` is the first command sent to a replica after a connection was established. It /// provides the replica with meta information about the connection. /// @@ -141,7 +141,7 @@ pub enum ComputeCommand { /// [`Frontiers`]: super::response::ComputeResponse::Frontiers /// [`SubscribeResponse`]: super::response::ComputeResponse::SubscribeResponse /// [`CopyToResponse`]: super::response::ComputeResponse::CopyToResponse - CreateDataflow(Box, CollectionMetadata, T>>), + CreateDataflow(Box>), /// `Schedule` allows the replica to start computation for a compute collection. /// @@ -210,7 +210,7 @@ pub enum ComputeCommand { /// TODO(database-issues#7533): Add documentation. id: GlobalId, /// TODO(database-issues#7533): Add documentation. - frontier: Antichain, + frontier: Antichain, }, /// `Peek` instructs the replica to perform a peek on a collection: either an index or a @@ -241,7 +241,7 @@ pub enum ComputeCommand { /// [`Rows`]: super::response::PeekResponse::Rows /// [`Error`]: super::response::PeekResponse::Error /// [`Canceled`]: super::response::PeekResponse::Canceled - Peek(Box>), + Peek(Box), /// `CancelPeek` instructs the replica to cancel the identified pending peek. /// @@ -417,7 +417,7 @@ impl PeekTarget { /// the dataflow runners are responsible for ensuring that they can /// correctly answer the `Peek`. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub struct Peek { +pub struct Peek { /// Target-specific metadata. pub target: PeekTarget, /// The relation description for the rows returned by this peek, before @@ -432,7 +432,7 @@ pub struct Peek { /// Used in responses and cancellation requests. pub uuid: Uuid, /// The logical timestamp at which the collection is queried. - pub timestamp: T, + pub timestamp: Timestamp, /// Actions to apply to the result set before returning them. pub finishing: RowSetFinishing, /// Linear operation to apply in-line on each result. diff --git a/src/compute-types/src/dataflows.rs b/src/compute-types/src/dataflows.rs index d238c1ef4a979..8b31973a34c85 100644 --- a/src/compute-types/src/dataflows.rs +++ b/src/compute-types/src/dataflows.rs @@ -16,7 +16,7 @@ use mz_expr::{CollectionPlan, MirRelationExpr, MirScalarExpr, OptimizedMirRelati use mz_ore::collections::CollectionExt; use mz_ore::soft_assert_or_log; use mz_repr::refresh_schedule::RefreshSchedule; -use mz_repr::{GlobalId, ReprRelationType, SqlRelationType}; +use mz_repr::{GlobalId, ReprRelationType, SqlRelationType, Timestamp}; use mz_storage_types::time_dependence::TimeDependence; use serde::{Deserialize, Serialize}; use timely::progress::Antichain; @@ -28,9 +28,9 @@ use crate::sources::{SourceInstanceArguments, SourceInstanceDesc}; /// A description of a dataflow to construct and results to surface. #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -pub struct DataflowDescription { +pub struct DataflowDescription { /// Sources instantiations made available to the dataflow pair with monotonicity information. - pub source_imports: BTreeMap>, + pub source_imports: BTreeMap>, /// Indexes made available to the dataflow. /// (id of index, import) pub index_imports: BTreeMap, @@ -43,23 +43,23 @@ pub struct DataflowDescription { pub index_exports: BTreeMap, /// sinks to be created /// (id of new sink, description of sink) - pub sink_exports: BTreeMap>, + pub sink_exports: BTreeMap>, /// An optional frontier to which inputs should be advanced. /// /// If this is set, it should override the default setting determined by /// the upper bound of `since` frontiers contributing to the dataflow. /// It is an error for this to be set to a frontier not beyond that default. - pub as_of: Option>, + pub as_of: Option>, /// Frontier beyond which the dataflow should not execute. /// Specifically, updates at times greater or equal to this frontier are suppressed. /// This is often set to `as_of + 1` to enable "batch" computations. /// Note that frontier advancements might still happen to times that are after the `until`, /// only data is suppressed. (This is consistent with how frontier advancements can also /// happen before the `as_of`.) - pub until: Antichain, + pub until: Antichain, /// The initial as_of when the collection is first created. Filled only for materialized views. /// Note that this doesn't change upon restarts. - pub initial_storage_as_of: Option>, + pub initial_storage_as_of: Option>, /// The schedule of REFRESH materialized views. pub refresh_schedule: Option, /// Human-readable name @@ -68,7 +68,7 @@ pub struct DataflowDescription { pub time_dependence: Option, } -impl DataflowDescription { +impl DataflowDescription { /// Tests if the dataflow refers to a single timestamp, namely /// that `as_of` has a single coordinate and that the `until` /// value corresponds to the `as_of` value plus one, or `as_of` @@ -103,7 +103,7 @@ impl DataflowDescription { } } -impl DataflowDescription, (), mz_repr::Timestamp> { +impl DataflowDescription { /// Check invariants expected to be true about `DataflowDescription`s. pub fn check_invariants(&self) -> Result<(), String> { let mut plans: Vec<_> = self.objects_to_build.iter().map(|o| &o.plan).collect(); @@ -123,7 +123,7 @@ impl DataflowDescription, (), mz_repr::Timestamp> { } } -impl DataflowDescription { +impl DataflowDescription { /// Imports a previously exported index. /// /// This method makes available an index previously exported as `id`, identified @@ -198,7 +198,7 @@ impl DataflowDescription { } /// Exports as `id` a sink described by `description`. - pub fn export_sink(&mut self, id: GlobalId, description: ComputeSinkDesc<(), T>) { + pub fn export_sink(&mut self, id: GlobalId, description: ComputeSinkDesc<()>) { self.sink_exports.insert(id, description); } @@ -254,7 +254,7 @@ impl DataflowDescription { } } -impl DataflowDescription { +impl DataflowDescription { /// Creates a new dataflow description with a human-readable name. pub fn new(name: String) -> Self { Self { @@ -295,12 +295,12 @@ impl DataflowDescription { /// Generally, one should consider setting `as_of` at least to the `since` /// frontiers of contributing data sources and as aggressively as the /// computation permits. - pub fn set_as_of(&mut self, as_of: Antichain) { + pub fn set_as_of(&mut self, as_of: Antichain) { self.as_of = Some(as_of); } /// Records the initial `as_of` of the storage collection associated with a materialized view. - pub fn set_initial_as_of(&mut self, initial_as_of: Antichain) { + pub fn set_initial_as_of(&mut self, initial_as_of: Antichain) { self.initial_storage_as_of = Some(initial_as_of); } @@ -419,7 +419,7 @@ impl DataflowDescription { } } -impl DataflowDescription +impl DataflowDescription where P: CollectionPlan, { @@ -488,10 +488,9 @@ where } } -impl DataflowDescription +impl DataflowDescription where S: Clone + PartialEq, - T: Clone + timely::PartialOrder, { /// Determine if a dataflow description is compatible with this dataflow description. /// @@ -611,7 +610,7 @@ pub struct IndexImport { /// Information about an imported source, and how it will be used by the dataflow. #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -pub struct SourceImport { +pub struct SourceImport { /// Description of the source instance to import. pub desc: SourceInstanceDesc, /// Whether the source will supply monotonic data. @@ -619,7 +618,7 @@ pub struct SourceImport { /// Whether this import must include the snapshot data. pub with_snapshot: bool, /// The initial known upper frontier for the source. - pub upper: Antichain, + pub upper: Antichain, } /// An association of a global identifier to an expression. diff --git a/src/compute-types/src/explain.rs b/src/compute-types/src/explain.rs index 519ecdaf977e1..d6c24ad24eb9f 100644 --- a/src/compute-types/src/explain.rs +++ b/src/compute-types/src/explain.rs @@ -152,7 +152,7 @@ impl<'a> DataflowDescription { } /// TODO(database-issues#7533): Add documentation. -pub fn export_ids_for(dd: &DataflowDescription) -> BTreeMap { +pub fn export_ids_for(dd: &DataflowDescription) -> BTreeMap { let mut map = BTreeMap::::default(); // Dataflows created from a `CREATE MATERIALIZED VIEW` have: diff --git a/src/compute-types/src/plan.rs b/src/compute-types/src/plan.rs index 73ef98bc5c45b..987d42fec800c 100644 --- a/src/compute-types/src/plan.rs +++ b/src/compute-types/src/plan.rs @@ -23,7 +23,7 @@ use mz_ore::str::Indent; use mz_repr::explain::text::text_string_at; use mz_repr::explain::{DummyHumanizer, ExplainConfig, ExprHumanizer, PlanRenderingContext}; use mz_repr::optimize::OptimizerFeatures; -use mz_repr::{Diff, GlobalId, Row}; +use mz_repr::{Diff, GlobalId, Row, Timestamp}; use serde::{Deserialize, Serialize}; use crate::dataflows::DataflowDescription; @@ -140,20 +140,20 @@ impl std::fmt::Display for LirId { /// A rendering plan with as much conditional logic as possible removed. #[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)] -pub struct Plan { +pub struct Plan { /// A dataflow-local identifier. pub lir_id: LirId, /// The underlying operator. - pub node: PlanNode, + pub node: PlanNode, } /// The actual AST node of the `Plan`. #[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)] -pub enum PlanNode { +pub enum PlanNode { /// A collection containing a pre-determined collection. Constant { /// Explicit update triples for the collection. - rows: Result, EvalError>, + rows: Result, EvalError>, }, /// A reference to a bound collection. /// @@ -186,10 +186,10 @@ pub enum PlanNode { /// The local identifier to be used, available to `body` as `Id::Local(id)`. id: LocalId, /// The collection that should be bound to `id`. - value: Box>, + value: Box, /// The collection that results, which is allowed to contain `Get` stages /// that reference `Id::Local(id)`. - body: Box>, + body: Box, }, /// Binds `values` to `ids`, evaluates them potentially recursively, and returns `body`. /// @@ -201,12 +201,12 @@ pub enum PlanNode { /// The local identifiers to be used, available to `body` as `Id::Local(id)`. ids: Vec, /// The collection that should be bound to `id`. - values: Vec>, + values: Vec, /// Maximum number of iterations. See further info on the MIR `LetRec`. limits: Vec>, /// The collection that results, which is allowed to contain `Get` stages /// that reference `Id::Local(id)`. - body: Box>, + body: Box, }, /// Map, Filter, and Project operators. /// @@ -215,7 +215,7 @@ pub enum PlanNode { /// and sometimes reduce stages are not able to absorb this operator. Mfp { /// The input collection. - input: Box>, + input: Box, /// Linear operator to apply to each record. mfp: MapFilterProject, /// Whether the input is from an arrangement, and if so, @@ -239,7 +239,7 @@ pub enum PlanNode { /// if any input_key: Option>, /// The input collection. - input: Box>, + input: Box, /// Expressions that for each row prepare the arguments to `func`. exprs: Vec, /// The variable-record emitting function. @@ -254,7 +254,7 @@ pub enum PlanNode { /// strategy we will use, and any pushed down per-record work. Join { /// An ordered list of inputs that will be joined. - inputs: Vec>, + inputs: Vec, /// Detailed information about the implementation of the join. /// /// This includes information about the implementation strategy, but also @@ -268,7 +268,7 @@ pub enum PlanNode { /// if any input_key: Option>, /// The input collection. - input: Box>, + input: Box, /// A plan for changing input records into key, value pairs. key_val_plan: KeyValPlan, /// A plan for performing the reduce. @@ -287,7 +287,7 @@ pub enum PlanNode { /// Key-based "Top K" operator, retaining the first K records in each group. TopK { /// The input collection. - input: Box>, + input: Box, /// A plan for performing the Top-K. /// /// The implementation of reduction has several different strategies based @@ -298,7 +298,7 @@ pub enum PlanNode { /// Inverts the sign of each update. Negate { /// The input collection. - input: Box>, + input: Box, }, /// Filters records that accumulate negatively. /// @@ -306,7 +306,7 @@ pub enum PlanNode { /// resources proportional to the number of records with non-zero accumulation. Threshold { /// The input collection. - input: Box>, + input: Box, /// A plan for performing the threshold. /// /// The implementation of reduction has several different strategies based @@ -322,7 +322,7 @@ pub enum PlanNode { /// implementing the "distinct" operator. Union { /// The input collections - inputs: Vec>, + inputs: Vec, /// Whether to consolidate the output, e.g., cancel negated records. consolidate_output: bool, }, @@ -336,7 +336,7 @@ pub enum PlanNode { /// The key that must be used to access the input. input_key: Option>, /// The input collection. - input: Box>, + input: Box, /// The MFP that must be applied to the input. input_mfp: MapFilterProject, /// A list of arrangement keys, and possibly a raw collection, @@ -346,9 +346,9 @@ pub enum PlanNode { }, } -impl PlanNode { +impl PlanNode { /// Iterates through references to child expressions. - pub fn children(&self) -> impl Iterator> { + pub fn children(&self) -> impl Iterator { let mut first = None; let mut second = None; let mut rest = None; @@ -387,7 +387,7 @@ impl PlanNode { } /// Iterates through mutable references to child expressions. - pub fn children_mut(&mut self) -> impl Iterator> { + pub fn children_mut(&mut self) -> impl Iterator { let mut first = None; let mut second = None; let mut rest = None; @@ -426,9 +426,9 @@ impl PlanNode { } } -impl PlanNode { +impl PlanNode { /// Attach an `lir_id` to a `PlanNode` to make a complete `Plan`. - pub fn as_plan(self, lir_id: LirId) -> Plan { + pub fn as_plan(self, lir_id: LirId) -> Plan { Plan { lir_id, node: self } } } @@ -469,7 +469,7 @@ pub enum GetPlan { Collection(MapFilterProject), } -impl Plan { +impl Plan { /// Convert the dataflow description into one that uses render plans. #[mz_ore::instrument( target = "optimizer", @@ -704,7 +704,7 @@ impl Plan { // a single-time dataflow. assert!(dataflow.is_single_time()); - let transform = transform::RelaxMustConsolidate::::new(); + let transform = transform::RelaxMustConsolidate; for build_desc in dataflow.objects_to_build.iter_mut() { transform .transform(config, &mut build_desc.plan) @@ -715,7 +715,7 @@ impl Plan { } } -impl CollectionPlan for PlanNode { +impl CollectionPlan for PlanNode { fn depends_on_into(&self, out: &mut BTreeSet) { match self { PlanNode::Constant { rows: _ } => (), @@ -793,7 +793,7 @@ impl CollectionPlan for PlanNode { } } -impl CollectionPlan for Plan { +impl CollectionPlan for Plan { fn depends_on_into(&self, out: &mut BTreeSet) { self.node.depends_on_into(out); } diff --git a/src/compute-types/src/plan/interpret/api.rs b/src/compute-types/src/plan/interpret/api.rs index b93da25fd7270..b7cfd0b526cec 100644 --- a/src/compute-types/src/plan/interpret/api.rs +++ b/src/compute-types/src/plan/interpret/api.rs @@ -25,7 +25,7 @@ use mz_expr::{ use mz_ore::cast::CastFrom; use mz_ore::stack::{CheckedRecursion, RecursionGuard, RecursionLimitError}; use mz_ore::{assert_none, soft_panic_or_log}; -use mz_repr::{Diff, Row}; +use mz_repr::{Diff, Row, Timestamp}; use crate::plan::join::JoinPlan; use crate::plan::reduce::{KeyValPlan, ReducePlan}; @@ -47,7 +47,7 @@ use crate::plan::{AvailableCollections, GetPlan, Plan, PlanNode}; /// [tagless final encoding]: /// /// TODO(database-issues#7446): align this with the `Plan` structure -pub trait Interpreter { +pub trait Interpreter { /// TODO(database-issues#7533): Add documentation. type Domain: Debug + Sized; @@ -55,7 +55,7 @@ pub trait Interpreter { fn constant( &self, ctx: &Context, - rows: &Result, EvalError>, + rows: &Result, EvalError>, ) -> Self::Domain; /// TODO(database-issues#7533): Add documentation. @@ -212,17 +212,17 @@ const MAX_LET_REC_ITERATIONS: u64 = 100; /// A wrapper for a recursive fold invocation over a [Plan] that cannot /// mutate its input. #[allow(missing_debug_implementations)] -pub struct Fold +pub struct Fold where - I: Interpreter, + I: Interpreter, { interpret: I, ctx: Context, } -impl Fold +impl Fold where - I: Interpreter, + I: Interpreter, I::Domain: BoundedLattice + Clone, { /// TODO(database-issues#7533): Add documentation. @@ -238,13 +238,13 @@ where /// Runs an abstract interpreter over the given `expr` in a bottom-up /// manner, keeping the `ctx` field of the enclosing field up to date, and /// returns the final result for the entire `expr`. - pub fn apply(&mut self, expr: &Plan) -> Result { + pub fn apply(&mut self, expr: &Plan) -> Result { self.apply_rec(expr, &RecursionGuard::with_limit(RECURSION_LIMIT)) } fn apply_rec( &mut self, - expr: &Plan, + expr: &Plan, rg: &RecursionGuard, ) -> Result { use PlanNode::*; @@ -460,20 +460,20 @@ where /// A wrapper for a recursive fold invocation over a [Plan] that can /// mutate its input. #[allow(missing_debug_implementations)] -pub struct FoldMut +pub struct FoldMut where - I: Interpreter, + I: Interpreter, { interpret: I, action: Action, ctx: Context, } -impl FoldMut +impl FoldMut where - I: Interpreter, + I: Interpreter, I::Domain: BoundedLattice + Clone, - A: FnMut(&mut Plan, &I::Domain, &[I::Domain]), + A: FnMut(&mut Plan, &I::Domain, &[I::Domain]), { /// TODO(database-issues#7533): Add documentation. pub fn new(interpreter: I, action: A) -> Self { @@ -493,13 +493,13 @@ where /// At each step, the current `expr` is passed along with the interpretation /// result of itself and its children to an `action` callback that can /// optionally mutate it. - pub fn apply(&mut self, expr: &mut Plan) -> Result { + pub fn apply(&mut self, expr: &mut Plan) -> Result { self.apply_rec(expr, &RecursionGuard::with_limit(RECURSION_LIMIT)) } fn apply_rec( &mut self, - expr: &mut Plan, + expr: &mut Plan, rg: &RecursionGuard, ) -> Result { use PlanNode::*; diff --git a/src/compute-types/src/plan/interpret/physically_monotonic.rs b/src/compute-types/src/plan/interpret/physically_monotonic.rs index da0b930df71f0..a48de2f6a7e57 100644 --- a/src/compute-types/src/plan/interpret/physically_monotonic.rs +++ b/src/compute-types/src/plan/interpret/physically_monotonic.rs @@ -12,11 +12,10 @@ use std::cmp::Reverse; use std::collections::BTreeSet; -use std::marker::PhantomData; use differential_dataflow::lattice::Lattice; use mz_expr::{EvalError, Id, MapFilterProject, MirScalarExpr, TableFunc}; -use mz_repr::{Diff, GlobalId, Row}; +use mz_repr::{Diff, GlobalId, Row, Timestamp}; use timely::PartialOrder; use crate::plan::interpret::{BoundedLattice, Context, Interpreter}; @@ -70,29 +69,25 @@ impl PartialOrder for PhysicallyMonotonic { /// of retractions in a stream, enables us to disable forced consolidation /// whenever possible. #[derive(Debug)] -pub struct SingleTimeMonotonic<'a, T = mz_repr::Timestamp> { +pub struct SingleTimeMonotonic<'a> { monotonic_ids: &'a BTreeSet, - _phantom: PhantomData, } -impl<'a, T> SingleTimeMonotonic<'a, T> { +impl<'a> SingleTimeMonotonic<'a> { /// Instantiates an interpreter for single-time physical monotonicity /// analysis. pub fn new(monotonic_ids: &'a BTreeSet) -> Self { - SingleTimeMonotonic { - monotonic_ids, - _phantom: Default::default(), - } + SingleTimeMonotonic { monotonic_ids } } } -impl Interpreter for SingleTimeMonotonic<'_, T> { +impl Interpreter for SingleTimeMonotonic<'_> { type Domain = PhysicallyMonotonic; fn constant( &self, _ctx: &Context, - rows: &Result, EvalError>, + rows: &Result, EvalError>, ) -> Self::Domain { // A constant is physically monotonic iff the constant is an `EvalError` // or all its rows have `Diff` values greater than zero. diff --git a/src/compute-types/src/plan/lowering.rs b/src/compute-types/src/plan/lowering.rs index 58488c08d4c1b..d8343f9c77d75 100644 --- a/src/compute-types/src/plan/lowering.rs +++ b/src/compute-types/src/plan/lowering.rs @@ -19,9 +19,8 @@ use mz_expr::{ OptimizedMirRelationExpr, TableFunc, permutation_for_arrangement, }; use mz_ore::{assert_none, soft_assert_eq_or_log, soft_panic_or_log}; -use mz_repr::GlobalId; use mz_repr::optimize::OptimizerFeatures; -use timely::progress::Timestamp; +use mz_repr::{GlobalId, Timestamp}; use crate::dataflows::{BuildDesc, DataflowDescription, IndexImport}; use crate::plan::join::{DeltaJoinPlan, JoinPlan, LinearJoinPlan}; @@ -65,10 +64,10 @@ impl Context { id } - pub fn lower( + pub fn lower( mut self, desc: DataflowDescription, - ) -> Result>, String> { + ) -> Result, String> { // Sources might provide arranged forms of their data, in the future. // Indexes provide arranged forms of their data. for IndexImport { @@ -141,10 +140,10 @@ impl Context { /// /// An empty list of arrangement keys indicates that only a `Collection` stream can /// be assumed to exist. - fn lower_mir_expr( + fn lower_mir_expr( &mut self, expr: &MirRelationExpr, - ) -> Result<(Plan, AvailableCollections), String> { + ) -> Result<(Plan, AvailableCollections), String> { // This function is recursive and can overflow its stack, so grow it if // needed. The growth here is unbounded. Our general solution for this problem // is to use [`ore::stack::RecursionGuard`] to additionally limit the stack @@ -157,13 +156,10 @@ impl Context { mz_ore::stack::maybe_grow(|| self.lower_mir_expr_stack_safe(expr)) } - fn lower_mir_expr_stack_safe( + fn lower_mir_expr_stack_safe( &mut self, expr: &MirRelationExpr, - ) -> Result<(Plan, AvailableCollections), String> - where - T: Timestamp, - { + ) -> Result<(Plan, AvailableCollections), String> { // Extract a maximally large MapFilterProject from `expr`. // We will then try and push this in to the resulting expression. // @@ -190,7 +186,7 @@ impl Context { let node = PlanNode::Constant { rows: rows.clone().map(|rows| { rows.into_iter() - .map(|(row, diff)| (row, T::minimum(), diff)) + .map(|(row, diff)| (row, Timestamp::MIN, diff)) .collect() }), }; @@ -981,7 +977,7 @@ This is not expected to cause incorrect results, but could indicate a performanc /// originally on top of the `Reduce`. This MFP, or a part of it, might be fused into the /// `Reduce`, in which case `mfp_on_top` is mutated to be the residual MFP, i.e., what was not /// fused. - fn lower_reduce( + fn lower_reduce( &mut self, input: &MirRelationExpr, group_key: &Vec, @@ -990,7 +986,7 @@ This is not expected to cause incorrect results, but could indicate a performanc expected_group_size: &Option, mfp_on_top: &mut MapFilterProject, fused_unnest_list: bool, - ) -> Result<(Plan, AvailableCollections), String> { + ) -> Result<(Plan, AvailableCollections), String> { let input_arity = input.arity(); let (input, keys) = self.lower_mir_expr(input)?; let (input_key, permutation_and_new_arity) = @@ -1048,13 +1044,13 @@ This is not expected to cause incorrect results, but could indicate a performanc /// Replace the plan with another one /// that has the collection in some additional forms. - pub fn arrange_by( + pub fn arrange_by( &mut self, - plan: Plan, + plan: Plan, collections: AvailableCollections, old_collections: &AvailableCollections, arity: usize, - ) -> Plan { + ) -> Plan { if let Plan { node: PlanNode::ArrangeBy { diff --git a/src/compute-types/src/plan/render_plan.rs b/src/compute-types/src/plan/render_plan.rs index de6edbe84e91f..c6ea43b3db30d 100644 --- a/src/compute-types/src/plan/render_plan.rs +++ b/src/compute-types/src/plan/render_plan.rs @@ -19,7 +19,7 @@ use mz_expr::{ }; use mz_ore::soft_assert_or_log; use mz_repr::explain::{CompactScalars, ExprHumanizer}; -use mz_repr::{Diff, GlobalId, Row}; +use mz_repr::{Diff, GlobalId, Row, Timestamp}; use serde::{Deserialize, Serialize}; use crate::plan::join::{DeltaJoinPlan, JoinPlan, LinearJoinPlan}; @@ -38,11 +38,11 @@ use crate::plan::{AvailableCollections, GetPlan, LirId, Plan, PlanNode}; /// /// A [`RenderPlan`] can be constructed from a [`Plan`] using the corresponding [`TryFrom`] impl. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub struct RenderPlan { +pub struct RenderPlan { /// Stages of bindings to render in order. - pub binds: Vec>, + pub binds: Vec, /// The binding-free body. - pub body: LetFreePlan, + pub body: LetFreePlan, } /// A set of bindings to render in order. @@ -55,29 +55,29 @@ pub struct RenderPlan { /// Rec bindings in `recs` are rendered second. Each one has access to _all_ Let and Rec bindings /// in the same stage, including itself, as well as any bindings from previous stages. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub struct BindStage { +pub struct BindStage { /// Non-recursive bindings. - pub lets: Vec>, + pub lets: Vec, /// Potentially recursive bindings. - pub recs: Vec>, + pub recs: Vec, } /// Binds a collection to a [`LocalId`]. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub struct LetBind { +pub struct LetBind { /// The identifier through which the collection can be referenced. pub id: LocalId, /// The collection that is bound to `id`. - pub value: LetFreePlan, + pub value: LetFreePlan, } /// Binds a potentially recursively defined collection to a [`LocalId`]. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub struct RecBind { +pub struct RecBind { /// The identifier through which the collection can be referenced. pub id: LocalId, /// The collection that is bound to `id`. - pub value: RenderPlan, + pub value: RenderPlan, /// Limits imposed on recursive iteration. pub limit: Option, } @@ -97,9 +97,9 @@ pub struct RecBind { /// The implementation of [`LetFreePlan`] must ensure that all its methods uphold these invariants /// and that users are not able to break them. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub struct LetFreePlan { +pub struct LetFreePlan { /// The nodes in the plan. - nodes: BTreeMap>, + nodes: BTreeMap, /// The ID of the root node. root: LirId, /// The topological order of nodes (dependencies before dependants). @@ -108,9 +108,9 @@ pub struct LetFreePlan { /// A node of a [`RenderPlan`], comprising an [`Expr`] and some metadata. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub struct Node { +pub struct Node { /// The relation expression for this node. - pub expr: Expr, + pub expr: Expr, /// The [`LirId`] of the parent of this node (for tree reconstruction). pub parent: Option, /// The nesting level of this node (for pretty printing). @@ -124,11 +124,11 @@ pub struct Node { /// * The `Let` and `LetRec` variants are removed. /// * The `lir_id` fields are removed. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub enum Expr { +pub enum Expr { /// A collection containing a pre-determined collection. Constant { /// Explicit update triples for the collection. - rows: Result, EvalError>, + rows: Result, EvalError>, }, /// A reference to a bound collection. /// @@ -276,7 +276,7 @@ pub enum Expr { }, } -impl TryFrom> for RenderPlan { +impl TryFrom for RenderPlan { /// The only error is "invalid input plan". type Error = (); @@ -299,7 +299,7 @@ impl TryFrom> for RenderPlan { /// ``` /// /// Input [`Plan`]s that do not satisfy this requirement will result in errors. - fn try_from(mut plan: Plan) -> Result { + fn try_from(mut plan: Plan) -> Result { use PlanNode::{Let, LetRec}; // Peel off stages of bindings. Each stage is constructed of an arbitrary amount of leading @@ -338,14 +338,14 @@ impl TryFrom> for RenderPlan { } } -impl TryFrom> for LetFreePlan { +impl TryFrom for LetFreePlan { /// The only error is "invalid input plan". type Error = (); /// Convert the given [`Plan`] into a [`LetFreePlan`]. /// /// Returns an error if the given [`Plan`] contains `Let` or `LetRec` nodes. - fn try_from(plan: Plan) -> Result { + fn try_from(plan: Plan) -> Result { use Expr::*; // The strategy is to walk walk through the `Plan` in right-to-left pre-order and for each @@ -355,9 +355,9 @@ impl TryFrom> for LetFreePlan { let root = plan.lir_id; // Stack of nodes to flatten, with their parent id and nesting. - let mut todo: Vec<(Plan, Option, u8)> = vec![(plan, None, 0)]; + let mut todo: Vec<(Plan, Option, u8)> = vec![(plan, None, 0)]; // `RenderPlan` nodes produced so far. - let mut nodes: BTreeMap> = Default::default(); + let mut nodes: BTreeMap = Default::default(); // A list remembering the order in which nodes were flattened. // Because nodes are flatten in right-to-left pre-order, reversing this list at the end // yields a valid topological order. @@ -524,7 +524,7 @@ impl TryFrom> for LetFreePlan { } } -impl CollectionPlan for RenderPlan { +impl CollectionPlan for RenderPlan { fn depends_on_into(&self, out: &mut BTreeSet) { for stage in &self.binds { for LetBind { value, .. } in &stage.lets { @@ -539,7 +539,7 @@ impl CollectionPlan for RenderPlan { } } -impl CollectionPlan for LetFreePlan { +impl CollectionPlan for LetFreePlan { fn depends_on_into(&self, out: &mut BTreeSet) { for Node { expr, .. } in self.nodes.values() { if let Expr::Get { id, .. } = expr { @@ -551,7 +551,7 @@ impl CollectionPlan for LetFreePlan { } } -impl RenderPlan { +impl RenderPlan { /// Return whether the plan contains recursion. pub fn is_recursive(&self) -> bool { self.binds.iter().any(|b| !b.recs.is_empty()) @@ -589,7 +589,7 @@ impl RenderPlan { } } -impl LetFreePlan { +impl LetFreePlan { /// Return the ID of the root node. pub fn root_id(&self) -> LirId { self.root @@ -600,7 +600,7 @@ impl LetFreePlan { /// /// This allows consuming the plan without being required to uphold the [`LetFreePlan`] /// invariants. - pub fn destruct(self) -> (BTreeMap>, LirId, Vec) { + pub fn destruct(self) -> (BTreeMap, LirId, Vec) { (self.nodes, self.root, self.topological_order) } @@ -634,7 +634,7 @@ impl LetFreePlan { } } -impl RenderPlan { +impl RenderPlan { /// Partitions the plan into `parts` many disjoint pieces. /// /// This is used to partition `PlanNode::Constant` stages so that the work @@ -668,7 +668,7 @@ impl RenderPlan { } } -impl BindStage { +impl BindStage { /// Partitions the stage into `parts` many disjoint pieces. /// /// This is used to partition `PlanNode::Constant` stages so that the work @@ -699,7 +699,7 @@ impl BindStage { } } -impl LetFreePlan { +impl LetFreePlan { /// Partitions the plan into `parts` many disjoint pieces. /// /// This is used to partition `PlanNode::Constant` stages so that the work @@ -732,7 +732,7 @@ impl LetFreePlan { } } -impl Expr { +impl Expr { /// Partitions the expr into `parts` many disjoint pieces. /// /// This is used to partition `PlanNode::Constant` stages so that the work @@ -771,7 +771,7 @@ impl Expr { } } -impl Expr { +impl Expr { /// Renders a single [`Expr`] as a string. /// /// Typically of the format "{ExprName}::{Detail} {input LirID} ({options})" @@ -787,23 +787,23 @@ impl Expr { /// /// Invariant: the [`std::fmt::Display`] instance should produce a single line for a given expr. #[derive(Debug)] -pub struct RenderPlanExprHumanizer<'a, T> { +pub struct RenderPlanExprHumanizer<'a> { /// The [`Expr`] to be rendered. - expr: &'a Expr, + expr: &'a Expr, /// Humanization information. humanizer: &'a dyn ExprHumanizer, } -impl<'a, T> RenderPlanExprHumanizer<'a, T> { +impl<'a> RenderPlanExprHumanizer<'a> { /// Creates a [`RenderPlanExprHumanizer`] (which simply holds the references). /// /// Use the [`std::fmt::Display`] instance. - pub fn new(expr: &'a Expr, humanizer: &'a dyn ExprHumanizer) -> Self { + pub fn new(expr: &'a Expr, humanizer: &'a dyn ExprHumanizer) -> Self { Self { expr, humanizer } } } -impl<'a, T> std::fmt::Display for RenderPlanExprHumanizer<'a, T> { +impl<'a> std::fmt::Display for RenderPlanExprHumanizer<'a> { // NOTE: This code needs to be kept in sync with `Plan::fmt_default_text`. // // This code determines what you see in `mz_lir_mapping`; that other code diff --git a/src/compute-types/src/plan/transform/api.rs b/src/compute-types/src/plan/transform/api.rs index befaecf2fa696..3682654fd64be 100644 --- a/src/compute-types/src/plan/transform/api.rs +++ b/src/compute-types/src/plan/transform/api.rs @@ -26,7 +26,7 @@ pub struct TransformConfig { } /// A transform for [crate::plan::Plan] nodes. -pub trait Transform { +pub trait Transform { /// TODO(database-issues#7533): Add documentation. fn name(&self) -> &'static str; @@ -39,7 +39,7 @@ pub trait Transform { fn transform( &self, config: &TransformConfig, - plan: &mut Plan, + plan: &mut Plan, ) -> Result<(), RecursionLimitError> { let _span = tracing::span!(target: "optimizer", tracing::Level::TRACE, @@ -54,19 +54,19 @@ pub trait Transform { fn do_transform( &self, config: &TransformConfig, - plan: &mut Plan, + plan: &mut Plan, ) -> Result<(), RecursionLimitError>; } /// TODO(database-issues#7533): Add documentation. -pub trait BottomUpTransform { +pub trait BottomUpTransform { /// A type representing analysis information to be associated with each /// sub-term and exposed to the transformation action callback. type Info: BoundedLattice + Clone; /// A type responsible for synthesizing the [Self::Info] associated with /// each sub-term. - type Interpreter<'a>: Interpreter; + type Interpreter<'a>: Interpreter; /// The name for this transform. fn name(&self) -> &'static str; @@ -76,12 +76,12 @@ pub trait BottomUpTransform { /// A callback for manipulating the root of the given [Plan] using the /// [Self::Info] derived for itself and its children. - fn action(plan: &mut Plan, plan_info: &Self::Info, input_infos: &[Self::Info]); + fn action(plan: &mut Plan, plan_info: &Self::Info, input_infos: &[Self::Info]); } -impl Transform for A +impl Transform for A where - A: BottomUpTransform, + A: BottomUpTransform, { fn name(&self) -> &'static str { self.name() @@ -90,7 +90,7 @@ where fn do_transform( &self, config: &TransformConfig, - plan: &mut Plan, + plan: &mut Plan, ) -> Result<(), RecursionLimitError> { let mut fold = FoldMut::new(Self::interpreter(config), Self::action); fold.apply(plan).map(|_| ()) diff --git a/src/compute-types/src/plan/transform/relax_must_consolidate.rs b/src/compute-types/src/plan/transform/relax_must_consolidate.rs index 8695f70ecd4e7..a9d85d6642480 100644 --- a/src/compute-types/src/plan/transform/relax_must_consolidate.rs +++ b/src/compute-types/src/plan/transform/relax_must_consolidate.rs @@ -10,8 +10,6 @@ //! Placeholder module for an [crate::plan::transform::Transform] that infers //! physical monotonicity. -use std::marker::PhantomData; - use crate::plan::interpret::{PhysicallyMonotonic, SingleTimeMonotonic}; use crate::plan::reduce::{HierarchicalPlan, MonotonicPlan}; use crate::plan::top_k::{MonotonicTop1Plan, MonotonicTopKPlan, TopKPlan}; @@ -22,23 +20,12 @@ use crate::plan::{Plan, PlanNode, ReducePlan}; /// analysis and refines, as appropriate, the setting of the `must_consolidate` /// flag in monotonic `Plan` nodes with forced consolidation. #[derive(Debug)] -pub struct RelaxMustConsolidate { - _phantom: PhantomData, -} - -impl RelaxMustConsolidate { - /// TODO(database-issues#7533): Add documentation. - pub fn new() -> Self { - RelaxMustConsolidate { - _phantom: Default::default(), - } - } -} +pub struct RelaxMustConsolidate; -impl BottomUpTransform for RelaxMustConsolidate { +impl BottomUpTransform for RelaxMustConsolidate { type Info = PhysicallyMonotonic; - type Interpreter<'a> = SingleTimeMonotonic<'a, T>; + type Interpreter<'a> = SingleTimeMonotonic<'a>; fn name(&self) -> &'static str { "must_consolidate relaxation" @@ -48,7 +35,7 @@ impl BottomUpTransform for RelaxMustConsolidate { SingleTimeMonotonic::new(&config.monotonic_ids) } - fn action(plan: &mut Plan, _plan_info: &Self::Info, input_infos: &[Self::Info]) { + fn action(plan: &mut Plan, _plan_info: &Self::Info, input_infos: &[Self::Info]) { // Look at `input_infos` and type of `Plan` node and refine the `must_consolidate` flag. // Note that the LIR nodes we care about have a single input. match (&mut plan.node, input_infos) { diff --git a/src/compute-types/src/sinks.rs b/src/compute-types/src/sinks.rs index 2ecf1714bc2b1..ce39f21f220c2 100644 --- a/src/compute-types/src/sinks.rs +++ b/src/compute-types/src/sinks.rs @@ -19,7 +19,7 @@ use timely::progress::Antichain; /// A sink for updates to a relational collection. #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -pub struct ComputeSinkDesc { +pub struct ComputeSinkDesc { /// TODO(database-issues#7533): Add documentation. pub from: GlobalId, /// TODO(database-issues#7533): Add documentation. @@ -29,7 +29,7 @@ pub struct ComputeSinkDesc { /// TODO(database-issues#7533): Add documentation. pub with_snapshot: bool, /// TODO(database-issues#7533): Add documentation. - pub up_to: Antichain, + pub up_to: Antichain, /// TODO(database-issues#7533): Add documentation. pub non_null_assertions: Vec, /// TODO(database-issues#7533): Add documentation. diff --git a/src/compute/src/compute_state.rs b/src/compute/src/compute_state.rs index a0df8623b335c..386f7b1a34af8 100644 --- a/src/compute/src/compute_state.rs +++ b/src/compute/src/compute_state.rs @@ -1087,8 +1087,8 @@ impl<'a> ActiveComputeState<'a> { pub fn determine_dataflow_expiration( &self, time_dependence: &TimeDependence, - until: &Antichain, - ) -> Antichain { + until: &Antichain, + ) -> Antichain { // Evaluate time dependence with respect to the expiration time. // * Step time forward to ensure the expiration time is different to the moment a dataflow // can legitimately jump to. @@ -1098,7 +1098,7 @@ impl<'a> ActiveComputeState<'a> { .replica_expiration .iter() .filter_map(|t| time_dependence.apply(*t)) - .filter_map(|t| mz_repr::Timestamp::try_step_forward(&t)) + .filter_map(|t| Timestamp::try_step_forward(&t)) .filter(|expiration| !until.less_equal(expiration)); Antichain::from_iter(iter) } @@ -1523,7 +1523,7 @@ impl IndexPeek { /// Collects data for a known-complete peek from the ok stream. fn collect_ok_finished_data( - peek: &Peek, + peek: &Peek, oks_handle: &mut Tr, max_result_size: u64, peek_stash_eligible: bool, @@ -1535,7 +1535,7 @@ impl IndexPeek { Key<'a>: ToDatumIter + Eq, KeyContainer: BatchContainer, Val<'a>: ToDatumIter, - TimeGat<'a>: PartialOrder, + TimeGat<'a>: PartialOrder, DiffGat<'a> = &'a Diff, >, { diff --git a/src/compute/src/render/continual_task.rs b/src/compute/src/render/continual_task.rs index b279b092730da..1138e0b477239 100644 --- a/src/compute/src/render/continual_task.rs +++ b/src/compute/src/render/continual_task.rs @@ -317,7 +317,7 @@ impl ContinualTaskSourceTransformer { } impl<'scope> ContinualTaskCtx<'scope> { - pub fn new(dataflow: &DataflowDescription) -> Self { + pub fn new(dataflow: &DataflowDescription) -> Self { let mut name = None; let mut ct_inputs = BTreeSet::new(); let mut ct_outputs = BTreeSet::new(); diff --git a/src/storage-client/src/controller.rs b/src/storage-client/src/controller.rs index 733410d2b1984..5d565e037e23e 100644 --- a/src/storage-client/src/controller.rs +++ b/src/storage-client/src/controller.rs @@ -767,10 +767,10 @@ pub struct ExportState { /// The read holds that this export has on its dependencies (its input and itself). When /// the upper of the export changes, we downgrade this, which in turn /// downgrades holds we have on our dependencies' sinces. - pub read_holds: [ReadHold; 2], + pub read_holds: [ReadHold; 2], /// The policy to use to downgrade `self.read_capability`. - pub read_policy: ReadPolicy, + pub read_policy: ReadPolicy, /// Reported write frontier. pub write_frontier: Antichain, @@ -779,10 +779,10 @@ pub struct ExportState { impl ExportState { pub fn new( cluster_id: StorageInstanceId, - read_hold: ReadHold, - self_hold: ReadHold, + read_hold: ReadHold, + self_hold: ReadHold, write_frontier: Antichain, - read_policy: ReadPolicy, + read_policy: ReadPolicy, ) -> Self { let mut dependency_since = Antichain::from_elem(Timestamp::MIN); for read_hold in [&read_hold, &self_hold] { @@ -804,7 +804,7 @@ impl ExportState { } /// Returns the cluster to which the export is bound. - pub fn input_hold(&self) -> &ReadHold { + pub fn input_hold(&self) -> &ReadHold { &self.read_holds[0] } diff --git a/src/storage-client/src/storage_collections.rs b/src/storage-client/src/storage_collections.rs index 67db8d997da44..2fb46fbb5ba87 100644 --- a/src/storage-client/src/storage_collections.rs +++ b/src/storage-client/src/storage_collections.rs @@ -282,14 +282,14 @@ pub trait StorageCollections: Debug + Sync { /// /// Identifiers not present in `policies` retain their existing read /// policies. - fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy)>); + fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy)>); /// Acquires and returns the earliest possible read holds for the specified /// collections. fn acquire_read_holds( &self, desired_holds: Vec, - ) -> Result>, CollectionMissing>; + ) -> Result, CollectionMissing>; /// Get the time dependence for a storage collection. Returns no value if unknown or if /// the object isn't managed by storage. @@ -1120,7 +1120,7 @@ impl StorageCollectionsImpl { fn set_read_policies_inner( &self, collections: &mut BTreeMap, - policies: Vec<(GlobalId, ReadPolicy)>, + policies: Vec<(GlobalId, ReadPolicy)>, ) { trace!("set_read_policies: {:?}", policies); @@ -2203,7 +2203,7 @@ impl StorageCollections for StorageCollectionsImpl { self.synchronize_finalized_shards(storage_metadata); } - fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy)>) { + fn set_read_policies(&self, policies: Vec<(GlobalId, ReadPolicy)>) { let mut collections = self.collections.lock().expect("lock poisoned"); if tracing::enabled!(tracing::Level::TRACE) { @@ -2238,7 +2238,7 @@ impl StorageCollections for StorageCollectionsImpl { fn acquire_read_holds( &self, desired_holds: Vec, - ) -> Result>, CollectionMissing> { + ) -> Result, CollectionMissing> { if desired_holds.is_empty() { return Ok(vec![]); } @@ -2497,7 +2497,7 @@ struct CollectionState { pub implied_capability: Antichain, /// The policy to use to downgrade `self.implied_capability`. - pub read_policy: ReadPolicy, + pub read_policy: ReadPolicy, /// Storage identifiers on which this collection depends. pub storage_dependencies: Vec, diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs index f4a43baeb4c67..cfee2041538dd 100644 --- a/src/storage-controller/src/lib.rs +++ b/src/storage-controller/src/lib.rs @@ -2837,7 +2837,7 @@ where // This is really only used when dropping things, where we set the // ReadPolicy to the empty Antichain. #[instrument(level = "debug")] - fn set_hold_policies(&mut self, policies: Vec<(GlobalId, ReadPolicy)>) { + fn set_hold_policies(&mut self, policies: Vec<(GlobalId, ReadPolicy)>) { let mut read_capability_changes = BTreeMap::default(); for (id, policy) in policies.into_iter() { @@ -3925,7 +3925,7 @@ struct IngestionState { pub derived_since: Antichain, /// Holds that this ingestion (or ingestion export) has on its dependencies. - pub dependency_read_holds: Vec>, + pub dependency_read_holds: Vec, /// Reported write frontier. pub write_frontier: Antichain, @@ -3936,7 +3936,7 @@ struct IngestionState { /// This is a _storage-controller-internal_ policy used to derive its /// personal read hold on the collection. It should not be confused with any /// read policies that the adapter might install at [StorageCollections]. - pub hold_policy: ReadPolicy, + pub hold_policy: ReadPolicy, /// The ID of the instance in which the ingestion is running. pub instance_id: StorageInstanceId, diff --git a/src/storage-types/src/read_holds.rs b/src/storage-types/src/read_holds.rs index e51729b25ce89..afd2a09b33c7e 100644 --- a/src/storage-types/src/read_holds.rs +++ b/src/storage-types/src/read_holds.rs @@ -10,15 +10,18 @@ use std::fmt::Debug; use std::sync::Arc; -use mz_repr::GlobalId; +use mz_repr::{GlobalId, Timestamp}; use thiserror::Error; use timely::PartialOrder; -use timely::progress::{Antichain, ChangeBatch, Timestamp as TimelyTimestamp}; +use timely::progress::{Antichain, ChangeBatch}; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::error::SendError; -pub type ChangeTx = Arc< - dyn Fn(GlobalId, ChangeBatch) -> Result<(), SendError<(GlobalId, ChangeBatch)>> +pub type ChangeTx = Arc< + dyn Fn( + GlobalId, + ChangeBatch, + ) -> Result<(), SendError<(GlobalId, ChangeBatch)>> + Send + Sync, >; @@ -29,18 +32,18 @@ pub type ChangeTx = Arc< /// /// This [ReadHold] is safe to drop. The installed read hold will be returned to /// the issuer behind the scenes. -pub struct ReadHold { +pub struct ReadHold { /// Identifies that collection that we have a hold on. id: GlobalId, /// The times at which we hold. - since: Antichain, + since: Antichain, /// For communicating changes to this read hold back to whoever issued it. - change_tx: ChangeTx, + change_tx: ChangeTx, } -impl Debug for ReadHold { +impl Debug for ReadHold { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ReadHold") .field("id", &self.id) @@ -51,19 +54,19 @@ impl Debug for ReadHold { /// Errors for manipulating read holds. #[derive(Error, Debug)] -pub enum ReadHoldDowngradeError { +pub enum ReadHoldDowngradeError { /// The new frontier is not beyond the current since. #[error("since violation: new frontier {frontier:?} is not beyond current since {since:?}")] SinceViolation { /// The frontier to downgrade to. - frontier: Antichain, + frontier: Antichain, /// The since of the collection. - since: Antichain, + since: Antichain, }, } -impl ReadHold { - pub fn new(id: GlobalId, since: Antichain, change_tx: ChangeTx) -> Self { +impl ReadHold { + pub fn new(id: GlobalId, since: Antichain, change_tx: ChangeTx) -> Self { Self { id, since, @@ -73,8 +76,8 @@ impl ReadHold { pub fn with_channel( id: GlobalId, - since: Antichain, - channel_tx: UnboundedSender<(GlobalId, ChangeBatch)>, + since: Antichain, + channel_tx: UnboundedSender<(GlobalId, ChangeBatch)>, ) -> Self { let tx = Arc::new(move |id, changes| channel_tx.send((id, changes))); Self::new(id, since, tx) @@ -89,7 +92,7 @@ impl ReadHold { /// of the collection identified by `id`. This does not mean that the /// overall since of the collection is what we report here. Only that it is /// _at least_ held back to the reported frontier by this read hold. - pub fn since(&self) -> &Antichain { + pub fn since(&self) -> &Antichain { &self.since } @@ -99,7 +102,7 @@ impl ReadHold { /// /// Panics when trying to merge a [ReadHold] for a different collection /// (different [GlobalId]). - pub fn merge_assign(&mut self, mut other: ReadHold) { + pub fn merge_assign(&mut self, mut other: ReadHold) { assert_eq!( self.id, other.id, "can only merge ReadHolds for the same ID" @@ -134,8 +137,8 @@ impl ReadHold { /// holding. pub fn try_downgrade( &mut self, - frontier: Antichain, - ) -> Result<(), ReadHoldDowngradeError> { + frontier: Antichain, + ) -> Result<(), ReadHoldDowngradeError> { if PartialOrder::less_than(&frontier, &self.since) { return Err(ReadHoldDowngradeError::SinceViolation { frontier, @@ -164,7 +167,7 @@ impl ReadHold { } } -impl Clone for ReadHold { +impl Clone for ReadHold { fn clone(&self) -> Self { if self.id.is_user() { tracing::trace!("cloning ReadHold on {}: {:?}", self.id, self.since); @@ -194,7 +197,7 @@ impl Clone for ReadHold { } } -impl Drop for ReadHold { +impl Drop for ReadHold { fn drop(&mut self) { if self.id.is_user() { tracing::trace!("dropping ReadHold on {}: {:?}", self.id, self.since); diff --git a/src/storage-types/src/read_policy.rs b/src/storage-types/src/read_policy.rs index 2f4b6ba5a3142..feeb51fe4a351 100644 --- a/src/storage-types/src/read_policy.rs +++ b/src/storage-types/src/read_policy.rs @@ -11,10 +11,10 @@ use std::sync::Arc; use derivative::Derivative; use itertools::Itertools; -use mz_repr::TimestampManipulation; +use mz_repr::Timestamp; use serde::Serialize; +use timely::progress::Antichain; use timely::progress::frontier::AntichainRef; -use timely::progress::{Antichain, Timestamp}; /// Compaction policies for collections maintained by `Controller`. /// @@ -22,13 +22,13 @@ use timely::progress::{Antichain, Timestamp}; /// because it is fundamental to both storage and compute. #[derive(Clone, Derivative, Serialize)] #[derivative(Debug)] -pub enum ReadPolicy { +pub enum ReadPolicy { /// No-one has yet requested a `ReadPolicy` from us, which means that we can /// still change the implied_capability/the collection since if we need /// to. - NoPolicy { initial_since: Antichain }, + NoPolicy { initial_since: Antichain }, /// Maintain the collection as valid from this frontier onward. - ValidFrom(Antichain), + ValidFrom(Antichain), /// Maintain the collection as valid from a function of the write frontier. /// /// This function will only be re-evaluated when the write frontier changes. @@ -39,51 +39,40 @@ pub enum ReadPolicy { LagWriteFrontier( #[derivative(Debug = "ignore")] #[serde(skip)] - Arc) -> Antichain + Send + Sync>, + Arc) -> Antichain + Send + Sync>, ), /// Allows one to express multiple read policies, taking the least of /// the resulting frontiers. - Multiple(Vec>), + Multiple(Vec), } -impl ReadPolicy -where - T: Timestamp + TimestampManipulation, -{ +impl ReadPolicy { /// Creates a read policy that lags the write frontier "by one". pub fn step_back() -> Self { Self::LagWriteFrontier(Arc::new(move |upper| { if upper.is_empty() { - Antichain::from_elem(Timestamp::minimum()) + Antichain::from_elem(Timestamp::MIN) } else { let stepped_back = upper .to_owned() .into_iter() - .map(|time| { - if time == T::minimum() { - time - } else { - time.step_back().unwrap() - } - }) + .map(|t| t.step_back().unwrap_or(Timestamp::MIN)) .collect_vec(); stepped_back.into() } })) } -} -impl ReadPolicy { /// Creates a read policy that lags the write frontier by the indicated amount, rounded down to (at most) the specified value. /// The rounding down is done to reduce the number of changes the capability undergoes. - pub fn lag_writes_by(lag: mz_repr::Timestamp, max_granularity: mz_repr::Timestamp) -> Self { + pub fn lag_writes_by(lag: Timestamp, max_granularity: Timestamp) -> Self { Self::LagWriteFrontier(Arc::new(move |upper| { if upper.is_empty() { - Antichain::from_elem(Timestamp::minimum()) + Antichain::from_elem(Timestamp::MIN) } else { // Subtract the lag from the time, and then round down to a multiple of `granularity` to cut chatter. let mut time = upper[0]; - if lag != mz_repr::Timestamp::default() { + if lag != Timestamp::default() { time = time.saturating_sub(lag); // It makes little sense to refuse to compact if the user genuinely // sets a smaller compaction window than the default, so honor it here. @@ -94,10 +83,8 @@ impl ReadPolicy { } })) } -} -impl ReadPolicy { - pub fn frontier(&self, write_frontier: AntichainRef) -> Antichain { + pub fn frontier(&self, write_frontier: AntichainRef) -> Antichain { match self { ReadPolicy::NoPolicy { initial_since } => initial_since.clone(), ReadPolicy::ValidFrom(frontier) => frontier.clone(),