zebra_network/peer_set/
set.rs

1//! Abstractions that represent "the rest of the network".
2//!
3//! # Implementation
4//!
5//! The [`PeerSet`] implementation is adapted from the one in [tower::Balance][tower-balance].
6//!
7//! As described in Tower's documentation, it:
8//!
9//! > Distributes requests across inner services using the [Power of Two Choices][p2c].
10//! >
11//! > As described in the [Finagle Guide][finagle]:
12//! >
13//! > > The algorithm randomly picks two services from the set of ready endpoints and
14//! > > selects the least loaded of the two. By repeatedly using this strategy, we can
15//! > > expect a manageable upper bound on the maximum load of any server.
16//! > >
17//! > > The maximum load variance between any two servers is bound by `ln(ln(n))` where
18//! > > `n` is the number of servers in the cluster.
19//!
20//! The Power of Two Choices should work well for many network requests, but not all of them.
21//! Some requests should only be made to a subset of connected peers.
22//! For example, a request for a particular inventory item
23//! should be made to a peer that has recently advertised that inventory hash.
24//! Other requests require broadcasts, such as transaction diffusion.
25//!
26//! Implementing this specialized routing logic inside the `PeerSet` -- so that
27//! it continues to abstract away "the rest of the network" into one endpoint --
28//! is not a problem, as the `PeerSet` can simply maintain more information on
29//! its peers and route requests appropriately. However, there is a problem with
30//! maintaining accurate backpressure information, because the `Service` trait
31//! requires that service readiness is independent of the data in the request.
32//!
33//! For this reason, in the future, this code will probably be refactored to
34//! address this backpressure mismatch. One possibility is to refactor the code
35//! so that one entity holds and maintains the peer set and metadata on the
36//! peers, and each "backpressure category" of request is assigned to different
37//! `Service` impls with specialized `poll_ready()` implementations. Another
38//! less-elegant solution (which might be useful as an intermediate step for the
39//! inventory case) is to provide a way to borrow a particular backing service,
40//! say by address.
41//!
42//! [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
43//! [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
44//! [tower-balance]: https://github.com/tower-rs/tower/tree/master/tower/src/balance
45//!
46//! # Behavior During Network Upgrades
47//!
48//! [ZIP-201] specifies peer behavior during network upgrades:
49//!
50//! > With scheduled network upgrades, at the activation height, nodes on each consensus branch
51//! > should disconnect from nodes on other consensus branches and only accept new incoming
52//! > connections from nodes on the same consensus branch.
53//!
54//! Zebra handles this with the help of [`MinimumPeerVersion`], which determines the minimum peer
55//! protocol version to accept based on the current best chain tip height. The minimum version is
56//! therefore automatically increased when the block height reaches a network upgrade's activation
57//! height. The helper type is then used to:
58//!
59//! - cancel handshakes to outdated peers, in `handshake::negotiate_version`
60//! - cancel requests to and disconnect from peers that have become outdated, in
61//!   [`PeerSet::push_unready`]
62//! - disconnect from peers that have just responded and became outdated, in
63//!   [`PeerSet::poll_unready`]
64//! - disconnect from idle peers that have become outdated, in
65//!   [`PeerSet::disconnect_from_outdated_peers`]
66//!
67//! ## Network Coalescence
68//!
69//! [ZIP-201] also specifies how Zcashd behaves [leading up to a activation
70//! height][1]. Since Zcashd limits the number of connections to at most eight
71//! peers, it will gradually migrate its connections to up-to-date peers as it
72//! approaches the activation height.
73//!
74//! The motivation for this behavior is to avoid an abrupt partitioning the network, which can lead
75//! to isolated peers and increases the chance of an eclipse attack on some peers of the network.
76//!
77//! Zebra does not gradually migrate its peers as it approaches an activation height. This is
78//! because Zebra by default can connect to up to 75 peers, as can be seen in [`Config::default`].
79//! Since this is a lot larger than the 8 peers Zcashd connects to, an eclipse attack becomes a lot
80//! more costly to execute, and the probability of an abrupt network partition that isolates peers
81//! is lower.
82//!
83//! Even if a Zebra node is manually configured to connect to a smaller number
84//! of peers, the [`AddressBook`][2] is configured to hold a large number of
85//! peer addresses ([`MAX_ADDRS_IN_ADDRESS_BOOK`][3]). Since the address book
86//! prioritizes addresses it trusts (like those that it has successfully
87//! connected to before), the node should be able to recover and rejoin the
88//! network by itself, as long as the address book is populated with enough
89//! entries.
90//!
91//! [1]: https://zips.z.cash/zip-0201#network-coalescence
92//! [2]: crate::AddressBook
93//! [3]: crate::constants::MAX_ADDRS_IN_ADDRESS_BOOK
94//! [ZIP-201]: https://zips.z.cash/zip-0201
95
96use std::{
97    collections::{HashMap, HashSet},
98    convert,
99    fmt::Debug,
100    marker::PhantomData,
101    net::IpAddr,
102    pin::Pin,
103    sync::Arc,
104    task::{Context, Poll},
105    time::Instant,
106};
107
108use futures::{
109    channel::{mpsc, oneshot},
110    future::{FutureExt, TryFutureExt},
111    prelude::*,
112    stream::FuturesUnordered,
113    task::noop_waker,
114};
115use indexmap::IndexMap;
116use itertools::Itertools;
117use num_integer::div_ceil;
118use tokio::{
119    sync::{broadcast, watch},
120    task::JoinHandle,
121};
122use tower::{
123    discover::{Change, Discover},
124    load::Load,
125    Service,
126};
127
128use zebra_chain::{chain_tip::ChainTip, parameters::Network};
129
130use crate::{
131    address_book::AddressMetrics,
132    constants::MIN_PEER_SET_LOG_INTERVAL,
133    peer::{LoadTrackedClient, MinimumPeerVersion},
134    peer_set::{
135        unready_service::{Error as UnreadyError, UnreadyService},
136        InventoryChange, InventoryRegistry,
137    },
138    protocol::{
139        external::InventoryHash,
140        internal::{Request, Response},
141    },
142    BoxError, Config, PeerError, PeerSocketAddr, SharedPeerError,
143};
144
145#[cfg(test)]
146mod tests;
147
148/// A signal sent by the [`PeerSet`] when it has no ready peers, and gets a request from Zebra.
149///
150/// In response to this signal, the crawler tries to open more peer connections.
151#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
152pub struct MorePeers;
153
154/// A signal sent by the [`PeerSet`] to cancel a [`Client`][1]'s current request
155/// or response.
156///
157/// When it receives this signal, the [`Client`][1] stops processing and exits.
158///
159/// [1]: crate::peer::Client
160#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
161pub struct CancelClientWork;
162
163/// A [`tower::Service`] that abstractly represents "the rest of the network".
164///
165/// # Security
166///
167/// The `Discover::Key` must be the transient remote address of each peer. This
168/// address may only be valid for the duration of a single connection. (For
169/// example, inbound connections have an ephemeral remote port, and proxy
170/// connections have an ephemeral local or proxy port.)
171///
172/// Otherwise, malicious peers could interfere with other peers' `PeerSet` state.
173pub struct PeerSet<D, C>
174where
175    D: Discover<Key = PeerSocketAddr, Service = LoadTrackedClient> + Unpin,
176    D::Error: Into<BoxError>,
177    C: ChainTip,
178{
179    // Peer Tracking: New Peers
180    //
181    /// Provides new and deleted peer [`Change`]s to the peer set,
182    /// via the [`Discover`] trait implementation.
183    discover: D,
184
185    /// A channel that asks the peer crawler task to connect to more peers.
186    demand_signal: mpsc::Sender<MorePeers>,
187
188    /// A watch channel receiver with a copy of banned IP addresses.
189    bans_receiver: watch::Receiver<Arc<IndexMap<IpAddr, std::time::Instant>>>,
190
191    // Peer Tracking: Ready Peers
192    //
193    /// Connected peers that are ready to receive requests from Zebra,
194    /// or send requests to Zebra.
195    ready_services: HashMap<D::Key, D::Service>,
196
197    // Request Routing
198    //
199    /// Stores gossiped inventory hashes from connected peers.
200    ///
201    /// Used to route inventory requests to peers that are likely to have it.
202    inventory_registry: InventoryRegistry,
203
204    // Peer Tracking: Busy Peers
205    //
206    /// Connected peers that are handling a Zebra request,
207    /// or Zebra is handling one of their requests.
208    unready_services: FuturesUnordered<UnreadyService<D::Key, D::Service, Request>>,
209
210    /// Channels used to cancel the request that an unready service is doing.
211    cancel_handles: HashMap<D::Key, oneshot::Sender<CancelClientWork>>,
212
213    // Peer Validation
214    //
215    /// An endpoint to see the minimum peer protocol version in real time.
216    ///
217    /// The minimum version depends on the block height, and [`MinimumPeerVersion`] listens for
218    /// height changes and determines the correct minimum version.
219    minimum_peer_version: MinimumPeerVersion<C>,
220
221    /// The configured limit for inbound and outbound connections.
222    ///
223    /// The peer set panics if this size is exceeded.
224    /// If that happens, our connection limit code has a bug.
225    peerset_total_connection_limit: usize,
226
227    // Background Tasks
228    //
229    /// Channel for passing ownership of tokio JoinHandles from PeerSet's background tasks
230    ///
231    /// The join handles passed into the PeerSet are used populate the `guards` member
232    handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>,
233
234    /// Unordered set of handles to background tasks associated with the `PeerSet`
235    ///
236    /// These guards are checked for errors as part of `poll_ready` which lets
237    /// the `PeerSet` propagate errors from background tasks back to the user
238    guards: futures::stream::FuturesUnordered<JoinHandle<Result<(), BoxError>>>,
239
240    // Metrics and Logging
241    //
242    /// Address book metrics watch channel.
243    ///
244    /// Used for logging diagnostics.
245    address_metrics: watch::Receiver<AddressMetrics>,
246
247    /// The last time we logged a message about the peer set size
248    last_peer_log: Option<Instant>,
249
250    /// The configured maximum number of peers that can be in the
251    /// peer set per IP, defaults to [`crate::constants::DEFAULT_MAX_CONNS_PER_IP`]
252    max_conns_per_ip: usize,
253
254    /// The network of this peer set.
255    network: Network,
256}
257
258impl<D, C> Drop for PeerSet<D, C>
259where
260    D: Discover<Key = PeerSocketAddr, Service = LoadTrackedClient> + Unpin,
261    D::Error: Into<BoxError>,
262    C: ChainTip,
263{
264    fn drop(&mut self) {
265        // We don't have access to the current task (if any), so we just drop everything we can.
266        let waker = noop_waker();
267        let mut cx = Context::from_waker(&waker);
268
269        self.shut_down_tasks_and_channels(&mut cx);
270    }
271}
272
273impl<D, C> PeerSet<D, C>
274where
275    D: Discover<Key = PeerSocketAddr, Service = LoadTrackedClient> + Unpin,
276    D::Error: Into<BoxError>,
277    C: ChainTip,
278{
279    #[allow(clippy::too_many_arguments)]
280    /// Construct a peerset which uses `discover` to manage peer connections.
281    ///
282    /// Arguments:
283    /// - `config`: configures the peer set connection limit;
284    /// - `discover`: handles peer connects and disconnects;
285    /// - `demand_signal`: requests more peers when all peers are busy (unready);
286    /// - `handle_rx`: receives background task handles,
287    ///   monitors them to make sure they're still running,
288    ///   and shuts down all the tasks as soon as one task exits;
289    /// - `inv_stream`: receives inventory changes from peers,
290    ///   allowing the peer set to direct inventory requests;
291    /// - `bans_receiver`: receives a map of banned IP addresses that should be dropped;
292    /// - `address_book`: when peer set is busy, it logs address book diagnostics.
293    /// - `minimum_peer_version`: endpoint to see the minimum peer protocol version in real time.
294    /// - `max_conns_per_ip`: configured maximum number of peers that can be in the
295    ///   peer set per IP, defaults to the config value or to
296    ///   [`crate::constants::DEFAULT_MAX_CONNS_PER_IP`].
297    pub fn new(
298        config: &Config,
299        discover: D,
300        demand_signal: mpsc::Sender<MorePeers>,
301        handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>,
302        inv_stream: broadcast::Receiver<InventoryChange>,
303        bans_receiver: watch::Receiver<Arc<IndexMap<IpAddr, std::time::Instant>>>,
304        address_metrics: watch::Receiver<AddressMetrics>,
305        minimum_peer_version: MinimumPeerVersion<C>,
306        max_conns_per_ip: Option<usize>,
307    ) -> Self {
308        Self {
309            // New peers
310            discover,
311            demand_signal,
312            // Banned peers
313            bans_receiver,
314
315            // Ready peers
316            ready_services: HashMap::new(),
317            // Request Routing
318            inventory_registry: InventoryRegistry::new(inv_stream),
319
320            // Busy peers
321            unready_services: FuturesUnordered::new(),
322            cancel_handles: HashMap::new(),
323
324            // Peer validation
325            minimum_peer_version,
326            peerset_total_connection_limit: config.peerset_total_connection_limit(),
327
328            // Background tasks
329            handle_rx,
330            guards: futures::stream::FuturesUnordered::new(),
331
332            // Metrics
333            last_peer_log: None,
334            address_metrics,
335
336            max_conns_per_ip: max_conns_per_ip.unwrap_or(config.max_connections_per_ip),
337
338            network: config.network.clone(),
339        }
340    }
341
342    /// Check background task handles to make sure they're still running.
343    ///
344    /// Never returns `Ok`.
345    ///
346    /// If any background task exits, shuts down all other background tasks,
347    /// and returns an error. Otherwise, returns `Pending`, and registers a wakeup for
348    /// receiving the background tasks, or the background tasks exiting.
349    fn poll_background_errors(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
350        futures::ready!(self.receive_tasks_if_needed(cx))?;
351
352        // Return Pending if all background tasks are still running.
353        match futures::ready!(Pin::new(&mut self.guards).poll_next(cx)) {
354            Some(res) => {
355                info!(
356                    background_tasks = %self.guards.len(),
357                    "a peer set background task exited, shutting down other peer set tasks"
358                );
359
360                self.shut_down_tasks_and_channels(cx);
361
362                // Flatten the join result and inner result, and return any errors.
363                res.map_err(Into::into)
364                    // TODO: replace with Result::flatten when it stabilises (#70142)
365                    .and_then(convert::identity)?;
366
367                // Turn Ok() task exits into errors.
368                Poll::Ready(Err("a peer set background task exited".into()))
369            }
370
371            None => {
372                self.shut_down_tasks_and_channels(cx);
373                Poll::Ready(Err("all peer set background tasks have exited".into()))
374            }
375        }
376    }
377
378    /// Receive background tasks, if they've been sent on the channel, but not consumed yet.
379    ///
380    /// Returns a result representing the current task state, or `Poll::Pending` if the background
381    /// tasks should be polled again to check their state.
382    fn receive_tasks_if_needed(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
383        if self.guards.is_empty() {
384            // Return Pending if the tasks have not been sent yet.
385            let handles = futures::ready!(Pin::new(&mut self.handle_rx).poll(cx));
386
387            match handles {
388                // The tasks have been sent, but not consumed yet.
389                Ok(handles) => {
390                    // Currently, the peer set treats an empty background task set as an error.
391                    //
392                    // TODO: refactor `handle_rx` and `guards` into an enum
393                    //       for the background task state: Waiting/Running/Shutdown.
394                    assert!(
395                        !handles.is_empty(),
396                        "the peer set requires at least one background task"
397                    );
398
399                    self.guards.extend(handles);
400
401                    Poll::Ready(Ok(()))
402                }
403
404                // The sender was dropped without sending the tasks.
405                Err(_) => Poll::Ready(Err(
406                    "sender did not send peer background tasks before it was dropped".into(),
407                )),
408            }
409        } else {
410            Poll::Ready(Ok(()))
411        }
412    }
413
414    /// Shut down:
415    /// - services by dropping the service lists
416    /// - background tasks via their join handles or cancel handles
417    /// - channels by closing the channel
418    fn shut_down_tasks_and_channels(&mut self, cx: &mut Context<'_>) {
419        // Drop services and cancel their background tasks.
420        self.ready_services = HashMap::new();
421
422        for (_peer_key, handle) in self.cancel_handles.drain() {
423            let _ = handle.send(CancelClientWork);
424        }
425        self.unready_services = FuturesUnordered::new();
426
427        // Close the MorePeers channel for all senders,
428        // so we don't add more peers to a shut down peer set.
429        self.demand_signal.close_channel();
430
431        // Shut down background tasks, ignoring pending polls.
432        self.handle_rx.close();
433        let _ = self.receive_tasks_if_needed(cx);
434        for guard in self.guards.iter() {
435            guard.abort();
436        }
437    }
438
439    /// Check busy peer services for request completion or errors.
440    ///
441    /// Move newly ready services to the ready list if they are for peers with supported protocol
442    /// versions, otherwise they are dropped. Also drop failed services.
443    ///
444    /// Never returns an error.
445    ///
446    /// Returns `Ok(Some(())` if at least one peer became ready, `Poll::Pending` if there are
447    /// unready peers, but none became ready, and `Ok(None)` if the unready peers were empty.
448    ///
449    /// If there are any remaining unready peers, registers a wakeup for the next time one becomes
450    /// ready. If there are no unready peers, doesn't register any wakeups. (Since wakeups come
451    /// from peers, there needs to be at least one peer to register a wakeup.)
452    fn poll_unready(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<()>, BoxError>> {
453        let mut result = Poll::Pending;
454
455        // # Correctness
456        //
457        // `poll_next()` must always be called, because `self.unready_services` could have been
458        // empty before the call to `self.poll_ready()`.
459        //
460        // > When new futures are added, `poll_next` must be called in order to begin receiving
461        // > wake-ups for new futures.
462        //
463        // <https://docs.rs/futures/latest/futures/stream/futures_unordered/struct.FuturesUnordered.html>
464        //
465        // Returns Pending if we've finished processing the unready service changes,
466        // but there are still some unready services.
467        loop {
468            // No ready peers left, but there are some unready peers pending.
469            let Poll::Ready(ready_peer) = Pin::new(&mut self.unready_services).poll_next(cx) else {
470                break;
471            };
472
473            match ready_peer {
474                // No unready peers in the list.
475                None => {
476                    // If we've finished processing the unready service changes, and there are no
477                    // unready services left, it doesn't make sense to return Pending, because
478                    // their stream is terminated. But when we add more unready peers and call
479                    // `poll_next()`, its termination status will be reset, and it will receive
480                    // wakeups again.
481                    if result.is_pending() {
482                        result = Poll::Ready(Ok(None));
483                    }
484
485                    break;
486                }
487
488                // Unready -> Ready
489                Some(Ok((key, svc))) => {
490                    trace!(?key, "service became ready");
491
492                    if self.bans_receiver.borrow().contains_key(&key.ip()) {
493                        warn!(?key, "service is banned, dropping service");
494                        std::mem::drop(svc);
495                        continue;
496                    }
497
498                    self.push_ready(true, key, svc);
499
500                    // Return Ok if at least one peer became ready.
501                    result = Poll::Ready(Ok(Some(())));
502                }
503
504                // Unready -> Canceled
505                Some(Err((key, UnreadyError::Canceled))) => {
506                    // A service be canceled because we've connected to the same service twice.
507                    // In that case, there is a cancel handle for the peer address,
508                    // but it belongs to the service for the newer connection.
509                    trace!(
510                        ?key,
511                        duplicate_connection = self.cancel_handles.contains_key(&key),
512                        "service was canceled, dropping service"
513                    );
514                }
515                Some(Err((key, UnreadyError::CancelHandleDropped(_)))) => {
516                    // Similarly, services with dropped cancel handes can have duplicates.
517                    trace!(
518                        ?key,
519                        duplicate_connection = self.cancel_handles.contains_key(&key),
520                        "cancel handle was dropped, dropping service"
521                    );
522                }
523
524                // Unready -> Errored
525                Some(Err((key, UnreadyError::Inner(error)))) => {
526                    debug!(%error, "service failed while unready, dropping service");
527
528                    let cancel = self.cancel_handles.remove(&key);
529                    assert!(cancel.is_some(), "missing cancel handle");
530                }
531            }
532        }
533
534        result
535    }
536
537    /// Checks previously ready peer services for errors.
538    ///
539    /// The only way these peer `Client`s can become unready is when we send them a request,
540    /// because the peer set has exclusive access to send requests to each peer. (If an inbound
541    /// request is in progress, it will be handled, then our request will be sent by the connection
542    /// task.)
543    ///
544    /// Returns `Poll::Ready` if there are some ready peers, and `Poll::Pending` if there are no
545    /// ready peers. Registers a wakeup if any peer has failed due to a disconnection, hang, or protocol error.
546    ///
547    /// # Panics
548    ///
549    /// If any peers somehow became unready without being sent a request. This indicates a bug in the peer set, where requests
550    /// are sent to peers without putting them in `unready_peers`.
551    fn poll_ready_peer_errors(&mut self, cx: &mut Context<'_>) -> Poll<()> {
552        let mut previous = HashMap::new();
553        std::mem::swap(&mut previous, &mut self.ready_services);
554
555        // TODO: consider only checking some peers each poll (for performance reasons),
556        //       but make sure we eventually check all of them.
557        for (key, mut svc) in previous.drain() {
558            let Poll::Ready(peer_readiness) = Pin::new(&mut svc).poll_ready(cx) else {
559                unreachable!(
560                    "unexpected unready peer: peers must be put into the unready_peers list \
561                     after sending them a request"
562                );
563            };
564
565            match peer_readiness {
566                // Still ready, add it back to the list.
567                Ok(()) => {
568                    if self.bans_receiver.borrow().contains_key(&key.ip()) {
569                        debug!(?key, "service ip is banned, dropping service");
570                        std::mem::drop(svc);
571                        continue;
572                    }
573
574                    self.push_ready(false, key, svc)
575                }
576
577                // Ready -> Errored
578                Err(error) => {
579                    debug!(%error, "service failed while ready, dropping service");
580
581                    // Ready services can just be dropped, they don't need any cleanup.
582                    std::mem::drop(svc);
583                }
584            }
585        }
586
587        if self.ready_services.is_empty() {
588            Poll::Pending
589        } else {
590            Poll::Ready(())
591        }
592    }
593
594    /// Returns the number of peer connections Zebra already has with
595    /// the provided IP address
596    ///
597    /// # Performance
598    ///
599    /// This method is `O(connected peers)`, so it should not be called from a loop
600    /// that is already iterating through the peer set.
601    fn num_peers_with_ip(&self, ip: IpAddr) -> usize {
602        self.ready_services
603            .keys()
604            .chain(self.cancel_handles.keys())
605            .filter(|addr| addr.ip() == ip)
606            .count()
607    }
608
609    /// Returns `true` if Zebra is already connected to the IP and port in `addr`.
610    fn has_peer_with_addr(&self, addr: PeerSocketAddr) -> bool {
611        self.ready_services.contains_key(&addr) || self.cancel_handles.contains_key(&addr)
612    }
613
614    /// Processes the entire list of newly inserted or removed services.
615    ///
616    /// Puts inserted services in the unready list.
617    /// Drops removed services, after cancelling any pending requests.
618    ///
619    /// If the peer connector channel is closed, returns an error.
620    ///
621    /// Otherwise, returns `Ok` if it discovered at least one peer, or `Poll::Pending` if it didn't
622    /// discover any peers. Always registers a wakeup for new peers, even when it returns `Ok`.
623    fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
624        // Return pending if there are no peers in the list.
625        let mut result = Poll::Pending;
626
627        loop {
628            // If we've emptied the list, finish looping, otherwise process the new peer.
629            let Poll::Ready(discovered) = Pin::new(&mut self.discover).poll_discover(cx) else {
630                break;
631            };
632
633            // If the change channel has a permanent error, return that error.
634            let change = discovered
635                .ok_or("discovery stream closed")?
636                .map_err(Into::into)?;
637
638            // Otherwise we have successfully processed a peer.
639            result = Poll::Ready(Ok(()));
640
641            // Process each change.
642            match change {
643                Change::Remove(key) => {
644                    trace!(?key, "got Change::Remove from Discover");
645                    self.remove(&key);
646                }
647                Change::Insert(key, svc) => {
648                    // We add peers as unready, so that we:
649                    // - always do the same checks on every ready peer, and
650                    // - check for any errors that happened right after the handshake
651                    trace!(?key, "got Change::Insert from Discover");
652
653                    // # Security
654                    //
655                    // Drop the new peer if we are already connected to it.
656                    // Preferring old connections avoids connection thrashing.
657                    if self.has_peer_with_addr(key) {
658                        std::mem::drop(svc);
659                        continue;
660                    }
661
662                    // # Security
663                    //
664                    // drop the new peer if there are already `max_conns_per_ip` peers with
665                    // the same IP address in the peer set.
666                    if self.num_peers_with_ip(key.ip()) >= self.max_conns_per_ip {
667                        std::mem::drop(svc);
668                        continue;
669                    }
670
671                    self.push_unready(key, svc);
672                }
673            }
674        }
675
676        result
677    }
678
679    /// Checks if the minimum peer version has changed, and disconnects from outdated peers.
680    fn disconnect_from_outdated_peers(&mut self) {
681        if let Some(minimum_version) = self.minimum_peer_version.changed() {
682            // It is ok to drop ready services, they don't need anything cancelled.
683            self.ready_services
684                .retain(|_address, peer| peer.remote_version() >= minimum_version);
685        }
686    }
687
688    /// Takes a ready service by key.
689    fn take_ready_service(&mut self, key: &D::Key) -> Option<D::Service> {
690        if let Some(svc) = self.ready_services.remove(key) {
691            assert!(
692                !self.cancel_handles.contains_key(key),
693                "cancel handles are only used for unready service work"
694            );
695
696            Some(svc)
697        } else {
698            None
699        }
700    }
701
702    /// Remove the service corresponding to `key` from the peer set.
703    ///
704    /// Drops the service, cancelling any pending request or response to that peer.
705    /// If the peer does not exist, does nothing.
706    fn remove(&mut self, key: &D::Key) {
707        if let Some(ready_service) = self.take_ready_service(key) {
708            // A ready service has no work to cancel, so just drop it.
709            std::mem::drop(ready_service);
710        } else if let Some(handle) = self.cancel_handles.remove(key) {
711            // Cancel the work, implicitly dropping the cancel handle.
712            // The service future returns a `Canceled` error,
713            // making `poll_unready` drop the service.
714            let _ = handle.send(CancelClientWork);
715        }
716    }
717
718    /// Adds a ready service to the ready list if it's for a peer with a supported version.
719    /// If `was_unready` is true, also removes the peer's cancel handle.
720    ///
721    /// If the service is for a connection to an outdated peer, the service is dropped.
722    fn push_ready(&mut self, was_unready: bool, key: D::Key, svc: D::Service) {
723        let cancel = self.cancel_handles.remove(&key);
724        assert_eq!(
725            cancel.is_some(),
726            was_unready,
727            "missing or unexpected cancel handle"
728        );
729
730        if svc.remote_version() >= self.minimum_peer_version.current() {
731            self.ready_services.insert(key, svc);
732        } else {
733            std::mem::drop(svc);
734        }
735    }
736
737    /// Adds a busy service to the unready list if it's for a peer with a supported version,
738    /// and adds a cancel handle for the service's current request.
739    ///
740    /// If the service is for a connection to an outdated peer, the request is cancelled and the
741    /// service is dropped.
742    fn push_unready(&mut self, key: D::Key, svc: D::Service) {
743        let peer_version = svc.remote_version();
744        let (tx, rx) = oneshot::channel();
745
746        self.unready_services.push(UnreadyService {
747            key: Some(key),
748            service: Some(svc),
749            cancel: rx,
750            _req: PhantomData,
751        });
752
753        if peer_version >= self.minimum_peer_version.current() {
754            self.cancel_handles.insert(key, tx);
755        } else {
756            // Cancel any request made to the service because it is using an outdated protocol
757            // version.
758            let _ = tx.send(CancelClientWork);
759        }
760    }
761
762    /// Performs P2C on `self.ready_services` to randomly select a less-loaded ready service.
763    fn select_ready_p2c_peer(&self) -> Option<D::Key> {
764        self.select_p2c_peer_from_list(&self.ready_services.keys().copied().collect())
765    }
766
767    /// Performs P2C on `ready_service_list` to randomly select a less-loaded ready service.
768    #[allow(clippy::unwrap_in_result)]
769    fn select_p2c_peer_from_list(&self, ready_service_list: &HashSet<D::Key>) -> Option<D::Key> {
770        match ready_service_list.len() {
771            0 => None,
772            1 => Some(
773                *ready_service_list
774                    .iter()
775                    .next()
776                    .expect("just checked there is one service"),
777            ),
778            len => {
779                // Choose 2 random peers, then return the least loaded of those 2 peers.
780                let (a, b) = {
781                    let idxs = rand::seq::index::sample(&mut rand::thread_rng(), len, 2);
782                    let a = idxs.index(0);
783                    let b = idxs.index(1);
784
785                    let a = *ready_service_list
786                        .iter()
787                        .nth(a)
788                        .expect("sample returns valid indexes");
789                    let b = *ready_service_list
790                        .iter()
791                        .nth(b)
792                        .expect("sample returns valid indexes");
793
794                    (a, b)
795                };
796
797                let a_load = self.query_load(&a).expect("supplied services are ready");
798                let b_load = self.query_load(&b).expect("supplied services are ready");
799
800                let selected = if a_load <= b_load { a } else { b };
801
802                trace!(
803                    a.key = ?a,
804                    a.load = ?a_load,
805                    b.key = ?b,
806                    b.load = ?b_load,
807                    selected = ?selected,
808                    ?len,
809                    "selected service by p2c"
810                );
811
812                Some(selected)
813            }
814        }
815    }
816
817    /// Randomly chooses `max_peers` ready services, ignoring service load.
818    ///
819    /// The chosen peers are unique, but their order is not fully random.
820    fn select_random_ready_peers(&self, max_peers: usize) -> Vec<D::Key> {
821        use rand::seq::IteratorRandom;
822
823        self.ready_services
824            .keys()
825            .copied()
826            .choose_multiple(&mut rand::thread_rng(), max_peers)
827    }
828
829    /// Accesses a ready endpoint by `key` and returns its current load.
830    ///
831    /// Returns `None` if the service is not in the ready service list.
832    fn query_load(&self, key: &D::Key) -> Option<<D::Service as Load>::Metric> {
833        let svc = self.ready_services.get(key);
834        svc.map(|svc| svc.load())
835    }
836
837    /// Routes a request using P2C load-balancing.
838    fn route_p2c(&mut self, req: Request) -> <Self as tower::Service<Request>>::Future {
839        if let Some(p2c_key) = self.select_ready_p2c_peer() {
840            tracing::trace!(?p2c_key, "routing based on p2c");
841
842            let mut svc = self
843                .take_ready_service(&p2c_key)
844                .expect("selected peer must be ready");
845
846            let fut = svc.call(req);
847            self.push_unready(p2c_key, svc);
848
849            return fut.map_err(Into::into).boxed();
850        }
851
852        async move {
853            // Let other tasks run, so a retry request might get different ready peers.
854            tokio::task::yield_now().await;
855
856            // # Security
857            //
858            // Avoid routing requests to peers that are missing inventory.
859            // If we kept trying doomed requests, peers that are missing our requested inventory
860            // could take up a large amount of our bandwidth and retry limits.
861            Err(SharedPeerError::from(PeerError::NoReadyPeers))
862        }
863        .map_err(Into::into)
864        .boxed()
865    }
866
867    /// Tries to route a request to a ready peer that advertised that inventory,
868    /// falling back to a ready peer that isn't missing the inventory.
869    ///
870    /// If all ready peers are missing the inventory,
871    /// returns a synthetic [`NotFoundRegistry`](PeerError::NotFoundRegistry) error.
872    ///
873    /// Uses P2C to route requests to the least loaded peer in each list.
874    fn route_inv(
875        &mut self,
876        req: Request,
877        hash: InventoryHash,
878    ) -> <Self as tower::Service<Request>>::Future {
879        let advertising_peer_list = self
880            .inventory_registry
881            .advertising_peers(hash)
882            .filter(|&addr| self.ready_services.contains_key(addr))
883            .copied()
884            .collect();
885
886        // # Security
887        //
888        // Choose a random, less-loaded peer with the inventory.
889        //
890        // If we chose the first peer in HashMap order,
891        // peers would be able to influence our choice by switching addresses.
892        // But we need the choice to be random,
893        // so that a peer can't provide all our inventory responses.
894        let peer = self.select_p2c_peer_from_list(&advertising_peer_list);
895
896        if let Some(mut svc) = peer.and_then(|key| self.take_ready_service(&key)) {
897            let peer = peer.expect("just checked peer is Some");
898            tracing::trace!(?hash, ?peer, "routing to a peer which advertised inventory");
899            let fut = svc.call(req);
900            self.push_unready(peer, svc);
901            return fut.map_err(Into::into).boxed();
902        }
903
904        let missing_peer_list: HashSet<PeerSocketAddr> = self
905            .inventory_registry
906            .missing_peers(hash)
907            .copied()
908            .collect();
909        let maybe_peer_list = self
910            .ready_services
911            .keys()
912            .filter(|addr| !missing_peer_list.contains(addr))
913            .copied()
914            .collect();
915
916        // Security: choose a random, less-loaded peer that might have the inventory.
917        let peer = self.select_p2c_peer_from_list(&maybe_peer_list);
918
919        if let Some(mut svc) = peer.and_then(|key| self.take_ready_service(&key)) {
920            let peer = peer.expect("just checked peer is Some");
921            tracing::trace!(?hash, ?peer, "routing to a peer that might have inventory");
922            let fut = svc.call(req);
923            self.push_unready(peer, svc);
924            return fut.map_err(Into::into).boxed();
925        }
926
927        tracing::debug!(
928            ?hash,
929            "all ready peers are missing inventory, failing request"
930        );
931
932        async move {
933            // Let other tasks run, so a retry request might get different ready peers.
934            tokio::task::yield_now().await;
935
936            // # Security
937            //
938            // Avoid routing requests to peers that are missing inventory.
939            // If we kept trying doomed requests, peers that are missing our requested inventory
940            // could take up a large amount of our bandwidth and retry limits.
941            Err(SharedPeerError::from(PeerError::NotFoundRegistry(vec![
942                hash,
943            ])))
944        }
945        .map_err(Into::into)
946        .boxed()
947    }
948
949    /// Routes the same request to up to `max_peers` ready peers, ignoring return values.
950    ///
951    /// `max_peers` must be at least one, and at most the number of ready peers.
952    fn route_multiple(
953        &mut self,
954        req: Request,
955        max_peers: usize,
956    ) -> <Self as tower::Service<Request>>::Future {
957        assert!(
958            max_peers > 0,
959            "requests must be routed to at least one peer"
960        );
961        assert!(
962            max_peers <= self.ready_services.len(),
963            "requests can only be routed to ready peers"
964        );
965
966        // # Security
967        //
968        // We choose peers randomly, ignoring load.
969        // This avoids favouring malicious peers, because peers can influence their own load.
970        //
971        // The order of peers isn't completely random,
972        // but peer request order is not security-sensitive.
973
974        let futs = FuturesUnordered::new();
975        for key in self.select_random_ready_peers(max_peers) {
976            let mut svc = self
977                .take_ready_service(&key)
978                .expect("selected peers are ready");
979            futs.push(svc.call(req.clone()).map_err(|_| ()));
980            self.push_unready(key, svc);
981        }
982
983        async move {
984            let results = futs.collect::<Vec<Result<_, _>>>().await;
985            tracing::debug!(
986                ok.len = results.iter().filter(|r| r.is_ok()).count(),
987                err.len = results.iter().filter(|r| r.is_err()).count(),
988                "sent peer request to multiple peers"
989            );
990            Ok(Response::Nil)
991        }
992        .boxed()
993    }
994
995    /// Broadcasts the same request to lots of ready peers, ignoring return values.
996    fn route_broadcast(&mut self, req: Request) -> <Self as tower::Service<Request>>::Future {
997        // Broadcasts ignore the response
998        self.route_multiple(req, self.number_of_peers_to_broadcast())
999    }
1000
1001    /// Given a number of ready peers calculate to how many of them Zebra will
1002    /// actually send the request to. Return this number.
1003    pub(crate) fn number_of_peers_to_broadcast(&self) -> usize {
1004        if self.network.is_regtest() {
1005            // In regtest, we broadcast to all peers, so that we can test the
1006            // peer set with a small number of peers.
1007            self.ready_services.len()
1008        } else {
1009            // We are currently sending broadcast messages to a third of the total peers.
1010            const PEER_FRACTION_TO_BROADCAST: usize = 3;
1011
1012            // Round up, so that if we have one ready peer, it gets the request.
1013            div_ceil(self.ready_services.len(), PEER_FRACTION_TO_BROADCAST)
1014        }
1015    }
1016
1017    /// Returns the list of addresses in the peer set.
1018    fn peer_set_addresses(&self) -> Vec<PeerSocketAddr> {
1019        self.ready_services
1020            .keys()
1021            .chain(self.cancel_handles.keys())
1022            .cloned()
1023            .collect()
1024    }
1025
1026    /// Logs the peer set size, and any potential connectivity issues.
1027    fn log_peer_set_size(&mut self) {
1028        let ready_services_len = self.ready_services.len();
1029        let unready_services_len = self.unready_services.len();
1030        trace!(ready_peers = ?ready_services_len, unready_peers = ?unready_services_len);
1031
1032        let now = Instant::now();
1033
1034        // These logs are designed to be human-readable in a terminal, at the
1035        // default Zebra log level. If you need to know the peer set size for
1036        // every request, use the trace-level logs, or the metrics exporter.
1037        if let Some(last_peer_log) = self.last_peer_log {
1038            // Avoid duplicate peer set logs
1039            if now.duration_since(last_peer_log) < MIN_PEER_SET_LOG_INTERVAL {
1040                return;
1041            }
1042        } else {
1043            // Suppress initial logs until the peer set has started up.
1044            // There can be multiple initial requests before the first peer is
1045            // ready.
1046            self.last_peer_log = Some(now);
1047            return;
1048        }
1049
1050        self.last_peer_log = Some(now);
1051
1052        // Log potential duplicate connections.
1053        let peers = self.peer_set_addresses();
1054
1055        // Check for duplicates by address and port: these are unexpected and represent a bug.
1056        let duplicates: Vec<PeerSocketAddr> = peers.iter().duplicates().cloned().collect();
1057
1058        let mut peer_counts = peers.iter().counts();
1059        peer_counts.retain(|peer, _count| duplicates.contains(peer));
1060
1061        if !peer_counts.is_empty() {
1062            let duplicate_connections: usize = peer_counts.values().sum();
1063
1064            warn!(
1065                ?duplicate_connections,
1066                duplicated_peers = ?peer_counts.len(),
1067                peers = ?peers.len(),
1068                "duplicate peer connections in peer set"
1069            );
1070        }
1071
1072        // Check for duplicates by address: these can happen if there are multiple nodes
1073        // behind a NAT or on a single server.
1074        let peers: Vec<IpAddr> = peers.iter().map(|addr| addr.ip()).collect();
1075        let duplicates: Vec<IpAddr> = peers.iter().duplicates().cloned().collect();
1076
1077        let mut peer_counts = peers.iter().counts();
1078        peer_counts.retain(|peer, _count| duplicates.contains(peer));
1079
1080        if !peer_counts.is_empty() {
1081            let duplicate_connections: usize = peer_counts.values().sum();
1082
1083            info!(
1084                ?duplicate_connections,
1085                duplicated_peers = ?peer_counts.len(),
1086                peers = ?peers.len(),
1087                "duplicate IP addresses in peer set"
1088            );
1089        }
1090
1091        // Only log connectivity warnings if all our peers are busy (or there are no peers).
1092        if ready_services_len > 0 {
1093            return;
1094        }
1095
1096        let address_metrics = *self.address_metrics.borrow();
1097        if unready_services_len == 0 {
1098            warn!(
1099                ?address_metrics,
1100                "network request with no peer connections. Hint: check your network connection"
1101            );
1102        } else {
1103            info!(?address_metrics, "network request with no ready peers: finding more peers, waiting for {} peers to answer requests",
1104                  unready_services_len);
1105        }
1106    }
1107
1108    /// Updates the peer set metrics.
1109    ///
1110    /// # Panics
1111    ///
1112    /// If the peer set size exceeds the connection limit.
1113    fn update_metrics(&self) {
1114        let num_ready = self.ready_services.len();
1115        let num_unready = self.unready_services.len();
1116        let num_peers = num_ready + num_unready;
1117        metrics::gauge!("pool.num_ready").set(num_ready as f64);
1118        metrics::gauge!("pool.num_unready").set(num_unready as f64);
1119        metrics::gauge!("zcash.net.peers").set(num_peers as f64);
1120
1121        // Security: make sure we haven't exceeded the connection limit
1122        if num_peers > self.peerset_total_connection_limit {
1123            let address_metrics = *self.address_metrics.borrow();
1124            panic!(
1125                "unexpectedly exceeded configured peer set connection limit: \n\
1126                 peers: {num_peers:?}, ready: {num_ready:?}, unready: {num_unready:?}, \n\
1127                 address_metrics: {address_metrics:?}",
1128            );
1129        }
1130    }
1131}
1132
1133impl<D, C> Service<Request> for PeerSet<D, C>
1134where
1135    D: Discover<Key = PeerSocketAddr, Service = LoadTrackedClient> + Unpin,
1136    D::Error: Into<BoxError>,
1137    C: ChainTip,
1138{
1139    type Response = Response;
1140    type Error = BoxError;
1141    type Future =
1142        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1143
1144    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1145        // Update service and peer statuses.
1146        //
1147        // # Correctness
1148        //
1149        // All of the futures that receive a context from this method can wake the peer set buffer
1150        // task. If there are no ready peers, and no new peers, network requests will pause until:
1151        // - an unready peer becomes ready, or
1152        // - a new peer arrives.
1153
1154        // Check for new peers, and register a task wakeup when the next new peers arrive. New peers
1155        // can be infrequent if our connection slots are full, or we're connected to all
1156        // available/useful peers.
1157        let _poll_pending_or_ready: Poll<()> = self.poll_discover(cx)?;
1158
1159        // These tasks don't provide new peers or newly ready peers.
1160        let _poll_pending: Poll<()> = self.poll_background_errors(cx)?;
1161        let _poll_pending_or_ready: Poll<()> = self.inventory_registry.poll_inventory(cx)?;
1162
1163        // Check for newly ready peers, including newly added peers (which are added as unready).
1164        // So it needs to run after `poll_discover()`. Registers a wakeup if there are any unready
1165        // peers.
1166        //
1167        // Each connected peer should become ready within a few minutes, or timeout, close the
1168        // connection, and release its connection slot.
1169        //
1170        // TODO: drop peers that overload us with inbound messages and never become ready (#7822)
1171        let _poll_pending_or_ready: Poll<Option<()>> = self.poll_unready(cx)?;
1172
1173        // Cleanup and metrics.
1174
1175        // Only checks the versions of ready peers, so it needs to run after `poll_unready()`.
1176        self.disconnect_from_outdated_peers();
1177
1178        // These metrics should run last, to report the most up-to-date information.
1179        self.log_peer_set_size();
1180        self.update_metrics();
1181
1182        // Check for failures in ready peers, removing newly errored or disconnected peers.
1183        // So it needs to run after `poll_unready()`.
1184        let ready_peers: Poll<()> = self.poll_ready_peer_errors(cx);
1185
1186        if ready_peers.is_pending() {
1187            // # Correctness
1188            //
1189            // If the channel is full, drop the demand signal rather than waiting. If we waited
1190            // here, the crawler could deadlock sending a request to fetch more peers, because it
1191            // also empties the channel.
1192            trace!("no ready services, sending demand signal");
1193            let _ = self.demand_signal.try_send(MorePeers);
1194
1195            // # Correctness
1196            //
1197            // The current task must be scheduled for wakeup every time we return `Poll::Pending`.
1198            //
1199            // As long as there are unready or new peers, this task will run, because:
1200            // - `poll_discover` schedules this task for wakeup when new peers arrive.
1201            // - if there are unready peers, `poll_unready` or `poll_ready_peers` schedule this
1202            //   task for wakeup when peer services become ready.
1203            //
1204            // To avoid peers blocking on a full peer status/error channel:
1205            // - `poll_background_errors` schedules this task for wakeup when the peer status
1206            //   update task exits.
1207            Poll::Pending
1208        } else {
1209            Poll::Ready(Ok(()))
1210        }
1211    }
1212
1213    fn call(&mut self, req: Request) -> Self::Future {
1214        let fut = match req {
1215            // Only do inventory-aware routing on individual items.
1216            Request::BlocksByHash(ref hashes) if hashes.len() == 1 => {
1217                let hash = InventoryHash::from(*hashes.iter().next().unwrap());
1218                self.route_inv(req, hash)
1219            }
1220            Request::TransactionsById(ref hashes) if hashes.len() == 1 => {
1221                let hash = InventoryHash::from(*hashes.iter().next().unwrap());
1222                self.route_inv(req, hash)
1223            }
1224
1225            // Broadcast advertisements to lots of peers
1226            Request::AdvertiseTransactionIds(_) => self.route_broadcast(req),
1227            Request::AdvertiseBlock(_) => self.route_broadcast(req),
1228
1229            // Choose a random less-loaded peer for all other requests
1230            _ => self.route_p2c(req),
1231        };
1232        self.update_metrics();
1233
1234        fut
1235    }
1236}