zebra_network/peer_set/set.rs
1//! Abstractions that represent "the rest of the network".
2//!
3//! # Implementation
4//!
5//! The [`PeerSet`] implementation is adapted from the one in [tower::Balance][tower-balance].
6//!
7//! As described in Tower's documentation, it:
8//!
9//! > Distributes requests across inner services using the [Power of Two Choices][p2c].
10//! >
11//! > As described in the [Finagle Guide][finagle]:
12//! >
13//! > > The algorithm randomly picks two services from the set of ready endpoints and
14//! > > selects the least loaded of the two. By repeatedly using this strategy, we can
15//! > > expect a manageable upper bound on the maximum load of any server.
16//! > >
17//! > > The maximum load variance between any two servers is bound by `ln(ln(n))` where
18//! > > `n` is the number of servers in the cluster.
19//!
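//! A minimal sketch of that selection step (not Zebra's actual code; the real implementation is
//! `select_p2c_peer_from_list` later in this file), assuming each peer's load can be summarized
//! as a single number:
//!
//! ```ignore
//! use rand::seq::index::sample;
//!
//! /// Returns the index of the less-loaded of two randomly chosen peers,
//! /// where `loads[i]` is an abstract load metric for peer `i`.
//! fn p2c_choice(loads: &[u64]) -> Option<usize> {
//!     match loads.len() {
//!         0 => None,
//!         1 => Some(0),
//!         len => {
//!             let idxs = sample(&mut rand::thread_rng(), len, 2);
//!             let (a, b) = (idxs.index(0), idxs.index(1));
//!             Some(if loads[a] <= loads[b] { a } else { b })
//!         }
//!     }
//! }
//! ```
//!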
20//! The Power of Two Choices should work well for many network requests, but not all of them.
21//! Some requests should only be made to a subset of connected peers.
22//! For example, a request for a particular inventory item
23//! should be made to a peer that has recently advertised that inventory hash.
24//! Other requests require broadcasts, such as transaction diffusion.
25//!
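//! As a rough illustration of the inventory case (the full version is `route_inv` later in this
//! file), the peer set first narrows the ready peers down to those that advertised the hash, and
//! then applies the same two-choice selection to that smaller list:
//!
//! ```ignore
//! // Ready peers that advertised this inventory hash.
//! let advertising_peers = self
//!     .inventory_registry
//!     .advertising_peers(hash)
//!     .filter(|&addr| self.ready_services.contains_key(addr))
//!     .copied()
//!     .collect();
//!
//! // Pick a less-loaded peer from that subset using the Power of Two Choices.
//! let peer = self.select_p2c_peer_from_list(&advertising_peers);
//! ```
//!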
26//! Implementing this specialized routing logic inside the `PeerSet` -- so that
27//! it continues to abstract away "the rest of the network" into one endpoint --
28//! is not a problem, as the `PeerSet` can simply maintain more information on
29//! its peers and route requests appropriately. However, there is a problem with
30//! maintaining accurate backpressure information, because the `Service` trait
31//! requires that service readiness is independent of the data in the request.
32//!
33//! For this reason, in the future, this code will probably be refactored to
34//! address this backpressure mismatch. One possibility is to refactor the code
35//! so that one entity holds and maintains the peer set and metadata on the
36//! peers, and each "backpressure category" of request is assigned to different
37//! `Service` impls with specialized `poll_ready()` implementations. Another
38//! less-elegant solution (which might be useful as an intermediate step for the
39//! inventory case) is to provide a way to borrow a particular backing service,
40//! say by address.
41//!
42//! [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
43//! [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
44//! [tower-balance]: https://github.com/tower-rs/tower/tree/master/tower/src/balance
45//!
46//! # Behavior During Network Upgrades
47//!
48//! [ZIP-201] specifies peer behavior during network upgrades:
49//!
50//! > With scheduled network upgrades, at the activation height, nodes on each consensus branch
51//! > should disconnect from nodes on other consensus branches and only accept new incoming
52//! > connections from nodes on the same consensus branch.
53//!
54//! Zebra handles this with the help of [`MinimumPeerVersion`], which determines the minimum peer
55//! protocol version to accept based on the current best chain tip height. The minimum version is
56//! therefore automatically increased when the block height reaches a network upgrade's activation
57//! height. The helper type is then used to:
58//!
59//! - cancel handshakes to outdated peers, in `handshake::negotiate_version`
60//! - cancel requests to and disconnect from peers that have become outdated, in
61//! [`PeerSet::push_unready`]
62//! - disconnect from peers that have just responded and became outdated, in
63//! [`PeerSet::poll_unready`]
64//! - disconnect from idle peers that have become outdated, in
65//! [`PeerSet::disconnect_from_outdated_peers`]
66//!
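//! The version check itself is a single comparison against `MinimumPeerVersion::current()`.
//! A rough sketch of the check applied when a peer becomes ready (the real code is in
//! `push_ready` below):
//!
//! ```ignore
//! if svc.remote_version() >= self.minimum_peer_version.current() {
//!     // The peer is up to date, so it can stay in the peer set.
//!     self.ready_services.insert(key, svc);
//! } else {
//!     // Dropping an outdated peer's service closes its connection.
//!     std::mem::drop(svc);
//! }
//! ```
//!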
67//! ## Network Coalescence
68//!
//! [ZIP-201] also specifies how Zcashd behaves [leading up to an activation
70//! height][1]. Since Zcashd limits the number of connections to at most eight
71//! peers, it will gradually migrate its connections to up-to-date peers as it
72//! approaches the activation height.
73//!
//! The motivation for this behavior is to avoid abruptly partitioning the network, which can lead
//! to isolated peers and increase the chance of an eclipse attack on some of them.
76//!
//! Zebra does not gradually migrate its peers as it approaches an activation height. This is
//! because Zebra can connect to up to 75 peers by default, as can be seen in [`Config::default`].
//! Since this is much larger than the 8 peers Zcashd connects to, an eclipse attack is much more
//! costly to execute, and the probability of an abrupt network partition that isolates peers is
//! lower.
82//!
83//! Even if a Zebra node is manually configured to connect to a smaller number
84//! of peers, the [`AddressBook`][2] is configured to hold a large number of
85//! peer addresses ([`MAX_ADDRS_IN_ADDRESS_BOOK`][3]). Since the address book
86//! prioritizes addresses it trusts (like those that it has successfully
87//! connected to before), the node should be able to recover and rejoin the
88//! network by itself, as long as the address book is populated with enough
89//! entries.
90//!
91//! [1]: https://zips.z.cash/zip-0201#network-coalescence
92//! [2]: crate::AddressBook
93//! [3]: crate::constants::MAX_ADDRS_IN_ADDRESS_BOOK
94//! [ZIP-201]: https://zips.z.cash/zip-0201
95
96use std::{
97 collections::{HashMap, HashSet},
98 convert,
99 fmt::Debug,
100 marker::PhantomData,
101 net::IpAddr,
102 pin::Pin,
103 sync::Arc,
104 task::{Context, Poll},
105 time::Instant,
106};
107
108use futures::{
109 channel::{mpsc, oneshot},
110 future::{FutureExt, TryFutureExt},
111 prelude::*,
112 stream::FuturesUnordered,
113 task::noop_waker,
114};
115use indexmap::IndexMap;
116use itertools::Itertools;
117use num_integer::div_ceil;
118use tokio::{
119 sync::{broadcast, watch},
120 task::JoinHandle,
121};
122use tower::{
123 discover::{Change, Discover},
124 load::Load,
125 Service,
126};
127
128use zebra_chain::chain_tip::ChainTip;
129
130use crate::{
131 address_book::AddressMetrics,
132 constants::MIN_PEER_SET_LOG_INTERVAL,
133 peer::{LoadTrackedClient, MinimumPeerVersion},
134 peer_set::{
135 unready_service::{Error as UnreadyError, UnreadyService},
136 InventoryChange, InventoryRegistry,
137 },
138 protocol::{
139 external::InventoryHash,
140 internal::{Request, Response},
141 },
142 BoxError, Config, PeerError, PeerSocketAddr, SharedPeerError,
143};
144
145#[cfg(test)]
146mod tests;
147
148/// A signal sent by the [`PeerSet`] when it has no ready peers, and gets a request from Zebra.
149///
150/// In response to this signal, the crawler tries to open more peer connections.
151#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
152pub struct MorePeers;
153
154/// A signal sent by the [`PeerSet`] to cancel a [`Client`][1]'s current request
155/// or response.
156///
157/// When it receives this signal, the [`Client`][1] stops processing and exits.
158///
159/// [1]: crate::peer::Client
160#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
161pub struct CancelClientWork;
162
163/// A [`tower::Service`] that abstractly represents "the rest of the network".
164///
165/// # Security
166///
167/// The `Discover::Key` must be the transient remote address of each peer. This
168/// address may only be valid for the duration of a single connection. (For
169/// example, inbound connections have an ephemeral remote port, and proxy
170/// connections have an ephemeral local or proxy port.)
171///
172/// Otherwise, malicious peers could interfere with other peers' `PeerSet` state.
173pub struct PeerSet<D, C>
174where
175 D: Discover<Key = PeerSocketAddr, Service = LoadTrackedClient> + Unpin,
176 D::Error: Into<BoxError>,
177 C: ChainTip,
178{
179 // Peer Tracking: New Peers
180 //
181 /// Provides new and deleted peer [`Change`]s to the peer set,
182 /// via the [`Discover`] trait implementation.
183 discover: D,
184
185 /// A channel that asks the peer crawler task to connect to more peers.
186 demand_signal: mpsc::Sender<MorePeers>,
187
188 /// A watch channel receiver with a copy of banned IP addresses.
189 bans_receiver: watch::Receiver<Arc<IndexMap<IpAddr, std::time::Instant>>>,
190
191 // Peer Tracking: Ready Peers
192 //
193 /// Connected peers that are ready to receive requests from Zebra,
194 /// or send requests to Zebra.
195 ready_services: HashMap<D::Key, D::Service>,
196
197 // Request Routing
198 //
199 /// Stores gossiped inventory hashes from connected peers.
200 ///
201 /// Used to route inventory requests to peers that are likely to have it.
202 inventory_registry: InventoryRegistry,
203
204 // Peer Tracking: Busy Peers
205 //
206 /// Connected peers that are handling a Zebra request,
207 /// or Zebra is handling one of their requests.
208 unready_services: FuturesUnordered<UnreadyService<D::Key, D::Service, Request>>,
209
210 /// Channels used to cancel the request that an unready service is doing.
211 cancel_handles: HashMap<D::Key, oneshot::Sender<CancelClientWork>>,
212
213 // Peer Validation
214 //
215 /// An endpoint to see the minimum peer protocol version in real time.
216 ///
217 /// The minimum version depends on the block height, and [`MinimumPeerVersion`] listens for
218 /// height changes and determines the correct minimum version.
219 minimum_peer_version: MinimumPeerVersion<C>,
220
221 /// The configured limit for inbound and outbound connections.
222 ///
223 /// The peer set panics if this size is exceeded.
224 /// If that happens, our connection limit code has a bug.
225 peerset_total_connection_limit: usize,
226
227 // Background Tasks
228 //
229 /// Channel for passing ownership of tokio JoinHandles from PeerSet's background tasks
230 ///
    /// The join handles passed into the PeerSet are used to populate the `guards` member.
232 handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>,
233
234 /// Unordered set of handles to background tasks associated with the `PeerSet`
235 ///
    /// These guards are checked for errors as part of `poll_ready`, which lets
    /// the `PeerSet` propagate errors from background tasks back to the user.
238 guards: futures::stream::FuturesUnordered<JoinHandle<Result<(), BoxError>>>,
239
240 // Metrics and Logging
241 //
242 /// Address book metrics watch channel.
243 ///
244 /// Used for logging diagnostics.
245 address_metrics: watch::Receiver<AddressMetrics>,
246
247 /// The last time we logged a message about the peer set size
248 last_peer_log: Option<Instant>,
249
    /// The configured maximum number of peers that can be in the
    /// peer set per IP. Defaults to [`crate::constants::DEFAULT_MAX_CONNS_PER_IP`].
252 max_conns_per_ip: usize,
253}
254
255impl<D, C> Drop for PeerSet<D, C>
256where
257 D: Discover<Key = PeerSocketAddr, Service = LoadTrackedClient> + Unpin,
258 D::Error: Into<BoxError>,
259 C: ChainTip,
260{
261 fn drop(&mut self) {
262 // We don't have access to the current task (if any), so we just drop everything we can.
263 let waker = noop_waker();
264 let mut cx = Context::from_waker(&waker);
265
266 self.shut_down_tasks_and_channels(&mut cx);
267 }
268}
269
270impl<D, C> PeerSet<D, C>
271where
272 D: Discover<Key = PeerSocketAddr, Service = LoadTrackedClient> + Unpin,
273 D::Error: Into<BoxError>,
274 C: ChainTip,
275{
276 #[allow(clippy::too_many_arguments)]
    /// Construct a peer set which uses `discover` to manage peer connections.
278 ///
279 /// Arguments:
280 /// - `config`: configures the peer set connection limit;
281 /// - `discover`: handles peer connects and disconnects;
282 /// - `demand_signal`: requests more peers when all peers are busy (unready);
283 /// - `handle_rx`: receives background task handles,
284 /// monitors them to make sure they're still running,
285 /// and shuts down all the tasks as soon as one task exits;
286 /// - `inv_stream`: receives inventory changes from peers,
287 /// allowing the peer set to direct inventory requests;
    /// - `bans_receiver`: receives a map of banned IP addresses, whose connections are dropped;
    /// - `address_metrics`: receives address book metrics, used to log diagnostics when the
    ///   peer set is busy;
290 /// - `minimum_peer_version`: endpoint to see the minimum peer protocol version in real time.
291 /// - `max_conns_per_ip`: configured maximum number of peers that can be in the
292 /// peer set per IP, defaults to the config value or to
293 /// [`crate::constants::DEFAULT_MAX_CONNS_PER_IP`].
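    ///
    /// A rough sketch of how these arguments can be wired up is shown below. This is not how
    /// Zebra's startup code builds the peer set, and `discover`, `address_metrics`, and
    /// `minimum_peer_version` are assumed to be built elsewhere; the channel sizes are
    /// illustrative only.
    ///
    /// ```ignore
    /// // Channels owned by the peer set; the other ends go to the crawler and peer tasks.
    /// let (demand_tx, _demand_rx) = futures::channel::mpsc::channel(100);
    /// let (_handle_tx, handle_rx) = tokio::sync::oneshot::channel();
    /// let (_inv_tx, inv_rx) = tokio::sync::broadcast::channel(100);
    /// let (_bans_tx, bans_rx) =
    ///     tokio::sync::watch::channel(std::sync::Arc::new(indexmap::IndexMap::new()));
    ///
    /// let peer_set = PeerSet::new(
    ///     &Config::default(),
    ///     discover,             // assumed: a `Discover` stream of handshaked peer clients
    ///     demand_tx,
    ///     handle_rx,
    ///     inv_rx,
    ///     bans_rx,
    ///     address_metrics,      // assumed: a watch channel with address book metrics
    ///     minimum_peer_version, // assumed: built from a chain tip receiver
    ///     None,                 // use the configured `max_connections_per_ip`
    /// );
    /// ```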
294 pub fn new(
295 config: &Config,
296 discover: D,
297 demand_signal: mpsc::Sender<MorePeers>,
298 handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>,
299 inv_stream: broadcast::Receiver<InventoryChange>,
300 bans_receiver: watch::Receiver<Arc<IndexMap<IpAddr, std::time::Instant>>>,
301 address_metrics: watch::Receiver<AddressMetrics>,
302 minimum_peer_version: MinimumPeerVersion<C>,
303 max_conns_per_ip: Option<usize>,
304 ) -> Self {
305 Self {
306 // New peers
307 discover,
308 demand_signal,
309 // Banned peers
310 bans_receiver,
311
312 // Ready peers
313 ready_services: HashMap::new(),
314 // Request Routing
315 inventory_registry: InventoryRegistry::new(inv_stream),
316
317 // Busy peers
318 unready_services: FuturesUnordered::new(),
319 cancel_handles: HashMap::new(),
320
321 // Peer validation
322 minimum_peer_version,
323 peerset_total_connection_limit: config.peerset_total_connection_limit(),
324
325 // Background tasks
326 handle_rx,
327 guards: futures::stream::FuturesUnordered::new(),
328
329 // Metrics
330 last_peer_log: None,
331 address_metrics,
332
333 max_conns_per_ip: max_conns_per_ip.unwrap_or(config.max_connections_per_ip),
334 }
335 }
336
337 /// Check background task handles to make sure they're still running.
338 ///
339 /// Never returns `Ok`.
340 ///
341 /// If any background task exits, shuts down all other background tasks,
    /// and returns an error. Otherwise, returns `Pending`, and registers a wakeup for
    /// when the background task handles are received, or a background task exits.
344 fn poll_background_errors(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
345 futures::ready!(self.receive_tasks_if_needed(cx))?;
346
347 // Return Pending if all background tasks are still running.
348 match futures::ready!(Pin::new(&mut self.guards).poll_next(cx)) {
349 Some(res) => {
350 info!(
351 background_tasks = %self.guards.len(),
352 "a peer set background task exited, shutting down other peer set tasks"
353 );
354
355 self.shut_down_tasks_and_channels(cx);
356
357 // Flatten the join result and inner result, and return any errors.
358 res.map_err(Into::into)
359 // TODO: replace with Result::flatten when it stabilises (#70142)
360 .and_then(convert::identity)?;
361
362 // Turn Ok() task exits into errors.
363 Poll::Ready(Err("a peer set background task exited".into()))
364 }
365
366 None => {
367 self.shut_down_tasks_and_channels(cx);
368 Poll::Ready(Err("all peer set background tasks have exited".into()))
369 }
370 }
371 }
372
373 /// Receive background tasks, if they've been sent on the channel, but not consumed yet.
374 ///
    /// Returns a result representing the current task state, or `Poll::Pending` if the
    /// background task handles have not been sent yet.
377 fn receive_tasks_if_needed(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
378 if self.guards.is_empty() {
379 // Return Pending if the tasks have not been sent yet.
380 let handles = futures::ready!(Pin::new(&mut self.handle_rx).poll(cx));
381
382 match handles {
383 // The tasks have been sent, but not consumed yet.
384 Ok(handles) => {
385 // Currently, the peer set treats an empty background task set as an error.
386 //
387 // TODO: refactor `handle_rx` and `guards` into an enum
388 // for the background task state: Waiting/Running/Shutdown.
389 assert!(
390 !handles.is_empty(),
391 "the peer set requires at least one background task"
392 );
393
394 self.guards.extend(handles);
395
396 Poll::Ready(Ok(()))
397 }
398
399 // The sender was dropped without sending the tasks.
400 Err(_) => Poll::Ready(Err(
401 "sender did not send peer background tasks before it was dropped".into(),
402 )),
403 }
404 } else {
405 Poll::Ready(Ok(()))
406 }
407 }
408
409 /// Shut down:
410 /// - services by dropping the service lists
411 /// - background tasks via their join handles or cancel handles
412 /// - channels by closing the channel
413 fn shut_down_tasks_and_channels(&mut self, cx: &mut Context<'_>) {
414 // Drop services and cancel their background tasks.
415 self.ready_services = HashMap::new();
416
417 for (_peer_key, handle) in self.cancel_handles.drain() {
418 let _ = handle.send(CancelClientWork);
419 }
420 self.unready_services = FuturesUnordered::new();
421
422 // Close the MorePeers channel for all senders,
423 // so we don't add more peers to a shut down peer set.
424 self.demand_signal.close_channel();
425
426 // Shut down background tasks, ignoring pending polls.
427 self.handle_rx.close();
428 let _ = self.receive_tasks_if_needed(cx);
429 for guard in self.guards.iter() {
430 guard.abort();
431 }
432 }
433
434 /// Check busy peer services for request completion or errors.
435 ///
436 /// Move newly ready services to the ready list if they are for peers with supported protocol
437 /// versions, otherwise they are dropped. Also drop failed services.
438 ///
439 /// Never returns an error.
440 ///
    /// Returns `Ok(Some(()))` if at least one peer became ready, `Poll::Pending` if there are
    /// unready peers but none became ready, and `Ok(None)` if there are no unready peers.
443 ///
444 /// If there are any remaining unready peers, registers a wakeup for the next time one becomes
445 /// ready. If there are no unready peers, doesn't register any wakeups. (Since wakeups come
446 /// from peers, there needs to be at least one peer to register a wakeup.)
447 fn poll_unready(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<()>, BoxError>> {
448 let mut result = Poll::Pending;
449
450 // # Correctness
451 //
452 // `poll_next()` must always be called, because `self.unready_services` could have been
453 // empty before the call to `self.poll_ready()`.
454 //
455 // > When new futures are added, `poll_next` must be called in order to begin receiving
456 // > wake-ups for new futures.
457 //
458 // <https://docs.rs/futures/latest/futures/stream/futures_unordered/struct.FuturesUnordered.html>
459 //
460 // Returns Pending if we've finished processing the unready service changes,
461 // but there are still some unready services.
462 loop {
463 // No ready peers left, but there are some unready peers pending.
464 let Poll::Ready(ready_peer) = Pin::new(&mut self.unready_services).poll_next(cx) else {
465 break;
466 };
467
468 match ready_peer {
469 // No unready peers in the list.
470 None => {
471 // If we've finished processing the unready service changes, and there are no
472 // unready services left, it doesn't make sense to return Pending, because
473 // their stream is terminated. But when we add more unready peers and call
474 // `poll_next()`, its termination status will be reset, and it will receive
475 // wakeups again.
476 if result.is_pending() {
477 result = Poll::Ready(Ok(None));
478 }
479
480 break;
481 }
482
483 // Unready -> Ready
484 Some(Ok((key, svc))) => {
485 trace!(?key, "service became ready");
486
487 if self.bans_receiver.borrow().contains_key(&key.ip()) {
488 warn!(?key, "service is banned, dropping service");
489 std::mem::drop(svc);
490 continue;
491 }
492
493 self.push_ready(true, key, svc);
494
495 // Return Ok if at least one peer became ready.
496 result = Poll::Ready(Ok(Some(())));
497 }
498
499 // Unready -> Canceled
500 Some(Err((key, UnreadyError::Canceled))) => {
                    // A service can be canceled because we've connected to the same peer twice.
502 // In that case, there is a cancel handle for the peer address,
503 // but it belongs to the service for the newer connection.
504 trace!(
505 ?key,
506 duplicate_connection = self.cancel_handles.contains_key(&key),
507 "service was canceled, dropping service"
508 );
509 }
510 Some(Err((key, UnreadyError::CancelHandleDropped(_)))) => {
                    // Similarly, services with dropped cancel handles can have duplicates.
512 trace!(
513 ?key,
514 duplicate_connection = self.cancel_handles.contains_key(&key),
515 "cancel handle was dropped, dropping service"
516 );
517 }
518
519 // Unready -> Errored
520 Some(Err((key, UnreadyError::Inner(error)))) => {
521 debug!(%error, "service failed while unready, dropping service");
522
523 let cancel = self.cancel_handles.remove(&key);
524 assert!(cancel.is_some(), "missing cancel handle");
525 }
526 }
527 }
528
529 result
530 }
531
532 /// Checks previously ready peer services for errors.
533 ///
534 /// The only way these peer `Client`s can become unready is when we send them a request,
535 /// because the peer set has exclusive access to send requests to each peer. (If an inbound
536 /// request is in progress, it will be handled, then our request will be sent by the connection
537 /// task.)
538 ///
539 /// Returns `Poll::Ready` if there are some ready peers, and `Poll::Pending` if there are no
540 /// ready peers. Registers a wakeup if any peer has failed due to a disconnection, hang, or protocol error.
541 ///
542 /// # Panics
543 ///
    /// If any peer somehow became unready without being sent a request. This indicates a bug
    /// in the peer set, where requests are sent to peers without putting them in
    /// `unready_services`.
546 fn poll_ready_peer_errors(&mut self, cx: &mut Context<'_>) -> Poll<()> {
547 let mut previous = HashMap::new();
548 std::mem::swap(&mut previous, &mut self.ready_services);
549
550 // TODO: consider only checking some peers each poll (for performance reasons),
551 // but make sure we eventually check all of them.
552 for (key, mut svc) in previous.drain() {
553 let Poll::Ready(peer_readiness) = Pin::new(&mut svc).poll_ready(cx) else {
554 unreachable!(
555 "unexpected unready peer: peers must be put into the unready_peers list \
556 after sending them a request"
557 );
558 };
559
560 match peer_readiness {
561 // Still ready, add it back to the list.
562 Ok(()) => {
563 if self.bans_receiver.borrow().contains_key(&key.ip()) {
564 debug!(?key, "service ip is banned, dropping service");
565 std::mem::drop(svc);
566 continue;
567 }
568
569 self.push_ready(false, key, svc)
570 }
571
572 // Ready -> Errored
573 Err(error) => {
574 debug!(%error, "service failed while ready, dropping service");
575
576 // Ready services can just be dropped, they don't need any cleanup.
577 std::mem::drop(svc);
578 }
579 }
580 }
581
582 if self.ready_services.is_empty() {
583 Poll::Pending
584 } else {
585 Poll::Ready(())
586 }
587 }
588
589 /// Returns the number of peer connections Zebra already has with
590 /// the provided IP address
591 ///
592 /// # Performance
593 ///
594 /// This method is `O(connected peers)`, so it should not be called from a loop
595 /// that is already iterating through the peer set.
596 fn num_peers_with_ip(&self, ip: IpAddr) -> usize {
597 self.ready_services
598 .keys()
599 .chain(self.cancel_handles.keys())
600 .filter(|addr| addr.ip() == ip)
601 .count()
602 }
603
604 /// Returns `true` if Zebra is already connected to the IP and port in `addr`.
605 fn has_peer_with_addr(&self, addr: PeerSocketAddr) -> bool {
606 self.ready_services.contains_key(&addr) || self.cancel_handles.contains_key(&addr)
607 }
608
609 /// Processes the entire list of newly inserted or removed services.
610 ///
611 /// Puts inserted services in the unready list.
612 /// Drops removed services, after cancelling any pending requests.
613 ///
614 /// If the peer connector channel is closed, returns an error.
615 ///
616 /// Otherwise, returns `Ok` if it discovered at least one peer, or `Poll::Pending` if it didn't
617 /// discover any peers. Always registers a wakeup for new peers, even when it returns `Ok`.
618 fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
619 // Return pending if there are no peers in the list.
620 let mut result = Poll::Pending;
621
622 loop {
623 // If we've emptied the list, finish looping, otherwise process the new peer.
624 let Poll::Ready(discovered) = Pin::new(&mut self.discover).poll_discover(cx) else {
625 break;
626 };
627
628 // If the change channel has a permanent error, return that error.
629 let change = discovered
630 .ok_or("discovery stream closed")?
631 .map_err(Into::into)?;
632
633 // Otherwise we have successfully processed a peer.
634 result = Poll::Ready(Ok(()));
635
636 // Process each change.
637 match change {
638 Change::Remove(key) => {
639 trace!(?key, "got Change::Remove from Discover");
640 self.remove(&key);
641 }
642 Change::Insert(key, svc) => {
643 // We add peers as unready, so that we:
644 // - always do the same checks on every ready peer, and
645 // - check for any errors that happened right after the handshake
646 trace!(?key, "got Change::Insert from Discover");
647
648 // # Security
649 //
650 // Drop the new peer if we are already connected to it.
651 // Preferring old connections avoids connection thrashing.
652 if self.has_peer_with_addr(key) {
653 std::mem::drop(svc);
654 continue;
655 }
656
657 // # Security
658 //
659 // drop the new peer if there are already `max_conns_per_ip` peers with
660 // the same IP address in the peer set.
661 if self.num_peers_with_ip(key.ip()) >= self.max_conns_per_ip {
662 std::mem::drop(svc);
663 continue;
664 }
665
666 self.push_unready(key, svc);
667 }
668 }
669 }
670
671 result
672 }
673
674 /// Checks if the minimum peer version has changed, and disconnects from outdated peers.
675 fn disconnect_from_outdated_peers(&mut self) {
676 if let Some(minimum_version) = self.minimum_peer_version.changed() {
677 // It is ok to drop ready services, they don't need anything cancelled.
678 self.ready_services
679 .retain(|_address, peer| peer.remote_version() >= minimum_version);
680 }
681 }
682
683 /// Takes a ready service by key.
684 fn take_ready_service(&mut self, key: &D::Key) -> Option<D::Service> {
685 if let Some(svc) = self.ready_services.remove(key) {
686 assert!(
687 !self.cancel_handles.contains_key(key),
688 "cancel handles are only used for unready service work"
689 );
690
691 Some(svc)
692 } else {
693 None
694 }
695 }
696
697 /// Remove the service corresponding to `key` from the peer set.
698 ///
699 /// Drops the service, cancelling any pending request or response to that peer.
700 /// If the peer does not exist, does nothing.
701 fn remove(&mut self, key: &D::Key) {
702 if let Some(ready_service) = self.take_ready_service(key) {
703 // A ready service has no work to cancel, so just drop it.
704 std::mem::drop(ready_service);
705 } else if let Some(handle) = self.cancel_handles.remove(key) {
706 // Cancel the work, implicitly dropping the cancel handle.
707 // The service future returns a `Canceled` error,
708 // making `poll_unready` drop the service.
709 let _ = handle.send(CancelClientWork);
710 }
711 }
712
713 /// Adds a ready service to the ready list if it's for a peer with a supported version.
714 /// If `was_unready` is true, also removes the peer's cancel handle.
715 ///
716 /// If the service is for a connection to an outdated peer, the service is dropped.
717 fn push_ready(&mut self, was_unready: bool, key: D::Key, svc: D::Service) {
718 let cancel = self.cancel_handles.remove(&key);
719 assert_eq!(
720 cancel.is_some(),
721 was_unready,
722 "missing or unexpected cancel handle"
723 );
724
725 if svc.remote_version() >= self.minimum_peer_version.current() {
726 self.ready_services.insert(key, svc);
727 } else {
728 std::mem::drop(svc);
729 }
730 }
731
732 /// Adds a busy service to the unready list if it's for a peer with a supported version,
733 /// and adds a cancel handle for the service's current request.
734 ///
735 /// If the service is for a connection to an outdated peer, the request is cancelled and the
736 /// service is dropped.
737 fn push_unready(&mut self, key: D::Key, svc: D::Service) {
738 let peer_version = svc.remote_version();
739 let (tx, rx) = oneshot::channel();
740
741 self.unready_services.push(UnreadyService {
742 key: Some(key),
743 service: Some(svc),
744 cancel: rx,
745 _req: PhantomData,
746 });
747
748 if peer_version >= self.minimum_peer_version.current() {
749 self.cancel_handles.insert(key, tx);
750 } else {
751 // Cancel any request made to the service because it is using an outdated protocol
752 // version.
753 let _ = tx.send(CancelClientWork);
754 }
755 }
756
757 /// Performs P2C on `self.ready_services` to randomly select a less-loaded ready service.
758 fn select_ready_p2c_peer(&self) -> Option<D::Key> {
759 self.select_p2c_peer_from_list(&self.ready_services.keys().copied().collect())
760 }
761
762 /// Performs P2C on `ready_service_list` to randomly select a less-loaded ready service.
763 #[allow(clippy::unwrap_in_result)]
764 fn select_p2c_peer_from_list(&self, ready_service_list: &HashSet<D::Key>) -> Option<D::Key> {
765 match ready_service_list.len() {
766 0 => None,
767 1 => Some(
768 *ready_service_list
769 .iter()
770 .next()
771 .expect("just checked there is one service"),
772 ),
773 len => {
774 // Choose 2 random peers, then return the least loaded of those 2 peers.
775 let (a, b) = {
776 let idxs = rand::seq::index::sample(&mut rand::thread_rng(), len, 2);
777 let a = idxs.index(0);
778 let b = idxs.index(1);
779
780 let a = *ready_service_list
781 .iter()
782 .nth(a)
783 .expect("sample returns valid indexes");
784 let b = *ready_service_list
785 .iter()
786 .nth(b)
787 .expect("sample returns valid indexes");
788
789 (a, b)
790 };
791
792 let a_load = self.query_load(&a).expect("supplied services are ready");
793 let b_load = self.query_load(&b).expect("supplied services are ready");
794
795 let selected = if a_load <= b_load { a } else { b };
796
797 trace!(
798 a.key = ?a,
799 a.load = ?a_load,
800 b.key = ?b,
801 b.load = ?b_load,
802 selected = ?selected,
803 ?len,
804 "selected service by p2c"
805 );
806
807 Some(selected)
808 }
809 }
810 }
811
812 /// Randomly chooses `max_peers` ready services, ignoring service load.
813 ///
814 /// The chosen peers are unique, but their order is not fully random.
815 fn select_random_ready_peers(&self, max_peers: usize) -> Vec<D::Key> {
816 use rand::seq::IteratorRandom;
817
818 self.ready_services
819 .keys()
820 .copied()
821 .choose_multiple(&mut rand::thread_rng(), max_peers)
822 }
823
824 /// Accesses a ready endpoint by `key` and returns its current load.
825 ///
826 /// Returns `None` if the service is not in the ready service list.
827 fn query_load(&self, key: &D::Key) -> Option<<D::Service as Load>::Metric> {
828 let svc = self.ready_services.get(key);
829 svc.map(|svc| svc.load())
830 }
831
832 /// Routes a request using P2C load-balancing.
833 fn route_p2c(&mut self, req: Request) -> <Self as tower::Service<Request>>::Future {
834 if let Some(p2c_key) = self.select_ready_p2c_peer() {
835 tracing::trace!(?p2c_key, "routing based on p2c");
836
837 let mut svc = self
838 .take_ready_service(&p2c_key)
839 .expect("selected peer must be ready");
840
841 let fut = svc.call(req);
842 self.push_unready(p2c_key, svc);
843
844 return fut.map_err(Into::into).boxed();
845 }
846
847 async move {
848 // Let other tasks run, so a retry request might get different ready peers.
849 tokio::task::yield_now().await;
850
851 // # Security
852 //
853 // Avoid routing requests to peers that are missing inventory.
854 // If we kept trying doomed requests, peers that are missing our requested inventory
855 // could take up a large amount of our bandwidth and retry limits.
856 Err(SharedPeerError::from(PeerError::NoReadyPeers))
857 }
858 .map_err(Into::into)
859 .boxed()
860 }
861
862 /// Tries to route a request to a ready peer that advertised that inventory,
863 /// falling back to a ready peer that isn't missing the inventory.
864 ///
865 /// If all ready peers are missing the inventory,
866 /// returns a synthetic [`NotFoundRegistry`](PeerError::NotFoundRegistry) error.
867 ///
868 /// Uses P2C to route requests to the least loaded peer in each list.
869 fn route_inv(
870 &mut self,
871 req: Request,
872 hash: InventoryHash,
873 ) -> <Self as tower::Service<Request>>::Future {
874 let advertising_peer_list = self
875 .inventory_registry
876 .advertising_peers(hash)
877 .filter(|&addr| self.ready_services.contains_key(addr))
878 .copied()
879 .collect();
880
881 // # Security
882 //
883 // Choose a random, less-loaded peer with the inventory.
884 //
885 // If we chose the first peer in HashMap order,
886 // peers would be able to influence our choice by switching addresses.
887 // But we need the choice to be random,
888 // so that a peer can't provide all our inventory responses.
889 let peer = self.select_p2c_peer_from_list(&advertising_peer_list);
890
891 if let Some(mut svc) = peer.and_then(|key| self.take_ready_service(&key)) {
892 let peer = peer.expect("just checked peer is Some");
893 tracing::trace!(?hash, ?peer, "routing to a peer which advertised inventory");
894 let fut = svc.call(req);
895 self.push_unready(peer, svc);
896 return fut.map_err(Into::into).boxed();
897 }
898
899 let missing_peer_list: HashSet<PeerSocketAddr> = self
900 .inventory_registry
901 .missing_peers(hash)
902 .copied()
903 .collect();
904 let maybe_peer_list = self
905 .ready_services
906 .keys()
907 .filter(|addr| !missing_peer_list.contains(addr))
908 .copied()
909 .collect();
910
911 // Security: choose a random, less-loaded peer that might have the inventory.
912 let peer = self.select_p2c_peer_from_list(&maybe_peer_list);
913
914 if let Some(mut svc) = peer.and_then(|key| self.take_ready_service(&key)) {
915 let peer = peer.expect("just checked peer is Some");
916 tracing::trace!(?hash, ?peer, "routing to a peer that might have inventory");
917 let fut = svc.call(req);
918 self.push_unready(peer, svc);
919 return fut.map_err(Into::into).boxed();
920 }
921
922 tracing::debug!(
923 ?hash,
924 "all ready peers are missing inventory, failing request"
925 );
926
927 async move {
928 // Let other tasks run, so a retry request might get different ready peers.
929 tokio::task::yield_now().await;
930
931 // # Security
932 //
933 // Avoid routing requests to peers that are missing inventory.
934 // If we kept trying doomed requests, peers that are missing our requested inventory
935 // could take up a large amount of our bandwidth and retry limits.
936 Err(SharedPeerError::from(PeerError::NotFoundRegistry(vec![
937 hash,
938 ])))
939 }
940 .map_err(Into::into)
941 .boxed()
942 }
943
944 /// Routes the same request to up to `max_peers` ready peers, ignoring return values.
945 ///
946 /// `max_peers` must be at least one, and at most the number of ready peers.
947 fn route_multiple(
948 &mut self,
949 req: Request,
950 max_peers: usize,
951 ) -> <Self as tower::Service<Request>>::Future {
952 assert!(
953 max_peers > 0,
954 "requests must be routed to at least one peer"
955 );
956 assert!(
957 max_peers <= self.ready_services.len(),
958 "requests can only be routed to ready peers"
959 );
960
961 // # Security
962 //
963 // We choose peers randomly, ignoring load.
964 // This avoids favouring malicious peers, because peers can influence their own load.
965 //
966 // The order of peers isn't completely random,
967 // but peer request order is not security-sensitive.
968
969 let futs = FuturesUnordered::new();
970 for key in self.select_random_ready_peers(max_peers) {
971 let mut svc = self
972 .take_ready_service(&key)
973 .expect("selected peers are ready");
974 futs.push(svc.call(req.clone()).map_err(|_| ()));
975 self.push_unready(key, svc);
976 }
977
978 async move {
979 let results = futs.collect::<Vec<Result<_, _>>>().await;
980 tracing::debug!(
981 ok.len = results.iter().filter(|r| r.is_ok()).count(),
982 err.len = results.iter().filter(|r| r.is_err()).count(),
983 "sent peer request to multiple peers"
984 );
985 Ok(Response::Nil)
986 }
987 .boxed()
988 }
989
990 /// Broadcasts the same request to lots of ready peers, ignoring return values.
991 fn route_broadcast(&mut self, req: Request) -> <Self as tower::Service<Request>>::Future {
992 // Broadcasts ignore the response
993 self.route_multiple(req, self.number_of_peers_to_broadcast())
994 }
995
    /// Given the number of ready peers, calculates how many of them Zebra will
    /// actually send the request to, and returns this number.
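    ///
    /// For example, with 10 ready peers this is `div_ceil(10, 3) = 4`, and with a single ready
    /// peer it is `div_ceil(1, 3) = 1`, so that peer still gets the request.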
998 pub(crate) fn number_of_peers_to_broadcast(&self) -> usize {
        // We currently send broadcast messages to about a third of the ready peers.
1000 const PEER_FRACTION_TO_BROADCAST: usize = 3;
1001
1002 // Round up, so that if we have one ready peer, it gets the request.
1003 div_ceil(self.ready_services.len(), PEER_FRACTION_TO_BROADCAST)
1004 }
1005
1006 /// Returns the list of addresses in the peer set.
1007 fn peer_set_addresses(&self) -> Vec<PeerSocketAddr> {
1008 self.ready_services
1009 .keys()
1010 .chain(self.cancel_handles.keys())
1011 .cloned()
1012 .collect()
1013 }
1014
1015 /// Logs the peer set size, and any potential connectivity issues.
1016 fn log_peer_set_size(&mut self) {
1017 let ready_services_len = self.ready_services.len();
1018 let unready_services_len = self.unready_services.len();
1019 trace!(ready_peers = ?ready_services_len, unready_peers = ?unready_services_len);
1020
1021 let now = Instant::now();
1022
1023 // These logs are designed to be human-readable in a terminal, at the
1024 // default Zebra log level. If you need to know the peer set size for
1025 // every request, use the trace-level logs, or the metrics exporter.
1026 if let Some(last_peer_log) = self.last_peer_log {
1027 // Avoid duplicate peer set logs
1028 if now.duration_since(last_peer_log) < MIN_PEER_SET_LOG_INTERVAL {
1029 return;
1030 }
1031 } else {
1032 // Suppress initial logs until the peer set has started up.
1033 // There can be multiple initial requests before the first peer is
1034 // ready.
1035 self.last_peer_log = Some(now);
1036 return;
1037 }
1038
1039 self.last_peer_log = Some(now);
1040
1041 // Log potential duplicate connections.
1042 let peers = self.peer_set_addresses();
1043
1044 // Check for duplicates by address and port: these are unexpected and represent a bug.
1045 let duplicates: Vec<PeerSocketAddr> = peers.iter().duplicates().cloned().collect();
1046
1047 let mut peer_counts = peers.iter().counts();
1048 peer_counts.retain(|peer, _count| duplicates.contains(peer));
1049
1050 if !peer_counts.is_empty() {
1051 let duplicate_connections: usize = peer_counts.values().sum();
1052
1053 warn!(
1054 ?duplicate_connections,
1055 duplicated_peers = ?peer_counts.len(),
1056 peers = ?peers.len(),
1057 "duplicate peer connections in peer set"
1058 );
1059 }
1060
1061 // Check for duplicates by address: these can happen if there are multiple nodes
1062 // behind a NAT or on a single server.
1063 let peers: Vec<IpAddr> = peers.iter().map(|addr| addr.ip()).collect();
1064 let duplicates: Vec<IpAddr> = peers.iter().duplicates().cloned().collect();
1065
1066 let mut peer_counts = peers.iter().counts();
1067 peer_counts.retain(|peer, _count| duplicates.contains(peer));
1068
1069 if !peer_counts.is_empty() {
1070 let duplicate_connections: usize = peer_counts.values().sum();
1071
1072 info!(
1073 ?duplicate_connections,
1074 duplicated_peers = ?peer_counts.len(),
1075 peers = ?peers.len(),
1076 "duplicate IP addresses in peer set"
1077 );
1078 }
1079
1080 // Only log connectivity warnings if all our peers are busy (or there are no peers).
1081 if ready_services_len > 0 {
1082 return;
1083 }
1084
1085 let address_metrics = *self.address_metrics.borrow();
1086 if unready_services_len == 0 {
1087 warn!(
1088 ?address_metrics,
1089 "network request with no peer connections. Hint: check your network connection"
1090 );
        } else {
            info!(
                ?address_metrics,
                "network request with no ready peers: finding more peers, waiting for {} peers to answer requests",
                unready_services_len
            );
        }
1095 }
1096
1097 /// Updates the peer set metrics.
1098 ///
1099 /// # Panics
1100 ///
1101 /// If the peer set size exceeds the connection limit.
1102 fn update_metrics(&self) {
1103 let num_ready = self.ready_services.len();
1104 let num_unready = self.unready_services.len();
1105 let num_peers = num_ready + num_unready;
1106 metrics::gauge!("pool.num_ready").set(num_ready as f64);
1107 metrics::gauge!("pool.num_unready").set(num_unready as f64);
1108 metrics::gauge!("zcash.net.peers").set(num_peers as f64);
1109
1110 // Security: make sure we haven't exceeded the connection limit
1111 if num_peers > self.peerset_total_connection_limit {
1112 let address_metrics = *self.address_metrics.borrow();
1113 panic!(
1114 "unexpectedly exceeded configured peer set connection limit: \n\
1115 peers: {num_peers:?}, ready: {num_ready:?}, unready: {num_unready:?}, \n\
1116 address_metrics: {address_metrics:?}",
1117 );
1118 }
1119 }
1120}
1121
1122impl<D, C> Service<Request> for PeerSet<D, C>
1123where
1124 D: Discover<Key = PeerSocketAddr, Service = LoadTrackedClient> + Unpin,
1125 D::Error: Into<BoxError>,
1126 C: ChainTip,
1127{
1128 type Response = Response;
1129 type Error = BoxError;
1130 type Future =
1131 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1132
1133 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1134 // Update service and peer statuses.
1135 //
1136 // # Correctness
1137 //
1138 // All of the futures that receive a context from this method can wake the peer set buffer
1139 // task. If there are no ready peers, and no new peers, network requests will pause until:
1140 // - an unready peer becomes ready, or
1141 // - a new peer arrives.
1142
1143 // Check for new peers, and register a task wakeup when the next new peers arrive. New peers
1144 // can be infrequent if our connection slots are full, or we're connected to all
1145 // available/useful peers.
1146 let _poll_pending_or_ready: Poll<()> = self.poll_discover(cx)?;
1147
1148 // These tasks don't provide new peers or newly ready peers.
1149 let _poll_pending: Poll<()> = self.poll_background_errors(cx)?;
1150 let _poll_pending_or_ready: Poll<()> = self.inventory_registry.poll_inventory(cx)?;
1151
1152 // Check for newly ready peers, including newly added peers (which are added as unready).
1153 // So it needs to run after `poll_discover()`. Registers a wakeup if there are any unready
1154 // peers.
1155 //
1156 // Each connected peer should become ready within a few minutes, or timeout, close the
1157 // connection, and release its connection slot.
1158 //
1159 // TODO: drop peers that overload us with inbound messages and never become ready (#7822)
1160 let _poll_pending_or_ready: Poll<Option<()>> = self.poll_unready(cx)?;
1161
1162 // Cleanup and metrics.
1163
1164 // Only checks the versions of ready peers, so it needs to run after `poll_unready()`.
1165 self.disconnect_from_outdated_peers();
1166
1167 // These metrics should run last, to report the most up-to-date information.
1168 self.log_peer_set_size();
1169 self.update_metrics();
1170
1171 // Check for failures in ready peers, removing newly errored or disconnected peers.
1172 // So it needs to run after `poll_unready()`.
1173 let ready_peers: Poll<()> = self.poll_ready_peer_errors(cx);
1174
1175 if ready_peers.is_pending() {
1176 // # Correctness
1177 //
1178 // If the channel is full, drop the demand signal rather than waiting. If we waited
1179 // here, the crawler could deadlock sending a request to fetch more peers, because it
1180 // also empties the channel.
1181 trace!("no ready services, sending demand signal");
1182 let _ = self.demand_signal.try_send(MorePeers);
1183
1184 // # Correctness
1185 //
1186 // The current task must be scheduled for wakeup every time we return `Poll::Pending`.
1187 //
1188 // As long as there are unready or new peers, this task will run, because:
1189 // - `poll_discover` schedules this task for wakeup when new peers arrive.
            // - if there are unready peers, `poll_unready` or `poll_ready_peer_errors` schedule this
1191 // task for wakeup when peer services become ready.
1192 //
1193 // To avoid peers blocking on a full peer status/error channel:
1194 // - `poll_background_errors` schedules this task for wakeup when the peer status
1195 // update task exits.
1196 Poll::Pending
1197 } else {
1198 Poll::Ready(Ok(()))
1199 }
1200 }
1201
1202 fn call(&mut self, req: Request) -> Self::Future {
1203 let fut = match req {
1204 // Only do inventory-aware routing on individual items.
1205 Request::BlocksByHash(ref hashes) if hashes.len() == 1 => {
1206 let hash = InventoryHash::from(*hashes.iter().next().unwrap());
1207 self.route_inv(req, hash)
1208 }
1209 Request::TransactionsById(ref hashes) if hashes.len() == 1 => {
1210 let hash = InventoryHash::from(*hashes.iter().next().unwrap());
1211 self.route_inv(req, hash)
1212 }
1213
1214 // Broadcast advertisements to lots of peers
1215 Request::AdvertiseTransactionIds(_) => self.route_broadcast(req),
1216 Request::AdvertiseBlock(_) => self.route_broadcast(req),
1217
1218 // Choose a random less-loaded peer for all other requests
1219 _ => self.route_p2c(req),
1220 };
1221 self.update_metrics();
1222
1223 fut
1224 }
1225}