zebra_network/
address_book_updater.rs

//! The address book updater applies peer address changes, such as liveness information, to the address book.
2
3use std::{
4    cmp::max,
5    net::{IpAddr, SocketAddr},
6    sync::Arc,
7    time::Instant,
8};
9
10use indexmap::IndexMap;
11use thiserror::Error;
12use tokio::{
13    sync::{mpsc, watch},
14    task::JoinHandle,
15};
16use tracing::{Level, Span};
17
18use crate::{
19    address_book::AddressMetrics, meta_addr::MetaAddrChange, AddressBook, BoxError, Config,
20};
21
/// Lower bound on the capacity of the address book updater channel.
///
/// The channel is created with the larger of this value and the configured
/// peer set connection limit.
pub const MIN_CHANNEL_SIZE: usize = 10;
24
/// The `AddressBookUpdater` applies address book changes sent by whoever
/// holds its sender handle.
///
/// For example, it can be used to record per-connection last-seen
/// timestamps, or to add new initial peers to the address book.
#[derive(Debug, PartialEq, Eq)]
pub struct AddressBookUpdater;
31
32#[derive(Copy, Clone, Debug, Error, Eq, PartialEq, Hash)]
33#[error("all address book updater senders are closed")]
34pub struct AllAddressBookUpdaterSendersClosed;
35
36impl AddressBookUpdater {
37    /// Spawn a new [`AddressBookUpdater`] task, updating a new [`AddressBook`]
38    /// configured with Zebra's actual `local_listener` address.
39    ///
40    /// Returns handles for:
41    /// - the address book,
42    /// - the transmission channel for address book update events,
43    /// - a watch channel for address book metrics, and
44    /// - the address book updater task join handle.
45    ///
46    /// The task exits with an error when the returned [`mpsc::Sender`] is closed.
47    pub fn spawn(
48        config: &Config,
49        local_listener: SocketAddr,
50    ) -> (
51        Arc<std::sync::Mutex<AddressBook>>,
52        watch::Receiver<Arc<IndexMap<IpAddr, Instant>>>,
53        mpsc::Sender<MetaAddrChange>,
54        watch::Receiver<AddressMetrics>,
55        JoinHandle<Result<(), BoxError>>,
56    ) {
57        // Create an mpsc channel for peerset address book updates,
58        // based on the maximum number of inbound and outbound peers.
59        let (worker_tx, mut worker_rx) = mpsc::channel(max(
60            config.peerset_total_connection_limit(),
61            MIN_CHANNEL_SIZE,
62        ));
63
64        let address_book = AddressBook::new(
65            local_listener,
66            &config.network,
67            config.max_connections_per_ip,
68            span!(Level::TRACE, "address book"),
69        );
70        let address_metrics = address_book.address_metrics_watcher();
71        let address_book = Arc::new(std::sync::Mutex::new(address_book));
72
73        #[cfg(feature = "progress-bar")]
74        let (mut address_info, address_bar, never_bar, failed_bar) = {
75            let address_bar = howudoin::new_root().label("Known Peers");
76            let never_bar =
77                howudoin::new_with_parent(address_bar.id()).label("Never Attempted Peers");
78            let failed_bar = howudoin::new_with_parent(never_bar.id()).label("Failed Peers");
79
80            (address_metrics.clone(), address_bar, never_bar, failed_bar)
81        };
82
83        let worker_address_book = address_book.clone();
84        let (bans_sender, bans_receiver) = tokio::sync::watch::channel(
85            worker_address_book
86                .lock()
87                .expect("mutex should be unpoisoned")
88                .bans(),
89        );
90
91        let worker = move || {
92            info!("starting the address book updater");
93
94            while let Some(event) = worker_rx.blocking_recv() {
95                trace!(?event, "got address book change");
96
97                // # Correctness
98                //
99                // Briefly hold the address book threaded mutex, to update the
100                // state for a single address.
101                let updated = worker_address_book
102                    .lock()
103                    .expect("mutex should be unpoisoned")
104                    .update(event);
105
106                // `UpdateMisbehavior` events should only be passed to `update()` here,
107                // so that this channel is always updated when new addresses are banned.
108                if updated.is_none() {
109                    let bans = worker_address_book
110                        .lock()
111                        .expect("mutex should be unpoisoned")
112                        .bans();
113
114                    if bans.contains_key(&event.addr().ip()) {
115                        let _ = bans_sender.send(bans);
116                    }
117                }
118
119                #[cfg(feature = "progress-bar")]
120                if matches!(howudoin::cancelled(), Some(true)) {
121                    address_bar.close();
122                    never_bar.close();
123                    failed_bar.close();
124                } else if address_info.has_changed()? {
125                    // We don't track:
126                    // - attempt pending because it's always small
127                    // - responded because it's the remaining attempted-but-not-failed peers
128                    // - recently live because it's similar to the connected peer counts
129
130                    let address_info = *address_info.borrow_and_update();
131
132                    address_bar
133                        .set_pos(u64::try_from(address_info.num_addresses).expect("fits in u64"));
134                    // .set_len(u64::try_from(address_info.address_limit).expect("fits in u64"));
135
136                    never_bar.set_pos(
137                        u64::try_from(address_info.never_attempted_gossiped).expect("fits in u64"),
138                    );
139                    // .set_len(u64::try_from(address_info.address_limit).expect("fits in u64"));
140
141                    failed_bar.set_pos(u64::try_from(address_info.failed).expect("fits in u64"));
142                    // .set_len(u64::try_from(address_info.address_limit).expect("fits in u64"));
143                }
144            }
145
146            #[cfg(feature = "progress-bar")]
147            {
148                address_bar.close();
149                never_bar.close();
150                failed_bar.close();
151            }
152
153            let error = Err(AllAddressBookUpdaterSendersClosed.into());
154            info!(?error, "stopping address book updater");
155            error
156        };
157
158        // Correctness: spawn address book accesses on a blocking thread,
159        //              to avoid deadlocks (see #1976)
160        let span = Span::current();
161        let address_book_updater_task_handle =
162            tokio::task::spawn_blocking(move || span.in_scope(worker));
163
164        (
165            address_book,
166            bans_receiver,
167            worker_tx,
168            address_metrics,
169            address_book_updater_task_handle,
170        )
171    }
172}