zebra_network/peer_set/candidate_set.rs
1//! Candidate peer selection for outbound connections using the [`CandidateSet`].
2
3use std::{any::type_name, cmp::min, sync::Arc};
4
5use chrono::Utc;
6use futures::stream::{FuturesUnordered, StreamExt};
7use tokio::time::{sleep_until, timeout, Instant};
8use tower::{Service, ServiceExt};
9use tracing::Span;
10
11use zebra_chain::{diagnostic::task::WaitForPanics, serialization::DateTime32};
12
13use crate::{
14 constants, meta_addr::MetaAddrChange, peer_set::set::MorePeers, types::MetaAddr, AddressBook,
15 BoxError, Request, Response,
16};
17
18#[cfg(test)]
19mod tests;
20
21/// The [`CandidateSet`] manages outbound peer connection attempts. Successful
22/// connections become peers in the [`PeerSet`](super::set::PeerSet).
23///
24/// The candidate set divides the set of all possible outbound peers into
25/// disjoint subsets, using the [`PeerAddrState`](crate::PeerAddrState):
26///
27/// 1. [`Responded`] peers, which we have had an outbound connection to.
28/// 2. [`NeverAttemptedGossiped`] peers, which we learned about from other peers
29/// but have never connected to. This includes gossiped peers, DNS seeder peers,
30/// cached peers, canonical addresses from the [`Version`] messages of inbound
31/// and outbound connections, and remote IP addresses of inbound connections.
32/// 3. [`Failed`] peers, which failed a connection attempt, or had an error
33/// during an outbound connection.
34/// 4. [`AttemptPending`] peers, which we've recently queued for a connection.
35///
36/// Never attempted peers are always available for connection.
37///
38/// If a peer's attempted, responded, or failure time is recent
39/// (within the liveness limit), we avoid reconnecting to it.
40/// Otherwise, we assume that it has disconnected or hung,
41/// and attempt reconnection.
42///
43/// ```ascii,no_run
44/// ┌──────────────────┐
45/// │ Config / DNS │
46/// ┌───────────│ Seed │───────────┐
47/// │ │ Addresses │ │
48/// │ └──────────────────┘ │
49/// │ │ untrusted_last_seen │
50/// │ │ is unknown │
51/// ▼ │ ▼
52/// ┌──────────────────┐ │ ┌──────────────────┐
53/// │ Handshake │ │ │ Peer Set │
54/// │ Canonical │──────────┼──────────│ Gossiped │
55/// │ Addresses │ │ │ Addresses │
56/// └──────────────────┘ │ └──────────────────┘
57/// untrusted_last_seen │ provides
58/// set to now │ untrusted_last_seen
59/// ▼
60/// Λ if attempted, responded, or failed:
61/// ╱ ╲ ignore gossiped info
62/// ▕ ▏ otherwise, if never attempted:
63/// ╲ ╱ skip updates to existing fields
64/// V
65/// ┌───────────────────────────────┼───────────────────────────────┐
66/// │ AddressBook │ │
67/// │ disjoint `PeerAddrState`s ▼ │
68/// │ ┌─────────────┐ ┌─────────────────────────┐ ┌─────────────┐ │
69/// │ │ `Responded` │ │`NeverAttemptedGossiped` │ │ `Failed` │ │
70/// ┌┼▶│ Peers │ │ Peers │ │ Peers │◀┼┐
71/// ││ └─────────────┘ └─────────────────────────┘ └─────────────┘ ││
72/// ││ │ │ │ ││
73/// ││ #1 oldest_first #2 newest_first #3 oldest_first ││
74/// ││ ├──────────────────────┴──────────────────────┘ ││
75/// ││ ▼ ││
76/// ││ Λ ││
77/// ││ ╱ ╲ filter by ││
78/// ││ ▕ ▏ is_ready_for_connection_attempt ││
79/// ││ ╲ ╱ to remove recent `Responded`, ││
80/// ││ V `AttemptPending`, and `Failed` peers ││
81/// ││ │ ││
82/// ││ │ try outbound connection, ││
83/// ││ ▼ update last_attempt to now() ││
84/// ││┌────────────────┐ ││
85/// │││`AttemptPending`│ ││
86/// │││ Peers │ ││
87/// ││└────────────────┘ ││
88/// │└────────┼──────────────────────────────────────────────────────┘│
89/// │ ▼ │
90/// │ Λ │
91/// │ ╱ ╲ │
92/// │ ▕ ▏─────────────────────────────────────────────────────┘
93/// │ ╲ ╱ connection failed, update last_failure to now()
94/// │ V
95/// │ │
96/// │ │ connection succeeded
97/// │ ▼
98/// │ ┌────────────┐
99/// │ │ send │
100/// │ │peer::Client│
101/// │ │to Discover │
102/// │ └────────────┘
103/// │ │
104/// │ ▼
105/// │┌───────────────────────────────────────┐
106/// ││ when connection succeeds, and every │
107/// ││ time we receive a peer heartbeat: │
108/// └│ * update state to `Responded` │
109/// │ * update last_response to now() │
110/// └───────────────────────────────────────┘
111/// ```
112///
113/// [`Responded`]: crate::PeerAddrState::Responded
114/// [`Version`]: crate::protocol::external::types::Version
115/// [`NeverAttemptedGossiped`]: crate::PeerAddrState::NeverAttemptedGossiped
116/// [`Failed`]: crate::PeerAddrState::Failed
117/// [`AttemptPending`]: crate::PeerAddrState::AttemptPending
118// TODO:
119// * show all possible transitions between Attempt/Responded/Failed,
120// except Failed -> Responded is invalid, must go through Attempt
121//
122// Note: the CandidateSet can't be cloned, because there needs to be a single
123// instance of its timers, so that rate limits are enforced correctly.
124pub(crate) struct CandidateSet<S>
125where
126 S: Service<Request, Response = Response, Error = BoxError> + Send,
127 S::Future: Send + 'static,
128{
129 /// The outbound address book for this peer set.
130 ///
131 /// # Correctness
132 ///
133 /// The address book must be private, so all operations are performed on a blocking thread
134 /// (see #1976).
135 address_book: Arc<std::sync::Mutex<AddressBook>>,
136
137 /// The peer set used to crawl the network for peers.
138 peer_service: S,
139
140 /// A timer that enforces a rate-limit on new outbound connections.
141 min_next_handshake: Instant,
142
143 /// A timer that enforces a rate-limit on peer set requests for more peers.
144 min_next_crawl: Instant,
145}
146
147impl<S> std::fmt::Debug for CandidateSet<S>
148where
149 S: Service<Request, Response = Response, Error = BoxError> + Send,
150 S::Future: Send + 'static,
151{
152 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153 f.debug_struct("CandidateSet")
154 .field("address_book", &self.address_book)
155 .field("peer_service", &type_name::<S>())
156 .field("min_next_handshake", &self.min_next_handshake)
157 .field("min_next_crawl", &self.min_next_crawl)
158 .finish()
159 }
160}
161
162impl<S> CandidateSet<S>
163where
164 S: Service<Request, Response = Response, Error = BoxError> + Send,
165 S::Future: Send + 'static,
166{
167 /// Uses `address_book` and `peer_service` to manage a [`CandidateSet`] of peers.
168 pub fn new(
169 address_book: Arc<std::sync::Mutex<AddressBook>>,
170 peer_service: S,
171 ) -> CandidateSet<S> {
172 CandidateSet {
173 address_book,
174 peer_service,
175 min_next_handshake: Instant::now(),
176 min_next_crawl: Instant::now(),
177 }
178 }
179
180 /// Update the peer set from the network, using the default fanout limit.
181 ///
182 /// See [`update_initial`][Self::update_initial] for details.
183 pub async fn update(&mut self) -> Result<Option<MorePeers>, BoxError> {
184 self.update_timeout(None).await
185 }
186
187 /// Update the peer set from the network, limiting the fanout to
188 /// `fanout_limit`.
189 ///
190 /// - Ask a few live [`Responded`] peers to send us more peers.
191 /// - Process all completed peer responses, adding new peers in the
192 /// [`NeverAttemptedGossiped`] state.
193 ///
194 /// Returns `Some(MorePeers)` if the crawl was successful and the crawler
195 /// should ask for more peers. Returns `None` if there are no new peers.
196 ///
197 /// ## Correctness
198 ///
199 /// Pass the initial peer set size as `fanout_limit` during initialization,
200 /// so that Zebra does not send duplicate requests to the same peer.
201 ///
202 /// The crawler exits when update returns an error, so it must only return
203 /// errors on permanent failures.
204 ///
205 /// The handshaker sets up the peer message receiver so it also sends a
206 /// [`Responded`] peer address update.
207 ///
208 /// [`next`][Self::next] puts peers into the [`AttemptPending`] state.
209 ///
210 /// ## Security
211 ///
212 /// This call is rate-limited to prevent sending a burst of repeated requests for new peer
213 /// addresses. Each call will only update the [`CandidateSet`] if more time
214 /// than [`MIN_PEER_GET_ADDR_INTERVAL`][constants::MIN_PEER_GET_ADDR_INTERVAL] has passed since
215 /// the last call. Otherwise, the update is skipped.
216 ///
217 /// [`Responded`]: crate::PeerAddrState::Responded
218 /// [`NeverAttemptedGossiped`]: crate::PeerAddrState::NeverAttemptedGossiped
219 /// [`Failed`]: crate::PeerAddrState::Failed
220 /// [`AttemptPending`]: crate::PeerAddrState::AttemptPending
221 pub async fn update_initial(
222 &mut self,
223 fanout_limit: usize,
224 ) -> Result<Option<MorePeers>, BoxError> {
225 self.update_timeout(Some(fanout_limit)).await
226 }
227
228 /// Update the peer set from the network, limiting the fanout to
229 /// `fanout_limit`, and imposing a timeout on the entire fanout.
230 ///
231 /// See [`update_initial`][Self::update_initial] for details.
232 async fn update_timeout(
233 &mut self,
234 fanout_limit: Option<usize>,
235 ) -> Result<Option<MorePeers>, BoxError> {
236 let mut more_peers = None;
237
238 // SECURITY
239 //
240 // Rate limit sending `GetAddr` messages to peers.
241 if self.min_next_crawl <= Instant::now() {
242 // CORRECTNESS
243 //
244 // Use a timeout to avoid deadlocks when there are no connected
245 // peers, and:
246 // - we're waiting on a handshake to complete so there are peers, or
247 // - another task that handles or adds peers is waiting on this task
248 // to complete.
249 if let Ok(fanout_result) = timeout(
250 constants::PEER_GET_ADDR_TIMEOUT,
251 self.update_fanout(fanout_limit),
252 )
253 .await
254 {
255 more_peers = fanout_result?;
256 } else {
257 // update must only return an error for permanent failures
258 info!("timeout waiting for peer service readiness or peer responses");
259 }
260
261 self.min_next_crawl = Instant::now() + constants::MIN_PEER_GET_ADDR_INTERVAL;
262 }
263
264 Ok(more_peers)
265 }
266
267 /// Update the peer set from the network, limiting the fanout to
268 /// `fanout_limit`.
269 ///
270 /// Opportunistically crawl the network on every update call to ensure
271 /// we're actively fetching peers. Continue independently of whether we
272 /// actually receive any peers, but always ask the network for more.
273 ///
274 /// Because requests are load-balanced across existing peers, we can make
275 /// multiple requests concurrently, which will be randomly assigned to
276 /// existing peers, but we don't make too many because update may be
277 /// called while the peer set is already loaded.
278 ///
279 /// See [`update_initial`][Self::update_initial] for more details.
280 ///
281 /// # Correctness
282 ///
283 /// This function does not have a timeout.
284 /// Use [`update_timeout`][Self::update_timeout] instead.
285 async fn update_fanout(
286 &mut self,
287 fanout_limit: Option<usize>,
288 ) -> Result<Option<MorePeers>, BoxError> {
289 let fanout_limit = fanout_limit
290 .map(|fanout_limit| min(fanout_limit, constants::GET_ADDR_FANOUT))
291 .unwrap_or(constants::GET_ADDR_FANOUT);
292 debug!(?fanout_limit, "sending GetPeers requests");
293
294 let mut responses = FuturesUnordered::new();
295 let mut more_peers = None;
296
297 // Launch requests
298 for attempt in 0..fanout_limit {
299 if attempt > 0 {
300 // Let other tasks run, so we're more likely to choose a different peer.
301 //
302 // TODO: move fanouts into the PeerSet, so we always choose different peers (#2214)
303 tokio::task::yield_now().await;
304 }
305
306 let peer_service = self.peer_service.ready().await?;
307 responses.push(peer_service.call(Request::Peers));
308 }
309
310 let mut address_book_updates = FuturesUnordered::new();
311
312 // Process responses
313 while let Some(rsp) = responses.next().await {
314 match rsp {
315 Ok(Response::Peers(addrs)) => {
316 trace!(
317 addr_count = ?addrs.len(),
318 ?addrs,
319 "got response to GetPeers"
320 );
321 let addrs = validate_addrs(addrs, DateTime32::now());
322 address_book_updates.push(self.send_addrs(addrs));
323 more_peers = Some(MorePeers);
324 }
325 Err(e) => {
326 // since we do a fanout, and new updates are triggered by
327 // each demand, we can ignore errors in individual responses
328 trace!(?e, "got error in GetPeers request");
329 }
330 Ok(_) => unreachable!("Peers requests always return Peers responses"),
331 }
332 }
333
334 // Wait until all the address book updates have finished
335 while let Some(()) = address_book_updates.next().await {}
336
337 Ok(more_peers)
338 }
339
340 /// Add new `addrs` to the address book.
341 async fn send_addrs(&self, addrs: impl IntoIterator<Item = MetaAddr>) {
342 // # Security
343 //
344 // New gossiped peers are rate-limited because:
345 // - Zebra initiates requests for new gossiped peers
346 // - the fanout is limited
347 // - the number of addresses per peer is limited
348 let addrs: Vec<MetaAddrChange> = addrs
349 .into_iter()
350 .map(MetaAddr::new_gossiped_change)
351 .map(|maybe_addr| maybe_addr.expect("Received gossiped peers always have services set"))
352 .collect();
353
354 debug!(count = ?addrs.len(), "sending gossiped addresses to the address book");
355
356 // Don't bother spawning a task if there are no addresses left.
357 if addrs.is_empty() {
358 return;
359 }
360
361 // # Correctness
362 //
363 // Spawn address book accesses on a blocking thread,
364 // to avoid deadlocks (see #1976).
365 //
366 // Extend handles duplicate addresses internally.
367 let address_book = self.address_book.clone();
368 let span = Span::current();
369 tokio::task::spawn_blocking(move || {
370 span.in_scope(|| address_book.lock().unwrap().extend(addrs))
371 })
372 .wait_for_panics()
373 .await
374 }
375
376 /// Returns the next candidate for a connection attempt, if any are available.
377 ///
378 /// Returns peers in reconnection order, based on
379 /// [`AddressBook::reconnection_peers`].
380 ///
381 /// Skips peers that have recently been active, attempted, or failed.
382 ///
383 /// ## Correctness
384 ///
385 /// `AttemptPending` peers will become [`Responded`] if they respond, or
386 /// become `Failed` if they time out or provide a bad response.
387 ///
388 /// Live [`Responded`] peers will stay live if they keep responding, or
389 /// become a reconnection candidate if they stop responding.
390 ///
391 /// ## Security
392 ///
393 /// Zebra resists distributed denial of service attacks by making sure that
394 /// new peer connections are initiated at least
395 /// [`MIN_OUTBOUND_PEER_CONNECTION_INTERVAL`][constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL]
396 /// apart. If `next()` has recently provided a peer, then its future will sleep
397 /// until the rate-limit has passed.
398 ///
399 /// [`Responded`]: crate::PeerAddrState::Responded
400 pub async fn next(&mut self) -> Option<MetaAddr> {
401 // Correctness: To avoid hangs, computation in the critical section should be kept to a minimum.
402 let address_book = self.address_book.clone();
403 let next_peer = move || -> Option<MetaAddr> {
404 let mut guard = address_book.lock().unwrap();
405
406 // Now we have the lock, get the current time
407 let instant_now = std::time::Instant::now();
408 let chrono_now = Utc::now();
409
410 // It's okay to return without sleeping here, because we're returning
411 // `None`. We only need to sleep before yielding an address.
412 let next_peer = guard.reconnection_peers(instant_now, chrono_now).next()?;
413
414 // TODO: only mark the peer as AttemptPending when it is actually used (#1976)
415 //
416 // If the future is dropped before `next` returns, the peer will be marked as AttemptPending,
417 // even if its address is not actually used for a connection.
418 //
419 // We could send a reconnect change to the AddressBookUpdater when the peer is actually used,
420 // but channel order is not guaranteed, so we could accidentally re-use the same peer.
421 let next_peer = MetaAddr::new_reconnect(next_peer.addr);
422 guard.update(next_peer)
423 };
424
425 // Correctness: Spawn address book accesses on a blocking thread, to avoid deadlocks (see #1976).
426 let span = Span::current();
427 let next_peer = tokio::task::spawn_blocking(move || span.in_scope(next_peer))
428 .wait_for_panics()
429 .await?;
430
431 // Security: rate-limit new outbound peer connections
432 sleep_until(self.min_next_handshake).await;
433 self.min_next_handshake = Instant::now() + constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL;
434
435 Some(next_peer)
436 }
437
438 /// Returns the address book for this `CandidateSet`.
439 #[cfg(any(test, feature = "proptest-impl"))]
440 #[allow(dead_code)]
441 pub async fn address_book(&self) -> Arc<std::sync::Mutex<AddressBook>> {
442 self.address_book.clone()
443 }
444}
445
446/// Check new `addrs` before adding them to the address book.
447///
448/// `last_seen_limit` is the maximum permitted last seen time, typically
449/// [`Utc::now`].
450///
451/// If the data in an address is invalid, this function can:
452/// - modify the address data, or
453/// - delete the address.
454///
455/// # Security
456///
457/// Adjusts untrusted last seen times so they are not in the future. This stops
458/// malicious peers keeping all their addresses at the front of the connection
459/// queue. Honest peers with future clock skew also get adjusted.
460///
461/// Rejects all addresses if any calculated times overflow or underflow.
462fn validate_addrs(
463 addrs: impl IntoIterator<Item = MetaAddr>,
464 last_seen_limit: DateTime32,
465) -> impl Iterator<Item = MetaAddr> {
466 // Note: The address book handles duplicate addresses internally,
467 // so we don't need to de-duplicate addresses here.
468
469 // TODO:
470 // We should eventually implement these checks in this function:
471 // - Zebra should ignore peers that are older than 3 weeks (part of #1865)
472 // - Zebra should count back 3 weeks from the newest peer timestamp sent
473 // by the other peer, to compensate for clock skew
474
475 let mut addrs: Vec<_> = addrs.into_iter().collect();
476
477 limit_last_seen_times(&mut addrs, last_seen_limit);
478
479 addrs.into_iter()
480}
481
482/// Ensure all reported `last_seen` times are less than or equal to `last_seen_limit`.
483///
484/// This will consider all addresses as invalid if trying to offset their
485/// `last_seen` times to be before the limit causes an underflow.
486fn limit_last_seen_times(addrs: &mut Vec<MetaAddr>, last_seen_limit: DateTime32) {
487 let last_seen_times = addrs.iter().map(|meta_addr| {
488 meta_addr
489 .untrusted_last_seen()
490 .expect("unexpected missing last seen: should be provided by deserialization")
491 });
492 let oldest_seen = last_seen_times.clone().min().unwrap_or(DateTime32::MIN);
493 let newest_seen = last_seen_times.max().unwrap_or(DateTime32::MAX);
494
495 // If any time is in the future, adjust all times, to compensate for clock skew on honest peers
496 if newest_seen > last_seen_limit {
497 let offset = newest_seen
498 .checked_duration_since(last_seen_limit)
499 .expect("unexpected underflow: just checked newest_seen is greater");
500
501 // Check for underflow
502 if oldest_seen.checked_sub(offset).is_some() {
503 // No underflow is possible, so apply offset to all addresses
504 for addr in addrs {
505 let last_seen = addr
506 .untrusted_last_seen()
507 .expect("unexpected missing last seen: should be provided by deserialization");
508 let last_seen = last_seen
509 .checked_sub(offset)
510 .expect("unexpected underflow: just checked oldest_seen");
511
512 addr.set_untrusted_last_seen(last_seen);
513 }
514 } else {
515 // An underflow will occur, so reject all gossiped peers
516 addrs.clear();
517 }
518 }
519}