zebra_network/
address_book.rs

1//! The `AddressBook` manages information about what peers exist, when they were
2//! seen, and what services they provide.
3
4use std::{
5    cmp::Reverse,
6    collections::HashMap,
7    net::{IpAddr, SocketAddr},
8    sync::{Arc, Mutex},
9    time::Instant,
10};
11
12use chrono::Utc;
13use indexmap::IndexMap;
14use ordered_map::OrderedMap;
15use tokio::sync::watch;
16use tracing::Span;
17
18use zebra_chain::{parameters::Network, serialization::DateTime32};
19
20use crate::{
21    constants::{self, ADDR_RESPONSE_LIMIT_DENOMINATOR, MAX_ADDRS_IN_MESSAGE},
22    meta_addr::MetaAddrChange,
23    protocol::external::{canonical_peer_addr, canonical_socket_addr},
24    types::MetaAddr,
25    AddressBookPeers, PeerAddrState, PeerSocketAddr,
26};
27
28#[cfg(test)]
29mod tests;
30
/// A database of peer listener addresses, their advertised services, and
/// information on when they were last seen.
///
/// # Security
///
/// Address book state must be based on outbound connections to peers.
///
/// If the address book is updated incorrectly:
/// - malicious peers can interfere with other peers' `AddressBook` state,
///   or
/// - Zebra can advertise unreachable addresses to its own peers.
///
/// ## Adding Addresses
///
/// The address book should only contain Zcash listener port addresses from peers
/// on the configured network. These addresses can come from:
/// - DNS seeders
/// - addresses gossiped by other peers
/// - the canonical address (`Version.address_from`) provided by each peer,
///   particularly peers on inbound connections.
///
/// The remote addresses of inbound connections must not be added to the address
/// book, because they contain ephemeral outbound ports, not listener ports.
///
/// Isolated connections must not add addresses or update the address book.
///
/// ## Updating Address State
///
/// Updates to address state must be based on outbound connections to peers.
///
/// Updates must not be based on:
/// - the remote addresses of inbound connections, or
/// - the canonical address of any connection.
#[derive(Debug)]
pub struct AddressBook {
    /// Peer listener addresses, suitable for outbound connections,
    /// in connection attempt order.
    ///
    /// Some peers in this list might have open outbound or inbound connections.
    ///
    /// We reverse the comparison order, because the standard library
    /// ([`BTreeMap`](std::collections::BTreeMap)) sorts in ascending order, but
    /// [`OrderedMap`] sorts in descending order.
    by_addr: OrderedMap<PeerSocketAddr, MetaAddr, Reverse<MetaAddr>>,

    /// For each IP, the address with a `last_connection_state` of
    /// [`PeerAddrState::Responded`] and the most recent `last_response` time.
    ///
    /// This is used to avoid initiating outbound connections past
    /// [`Config::max_connections_per_ip`](crate::config::Config).
    ///
    /// It currently only supports a `max_connections_per_ip` of 1:
    /// [`AddressBook::new`] sets it to `Some` for that limit, and to `None` for
    /// every other limit. Code that uses this field must handle the `None` case.
    // TODO: Replace with `by_ip: HashMap<IpAddr, BTreeMap<DateTime32, MetaAddr>>` to support configured `max_connections_per_ip` greater than 1
    most_recent_by_ip: Option<HashMap<IpAddr, MetaAddr>>,

    /// Banned IP addresses, with the time they were banned.
    ///
    /// When the ban list grows past its limit, [`AddressBook::update`] evicts
    /// the entry at index 0.
    ///
    /// Stored in an [`Arc`] so that [`AddressBook::bans`] can return the list
    /// without copying the map.
    bans_by_ip: Arc<IndexMap<IpAddr, Instant>>,

    /// The local listener address.
    local_listener: SocketAddr,

    /// The configured Zcash network.
    network: Network,

    /// The maximum number of addresses in the address book.
    ///
    /// Always set to [`MAX_ADDRS_IN_ADDRESS_BOOK`](constants::MAX_ADDRS_IN_ADDRESS_BOOK),
    /// in release builds. Lower values are used during testing.
    addr_limit: usize,

    /// The span for operations on this address book.
    span: Span,

    /// A channel used to send the latest address book metrics.
    ///
    /// Receivers are created by [`AddressBook::address_metrics_watcher`].
    address_metrics_tx: watch::Sender<AddressMetrics>,

    /// The last time we logged a message about the address metrics.
    ///
    /// `None` until the first time the metrics logger runs without any
    /// `Responded` peers.
    last_address_log: Option<Instant>,
}
108
/// Metrics about the states of the addresses in an [`AddressBook`].
///
/// This is a snapshot: it is recalculated and published on the metrics watch
/// channel each time the address book updates its metrics.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct AddressMetrics {
    /// The number of addresses in the `Responded` state.
    pub responded: usize,

    /// The number of addresses in the `NeverAttemptedGossiped` state.
    pub never_attempted_gossiped: usize,

    /// The number of addresses in the `Failed` state.
    pub failed: usize,

    /// The number of addresses in the `AttemptPending` state.
    pub attempt_pending: usize,

    /// The number of `Responded` addresses within the liveness limit.
    pub recently_live: usize,

    /// The number of `Responded` addresses outside the liveness limit.
    ///
    /// Calculated as `responded - recently_live`.
    pub recently_stopped_responding: usize,

    /// The number of addresses in the address book, regardless of their states.
    pub num_addresses: usize,

    /// The maximum number of addresses in the address book.
    pub address_limit: usize,
}
136
137#[allow(clippy::len_without_is_empty)]
138impl AddressBook {
139    /// Construct an [`AddressBook`] with the given `local_listener` on `network`.
140    ///
141    /// Uses the supplied [`tracing::Span`] for address book operations.
142    pub fn new(
143        local_listener: SocketAddr,
144        network: &Network,
145        max_connections_per_ip: usize,
146        span: Span,
147    ) -> AddressBook {
148        let constructor_span = span.clone();
149        let _guard = constructor_span.enter();
150
151        let instant_now = Instant::now();
152        let chrono_now = Utc::now();
153
154        // The default value is correct for an empty address book,
155        // and it gets replaced by `update_metrics` anyway.
156        let (address_metrics_tx, _address_metrics_rx) = watch::channel(AddressMetrics::default());
157
158        // Avoid initiating outbound handshakes when max_connections_per_ip is 1.
159        let should_limit_outbound_conns_per_ip = max_connections_per_ip == 1;
160        let mut new_book = AddressBook {
161            by_addr: OrderedMap::new(|meta_addr| Reverse(*meta_addr)),
162            local_listener: canonical_socket_addr(local_listener),
163            network: network.clone(),
164            addr_limit: constants::MAX_ADDRS_IN_ADDRESS_BOOK,
165            span,
166            address_metrics_tx,
167            last_address_log: None,
168            most_recent_by_ip: should_limit_outbound_conns_per_ip.then(HashMap::new),
169            bans_by_ip: Default::default(),
170        };
171
172        new_book.update_metrics(instant_now, chrono_now);
173        new_book
174    }
175
176    /// Construct an [`AddressBook`] with the given `local_listener`, `network`,
177    /// `addr_limit`, [`tracing::Span`], and addresses.
178    ///
179    /// `addr_limit` is enforced by this method, and by [`AddressBook::update`].
180    ///
181    /// If there are multiple [`MetaAddr`]s with the same address,
182    /// an arbitrary address is inserted into the address book,
183    /// and the rest are dropped.
184    ///
185    /// This constructor can be used to break address book invariants,
186    /// so it should only be used in tests.
187    #[cfg(any(test, feature = "proptest-impl"))]
188    pub fn new_with_addrs(
189        local_listener: SocketAddr,
190        network: &Network,
191        max_connections_per_ip: usize,
192        addr_limit: usize,
193        span: Span,
194        addrs: impl IntoIterator<Item = MetaAddr>,
195    ) -> AddressBook {
196        let constructor_span = span.clone();
197        let _guard = constructor_span.enter();
198
199        let instant_now = Instant::now();
200        let chrono_now = Utc::now();
201
202        // The maximum number of addresses should be always greater than 0
203        assert!(addr_limit > 0);
204
205        let mut new_book = AddressBook::new(local_listener, network, max_connections_per_ip, span);
206        new_book.addr_limit = addr_limit;
207
208        let addrs = addrs
209            .into_iter()
210            .map(|mut meta_addr| {
211                meta_addr.addr = canonical_peer_addr(meta_addr.addr);
212                meta_addr
213            })
214            .filter(|meta_addr| meta_addr.address_is_valid_for_outbound(network))
215            .map(|meta_addr| (meta_addr.addr, meta_addr));
216
217        for (socket_addr, meta_addr) in addrs {
218            // overwrite any duplicate addresses
219            new_book.by_addr.insert(socket_addr, meta_addr);
220            // Add the address to `most_recent_by_ip` if it has responded
221            if new_book.should_update_most_recent_by_ip(meta_addr) {
222                new_book
223                    .most_recent_by_ip
224                    .as_mut()
225                    .expect("should be some when should_update_most_recent_by_ip is true")
226                    .insert(socket_addr.ip(), meta_addr);
227            }
228            // exit as soon as we get enough addresses
229            if new_book.by_addr.len() >= addr_limit {
230                break;
231            }
232        }
233
234        new_book.update_metrics(instant_now, chrono_now);
235        new_book
236    }
237
    /// Return a watch channel for the address book metrics.
    ///
    /// Each call creates a new receiver by subscribing to the address book's
    /// metrics sender.
    ///
    /// The metrics in the watch channel are only updated when the address book updates,
    /// so they can be significantly outdated if Zebra is disconnected or hung.
    ///
    /// The current metrics value is marked as seen.
    /// So `Receiver::changed` will only return after the next address book update.
    pub fn address_metrics_watcher(&self) -> watch::Receiver<AddressMetrics> {
        self.address_metrics_tx.subscribe()
    }
248
    /// Set the local listener address. Only for use in tests.
    ///
    /// `addr` is stored exactly as given. Unlike [`AddressBook::new`], this method
    /// does not convert it to a canonical socket address first.
    #[cfg(any(test, feature = "proptest-impl"))]
    pub fn set_local_listener(&mut self, addr: SocketAddr) {
        self.local_listener = addr;
    }
254
255    /// Get the local listener address.
256    ///
257    /// This address contains minimal state, but it is not sanitized.
258    pub fn local_listener_meta_addr(&self, now: chrono::DateTime<Utc>) -> MetaAddr {
259        let now: DateTime32 = now.try_into().expect("will succeed until 2038");
260
261        MetaAddr::new_local_listener_change(self.local_listener)
262            .local_listener_into_new_meta_addr(now)
263    }
264
    /// Get the local listener [`SocketAddr`].
    ///
    /// Returns a copy of the stored address.
    pub fn local_listener_socket_addr(&self) -> SocketAddr {
        self.local_listener
    }
269
270    /// Get the active addresses in `self` in random order with sanitized timestamps,
271    /// including our local listener address.
272    ///
273    /// Limited to the number of peer addresses Zebra should give out per `GetAddr` request.
274    pub fn fresh_get_addr_response(&self) -> Vec<MetaAddr> {
275        let now = Utc::now();
276        let mut peers = self.sanitized(now);
277        let address_limit = peers.len().div_ceil(ADDR_RESPONSE_LIMIT_DENOMINATOR);
278        peers.truncate(MAX_ADDRS_IN_MESSAGE.min(address_limit));
279
280        peers
281    }
282
283    /// Get the active addresses in `self` in random order with sanitized timestamps,
284    /// including our local listener address.
285    pub(crate) fn sanitized(&self, now: chrono::DateTime<Utc>) -> Vec<MetaAddr> {
286        use rand::seq::SliceRandom;
287        let _guard = self.span.enter();
288
289        let mut peers = self.by_addr.clone();
290
291        // Unconditionally add our local listener address to the advertised peers,
292        // to replace any self-connection failures. The address book and change
293        // constructors make sure that the SocketAddr is canonical.
294        let local_listener = self.local_listener_meta_addr(now);
295        peers.insert(local_listener.addr, local_listener);
296
297        // Then sanitize and shuffle
298        let mut peers: Vec<MetaAddr> = peers
299            .descending_values()
300            .filter_map(|meta_addr| meta_addr.sanitize(&self.network))
301            // # Security
302            //
303            // Remove peers that:
304            //   - last responded more than three hours ago, or
305            //   - haven't responded yet but were reported last seen more than three hours ago
306            //
307            // This prevents Zebra from gossiping nodes that are likely unreachable. Gossiping such
308            // nodes impacts the network health, because connection attempts end up being wasted on
309            // peers that are less likely to respond.
310            .filter(|addr| addr.is_active_for_gossip(now))
311            .collect();
312
313        peers.shuffle(&mut rand::thread_rng());
314
315        peers
316    }
317
318    /// Get the active addresses in `self`, in preferred caching order,
319    /// excluding our local listener address.
320    pub fn cacheable(&self, now: chrono::DateTime<Utc>) -> Vec<MetaAddr> {
321        let _guard = self.span.enter();
322
323        let peers = self.by_addr.clone();
324
325        // Get peers in preferred order, then keep the recently active ones
326        peers
327            .descending_values()
328            // # Security
329            //
330            // Remove peers that:
331            //   - last responded more than three hours ago, or
332            //   - haven't responded yet but were reported last seen more than three hours ago
333            //
334            // This prevents Zebra from caching nodes that are likely unreachable,
335            // which improves startup time and reliability.
336            .filter(|addr| addr.is_active_for_gossip(now))
337            .cloned()
338            .collect()
339    }
340
341    /// Look up `addr` in the address book, and return its [`MetaAddr`].
342    ///
343    /// Converts `addr` to a canonical address before looking it up.
344    pub fn get(&mut self, addr: PeerSocketAddr) -> Option<MetaAddr> {
345        let addr = canonical_peer_addr(*addr);
346
347        // Unfortunately, `OrderedMap` doesn't implement `get`.
348        let meta_addr = self.by_addr.remove(&addr);
349
350        if let Some(meta_addr) = meta_addr {
351            self.by_addr.insert(addr, meta_addr);
352        }
353
354        meta_addr
355    }
356
357    /// Returns true if `updated` needs to be applied to the recent outbound peer connection IP cache.
358    ///
359    /// Checks if there are no existing entries in the address book with this IP,
360    /// or if `updated` has a more recent `last_response` requiring the outbound connector to wait
361    /// longer before initiating handshakes with peers at this IP.
362    ///
363    /// This code only needs to check a single cache entry, rather than the entire address book,
364    /// because other code maintains these invariants:
365    /// - `last_response` times for an entry can only increase.
366    /// - this is the only field checked by `has_connection_recently_responded()`
367    ///
368    /// See [`AddressBook::is_ready_for_connection_attempt_with_ip`] for more details.
369    fn should_update_most_recent_by_ip(&self, updated: MetaAddr) -> bool {
370        let Some(most_recent_by_ip) = self.most_recent_by_ip.as_ref() else {
371            return false;
372        };
373
374        if let Some(previous) = most_recent_by_ip.get(&updated.addr.ip()) {
375            updated.last_connection_state == PeerAddrState::Responded
376                && updated.last_response() > previous.last_response()
377        } else {
378            updated.last_connection_state == PeerAddrState::Responded
379        }
380    }
381
382    /// Returns true if `addr` is the latest entry for its IP, which is stored in `most_recent_by_ip`.
383    /// The entry is checked for an exact match to the IP and port of `addr`.
384    fn should_remove_most_recent_by_ip(&self, addr: PeerSocketAddr) -> bool {
385        let Some(most_recent_by_ip) = self.most_recent_by_ip.as_ref() else {
386            return false;
387        };
388
389        if let Some(previous) = most_recent_by_ip.get(&addr.ip()) {
390            previous.addr == addr
391        } else {
392            false
393        }
394    }
395
396    /// Apply `change` to the address book, returning the updated `MetaAddr`,
397    /// if the change was valid.
398    ///
399    /// # Correctness
400    ///
401    /// All changes should go through `update`, so that the address book
402    /// only contains valid outbound addresses.
403    ///
404    /// Change addresses must be canonical `PeerSocketAddr`s. This makes sure that
405    /// each address book entry has a unique IP address.
406    ///
407    /// # Security
408    ///
409    /// This function must apply every attempted, responded, and failed change
410    /// to the address book. This prevents rapid reconnections to the same peer.
411    ///
412    /// As an exception, this function can ignore all changes for specific
413    /// [`PeerSocketAddr`]s. Ignored addresses will never be used to connect to
414    /// peers.
415    #[allow(clippy::unwrap_in_result)]
416    pub fn update(&mut self, change: MetaAddrChange) -> Option<MetaAddr> {
417        if self.bans_by_ip.contains_key(&change.addr().ip()) {
418            tracing::warn!(
419                ?change,
420                "attempted to add a banned peer addr to address book"
421            );
422            return None;
423        }
424
425        let previous = self.get(change.addr());
426
427        let _guard = self.span.enter();
428
429        let instant_now = Instant::now();
430        let chrono_now = Utc::now();
431
432        let updated = change.apply_to_meta_addr(previous, instant_now, chrono_now);
433
434        trace!(
435            ?change,
436            ?updated,
437            ?previous,
438            total_peers = self.by_addr.len(),
439            recent_peers = self.recently_live_peers(chrono_now).len(),
440            "calculated updated address book entry",
441        );
442
443        if let Some(updated) = updated {
444            if updated.misbehavior() >= constants::MAX_PEER_MISBEHAVIOR_SCORE {
445                // Ban and skip outbound connections with excessively misbehaving peers.
446                let banned_ip = updated.addr.ip();
447                let bans_by_ip = Arc::make_mut(&mut self.bans_by_ip);
448
449                bans_by_ip.insert(banned_ip, Instant::now());
450                if bans_by_ip.len() > constants::MAX_BANNED_IPS {
451                    // Remove the oldest banned IP from the address book.
452                    bans_by_ip.shift_remove_index(0);
453                }
454
455                self.most_recent_by_ip
456                    .as_mut()
457                    .expect("should be some when should_remove_most_recent_by_ip is true")
458                    .remove(&banned_ip);
459
460                let banned_addrs: Vec<_> = self
461                    .by_addr
462                    .descending_keys()
463                    .skip_while(|addr| addr.ip() != banned_ip)
464                    .take_while(|addr| addr.ip() == banned_ip)
465                    .cloned()
466                    .collect();
467
468                for addr in banned_addrs {
469                    self.by_addr.remove(&addr);
470                }
471
472                warn!(
473                    ?updated,
474                    total_peers = self.by_addr.len(),
475                    recent_peers = self.recently_live_peers(chrono_now).len(),
476                    "banned ip and removed banned peer addresses from address book",
477                );
478
479                return None;
480            }
481
482            // Ignore invalid outbound addresses.
483            // (Inbound connections can be monitored via Zebra's metrics.)
484            if !updated.address_is_valid_for_outbound(&self.network) {
485                return None;
486            }
487
488            // Ignore invalid outbound services and other info,
489            // but only if the peer has never been attempted.
490            //
491            // Otherwise, if we got the info directly from the peer,
492            // store it in the address book, so we know not to reconnect.
493            if !updated.last_known_info_is_valid_for_outbound(&self.network)
494                && updated.last_connection_state.is_never_attempted()
495            {
496                return None;
497            }
498
499            self.by_addr.insert(updated.addr, updated);
500
501            // Add the address to `most_recent_by_ip` if it sent the most recent
502            // response Zebra has received from this IP.
503            if self.should_update_most_recent_by_ip(updated) {
504                self.most_recent_by_ip
505                    .as_mut()
506                    .expect("should be some when should_update_most_recent_by_ip is true")
507                    .insert(updated.addr.ip(), updated);
508            }
509
510            debug!(
511                ?change,
512                ?updated,
513                ?previous,
514                total_peers = self.by_addr.len(),
515                recent_peers = self.recently_live_peers(chrono_now).len(),
516                "updated address book entry",
517            );
518
519            // Security: Limit the number of peers in the address book.
520            //
521            // We only delete outdated peers when we have too many peers.
522            // If we deleted them as soon as they became too old,
523            // then other peers could re-insert them into the address book.
524            // And we would start connecting to those outdated peers again,
525            // ignoring the age limit in [`MetaAddr::is_probably_reachable`].
526            while self.by_addr.len() > self.addr_limit {
527                let surplus_peer = self
528                    .peers()
529                    .next_back()
530                    .expect("just checked there is at least one peer");
531
532                self.by_addr.remove(&surplus_peer.addr);
533
534                // Check if this surplus peer's addr matches that in `most_recent_by_ip`
535                // for this the surplus peer's ip to remove it there as well.
536                if self.should_remove_most_recent_by_ip(surplus_peer.addr) {
537                    self.most_recent_by_ip
538                        .as_mut()
539                        .expect("should be some when should_remove_most_recent_by_ip is true")
540                        .remove(&surplus_peer.addr.ip());
541                }
542
543                debug!(
544                    surplus = ?surplus_peer,
545                    ?updated,
546                    total_peers = self.by_addr.len(),
547                    recent_peers = self.recently_live_peers(chrono_now).len(),
548                    "removed surplus address book entry",
549                );
550            }
551
552            assert!(self.len() <= self.addr_limit);
553
554            std::mem::drop(_guard);
555            self.update_metrics(instant_now, chrono_now);
556        }
557
558        updated
559    }
560
561    /// Removes the entry with `addr`, returning it if it exists
562    ///
563    /// # Note
564    ///
565    /// All address removals should go through `take`, so that the address
566    /// book metrics are accurate.
567    #[allow(dead_code)]
568    fn take(&mut self, removed_addr: PeerSocketAddr) -> Option<MetaAddr> {
569        let _guard = self.span.enter();
570
571        let instant_now = Instant::now();
572        let chrono_now = Utc::now();
573
574        trace!(
575            ?removed_addr,
576            total_peers = self.by_addr.len(),
577            recent_peers = self.recently_live_peers(chrono_now).len(),
578        );
579
580        if let Some(entry) = self.by_addr.remove(&removed_addr) {
581            // Check if this surplus peer's addr matches that in `most_recent_by_ip`
582            // for this the surplus peer's ip to remove it there as well.
583            if self.should_remove_most_recent_by_ip(entry.addr) {
584                if let Some(most_recent_by_ip) = self.most_recent_by_ip.as_mut() {
585                    most_recent_by_ip.remove(&entry.addr.ip());
586                }
587            }
588
589            std::mem::drop(_guard);
590            self.update_metrics(instant_now, chrono_now);
591            Some(entry)
592        } else {
593            None
594        }
595    }
596
597    /// Returns true if the given [`PeerSocketAddr`] is pending a reconnection
598    /// attempt.
599    pub fn pending_reconnection_addr(&mut self, addr: PeerSocketAddr) -> bool {
600        let meta_addr = self.get(addr);
601
602        let _guard = self.span.enter();
603        match meta_addr {
604            None => false,
605            Some(peer) => peer.last_connection_state == PeerAddrState::AttemptPending,
606        }
607    }
608
    /// Return an iterator over all peers.
    ///
    /// Returns peers in reconnection attempt order, including recently connected peers.
    ///
    /// The iterator yields copies of the stored [`MetaAddr`]s, and borrows the
    /// address book until it is dropped.
    pub fn peers(&'_ self) -> impl DoubleEndedIterator<Item = MetaAddr> + '_ {
        // The span guard is released when this function returns, so the span is
        // only entered while the iterator is created, not while it is consumed.
        let _guard = self.span.enter();
        self.by_addr.descending_values().cloned()
    }
616
617    /// Is this IP ready for a new outbound connection attempt?
618    /// Checks if the outbound connection with the most recent response at this IP has recently responded.
619    ///
620    /// Note: last_response times may remain live for a long time if the local clock is changed to an earlier time.
621    fn is_ready_for_connection_attempt_with_ip(
622        &self,
623        ip: &IpAddr,
624        chrono_now: chrono::DateTime<Utc>,
625    ) -> bool {
626        let Some(most_recent_by_ip) = self.most_recent_by_ip.as_ref() else {
627            // if we're not checking IPs, any connection is allowed
628            return true;
629        };
630        let Some(same_ip_peer) = most_recent_by_ip.get(ip) else {
631            // If there's no entry for this IP, any connection is allowed
632            return true;
633        };
634        !same_ip_peer.has_connection_recently_responded(chrono_now)
635    }
636
637    /// Return an iterator over peers that are due for a reconnection attempt,
638    /// in reconnection attempt order.
639    pub fn reconnection_peers(
640        &'_ self,
641        instant_now: Instant,
642        chrono_now: chrono::DateTime<Utc>,
643    ) -> impl DoubleEndedIterator<Item = MetaAddr> + '_ {
644        let _guard = self.span.enter();
645
646        // Skip live peers, and peers pending a reconnect attempt.
647        // The peers are already stored in sorted order.
648        self.by_addr
649            .descending_values()
650            .filter(move |peer| {
651                peer.is_ready_for_connection_attempt(instant_now, chrono_now, &self.network)
652                    && self.is_ready_for_connection_attempt_with_ip(&peer.addr.ip(), chrono_now)
653            })
654            .cloned()
655    }
656
    /// Return an iterator over all the peers in `state`,
    /// in reconnection attempt order, including recently connected peers.
    ///
    /// A peer is included when its `last_connection_state` is equal to `state`.
    pub fn state_peers(
        &'_ self,
        state: PeerAddrState,
    ) -> impl DoubleEndedIterator<Item = MetaAddr> + '_ {
        // The span guard is released when this function returns,
        // before the returned iterator is consumed.
        let _guard = self.span.enter();

        self.by_addr
            .descending_values()
            .filter(move |peer| peer.last_connection_state == state)
            .cloned()
    }
670
671    /// Return an iterator over peers that might be connected,
672    /// in reconnection attempt order.
673    pub fn maybe_connected_peers(
674        &'_ self,
675        instant_now: Instant,
676        chrono_now: chrono::DateTime<Utc>,
677    ) -> impl DoubleEndedIterator<Item = MetaAddr> + '_ {
678        let _guard = self.span.enter();
679
680        self.by_addr
681            .descending_values()
682            .filter(move |peer| {
683                !peer.is_ready_for_connection_attempt(instant_now, chrono_now, &self.network)
684            })
685            .cloned()
686    }
687
688    /// Returns banned IP addresses.
689    pub fn bans(&self) -> Arc<IndexMap<IpAddr, Instant>> {
690        self.bans_by_ip.clone()
691    }
692
    /// Returns the number of entries in this address book.
    ///
    /// This counts every stored address, regardless of its state.
    pub fn len(&self) -> usize {
        self.by_addr.len()
    }
697
    /// Returns metrics for the addresses in this address book.
    /// Only for use in tests.
    ///
    /// The metrics are recalculated from the current address book contents at `now`,
    /// rather than read from the metrics watch channel.
    ///
    /// # Correctness
    ///
    /// Use [`AddressBook::address_metrics_watcher().borrow()`] in production code,
    /// to avoid deadlocks.
    #[cfg(test)]
    pub fn address_metrics(&self, now: chrono::DateTime<Utc>) -> AddressMetrics {
        self.address_metrics_internal(now)
    }
709
710    /// Returns metrics for the addresses in this address book.
711    ///
712    /// # Correctness
713    ///
714    /// External callers should use [`AddressBook::address_metrics_watcher().borrow()`]
715    /// in production code, to avoid deadlocks.
716    /// (Using the watch channel receiver does not lock the address book mutex.)
717    fn address_metrics_internal(&self, now: chrono::DateTime<Utc>) -> AddressMetrics {
718        let responded = self.state_peers(PeerAddrState::Responded).count();
719        let never_attempted_gossiped = self
720            .state_peers(PeerAddrState::NeverAttemptedGossiped)
721            .count();
722        let failed = self.state_peers(PeerAddrState::Failed).count();
723        let attempt_pending = self.state_peers(PeerAddrState::AttemptPending).count();
724
725        let recently_live = self.recently_live_peers(now).len();
726        let recently_stopped_responding = responded
727            .checked_sub(recently_live)
728            .expect("all recently live peers must have responded");
729
730        let num_addresses = self.len();
731
732        AddressMetrics {
733            responded,
734            never_attempted_gossiped,
735            failed,
736            attempt_pending,
737            recently_live,
738            recently_stopped_responding,
739            num_addresses,
740            address_limit: self.addr_limit,
741        }
742    }
743
744    /// Update the metrics for this address book.
745    fn update_metrics(&mut self, instant_now: Instant, chrono_now: chrono::DateTime<Utc>) {
746        let _guard = self.span.enter();
747
748        let m = self.address_metrics_internal(chrono_now);
749
750        // Ignore errors: we don't care if any receivers are listening.
751        let _ = self.address_metrics_tx.send(m);
752
753        // TODO: rename to address_book.[state_name]
754        metrics::gauge!("candidate_set.responded").set(m.responded as f64);
755        metrics::gauge!("candidate_set.gossiped").set(m.never_attempted_gossiped as f64);
756        metrics::gauge!("candidate_set.failed").set(m.failed as f64);
757        metrics::gauge!("candidate_set.pending").set(m.attempt_pending as f64);
758
759        // TODO: rename to address_book.responded.recently_live
760        metrics::gauge!("candidate_set.recently_live").set(m.recently_live as f64);
761        // TODO: rename to address_book.responded.stopped_responding
762        metrics::gauge!("candidate_set.disconnected").set(m.recently_stopped_responding as f64);
763
764        std::mem::drop(_guard);
765        self.log_metrics(&m, instant_now);
766    }
767
768    /// Log metrics for this address book
769    fn log_metrics(&mut self, m: &AddressMetrics, now: Instant) {
770        let _guard = self.span.enter();
771
772        trace!(
773            address_metrics = ?m,
774        );
775
776        if m.responded > 0 {
777            return;
778        }
779
780        // These logs are designed to be human-readable in a terminal, at the
781        // default Zebra log level. If you need to know address states for
782        // every request, use the trace-level logs, or the metrics exporter.
783        if let Some(last_address_log) = self.last_address_log {
784            // Avoid duplicate address logs
785            if now.saturating_duration_since(last_address_log).as_secs() < 60 {
786                return;
787            }
788        } else {
789            // Suppress initial logs until the peer set has started up.
790            // There can be multiple address changes before the first peer has
791            // responded.
792            self.last_address_log = Some(now);
793            return;
794        }
795
796        self.last_address_log = Some(now);
797        // if all peers have failed
798        if m.responded + m.attempt_pending + m.never_attempted_gossiped == 0 {
799            warn!(
800                address_metrics = ?m,
801                "all peer addresses have failed. Hint: check your network connection"
802            );
803        } else {
804            info!(
805                address_metrics = ?m,
806                "no active peer connections: trying gossiped addresses"
807            );
808        }
809    }
810}
811
812impl AddressBookPeers for AddressBook {
813    fn recently_live_peers(&self, now: chrono::DateTime<Utc>) -> Vec<MetaAddr> {
814        let _guard = self.span.enter();
815
816        self.by_addr
817            .descending_values()
818            .filter(|peer| peer.was_recently_live(now))
819            .cloned()
820            .collect()
821    }
822}
823
824impl AddressBookPeers for Arc<Mutex<AddressBook>> {
825    fn recently_live_peers(&self, now: chrono::DateTime<Utc>) -> Vec<MetaAddr> {
826        self.lock()
827            .expect("panic in a previous thread that was holding the mutex")
828            .recently_live_peers(now)
829    }
830}
831
832impl Extend<MetaAddrChange> for AddressBook {
833    fn extend<T>(&mut self, iter: T)
834    where
835        T: IntoIterator<Item = MetaAddrChange>,
836    {
837        for change in iter.into_iter() {
838            self.update(change);
839        }
840    }
841}
842
843impl Clone for AddressBook {
844    /// Clone the addresses, address limit, local listener address, and span.
845    ///
846    /// Cloned address books have a separate metrics struct watch channel, and an empty last address log.
847    ///
848    /// All address books update the same prometheus metrics.
849    fn clone(&self) -> AddressBook {
850        // The existing metrics might be outdated, but we avoid calling `update_metrics`,
851        // so we don't overwrite the prometheus metrics from the main address book.
852        let (address_metrics_tx, _address_metrics_rx) =
853            watch::channel(*self.address_metrics_tx.borrow());
854
855        AddressBook {
856            by_addr: self.by_addr.clone(),
857            local_listener: self.local_listener,
858            network: self.network.clone(),
859            addr_limit: self.addr_limit,
860            span: self.span.clone(),
861            address_metrics_tx,
862            last_address_log: None,
863            most_recent_by_ip: self.most_recent_by_ip.clone(),
864            bans_by_ip: self.bans_by_ip.clone(),
865        }
866    }
867}