Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 54 additions & 3 deletions src/compute-client/src/controller/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ impl Instance {
client: ReplicaClient,
config: ReplicaConfig,
epoch: u64,
) {
) -> Result<(), read_holds::ReadHoldIssuerHungUp> {
let log_ids: BTreeSet<_> = config.logging.index_logs.values().copied().collect();

let metrics = self.metrics.for_replica(id);
Expand All @@ -359,6 +359,7 @@ impl Instance {
);

// Add per-replica collection state.
let mut shutdown_input = None;
for (collection_id, collection) in &self.collections {
// Skip log collections not maintained by this replica,
// and collections targeted at a different replica.
Expand All @@ -378,11 +379,42 @@ impl Instance {
collection.read_frontier().to_owned()
};

let input_read_holds = collection.storage_dependencies.values().cloned().collect();
// Cloning a `ReadHold` fails when its issuer has hung up. For these holds the issuer
// is the `StorageCollections`, which doesn't hang up as long as the `Instance` exists,
// except during process shutdown, when the tokio runtime drops tasks in arbitrary
// order. In that case there is no way of correctly initializing the per-replica
// collection state, so we give up. We still add the replica itself, to keep the
// bookkeeping consistent with the controller's, and then signal the unrecoverable
// error to the caller, which shuts the instance down.
let mut input_read_holds = Vec::with_capacity(collection.storage_dependencies.len());
let mut hung_up = Vec::new();
for hold in collection.storage_dependencies.values() {
match hold.try_clone() {
Ok(hold) => input_read_holds.push(hold),
Err(read_holds::ReadHoldIssuerHungUp(input_id)) => hung_up.push(input_id),
}
}
if !hung_up.is_empty() {
tracing::error!(
replica_id = %id,
%collection_id,
?hung_up,
"giving up on adding replica collections: storage read hold issuers hung \
up, the process is shutting down",
);
shutdown_input = hung_up.into_iter().next();
break;
}

replica.add_collection(*collection_id, as_of, input_read_holds);
}

self.replicas.insert(id, replica);

match shutdown_input {
Some(input_id) => Err(read_holds::ReadHoldIssuerHungUp(input_id)),
None => Ok(()),
}
}

/// Enqueue the given response for delivery to the controller clients.
Expand Down Expand Up @@ -1043,6 +1075,18 @@ impl Instance {
);
}

/// Stop the [`Instance::run`] loop so that the instance task winds down.
///
/// In contrast to [`Instance::shutdown`], no assertion is made that all replicas have been
/// removed first. This is the escape hatch for unrecoverable errors that are only expected
/// while the process is shutting down, e.g. a storage read hold issuer hanging up during
/// replica rehydration.
fn initiate_shutdown(&mut self) {
    // Swap in the receiving half of a brand-new channel whose sending half is discarded.
    // With no sender left, the next `recv` in [`Instance::run`] yields `None`, which ends
    // the loop.
    let (orphaned_tx, closed_rx) = mpsc::unbounded_channel();
    drop(orphaned_tx);
    self.command_rx = closed_rx;
}

/// Sends a command to replicas of this instance.
#[mz_ore::instrument(level = "debug")]
fn send(&mut self, cmd: ComputeCommand) {
Expand Down Expand Up @@ -1148,7 +1192,14 @@ impl Instance {
}

// Add replica to tracked state.
self.add_replica_state(id, client, config, epoch);
if self.add_replica_state(id, client, config, epoch).is_err() {
// A storage read hold issuer hung up, which only happens during process shutdown.
// There is no way to correctly bring up the replica anymore, so we shut the instance
// down instead of running on with half-initialized replica state. `add_replica_state`
// has already logged the details and inserted the replica to keep our bookkeeping
// consistent with the controller's.
self.initiate_shutdown();
}

Ok(())
}
Expand Down
61 changes: 51 additions & 10 deletions src/storage-types/src/read_holds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ impl Debug for ReadHold {
}
}

/// The issuer of a [`ReadHold`] has hung up, so the hold can no longer be cloned.
///
/// This is only expected during process shutdown, when the tokio runtime drops tasks in
/// arbitrary order and the issuer task can disappear while holds still exist.
///
/// The wrapped [`GlobalId`] is the ID of the collection the affected hold was on; it is
/// interpolated into the error's display message.
#[derive(Error, Debug)]
#[error("read hold issuer for collection {0} has hung up")]
pub struct ReadHoldIssuerHungUp(pub GlobalId);

/// Errors for manipulating read holds.
#[derive(Error, Debug)]
pub enum ReadHoldDowngradeError {
Expand Down Expand Up @@ -165,10 +173,16 @@ impl ReadHold {
self.try_downgrade(Antichain::new())
.expect("known to succeed");
}
}

impl Clone for ReadHold {
fn clone(&self) -> Self {
/// Clones this [ReadHold], returning an `Err` when the issuer of the read
/// hold has hung up, in which case the clone would not actually hold back
/// the since of the collection.
///
/// The issuer hanging up is only expected during process shutdown, when
/// the tokio runtime drops tasks in arbitrary order. Callers that may run
/// concurrently with shutdown can use this method to handle that case
/// gracefully, instead of panicking via [Clone::clone].
pub fn try_clone(&self) -> Result<Self, ReadHoldIssuerHungUp> {
if self.id.is_user() {
tracing::trace!("cloning ReadHold on {}: {:?}", self.id, self.since);
}
Expand All @@ -181,18 +195,23 @@ impl Clone for ReadHold {
if !changes.is_empty() {
// We do care about sending here. If the other end hung up we don't
// really have a read hold anymore.
match (self.change_tx)(self.id.clone(), changes) {
Ok(_) => (),
Err(e) => {
panic!("cannot clone ReadHold: {}", e);
}
}
(self.change_tx)(self.id.clone(), changes)
.map_err(|_| ReadHoldIssuerHungUp(self.id))?;
}

Self {
Ok(Self {
id: self.id.clone(),
since: self.since.clone(),
change_tx: Arc::clone(&self.change_tx),
})
}
}

impl Clone for ReadHold {
    /// Clones this hold by delegating to [`ReadHold::try_clone`].
    ///
    /// # Panics
    ///
    /// Panics if `try_clone` returns an error.
    fn clone(&self) -> Self {
        self.try_clone()
            .unwrap_or_else(|err| panic!("cannot clone ReadHold: {}", err))
    }
}
Expand All @@ -206,3 +225,25 @@ impl Drop for ReadHold {
self.release();
}
}

#[cfg(test)]
mod tests {
    use tokio::sync::mpsc;

    use super::*;

    /// `try_clone` succeeds while the receiving end of the change channel is alive and
    /// reports an error once that receiver has been dropped.
    #[mz_ore::test]
    fn try_clone_after_issuer_hung_up() {
        let (changes_tx, changes_rx) = mpsc::unbounded_channel();
        let original = ReadHold::with_channel(
            GlobalId::User(1),
            Antichain::from_elem(Timestamp::MIN),
            changes_tx,
        );

        // The receiver is still around, so cloning works; release the clone right away.
        let cloned = original.try_clone().expect("issuer is alive");
        drop(cloned);

        // Dropping the receiver stands in for the issuer hanging up. From here on a clone
        // could not hold back the since, so `try_clone` has to report an error.
        drop(changes_rx);
        let cloning_fails = original.try_clone().is_err();
        assert!(cloning_fails);
    }
}
Loading