diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs index d0bc31d8c2618..4b546eb1d528f 100644 --- a/src/compute-client/src/controller/instance.rs +++ b/src/compute-client/src/controller/instance.rs @@ -345,7 +345,7 @@ impl Instance { client: ReplicaClient, config: ReplicaConfig, epoch: u64, - ) { + ) -> Result<(), read_holds::ReadHoldIssuerHungUp> { let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect(); let metrics = self.metrics.for_replica(id); @@ -359,6 +359,7 @@ impl Instance { ); // Add per-replica collection state. + let mut shutdown_input = None; for (collection_id, collection) in &self.collections { // Skip log collections not maintained by this replica, // and collections targeted at a different replica. @@ -378,11 +379,42 @@ impl Instance { collection.read_frontier().to_owned() }; - let input_read_holds = collection.storage_dependencies.values().cloned().collect(); + // Cloning a `ReadHold` fails when its issuer has hung up. For these holds the issuer + // is the `StorageCollections`, which doesn't hang up as long as the `Instance` exists, + // except during process shutdown, when the tokio runtime drops tasks in arbitrary + // order. In that case there is no way of correctly initializing the per-replica + // collection state, so we give up. We still add the replica itself, to keep the + // bookkeeping consistent with the controller's, and then signal the unrecoverable + // error to the caller, which shuts the instance down. 
+ let mut input_read_holds = Vec::with_capacity(collection.storage_dependencies.len()); + let mut hung_up = Vec::new(); + for hold in collection.storage_dependencies.values() { + match hold.try_clone() { + Ok(hold) => input_read_holds.push(hold), + Err(read_holds::ReadHoldIssuerHungUp(input_id)) => hung_up.push(input_id), + } + } + if !hung_up.is_empty() { + tracing::error!( + replica_id = %id, + %collection_id, + ?hung_up, + "giving up on adding replica collections: storage read hold issuers hung \ + up, the process is shutting down", + ); + shutdown_input = hung_up.into_iter().next(); + break; + } + replica.add_collection(*collection_id, as_of, input_read_holds); } self.replicas.insert(id, replica); + + match shutdown_input { + Some(input_id) => Err(read_holds::ReadHoldIssuerHungUp(input_id)), + None => Ok(()), + } } /// Enqueue the given response for delivery to the controller clients. @@ -1043,6 +1075,18 @@ impl Instance { ); } + /// Terminate the [`Instance::run`] loop, causing the instance task to shut down. + /// + /// Unlike [`Instance::shutdown`], this does not assert that the instance has no replicas + /// left. We use it to react to unrecoverable errors that can only occur during process + /// shutdown, such as a storage read hold issuer hanging up while we rehydrate a replica. + fn initiate_shutdown(&mut self) { + // Replacing `command_rx` with a fresh, sender-less channel makes the next `recv` in + // [`Instance::run`] return `None`, terminating the loop. + let (_tx, rx) = mpsc::unbounded_channel(); + self.command_rx = rx; + } + /// Sends a command to replicas of this instance. #[mz_ore::instrument(level = "debug")] fn send(&mut self, cmd: ComputeCommand) { @@ -1148,7 +1192,14 @@ impl Instance { } // Add replica to tracked state. - self.add_replica_state(id, client, config, epoch); + if self.add_replica_state(id, client, config, epoch).is_err() { + // A storage read hold issuer hung up, which only happens during process shutdown. 
+ // There is no way to correctly bring up the replica anymore, so we shut the instance + // down instead of running on with half-initialized replica state. `add_replica_state` + // has already logged the details and inserted the replica to keep our bookkeeping + // consistent with the controller's. + self.initiate_shutdown(); + } Ok(()) } diff --git a/src/storage-types/src/read_holds.rs b/src/storage-types/src/read_holds.rs index afd2a09b33c7e..c65dcdbf25796 100644 --- a/src/storage-types/src/read_holds.rs +++ b/src/storage-types/src/read_holds.rs @@ -52,6 +52,14 @@ impl Debug for ReadHold { } } +/// The issuer of a [`ReadHold`] has hung up, so the hold can no longer be cloned. +/// +/// This is only expected during process shutdown, when the tokio runtime drops tasks in +/// arbitrary order and the issuer task can disappear while holds still exist. +#[derive(Error, Debug)] +#[error("read hold issuer for collection {0} has hung up")] +pub struct ReadHoldIssuerHungUp(pub GlobalId); + /// Errors for manipulating read holds. #[derive(Error, Debug)] pub enum ReadHoldDowngradeError { @@ -165,10 +173,16 @@ impl ReadHold { self.try_downgrade(Antichain::new()) .expect("known to succeed"); } -} -impl Clone for ReadHold { - fn clone(&self) -> Self { + /// Clones this [ReadHold], returning an `Err` when the issuer of the read + /// hold has hung up, in which case the clone would not actually hold back + /// the since of the collection. + /// + /// The issuer hanging up is only expected during process shutdown, when + /// the tokio runtime drops tasks in arbitrary order. Callers that may run + /// concurrently with shutdown can use this method to handle that case + /// gracefully, instead of panicking via [Clone::clone]. + pub fn try_clone(&self) -> Result<Self, ReadHoldIssuerHungUp> { if self.id.is_user() { tracing::trace!("cloning ReadHold on {}: {:?}", self.id, self.since); } @@ -181,18 +195,23 @@ impl Clone for ReadHold { if !changes.is_empty() { // We do care about sending here. 
If the other end hung up we don't // really have a read hold anymore. - match (self.change_tx)(self.id.clone(), changes) { - Ok(_) => (), - Err(e) => { - panic!("cannot clone ReadHold: {}", e); - } - } + (self.change_tx)(self.id.clone(), changes) + .map_err(|_| ReadHoldIssuerHungUp(self.id))?; } - Self { + Ok(Self { id: self.id.clone(), since: self.since.clone(), change_tx: Arc::clone(&self.change_tx), + }) + } +} + +impl Clone for ReadHold { + fn clone(&self) -> Self { + match self.try_clone() { + Ok(clone) => clone, + Err(e) => panic!("cannot clone ReadHold: {}", e), } } } @@ -206,3 +225,25 @@ impl Drop for ReadHold { self.release(); } } + +#[cfg(test)] +mod tests { + use tokio::sync::mpsc; + + use super::*; + + #[mz_ore::test] + fn try_clone_after_issuer_hung_up() { + let (tx, rx) = mpsc::unbounded_channel(); + let hold = + ReadHold::with_channel(GlobalId::User(1), Antichain::from_elem(Timestamp::MIN), tx); + + let clone = hold.try_clone().expect("issuer is alive"); + drop(clone); + + // Once the issuer has hung up, cloning must fail instead of producing + // a hold that doesn't actually hold back the since. + drop(rx); + assert!(hold.try_clone().is_err()); + } +}