//! Candidate peer selection for outbound connections using the [`CandidateSet`].
use std::{any::type_name, cmp::min, sync::Arc};
use chrono::Utc;
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::time::{sleep_until, timeout, Instant};
use tower::{Service, ServiceExt};
use tracing::Span;
use zebra_chain::{diagnostic::task::WaitForPanics, serialization::DateTime32};
use crate::{
constants, meta_addr::MetaAddrChange, peer_set::set::MorePeers, types::MetaAddr, AddressBook,
BoxError, Request, Response,
};
#[cfg(test)]
mod tests;
/// The [`CandidateSet`] manages outbound peer connection attempts. Successful
/// connections become peers in the [`PeerSet`](super::set::PeerSet).
///
/// The candidate set divides the set of all possible outbound peers into
/// disjoint subsets, using the [`PeerAddrState`](crate::PeerAddrState):
///
/// 1. [`Responded`] peers, which we have had an outbound connection to.
/// 2. [`NeverAttemptedGossiped`] peers, which we have learned about but never
/// connected to. This includes gossiped peers, DNS seeder peers, cached peers,
/// canonical addresses from the [`Version`] messages of inbound and outbound
/// connections, and the remote IP addresses of inbound connections.
/// 3. [`Failed`] peers, which failed a connection attempt, or had an error
/// during an outbound connection.
/// 4. [`AttemptPending`] peers, which we've recently queued for a connection.
///
/// Never attempted peers are always available for connection.
///
/// If a peer's last attempt, response, or failure time is recent
/// (within the liveness limit), we avoid reconnecting to it.
/// Otherwise, we assume that it has disconnected or hung,
/// and attempt reconnection.
///
/// ```ascii,no_run
/// ┌──────────────────┐
/// │ Config / DNS │
/// ┌───────────│ Seed │───────────┐
/// │ │ Addresses │ │
/// │ └──────────────────┘ │
/// │ │ untrusted_last_seen │
/// │ │ is unknown │
/// ▼ │ ▼
/// ┌──────────────────┐ │ ┌──────────────────┐
/// │ Handshake │ │ │ Peer Set │
/// │ Canonical │──────────┼──────────│ Gossiped │
/// │ Addresses │ │ │ Addresses │
/// └──────────────────┘ │ └──────────────────┘
/// untrusted_last_seen │ provides
/// set to now │ untrusted_last_seen
/// ▼
/// Λ if attempted, responded, or failed:
/// ╱ ╲ ignore gossiped info
/// ▕ ▏ otherwise, if never attempted:
/// ╲ ╱ skip updates to existing fields
/// V
/// ┌───────────────────────────────┼───────────────────────────────┐
/// │ AddressBook │ │
/// │ disjoint `PeerAddrState`s ▼ │
/// │ ┌─────────────┐ ┌─────────────────────────┐ ┌─────────────┐ │
/// │ │ `Responded` │ │`NeverAttemptedGossiped` │ │ `Failed` │ │
/// ┌┼▶│ Peers │ │ Peers │ │ Peers │◀┼┐
/// ││ └─────────────┘ └─────────────────────────┘ └─────────────┘ ││
/// ││ │ │ │ ││
/// ││ #1 oldest_first #2 newest_first #3 oldest_first ││
/// ││ ├──────────────────────┴──────────────────────┘ ││
/// ││ ▼ ││
/// ││ Λ ││
/// ││ ╱ ╲ filter by ││
/// ││ ▕ ▏ is_ready_for_connection_attempt ││
/// ││ ╲ ╱ to remove recent `Responded`, ││
/// ││ V `AttemptPending`, and `Failed` peers ││
/// ││ │ ││
/// ││ │ try outbound connection, ││
/// ││ ▼ update last_attempt to now() ││
/// ││┌────────────────┐ ││
/// │││`AttemptPending`│ ││
/// │││ Peers │ ││
/// ││└────────────────┘ ││
/// │└────────┼──────────────────────────────────────────────────────┘│
/// │ ▼ │
/// │ Λ │
/// │ ╱ ╲ │
/// │ ▕ ▏─────────────────────────────────────────────────────┘
/// │ ╲ ╱ connection failed, update last_failure to now()
/// │ V
/// │ │
/// │ │ connection succeeded
/// │ ▼
/// │ ┌────────────┐
/// │ │ send │
/// │ │peer::Client│
/// │ │to Discover │
/// │ └────────────┘
/// │ │
/// │ ▼
/// │┌───────────────────────────────────────┐
/// ││ when connection succeeds, and every │
/// ││ time we receive a peer heartbeat: │
/// └│ * update state to `Responded` │
/// │ * update last_response to now() │
/// └───────────────────────────────────────┘
/// ```
///
/// [`Responded`]: crate::PeerAddrState::Responded
/// [`Version`]: crate::protocol::external::types::Version
/// [`NeverAttemptedGossiped`]: crate::PeerAddrState::NeverAttemptedGossiped
/// [`Failed`]: crate::PeerAddrState::Failed
/// [`AttemptPending`]: crate::PeerAddrState::AttemptPending
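///
/// # Examples
///
/// A minimal sketch of how a crawler task might drive the [`CandidateSet`];
/// `address_book`, `peer_service`, and `initial_peer_count` are placeholders
/// supplied by the caller, not items defined in this module:
///
/// ```ignore
/// let mut candidates = CandidateSet::new(address_book, peer_service);
///
/// // During initialization, limit the fanout to the initial peer set size.
/// candidates.update_initial(initial_peer_count).await?;
///
/// loop {
///     // Get the next peer that is ready for an outbound connection attempt.
///     // This call is rate-limited internally.
///     if let Some(candidate) = candidates.next().await {
///         // ... attempt an outbound connection to `candidate.addr` ...
///     }
///
///     // Opportunistically ask existing peers for more addresses.
///     // This call is also rate-limited internally.
///     candidates.update().await?;
/// }
/// ```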
// TODO:
// * show all possible transitions between Attempt/Responded/Failed,
// except Failed -> Responded is invalid, must go through Attempt
//
// Note: the CandidateSet can't be cloned, because there needs to be a single
// instance of its timers, so that rate limits are enforced correctly.
pub(crate) struct CandidateSet<S>
where
S: Service<Request, Response = Response, Error = BoxError> + Send,
S::Future: Send + 'static,
{
/// The outbound address book for this peer set.
///
/// # Correctness
///
/// The address book must be kept private, so that all operations on it are
/// performed on a blocking thread (see #1976).
address_book: Arc<std::sync::Mutex<AddressBook>>,
/// The peer set used to crawl the network for peers.
peer_service: S,
/// A timer that enforces a rate-limit on new outbound connections.
min_next_handshake: Instant,
/// A timer that enforces a rate-limit on peer set requests for more peers.
min_next_crawl: Instant,
}
impl<S> std::fmt::Debug for CandidateSet<S>
where
S: Service<Request, Response = Response, Error = BoxError> + Send,
S::Future: Send + 'static,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CandidateSet")
.field("address_book", &self.address_book)
.field("peer_service", &type_name::<S>())
.field("min_next_handshake", &self.min_next_handshake)
.field("min_next_crawl", &self.min_next_crawl)
.finish()
}
}
impl<S> CandidateSet<S>
where
S: Service<Request, Response = Response, Error = BoxError> + Send,
S::Future: Send + 'static,
{
/// Creates a new [`CandidateSet`], which uses `address_book` and `peer_service`
/// to manage candidate peers.
pub fn new(
address_book: Arc<std::sync::Mutex<AddressBook>>,
peer_service: S,
) -> CandidateSet<S> {
CandidateSet {
address_book,
peer_service,
min_next_handshake: Instant::now(),
min_next_crawl: Instant::now(),
}
}
/// Update the peer set from the network, using the default fanout limit.
///
/// See [`update_initial`][Self::update_initial] for details.
pub async fn update(&mut self) -> Result<Option<MorePeers>, BoxError> {
self.update_timeout(None).await
}
/// Update the peer set from the network, limiting the fanout to
/// `fanout_limit`.
///
/// - Ask a few live [`Responded`] peers to send us more peers.
/// - Process all completed peer responses, adding new peers in the
/// [`NeverAttemptedGossiped`] state.
///
/// Returns `Some(MorePeers)` if the crawl was successful and the crawler
/// should ask for more peers. Returns `None` if there are no new peers.
///
/// ## Correctness
///
/// Pass the initial peer set size as `fanout_limit` during initialization,
/// so that Zebra does not send duplicate requests to the same peer.
///
/// The crawler exits when `update` returns an error, so it must only return
/// errors for permanent failures.
///
/// The handshaker sets up the peer message receiver so it also sends a
/// [`Responded`] peer address update.
///
/// [`next`][Self::next] puts peers into the [`AttemptPending`] state.
///
/// ## Security
///
/// This call is rate-limited to prevent sending a burst of repeated requests for new peer
/// addresses. Each call will only update the [`CandidateSet`] if more time
/// than [`MIN_PEER_GET_ADDR_INTERVAL`][constants::MIN_PEER_GET_ADDR_INTERVAL] has passed since
/// the last call. Otherwise, the update is skipped.
///
/// [`Responded`]: crate::PeerAddrState::Responded
/// [`NeverAttemptedGossiped`]: crate::PeerAddrState::NeverAttemptedGossiped
/// [`Failed`]: crate::PeerAddrState::Failed
/// [`AttemptPending`]: crate::PeerAddrState::AttemptPending
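///
/// ## Examples
///
/// A sketch of initialization code calling this method; `candidates` and
/// `initial_peer_count` are placeholders supplied by the caller, not items
/// defined in this module:
///
/// ```ignore
/// // Limit the fanout to the initial peer set size, so Zebra doesn't send
/// // duplicate requests to the same peer during initialization.
/// if candidates.update_initial(initial_peer_count).await?.is_some() {
///     // The crawl returned new peers, so it is worth crawling again soon.
/// }
/// ```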
pub async fn update_initial(
&mut self,
fanout_limit: usize,
) -> Result<Option<MorePeers>, BoxError> {
self.update_timeout(Some(fanout_limit)).await
}
/// Update the peer set from the network, limiting the fanout to
/// `fanout_limit`, and imposing a timeout on the entire fanout.
///
/// See [`update_initial`][Self::update_initial] for details.
async fn update_timeout(
&mut self,
fanout_limit: Option<usize>,
) -> Result<Option<MorePeers>, BoxError> {
let mut more_peers = None;
// SECURITY
//
// Rate limit sending `GetAddr` messages to peers.
if self.min_next_crawl <= Instant::now() {
// CORRECTNESS
//
// Use a timeout to avoid deadlocks when there are no connected
// peers, and:
// - we're waiting on a handshake to complete so there are peers, or
// - another task that handles or adds peers is waiting on this task
// to complete.
if let Ok(fanout_result) = timeout(
constants::PEER_GET_ADDR_TIMEOUT,
self.update_fanout(fanout_limit),
)
.await
{
more_peers = fanout_result?;
} else {
// update must only return an error for permanent failures
info!("timeout waiting for peer service readiness or peer responses");
}
self.min_next_crawl = Instant::now() + constants::MIN_PEER_GET_ADDR_INTERVAL;
}
Ok(more_peers)
}
/// Update the peer set from the network, limiting the fanout to
/// `fanout_limit`.
///
/// Opportunistically crawl the network on every update call to ensure
/// we're actively fetching peers. Continue independently of whether we
/// actually receive any peers, but always ask the network for more.
///
/// Because requests are load-balanced across existing peers, we can make
/// multiple requests concurrently, and they will be randomly assigned to
/// existing peers. But we don't make too many concurrent requests, because
/// `update` may be called while the peer set is already loaded.
///
/// See [`update_initial`][Self::update_initial] for more details.
///
/// # Correctness
///
/// This function does not have a timeout.
/// Use [`update_timeout`][Self::update_timeout] instead.
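///
/// # Examples
///
/// A sketch of a single fanout request, using the same `tower::Service` calls
/// as the body of this method (`peer_service` stands in for the crawler's
/// load-balanced peer service):
///
/// ```ignore
/// // Wait until the peer set is ready, then ask a randomly assigned ready
/// // peer for more addresses.
/// let rsp = peer_service.ready().await?.call(Request::Peers).await?;
/// if let Response::Peers(addrs) = rsp {
///     // `addrs` are unvalidated gossiped addresses from a single peer.
/// }
/// ```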
async fn update_fanout(
&mut self,
fanout_limit: Option<usize>,
) -> Result<Option<MorePeers>, BoxError> {
let fanout_limit = fanout_limit
.map(|fanout_limit| min(fanout_limit, constants::GET_ADDR_FANOUT))
.unwrap_or(constants::GET_ADDR_FANOUT);
debug!(?fanout_limit, "sending GetPeers requests");
let mut responses = FuturesUnordered::new();
let mut more_peers = None;
// Launch requests
for attempt in 0..fanout_limit {
if attempt > 0 {
// Let other tasks run, so we're more likely to choose a different peer.
//
// TODO: move fanouts into the PeerSet, so we always choose different peers (#2214)
tokio::task::yield_now().await;
}
let peer_service = self.peer_service.ready().await?;
responses.push(peer_service.call(Request::Peers));
}
let mut address_book_updates = FuturesUnordered::new();
// Process responses
while let Some(rsp) = responses.next().await {
match rsp {
Ok(Response::Peers(addrs)) => {
trace!(
addr_count = ?addrs.len(),
?addrs,
"got response to GetPeers"
);
let addrs = validate_addrs(addrs, DateTime32::now());
address_book_updates.push(self.send_addrs(addrs));
more_peers = Some(MorePeers);
}
Err(e) => {
// since we do a fanout, and new updates are triggered by
// each demand, we can ignore errors in individual responses
trace!(?e, "got error in GetPeers request");
}
Ok(_) => unreachable!("Peers requests always return Peers responses"),
}
}
// Wait until all the address book updates have finished
while let Some(()) = address_book_updates.next().await {}
Ok(more_peers)
}
/// Add new `addrs` to the address book.
async fn send_addrs(&self, addrs: impl IntoIterator<Item = MetaAddr>) {
// # Security
//
// New gossiped peers are rate-limited because:
// - Zebra initiates requests for new gossiped peers
// - the fanout is limited
// - the number of addresses per peer is limited
let addrs: Vec<MetaAddrChange> = addrs
.into_iter()
.map(MetaAddr::new_gossiped_change)
.map(|maybe_addr| maybe_addr.expect("Received gossiped peers always have services set"))
.collect();
debug!(count = ?addrs.len(), "sending gossiped addresses to the address book");
// Don't bother spawning a task if there are no addresses left.
if addrs.is_empty() {
return;
}
// # Correctness
//
// Spawn address book accesses on a blocking thread,
// to avoid deadlocks (see #1976).
//
// Extend handles duplicate addresses internally.
let address_book = self.address_book.clone();
let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(|| address_book.lock().unwrap().extend(addrs))
})
.wait_for_panics()
.await
}
/// Returns the next candidate for a connection attempt, if any are available.
///
/// Returns peers in reconnection order, based on
/// [`AddressBook::reconnection_peers`].
///
/// Skips peers that have recently been active, attempted, or failed.
///
/// ## Correctness
///
/// `AttemptPending` peers will become [`Responded`] if they respond, or
/// become `Failed` if they time out or provide a bad response.
///
/// Live [`Responded`] peers will stay live if they keep responding, or
/// become a reconnection candidate if they stop responding.
///
/// ## Security
///
/// Zebra resists distributed denial of service attacks by making sure that
/// new peer connections are initiated at least
/// [`MIN_OUTBOUND_PEER_CONNECTION_INTERVAL`][constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL]
/// apart. If `next()` has recently provided a peer, then its future will sleep
/// until the rate-limit has passed.
///
/// [`Responded`]: crate::PeerAddrState::Responded
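///
/// ## Examples
///
/// A sketch of a caller that opens outbound connections; `candidates` and
/// `connect` are placeholders for the caller's candidate set and handshake
/// logic, not APIs defined in this module:
///
/// ```ignore
/// while let Some(candidate) = candidates.next().await {
///     // `next()` has already slept until the outbound connection rate limit
///     // passed, so the caller can start the handshake immediately.
///     connect(candidate.addr).await?;
/// }
/// ```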
pub async fn next(&mut self) -> Option<MetaAddr> {
// Correctness: To avoid hangs, computation in the critical section should be kept to a minimum.
let address_book = self.address_book.clone();
let next_peer = move || -> Option<MetaAddr> {
let mut guard = address_book.lock().unwrap();
// Now we have the lock, get the current time
let instant_now = std::time::Instant::now();
let chrono_now = Utc::now();
// It's okay to return without sleeping here, because we're returning
// `None`. We only need to sleep before yielding an address.
let next_peer = guard.reconnection_peers(instant_now, chrono_now).next()?;
// TODO: only mark the peer as AttemptPending when it is actually used (#1976)
//
// If the future is dropped before `next` returns, the peer will be marked as AttemptPending,
// even if its address is not actually used for a connection.
//
// We could send a reconnect change to the AddressBookUpdater when the peer is actually used,
// but channel order is not guaranteed, so we could accidentally re-use the same peer.
let next_peer = MetaAddr::new_reconnect(next_peer.addr);
guard.update(next_peer)
};
// Correctness: Spawn address book accesses on a blocking thread, to avoid deadlocks (see #1976).
let span = Span::current();
let next_peer = tokio::task::spawn_blocking(move || span.in_scope(next_peer))
.wait_for_panics()
.await?;
// Security: rate-limit new outbound peer connections
sleep_until(self.min_next_handshake).await;
self.min_next_handshake = Instant::now() + constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL;
Some(next_peer)
}
/// Returns the address book for this `CandidateSet`.
#[cfg(any(test, feature = "proptest-impl"))]
#[allow(dead_code)]
pub async fn address_book(&self) -> Arc<std::sync::Mutex<AddressBook>> {
self.address_book.clone()
}
}
/// Check new `addrs` before adding them to the address book.
///
/// `last_seen_limit` is the maximum permitted last seen time, typically the
/// current time ([`DateTime32::now`]).
///
/// If the data in an address is invalid, this function can:
/// - modify the address data, or
/// - delete the address.
///
/// # Security
///
/// Adjusts untrusted last seen times so they are not in the future. This stops
/// malicious peers from keeping all their addresses at the front of the
/// connection queue. Honest peers with future clock skew also get adjusted.
///
/// Rejects all addresses if any calculated times overflow or underflow.
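///
/// # Examples
///
/// A sketch of the clamping behaviour, using hypothetical last seen times
/// written as seconds:
///
/// ```ignore
/// // Suppose `last_seen_limit` is 1_000_000, and a peer gossips addresses
/// // last seen at 1_000_300, 1_000_100, and 999_900 (two are in the future).
/// //
/// // The offset is newest_seen - last_seen_limit = 300, so the validated
/// // addresses are last seen at 1_000_000, 999_800, and 999_600: their
/// // relative order is preserved, but none is in the future.
/// let validated: Vec<MetaAddr> = validate_addrs(addrs, last_seen_limit).collect();
/// ```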
fn validate_addrs(
addrs: impl IntoIterator<Item = MetaAddr>,
last_seen_limit: DateTime32,
) -> impl Iterator<Item = MetaAddr> {
// Note: The address book handles duplicate addresses internally,
// so we don't need to de-duplicate addresses here.
// TODO:
// We should eventually implement these checks in this function:
// - Zebra should ignore peers that are older than 3 weeks (part of #1865)
// - Zebra should count back 3 weeks from the newest peer timestamp sent
// by the other peer, to compensate for clock skew
let mut addrs: Vec<_> = addrs.into_iter().collect();
limit_last_seen_times(&mut addrs, last_seen_limit);
addrs.into_iter()
}
/// Ensure all reported `last_seen` times are less than or equal to `last_seen_limit`.
///
/// If offsetting the `last_seen` times to be before the limit would cause an
/// underflow, all the addresses are considered invalid and rejected.
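///
/// For example (hypothetical values in seconds): if `last_seen_limit` is 100
/// and the reported times are 50 and 400, the offset is `400 - 100 = 300`, and
/// `50 - 300` would underflow [`DateTime32`], so every address is removed:
///
/// ```ignore
/// limit_last_seen_times(&mut addrs, last_seen_limit);
/// assert!(addrs.is_empty());
/// ```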
fn limit_last_seen_times(addrs: &mut Vec<MetaAddr>, last_seen_limit: DateTime32) {
let last_seen_times = addrs.iter().map(|meta_addr| {
meta_addr
.untrusted_last_seen()
.expect("unexpected missing last seen: should be provided by deserialization")
});
let oldest_seen = last_seen_times.clone().min().unwrap_or(DateTime32::MIN);
let newest_seen = last_seen_times.max().unwrap_or(DateTime32::MAX);
// If any time is in the future, adjust all times, to compensate for clock skew on honest peers
if newest_seen > last_seen_limit {
let offset = newest_seen
.checked_duration_since(last_seen_limit)
.expect("unexpected underflow: just checked newest_seen is greater");
// Check for underflow
if oldest_seen.checked_sub(offset).is_some() {
// No underflow is possible, so apply offset to all addresses
for addr in addrs {
let last_seen = addr
.untrusted_last_seen()
.expect("unexpected missing last seen: should be provided by deserialization");
let last_seen = last_seen
.checked_sub(offset)
.expect("unexpected underflow: just checked oldest_seen");
addr.set_untrusted_last_seen(last_seen);
}
} else {
// An underflow will occur, so reject all gossiped peers
addrs.clear();
}
}
}