zebra_network/address_book_updater.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
//! The address book updater applies peer address changes, such as liveness
//! information collected from peers, to the address book.
use std::{
cmp::max,
net::{IpAddr, SocketAddr},
sync::Arc,
time::Instant,
};
use indexmap::IndexMap;
use thiserror::Error;
use tokio::{
sync::{mpsc, watch},
task::JoinHandle,
};
use tracing::{Level, Span};
use crate::{
address_book::AddressMetrics, meta_addr::MetaAddrChange, AddressBook, BoxError, Config,
};
/// The minimum size of the address book updater channel.
///
/// [`AddressBookUpdater::spawn`] uses the larger of this value and
/// `Config::peerset_total_connection_limit()` as the capacity of the
/// address book update channel.
pub const MIN_CHANNEL_SIZE: usize = 10;
/// The `AddressBookUpdater` hooks into incoming message streams for each peer
/// and lets the owner of the sender handle update the address book. For
/// example, it can be used to record per-connection last-seen timestamps, or
/// add new initial peers to the address book.
///
/// This type carries no state of its own: the updater task and its channel
/// handles are created by [`AddressBookUpdater::spawn`].
#[derive(Debug, Eq, PartialEq)]
pub struct AddressBookUpdater;
/// The error returned by the address book updater task when its update
/// channel yields no more events because there are no senders left.
#[derive(Copy, Clone, Debug, Error, Eq, PartialEq, Hash)]
#[error("all address book updater senders are closed")]
pub struct AllAddressBookUpdaterSendersClosed;
impl AddressBookUpdater {
/// Spawn a new [`AddressBookUpdater`] task, updating a new [`AddressBook`]
/// configured with Zebra's actual `local_listener` address.
///
/// Returns handles for:
/// - the address book,
/// - the transmission channel for address book update events,
/// - a watch channel for address book metrics, and
/// - the address book updater task join handle.
///
/// The task exits with an error when the returned [`mpsc::Sender`] is closed.
pub fn spawn(
config: &Config,
local_listener: SocketAddr,
) -> (
Arc<std::sync::Mutex<AddressBook>>,
watch::Receiver<Arc<IndexMap<IpAddr, Instant>>>,
mpsc::Sender<MetaAddrChange>,
watch::Receiver<AddressMetrics>,
JoinHandle<Result<(), BoxError>>,
) {
// Create an mpsc channel for peerset address book updates,
// based on the maximum number of inbound and outbound peers.
let (worker_tx, mut worker_rx) = mpsc::channel(max(
config.peerset_total_connection_limit(),
MIN_CHANNEL_SIZE,
));
let address_book = AddressBook::new(
local_listener,
&config.network,
config.max_connections_per_ip,
span!(Level::TRACE, "address book"),
);
let address_metrics = address_book.address_metrics_watcher();
let address_book = Arc::new(std::sync::Mutex::new(address_book));
#[cfg(feature = "progress-bar")]
let (mut address_info, address_bar, never_bar, failed_bar) = {
let address_bar = howudoin::new_root().label("Known Peers");
let never_bar =
howudoin::new_with_parent(address_bar.id()).label("Never Attempted Peers");
let failed_bar = howudoin::new_with_parent(never_bar.id()).label("Failed Peers");
(address_metrics.clone(), address_bar, never_bar, failed_bar)
};
let worker_address_book = address_book.clone();
let (bans_sender, bans_receiver) = tokio::sync::watch::channel(
worker_address_book
.lock()
.expect("mutex should be unpoisoned")
.bans(),
);
let worker = move || {
info!("starting the address book updater");
while let Some(event) = worker_rx.blocking_recv() {
trace!(?event, "got address book change");
// # Correctness
//
// Briefly hold the address book threaded mutex, to update the
// state for a single address.
let updated = worker_address_book
.lock()
.expect("mutex should be unpoisoned")
.update(event);
// `UpdateMisbehavior` events should only be passed to `update()` here,
// so that this channel is always updated when new addresses are banned.
if updated.is_none() {
let bans = worker_address_book
.lock()
.expect("mutex should be unpoisoned")
.bans();
if bans.contains_key(&event.addr().ip()) {
let _ = bans_sender.send(bans);
}
}
#[cfg(feature = "progress-bar")]
if matches!(howudoin::cancelled(), Some(true)) {
address_bar.close();
never_bar.close();
failed_bar.close();
} else if address_info.has_changed()? {
// We don't track:
// - attempt pending because it's always small
// - responded because it's the remaining attempted-but-not-failed peers
// - recently live because it's similar to the connected peer counts
let address_info = *address_info.borrow_and_update();
address_bar
.set_pos(u64::try_from(address_info.num_addresses).expect("fits in u64"));
// .set_len(u64::try_from(address_info.address_limit).expect("fits in u64"));
never_bar.set_pos(
u64::try_from(address_info.never_attempted_gossiped).expect("fits in u64"),
);
// .set_len(u64::try_from(address_info.address_limit).expect("fits in u64"));
failed_bar.set_pos(u64::try_from(address_info.failed).expect("fits in u64"));
// .set_len(u64::try_from(address_info.address_limit).expect("fits in u64"));
}
}
#[cfg(feature = "progress-bar")]
{
address_bar.close();
never_bar.close();
failed_bar.close();
}
let error = Err(AllAddressBookUpdaterSendersClosed.into());
info!(?error, "stopping address book updater");
error
};
// Correctness: spawn address book accesses on a blocking thread,
// to avoid deadlocks (see #1976)
let span = Span::current();
let address_book_updater_task_handle =
tokio::task::spawn_blocking(move || span.in_scope(worker));
(
address_book,
bans_receiver,
worker_tx,
address_metrics,
address_book_updater_task_handle,
)
}
}