From d876b5a396b7ac789e8365786bbd4ab8b77e7312 Mon Sep 17 00:00:00 2001 From: Denis Kolodin Date: Mon, 25 May 2026 11:23:19 +0200 Subject: [PATCH 1/5] Add the Clock trait --- crates/app/src/retry.rs | 45 +++++++++++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/crates/app/src/retry.rs b/crates/app/src/retry.rs index d6beeb31..f634387e 100644 --- a/crates/app/src/retry.rs +++ b/crates/app/src/retry.rs @@ -1,14 +1,34 @@ use backon::{BackoffBuilder, Retryable}; +use chrono::{DateTime, Utc}; use std::{sync::Arc, time::Duration}; use tokio_util::sync::CancellationToken; use tracing::{Instrument, debug, error, info, warn}; +/// Provides the "current time" used to compute retry deadlines. +/// +/// A blanket implementation covers any `Fn() -> DateTime`, so closures +/// (including [`chrono::Utc::now`]) can be passed wherever a `Clock` is +/// expected. Custom implementations are useful for deterministic tests. +pub trait Clock: Send + Sync + 'static { + /// Returns the current time. + fn now(&self) -> DateTime; +} + +impl Clock for F +where + F: Fn() -> DateTime + Send + Sync + 'static, +{ + fn now(&self) -> DateTime { + self() + } +} + /// Options for the asynchronous retry executor. #[derive(Clone)] pub struct AsyncOptions { backoff_builder: backon::ExponentialBuilder, - deadline_fn: Arc Option> + Send + Sync>, - time_fn: Arc chrono::DateTime + Send + Sync>, + deadline_fn: Arc Option> + Send + Sync>, + clock: Arc, cancellation_token: Option, } @@ -22,20 +42,19 @@ impl AsyncOptions { /// Set the deadline function. pub fn with_deadline( mut self, - deadline_fn: impl Fn(T) -> Option> + Send + Sync + 'static, + deadline_fn: impl Fn(T) -> Option> + Send + Sync + 'static, ) -> Self { self.deadline_fn = Arc::new(deadline_fn); self } - /// Set the time provider function. This function should return the "current - /// time", which will be compared with the deadline computed by the - /// `deadline_fn`. - pub fn with_time( - mut self, - time_fn: impl Fn() -> chrono::DateTime + Send + Sync + 'static, - ) -> Self { - self.time_fn = Arc::new(time_fn); + /// Set the [`Clock`] providing the "current time", which is compared with + /// the deadline computed by the `deadline_fn`. + /// + /// Accepts any [`Clock`], including a `Fn() -> DateTime` closure via + /// the blanket implementation. + pub fn with_time(mut self, clock: impl Clock) -> Self { + self.clock = Arc::new(clock); self } @@ -56,7 +75,7 @@ impl Default for AsyncOptions { .without_max_times() .with_jitter(), deadline_fn: Arc::new(|_| None), - time_fn: Arc::new(chrono::Utc::now), + clock: Arc::new(Utc::now), cancellation_token: None, } } @@ -128,7 +147,7 @@ pub async fn do_async< mut future: FutureFn, ) { let deadline = (options.deadline_fn)(t); - let now = (options.time_fn)(); + let now = options.clock.now(); #[allow( clippy::arithmetic_side_effects, From 23e28adcff2e2b25b495cf8154d6d8af656db5d0 Mon Sep 17 00:00:00 2001 From: Denis Kolodin Date: Mon, 25 May 2026 16:47:07 +0200 Subject: [PATCH 2/5] Add EnrEntry --- crates/eth2util/src/enr.rs | 48 +++++++++++++--------------- crates/peerinfo/examples/peerinfo.rs | 9 +++--- crates/relay-server/src/web.rs | 9 +++--- 3 files changed, 33 insertions(+), 33 deletions(-) diff --git a/crates/eth2util/src/enr.rs b/crates/eth2util/src/enr.rs index 912a7e97..d005ee41 100644 --- a/crates/eth2util/src/enr.rs +++ b/crates/eth2util/src/enr.rs @@ -105,28 +105,26 @@ pub struct Record { kvs: HashMap>, } -/// OptionFn is a function that sets an option in the record. -pub type OptionFn = Box>)>; - -/// with_ip_impl is a function that sets the IP address in the record. -pub fn with_ip_impl(ip: Ipv4Addr) -> OptionFn { - Box::new(move |kvs: &mut HashMap>| { - kvs.insert(KEY_IP.to_string(), ip.octets().to_vec()); - }) +/// A single entry to set when creating a record. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum EnrEntry { + /// Sets the IP address. + Ip(Ipv4Addr), + /// Sets the TCP port. + Tcp(u16), + /// Sets the UDP port. + Udp(u16), } -/// with_tcp_impl is a function that sets the TCP port in the record. -pub fn with_tcp_impl(tcp: u16) -> OptionFn { - Box::new(move |kvs: &mut HashMap>| { - kvs.insert(KEY_TCP.to_string(), tcp.to_be_bytes().to_vec()); - }) -} - -/// with_udp_impl is a function that sets the UDP port in the record. -pub fn with_udp_impl(udp: u16) -> OptionFn { - Box::new(move |kvs: &mut HashMap>| { - kvs.insert(KEY_UDP.to_string(), udp.to_be_bytes().to_vec()); - }) +impl EnrEntry { + /// Writes this entry into the record's key-value pairs. + fn apply(self, kvs: &mut HashMap>) { + match self { + Self::Ip(ip) => kvs.insert(KEY_IP.to_string(), ip.octets().to_vec()), + Self::Tcp(tcp) => kvs.insert(KEY_TCP.to_string(), tcp.to_be_bytes().to_vec()), + Self::Udp(udp) => kvs.insert(KEY_UDP.to_string(), udp.to_be_bytes().to_vec()), + }; + } } impl Record { @@ -137,7 +135,7 @@ impl Record { } /// Creates a new record. - pub fn new(secret_key: &SecretKey, opts: Vec) -> Result { + pub fn new(secret_key: &SecretKey, opts: Vec) -> Result { let mut kvs: HashMap> = HashMap::new(); kvs.insert(KEY_ID.to_string(), VAL_ID.as_bytes().to_vec()); @@ -147,7 +145,7 @@ impl Record { ); for opt in opts { - opt(&mut kvs); + opt.apply(&mut kvs); } let signature = sign(secret_key, &encode_elements(&[], &kvs))?; @@ -394,9 +392,9 @@ mod tests { let r1 = Record::new( &secret_key, vec![ - with_ip_impl(expect_ip), - with_tcp_impl(expect_tcp), - with_udp_impl(expect_udp), + EnrEntry::Ip(expect_ip), + EnrEntry::Tcp(expect_tcp), + EnrEntry::Udp(expect_udp), ], ) .expect("Failed to create record"); diff --git a/crates/peerinfo/examples/peerinfo.rs b/crates/peerinfo/examples/peerinfo.rs index 1ad36c29..22e68ce3 100644 --- a/crates/peerinfo/examples/peerinfo.rs +++ b/crates/peerinfo/examples/peerinfo.rs @@ -19,6 +19,7 @@ use libp2p::{ }; use pluto_cluster::lock::Lock; use pluto_core::version::{VERSION, git_commit}; +use pluto_eth2util::enr::{EnrEntry, Record}; use pluto_p2p::{ behaviours::pluto::PlutoBehaviourEvent, config::P2PConfig, @@ -256,12 +257,12 @@ async fn main() -> anyhow::Result<()> { } }; - let enr = pluto_eth2util::enr::Record::new( + let enr = Record::new( &key, vec![ - pluto_eth2util::enr::with_ip_impl(Ipv4Addr::from([0, 0, 0, 0])), - pluto_eth2util::enr::with_tcp_impl(args.port), - pluto_eth2util::enr::with_udp_impl(args.port), + EnrEntry::Ip(Ipv4Addr::from([0, 0, 0, 0])), + EnrEntry::Tcp(args.port), + EnrEntry::Udp(args.port), ], )?; diff --git a/crates/relay-server/src/web.rs b/crates/relay-server/src/web.rs index 678a33b2..449785a3 100644 --- a/crates/relay-server/src/web.rs +++ b/crates/relay-server/src/web.rs @@ -14,6 +14,7 @@ use axum::{ }; use k256::SecretKey; use libp2p::{Multiaddr, PeerId, multiaddr}; +use pluto_eth2util::enr::{EnrEntry, Record}; use tokio::{ net::TcpListener, sync::{RwLock, mpsc}, @@ -284,12 +285,12 @@ pub async fn enr_handler( }; // Create ENR record - let record = pluto_eth2util::enr::Record::new( + let record = Record::new( &state.secret_key, vec![ - pluto_eth2util::enr::with_ip_impl(ip), - pluto_eth2util::enr::with_tcp_impl(tcp_port), - pluto_eth2util::enr::with_udp_impl(udp_port), + EnrEntry::Ip(ip), + EnrEntry::Tcp(tcp_port), + EnrEntry::Udp(udp_port), ], ) .map_err(|e| HandlerError { From e65febcc6e5ace8c0bf8368c63d8535db54c6b85 Mon Sep 17 00:00:00 2001 From: Denis Kolodin Date: Tue, 26 May 2026 13:58:16 +0200 Subject: [PATCH 3/5] Add fixed and system clocks --- crates/app/src/retry.rs | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/crates/app/src/retry.rs b/crates/app/src/retry.rs index f634387e..e37d08d7 100644 --- a/crates/app/src/retry.rs +++ b/crates/app/src/retry.rs @@ -6,20 +6,20 @@ use tracing::{Instrument, debug, error, info, warn}; /// Provides the "current time" used to compute retry deadlines. /// -/// A blanket implementation covers any `Fn() -> DateTime`, so closures -/// (including [`chrono::Utc::now`]) can be passed wherever a `Clock` is -/// expected. Custom implementations are useful for deterministic tests. +/// [`SystemClock`] is the production implementation; tests can supply a +/// deterministic clock instead. pub trait Clock: Send + Sync + 'static { /// Returns the current time. fn now(&self) -> DateTime; } -impl Clock for F -where - F: Fn() -> DateTime + Send + Sync + 'static, -{ +/// [`Clock`] backed by the system wall clock via [`chrono::Utc::now`]. +#[derive(Debug, Clone, Copy, Default)] +pub struct SystemClock; + +impl Clock for SystemClock { fn now(&self) -> DateTime { - self() + Utc::now() } } @@ -50,9 +50,6 @@ impl AsyncOptions { /// Set the [`Clock`] providing the "current time", which is compared with /// the deadline computed by the `deadline_fn`. - /// - /// Accepts any [`Clock`], including a `Fn() -> DateTime` closure via - /// the blanket implementation. pub fn with_time(mut self, clock: impl Clock) -> Self { self.clock = Arc::new(clock); self @@ -75,7 +72,7 @@ impl Default for AsyncOptions { .without_max_times() .with_jitter(), deadline_fn: Arc::new(|_| None), - clock: Arc::new(Utc::now), + clock: Arc::new(SystemClock), cancellation_token: None, } } @@ -217,10 +214,20 @@ pub async fn do_async< mod tests { use tokio_util::sync::CancellationToken; - use crate::retry::{self, DoAsyncError}; + use crate::retry::{self, Clock, DoAsyncError}; + use chrono::{DateTime, Utc}; use core::time; use std::sync::{Arc, Mutex}; + /// [`Clock`] that always returns a fixed instant, for deterministic tests. + struct FixedClock(DateTime); + + impl Clock for FixedClock { + fn now(&self) -> DateTime { + self.0 + } + } + struct TestCase { options: retry::AsyncOptions<()>, func: Arc Result<(), DoAsyncError> + Send + Sync>, @@ -304,12 +311,12 @@ mod tests { #[tokio::test] async fn one_attempt_timeout() { - let now = chrono::Utc::now(); + let now = Utc::now(); run_test(TestCase { options: retry::AsyncOptions::default() .with_backoff(test_backoff()) - .with_time(move || now) + .with_time(FixedClock(now)) .with_deadline(move |_| Some(now)), func: Arc::new(|_| Err(DoAsyncError::RetryableError)), expected_attempts: 1, From ce448ee2926aa386e6761069d9b93572c969b2ef Mon Sep 17 00:00:00 2001 From: Denis Kolodin Date: Wed, 27 May 2026 21:04:40 +0200 Subject: [PATCH 4/5] Fix Chrono and Ip --- crates/app/src/retry.rs | 8 ++++---- crates/eth2util/src/enr.rs | 6 +++--- crates/peerinfo/examples/peerinfo.rs | 2 +- crates/relay-server/src/web.rs | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/crates/app/src/retry.rs b/crates/app/src/retry.rs index e37d08d7..29aa054d 100644 --- a/crates/app/src/retry.rs +++ b/crates/app/src/retry.rs @@ -6,7 +6,7 @@ use tracing::{Instrument, debug, error, info, warn}; /// Provides the "current time" used to compute retry deadlines. /// -/// [`SystemClock`] is the production implementation; tests can supply a +/// [`ChronoClock`] is the production implementation; tests can supply a /// deterministic clock instead. pub trait Clock: Send + Sync + 'static { /// Returns the current time. @@ -15,9 +15,9 @@ pub trait Clock: Send + Sync + 'static { /// [`Clock`] backed by the system wall clock via [`chrono::Utc::now`]. #[derive(Debug, Clone, Copy, Default)] -pub struct SystemClock; +pub struct ChronoClock; -impl Clock for SystemClock { +impl Clock for ChronoClock { fn now(&self) -> DateTime { Utc::now() } @@ -72,7 +72,7 @@ impl Default for AsyncOptions { .without_max_times() .with_jitter(), deadline_fn: Arc::new(|_| None), - clock: Arc::new(SystemClock), + clock: Arc::new(ChronoClock), cancellation_token: None, } } diff --git a/crates/eth2util/src/enr.rs b/crates/eth2util/src/enr.rs index d005ee41..ab6dbdb7 100644 --- a/crates/eth2util/src/enr.rs +++ b/crates/eth2util/src/enr.rs @@ -109,7 +109,7 @@ pub struct Record { #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum EnrEntry { /// Sets the IP address. - Ip(Ipv4Addr), + Ipv4(Ipv4Addr), /// Sets the TCP port. Tcp(u16), /// Sets the UDP port. @@ -120,7 +120,7 @@ impl EnrEntry { /// Writes this entry into the record's key-value pairs. fn apply(self, kvs: &mut HashMap>) { match self { - Self::Ip(ip) => kvs.insert(KEY_IP.to_string(), ip.octets().to_vec()), + Self::Ipv4(ip) => kvs.insert(KEY_IP.to_string(), ip.octets().to_vec()), Self::Tcp(tcp) => kvs.insert(KEY_TCP.to_string(), tcp.to_be_bytes().to_vec()), Self::Udp(udp) => kvs.insert(KEY_UDP.to_string(), udp.to_be_bytes().to_vec()), }; @@ -392,7 +392,7 @@ mod tests { let r1 = Record::new( &secret_key, vec![ - EnrEntry::Ip(expect_ip), + EnrEntry::Ipv4(expect_ip), EnrEntry::Tcp(expect_tcp), EnrEntry::Udp(expect_udp), ], diff --git a/crates/peerinfo/examples/peerinfo.rs b/crates/peerinfo/examples/peerinfo.rs index 22e68ce3..a96e1d68 100644 --- a/crates/peerinfo/examples/peerinfo.rs +++ b/crates/peerinfo/examples/peerinfo.rs @@ -260,7 +260,7 @@ async fn main() -> anyhow::Result<()> { let enr = Record::new( &key, vec![ - EnrEntry::Ip(Ipv4Addr::from([0, 0, 0, 0])), + EnrEntry::Ipv4(Ipv4Addr::from([0, 0, 0, 0])), EnrEntry::Tcp(args.port), EnrEntry::Udp(args.port), ], diff --git a/crates/relay-server/src/web.rs b/crates/relay-server/src/web.rs index 449785a3..4a5c47a8 100644 --- a/crates/relay-server/src/web.rs +++ b/crates/relay-server/src/web.rs @@ -288,7 +288,7 @@ pub async fn enr_handler( let record = Record::new( &state.secret_key, vec![ - EnrEntry::Ip(ip), + EnrEntry::Ipv4(ip), EnrEntry::Tcp(tcp_port), EnrEntry::Udp(udp_port), ], From 6c46acec479549b491e6f66b7a13214400d4f742 Mon Sep 17 00:00:00 2001 From: Denis Kolodin Date: Wed, 27 May 2026 21:17:09 +0200 Subject: [PATCH 5/5] Move Clock to core --- crates/app/src/retry.rs | 35 ++++------------------------------- crates/core/src/clock.rs | 31 +++++++++++++++++++++++++++++++ crates/core/src/lib.rs | 3 +++ crates/eth2util/src/enr.rs | 2 +- 4 files changed, 39 insertions(+), 32 deletions(-) create mode 100644 crates/core/src/clock.rs diff --git a/crates/app/src/retry.rs b/crates/app/src/retry.rs index 29aa054d..d95aa2df 100644 --- a/crates/app/src/retry.rs +++ b/crates/app/src/retry.rs @@ -1,28 +1,10 @@ use backon::{BackoffBuilder, Retryable}; use chrono::{DateTime, Utc}; +use pluto_core::clock::{ChronoClock, Clock}; use std::{sync::Arc, time::Duration}; use tokio_util::sync::CancellationToken; use tracing::{Instrument, debug, error, info, warn}; -/// Provides the "current time" used to compute retry deadlines. -/// -/// [`ChronoClock`] is the production implementation; tests can supply a -/// deterministic clock instead. -pub trait Clock: Send + Sync + 'static { - /// Returns the current time. - fn now(&self) -> DateTime; -} - -/// [`Clock`] backed by the system wall clock via [`chrono::Utc::now`]. -#[derive(Debug, Clone, Copy, Default)] -pub struct ChronoClock; - -impl Clock for ChronoClock { - fn now(&self) -> DateTime { - Utc::now() - } -} - /// Options for the asynchronous retry executor. #[derive(Clone)] pub struct AsyncOptions { @@ -214,20 +196,11 @@ pub async fn do_async< mod tests { use tokio_util::sync::CancellationToken; - use crate::retry::{self, Clock, DoAsyncError}; - use chrono::{DateTime, Utc}; + use crate::retry::{self, DoAsyncError}; + use chrono::Utc; use core::time; use std::sync::{Arc, Mutex}; - /// [`Clock`] that always returns a fixed instant, for deterministic tests. - struct FixedClock(DateTime); - - impl Clock for FixedClock { - fn now(&self) -> DateTime { - self.0 - } - } - struct TestCase { options: retry::AsyncOptions<()>, func: Arc Result<(), DoAsyncError> + Send + Sync>, @@ -316,7 +289,7 @@ mod tests { run_test(TestCase { options: retry::AsyncOptions::default() .with_backoff(test_backoff()) - .with_time(FixedClock(now)) + .with_time(move || now) .with_deadline(move |_| Some(now)), func: Arc::new(|_| Err(DoAsyncError::RetryableError)), expected_attempts: 1, diff --git a/crates/core/src/clock.rs b/crates/core/src/clock.rs new file mode 100644 index 00000000..dad38882 --- /dev/null +++ b/crates/core/src/clock.rs @@ -0,0 +1,31 @@ +use chrono::{DateTime, Utc}; + +/// Provides the "current time". +/// +/// [`ChronoClock`] is the production implementation; tests can supply a +/// deterministic clock instead. Any `Fn() -> DateTime` also implements +/// this trait via the blanket impl below, so closures and [`chrono::Utc::now`] +/// can be used directly. +pub trait Clock: Send + Sync + 'static { + /// Returns the current time. + fn now(&self) -> DateTime; +} + +/// [`Clock`] backed by the system wall clock via [`chrono::Utc::now`]. +#[derive(Debug, Clone, Copy, Default)] +pub struct ChronoClock; + +impl Clock for ChronoClock { + fn now(&self) -> DateTime { + Utc::now() + } +} + +impl Clock for F +where + F: Fn() -> DateTime + Send + Sync + 'static, +{ + fn now(&self) -> DateTime { + self() + } +} diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 5b44a216..4d1b40e6 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -23,6 +23,9 @@ pub mod version; /// Duty deadline tracking and notification. pub mod deadline; +/// Clock abstraction over the current time. +pub mod clock; + /// parsigdb pub mod parsigdb; diff --git a/crates/eth2util/src/enr.rs b/crates/eth2util/src/enr.rs index ab6dbdb7..9e0a4969 100644 --- a/crates/eth2util/src/enr.rs +++ b/crates/eth2util/src/enr.rs @@ -119,7 +119,7 @@ pub enum EnrEntry { impl EnrEntry { /// Writes this entry into the record's key-value pairs. fn apply(self, kvs: &mut HashMap>) { - match self { + let _ = match self { Self::Ipv4(ip) => kvs.insert(KEY_IP.to_string(), ip.octets().to_vec()), Self::Tcp(tcp) => kvs.insert(KEY_TCP.to_string(), tcp.to_be_bytes().to_vec()), Self::Udp(udp) => kvs.insert(KEY_UDP.to_string(), udp.to_be_bytes().to_vec()),