//! Source file: `zebra_network/address_book_updater.rs`.
1use std::{
4 cmp::max,
5 net::{IpAddr, SocketAddr},
6 sync::Arc,
7 time::Instant,
8};
9
10use indexmap::IndexMap;
11use thiserror::Error;
12use tokio::{
13 sync::{mpsc, watch},
14 task::JoinHandle,
15};
16use tracing::{Level, Span};
17
18use crate::{
19 address_book::AddressMetrics, meta_addr::MetaAddrChange, AddressBook, BoxError, Config,
20};
21
/// Lower bound on the capacity of the address book updater's change channel.
///
/// [`AddressBookUpdater::spawn`] sizes its channel as the larger of this value
/// and `Config::peerset_total_connection_limit()`.
pub const MIN_CHANNEL_SIZE: usize = 10;
24
/// Marker type for the address book updater task.
///
/// It carries no data: its API is the associated function `spawn`, which
/// creates the address book and starts the task that applies changes to it.
#[derive(Debug, Eq, PartialEq)]
pub struct AddressBookUpdater;
31
32#[derive(Copy, Clone, Debug, Error, Eq, PartialEq, Hash)]
33#[error("all address book updater senders are closed")]
34pub struct AllAddressBookUpdaterSendersClosed;
35
36impl AddressBookUpdater {
37 pub fn spawn(
48 config: &Config,
49 local_listener: SocketAddr,
50 ) -> (
51 Arc<std::sync::Mutex<AddressBook>>,
52 watch::Receiver<Arc<IndexMap<IpAddr, Instant>>>,
53 mpsc::Sender<MetaAddrChange>,
54 watch::Receiver<AddressMetrics>,
55 JoinHandle<Result<(), BoxError>>,
56 ) {
57 let (worker_tx, mut worker_rx) = mpsc::channel(max(
60 config.peerset_total_connection_limit(),
61 MIN_CHANNEL_SIZE,
62 ));
63
64 let address_book = AddressBook::new(
65 local_listener,
66 &config.network,
67 config.max_connections_per_ip,
68 span!(Level::TRACE, "address book"),
69 );
70 let address_metrics = address_book.address_metrics_watcher();
71 let address_book = Arc::new(std::sync::Mutex::new(address_book));
72
73 #[cfg(feature = "progress-bar")]
74 let (mut address_info, address_bar, never_bar, failed_bar) = {
75 let address_bar = howudoin::new_root().label("Known Peers");
76 let never_bar =
77 howudoin::new_with_parent(address_bar.id()).label("Never Attempted Peers");
78 let failed_bar = howudoin::new_with_parent(never_bar.id()).label("Failed Peers");
79
80 (address_metrics.clone(), address_bar, never_bar, failed_bar)
81 };
82
83 let worker_address_book = address_book.clone();
84 let (bans_sender, bans_receiver) = tokio::sync::watch::channel(
85 worker_address_book
86 .lock()
87 .expect("mutex should be unpoisoned")
88 .bans(),
89 );
90
91 let worker = move || {
92 info!("starting the address book updater");
93
94 while let Some(event) = worker_rx.blocking_recv() {
95 trace!(?event, "got address book change");
96
97 let updated = worker_address_book
102 .lock()
103 .expect("mutex should be unpoisoned")
104 .update(event);
105
106 if updated.is_none() {
109 let bans = worker_address_book
110 .lock()
111 .expect("mutex should be unpoisoned")
112 .bans();
113
114 if bans.contains_key(&event.addr().ip()) {
115 let _ = bans_sender.send(bans);
116 }
117 }
118
119 #[cfg(feature = "progress-bar")]
120 if matches!(howudoin::cancelled(), Some(true)) {
121 address_bar.close();
122 never_bar.close();
123 failed_bar.close();
124 } else if address_info.has_changed()? {
125 let address_info = *address_info.borrow_and_update();
131
132 address_bar
133 .set_pos(u64::try_from(address_info.num_addresses).expect("fits in u64"));
134 never_bar.set_pos(
137 u64::try_from(address_info.never_attempted_gossiped).expect("fits in u64"),
138 );
139 failed_bar.set_pos(u64::try_from(address_info.failed).expect("fits in u64"));
142 }
144 }
145
146 #[cfg(feature = "progress-bar")]
147 {
148 address_bar.close();
149 never_bar.close();
150 failed_bar.close();
151 }
152
153 let error = Err(AllAddressBookUpdaterSendersClosed.into());
154 info!(?error, "stopping address book updater");
155 error
156 };
157
158 let span = Span::current();
161 let address_book_updater_task_handle =
162 tokio::task::spawn_blocking(move || span.in_scope(worker));
163
164 (
165 address_book,
166 bans_receiver,
167 worker_tx,
168 address_metrics,
169 address_book_updater_task_handle,
170 )
171 }
172}