zebra_network/peer_set/
candidate_set.rs

1//! Candidate peer selection for outbound connections using the [`CandidateSet`].
2
3use std::{any::type_name, cmp::min, sync::Arc};
4
5use chrono::Utc;
6use futures::stream::{FuturesUnordered, StreamExt};
7use tokio::time::{sleep_until, timeout, Instant};
8use tower::{Service, ServiceExt};
9use tracing::Span;
10
11use zebra_chain::{diagnostic::task::WaitForPanics, serialization::DateTime32};
12
13use crate::{
14    constants, meta_addr::MetaAddrChange, peer_set::set::MorePeers, types::MetaAddr, AddressBook,
15    BoxError, Request, Response,
16};
17
18#[cfg(test)]
19mod tests;
20
21/// The [`CandidateSet`] manages outbound peer connection attempts. Successful
22/// connections become peers in the [`PeerSet`](super::set::PeerSet).
23///
24/// The candidate set divides the set of all possible outbound peers into
25/// disjoint subsets, using the [`PeerAddrState`](crate::PeerAddrState):
26///
27/// 1. [`Responded`] peers, which we have had an outbound connection to.
28/// 2. [`NeverAttemptedGossiped`] peers, which we learned about from other peers
29///    but have never connected to. This includes gossiped peers, DNS seeder peers,
30///    cached peers, canonical addresses from the [`Version`] messages of inbound
31///    and outbound connections, and remote IP addresses of inbound connections.
32/// 3. [`Failed`] peers, which failed a connection attempt, or had an error
33///    during an outbound connection.
34/// 4. [`AttemptPending`] peers, which we've recently queued for a connection.
35///
36/// Never attempted peers are always available for connection.
37///
38/// If a peer's attempted, responded, or failure time is recent
39/// (within the liveness limit), we avoid reconnecting to it.
40/// Otherwise, we assume that it has disconnected or hung,
41/// and attempt reconnection.
42///
43/// ```ascii,no_run
44///                         ┌──────────────────┐
45///                         │   Config / DNS   │
46///             ┌───────────│       Seed       │───────────┐
47///             │           │    Addresses     │           │
48///             │           └──────────────────┘           │
49///             │                    │ untrusted_last_seen │
50///             │                    │     is unknown      │
51///             ▼                    │                     ▼
52///    ┌──────────────────┐          │          ┌──────────────────┐
53///    │    Handshake     │          │          │     Peer Set     │
54///    │    Canonical     │──────────┼──────────│     Gossiped     │
55///    │    Addresses     │          │          │    Addresses     │
56///    └──────────────────┘          │          └──────────────────┘
57///     untrusted_last_seen          │                provides
58///         set to now               │           untrusted_last_seen
59///                                  ▼
60///                                  Λ   if attempted, responded, or failed:
61///                                 ╱ ╲         ignore gossiped info
62///                                ▕   ▏    otherwise, if never attempted:
63///                                 ╲ ╱    skip updates to existing fields
64///                                  V
65///  ┌───────────────────────────────┼───────────────────────────────┐
66///  │ AddressBook                   │                               │
67///  │ disjoint `PeerAddrState`s     ▼                               │
68///  │ ┌─────────────┐  ┌─────────────────────────┐  ┌─────────────┐ │
69///  │ │ `Responded` │  │`NeverAttemptedGossiped` │  │  `Failed`   │ │
70/// ┌┼▶│    Peers    │  │          Peers          │  │   Peers     │◀┼┐
71/// ││ └─────────────┘  └─────────────────────────┘  └─────────────┘ ││
72/// ││        │                      │                      │        ││
73/// ││ #1 oldest_first        #2 newest_first        #3 oldest_first ││
74/// ││        ├──────────────────────┴──────────────────────┘        ││
75/// ││        ▼                                                      ││
76/// ││        Λ                                                      ││
77/// ││       ╱ ╲              filter by                              ││
78/// ││      ▕   ▏   is_ready_for_connection_attempt                  ││
79/// ││       ╲ ╱     to remove recent `Responded`,                   ││
80/// ││        V  `AttemptPending`, and `Failed` peers                ││
81/// ││        │                                                      ││
82/// ││        │    try outbound connection,                          ││
83/// ││        ▼  update last_attempt to now()                        ││
84/// ││┌────────────────┐                                             ││
85/// │││`AttemptPending`│                                             ││
86/// │││     Peers      │                                             ││
87/// ││└────────────────┘                                             ││
88/// │└────────┼──────────────────────────────────────────────────────┘│
89/// │         ▼                                                       │
90/// │         Λ                                                       │
91/// │        ╱ ╲                                                      │
92/// │       ▕   ▏─────────────────────────────────────────────────────┘
93/// │        ╲ ╱   connection failed, update last_failure to now()
94/// │         V
95/// │         │
96/// │         │ connection succeeded
97/// │         ▼
98/// │  ┌────────────┐
99/// │  │    send    │
100/// │  │peer::Client│
101/// │  │to Discover │
102/// │  └────────────┘
103/// │         │
104/// │         ▼
105/// │┌───────────────────────────────────────┐
106/// ││ when connection succeeds, and every   │
107/// ││  time we receive a peer heartbeat:    │
108/// └│  * update state to `Responded`        │
109///  │  * update last_response to now()      │
110///  └───────────────────────────────────────┘
111/// ```
112///
113/// [`Responded`]: crate::PeerAddrState::Responded
114/// [`Version`]: crate::protocol::external::types::Version
115/// [`NeverAttemptedGossiped`]: crate::PeerAddrState::NeverAttemptedGossiped
116/// [`Failed`]: crate::PeerAddrState::Failed
117/// [`AttemptPending`]: crate::PeerAddrState::AttemptPending
118// TODO:
119//   * show all possible transitions between Attempt/Responded/Failed,
120//     except Failed -> Responded is invalid, must go through Attempt
121//
122// Note: the CandidateSet can't be cloned, because there needs to be a single
123// instance of its timers, so that rate limits are enforced correctly.
124pub(crate) struct CandidateSet<S>
125where
126    S: Service<Request, Response = Response, Error = BoxError> + Send,
127    S::Future: Send + 'static,
128{
129    /// The outbound address book for this peer set.
130    ///
131    /// # Correctness
132    ///
133    /// The address book must be private, so all operations are performed on a blocking thread
134    /// (see #1976).
135    address_book: Arc<std::sync::Mutex<AddressBook>>,
136
137    /// The peer set used to crawl the network for peers.
138    peer_service: S,
139
140    /// A timer that enforces a rate-limit on new outbound connections.
141    min_next_handshake: Instant,
142
143    /// A timer that enforces a rate-limit on peer set requests for more peers.
144    min_next_crawl: Instant,
145}
146
147impl<S> std::fmt::Debug for CandidateSet<S>
148where
149    S: Service<Request, Response = Response, Error = BoxError> + Send,
150    S::Future: Send + 'static,
151{
152    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153        f.debug_struct("CandidateSet")
154            .field("address_book", &self.address_book)
155            .field("peer_service", &type_name::<S>())
156            .field("min_next_handshake", &self.min_next_handshake)
157            .field("min_next_crawl", &self.min_next_crawl)
158            .finish()
159    }
160}
161
162impl<S> CandidateSet<S>
163where
164    S: Service<Request, Response = Response, Error = BoxError> + Send,
165    S::Future: Send + 'static,
166{
167    /// Uses `address_book` and `peer_service` to manage a [`CandidateSet`] of peers.
168    pub fn new(
169        address_book: Arc<std::sync::Mutex<AddressBook>>,
170        peer_service: S,
171    ) -> CandidateSet<S> {
172        CandidateSet {
173            address_book,
174            peer_service,
175            min_next_handshake: Instant::now(),
176            min_next_crawl: Instant::now(),
177        }
178    }
179
180    /// Update the peer set from the network, using the default fanout limit.
181    ///
182    /// See [`update_initial`][Self::update_initial] for details.
183    pub async fn update(&mut self) -> Result<Option<MorePeers>, BoxError> {
184        self.update_timeout(None).await
185    }
186
187    /// Update the peer set from the network, limiting the fanout to
188    /// `fanout_limit`.
189    ///
190    /// - Ask a few live [`Responded`] peers to send us more peers.
191    /// - Process all completed peer responses, adding new peers in the
192    ///   [`NeverAttemptedGossiped`] state.
193    ///
194    /// Returns `Some(MorePeers)` if the crawl was successful and the crawler
195    /// should ask for more peers. Returns `None` if there are no new peers.
196    ///
197    /// ## Correctness
198    ///
199    /// Pass the initial peer set size as `fanout_limit` during initialization,
200    /// so that Zebra does not send duplicate requests to the same peer.
201    ///
202    /// The crawler exits when update returns an error, so it must only return
203    /// errors on permanent failures.
204    ///
205    /// The handshaker sets up the peer message receiver so it also sends a
206    /// [`Responded`] peer address update.
207    ///
208    /// [`next`][Self::next] puts peers into the [`AttemptPending`] state.
209    ///
210    /// ## Security
211    ///
212    /// This call is rate-limited to prevent sending a burst of repeated requests for new peer
213    /// addresses. Each call will only update the [`CandidateSet`] if more time
214    /// than [`MIN_PEER_GET_ADDR_INTERVAL`][constants::MIN_PEER_GET_ADDR_INTERVAL] has passed since
215    /// the last call. Otherwise, the update is skipped.
216    ///
217    /// [`Responded`]: crate::PeerAddrState::Responded
218    /// [`NeverAttemptedGossiped`]: crate::PeerAddrState::NeverAttemptedGossiped
219    /// [`Failed`]: crate::PeerAddrState::Failed
220    /// [`AttemptPending`]: crate::PeerAddrState::AttemptPending
221    pub async fn update_initial(
222        &mut self,
223        fanout_limit: usize,
224    ) -> Result<Option<MorePeers>, BoxError> {
225        self.update_timeout(Some(fanout_limit)).await
226    }
227
228    /// Update the peer set from the network, limiting the fanout to
229    /// `fanout_limit`, and imposing a timeout on the entire fanout.
230    ///
231    /// See [`update_initial`][Self::update_initial] for details.
232    async fn update_timeout(
233        &mut self,
234        fanout_limit: Option<usize>,
235    ) -> Result<Option<MorePeers>, BoxError> {
236        let mut more_peers = None;
237
238        // SECURITY
239        //
240        // Rate limit sending `GetAddr` messages to peers.
241        if self.min_next_crawl <= Instant::now() {
242            // CORRECTNESS
243            //
244            // Use a timeout to avoid deadlocks when there are no connected
245            // peers, and:
246            // - we're waiting on a handshake to complete so there are peers, or
247            // - another task that handles or adds peers is waiting on this task
248            //   to complete.
249            if let Ok(fanout_result) = timeout(
250                constants::PEER_GET_ADDR_TIMEOUT,
251                self.update_fanout(fanout_limit),
252            )
253            .await
254            {
255                more_peers = fanout_result?;
256            } else {
257                // update must only return an error for permanent failures
258                info!("timeout waiting for peer service readiness or peer responses");
259            }
260
261            self.min_next_crawl = Instant::now() + constants::MIN_PEER_GET_ADDR_INTERVAL;
262        }
263
264        Ok(more_peers)
265    }
266
267    /// Update the peer set from the network, limiting the fanout to
268    /// `fanout_limit`.
269    ///
270    /// Opportunistically crawl the network on every update call to ensure
271    /// we're actively fetching peers. Continue independently of whether we
272    /// actually receive any peers, but always ask the network for more.
273    ///
274    /// Because requests are load-balanced across existing peers, we can make
275    /// multiple requests concurrently, which will be randomly assigned to
276    /// existing peers, but we don't make too many because update may be
277    /// called while the peer set is already loaded.
278    ///
279    /// See [`update_initial`][Self::update_initial] for more details.
280    ///
281    /// # Correctness
282    ///
283    /// This function does not have a timeout.
284    /// Use [`update_timeout`][Self::update_timeout] instead.
285    async fn update_fanout(
286        &mut self,
287        fanout_limit: Option<usize>,
288    ) -> Result<Option<MorePeers>, BoxError> {
289        let fanout_limit = fanout_limit
290            .map(|fanout_limit| min(fanout_limit, constants::GET_ADDR_FANOUT))
291            .unwrap_or(constants::GET_ADDR_FANOUT);
292        debug!(?fanout_limit, "sending GetPeers requests");
293
294        let mut responses = FuturesUnordered::new();
295        let mut more_peers = None;
296
297        // Launch requests
298        for attempt in 0..fanout_limit {
299            if attempt > 0 {
300                // Let other tasks run, so we're more likely to choose a different peer.
301                //
302                // TODO: move fanouts into the PeerSet, so we always choose different peers (#2214)
303                tokio::task::yield_now().await;
304            }
305
306            let peer_service = self.peer_service.ready().await?;
307            responses.push(peer_service.call(Request::Peers));
308        }
309
310        let mut address_book_updates = FuturesUnordered::new();
311
312        // Process responses
313        while let Some(rsp) = responses.next().await {
314            match rsp {
315                Ok(Response::Peers(addrs)) => {
316                    trace!(
317                        addr_count = ?addrs.len(),
318                        ?addrs,
319                        "got response to GetPeers"
320                    );
321                    let addrs = validate_addrs(addrs, DateTime32::now());
322                    address_book_updates.push(self.send_addrs(addrs));
323                    more_peers = Some(MorePeers);
324                }
325                Err(e) => {
326                    // since we do a fanout, and new updates are triggered by
327                    // each demand, we can ignore errors in individual responses
328                    trace!(?e, "got error in GetPeers request");
329                }
330                Ok(_) => unreachable!("Peers requests always return Peers responses"),
331            }
332        }
333
334        // Wait until all the address book updates have finished
335        while let Some(()) = address_book_updates.next().await {}
336
337        Ok(more_peers)
338    }
339
340    /// Add new `addrs` to the address book.
341    async fn send_addrs(&self, addrs: impl IntoIterator<Item = MetaAddr>) {
342        // # Security
343        //
344        // New gossiped peers are rate-limited because:
345        // - Zebra initiates requests for new gossiped peers
346        // - the fanout is limited
347        // - the number of addresses per peer is limited
348        let addrs: Vec<MetaAddrChange> = addrs
349            .into_iter()
350            .map(MetaAddr::new_gossiped_change)
351            .map(|maybe_addr| maybe_addr.expect("Received gossiped peers always have services set"))
352            .collect();
353
354        debug!(count = ?addrs.len(), "sending gossiped addresses to the address book");
355
356        // Don't bother spawning a task if there are no addresses left.
357        if addrs.is_empty() {
358            return;
359        }
360
361        // # Correctness
362        //
363        // Spawn address book accesses on a blocking thread,
364        // to avoid deadlocks (see #1976).
365        //
366        // Extend handles duplicate addresses internally.
367        let address_book = self.address_book.clone();
368        let span = Span::current();
369        tokio::task::spawn_blocking(move || {
370            span.in_scope(|| address_book.lock().unwrap().extend(addrs))
371        })
372        .wait_for_panics()
373        .await
374    }
375
376    /// Returns the next candidate for a connection attempt, if any are available.
377    ///
378    /// Returns peers in reconnection order, based on
379    /// [`AddressBook::reconnection_peers`].
380    ///
381    /// Skips peers that have recently been active, attempted, or failed.
382    ///
383    /// ## Correctness
384    ///
385    /// `AttemptPending` peers will become [`Responded`] if they respond, or
386    /// become `Failed` if they time out or provide a bad response.
387    ///
388    /// Live [`Responded`] peers will stay live if they keep responding, or
389    /// become a reconnection candidate if they stop responding.
390    ///
391    /// ## Security
392    ///
393    /// Zebra resists distributed denial of service attacks by making sure that
394    /// new peer connections are initiated at least
395    /// [`MIN_OUTBOUND_PEER_CONNECTION_INTERVAL`][constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL]
396    /// apart. If `next()` has recently provided a peer, then its future will sleep
397    /// until the rate-limit has passed.
398    ///
399    /// [`Responded`]: crate::PeerAddrState::Responded
400    pub async fn next(&mut self) -> Option<MetaAddr> {
401        // Correctness: To avoid hangs, computation in the critical section should be kept to a minimum.
402        let address_book = self.address_book.clone();
403        let next_peer = move || -> Option<MetaAddr> {
404            let mut guard = address_book.lock().unwrap();
405
406            // Now we have the lock, get the current time
407            let instant_now = std::time::Instant::now();
408            let chrono_now = Utc::now();
409
410            // It's okay to return without sleeping here, because we're returning
411            // `None`. We only need to sleep before yielding an address.
412            let next_peer = guard.reconnection_peers(instant_now, chrono_now).next()?;
413
414            // TODO: only mark the peer as AttemptPending when it is actually used (#1976)
415            //
416            // If the future is dropped before `next` returns, the peer will be marked as AttemptPending,
417            // even if its address is not actually used for a connection.
418            //
419            // We could send a reconnect change to the AddressBookUpdater when the peer is actually used,
420            // but channel order is not guaranteed, so we could accidentally re-use the same peer.
421            let next_peer = MetaAddr::new_reconnect(next_peer.addr);
422            guard.update(next_peer)
423        };
424
425        // Correctness: Spawn address book accesses on a blocking thread, to avoid deadlocks (see #1976).
426        let span = Span::current();
427        let next_peer = tokio::task::spawn_blocking(move || span.in_scope(next_peer))
428            .wait_for_panics()
429            .await?;
430
431        // Security: rate-limit new outbound peer connections
432        sleep_until(self.min_next_handshake).await;
433        self.min_next_handshake = Instant::now() + constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL;
434
435        Some(next_peer)
436    }
437
438    /// Returns the address book for this `CandidateSet`.
439    #[cfg(any(test, feature = "proptest-impl"))]
440    #[allow(dead_code)]
441    pub async fn address_book(&self) -> Arc<std::sync::Mutex<AddressBook>> {
442        self.address_book.clone()
443    }
444}
445
446/// Check new `addrs` before adding them to the address book.
447///
448/// `last_seen_limit` is the maximum permitted last seen time, typically
449/// [`Utc::now`].
450///
451/// If the data in an address is invalid, this function can:
452/// - modify the address data, or
453/// - delete the address.
454///
455/// # Security
456///
457/// Adjusts untrusted last seen times so they are not in the future. This stops
458/// malicious peers keeping all their addresses at the front of the connection
459/// queue. Honest peers with future clock skew also get adjusted.
460///
461/// Rejects all addresses if any calculated times overflow or underflow.
462fn validate_addrs(
463    addrs: impl IntoIterator<Item = MetaAddr>,
464    last_seen_limit: DateTime32,
465) -> impl Iterator<Item = MetaAddr> {
466    // Note: The address book handles duplicate addresses internally,
467    // so we don't need to de-duplicate addresses here.
468
469    // TODO:
470    // We should eventually implement these checks in this function:
471    // - Zebra should ignore peers that are older than 3 weeks (part of #1865)
472    // - Zebra should count back 3 weeks from the newest peer timestamp sent
473    //   by the other peer, to compensate for clock skew
474
475    let mut addrs: Vec<_> = addrs.into_iter().collect();
476
477    limit_last_seen_times(&mut addrs, last_seen_limit);
478
479    addrs.into_iter()
480}
481
482/// Ensure all reported `last_seen` times are less than or equal to `last_seen_limit`.
483///
484/// This will consider all addresses as invalid if trying to offset their
485/// `last_seen` times to be before the limit causes an underflow.
486fn limit_last_seen_times(addrs: &mut Vec<MetaAddr>, last_seen_limit: DateTime32) {
487    let last_seen_times = addrs.iter().map(|meta_addr| {
488        meta_addr
489            .untrusted_last_seen()
490            .expect("unexpected missing last seen: should be provided by deserialization")
491    });
492    let oldest_seen = last_seen_times.clone().min().unwrap_or(DateTime32::MIN);
493    let newest_seen = last_seen_times.max().unwrap_or(DateTime32::MAX);
494
495    // If any time is in the future, adjust all times, to compensate for clock skew on honest peers
496    if newest_seen > last_seen_limit {
497        let offset = newest_seen
498            .checked_duration_since(last_seen_limit)
499            .expect("unexpected underflow: just checked newest_seen is greater");
500
501        // Check for underflow
502        if oldest_seen.checked_sub(offset).is_some() {
503            // No underflow is possible, so apply offset to all addresses
504            for addr in addrs {
505                let last_seen = addr
506                    .untrusted_last_seen()
507                    .expect("unexpected missing last seen: should be provided by deserialization");
508                let last_seen = last_seen
509                    .checked_sub(offset)
510                    .expect("unexpected underflow: just checked oldest_seen");
511
512                addr.set_untrusted_last_seen(last_seen);
513            }
514        } else {
515            // An underflow will occur, so reject all gossiped peers
516            addrs.clear();
517        }
518    }
519}