Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/api-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ See [Pagination](#pagination) below for how to page through results.

| RPC | Description |
|-------------------|-------------------------------------------------------------|
| `SubscribeEvents` | **Server-streaming.** Subscribe to real-time payment events |
| `SubscribeEvents` | **Server-streaming.** Subscribe to real-time payment and channel events |

`SubscribeEvents` returns a stream of `EventEnvelope` messages. Each envelope contains one of:

Expand All @@ -199,6 +199,7 @@ See [Pagination](#pagination) below for how to page through results.
| `PaymentFailed` | An outbound payment failed |
| `PaymentClaimable` | A hodl invoice payment arrived and is waiting to be claimed or failed |
| `PaymentForwarded` | A payment was routed through this node |
| `ChannelStateChanged` | A channel changed state (pending, ready, open failed, closed) |

Events are broadcast to all connected subscribers. The server uses a bounded broadcast channel
(capacity 1024). A slow subscriber that falls behind will miss events.
Expand Down
335 changes: 332 additions & 3 deletions e2e-tests/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use std::time::Duration;

use e2e_tests::{
find_available_port, mine_and_sync, run_cli, run_cli_raw, setup_funded_channel,
wait_for_onchain_balance, LdkServerConfig, LdkServerHandle, TestBitcoind,
wait_for_onchain_balance, wait_for_usable_channel, LdkServerConfig, LdkServerHandle,
TestBitcoind,
};
use hex_conservative::{DisplayHex, FromHex};
use ldk_node::bitcoin::hashes::{sha256, Hash};
Expand All @@ -21,10 +22,12 @@ use ldk_node::lightning::offers::offer::Offer;
use ldk_node::lightning_invoice::Bolt11Invoice;
use ldk_server_client::client::EventStream;
use ldk_server_client::ldk_server_grpc::api::{
Bolt11ReceiveRequest, Bolt12ReceiveRequest, OnchainReceiveRequest,
Bolt11ReceiveRequest, Bolt12ReceiveRequest, OnchainReceiveRequest, OpenChannelRequest,
};
use ldk_server_client::ldk_server_grpc::events::event_envelope::Event;
use ldk_server_client::ldk_server_grpc::events::EventEnvelope;
use ldk_server_client::ldk_server_grpc::events::{
ChannelClosureInitiator, ChannelState, ChannelStateChangeReasonKind, EventEnvelope,
};
use ldk_server_client::ldk_server_grpc::types::{
bolt11_invoice_description, Bolt11InvoiceDescription,
};
Expand Down Expand Up @@ -410,6 +413,332 @@ async fn test_cli_open_channel() {
assert!(!output["user_channel_id"].as_str().unwrap().is_empty());
}

#[tokio::test]
async fn test_subscribe_events_channel_state_lifecycle_pending_ready_closed() {
Comment thread
frnandu marked this conversation as resolved.
let bitcoind = TestBitcoind::new();
let server_a = LdkServerHandle::start(&bitcoind).await;
let server_b = LdkServerHandle::start(&bitcoind).await;

let addr_a = server_a.client().onchain_receive(OnchainReceiveRequest {}).await.unwrap().address;
let addr_b = server_b.client().onchain_receive(OnchainReceiveRequest {}).await.unwrap().address;
bitcoind.fund_address(&addr_a, 1.0);
bitcoind.fund_address(&addr_b, 0.1);
mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await;
wait_for_onchain_balance(server_a.client(), Duration::from_secs(30)).await;
wait_for_onchain_balance(server_b.client(), Duration::from_secs(30)).await;

let mut events_a = server_a.client().subscribe_events().await.unwrap();
let mut events_b = server_b.client().subscribe_events().await.unwrap();

let open_resp = server_a
.client()
.open_channel(OpenChannelRequest {
node_pubkey: server_b.node_id().to_string(),
address: format!("127.0.0.1:{}", server_b.p2p_port),
channel_amount_sats: 100_000,
push_to_counterparty_msat: None,
channel_config: None,
announce_channel: true,
disable_counterparty_reserve: false,
})
.await
.unwrap();

let pending_a = wait_for_event(&mut events_a, |e| {
matches!(
e,
Event::ChannelStateChanged(channel_event)
if channel_event.user_channel_id == open_resp.user_channel_id
&& channel_event.state == ChannelState::Pending as i32
)
})
.await;
let pending_a = match pending_a.event {
Some(Event::ChannelStateChanged(channel_event)) => channel_event,
other => panic!("expected ChannelStateChanged event, got {other:?}"),
};
assert_eq!(pending_a.user_channel_id, open_resp.user_channel_id);
assert_eq!(pending_a.counterparty_node_id.as_deref(), Some(server_b.node_id()));
assert!(pending_a.funding_txo.is_some());
assert!(pending_a.reason.is_none());
assert_eq!(pending_a.closure_initiator, ChannelClosureInitiator::Unspecified as i32);

let pending_b = wait_for_event(&mut events_b, |e| {
matches!(
e,
Event::ChannelStateChanged(channel_event)
if channel_event.channel_id == pending_a.channel_id
&& channel_event.state == ChannelState::Pending as i32
)
})
.await;
let pending_b = match pending_b.event {
Some(Event::ChannelStateChanged(channel_event)) => channel_event,
other => panic!("expected ChannelStateChanged event, got {other:?}"),
};
assert_eq!(pending_b.channel_id, pending_a.channel_id);
assert_eq!(pending_b.counterparty_node_id.as_deref(), Some(server_a.node_id()));
assert!(pending_b.funding_txo.is_some());
assert!(pending_b.reason.is_none());
assert_eq!(pending_b.closure_initiator, ChannelClosureInitiator::Unspecified as i32);

mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await;
wait_for_usable_channel(server_a.client(), &bitcoind, Duration::from_secs(60)).await;

let ready_a = wait_for_event(&mut events_a, |e| {
matches!(
e,
Event::ChannelStateChanged(channel_event)
if channel_event.channel_id == pending_a.channel_id
&& channel_event.state == ChannelState::Ready as i32
)
})
.await;
let ready_a = match ready_a.event {
Some(Event::ChannelStateChanged(channel_event)) => channel_event,
other => panic!("expected ChannelStateChanged event, got {other:?}"),
};
assert_eq!(ready_a.channel_id, pending_a.channel_id);
assert_eq!(ready_a.user_channel_id, open_resp.user_channel_id);
assert_eq!(ready_a.counterparty_node_id.as_deref(), Some(server_b.node_id()));
assert!(ready_a.funding_txo.is_some());
assert!(ready_a.reason.is_none());
assert_eq!(ready_a.closure_initiator, ChannelClosureInitiator::Unspecified as i32);

let ready_b = wait_for_event(&mut events_b, |e| {
matches!(
e,
Event::ChannelStateChanged(channel_event)
if channel_event.channel_id == pending_a.channel_id
&& channel_event.state == ChannelState::Ready as i32
)
})
.await;
let ready_b = match ready_b.event {
Some(Event::ChannelStateChanged(channel_event)) => channel_event,
other => panic!("expected ChannelStateChanged event, got {other:?}"),
};
assert_eq!(ready_b.channel_id, pending_a.channel_id);
assert_eq!(ready_b.counterparty_node_id.as_deref(), Some(server_a.node_id()));
assert!(ready_b.funding_txo.is_some());
assert!(ready_b.reason.is_none());
assert_eq!(ready_b.closure_initiator, ChannelClosureInitiator::Unspecified as i32);

run_cli(&server_a, &["close-channel", &open_resp.user_channel_id, server_b.node_id()]);
mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await;

let closed_a = wait_for_event(&mut events_a, |e| {
matches!(
e,
Event::ChannelStateChanged(channel_event)
if channel_event.channel_id == pending_a.channel_id
&& channel_event.state == ChannelState::Closed as i32
)
})
.await;
let closed_a = match closed_a.event {
Some(Event::ChannelStateChanged(channel_event)) => channel_event,
other => panic!("expected ChannelStateChanged event, got {other:?}"),
};
assert_eq!(closed_a.user_channel_id, open_resp.user_channel_id);
assert_eq!(closed_a.state, ChannelState::Closed as i32);
assert_eq!(closed_a.counterparty_node_id.as_deref(), Some(server_b.node_id()));
assert!(closed_a.funding_txo.is_none());
let reason_a = closed_a.reason.expect("closed event must include closure reason");
assert!(matches!(
ChannelStateChangeReasonKind::from_i32(reason_a.kind),
Some(ChannelStateChangeReasonKind::LocallyInitiatedCooperativeClosure)
| Some(ChannelStateChangeReasonKind::LegacyCooperativeClosure)
));
assert_eq!(closed_a.closure_initiator, ChannelClosureInitiator::Local as i32);

let closed_b = wait_for_event(&mut events_b, |e| {
matches!(
e,
Event::ChannelStateChanged(channel_event)
if channel_event.channel_id == pending_a.channel_id
&& channel_event.state == ChannelState::Closed as i32
)
})
.await;
let closed_b = match closed_b.event {
Some(Event::ChannelStateChanged(channel_event)) => channel_event,
other => panic!("expected ChannelStateChanged event, got {other:?}"),
};
assert_eq!(closed_b.channel_id, pending_a.channel_id);
assert_eq!(closed_b.counterparty_node_id.as_deref(), Some(server_a.node_id()));
assert!(closed_b.funding_txo.is_none());
let reason_b = closed_b.reason.expect("closed event must include closure reason");
assert!(matches!(
ChannelStateChangeReasonKind::from_i32(reason_b.kind),
Some(ChannelStateChangeReasonKind::CounterpartyInitiatedCooperativeClosure)
| Some(ChannelStateChangeReasonKind::LegacyCooperativeClosure)
));
assert_eq!(closed_b.closure_initiator, ChannelClosureInitiator::Remote as i32);
}

#[tokio::test]
async fn test_subscribe_events_channel_state_lifecycle_pending_ready_force_closed() {
let bitcoind = TestBitcoind::new();
let server_a = LdkServerHandle::start(&bitcoind).await;
let server_b = LdkServerHandle::start(&bitcoind).await;

let addr_a = server_a.client().onchain_receive(OnchainReceiveRequest {}).await.unwrap().address;
let addr_b = server_b.client().onchain_receive(OnchainReceiveRequest {}).await.unwrap().address;
bitcoind.fund_address(&addr_a, 1.0);
bitcoind.fund_address(&addr_b, 0.1);
mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await;
wait_for_onchain_balance(server_a.client(), Duration::from_secs(30)).await;
wait_for_onchain_balance(server_b.client(), Duration::from_secs(30)).await;

let mut events_a = server_a.client().subscribe_events().await.unwrap();
let mut events_b = server_b.client().subscribe_events().await.unwrap();

let open_resp = server_a
.client()
.open_channel(OpenChannelRequest {
node_pubkey: server_b.node_id().to_string(),
address: format!("127.0.0.1:{}", server_b.p2p_port),
channel_amount_sats: 100_000,
push_to_counterparty_msat: None,
channel_config: None,
announce_channel: true,
disable_counterparty_reserve: false,
})
.await
.unwrap();

let pending_a = wait_for_event(&mut events_a, |e| {
matches!(
e,
Event::ChannelStateChanged(channel_event)
if channel_event.user_channel_id == open_resp.user_channel_id
&& channel_event.state == ChannelState::Pending as i32
)
})
.await;
let pending_a = match pending_a.event {
Some(Event::ChannelStateChanged(channel_event)) => channel_event,
other => panic!("expected ChannelStateChanged event, got {other:?}"),
};
assert_eq!(pending_a.user_channel_id, open_resp.user_channel_id);
assert_eq!(pending_a.counterparty_node_id.as_deref(), Some(server_b.node_id()));
assert!(pending_a.funding_txo.is_some());
assert!(pending_a.reason.is_none());
assert_eq!(pending_a.closure_initiator, ChannelClosureInitiator::Unspecified as i32);

let pending_b = wait_for_event(&mut events_b, |e| {
matches!(
e,
Event::ChannelStateChanged(channel_event)
if channel_event.channel_id == pending_a.channel_id
&& channel_event.state == ChannelState::Pending as i32
)
})
.await;
let pending_b = match pending_b.event {
Some(Event::ChannelStateChanged(channel_event)) => channel_event,
other => panic!("expected ChannelStateChanged event, got {other:?}"),
};
assert_eq!(pending_b.channel_id, pending_a.channel_id);
assert_eq!(pending_b.counterparty_node_id.as_deref(), Some(server_a.node_id()));
assert!(pending_b.funding_txo.is_some());
assert!(pending_b.reason.is_none());
assert_eq!(pending_b.closure_initiator, ChannelClosureInitiator::Unspecified as i32);

mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await;
wait_for_usable_channel(server_a.client(), &bitcoind, Duration::from_secs(60)).await;

let ready_a = wait_for_event(&mut events_a, |e| {
matches!(
e,
Event::ChannelStateChanged(channel_event)
if channel_event.channel_id == pending_a.channel_id
&& channel_event.state == ChannelState::Ready as i32
)
})
.await;
let ready_a = match ready_a.event {
Some(Event::ChannelStateChanged(channel_event)) => channel_event,
other => panic!("expected ChannelStateChanged event, got {other:?}"),
};
assert_eq!(ready_a.channel_id, pending_a.channel_id);
assert_eq!(ready_a.user_channel_id, open_resp.user_channel_id);
assert_eq!(ready_a.counterparty_node_id.as_deref(), Some(server_b.node_id()));
assert!(ready_a.funding_txo.is_some());
assert!(ready_a.reason.is_none());
assert_eq!(ready_a.closure_initiator, ChannelClosureInitiator::Unspecified as i32);

let ready_b = wait_for_event(&mut events_b, |e| {
matches!(
e,
Event::ChannelStateChanged(channel_event)
if channel_event.channel_id == pending_a.channel_id
&& channel_event.state == ChannelState::Ready as i32
)
})
.await;
let ready_b = match ready_b.event {
Some(Event::ChannelStateChanged(channel_event)) => channel_event,
other => panic!("expected ChannelStateChanged event, got {other:?}"),
};
assert_eq!(ready_b.channel_id, pending_a.channel_id);
assert_eq!(ready_b.counterparty_node_id.as_deref(), Some(server_a.node_id()));
assert!(ready_b.funding_txo.is_some());
assert!(ready_b.reason.is_none());
assert_eq!(ready_b.closure_initiator, ChannelClosureInitiator::Unspecified as i32);

run_cli(&server_a, &["force-close-channel", &open_resp.user_channel_id, server_b.node_id()]);
mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await;

let closed_a = wait_for_event(&mut events_a, |e| {
matches!(
e,
Event::ChannelStateChanged(channel_event)
if channel_event.channel_id == pending_a.channel_id
&& channel_event.state == ChannelState::Closed as i32
)
})
.await;
let closed_a = match closed_a.event {
Some(Event::ChannelStateChanged(channel_event)) => channel_event,
other => panic!("expected ChannelStateChanged event, got {other:?}"),
};
assert_eq!(closed_a.user_channel_id, open_resp.user_channel_id);
assert_eq!(closed_a.state, ChannelState::Closed as i32);
assert_eq!(closed_a.counterparty_node_id.as_deref(), Some(server_b.node_id()));
assert!(closed_a.funding_txo.is_none());
let reason_a = closed_a.reason.expect("closed event must include closure reason");
assert_eq!(
ChannelStateChangeReasonKind::from_i32(reason_a.kind),
Some(ChannelStateChangeReasonKind::HolderForceClosed)
);
assert_eq!(closed_a.closure_initiator, ChannelClosureInitiator::Local as i32);

let closed_b = wait_for_event(&mut events_b, |e| {
matches!(
e,
Event::ChannelStateChanged(channel_event)
if channel_event.channel_id == pending_a.channel_id
&& channel_event.state == ChannelState::Closed as i32
)
})
.await;
let closed_b = match closed_b.event {
Some(Event::ChannelStateChanged(channel_event)) => channel_event,
other => panic!("expected ChannelStateChanged event, got {other:?}"),
};
assert_eq!(closed_b.channel_id, pending_a.channel_id);
assert_eq!(closed_b.counterparty_node_id.as_deref(), Some(server_a.node_id()));
assert!(closed_b.funding_txo.is_none());
let reason_b = closed_b.reason.expect("closed event must include closure reason");
assert_eq!(
ChannelStateChangeReasonKind::from_i32(reason_b.kind),
Some(ChannelStateChangeReasonKind::CounterpartyForceClosed)
);
assert_eq!(closed_b.closure_initiator, ChannelClosureInitiator::Remote as i32);
}

#[tokio::test]
async fn test_cli_list_channels() {
let bitcoind = TestBitcoind::new();
Expand Down
Loading