From 96b6cedff36e7bc4963a8e21071b0414316862e8 Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Fri, 12 Jun 2026 12:07:32 +0000 Subject: [PATCH 1/2] compute: fix "cannot clone ReadHold" panic during shutdown During process shutdown the tokio runtime drops tasks in arbitrary order, so the `StorageCollections` background task can disappear while an `Instance` task still processes a queued `maintain()`. Rehydrating a failed replica then panics in `ReadHold::clone` when registering the clone with the hung-up issuer (seen in sqllogictest, which tears down a server per file). Add a fallible `ReadHold::try_clone` and use it in `add_replica_state`: on a hung-up issuer, stop populating per-replica collection state but still insert the replica to keep bookkeeping consistent with the controller. `Clone` keeps panicking for all other callers, since a clone that fails to register is not a real read hold. Fixes database-issues#9964 Co-Authored-By: Claude Fable 5 --- src/compute-client/src/controller/instance.rs | 29 ++++++++++- src/storage-types/src/read_holds.rs | 52 +++++++++++++++---- 2 files changed, 70 insertions(+), 11 deletions(-) diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs index d0bc31d8c2618..146ce3bccc551 100644 --- a/src/compute-client/src/controller/instance.rs +++ b/src/compute-client/src/controller/instance.rs @@ -378,7 +378,34 @@ impl Instance { collection.read_frontier().to_owned() }; - let input_read_holds = collection.storage_dependencies.values().cloned().collect(); + // Cloning a `ReadHold` fails when its issuer has hung up. For these holds the issuer + // is the `StorageCollections`, which doesn't hang up as long as the `Instance` exists, + // except during process shutdown, when the tokio runtime drops tasks in arbitrary + // order. In that case there is no way of correctly initializing the per-replica + // collection state, so we stop trying. We still add the replica itself, to keep the + // bookkeeping consistent with the controller's, but it will never hydrate. + let mut input_read_holds = Vec::with_capacity(collection.storage_dependencies.len()); + let mut hung_up = None; + for hold in collection.storage_dependencies.values() { + match hold.try_clone() { + Ok(hold) => input_read_holds.push(hold), + Err(_) => { + hung_up = Some(hold.id()); + break; + } + } + } + if let Some(input_id) = hung_up { + tracing::info!( + replica_id = %id, + %collection_id, + %input_id, + "giving up on adding replica collections: storage read hold issuer hung \ + up, the process is shutting down", + ); + break; + } + replica.add_collection(*collection_id, as_of, input_read_holds); } diff --git a/src/storage-types/src/read_holds.rs b/src/storage-types/src/read_holds.rs index afd2a09b33c7e..31131f90fe145 100644 --- a/src/storage-types/src/read_holds.rs +++ b/src/storage-types/src/read_holds.rs @@ -165,10 +165,16 @@ impl ReadHold { self.try_downgrade(Antichain::new()) .expect("known to succeed"); } -} -impl Clone for ReadHold { - fn clone(&self) -> Self { + /// Clones this [ReadHold], returning an `Err` when the issuer of the read + /// hold has hung up, in which case the clone would not actually hold back + /// the since of the collection. + /// + /// The issuer hanging up is only expected during process shutdown, when + /// the tokio runtime drops tasks in arbitrary order. Callers that may run + /// concurrently with shutdown can use this method to handle that case + /// gracefully, instead of panicking via [Clone::clone]. + pub fn try_clone(&self) -> Result)>> { if self.id.is_user() { tracing::trace!("cloning ReadHold on {}: {:?}", self.id, self.since); } @@ -181,18 +187,22 @@ impl Clone for ReadHold { if !changes.is_empty() { // We do care about sending here. If the other end hung up we don't // really have a read hold anymore. - match (self.change_tx)(self.id.clone(), changes) { - Ok(_) => (), - Err(e) => { - panic!("cannot clone ReadHold: {}", e); - } - } + (self.change_tx)(self.id.clone(), changes)?; } - Self { + Ok(Self { id: self.id.clone(), since: self.since.clone(), change_tx: Arc::clone(&self.change_tx), + }) + } +} + +impl Clone for ReadHold { + fn clone(&self) -> Self { + match self.try_clone() { + Ok(clone) => clone, + Err(e) => panic!("cannot clone ReadHold: {}", e), } } } @@ -206,3 +216,25 @@ impl Drop for ReadHold { self.release(); } } + +#[cfg(test)] +mod tests { + use tokio::sync::mpsc; + + use super::*; + + #[mz_ore::test] + fn try_clone_after_issuer_hung_up() { + let (tx, rx) = mpsc::unbounded_channel(); + let hold = + ReadHold::with_channel(GlobalId::User(1), Antichain::from_elem(Timestamp::MIN), tx); + + let clone = hold.try_clone().expect("issuer is alive"); + drop(clone); + + // Once the issuer has hung up, cloning must fail instead of producing + // a hold that doesn't actually hold back the since. + drop(rx); + assert!(hold.try_clone().is_err()); + } +} From 811e29f342cbe823c28c5a73e4e912f99551c0e4 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 12 Jun 2026 20:11:50 +0200 Subject: [PATCH 2/2] compute: address review on ReadHold try_clone shutdown fix Return a concrete `ReadHoldIssuerHungUp` from `ReadHold::try_clone` instead of leaking `SendError`. Collect all hung-up input ids and log them at error level. Propagate the error from `add_replica_state` so `add_replica` terminates the instance task, instead of running on with half-initialized replica state. Co-Authored-By: Claude Opus 4.8 --- src/compute-client/src/controller/instance.rs | 50 ++++++++++++++----- src/storage-types/src/read_holds.rs | 13 ++++- 2 files changed, 48 insertions(+), 15 deletions(-) diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs index 146ce3bccc551..4b546eb1d528f 100644 --- a/src/compute-client/src/controller/instance.rs +++ b/src/compute-client/src/controller/instance.rs @@ -345,7 +345,7 @@ impl Instance { client: ReplicaClient, config: ReplicaConfig, epoch: u64, - ) { + ) -> Result<(), read_holds::ReadHoldIssuerHungUp> { let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect(); let metrics = self.metrics.for_replica(id); @@ -359,6 +359,7 @@ impl Instance { ); // Add per-replica collection state. + let mut shutdown_input = None; for (collection_id, collection) in &self.collections { // Skip log collections not maintained by this replica, // and collections targeted at a different replica. @@ -382,27 +383,26 @@ impl Instance { // is the `StorageCollections`, which doesn't hang up as long as the `Instance` exists, // except during process shutdown, when the tokio runtime drops tasks in arbitrary // order. In that case there is no way of correctly initializing the per-replica - // collection state, so we stop trying. We still add the replica itself, to keep the - // bookkeeping consistent with the controller's, but it will never hydrate. + // collection state, so we give up. We still add the replica itself, to keep the + // bookkeeping consistent with the controller's, and then signal the unrecoverable + // error to the caller, which shuts the instance down. let mut input_read_holds = Vec::with_capacity(collection.storage_dependencies.len()); - let mut hung_up = None; + let mut hung_up = Vec::new(); for hold in collection.storage_dependencies.values() { match hold.try_clone() { Ok(hold) => input_read_holds.push(hold), - Err(_) => { - hung_up = Some(hold.id()); - break; - } + Err(read_holds::ReadHoldIssuerHungUp(input_id)) => hung_up.push(input_id), } } - if let Some(input_id) = hung_up { - tracing::info!( + if !hung_up.is_empty() { + tracing::error!( replica_id = %id, %collection_id, - %input_id, - "giving up on adding replica collections: storage read hold issuer hung \ + ?hung_up, + "giving up on adding replica collections: storage read hold issuers hung \ up, the process is shutting down", ); + shutdown_input = hung_up.into_iter().next(); break; } @@ -410,6 +410,11 @@ impl Instance { } self.replicas.insert(id, replica); + + match shutdown_input { + Some(input_id) => Err(read_holds::ReadHoldIssuerHungUp(input_id)), + None => Ok(()), + } } /// Enqueue the given response for delivery to the controller clients. @@ -1070,6 +1075,18 @@ impl Instance { ); } + /// Terminate the [`Instance::run`] loop, causing the instance task to shut down. + /// + /// Unlike [`Instance::shutdown`], this does not assert that the instance has no replicas + /// left. We use it to react to unrecoverable errors that can only occur during process + /// shutdown, such as a storage read hold issuer hanging up while we rehydrate a replica. + fn initiate_shutdown(&mut self) { + // Replacing `command_rx` with a fresh, sender-less channel makes the next `recv` in + // [`Instance::run`] return `None`, terminating the loop. + let (_tx, rx) = mpsc::unbounded_channel(); + self.command_rx = rx; + } + /// Sends a command to replicas of this instance. #[mz_ore::instrument(level = "debug")] fn send(&mut self, cmd: ComputeCommand) { @@ -1175,7 +1192,14 @@ impl Instance { } // Add replica to tracked state. - self.add_replica_state(id, client, config, epoch); + if self.add_replica_state(id, client, config, epoch).is_err() { + // A storage read hold issuer hung up, which only happens during process shutdown. + // There is no way to correctly bring up the replica anymore, so we shut the instance + // down instead of running on with half-initialized replica state. `add_replica_state` + // has already logged the details and inserted the replica to keep our bookkeeping + // consistent with the controller's. + self.initiate_shutdown(); + } Ok(()) } diff --git a/src/storage-types/src/read_holds.rs b/src/storage-types/src/read_holds.rs index 31131f90fe145..c65dcdbf25796 100644 --- a/src/storage-types/src/read_holds.rs +++ b/src/storage-types/src/read_holds.rs @@ -52,6 +52,14 @@ impl Debug for ReadHold { } } +/// The issuer of a [`ReadHold`] has hung up, so the hold can no longer be cloned. +/// +/// This is only expected during process shutdown, when the tokio runtime drops tasks in +/// arbitrary order and the issuer task can disappear while holds still exist. +#[derive(Error, Debug)] +#[error("read hold issuer for collection {0} has hung up")] +pub struct ReadHoldIssuerHungUp(pub GlobalId); + /// Errors for manipulating read holds. #[derive(Error, Debug)] pub enum ReadHoldDowngradeError { @@ -174,7 +182,7 @@ impl ReadHold { /// the tokio runtime drops tasks in arbitrary order. Callers that may run /// concurrently with shutdown can use this method to handle that case /// gracefully, instead of panicking via [Clone::clone]. - pub fn try_clone(&self) -> Result)>> { + pub fn try_clone(&self) -> Result { if self.id.is_user() { tracing::trace!("cloning ReadHold on {}: {:?}", self.id, self.since); } @@ -187,7 +195,8 @@ impl ReadHold { if !changes.is_empty() { // We do care about sending here. If the other end hung up we don't // really have a read hold anymore. - (self.change_tx)(self.id.clone(), changes)?; + (self.change_tx)(self.id.clone(), changes) + .map_err(|_| ReadHoldIssuerHungUp(self.id))?; } Ok(Self {