1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
//! The timestamp collector collects liveness information from peers.

use std::{cmp::max, net::SocketAddr, sync::Arc};

use thiserror::Error;
use tokio::{
    sync::{mpsc, watch},
    task::JoinHandle,
};
use tracing::{Level, Span};

use crate::{
    address_book::AddressMetrics, meta_addr::MetaAddrChange, AddressBook, BoxError, Config,
};

/// The minimum size of the address book updater channel.
pub const MIN_CHANNEL_SIZE: usize = 10;

/// The `AddressBookUpdater` hooks into incoming message streams for each peer
/// and lets the owner of the sender handle update the address book. For
/// example, it can be used to record per-connection last-seen timestamps, or
/// add new initial peers to the address book.
#[derive(Debug, Eq, PartialEq)]
pub struct AddressBookUpdater;

#[derive(Copy, Clone, Debug, Error, Eq, PartialEq, Hash)]
#[error("all address book updater senders are closed")]
pub struct AllAddressBookUpdaterSendersClosed;

impl AddressBookUpdater {
    /// Spawn a new [`AddressBookUpdater`] task, updating a new [`AddressBook`]
    /// configured with Zebra's actual `local_listener` address.
    ///
    /// Returns handles for:
    /// - the address book,
    /// - the transmission channel for address book update events,
    /// - a watch channel for address book metrics, and
    /// - the address book updater task join handle.
    ///
    /// The task exits with an error when the returned [`mpsc::Sender`] is closed.
    pub fn spawn(
        config: &Config,
        local_listener: SocketAddr,
    ) -> (
        Arc<std::sync::Mutex<AddressBook>>,
        mpsc::Sender<MetaAddrChange>,
        watch::Receiver<AddressMetrics>,
        JoinHandle<Result<(), BoxError>>,
    ) {
        // Create an mpsc channel for peerset address book updates,
        // based on the maximum number of inbound and outbound peers.
        let (worker_tx, mut worker_rx) = mpsc::channel(max(
            config.peerset_total_connection_limit(),
            MIN_CHANNEL_SIZE,
        ));

        let address_book = AddressBook::new(
            local_listener,
            &config.network,
            config.max_connections_per_ip,
            span!(Level::TRACE, "address book"),
        );
        let address_metrics = address_book.address_metrics_watcher();
        let address_book = Arc::new(std::sync::Mutex::new(address_book));

        #[cfg(feature = "progress-bar")]
        let (mut address_info, address_bar, never_bar, failed_bar) = {
            let address_bar = howudoin::new_root().label("Known Peers");
            let never_bar =
                howudoin::new_with_parent(address_bar.id()).label("Never Attempted Peers");
            let failed_bar = howudoin::new_with_parent(never_bar.id()).label("Failed Peers");

            (address_metrics.clone(), address_bar, never_bar, failed_bar)
        };

        let worker_address_book = address_book.clone();
        let worker = move || {
            info!("starting the address book updater");

            while let Some(event) = worker_rx.blocking_recv() {
                trace!(?event, "got address book change");

                // # Correctness
                //
                // Briefly hold the address book threaded mutex, to update the
                // state for a single address.
                worker_address_book
                    .lock()
                    .expect("mutex should be unpoisoned")
                    .update(event);

                #[cfg(feature = "progress-bar")]
                if matches!(howudoin::cancelled(), Some(true)) {
                    address_bar.close();
                    never_bar.close();
                    failed_bar.close();
                } else if address_info.has_changed()? {
                    // We don't track:
                    // - attempt pending because it's always small
                    // - responded because it's the remaining attempted-but-not-failed peers
                    // - recently live because it's similar to the connected peer counts

                    let address_info = *address_info.borrow_and_update();

                    address_bar
                        .set_pos(u64::try_from(address_info.num_addresses).expect("fits in u64"));
                    // .set_len(u64::try_from(address_info.address_limit).expect("fits in u64"));

                    never_bar.set_pos(
                        u64::try_from(address_info.never_attempted_gossiped).expect("fits in u64"),
                    );
                    // .set_len(u64::try_from(address_info.address_limit).expect("fits in u64"));

                    failed_bar.set_pos(u64::try_from(address_info.failed).expect("fits in u64"));
                    // .set_len(u64::try_from(address_info.address_limit).expect("fits in u64"));
                }
            }

            #[cfg(feature = "progress-bar")]
            {
                address_bar.close();
                never_bar.close();
                failed_bar.close();
            }

            let error = Err(AllAddressBookUpdaterSendersClosed.into());
            info!(?error, "stopping address book updater");
            error
        };

        // Correctness: spawn address book accesses on a blocking thread,
        //              to avoid deadlocks (see #1976)
        let span = Span::current();
        let address_book_updater_task_handle =
            tokio::task::spawn_blocking(move || span.in_scope(worker));

        (
            address_book,
            worker_tx,
            address_metrics,
            address_book_updater_task_handle,
        )
    }
}