zebra_network/address_book.rs
1//! The `AddressBook` manages information about what peers exist, when they were
2//! seen, and what services they provide.
3
4use std::{
5 cmp::Reverse,
6 collections::HashMap,
7 net::{IpAddr, SocketAddr},
8 sync::{Arc, Mutex},
9 time::Instant,
10};
11
12use chrono::Utc;
13use indexmap::IndexMap;
14use ordered_map::OrderedMap;
15use tokio::sync::watch;
16use tracing::Span;
17
18use zebra_chain::{parameters::Network, serialization::DateTime32};
19
20use crate::{
21 constants::{self, ADDR_RESPONSE_LIMIT_DENOMINATOR, MAX_ADDRS_IN_MESSAGE},
22 meta_addr::MetaAddrChange,
23 protocol::external::{canonical_peer_addr, canonical_socket_addr},
24 types::MetaAddr,
25 AddressBookPeers, PeerAddrState, PeerSocketAddr,
26};
27
28#[cfg(test)]
29mod tests;
30
/// A database of peer listener addresses, their advertised services, and
/// information on when they were last seen.
///
/// # Security
///
/// Address book state must be based on outbound connections to peers.
///
/// If the address book is updated incorrectly:
/// - malicious peers can interfere with other peers' `AddressBook` state,
///   or
/// - Zebra can advertise unreachable addresses to its own peers.
///
/// ## Adding Addresses
///
/// The address book should only contain Zcash listener port addresses from peers
/// on the configured network. These addresses can come from:
/// - DNS seeders
/// - addresses gossiped by other peers
/// - the canonical address (`Version.address_from`) provided by each peer,
///   particularly peers on inbound connections.
///
/// The remote addresses of inbound connections must not be added to the address
/// book, because they contain ephemeral outbound ports, not listener ports.
///
/// Isolated connections must not add addresses or update the address book.
///
/// ## Updating Address State
///
/// Updates to address state must be based on outbound connections to peers.
///
/// Updates must not be based on:
/// - the remote addresses of inbound connections, or
/// - the canonical address of any connection.
#[derive(Debug)]
pub struct AddressBook {
    /// Peer listener addresses, suitable for outbound connections,
    /// in connection attempt order.
    ///
    /// Some peers in this list might have open outbound or inbound connections.
    ///
    /// We reverse the comparison order, because the standard library
    /// ([`BTreeMap`](std::collections::BTreeMap)) sorts in ascending order, but
    /// [`OrderedMap`] sorts in descending order.
    by_addr: OrderedMap<PeerSocketAddr, MetaAddr, Reverse<MetaAddr>>,

    /// The address with a last_connection_state of [`PeerAddrState::Responded`] and
    /// the most recent `last_response` time by IP.
    ///
    /// This is used to avoid initiating outbound connections past
    /// [`Config::max_connections_per_ip`](crate::config::Config).
    ///
    /// It currently only supports a `max_connections_per_ip` of 1, and must be `None`
    /// when used with a greater `max_connections_per_ip`. Code that touches this cache
    /// must therefore handle the `None` case.
    // TODO: Replace with `by_ip: HashMap<IpAddr, BTreeMap<DateTime32, MetaAddr>>` to support configured `max_connections_per_ip` greater than 1
    most_recent_by_ip: Option<HashMap<IpAddr, MetaAddr>>,

    /// Banned IP addresses, with the time they were banned.
    ///
    /// The map keeps insertion order: when it grows past
    /// [`MAX_BANNED_IPS`](constants::MAX_BANNED_IPS), `update` removes the entry at index 0.
    ///
    /// It is stored in an [`Arc`] so `bans` can hand out a shared handle without copying the map.
    bans_by_ip: Arc<IndexMap<IpAddr, Instant>>,

    /// The local listener address.
    ///
    /// `new` stores the canonical form of the address it is given.
    local_listener: SocketAddr,

    /// The configured Zcash network.
    network: Network,

    /// The maximum number of addresses in the address book.
    ///
    /// Always set to [`MAX_ADDRS_IN_ADDRESS_BOOK`](constants::MAX_ADDRS_IN_ADDRESS_BOOK),
    /// in release builds. Lower values are used during testing.
    addr_limit: usize,

    /// The span for operations on this address book.
    span: Span,

    /// A channel used to send the latest address book metrics.
    ///
    /// Receivers are created with `address_metrics_watcher`.
    address_metrics_tx: watch::Sender<AddressMetrics>,

    /// The last time we logged a message about the address metrics.
    ///
    /// `None` until `log_metrics` first runs while no peer is in the `Responded` state.
    /// Cloned address books start with `None`.
    last_address_log: Option<Instant>,
}
108
/// Metrics about the states of the addresses in an [`AddressBook`].
///
/// A snapshot of these metrics is sent on the address book's watch channel
/// each time the address book updates its metrics.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct AddressMetrics {
    /// The number of addresses in the `Responded` state.
    pub responded: usize,

    /// The number of addresses in the `NeverAttemptedGossiped` state.
    pub never_attempted_gossiped: usize,

    /// The number of addresses in the `Failed` state.
    pub failed: usize,

    /// The number of addresses in the `AttemptPending` state.
    pub attempt_pending: usize,

    /// The number of `Responded` addresses within the liveness limit.
    pub recently_live: usize,

    /// The number of `Responded` addresses outside the liveness limit.
    ///
    /// Calculated as `responded - recently_live`.
    pub recently_stopped_responding: usize,

    /// The number of addresses in the address book, regardless of their states.
    pub num_addresses: usize,

    /// The maximum number of addresses in the address book.
    pub address_limit: usize,
}
136
137#[allow(clippy::len_without_is_empty)]
138impl AddressBook {
139 /// Construct an [`AddressBook`] with the given `local_listener` on `network`.
140 ///
141 /// Uses the supplied [`tracing::Span`] for address book operations.
142 pub fn new(
143 local_listener: SocketAddr,
144 network: &Network,
145 max_connections_per_ip: usize,
146 span: Span,
147 ) -> AddressBook {
148 let constructor_span = span.clone();
149 let _guard = constructor_span.enter();
150
151 let instant_now = Instant::now();
152 let chrono_now = Utc::now();
153
154 // The default value is correct for an empty address book,
155 // and it gets replaced by `update_metrics` anyway.
156 let (address_metrics_tx, _address_metrics_rx) = watch::channel(AddressMetrics::default());
157
158 // Avoid initiating outbound handshakes when max_connections_per_ip is 1.
159 let should_limit_outbound_conns_per_ip = max_connections_per_ip == 1;
160 let mut new_book = AddressBook {
161 by_addr: OrderedMap::new(|meta_addr| Reverse(*meta_addr)),
162 local_listener: canonical_socket_addr(local_listener),
163 network: network.clone(),
164 addr_limit: constants::MAX_ADDRS_IN_ADDRESS_BOOK,
165 span,
166 address_metrics_tx,
167 last_address_log: None,
168 most_recent_by_ip: should_limit_outbound_conns_per_ip.then(HashMap::new),
169 bans_by_ip: Default::default(),
170 };
171
172 new_book.update_metrics(instant_now, chrono_now);
173 new_book
174 }
175
/// Creates an [`AddressBook`] that is pre-populated with `addrs`.
///
/// Takes the same `local_listener`, `network`, `max_connections_per_ip` and
/// [`tracing::Span`] as [`AddressBook::new`], plus an explicit `addr_limit`.
///
/// `addr_limit` is enforced by this method, and by [`AddressBook::update`]:
/// addresses are read from `addrs` until the book holds `addr_limit` entries.
///
/// Each address is made canonical, and addresses that are not valid for
/// outbound connections are skipped.
///
/// If there are multiple [`MetaAddr`]s with the same address,
/// an arbitrary address is inserted into the address book,
/// and the rest are dropped.
///
/// This constructor can be used to break address book invariants,
/// so it should only be used in tests.
///
/// # Panics
///
/// If `addr_limit` is zero.
#[cfg(any(test, feature = "proptest-impl"))]
pub fn new_with_addrs(
    local_listener: SocketAddr,
    network: &Network,
    max_connections_per_ip: usize,
    addr_limit: usize,
    span: Span,
    addrs: impl IntoIterator<Item = MetaAddr>,
) -> AddressBook {
    // Enter a clone of the span, because `span` itself is moved into the new book.
    let entered_span = span.clone();
    let _guard = entered_span.enter();

    let instant_now = Instant::now();
    let chrono_now = Utc::now();

    // An address book must be able to hold at least one address.
    assert!(addr_limit > 0);

    let mut book = AddressBook::new(local_listener, network, max_connections_per_ip, span);
    book.addr_limit = addr_limit;

    for mut meta_addr in addrs {
        meta_addr.addr = canonical_peer_addr(meta_addr.addr);

        if !meta_addr.address_is_valid_for_outbound(network) {
            continue;
        }

        let socket_addr = meta_addr.addr;

        // Duplicate addresses overwrite each other.
        book.by_addr.insert(socket_addr, meta_addr);

        // Track the address in `most_recent_by_ip` if it has responded.
        if book.should_update_most_recent_by_ip(meta_addr) {
            book.most_recent_by_ip
                .as_mut()
                .expect("should be some when should_update_most_recent_by_ip is true")
                .insert(socket_addr.ip(), meta_addr);
        }

        // Stop reading addresses as soon as the book is full.
        if book.by_addr.len() >= addr_limit {
            break;
        }
    }

    book.update_metrics(instant_now, chrono_now);
    book
}
237
/// Return a watch channel for the address book metrics.
///
/// Each call subscribes a new receiver to this address book's metrics sender.
///
/// The metrics in the watch channel are only updated when the address book updates,
/// so they can be significantly outdated if Zebra is disconnected or hung.
///
/// The current metrics value is marked as seen.
/// So `Receiver::changed` will only return after the next address book update.
pub fn address_metrics_watcher(&self) -> watch::Receiver<AddressMetrics> {
    self.address_metrics_tx.subscribe()
}
248
/// Set the local listener address. Only for use in tests.
///
/// Unlike [`AddressBook::new`], this setter stores `addr` exactly as given:
/// it does not convert it to a canonical address first.
#[cfg(any(test, feature = "proptest-impl"))]
pub fn set_local_listener(&mut self, addr: SocketAddr) {
    self.local_listener = addr;
}
254
255 /// Get the local listener address.
256 ///
257 /// This address contains minimal state, but it is not sanitized.
258 pub fn local_listener_meta_addr(&self, now: chrono::DateTime<Utc>) -> MetaAddr {
259 let now: DateTime32 = now.try_into().expect("will succeed until 2038");
260
261 MetaAddr::new_local_listener_change(self.local_listener)
262 .local_listener_into_new_meta_addr(now)
263 }
264
/// Get the local listener [`SocketAddr`].
///
/// Returns a copy of the stored address, without any timestamp or peer state.
/// Use [`AddressBook::local_listener_meta_addr`] to get it as a [`MetaAddr`].
pub fn local_listener_socket_addr(&self) -> SocketAddr {
    self.local_listener
}
269
270 /// Get the active addresses in `self` in random order with sanitized timestamps,
271 /// including our local listener address.
272 ///
273 /// Limited to the number of peer addresses Zebra should give out per `GetAddr` request.
274 pub fn fresh_get_addr_response(&self) -> Vec<MetaAddr> {
275 let now = Utc::now();
276 let mut peers = self.sanitized(now);
277 let address_limit = peers.len().div_ceil(ADDR_RESPONSE_LIMIT_DENOMINATOR);
278 peers.truncate(MAX_ADDRS_IN_MESSAGE.min(address_limit));
279
280 peers
281 }
282
283 /// Get the active addresses in `self` in random order with sanitized timestamps,
284 /// including our local listener address.
285 pub(crate) fn sanitized(&self, now: chrono::DateTime<Utc>) -> Vec<MetaAddr> {
286 use rand::seq::SliceRandom;
287 let _guard = self.span.enter();
288
289 let mut peers = self.by_addr.clone();
290
291 // Unconditionally add our local listener address to the advertised peers,
292 // to replace any self-connection failures. The address book and change
293 // constructors make sure that the SocketAddr is canonical.
294 let local_listener = self.local_listener_meta_addr(now);
295 peers.insert(local_listener.addr, local_listener);
296
297 // Then sanitize and shuffle
298 let mut peers: Vec<MetaAddr> = peers
299 .descending_values()
300 .filter_map(|meta_addr| meta_addr.sanitize(&self.network))
301 // # Security
302 //
303 // Remove peers that:
304 // - last responded more than three hours ago, or
305 // - haven't responded yet but were reported last seen more than three hours ago
306 //
307 // This prevents Zebra from gossiping nodes that are likely unreachable. Gossiping such
308 // nodes impacts the network health, because connection attempts end up being wasted on
309 // peers that are less likely to respond.
310 .filter(|addr| addr.is_active_for_gossip(now))
311 .collect();
312
313 peers.shuffle(&mut rand::thread_rng());
314
315 peers
316 }
317
318 /// Get the active addresses in `self`, in preferred caching order,
319 /// excluding our local listener address.
320 pub fn cacheable(&self, now: chrono::DateTime<Utc>) -> Vec<MetaAddr> {
321 let _guard = self.span.enter();
322
323 let peers = self.by_addr.clone();
324
325 // Get peers in preferred order, then keep the recently active ones
326 peers
327 .descending_values()
328 // # Security
329 //
330 // Remove peers that:
331 // - last responded more than three hours ago, or
332 // - haven't responded yet but were reported last seen more than three hours ago
333 //
334 // This prevents Zebra from caching nodes that are likely unreachable,
335 // which improves startup time and reliability.
336 .filter(|addr| addr.is_active_for_gossip(now))
337 .cloned()
338 .collect()
339 }
340
341 /// Look up `addr` in the address book, and return its [`MetaAddr`].
342 ///
343 /// Converts `addr` to a canonical address before looking it up.
344 pub fn get(&mut self, addr: PeerSocketAddr) -> Option<MetaAddr> {
345 let addr = canonical_peer_addr(*addr);
346
347 // Unfortunately, `OrderedMap` doesn't implement `get`.
348 let meta_addr = self.by_addr.remove(&addr);
349
350 if let Some(meta_addr) = meta_addr {
351 self.by_addr.insert(addr, meta_addr);
352 }
353
354 meta_addr
355 }
356
357 /// Returns true if `updated` needs to be applied to the recent outbound peer connection IP cache.
358 ///
359 /// Checks if there are no existing entries in the address book with this IP,
360 /// or if `updated` has a more recent `last_response` requiring the outbound connector to wait
361 /// longer before initiating handshakes with peers at this IP.
362 ///
363 /// This code only needs to check a single cache entry, rather than the entire address book,
364 /// because other code maintains these invariants:
365 /// - `last_response` times for an entry can only increase.
366 /// - this is the only field checked by `has_connection_recently_responded()`
367 ///
368 /// See [`AddressBook::is_ready_for_connection_attempt_with_ip`] for more details.
369 fn should_update_most_recent_by_ip(&self, updated: MetaAddr) -> bool {
370 let Some(most_recent_by_ip) = self.most_recent_by_ip.as_ref() else {
371 return false;
372 };
373
374 if let Some(previous) = most_recent_by_ip.get(&updated.addr.ip()) {
375 updated.last_connection_state == PeerAddrState::Responded
376 && updated.last_response() > previous.last_response()
377 } else {
378 updated.last_connection_state == PeerAddrState::Responded
379 }
380 }
381
382 /// Returns true if `addr` is the latest entry for its IP, which is stored in `most_recent_by_ip`.
383 /// The entry is checked for an exact match to the IP and port of `addr`.
384 fn should_remove_most_recent_by_ip(&self, addr: PeerSocketAddr) -> bool {
385 let Some(most_recent_by_ip) = self.most_recent_by_ip.as_ref() else {
386 return false;
387 };
388
389 if let Some(previous) = most_recent_by_ip.get(&addr.ip()) {
390 previous.addr == addr
391 } else {
392 false
393 }
394 }
395
396 /// Apply `change` to the address book, returning the updated `MetaAddr`,
397 /// if the change was valid.
398 ///
399 /// # Correctness
400 ///
401 /// All changes should go through `update`, so that the address book
402 /// only contains valid outbound addresses.
403 ///
404 /// Change addresses must be canonical `PeerSocketAddr`s. This makes sure that
405 /// each address book entry has a unique IP address.
406 ///
407 /// # Security
408 ///
409 /// This function must apply every attempted, responded, and failed change
410 /// to the address book. This prevents rapid reconnections to the same peer.
411 ///
412 /// As an exception, this function can ignore all changes for specific
413 /// [`PeerSocketAddr`]s. Ignored addresses will never be used to connect to
414 /// peers.
415 #[allow(clippy::unwrap_in_result)]
416 pub fn update(&mut self, change: MetaAddrChange) -> Option<MetaAddr> {
417 if self.bans_by_ip.contains_key(&change.addr().ip()) {
418 tracing::warn!(
419 ?change,
420 "attempted to add a banned peer addr to address book"
421 );
422 return None;
423 }
424
425 let previous = self.get(change.addr());
426
427 let _guard = self.span.enter();
428
429 let instant_now = Instant::now();
430 let chrono_now = Utc::now();
431
432 let updated = change.apply_to_meta_addr(previous, instant_now, chrono_now);
433
434 trace!(
435 ?change,
436 ?updated,
437 ?previous,
438 total_peers = self.by_addr.len(),
439 recent_peers = self.recently_live_peers(chrono_now).len(),
440 "calculated updated address book entry",
441 );
442
443 if let Some(updated) = updated {
444 if updated.misbehavior() >= constants::MAX_PEER_MISBEHAVIOR_SCORE {
445 // Ban and skip outbound connections with excessively misbehaving peers.
446 let banned_ip = updated.addr.ip();
447 let bans_by_ip = Arc::make_mut(&mut self.bans_by_ip);
448
449 bans_by_ip.insert(banned_ip, Instant::now());
450 if bans_by_ip.len() > constants::MAX_BANNED_IPS {
451 // Remove the oldest banned IP from the address book.
452 bans_by_ip.shift_remove_index(0);
453 }
454
455 self.most_recent_by_ip
456 .as_mut()
457 .expect("should be some when should_remove_most_recent_by_ip is true")
458 .remove(&banned_ip);
459
460 let banned_addrs: Vec<_> = self
461 .by_addr
462 .descending_keys()
463 .skip_while(|addr| addr.ip() != banned_ip)
464 .take_while(|addr| addr.ip() == banned_ip)
465 .cloned()
466 .collect();
467
468 for addr in banned_addrs {
469 self.by_addr.remove(&addr);
470 }
471
472 warn!(
473 ?updated,
474 total_peers = self.by_addr.len(),
475 recent_peers = self.recently_live_peers(chrono_now).len(),
476 "banned ip and removed banned peer addresses from address book",
477 );
478
479 return None;
480 }
481
482 // Ignore invalid outbound addresses.
483 // (Inbound connections can be monitored via Zebra's metrics.)
484 if !updated.address_is_valid_for_outbound(&self.network) {
485 return None;
486 }
487
488 // Ignore invalid outbound services and other info,
489 // but only if the peer has never been attempted.
490 //
491 // Otherwise, if we got the info directly from the peer,
492 // store it in the address book, so we know not to reconnect.
493 if !updated.last_known_info_is_valid_for_outbound(&self.network)
494 && updated.last_connection_state.is_never_attempted()
495 {
496 return None;
497 }
498
499 self.by_addr.insert(updated.addr, updated);
500
501 // Add the address to `most_recent_by_ip` if it sent the most recent
502 // response Zebra has received from this IP.
503 if self.should_update_most_recent_by_ip(updated) {
504 self.most_recent_by_ip
505 .as_mut()
506 .expect("should be some when should_update_most_recent_by_ip is true")
507 .insert(updated.addr.ip(), updated);
508 }
509
510 debug!(
511 ?change,
512 ?updated,
513 ?previous,
514 total_peers = self.by_addr.len(),
515 recent_peers = self.recently_live_peers(chrono_now).len(),
516 "updated address book entry",
517 );
518
519 // Security: Limit the number of peers in the address book.
520 //
521 // We only delete outdated peers when we have too many peers.
522 // If we deleted them as soon as they became too old,
523 // then other peers could re-insert them into the address book.
524 // And we would start connecting to those outdated peers again,
525 // ignoring the age limit in [`MetaAddr::is_probably_reachable`].
526 while self.by_addr.len() > self.addr_limit {
527 let surplus_peer = self
528 .peers()
529 .next_back()
530 .expect("just checked there is at least one peer");
531
532 self.by_addr.remove(&surplus_peer.addr);
533
534 // Check if this surplus peer's addr matches that in `most_recent_by_ip`
535 // for this the surplus peer's ip to remove it there as well.
536 if self.should_remove_most_recent_by_ip(surplus_peer.addr) {
537 self.most_recent_by_ip
538 .as_mut()
539 .expect("should be some when should_remove_most_recent_by_ip is true")
540 .remove(&surplus_peer.addr.ip());
541 }
542
543 debug!(
544 surplus = ?surplus_peer,
545 ?updated,
546 total_peers = self.by_addr.len(),
547 recent_peers = self.recently_live_peers(chrono_now).len(),
548 "removed surplus address book entry",
549 );
550 }
551
552 assert!(self.len() <= self.addr_limit);
553
554 std::mem::drop(_guard);
555 self.update_metrics(instant_now, chrono_now);
556 }
557
558 updated
559 }
560
561 /// Removes the entry with `addr`, returning it if it exists
562 ///
563 /// # Note
564 ///
565 /// All address removals should go through `take`, so that the address
566 /// book metrics are accurate.
567 #[allow(dead_code)]
568 fn take(&mut self, removed_addr: PeerSocketAddr) -> Option<MetaAddr> {
569 let _guard = self.span.enter();
570
571 let instant_now = Instant::now();
572 let chrono_now = Utc::now();
573
574 trace!(
575 ?removed_addr,
576 total_peers = self.by_addr.len(),
577 recent_peers = self.recently_live_peers(chrono_now).len(),
578 );
579
580 if let Some(entry) = self.by_addr.remove(&removed_addr) {
581 // Check if this surplus peer's addr matches that in `most_recent_by_ip`
582 // for this the surplus peer's ip to remove it there as well.
583 if self.should_remove_most_recent_by_ip(entry.addr) {
584 if let Some(most_recent_by_ip) = self.most_recent_by_ip.as_mut() {
585 most_recent_by_ip.remove(&entry.addr.ip());
586 }
587 }
588
589 std::mem::drop(_guard);
590 self.update_metrics(instant_now, chrono_now);
591 Some(entry)
592 } else {
593 None
594 }
595 }
596
597 /// Returns true if the given [`PeerSocketAddr`] is pending a reconnection
598 /// attempt.
599 pub fn pending_reconnection_addr(&mut self, addr: PeerSocketAddr) -> bool {
600 let meta_addr = self.get(addr);
601
602 let _guard = self.span.enter();
603 match meta_addr {
604 None => false,
605 Some(peer) => peer.last_connection_state == PeerAddrState::AttemptPending,
606 }
607 }
608
609 /// Return an iterator over all peers.
610 ///
611 /// Returns peers in reconnection attempt order, including recently connected peers.
612 pub fn peers(&'_ self) -> impl DoubleEndedIterator<Item = MetaAddr> + '_ {
613 let _guard = self.span.enter();
614 self.by_addr.descending_values().cloned()
615 }
616
617 /// Is this IP ready for a new outbound connection attempt?
618 /// Checks if the outbound connection with the most recent response at this IP has recently responded.
619 ///
620 /// Note: last_response times may remain live for a long time if the local clock is changed to an earlier time.
621 fn is_ready_for_connection_attempt_with_ip(
622 &self,
623 ip: &IpAddr,
624 chrono_now: chrono::DateTime<Utc>,
625 ) -> bool {
626 let Some(most_recent_by_ip) = self.most_recent_by_ip.as_ref() else {
627 // if we're not checking IPs, any connection is allowed
628 return true;
629 };
630 let Some(same_ip_peer) = most_recent_by_ip.get(ip) else {
631 // If there's no entry for this IP, any connection is allowed
632 return true;
633 };
634 !same_ip_peer.has_connection_recently_responded(chrono_now)
635 }
636
637 /// Return an iterator over peers that are due for a reconnection attempt,
638 /// in reconnection attempt order.
639 pub fn reconnection_peers(
640 &'_ self,
641 instant_now: Instant,
642 chrono_now: chrono::DateTime<Utc>,
643 ) -> impl DoubleEndedIterator<Item = MetaAddr> + '_ {
644 let _guard = self.span.enter();
645
646 // Skip live peers, and peers pending a reconnect attempt.
647 // The peers are already stored in sorted order.
648 self.by_addr
649 .descending_values()
650 .filter(move |peer| {
651 peer.is_ready_for_connection_attempt(instant_now, chrono_now, &self.network)
652 && self.is_ready_for_connection_attempt_with_ip(&peer.addr.ip(), chrono_now)
653 })
654 .cloned()
655 }
656
657 /// Return an iterator over all the peers in `state`,
658 /// in reconnection attempt order, including recently connected peers.
659 pub fn state_peers(
660 &'_ self,
661 state: PeerAddrState,
662 ) -> impl DoubleEndedIterator<Item = MetaAddr> + '_ {
663 let _guard = self.span.enter();
664
665 self.by_addr
666 .descending_values()
667 .filter(move |peer| peer.last_connection_state == state)
668 .cloned()
669 }
670
671 /// Return an iterator over peers that might be connected,
672 /// in reconnection attempt order.
673 pub fn maybe_connected_peers(
674 &'_ self,
675 instant_now: Instant,
676 chrono_now: chrono::DateTime<Utc>,
677 ) -> impl DoubleEndedIterator<Item = MetaAddr> + '_ {
678 let _guard = self.span.enter();
679
680 self.by_addr
681 .descending_values()
682 .filter(move |peer| {
683 !peer.is_ready_for_connection_attempt(instant_now, chrono_now, &self.network)
684 })
685 .cloned()
686 }
687
688 /// Returns banned IP addresses.
689 pub fn bans(&self) -> Arc<IndexMap<IpAddr, Instant>> {
690 self.bans_by_ip.clone()
691 }
692
/// Returns the number of entries in this address book.
///
/// Every stored address is counted, regardless of its connection state.
pub fn len(&self) -> usize {
    self.by_addr.len()
}
697
/// Returns metrics for the addresses in this address book.
/// Only for use in tests.
///
/// The metrics are calculated from the current address book entries,
/// using `now` to work out which peers are recently live.
///
/// # Correctness
///
/// Use [`AddressBook::address_metrics_watcher().borrow()`] in production code,
/// to avoid deadlocks.
#[cfg(test)]
pub fn address_metrics(&self, now: chrono::DateTime<Utc>) -> AddressMetrics {
    self.address_metrics_internal(now)
}
709
710 /// Returns metrics for the addresses in this address book.
711 ///
712 /// # Correctness
713 ///
714 /// External callers should use [`AddressBook::address_metrics_watcher().borrow()`]
715 /// in production code, to avoid deadlocks.
716 /// (Using the watch channel receiver does not lock the address book mutex.)
717 fn address_metrics_internal(&self, now: chrono::DateTime<Utc>) -> AddressMetrics {
718 let responded = self.state_peers(PeerAddrState::Responded).count();
719 let never_attempted_gossiped = self
720 .state_peers(PeerAddrState::NeverAttemptedGossiped)
721 .count();
722 let failed = self.state_peers(PeerAddrState::Failed).count();
723 let attempt_pending = self.state_peers(PeerAddrState::AttemptPending).count();
724
725 let recently_live = self.recently_live_peers(now).len();
726 let recently_stopped_responding = responded
727 .checked_sub(recently_live)
728 .expect("all recently live peers must have responded");
729
730 let num_addresses = self.len();
731
732 AddressMetrics {
733 responded,
734 never_attempted_gossiped,
735 failed,
736 attempt_pending,
737 recently_live,
738 recently_stopped_responding,
739 num_addresses,
740 address_limit: self.addr_limit,
741 }
742 }
743
744 /// Update the metrics for this address book.
745 fn update_metrics(&mut self, instant_now: Instant, chrono_now: chrono::DateTime<Utc>) {
746 let _guard = self.span.enter();
747
748 let m = self.address_metrics_internal(chrono_now);
749
750 // Ignore errors: we don't care if any receivers are listening.
751 let _ = self.address_metrics_tx.send(m);
752
753 // TODO: rename to address_book.[state_name]
754 metrics::gauge!("candidate_set.responded").set(m.responded as f64);
755 metrics::gauge!("candidate_set.gossiped").set(m.never_attempted_gossiped as f64);
756 metrics::gauge!("candidate_set.failed").set(m.failed as f64);
757 metrics::gauge!("candidate_set.pending").set(m.attempt_pending as f64);
758
759 // TODO: rename to address_book.responded.recently_live
760 metrics::gauge!("candidate_set.recently_live").set(m.recently_live as f64);
761 // TODO: rename to address_book.responded.stopped_responding
762 metrics::gauge!("candidate_set.disconnected").set(m.recently_stopped_responding as f64);
763
764 std::mem::drop(_guard);
765 self.log_metrics(&m, instant_now);
766 }
767
768 /// Log metrics for this address book
769 fn log_metrics(&mut self, m: &AddressMetrics, now: Instant) {
770 let _guard = self.span.enter();
771
772 trace!(
773 address_metrics = ?m,
774 );
775
776 if m.responded > 0 {
777 return;
778 }
779
780 // These logs are designed to be human-readable in a terminal, at the
781 // default Zebra log level. If you need to know address states for
782 // every request, use the trace-level logs, or the metrics exporter.
783 if let Some(last_address_log) = self.last_address_log {
784 // Avoid duplicate address logs
785 if now.saturating_duration_since(last_address_log).as_secs() < 60 {
786 return;
787 }
788 } else {
789 // Suppress initial logs until the peer set has started up.
790 // There can be multiple address changes before the first peer has
791 // responded.
792 self.last_address_log = Some(now);
793 return;
794 }
795
796 self.last_address_log = Some(now);
797 // if all peers have failed
798 if m.responded + m.attempt_pending + m.never_attempted_gossiped == 0 {
799 warn!(
800 address_metrics = ?m,
801 "all peer addresses have failed. Hint: check your network connection"
802 );
803 } else {
804 info!(
805 address_metrics = ?m,
806 "no active peer connections: trying gossiped addresses"
807 );
808 }
809 }
810}
811
812impl AddressBookPeers for AddressBook {
813 fn recently_live_peers(&self, now: chrono::DateTime<Utc>) -> Vec<MetaAddr> {
814 let _guard = self.span.enter();
815
816 self.by_addr
817 .descending_values()
818 .filter(|peer| peer.was_recently_live(now))
819 .cloned()
820 .collect()
821 }
822}
823
824impl AddressBookPeers for Arc<Mutex<AddressBook>> {
825 fn recently_live_peers(&self, now: chrono::DateTime<Utc>) -> Vec<MetaAddr> {
826 self.lock()
827 .expect("panic in a previous thread that was holding the mutex")
828 .recently_live_peers(now)
829 }
830}
831
832impl Extend<MetaAddrChange> for AddressBook {
833 fn extend<T>(&mut self, iter: T)
834 where
835 T: IntoIterator<Item = MetaAddrChange>,
836 {
837 for change in iter.into_iter() {
838 self.update(change);
839 }
840 }
841}
842
843impl Clone for AddressBook {
844 /// Clone the addresses, address limit, local listener address, and span.
845 ///
846 /// Cloned address books have a separate metrics struct watch channel, and an empty last address log.
847 ///
848 /// All address books update the same prometheus metrics.
849 fn clone(&self) -> AddressBook {
850 // The existing metrics might be outdated, but we avoid calling `update_metrics`,
851 // so we don't overwrite the prometheus metrics from the main address book.
852 let (address_metrics_tx, _address_metrics_rx) =
853 watch::channel(*self.address_metrics_tx.borrow());
854
855 AddressBook {
856 by_addr: self.by_addr.clone(),
857 local_listener: self.local_listener,
858 network: self.network.clone(),
859 addr_limit: self.addr_limit,
860 span: self.span.clone(),
861 address_metrics_tx,
862 last_address_log: None,
863 most_recent_by_ip: self.most_recent_by_ip.clone(),
864 bans_by_ip: self.bans_by_ip.clone(),
865 }
866 }
867}