Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 13 additions & 14 deletions crates/app/src/retry.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use backon::{BackoffBuilder, Retryable};
use chrono::{DateTime, Utc};
use pluto_core::clock::{ChronoClock, Clock};
use std::{sync::Arc, time::Duration};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error, info, warn};
Expand All @@ -7,8 +9,8 @@ use tracing::{Instrument, debug, error, info, warn};
#[derive(Clone)]
pub struct AsyncOptions<T> {
backoff_builder: backon::ExponentialBuilder,
deadline_fn: Arc<dyn Fn(T) -> Option<chrono::DateTime<chrono::Utc>> + Send + Sync>,
time_fn: Arc<dyn Fn() -> chrono::DateTime<chrono::Utc> + Send + Sync>,
deadline_fn: Arc<dyn Fn(T) -> Option<DateTime<Utc>> + Send + Sync>,
clock: Arc<dyn Clock>,
cancellation_token: Option<CancellationToken>,
}

Expand All @@ -22,20 +24,16 @@ impl<T> AsyncOptions<T> {
/// Set the deadline function.
pub fn with_deadline(
mut self,
deadline_fn: impl Fn(T) -> Option<chrono::DateTime<chrono::Utc>> + Send + Sync + 'static,
deadline_fn: impl Fn(T) -> Option<DateTime<Utc>> + Send + Sync + 'static,
) -> Self {
self.deadline_fn = Arc::new(deadline_fn);
self
}

/// Set the time provider function. This function should return the "current
/// time", which will be compared with the deadline computed by the
/// `deadline_fn`.
pub fn with_time(
mut self,
time_fn: impl Fn() -> chrono::DateTime<chrono::Utc> + Send + Sync + 'static,
) -> Self {
self.time_fn = Arc::new(time_fn);
/// Set the [`Clock`] providing the "current time", which is compared with
/// the deadline computed by the `deadline_fn`.
pub fn with_time(mut self, clock: impl Clock) -> Self {
self.clock = Arc::new(clock);
self
}

Expand All @@ -56,7 +54,7 @@ impl<T> Default for AsyncOptions<T> {
.without_max_times()
.with_jitter(),
deadline_fn: Arc::new(|_| None),
time_fn: Arc::new(chrono::Utc::now),
clock: Arc::new(ChronoClock),
cancellation_token: None,
}
}
Expand Down Expand Up @@ -128,7 +126,7 @@ pub async fn do_async<
mut future: FutureFn,
) {
let deadline = (options.deadline_fn)(t);
let now = (options.time_fn)();
let now = options.clock.now();

#[allow(
clippy::arithmetic_side_effects,
Expand Down Expand Up @@ -199,6 +197,7 @@ mod tests {
use tokio_util::sync::CancellationToken;

use crate::retry::{self, DoAsyncError};
use chrono::Utc;
use core::time;
use std::sync::{Arc, Mutex};

Expand Down Expand Up @@ -285,7 +284,7 @@ mod tests {

#[tokio::test]
async fn one_attempt_timeout() {
let now = chrono::Utc::now();
let now = Utc::now();

run_test(TestCase {
options: retry::AsyncOptions::default()
Expand Down
31 changes: 31 additions & 0 deletions crates/core/src/clock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use chrono::{DateTime, Utc};

/// Provides the "current time".
///
/// [`ChronoClock`] is the production implementation; tests can supply a
/// deterministic clock instead. Any `Fn() -> DateTime<Utc>` also implements
/// this trait via the blanket impl below, so closures and [`chrono::Utc::now`]
/// can be used directly.
pub trait Clock: Send + Sync + 'static {
/// Returns the current time.
fn now(&self) -> DateTime<Utc>;
}

/// [`Clock`] backed by the system wall clock via [`chrono::Utc::now`].
#[derive(Debug, Clone, Copy, Default)]
pub struct ChronoClock;

impl Clock for ChronoClock {
fn now(&self) -> DateTime<Utc> {
Utc::now()
}
}

impl<F> Clock for F
where
F: Fn() -> DateTime<Utc> + Send + Sync + 'static,
{
fn now(&self) -> DateTime<Utc> {
self()
}
}
3 changes: 3 additions & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub mod version;
/// Duty deadline tracking and notification.
pub mod deadline;

/// Clock abstraction over the current time.
pub mod clock;

/// parsigdb
pub mod parsigdb;

Expand Down
48 changes: 23 additions & 25 deletions crates/eth2util/src/enr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,28 +105,26 @@ pub struct Record {
kvs: HashMap<String, Vec<u8>>,
}

/// OptionFn is a function that sets an option in the record.
pub type OptionFn = Box<dyn Fn(&mut HashMap<String, Vec<u8>>)>;

/// with_ip_impl is a function that sets the IP address in the record.
pub fn with_ip_impl(ip: Ipv4Addr) -> OptionFn {
Box::new(move |kvs: &mut HashMap<String, Vec<u8>>| {
kvs.insert(KEY_IP.to_string(), ip.octets().to_vec());
})
/// A single entry to set when creating a record.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EnrEntry {
/// Sets the IP address.
Ipv4(Ipv4Addr),
/// Sets the TCP port.
Tcp(u16),
/// Sets the UDP port.
Udp(u16),
}

/// with_tcp_impl is a function that sets the TCP port in the record.
pub fn with_tcp_impl(tcp: u16) -> OptionFn {
Box::new(move |kvs: &mut HashMap<String, Vec<u8>>| {
kvs.insert(KEY_TCP.to_string(), tcp.to_be_bytes().to_vec());
})
}

/// with_udp_impl is a function that sets the UDP port in the record.
pub fn with_udp_impl(udp: u16) -> OptionFn {
Box::new(move |kvs: &mut HashMap<String, Vec<u8>>| {
kvs.insert(KEY_UDP.to_string(), udp.to_be_bytes().to_vec());
})
impl EnrEntry {
/// Writes this entry into the record's key-value pairs.
fn apply(self, kvs: &mut HashMap<String, Vec<u8>>) {
let _ = match self {
Self::Ipv4(ip) => kvs.insert(KEY_IP.to_string(), ip.octets().to_vec()),
Self::Tcp(tcp) => kvs.insert(KEY_TCP.to_string(), tcp.to_be_bytes().to_vec()),
Self::Udp(udp) => kvs.insert(KEY_UDP.to_string(), udp.to_be_bytes().to_vec()),
};
}
}

impl Record {
Expand All @@ -137,7 +135,7 @@ impl Record {
}

/// Creates a new record.
pub fn new(secret_key: &SecretKey, opts: Vec<OptionFn>) -> Result<Self, RecordError> {
pub fn new(secret_key: &SecretKey, opts: Vec<EnrEntry>) -> Result<Self, RecordError> {
let mut kvs: HashMap<String, Vec<u8>> = HashMap::new();

kvs.insert(KEY_ID.to_string(), VAL_ID.as_bytes().to_vec());
Expand All @@ -147,7 +145,7 @@ impl Record {
);

for opt in opts {
opt(&mut kvs);
opt.apply(&mut kvs);
}

let signature = sign(secret_key, &encode_elements(&[], &kvs))?;
Expand Down Expand Up @@ -394,9 +392,9 @@ mod tests {
let r1 = Record::new(
&secret_key,
vec![
with_ip_impl(expect_ip),
with_tcp_impl(expect_tcp),
with_udp_impl(expect_udp),
EnrEntry::Ipv4(expect_ip),
EnrEntry::Tcp(expect_tcp),
EnrEntry::Udp(expect_udp),
],
)
.expect("Failed to create record");
Expand Down
9 changes: 5 additions & 4 deletions crates/peerinfo/examples/peerinfo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use libp2p::{
};
use pluto_cluster::lock::Lock;
use pluto_core::version::{VERSION, git_commit};
use pluto_eth2util::enr::{EnrEntry, Record};
use pluto_p2p::{
behaviours::pluto::PlutoBehaviourEvent,
config::P2PConfig,
Expand Down Expand Up @@ -256,12 +257,12 @@ async fn main() -> anyhow::Result<()> {
}
};

let enr = pluto_eth2util::enr::Record::new(
let enr = Record::new(
&key,
vec![
pluto_eth2util::enr::with_ip_impl(Ipv4Addr::from([0, 0, 0, 0])),
pluto_eth2util::enr::with_tcp_impl(args.port),
pluto_eth2util::enr::with_udp_impl(args.port),
EnrEntry::Ipv4(Ipv4Addr::from([0, 0, 0, 0])),
EnrEntry::Tcp(args.port),
EnrEntry::Udp(args.port),
],
)?;

Expand Down
9 changes: 5 additions & 4 deletions crates/relay-server/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use axum::{
};
use k256::SecretKey;
use libp2p::{Multiaddr, PeerId, multiaddr};
use pluto_eth2util::enr::{EnrEntry, Record};
use tokio::{
net::TcpListener,
sync::{RwLock, mpsc},
Expand Down Expand Up @@ -284,12 +285,12 @@ pub async fn enr_handler(
};

// Create ENR record
let record = pluto_eth2util::enr::Record::new(
let record = Record::new(
&state.secret_key,
vec![
pluto_eth2util::enr::with_ip_impl(ip),
pluto_eth2util::enr::with_tcp_impl(tcp_port),
pluto_eth2util::enr::with_udp_impl(udp_port),
EnrEntry::Ipv4(ip),
EnrEntry::Tcp(tcp_port),
EnrEntry::Udp(udp_port),
],
)
.map_err(|e| HandlerError {
Expand Down
Loading