Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion doc/developer/platform/compute-layer-q-n-a.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ This figure illustrates the compute networking stack, as well as the flow of com

### What is the set of commands that the COMPUTE layer responds to and what do these commands do at a high level?

The set of available commands can be seen in `ComputeCommand<T>` at [`src/compute-client/src/command.rs`](/src/compute-client/src/command.rs). At a high level, there are commands to: (a) initialize a COMPUTE replica, (b) communicate to a replica that all command reconciliation during initialization has been completed, (c) create dataflows, (d) allow compactions of COMPUTE-managed collections to take place, (e) peek at arrangements, (f) cancel peeks. The types of responses to the commands are listed in `ComputeResponse` at [`src/compute-client/src/response.rs`](/src/compute-client/src/response.rs).
The set of available commands can be seen in `ComputeCommand` at [`src/compute-client/src/command.rs`](/src/compute-client/src/command.rs). At a high level, there are commands to: (a) initialize a COMPUTE replica, (b) communicate to a replica that all command reconciliation during initialization has been completed, (c) create dataflows, (d) allow compactions of COMPUTE-managed collections to take place, (e) peek at arrangements, (f) cancel peeks. The types of responses to the commands are listed in `ComputeResponse` at [`src/compute-client/src/response.rs`](/src/compute-client/src/response.rs).

### What is the lifecycle of networking threads in a `computed` process? Do they farm out work to other threads?

Expand Down
2 changes: 1 addition & 1 deletion src/adapter-types/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl CompactionWindow {
}
}

impl From<CompactionWindow> for ReadPolicy<Timestamp> {
impl From<CompactionWindow> for ReadPolicy {
fn from(value: CompactionWindow) -> Self {
let time = match value {
CompactionWindow::Default => DEFAULT_LOGICAL_COMPACTION_WINDOW_TS,
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3493,7 +3493,7 @@ impl Coordinator {
/// This method expects all storage collections and dataflow plans to be available, so it must
/// run after [`Coordinator::bootstrap_storage_collections`] and
/// [`Coordinator::bootstrap_dataflow_plans`].
async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold<Timestamp>> {
async fn bootstrap_dataflow_as_ofs(&mut self) -> BTreeMap<GlobalId, ReadHold> {
let mut catalog_ids = Vec::new();
let mut dataflows = Vec::new();
let mut read_policies = BTreeMap::new();
Expand Down
12 changes: 2 additions & 10 deletions src/adapter/src/coord/peek.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,7 @@ pub enum PeekResponseUnary {

#[derive(Clone, Debug)]
pub struct PeekDataflowPlan {
pub(crate) desc: DataflowDescription<
mz_compute_types::plan::Plan<mz_repr::Timestamp>,
(),
mz_repr::Timestamp,
>,
pub(crate) desc: DataflowDescription<mz_compute_types::plan::Plan, ()>,
pub(crate) id: GlobalId,
key: Vec<MirScalarExpr>,
permutation: Vec<usize>,
Expand All @@ -103,11 +99,7 @@ pub struct PeekDataflowPlan {

impl PeekDataflowPlan {
pub fn new(
desc: DataflowDescription<
mz_compute_types::plan::Plan<mz_repr::Timestamp>,
(),
mz_repr::Timestamp,
>,
desc: DataflowDescription<mz_compute_types::plan::Plan, ()>,
id: GlobalId,
typ: &SqlRelationType,
) -> Self {
Expand Down
13 changes: 5 additions & 8 deletions src/adapter/src/coord/read_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ use crate::util::ResultExt;
/// relinquishes the associated read capabilities.
#[derive(Debug, Default, Clone)]
pub struct ReadHolds {
pub storage_holds: BTreeMap<GlobalId, ReadHold<Timestamp>>,
pub compute_holds: BTreeMap<(ComputeInstanceId, GlobalId), ReadHold<Timestamp>>,
pub storage_holds: BTreeMap<GlobalId, ReadHold>,
pub compute_holds: BTreeMap<(ComputeInstanceId, GlobalId), ReadHold>,
}

impl ReadHolds {
Expand Down Expand Up @@ -308,10 +308,7 @@ impl crate::coord::Coordinator {
}
}

pub(crate) fn update_storage_read_policies(
&self,
policies: Vec<(CatalogItemId, ReadPolicy<Timestamp>)>,
) {
pub(crate) fn update_storage_read_policies(&self, policies: Vec<(CatalogItemId, ReadPolicy)>) {
let policies = policies
.into_iter()
.map(|(item_id, policy)| {
Expand All @@ -330,7 +327,7 @@ impl crate::coord::Coordinator {

pub(crate) fn update_compute_read_policies(
&self,
mut policies: Vec<(ComputeInstanceId, CatalogItemId, ReadPolicy<Timestamp>)>,
mut policies: Vec<(ComputeInstanceId, CatalogItemId, ReadPolicy)>,
) {
policies.sort_by_key(|&(cluster_id, _, _)| cluster_id);
for (cluster_id, group) in &policies
Expand All @@ -357,7 +354,7 @@ impl crate::coord::Coordinator {
&self,
compute_instance: ComputeInstanceId,
item_id: CatalogItemId,
base_policy: ReadPolicy<Timestamp>,
base_policy: ReadPolicy,
) {
self.update_compute_read_policies(vec![(compute_instance, item_id, base_policy)])
}
Expand Down
14 changes: 7 additions & 7 deletions src/compute-client/src/as_of_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ use tracing::{info, warn};
/// with the compute controller.
pub fn run(
dataflows: &mut [DataflowDescription<Plan, ()>],
read_policies: &BTreeMap<GlobalId, ReadPolicy<Timestamp>>,
read_policies: &BTreeMap<GlobalId, ReadPolicy>,
storage_collections: &dyn StorageCollections,
current_time: Timestamp,
read_only_mode: bool,
) -> BTreeMap<GlobalId, ReadHold<Timestamp>> {
) -> BTreeMap<GlobalId, ReadHold> {
// Get read holds for the storage inputs of the dataflows.
// This ensures that storage frontiers don't advance past the selected as-ofs.
let mut storage_read_holds = BTreeMap::new();
Expand Down Expand Up @@ -321,7 +321,7 @@ impl Constraint<'_> {
struct Collection<'a> {
storage_inputs: Vec<GlobalId>,
compute_inputs: Vec<GlobalId>,
read_policy: Option<&'a ReadPolicy<Timestamp>>,
read_policy: Option<&'a ReadPolicy>,
/// The currently known as-of bounds.
///
/// Shared between collections exported by the same dataflow.
Expand All @@ -342,7 +342,7 @@ impl<'a> Context<'a> {
fn new(
dataflows: &[DataflowDescription<Plan, ()>],
storage_collections: &'a dyn StorageCollections,
read_policies: &'a BTreeMap<GlobalId, ReadPolicy<Timestamp>>,
read_policies: &'a BTreeMap<GlobalId, ReadPolicy>,
current_time: Timestamp,
) -> Self {
// Construct initial collection state for each dataflow export. Dataflows might have their
Expand Down Expand Up @@ -427,7 +427,7 @@ impl<'a> Context<'a> {
/// not be able to hydrate successfully.
fn apply_upstream_storage_constraints(
&self,
storage_read_holds: &BTreeMap<GlobalId, ReadHold<Timestamp>>,
storage_read_holds: &BTreeMap<GlobalId, ReadHold>,
) {
// Apply direct constraints from storage inputs.
for (id, collection) in &self.collections {
Expand Down Expand Up @@ -1027,14 +1027,14 @@ mod tests {
unimplemented!()
}

fn set_read_policies(&self, _policies: Vec<(GlobalId, ReadPolicy<Timestamp>)>) {
fn set_read_policies(&self, _policies: Vec<(GlobalId, ReadPolicy)>) {
unimplemented!()
}

fn acquire_read_holds(
&self,
desired_holds: Vec<GlobalId>,
) -> Result<Vec<ReadHold<Timestamp>>, CollectionMissing> {
) -> Result<Vec<ReadHold>, CollectionMissing> {
let mut holds = Vec::with_capacity(desired_holds.len());
for id in desired_holds {
let (read, _write) = self.0.get(&id).ok_or(CollectionMissing(id))?;
Expand Down
9 changes: 3 additions & 6 deletions src/compute-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ impl ComputeController {
pub fn set_read_policy(
&self,
instance_id: ComputeInstanceId,
policies: Vec<(GlobalId, ReadPolicy<Timestamp>)>,
policies: Vec<(GlobalId, ReadPolicy)>,
) -> Result<(), ReadPolicyError> {
use ReadPolicyError::*;

Expand All @@ -968,7 +968,7 @@ impl ComputeController {
&self,
instance_id: ComputeInstanceId,
collection_id: GlobalId,
) -> Result<ReadHold<Timestamp>, CollectionUpdateError> {
) -> Result<ReadHold, CollectionUpdateError> {
let read_hold = self
.instance(instance_id)?
.acquire_read_hold(collection_id)?;
Expand Down Expand Up @@ -1114,10 +1114,7 @@ impl InstanceState {
}

/// Acquires a [`ReadHold`] for the identified compute collection.
pub fn acquire_read_hold(
&self,
id: GlobalId,
) -> Result<ReadHold<Timestamp>, CollectionMissing> {
pub fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold, CollectionMissing> {
// We acquire read holds at the earliest possible time rather than returning a copy
// of the implied read hold. This is so that in `create_dataflow` we can acquire read holds
// on compute dependencies at frontiers that are held back by other read holds the caller
Expand Down
47 changes: 22 additions & 25 deletions src/compute-client/src/controller/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ pub(super) struct Instance {
///
/// Copies of this sender are given to [`ReadHold`]s that are created in
/// [`CollectionState::new`].
read_hold_tx: read_holds::ChangeTx<Timestamp>,
read_hold_tx: read_holds::ChangeTx,
/// A sender for responses from replicas.
replica_tx: mz_ore::channel::InstrumentedUnboundedSender<ReplicaResponse, IntCounter>,
/// A receiver for responses from replicas.
Expand Down Expand Up @@ -277,9 +277,9 @@ impl Instance {
id: GlobalId,
as_of: Antichain<Timestamp>,
shared: SharedCollectionState,
storage_dependencies: BTreeMap<GlobalId, ReadHold<Timestamp>>,
compute_dependencies: BTreeMap<GlobalId, ReadHold<Timestamp>>,
replica_input_read_holds: Vec<ReadHold<Timestamp>>,
storage_dependencies: BTreeMap<GlobalId, ReadHold>,
compute_dependencies: BTreeMap<GlobalId, ReadHold>,
replica_input_read_holds: Vec<ReadHold>,
write_only: bool,
storage_sink: bool,
initial_as_of: Option<Antichain<Timestamp>>,
Expand Down Expand Up @@ -885,7 +885,7 @@ impl Instance {
dyncfg: Arc<ConfigSet>,
command_rx: mpsc::UnboundedReceiver<Command>,
response_tx: mpsc::UnboundedSender<ComputeControllerResponse>,
read_hold_tx: read_holds::ChangeTx<Timestamp>,
read_hold_tx: read_holds::ChangeTx,
introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
read_only: bool,
) -> Self {
Expand Down Expand Up @@ -1269,7 +1269,7 @@ impl Instance {
pub fn create_dataflow(
&mut self,
dataflow: DataflowDescription<mz_compute_types::plan::Plan, ()>,
import_read_holds: Vec<ReadHold<Timestamp>>,
import_read_holds: Vec<ReadHold>,
mut shared_collection_state: BTreeMap<GlobalId, SharedCollectionState>,
target_replica: Option<ReplicaId>,
) -> Result<(), DataflowCreationError> {
Expand Down Expand Up @@ -1628,7 +1628,7 @@ impl Instance {
result_desc: RelationDesc,
finishing: RowSetFinishing,
map_filter_project: mz_expr::SafeMfpPlan,
mut read_hold: ReadHold<Timestamp>,
mut read_hold: ReadHold,
target_replica: Option<ReplicaId>,
peek_response_tx: oneshot::Sender<PeekResponse>,
) -> Result<(), PeekError> {
Expand Down Expand Up @@ -1724,7 +1724,7 @@ impl Instance {
#[mz_ore::instrument(level = "debug")]
pub fn set_read_policy(
&mut self,
policies: Vec<(GlobalId, ReadPolicy<Timestamp>)>,
policies: Vec<(GlobalId, ReadPolicy)>,
) -> Result<(), ReadPolicyError> {
// Do error checking upfront, to avoid introducing inconsistencies between a collection's
// `implied_capability` and `read_capabilities`.
Expand Down Expand Up @@ -2327,10 +2327,7 @@ impl Instance {
///
/// This mirrors the logic used by the controller-side `InstanceState::acquire_read_hold`,
/// but executes on the instance task itself.
pub(super) fn acquire_read_hold(
&self,
id: GlobalId,
) -> Result<ReadHold<Timestamp>, CollectionMissing> {
pub(super) fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold, CollectionMissing> {
// Similarly to InstanceState::acquire_read_hold and StorageCollections::acquire_read_holds,
// we acquire read holds at the earliest possible time rather than returning a copy
// of the implied read hold. This is so that dependents can acquire read holds on
Expand Down Expand Up @@ -2400,28 +2397,28 @@ struct CollectionState {
/// `read_policy`. It also ensures that read holds on the collection's dependencies are kept at
/// some time not greater than the collection's `write_frontier`, guaranteeing that the
/// collection's next outputs can always be computed without skipping times.
implied_read_hold: ReadHold<Timestamp>,
implied_read_hold: ReadHold,
/// A read hold held to enable dataflow warmup.
///
/// Dataflow warmup is an optimization that allows dataflows to immediately start hydrating
/// even when their next output time (as implied by the `write_frontier`) is in the future.
/// By installing a read capability derived from the write frontiers of the collection's
/// inputs, we ensure that the as-of of new dataflows installed for the collection is at a time
/// that is immediately available, so hydration can begin immediately too.
warmup_read_hold: ReadHold<Timestamp>,
warmup_read_hold: ReadHold,
/// The policy to use to downgrade `self.implied_read_hold`.
///
/// If `None`, the collection is a write-only collection (i.e. a sink). For write-only
/// collections, the `implied_read_hold` is only required for maintaining read holds on the
/// inputs, so we can immediately downgrade it to the `write_frontier`.
read_policy: Option<ReadPolicy<Timestamp>>,
read_policy: Option<ReadPolicy>,

/// Storage identifiers on which this collection depends, and read holds this collection
/// requires on them.
storage_dependencies: BTreeMap<GlobalId, ReadHold<Timestamp>>,
storage_dependencies: BTreeMap<GlobalId, ReadHold>,
/// Compute identifiers on which this collection depends, and read holds this collection
/// requires on them.
compute_dependencies: BTreeMap<GlobalId, ReadHold<Timestamp>>,
compute_dependencies: BTreeMap<GlobalId, ReadHold>,

/// Introspection state associated with this collection.
introspection: CollectionIntrospection,
Expand Down Expand Up @@ -2450,9 +2447,9 @@ impl CollectionState {
collection_id: GlobalId,
as_of: Antichain<Timestamp>,
shared: SharedCollectionState,
storage_dependencies: BTreeMap<GlobalId, ReadHold<Timestamp>>,
compute_dependencies: BTreeMap<GlobalId, ReadHold<Timestamp>>,
read_hold_tx: read_holds::ChangeTx<Timestamp>,
storage_dependencies: BTreeMap<GlobalId, ReadHold>,
compute_dependencies: BTreeMap<GlobalId, ReadHold>,
read_hold_tx: read_holds::ChangeTx,
introspection: CollectionIntrospection,
) -> Self {
// A collection is not readable before the `as_of`.
Expand Down Expand Up @@ -2505,7 +2502,7 @@ impl CollectionState {
fn new_log_collection(
id: GlobalId,
shared: SharedCollectionState,
read_hold_tx: read_holds::ChangeTx<Timestamp>,
read_hold_tx: read_holds::ChangeTx,
introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
) -> Self {
let since = Antichain::from_elem(Timestamp::MIN);
Expand Down Expand Up @@ -2943,7 +2940,7 @@ struct PendingPeek {
/// Used to track peek durations.
requested_at: Instant,
/// The read hold installed to serve this peek.
read_hold: ReadHold<Timestamp>,
read_hold: ReadHold,
/// The channel to send peek results.
peek_response_tx: oneshot::Sender<PeekResponse>,
/// An optional limit of the peek's result size.
Expand Down Expand Up @@ -3014,7 +3011,7 @@ impl ReplicaState {
&mut self,
id: GlobalId,
as_of: Antichain<Timestamp>,
input_read_holds: Vec<ReadHold<Timestamp>>,
input_read_holds: Vec<ReadHold>,
) {
let metrics = self.metrics.for_collection(id);
let introspection = ReplicaCollectionIntrospection::new(
Expand Down Expand Up @@ -3114,7 +3111,7 @@ struct ReplicaCollectionState {
/// These read holds are kept to ensure that the replica is able to read from storage inputs at
/// all times it hasn't read yet. We only need to install read holds for storage inputs since
/// compaction of compute inputs is implicitly held back by Timely/DD.
input_read_holds: Vec<ReadHold<Timestamp>>,
input_read_holds: Vec<ReadHold>,

/// Maximum frontier wallclock lag since the last `WallclockLagHistory` introspection update.
///
Expand All @@ -3127,7 +3124,7 @@ impl ReplicaCollectionState {
metrics: Option<ReplicaCollectionMetrics>,
as_of: Antichain<Timestamp>,
introspection: ReplicaCollectionIntrospection,
input_read_holds: Vec<ReadHold<Timestamp>>,
input_read_holds: Vec<ReadHold>,
) -> Self {
Self {
write_frontier: as_of.clone(),
Expand Down
11 changes: 5 additions & 6 deletions src/compute-client/src/controller/instance_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ pub struct InstanceClient {
command_tx: mpsc::UnboundedSender<Command>,
/// A sender for read hold changes for collections installed on the instance.
#[derivative(Debug = "ignore")]
read_hold_tx: read_holds::ChangeTx<Timestamp>,
read_hold_tx: read_holds::ChangeTx,
}

impl InstanceClient {
pub(super) fn read_hold_tx(&self) -> read_holds::ChangeTx<Timestamp> {
pub(super) fn read_hold_tx(&self) -> read_holds::ChangeTx {
Arc::clone(&self.read_hold_tx)
}

Expand Down Expand Up @@ -163,7 +163,7 @@ impl InstanceClient {
) -> Self {
let (command_tx, command_rx) = mpsc::unbounded_channel();

let read_hold_tx: read_holds::ChangeTx<_> = {
let read_hold_tx: read_holds::ChangeTx = {
let command_tx = command_tx.clone();
Arc::new(move |id, change: ChangeBatch<_>| {
let cmd: Command = {
Expand Down Expand Up @@ -205,8 +205,7 @@ impl InstanceClient {
pub async fn acquire_read_holds_and_collection_write_frontiers(
&self,
ids: Vec<GlobalId>,
) -> Result<Vec<(GlobalId, ReadHold<Timestamp>, Antichain<Timestamp>)>, AcquireReadHoldsError>
{
) -> Result<Vec<(GlobalId, ReadHold, Antichain<Timestamp>)>, AcquireReadHoldsError> {
self.call_sync(move |i| {
let mut result = Vec::new();
for id in ids.into_iter() {
Expand All @@ -233,7 +232,7 @@ impl InstanceClient {
result_desc: RelationDesc,
finishing: RowSetFinishing,
map_filter_project: mz_expr::SafeMfpPlan,
target_read_hold: ReadHold<Timestamp>,
target_read_hold: ReadHold,
target_replica: Option<ReplicaId>,
peek_response_tx: oneshot::Sender<PeekResponse>,
) -> Result<(), PeekError> {
Expand Down
Loading
Loading