From dddd924e9634b40bdffa1347ee1c266f468f0ac9 Mon Sep 17 00:00:00 2001 From: Abeeujah Date: Wed, 13 May 2026 17:56:53 +0100 Subject: [PATCH] Batch same-channel MPP claims into a single commitment update When multiple parts of an MPP payment arrive on the same channel, claim them via the holding cell so they are released as one combined ChannelMonitorUpdate (every PaymentPreimage step plus a single commitment_signed) instead of one round-trip per part. claim_payment_internal now forces the holding-cell path whenever there is more than one source, records each touched channel, and flushes them after queueing. The single-source case keeps the existing inline fast path. FundedChannel::get_update_fulfill_htlc gains a force_holding_cell flag that suppresses the per-claim ChannelMonitorUpdate so the flush can emit one combined update. When the holding cell cannot be freed immediately (awaiting RAA, monitor update in progress, disconnected, quiescent, ...), build_preimage_only_monitor_update writes a one-step preimage update so the preimage stays durable across restarts; the commitment_signed follows when the holding cell is naturally flushed. Update test_single_channel_multiple_mpp, test_simple_mpp, and auto_retry_partial_failure to reflect the new single-round-trip behavior, and drop the threaded reproducer that targeted the old per-claim path. --- lightning/src/ln/chanmon_update_fail_tests.rs | 266 +++++------------- lightning/src/ln/channel.rs | 172 +++++++++-- lightning/src/ln/channelmanager.rs | 233 ++++++++++++--- lightning/src/ln/payment_tests.rs | 47 +--- 4 files changed, 415 insertions(+), 303 deletions(-) diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 9633800db08..af80e3902b6 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -4654,34 +4654,20 @@ fn test_claim_to_closed_channel_blocks_claimed_event() { } #[test] -#[cfg(all(feature = "std", not(target_os = "windows")))] fn test_single_channel_multiple_mpp() { use crate::util::config::UserConfig; - use std::sync::atomic::{AtomicBool, Ordering}; - // Test what happens when we attempt to claim an MPP with many parts that came to us through - // the same channel with a synchronous persistence interface which has very high latency. - // - // Previously, if a `revoke_and_ack` came in while we were still running in - // `ChannelManager::claim_payment` we'd end up hanging waiting to apply a - // `ChannelMonitorUpdate` until after it completed. See the commit which introduced this test - // for more info. + // Test that when an MPP payment has many parts arriving on the same channel, all of them are + // claimed in a single commitment update rather than requiring a round-trip per claim.
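As an illustration of the behavior described in the commit message (a sketch, not part of this diff), an N-part MPP whose parts all arrived over one channel now follows roughly the shape below on the claiming node, using the functional-test helpers exercised later in this patch; node indices, amounts, and `num_parts` are placeholders.

    // One claim call covers every part of the payment.
    nodes[recv].node.claim_funds(payment_preimage);
    expect_payment_claimed!(nodes[recv], payment_hash, total_amt_msat);
    // A single ChannelMonitorUpdate is queued for the shared channel...
    check_added_monitors(&nodes[recv], 1);
    // ...and one commitment update carries every update_fulfill_htlc.
    let updates = get_htlc_update_msgs(&nodes[recv], &prev_hop_node_id);
    assert_eq!(updates.update_fulfill_htlcs.len(), num_parts);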
let chanmon_cfgs = create_chanmon_cfgs(9); let node_cfgs = create_node_cfgs(9, &chanmon_cfgs); let mut config = test_default_channel_config(); - // Set the percentage to the default value at the time this test was written config.channel_handshake_config.announced_channel_max_inbound_htlc_value_in_flight_percentage = 10; let configs: [Option; 9] = core::array::from_fn(|_| Some(config.clone())); let node_chanmgrs = create_node_chanmgrs(9, &node_cfgs, &configs); let mut nodes = create_network(9, &node_cfgs, &node_chanmgrs); - let node_b_id = nodes[1].node.get_our_node_id(); - let node_c_id = nodes[2].node.get_our_node_id(); - let node_d_id = nodes[3].node.get_our_node_id(); - let node_e_id = nodes[4].node.get_our_node_id(); - let node_f_id = nodes[5].node.get_our_node_id(); - let node_g_id = nodes[6].node.get_our_node_id(); let node_h_id = nodes[7].node.get_our_node_id(); let node_i_id = nodes[8].node.get_our_node_id(); @@ -4691,28 +4677,7 @@ fn test_single_channel_multiple_mpp() { // 7 // 8 // - // We can in theory reproduce this issue with fewer channels/HTLCs, but getting this test - // robust is rather challenging. We rely on having the main test thread wait on locks held in - // the background `claim_funds` thread and unlocking when the `claim_funds` thread completes a - // single `ChannelMonitorUpdate`. - // This thread calls `get_and_clear_pending_msg_events()` and `handle_revoke_and_ack()`, both - // of which require `ChannelManager` locks, but we have to make sure this thread gets a chance - // to be blocked on the mutexes before we let the background thread wake `claim_funds` so that - // the mutex can switch to this main thread. - // This relies on our locks being fair, but also on our threads getting runtime during the test - // run, which can be pretty competitive. Thus we do a dumb dance to be as conservative as - // possible - we have a background thread which completes a `ChannelMonitorUpdate` (by sending - // into the `write_blocker` mpsc) but it doesn't run until a mpsc channel sends from this main - // thread to the background thread, and then we let it sleep a while before we send the - // `ChannelMonitorUpdate` unblocker. - // Further, we give ourselves two chances each time, needing 4 HTLCs just to unlock our two - // `ChannelManager` calls. We then need a few remaining HTLCs to actually trigger the bug, so - // we use 6 HTLCs. - // Finaly, we do not run this test on Winblowz because it, somehow, in 2025, does not implement - // actual preemptive multitasking and thinks that cooperative multitasking somehow is - // acceptable in the 21st century, let alone a quarter of the way into it. 
- const MAX_THREAD_INIT_TIME: std::time::Duration = std::time::Duration::from_secs(1); - + // All six parts converge on the same channel (7->8) create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 100_000, 0); create_announced_chan_between_nodes_with_value(&nodes, 0, 2, 100_000, 0); create_announced_chan_between_nodes_with_value(&nodes, 0, 3, 100_000, 0); @@ -4728,7 +4693,7 @@ fn test_single_channel_multiple_mpp() { create_announced_chan_between_nodes_with_value(&nodes, 6, 7, 100_000, 0); create_announced_chan_between_nodes_with_value(&nodes, 7, 8, 1_000_000, 0); - let (mut route, payment_hash, payment_preimage, payment_secret) = + let (route, payment_hash, payment_preimage, payment_secret) = get_route_and_payment_hash!(&nodes[0], nodes[8], 50_000_000); send_along_route_with_secret( @@ -4747,177 +4712,74 @@ fn test_single_channel_multiple_mpp() { payment_secret, ); - let (do_a_write, blocker) = std::sync::mpsc::sync_channel(0); - *nodes[8].chain_monitor.write_blocker.lock().unwrap() = Some(blocker); - - // Until we have std::thread::scoped we have to unsafe { turn off the borrow checker }. - // We do this by casting a pointer to a `TestChannelManager` to a pointer to a - // `TestChannelManager` with different (in this case 'static) lifetime. - // This is even suggested in the second example at - // https://doc.rust-lang.org/std/mem/fn.transmute.html#examples - let claim_node: &'static TestChannelManager<'static, 'static> = - unsafe { std::mem::transmute(nodes[8].node as &TestChannelManager) }; - let thrd = std::thread::spawn(move || { - // Initiate the claim in a background thread as it will immediately block waiting on the - // `write_blocker` we set above. - claim_node.claim_funds(payment_preimage); - }); - - // First unlock one monitor so that we have a pending - // `update_fulfill_htlc`/`commitment_signed` pair to pass to our counterparty. - do_a_write.send(()).unwrap(); - - let event_node: &'static TestChannelManager<'static, 'static> = - unsafe { std::mem::transmute(nodes[8].node as &TestChannelManager) }; - let thrd_event = std::thread::spawn(move || { - let mut have_event = false; - while !have_event { - let mut events = event_node.get_and_clear_pending_events(); - assert!(events.len() == 1 || events.len() == 0); - if events.len() == 1 { - if let Event::PaymentClaimed { .. } = events[0] { - } else { - panic!("Unexpected event {events:?}"); - } - have_event = true; - } - if !have_event { - std::thread::yield_now(); - } - } - }); - - // Then fetch the `update_fulfill_htlc`/`commitment_signed`. Note that the - // `get_and_clear_pending_msg_events` will immediately hang trying to take a peer lock which - // `claim_funds` is holding. Thus, we release a second write after a small sleep in the - // background to give `claim_funds` a chance to step forward, unblocking - // `get_and_clear_pending_msg_events`. 
- let do_a_write_background = do_a_write.clone(); - let block_thrd2 = AtomicBool::new(true); - let block_thrd2_read: &'static AtomicBool = unsafe { std::mem::transmute(&block_thrd2) }; - let thrd2 = std::thread::spawn(move || { - while block_thrd2_read.load(Ordering::Acquire) { - std::thread::yield_now(); - } - std::thread::sleep(MAX_THREAD_INIT_TIME); - do_a_write_background.send(()).unwrap(); - std::thread::sleep(MAX_THREAD_INIT_TIME); - do_a_write_background.send(()).unwrap(); - }); - block_thrd2.store(false, Ordering::Release); - let mut first_updates = get_htlc_update_msgs(&nodes[8], &node_h_id); - - // Thread 2 could unblock first, or it could get blocked waiting on us to process a - // `PaymentClaimed` event. Either way, wait until both have finished. - thrd2.join().unwrap(); - thrd_event.join().unwrap(); - - // Disconnect node 6 from all its peers so it doesn't bother to fail the HTLCs back - nodes[7].node.peer_disconnected(node_b_id); - nodes[7].node.peer_disconnected(node_c_id); - nodes[7].node.peer_disconnected(node_d_id); - nodes[7].node.peer_disconnected(node_e_id); - nodes[7].node.peer_disconnected(node_f_id); - nodes[7].node.peer_disconnected(node_g_id); - - let first_update_fulfill = first_updates.update_fulfill_htlcs.remove(0); - nodes[7].node.handle_update_fulfill_htlc(node_i_id, first_update_fulfill); - check_added_monitors(&nodes[7], 1); - expect_payment_forwarded!(nodes[7], nodes[1], nodes[8], Some(1000), false, false); - nodes[7].node.handle_commitment_signed_batch_test(node_i_id, &first_updates.commitment_signed); - check_added_monitors(&nodes[7], 1); - let (raa, cs) = get_revoke_commit_msgs(&nodes[7], &node_i_id); - - // Now, handle the `revoke_and_ack` from node 5. Note that `claim_funds` is still blocked on - // our peer lock, so we have to release a write to let it process. - // After this call completes, the channel previously would be locked up and should not be able - // to make further progress. - let do_a_write_background = do_a_write.clone(); - let block_thrd3 = AtomicBool::new(true); - let block_thrd3_read: &'static AtomicBool = unsafe { std::mem::transmute(&block_thrd3) }; - let thrd3 = std::thread::spawn(move || { - while block_thrd3_read.load(Ordering::Acquire) { - std::thread::yield_now(); - } - std::thread::sleep(MAX_THREAD_INIT_TIME); - do_a_write_background.send(()).unwrap(); - std::thread::sleep(MAX_THREAD_INIT_TIME); - do_a_write_background.send(()).unwrap(); - }); - block_thrd3.store(false, Ordering::Release); - nodes[8].node.handle_revoke_and_ack(node_h_id, &raa); - thrd3.join().unwrap(); - assert!(!thrd.is_finished()); - - let thrd4 = std::thread::spawn(move || { - do_a_write.send(()).unwrap(); - do_a_write.send(()).unwrap(); - }); - - thrd4.join().unwrap(); - thrd.join().unwrap(); - - // At the end, we should have 7 ChannelMonitorUpdates - 6 for HTLC claims, and one for the - // above `revoke_and_ack`. - check_added_monitors(&nodes[8], 7); - - // Now drive everything to the end, at least as far as node 7 is concerned... - *nodes[8].chain_monitor.write_blocker.lock().unwrap() = None; - nodes[8].node.handle_commitment_signed_batch_test(node_h_id, &cs); + // All six parts are on the same channel, so claiming should produce a single batched + // ChannelMonitorUpdate containing all 6 preimages and one commitment. 
+ nodes[8].node.claim_funds(payment_preimage); + expect_payment_claimed!(nodes[8], payment_hash, 50_000_000); check_added_monitors(&nodes[8], 1); - let (mut updates, raa) = get_updates_and_revoke(&nodes[8], &node_h_id); - - nodes[7].node.handle_update_fulfill_htlc(node_i_id, updates.update_fulfill_htlcs.remove(0)); - expect_payment_forwarded!(nodes[7], nodes[2], nodes[8], Some(1000), false, false); - nodes[7].node.handle_update_fulfill_htlc(node_i_id, updates.update_fulfill_htlcs.remove(0)); - expect_payment_forwarded!(nodes[7], nodes[3], nodes[8], Some(1000), false, false); - let mut next_source = 4; - if let Some(update) = updates.update_fulfill_htlcs.get(0) { - nodes[7].node.handle_update_fulfill_htlc(node_i_id, update.clone()); - expect_payment_forwarded!(nodes[7], nodes[4], nodes[8], Some(1000), false, false); - next_source += 1; - } - - nodes[7].node.handle_commitment_signed_batch_test(node_i_id, &updates.commitment_signed); - nodes[7].node.handle_revoke_and_ack(node_i_id, &raa); - if updates.update_fulfill_htlcs.get(0).is_some() { - check_added_monitors(&nodes[7], 5); - } else { - check_added_monitors(&nodes[7], 4); + let mut first_updates = get_htlc_update_msgs(&nodes[8], &node_h_id); + assert_eq!(first_updates.update_fulfill_htlcs.len(), 6); + + // Disconnect node 7 from intermediate nodes so it doesn't bother forwarding back. + nodes[7].node.peer_disconnected(nodes[1].node.get_our_node_id()); + nodes[7].node.peer_disconnected(nodes[2].node.get_our_node_id()); + nodes[7].node.peer_disconnected(nodes[3].node.get_our_node_id()); + nodes[7].node.peer_disconnected(nodes[4].node.get_our_node_id()); + nodes[7].node.peer_disconnected(nodes[5].node.get_our_node_id()); + nodes[7].node.peer_disconnected(nodes[6].node.get_our_node_id()); + + // Deliver all 6 fulfills to node 7 before handling the commitment_signed. + // Each handle_update_fulfill_htlc triggers claim_funds_internal on node 7's upstream + // channels (which are disconnected), generating a preimage monitor update + PaymentForwarded. + for fulfill in first_updates.update_fulfill_htlcs.drain(..) { + nodes[7].node.handle_update_fulfill_htlc(node_i_id, fulfill); + check_added_monitors(&nodes[7], 1); } - - let (raa, cs) = get_revoke_commit_msgs(&nodes[7], &node_i_id); - - nodes[8].node.handle_revoke_and_ack(node_h_id, &raa); - nodes[8].node.handle_commitment_signed_batch_test(node_h_id, &cs); - check_added_monitors(&nodes[8], 2); - - let (mut updates, raa) = get_updates_and_revoke(&nodes[8], &node_h_id); - - nodes[7].node.handle_update_fulfill_htlc(node_i_id, updates.update_fulfill_htlcs.remove(0)); - expect_payment_forwarded!(nodes[7], nodes[next_source], nodes[8], Some(1000), false, false); - next_source += 1; - nodes[7].node.handle_update_fulfill_htlc(node_i_id, updates.update_fulfill_htlcs.remove(0)); - expect_payment_forwarded!(nodes[7], nodes[next_source], nodes[8], Some(1000), false, false); - next_source += 1; - if let Some(update) = updates.update_fulfill_htlcs.get(0) { - nodes[7].node.handle_update_fulfill_htlc(node_i_id, update.clone()); - expect_payment_forwarded!(nodes[7], nodes[next_source], nodes[8], Some(1000), false, false); + let events = nodes[7].node.get_and_clear_pending_events(); + assert_eq!(events.len(), 6); + let mut seen_prev_node_ids = std::collections::HashSet::new(); + for event in events { + match event { + Event::PaymentForwarded { + prev_htlcs, + next_htlcs, + total_fee_earned_msat, + claim_from_onchain_tx, + .. 
+ } => { + assert_eq!(total_fee_earned_msat, Some(1000)); + assert!(!claim_from_onchain_tx); + assert_eq!(prev_htlcs.len(), 1); + assert_eq!(next_htlcs.len(), 1); + let prev_node_id = prev_htlcs[0].node_id.unwrap(); + let next_node_id = next_htlcs[0].node_id.unwrap(); + assert_eq!(next_node_id, node_i_id); + // Each forward should come from a unique intermediate node (1-6) + assert!( + seen_prev_node_ids.insert(prev_node_id), + "Duplicate prev_node_id in PaymentForwarded events" + ); + }, + _ => panic!("Unexpected event {:?}", event), + } } - + // Verify all 6 intermediate nodes were seen + for i in 1..=6 { + assert!( + seen_prev_node_ids.contains(&nodes[i].node.get_our_node_id()), + "Missing PaymentForwarded for node {}", + i + ); } - + nodes[7].node.handle_commitment_signed_batch_test(node_i_id, &first_updates.commitment_signed); + check_added_monitors(&nodes[7], 1); let (raa, cs) = get_revoke_commit_msgs(&nodes[7], &node_i_id); + nodes[8].node.handle_revoke_and_ack(node_h_id, &raa); + check_added_monitors(&nodes[8], 1); nodes[8].node.handle_commitment_signed_batch_test(node_h_id, &cs); - check_added_monitors(&nodes[8], 2); + check_added_monitors(&nodes[8], 1); let raa = get_event_msg!(nodes[8], MessageSendEvent::SendRevokeAndACK, node_h_id); nodes[7].node.handle_revoke_and_ack(node_i_id, &raa); diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 8075699c758..08068466333 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -1160,7 +1160,15 @@ pub(crate) struct CommitmentStats { /// A return value enum for get_update_fulfill_htlc. See UpdateFulfillCommitFetch variants for /// description enum UpdateFulfillFetch { - NewClaim { monitor_update: ChannelMonitorUpdate, htlc_value_msat: u64, update_blocked: bool }, + NewClaim { + /// The `ChannelMonitorUpdate` containing the new payment preimage. `None` when the caller + /// asked us to push the claim into the holding cell (via `force_holding_cell`); in that + /// case the caller is responsible for either flushing the holding cell (which will + /// produce a single combined update) or building a preimage-only update for durability. + monitor_update: Option<ChannelMonitorUpdate>, + htlc_value_msat: u64, + update_blocked: bool, + }, DuplicateClaim {}, } @@ -1175,6 +1183,14 @@ pub enum UpdateFulfillCommitFetch { /// The value of the HTLC which was claimed, in msat. htlc_value_msat: u64, }, + /// Indicates the HTLC fulfill was queued into the holding cell because the caller asked us + /// to defer it (e.g. so multiple claims on the same channel can be batched into a single + /// commitment update). No `ChannelMonitorUpdate` has been produced; the caller must flush + /// the holding cell to obtain one combined update for all queued claims. + Queued { + /// The value of the HTLC which was claimed, in msat. + htlc_value_msat: u64, + }, /// Indicates the HTLC fulfill is duplicative and already existed either in the holding cell /// or has been forgotten (presumably previously claimed). DuplicateClaim {}, } @@ -7573,18 +7589,35 @@ where // (see equivalent if condition there).
assert!(!self.context.channel_state.can_generate_new_commitment()); let mon_update_id = self.context.latest_monitor_update_id; // Forget the ChannelMonitor update - let fulfill_resp = - self.get_update_fulfill_htlc(htlc_id_arg, payment_preimage_arg, None, None, logger); + let fulfill_resp = self.get_update_fulfill_htlc( + htlc_id_arg, + payment_preimage_arg, + None, + None, + false, + logger, + ); self.context.latest_monitor_update_id = mon_update_id; if let UpdateFulfillFetch::NewClaim { update_blocked, .. } = fulfill_resp { assert!(update_blocked); // The HTLC must have ended up in the holding cell. } } + /// Claims an inbound HTLC, generating either a `ChannelMonitorUpdate` plus inline state + /// transition (when the channel can immediately produce a commitment update) or pushing the + /// claim into the holding cell. + /// + /// When `force_holding_cell` is `true` the claim is unconditionally pushed into the holding + /// cell and **no** `ChannelMonitorUpdate` is produced. The caller must subsequently flush the + /// holding cell (via [`Self::maybe_free_holding_cell_htlcs`]) to get a single combined + /// `ChannelMonitorUpdate`, or build a preimage-only update via + /// [`Self::build_preimage_only_monitor_update`] if the holding cell cannot be flushed yet. + /// This is used for batching multiple HTLC claims (e.g. MPP parts arriving on the same + /// channel) into one commitment update. fn get_update_fulfill_htlc( &mut self, htlc_id_arg: u64, payment_preimage_arg: PaymentPreimage, payment_info: Option, attribution_data: Option, - logger: &L, + force_holding_cell: bool, logger: &L, ) -> UpdateFulfillFetch { // Either ChannelReady got set (which means it won't be unset) or there is no way any // caller thought we could have something claimed (cause we wouldn't have accepted in an @@ -7639,31 +7672,45 @@ where return UpdateFulfillFetch::DuplicateClaim {}; } + let into_holding_cell = + force_holding_cell || !self.context.channel_state.can_generate_new_commitment(); + // Now update local state: // // We have to put the payment_preimage in the channel_monitor right away here to ensure we // can claim it even if the channel hits the chain before we see their next commitment. - self.context.latest_monitor_update_id += 1; - let monitor_update = ChannelMonitorUpdate { - update_id: self.context.latest_monitor_update_id, - updates: vec![ChannelMonitorUpdateStep::PaymentPreimage { - payment_preimage: payment_preimage_arg.clone(), - payment_info, - }], - channel_id: Some(self.context.channel_id()), + // When the caller asked us to push into the holding cell for batching + // (`force_holding_cell`) we skip producing a `ChannelMonitorUpdate` here; the caller is + // responsible for either flushing the holding cell (producing one combined update with + // every queued preimage and the commitment) or building a preimage-only update for + // durability. 
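A caller-side sketch (illustrative only, not code from the patch) of the three outcomes this claim path can produce, using the `UpdateFulfillFetch` variants introduced above:

    match fulfill {
        UpdateFulfillFetch::NewClaim { monitor_update: None, .. } => {
            // force_holding_cell path: the claim sits in the holding cell; flush it later
            // for one combined update, or build a preimage-only update for durability.
        },
        UpdateFulfillFetch::NewClaim { monitor_update: Some(_), .. } => {
            // Either the inline path (channel can generate a commitment) or the pre-existing
            // holding-cell path, which still returns a preimage-only update immediately.
        },
        UpdateFulfillFetch::DuplicateClaim {} => { /* already claimed or already queued */ },
    }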
+ let monitor_update = if force_holding_cell { + None + } else { + self.context.latest_monitor_update_id += 1; + Some(ChannelMonitorUpdate { + update_id: self.context.latest_monitor_update_id, + updates: vec![ChannelMonitorUpdateStep::PaymentPreimage { + payment_preimage: payment_preimage_arg.clone(), + payment_info, + }], + channel_id: Some(self.context.channel_id()), + }) }; - if !self.context.channel_state.can_generate_new_commitment() { + if into_holding_cell { // Note that this condition is the same as the assertion in // `claim_htlc_while_disconnected_dropping_mon_update` and must match exactly - // `claim_htlc_while_disconnected_dropping_mon_update` would not work correctly if we - // do not not get into this branch. + // do not get into this branch. for pending_update in self.context.holding_cell_htlc_updates.iter() { match pending_update { &HTLCUpdateAwaitingACK::ClaimHTLC { htlc_id, .. } => { if htlc_id_arg == htlc_id { // Make sure we don't leave latest_monitor_update_id incremented here: - self.context.latest_monitor_update_id -= 1; + if monitor_update.is_some() { + self.context.latest_monitor_update_id -= 1; + } return UpdateFulfillFetch::DuplicateClaim {}; } }, @@ -7735,7 +7782,7 @@ where pub fn get_update_fulfill_htlc_and_commit( &mut self, htlc_id: u64, payment_preimage: PaymentPreimage, payment_info: Option, attribution_data: Option, - logger: &L, + force_holding_cell: bool, logger: &L, ) -> UpdateFulfillCommitFetch { let release_cs_monitor = self.context.blocked_monitor_updates.is_empty(); match self.get_update_fulfill_htlc( @@ -7743,10 +7790,25 @@ where payment_preimage, payment_info, attribution_data, + force_holding_cell, logger, ) { UpdateFulfillFetch::NewClaim { - mut monitor_update, + monitor_update: None, + htlc_value_msat, + update_blocked, + } => { + // Caller asked us to push the claim into the holding cell so that several claims + // on this channel can be batched into a single commitment update. We have not + // produced a `ChannelMonitorUpdate`; the caller will obtain one by flushing the + // holding cell (or by building a preimage-only update for durability if the + // channel cannot currently generate a commitment). + debug_assert!(force_holding_cell); + debug_assert!(update_blocked); + UpdateFulfillCommitFetch::Queued { htlc_value_msat } + }, + UpdateFulfillFetch::NewClaim { + monitor_update: Some(mut monitor_update), htlc_value_msat, update_blocked, } => { @@ -7794,6 +7856,66 @@ where } } + /// Builds a preimage-only [`ChannelMonitorUpdate`] for a payment whose claims have just been + /// queued into the holding cell via + /// [`Self::get_update_fulfill_htlc_and_commit`] with `force_holding_cell = true`. + /// + /// This is used to ensure preimage durability when the holding cell cannot be flushed + /// immediately (e.g. while awaiting RAA, while a monitor update is in progress, while + /// disconnected, or while quiescent). All MPP parts being claimed share the same preimage, + /// so a single one-step update is sufficient regardless of how many parts were queued. + /// The eventual holding-cell flush will produce a [`ChannelMonitorUpdate`] containing a + /// redundant `PaymentPreimage` step for the same preimage, which is harmless: + /// `ChannelMonitor` is idempotent w.r.t. the preimage, and we do not bother to track + /// `payment_info` again at flush time. + /// + /// Increments `latest_monitor_update_id` by 1 and handles the + /// `blocked_monitor_updates` ID juggling so the new update is consecutive with any held + /// updates. 
+ pub fn build_preimage_only_monitor_update( + &mut self, payment_preimage: PaymentPreimage, payment_info: Option<PaymentClaimDetails>, + logger: &L, + ) -> ChannelMonitorUpdate { + self.context.latest_monitor_update_id += 1; + let mut monitor_update = ChannelMonitorUpdate { + update_id: self.context.latest_monitor_update_id, + updates: vec![ChannelMonitorUpdateStep::PaymentPreimage { + payment_preimage, + payment_info, + }], + channel_id: Some(self.context.channel_id()), + }; + + // If there are blocked monitor updates queued ahead of us, the new preimage update + // has to fly before them, so insert it at the head and bump their IDs (mirrors the + // logic in `get_update_fulfill_htlc_and_commit`). + if !self.context.blocked_monitor_updates.is_empty() { + let new_mon_id = self.context.blocked_monitor_updates[0].update.update_id; + monitor_update.update_id = new_mon_id; + for held_update in self.context.blocked_monitor_updates.iter_mut() { + held_update.update.update_id += 1; + } + } + + // Mark the channel as having a monitor update in progress so that when the update is + // submitted to the chain monitor and persistence completes (synchronously or otherwise), + // `monitor_updating_restored` doesn't trip its `MONITOR_UPDATE_IN_PROGRESS` assertion. + // This mirrors the `monitor_updating_paused` call made in `free_holding_cell_htlcs` + // (the `Some` branch of `maybe_free_holding_cell_htlcs`) and in + // `get_update_fulfill_htlc_and_commit`. + self.monitor_updating_paused( + false, + false, + false, + Vec::new(), + Vec::new(), + Vec::new(), + logger, + ); + + monitor_update + } + /// Returns `Err` (always with [`ChannelError::Ignore`]) if the HTLC could not be failed (e.g. /// if it was already resolved). Otherwise returns `Ok`. pub fn queue_fail_htlc( @@ -8935,14 +9057,18 @@ where *payment_preimage, None, attribution_data.clone(), + false, logger, ); - let mut additional_monitor_update = - if let UpdateFulfillFetch::NewClaim { monitor_update, .. } = fulfill { - monitor_update - } else { - unreachable!() - }; + let mut additional_monitor_update = if let UpdateFulfillFetch::NewClaim { + monitor_update: Some(monitor_update), + .. + } = fulfill + { + monitor_update + } else { + unreachable!() + }; update_fulfill_count += 1; monitor_update.updates.append(&mut additional_monitor_update.updates); None diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index db64cc99a02..6195e494339 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -9568,24 +9568,33 @@ impl< None }; let payment_info = Some(PaymentClaimDetails { mpp_parts, claiming_payment }); + + // Iterate every source and push each claim into its channel's holding cell so that + // multiple MPP parts arriving on the same channel will be released as a single + // combined `ChannelMonitorUpdate` (preimage steps + commitment update) when the + // holding cell is flushed below. We force the holding-cell path only when there is + // more than one source, to preserve the single-claim inline fast path (one combined + // `ChannelMonitorUpdate` produced directly by `get_update_fulfill_htlc_and_commit`). + let force_holding_cell = sources.len() > 1; + // Channels into which we successfully queued a claim and therefore must flush at + // the end. Using a `Vec` (rather than a `HashSet`) preserves insertion order so + // the flush order matches the iteration order, which keeps tests deterministic.
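A concrete, editorial example of the bookkeeping set up here, assuming hypothetical channels A and B: for a four-part payment whose sources iterate in the order A, A, B, A, four claims are queued but only two channels are recorded, so the post-iteration flush emits exactly one combined update per channel.

    // sources (iteration order):  chan A, chan A, chan B, chan A
    // queued holding-cell claims: A: 3, B: 1
    // touched_channels (deduplicated, order preserved): [A, B]
    // flush(A): one ChannelMonitorUpdate with three PaymentPreimage steps + the commitment update
    // flush(B): one ChannelMonitorUpdate with one PaymentPreimage step + the commitment update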
+ let mut touched_channels: Vec<(PublicKey, ChannelId, OutPoint)> = Vec::new(); for htlc in sources { - let this_mpp_claim = - pending_mpp_claim_ptr_opt.as_ref().map(|pending_mpp_claim| { - let counterparty_id = htlc.mpp_part.prev_hop.counterparty_node_id; - let counterparty_id = counterparty_id - .expect("Prior to upgrading to LDK 0.1, all pending HTLCs forwarded by LDK 0.0.123 or before must be resolved. It appears at least one claimable payment was not resolved. Please downgrade to LDK 0.0.125 and resolve the HTLC by claiming the payment prior to upgrading."); - let claim_ptr = PendingMPPClaimPointer(Arc::clone(pending_mpp_claim)); - (counterparty_id, htlc.mpp_part.prev_hop.channel_id, claim_ptr) - }); + let counterparty_id = htlc.mpp_part.prev_hop.counterparty_node_id.expect("Prior to upgrading to LDK 0.1, all pending HTLCs forwarded by LDK 0.0.123 or before must be resolved. It appears at least one claimable payment was not resolved. Please downgrade to LDK 0.0.125 and resolve the HTLC by claiming the payment prior to upgrading."); + let chan_id = htlc.mpp_part.prev_hop.channel_id; + let funding_txo = htlc.mpp_part.prev_hop.outpoint; + + let this_mpp_claim = pending_mpp_claim_ptr_opt.as_ref().map(|pending_mpp_claim| { + let claim_ptr = PendingMPPClaimPointer(Arc::clone(pending_mpp_claim)); + (counterparty_id, chan_id, claim_ptr) + }); let raa_blocker = pending_mpp_claim_ptr_opt.as_ref().map(|pending_claim| { RAAMonitorUpdateBlockingAction::ClaimedMPPPayment { pending_claim: PendingMPPClaimPointer(Arc::clone(pending_claim)), } }); - // Create new attribution data as the final hop. Always report a zero hold time, because reporting a - // non-zero value will not make a difference in the penalty that may be applied by the sender. If there - // is a phantom hop, we need to double-process. let attribution_data = if let Some(phantom_secret) = htlc.mpp_part.prev_hop.phantom_shared_secret { let attribution_data = @@ -9601,11 +9610,12 @@ impl< 0, ); - self.claim_funds_from_hop( + let queued = self.claim_funds_from_hop( &htlc.mpp_part.prev_hop, payment_preimage, payment_info.clone(), Some(attribution_data), + force_holding_cell, |_, definitely_duplicate| { debug_assert!( !definitely_duplicate, @@ -9620,6 +9630,30 @@ impl< ) }, ); + if queued + && !touched_channels + .iter() + .any(|(cp, c, _)| *cp == counterparty_id && *c == chan_id) + { + touched_channels.push((counterparty_id, chan_id, funding_txo)); + } + } + + // Flush each channel into which we queued a claim. For each channel this produces + // one combined `ChannelMonitorUpdate` (every queued `PaymentPreimage` step plus + // the commitment update). If a channel cannot currently generate a commitment + // (awaiting RAA, monitor update in progress, peer disconnected, quiescent, ...) + // we instead build a single preimage-only `ChannelMonitorUpdate` so the preimage + // is durable across restarts; the commitment update will follow when the holding + // cell is naturally flushed. 
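A hypothetical functional test of the deferred path described above (not included in this patch) might look roughly like the following sketch, assuming a two-part MPP already received on a single channel from node A; helper names follow the existing test utilities.

    nodes[1].node.peer_disconnected(node_a_id);
    nodes[1].node.claim_funds(payment_preimage);
    expect_payment_claimed!(nodes[1], payment_hash, amt_msat);
    // The holding cell cannot be freed while disconnected, so only a single
    // preimage-only ChannelMonitorUpdate is applied to keep the preimage durable.
    check_added_monitors(&nodes[1], 1);
    // No update_fulfill_htlc/commitment_signed until reconnection frees the holding cell.
    assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());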
+ for (counterparty_id, chan_id, funding_txo) in touched_channels { + self.flush_pending_holding_cell_claim( + counterparty_id, + chan_id, + funding_txo, + payment_preimage, + payment_info.clone(), + ); } } else { for htlc in sources { @@ -9679,6 +9713,7 @@ impl< payment_preimage, None, Some(attribution_data), + false, |htlc_claim_value_msat, definitely_duplicate| { let chan_to_release = EventUnblockedChannel { counterparty_node_id: next_channel_counterparty_node_id, @@ -9768,6 +9803,10 @@ impl< ); } + /// Returns `true` if the claim was queued into the channel's holding cell for batched + /// release by [`Self::flush_pending_holding_cell_claim`] and `false` otherwise (an inline + /// `ChannelMonitorUpdate` was produced, the claim was a duplicate, or the channel was + /// already closed). Only the `true` return value requires a follow-up flush. fn claim_funds_from_hop< ComplFunc: FnOnce( Option, @@ -9776,8 +9815,8 @@ impl< >( &self, prev_hop: &HTLCPreviousHopData, payment_preimage: PaymentPreimage, payment_info: Option, attribution_data: Option, - completion_action: ComplFunc, - ) { + force_holding_cell: bool, completion_action: ComplFunc, + ) -> bool { let counterparty_node_id = prev_hop.counterparty_node_id.or_else(|| { let short_to_chan_info = self.short_to_chan_info.read().unwrap(); short_to_chan_info.get(&prev_hop.prev_outbound_scid_alias).map(|(cp_id, _)| *cp_id) @@ -9802,10 +9841,12 @@ impl< payment_preimage, payment_info, attribution_data, + force_holding_cell, completion_action, ) } + /// See [`Self::claim_funds_from_hop`] for the meaning of the `bool` return value. fn claim_mpp_part< ComplFunc: FnOnce( Option, @@ -9814,8 +9855,8 @@ impl< >( &self, prev_hop: HTLCClaimSource, payment_preimage: PaymentPreimage, payment_info: Option, attribution_data: Option, - completion_action: ComplFunc, - ) { + force_holding_cell: bool, completion_action: ComplFunc, + ) -> bool { //TODO: Delay the claimed_funds relaying just like we do outbound relay! // If we haven't yet run background events assume we're still deserializing and shouldn't @@ -9839,6 +9880,7 @@ impl< .map(|peer_mutex| peer_mutex.lock().unwrap()) .expect(MISSING_MON_ERROR); + let mut queued = false; { let peer_state = &mut *peer_state_lock; if let hash_map::Entry::Occupied(mut chan_entry) = @@ -9851,10 +9893,41 @@ impl< payment_preimage, payment_info, attribution_data, + force_holding_cell, &&logger, ); + queued = matches!(&fulfill_res, UpdateFulfillCommitFetch::Queued { .. }); match fulfill_res { + UpdateFulfillCommitFetch::Queued { htlc_value_msat } => { + // The claim was queued into the holding cell so that this and any + // other concurrent claims on this channel will be released as a + // single combined `ChannelMonitorUpdate` later (see + // `claim_payment_internal`'s post-iteration flush). Just record the + // post-update completion actions and the RAA blocker; the actual + // monitor update will be applied when the holding cell is flushed. 
+ let (action_opt, raa_blocker_opt) = + completion_action(Some(htlc_value_msat), false); + if let Some(action) = action_opt { + log_trace!( + logger, + "Tracking monitor update completion action (queued): {:?}", + action + ); + peer_state + .monitor_update_blocked_actions + .entry(chan_id) + .or_insert(Vec::new()) + .push(action); + } + if let Some(raa_blocker) = raa_blocker_opt { + peer_state + .actions_blocking_raa_monitor_updates + .entry(chan_id) + .or_insert_with(Vec::new) + .push(raa_blocker); + } + }, UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } => { let (action_opt, raa_blocker_opt) = completion_action(Some(htlc_value_msat), false); @@ -9917,7 +9990,7 @@ impl< let action = if let Some(action) = action_opt { action } else { - return; + return false; }; // If there are monitor updates in flight, we may be in the case @@ -9933,7 +10006,7 @@ impl< .entry(chan_id) .or_insert_with(Vec::new) .push(action); - return; + return false; } mem::drop(peer_state_lock); @@ -9982,12 +10055,12 @@ impl< } else { debug_assert!(false, "Duplicate claims should always either be for forwarded payments(freeing another channel immediately) or during init (for claim replay)"); - return; + return false; }; }, } } - return; + return queued; } } @@ -10065,6 +10138,86 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ mem::drop(per_peer_state); self.handle_monitor_update_completion_actions(actions); } + false + } + + /// Flushes a channel's holding cell after one or more HTLC claims for the same payment have + /// been queued via [`FundedChannel::get_update_fulfill_htlc_and_commit`] with + /// `force_holding_cell = true`. + /// + /// When the holding cell can be freed this produces a single `ChannelMonitorUpdate` + /// containing every queued `PaymentPreimage` step plus the commitment update — so an + /// N-part MPP arriving on the same channel is fulfilled with exactly one + /// `commitment_signed` round-trip. When it cannot (awaiting RAA, monitor update in + /// progress, peer disconnected, quiescent, ...) we instead build a single preimage-only + /// `ChannelMonitorUpdate` so the preimage is durable across restarts; the + /// `commitment_signed` will be sent later when the holding cell is naturally flushed. + /// + /// Channel-closed and channel-not-found cases are no-ops here: closed channels never go + /// through the holding-cell path (no claim was ever queued by the iteration in + /// `claim_payment_internal`). + fn flush_pending_holding_cell_claim( + &self, counterparty_node_id: PublicKey, chan_id: ChannelId, funding_txo: OutPoint, + payment_preimage: PaymentPreimage, payment_info: Option<PaymentClaimDetails>, + ) { + let per_peer_state = self.per_peer_state.read().unwrap(); + let mut peer_state_lock = match per_peer_state.get(&counterparty_node_id) { + Some(peer_mutex) => peer_mutex.lock().unwrap(), + None => return, + }; + let peer_state = &mut *peer_state_lock; + let chan = match peer_state + .channel_by_id + .get_mut(&chan_id) + .and_then(|chan| chan.as_funded_mut()) + { + Some(chan) => chan, + None => return, + }; + let logger = WithChannelContext::from(&self.logger, &chan.context, None); + + let (monitor_opt, holding_cell_failed_htlcs) = + chan.maybe_free_holding_cell_htlcs(&self.fee_estimator, &&logger); + + let monitor_update = if let Some(mut monitor_update) = monitor_opt { + // Inject payment_info into the first PaymentPreimage step that matches our preimage + // (the holding cell may contain unrelated claims for other preimages from prior + // queueing).
+ for step in monitor_update.updates.iter_mut() { + if let ChannelMonitorUpdateStep::PaymentPreimage { + payment_preimage: ref step_preimage, + payment_info: ref mut info, + } = step + { + if *step_preimage == payment_preimage { + *info = payment_info; + break; + } + } + } + monitor_update + } else { + // Deferred flush: build a preimage-only `ChannelMonitorUpdate` so the preimage is + // durable now. All MPP parts share the same preimage so a single one-step update + // covers any number of queued claims for this payment. + chan.build_preimage_only_monitor_update(payment_preimage, payment_info, &&logger) + }; + + let post_update_data = self.handle_new_monitor_update( + &mut peer_state.in_flight_monitor_updates, + &mut peer_state.monitor_update_blocked_actions, + &mut peer_state.pending_msg_events, + peer_state.is_connected, + chan, + funding_txo, + monitor_update, + ); + mem::drop(peer_state_lock); + mem::drop(per_peer_state); + self.fail_holding_cell_htlcs(holding_cell_failed_htlcs, chan_id, &counterparty_node_id); + if let Some(data) = post_update_data { + self.handle_post_monitor_update_chan_resume(data); + } } fn finalize_claims(&self, sources: Vec<(HTLCSource, Option)>) { @@ -20429,6 +20582,7 @@ impl< payment_preimage, None, None, + false, |_, _| { ( Some(MonitorUpdateCompletionAction::PaymentClaimed { @@ -20877,39 +21031,26 @@ mod tests { assert_eq!(events.len(), 1); pass_along_path(&nodes[0], &[&nodes[1]], 200_000, our_payment_hash, Some(payment_secret), events.drain(..).next().unwrap(), true, None); - // Claim the full MPP payment. Note that we can't use a test utility like - // claim_funds_along_route because the ordering of the messages causes the second half of the - // payment to be put in the holding cell, which confuses the test utilities. So we exchange the - // lightning messages manually. + // Claim the full MPP payment. Both parts are on the same channel, so they should be + // batched into a single commitment update. 
nodes[1].node.claim_funds(payment_preimage); expect_payment_claimed!(nodes[1], our_payment_hash, 200_000); - check_added_monitors(&nodes[1], 2); + check_added_monitors(&nodes[1], 1); - let mut bs_1st_updates = get_htlc_update_msgs(&nodes[1], &nodes[0].node.get_our_node_id()); - nodes[0].node.handle_update_fulfill_htlc(nodes[1].node.get_our_node_id(), bs_1st_updates.update_fulfill_htlcs.remove(0)); + let mut bs_updates = get_htlc_update_msgs(&nodes[1], &nodes[0].node.get_our_node_id()); + assert_eq!(bs_updates.update_fulfill_htlcs.len(), 2); + nodes[0].node.handle_update_fulfill_htlc(nodes[1].node.get_our_node_id(), bs_updates.update_fulfill_htlcs.remove(0)); expect_payment_sent(&nodes[0], payment_preimage, None, false, false); - nodes[0].node.handle_commitment_signed_batch_test(nodes[1].node.get_our_node_id(), &bs_1st_updates.commitment_signed); - check_added_monitors(&nodes[0], 1); - let (as_first_raa, as_first_cs) = get_revoke_commit_msgs(&nodes[0], &nodes[1].node.get_our_node_id()); - nodes[1].node.handle_revoke_and_ack(nodes[0].node.get_our_node_id(), &as_first_raa); - check_added_monitors(&nodes[1], 1); - let mut bs_2nd_updates = get_htlc_update_msgs(&nodes[1], &nodes[0].node.get_our_node_id()); - nodes[1].node.handle_commitment_signed_batch_test(nodes[0].node.get_our_node_id(), &as_first_cs); - check_added_monitors(&nodes[1], 1); - let bs_first_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, nodes[0].node.get_our_node_id()); - nodes[0].node.handle_update_fulfill_htlc(nodes[1].node.get_our_node_id(), bs_2nd_updates.update_fulfill_htlcs.remove(0)); - nodes[0].node.handle_commitment_signed_batch_test(nodes[1].node.get_our_node_id(), &bs_2nd_updates.commitment_signed); - check_added_monitors(&nodes[0], 1); - let as_second_raa = get_event_msg!(nodes[0], MessageSendEvent::SendRevokeAndACK, nodes[1].node.get_our_node_id()); - nodes[0].node.handle_revoke_and_ack(nodes[1].node.get_our_node_id(), &bs_first_raa); - let as_second_updates = get_htlc_update_msgs(&nodes[0], &nodes[1].node.get_our_node_id()); + nodes[0].node.handle_update_fulfill_htlc(nodes[1].node.get_our_node_id(), bs_updates.update_fulfill_htlcs.remove(0)); + nodes[0].node.handle_commitment_signed_batch_test(nodes[1].node.get_our_node_id(), &bs_updates.commitment_signed); check_added_monitors(&nodes[0], 1); - nodes[1].node.handle_revoke_and_ack(nodes[0].node.get_our_node_id(), &as_second_raa); + let (as_raa, as_cs) = get_revoke_commit_msgs(&nodes[0], &nodes[1].node.get_our_node_id()); + nodes[1].node.handle_revoke_and_ack(nodes[0].node.get_our_node_id(), &as_raa); check_added_monitors(&nodes[1], 1); - nodes[1].node.handle_commitment_signed_batch_test(nodes[0].node.get_our_node_id(), &as_second_updates.commitment_signed); + nodes[1].node.handle_commitment_signed_batch_test(nodes[0].node.get_our_node_id(), &as_cs); check_added_monitors(&nodes[1], 1); - let bs_third_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, nodes[0].node.get_our_node_id()); - nodes[0].node.handle_revoke_and_ack(nodes[1].node.get_our_node_id(), &bs_third_raa); + let bs_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, nodes[0].node.get_our_node_id()); + nodes[0].node.handle_revoke_and_ack(nodes[1].node.get_our_node_id(), &bs_raa); check_added_monitors(&nodes[0], 1); // Note that successful MPP payments will generate a single PaymentSent event upon the first diff --git a/lightning/src/ln/payment_tests.rs b/lightning/src/ln/payment_tests.rs index e80fcea33aa..d5f72e179da 100644 --- 
a/lightning/src/ln/payment_tests.rs +++ b/lightning/src/ln/payment_tests.rs @@ -3109,55 +3109,38 @@ fn auto_retry_partial_failure() { expect_htlc_failure_conditions(nodes[1].node.get_and_clear_pending_events(), &[]); nodes[1].node.process_pending_htlc_forwards(); expect_payment_claimable!(nodes[1], payment_hash, payment_secret, amt_msat); + // All 3 parts arrived on the same channel (chan_1), so claim_funds batches them + // into a single commitment update with all 3 update_fulfill_htlc messages. nodes[1].node.claim_funds(payment_preimage); expect_payment_claimed!(nodes[1], payment_hash, amt_msat); + check_added_monitors(&nodes[1], 1); let mut bs_claim = get_htlc_update_msgs(&nodes[1], &node_a_id); - assert_eq!(bs_claim.update_fulfill_htlcs.len(), 1); + assert_eq!(bs_claim.update_fulfill_htlcs.len(), 3); nodes[0].node.handle_update_fulfill_htlc(node_b_id, bs_claim.update_fulfill_htlcs.remove(0)); expect_payment_sent(&nodes[0], payment_preimage, None, false, false); + nodes[0].node.handle_update_fulfill_htlc(node_b_id, bs_claim.update_fulfill_htlcs.remove(0)); + nodes[0].node.handle_update_fulfill_htlc(node_b_id, bs_claim.update_fulfill_htlcs.remove(0)); nodes[0].node.handle_commitment_signed_batch_test(node_b_id, &bs_claim.commitment_signed); check_added_monitors(&nodes[0], 1); let (as_third_raa, as_third_cs) = get_revoke_commit_msgs(&nodes[0], &node_b_id); nodes[1].node.handle_revoke_and_ack(node_a_id, &as_third_raa); - check_added_monitors(&nodes[1], 4); - let mut bs_2nd_claim = get_htlc_update_msgs(&nodes[1], &node_a_id); - - nodes[1].node.handle_commitment_signed_batch_test(node_a_id, &as_third_cs); - check_added_monitors(&nodes[1], 1); - let bs_third_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, node_a_id); - - nodes[0].node.handle_revoke_and_ack(node_b_id, &bs_third_raa); - check_added_monitors(&nodes[0], 1); - expect_payment_path_successful!(nodes[0]); - - let bs_second_fulfill_a = bs_2nd_claim.update_fulfill_htlcs.remove(0); - let bs_second_fulfill_b = bs_2nd_claim.update_fulfill_htlcs.remove(0); - nodes[0].node.handle_update_fulfill_htlc(node_b_id, bs_second_fulfill_a); - nodes[0].node.handle_update_fulfill_htlc(node_b_id, bs_second_fulfill_b); - nodes[0].node.handle_commitment_signed_batch_test(node_b_id, &bs_2nd_claim.commitment_signed); - check_added_monitors(&nodes[0], 1); - let (as_fourth_raa, as_fourth_cs) = get_revoke_commit_msgs(&nodes[0], &node_b_id); - - nodes[1].node.handle_revoke_and_ack(node_a_id, &as_fourth_raa); check_added_monitors(&nodes[1], 1); - nodes[1].node.handle_commitment_signed_batch_test(node_a_id, &as_fourth_cs); + nodes[1].node.handle_commitment_signed_batch_test(node_a_id, &as_third_cs); check_added_monitors(&nodes[1], 1); - let bs_second_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, node_a_id); + let bs_raa = get_event_msg!(nodes[1], MessageSendEvent::SendRevokeAndACK, node_a_id); - nodes[0].node.handle_revoke_and_ack(node_b_id, &bs_second_raa); + nodes[0].node.handle_revoke_and_ack(node_b_id, &bs_raa); check_added_monitors(&nodes[0], 1); let events = nodes[0].node.get_and_clear_pending_events(); - assert_eq!(events.len(), 2); - if let Event::PaymentPathSuccessful { .. } = events[0] { - } else { - panic!(); - } - if let Event::PaymentPathSuccessful { .. } = events[1] { - } else { - panic!(); + assert_eq!(events.len(), 3); + for event in &events { + if let Event::PaymentPathSuccessful { .. } = event { + } else { + panic!("Unexpected event {:?}", event); + } } }
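To recap the behavior change the three updated tests encode (an editorial summary, not part of the diff), for an N-part MPP received on a single channel:

    // Before: each part was claimed on its own, costing at least one ChannelMonitorUpdate and
    //   one commitment_signed/revoke_and_ack round-trip per part.
    // After: one combined ChannelMonitorUpdate (one PaymentPreimage step per part plus the
    //   commitment update) and a single commitment_signed round-trip carrying all N
    //   update_fulfill_htlc messages.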