1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
//! Candidate peer selection for outbound connections using the [`CandidateSet`].

use std::{any::type_name, cmp::min, sync::Arc};

use chrono::Utc;
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::time::{sleep_until, timeout, Instant};
use tower::{Service, ServiceExt};
use tracing::Span;

use zebra_chain::{diagnostic::task::WaitForPanics, serialization::DateTime32};

use crate::{
    constants, meta_addr::MetaAddrChange, peer_set::set::MorePeers, types::MetaAddr, AddressBook,
    BoxError, Request, Response,
};

#[cfg(test)]
mod tests;

/// The [`CandidateSet`] manages outbound peer connection attempts. Successful
/// connections become peers in the [`PeerSet`](super::set::PeerSet).
///
/// The candidate set divides the set of all possible outbound peers into
/// disjoint subsets, using the [`PeerAddrState`](crate::PeerAddrState):
///
/// 1. [`Responded`] peers, which we have had an outbound connection to.
/// 2. [`NeverAttemptedGossiped`] peers, which we learned about from other peers
///    but have never connected to. This includes gossiped peers, DNS seeder peers,
///    cached peers, canonical addresses from the [`Version`] messages of inbound
///    and outbound connections, and remote IP addresses of inbound connections.
/// 3. [`Failed`] peers, which failed a connection attempt, or had an error
///    during an outbound connection.
/// 4. [`AttemptPending`] peers, which we've recently queued for a connection.
///
/// Never attempted peers are always available for connection.
///
/// If a peer's attempted, responded, or failure time is recent
/// (within the liveness limit), we avoid reconnecting to it.
/// Otherwise, we assume that it has disconnected or hung,
/// and attempt reconnection.
///
/// ```ascii,no_run
///                         ┌──────────────────┐
///                         │   Config / DNS   │
///             ┌───────────│       Seed       │───────────┐
///             │           │    Addresses     │           │
///             │           └──────────────────┘           │
///             │                    │ untrusted_last_seen │
///             │                    │     is unknown      │
///             ▼                    │                     ▼
///    ┌──────────────────┐          │          ┌──────────────────┐
///    │    Handshake     │          │          │     Peer Set     │
///    │    Canonical     │──────────┼──────────│     Gossiped     │
///    │    Addresses     │          │          │    Addresses     │
///    └──────────────────┘          │          └──────────────────┘
///     untrusted_last_seen          │                provides
///         set to now               │           untrusted_last_seen
///                                  ▼
///                                  Λ   if attempted, responded, or failed:
///                                 ╱ ╲         ignore gossiped info
///                                ▕   ▏    otherwise, if never attempted:
///                                 ╲ ╱    skip updates to existing fields
///                                  V
///  ┌───────────────────────────────┼───────────────────────────────┐
///  │ AddressBook                   │                               │
///  │ disjoint `PeerAddrState`s     ▼                               │
///  │ ┌─────────────┐  ┌─────────────────────────┐  ┌─────────────┐ │
///  │ │ `Responded` │  │`NeverAttemptedGossiped` │  │  `Failed`   │ │
/// ┌┼▶│    Peers    │  │          Peers          │  │   Peers     │◀┼┐
/// ││ └─────────────┘  └─────────────────────────┘  └─────────────┘ ││
/// ││        │                      │                      │        ││
/// ││ #1 oldest_first        #2 newest_first        #3 oldest_first ││
/// ││        ├──────────────────────┴──────────────────────┘        ││
/// ││        ▼                                                      ││
/// ││        Λ                                                      ││
/// ││       ╱ ╲              filter by                              ││
/// ││      ▕   ▏   is_ready_for_connection_attempt                  ││
/// ││       ╲ ╱     to remove recent `Responded`,                   ││
/// ││        V  `AttemptPending`, and `Failed` peers                ││
/// ││        │                                                      ││
/// ││        │    try outbound connection,                          ││
/// ││        ▼  update last_attempt to now()                        ││
/// ││┌────────────────┐                                             ││
/// │││`AttemptPending`│                                             ││
/// │││     Peers      │                                             ││
/// ││└────────────────┘                                             ││
/// │└────────┼──────────────────────────────────────────────────────┘│
/// │         ▼                                                       │
/// │         Λ                                                       │
/// │        ╱ ╲                                                      │
/// │       ▕   ▏─────────────────────────────────────────────────────┘
/// │        ╲ ╱   connection failed, update last_failure to now()
/// │         V
/// │         │
/// │         │ connection succeeded
/// │         ▼
/// │  ┌────────────┐
/// │  │    send    │
/// │  │peer::Client│
/// │  │to Discover │
/// │  └────────────┘
/// │         │
/// │         ▼
/// │┌───────────────────────────────────────┐
/// ││ when connection succeeds, and every   │
/// ││  time we receive a peer heartbeat:    │
/// └│  * update state to `Responded`        │
///  │  * update last_response to now()      │
///  └───────────────────────────────────────┘
/// ```
///
/// [`Responded`]: crate::PeerAddrState::Responded
/// [`Version`]: crate::protocol::external::types::Version
/// [`NeverAttemptedGossiped`]: crate::PeerAddrState::NeverAttemptedGossiped
/// [`Failed`]: crate::PeerAddrState::Failed
/// [`AttemptPending`]: crate::PeerAddrState::AttemptPending
// TODO:
//   * show all possible transitions between Attempt/Responded/Failed,
//     except Failed -> Responded is invalid, must go through Attempt
//
// Note: the CandidateSet can't be cloned, because there needs to be a single
// instance of its timers, so that rate limits are enforced correctly.
pub(crate) struct CandidateSet<S>
where
    S: Service<Request, Response = Response, Error = BoxError> + Send,
    S::Future: Send + 'static,
{
    /// The outbound address book for this peer set.
    ///
    /// Gossiped addresses are added to it by `send_addrs`, and `next` reads reconnection
    /// candidates from it and records the chosen candidate's reconnect change in it.
    ///
    /// # Correctness
    ///
    /// The address book must be private, so all operations are performed on a blocking thread
    /// (see #1976).
    address_book: Arc<std::sync::Mutex<AddressBook>>,

    /// The peer set used to crawl the network for peers.
    ///
    /// `update_fanout` sends `Request::Peers` through this service.
    peer_service: S,

    /// A timer that enforces a rate-limit on new outbound connections.
    ///
    /// `next` sleeps until this instant before yielding a peer, then moves it forward by
    /// `constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL`.
    min_next_handshake: Instant,

    /// A timer that enforces a rate-limit on peer set requests for more peers.
    ///
    /// `update_timeout` skips the crawl until this instant has passed, then moves it forward by
    /// `constants::MIN_PEER_GET_ADDR_INTERVAL`.
    min_next_crawl: Instant,
}

impl<S> std::fmt::Debug for CandidateSet<S>
where
    S: Service<Request, Response = Response, Error = BoxError> + Send,
    S::Future: Send + 'static,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CandidateSet")
            .field("address_book", &self.address_book)
            .field("peer_service", &type_name::<S>())
            .field("min_next_handshake", &self.min_next_handshake)
            .field("min_next_crawl", &self.min_next_crawl)
            .finish()
    }
}

impl<S> CandidateSet<S>
where
    S: Service<Request, Response = Response, Error = BoxError> + Send,
    S::Future: Send + 'static,
{
    /// Uses `address_book` and `peer_service` to manage a [`CandidateSet`] of peers.
    pub fn new(
        address_book: Arc<std::sync::Mutex<AddressBook>>,
        peer_service: S,
    ) -> CandidateSet<S> {
        CandidateSet {
            address_book,
            peer_service,
            min_next_handshake: Instant::now(),
            min_next_crawl: Instant::now(),
        }
    }

    /// Crawl the network for new peers, using the default fanout limit.
    ///
    /// This is the same as [`update_initial`][Self::update_initial], but without a
    /// caller-supplied fanout limit. See that method for details.
    pub async fn update(&mut self) -> Result<Option<MorePeers>, BoxError> {
        // `None` selects the default fanout.
        let default_fanout = None;

        self.update_timeout(default_fanout).await
    }

    /// Crawl the network for new peers, sending at most `fanout_limit` requests.
    ///
    /// A crawl:
    /// - asks a few live [`Responded`] peers to send us more peers, then
    /// - processes all completed peer responses, adding new peers in the
    ///   [`NeverAttemptedGossiped`] state.
    ///
    /// Returns `Some(MorePeers)` if the crawl was successful and the crawler
    /// should ask for more peers. Returns `None` if there are no new peers.
    ///
    /// ## Correctness
    ///
    /// During initialization, pass the initial peer set size as `fanout_limit`,
    /// so that Zebra does not send duplicate requests to the same peer.
    ///
    /// The crawler exits when update returns an error, so this method must only
    /// return errors on permanent failures.
    ///
    /// The handshaker sets up the peer message receiver so it also sends a
    /// [`Responded`] peer address update.
    ///
    /// [`next`][Self::next] puts peers into the [`AttemptPending`] state.
    ///
    /// ## Security
    ///
    /// This call is rate-limited, to prevent sending a burst of repeated requests
    /// for new peer addresses. The [`CandidateSet`] is only updated if more time than
    /// [`MIN_PEER_GET_ADDR_INTERVAL`][constants::MIN_PEER_GET_ADDR_INTERVAL] has passed
    /// since the last update. Otherwise, the update is skipped.
    ///
    /// [`Responded`]: crate::PeerAddrState::Responded
    /// [`NeverAttemptedGossiped`]: crate::PeerAddrState::NeverAttemptedGossiped
    /// [`AttemptPending`]: crate::PeerAddrState::AttemptPending
    pub async fn update_initial(
        &mut self,
        fanout_limit: usize,
    ) -> Result<Option<MorePeers>, BoxError> {
        let fanout_limit = Some(fanout_limit);

        self.update_timeout(fanout_limit).await
    }

    /// Crawl the network for new peers, sending at most `fanout_limit` requests,
    /// with a single timeout covering the whole fanout.
    ///
    /// Returns `Ok(None)` without crawling if the crawl rate-limit has not passed yet,
    /// or if the fanout timed out.
    ///
    /// See [`update_initial`][Self::update_initial] for details.
    async fn update_timeout(
        &mut self,
        fanout_limit: Option<usize>,
    ) -> Result<Option<MorePeers>, BoxError> {
        // SECURITY
        //
        // Rate limit sending `GetAddr` messages to peers:
        // skip this crawl entirely if the previous one was too recent.
        if self.min_next_crawl > Instant::now() {
            return Ok(None);
        }

        // CORRECTNESS
        //
        // Use a timeout to avoid deadlocks when there are no connected
        // peers, and:
        // - we're waiting on a handshake to complete so there are peers, or
        // - another task that handles or adds peers is waiting on this task
        //   to complete.
        let crawl = timeout(
            constants::PEER_GET_ADDR_TIMEOUT,
            self.update_fanout(fanout_limit),
        )
        .await;

        let more_peers = match crawl {
            // Errors from the fanout itself are propagated to the caller.
            Ok(fanout_result) => fanout_result?,
            Err(_elapsed) => {
                // update must only return an error for permanent failures
                info!("timeout waiting for peer service readiness or peer responses");
                None
            }
        };

        // The next crawl is delayed, whether this one completed or timed out.
        self.min_next_crawl = Instant::now() + constants::MIN_PEER_GET_ADDR_INTERVAL;

        Ok(more_peers)
    }

    /// Crawl the network for new peers, sending at most `fanout_limit` requests.
    ///
    /// The fanout is capped at [`GET_ADDR_FANOUT`][constants::GET_ADDR_FANOUT], which is
    /// also the default when `fanout_limit` is `None`.
    ///
    /// Every update call opportunistically crawls the network, so we're actively
    /// fetching peers. The crawl continues whether or not we actually receive any
    /// peers, but it always asks the network for more.
    ///
    /// Requests are load-balanced across existing peers, so multiple concurrent
    /// requests will be randomly assigned to existing peers. The fanout is kept
    /// small because update may be called while the peer set is already loaded.
    ///
    /// See [`update_initial`][Self::update_initial] for more details.
    ///
    /// # Correctness
    ///
    /// This function does not have a timeout.
    /// Use [`update_timeout`][Self::update_timeout] instead.
    async fn update_fanout(
        &mut self,
        fanout_limit: Option<usize>,
    ) -> Result<Option<MorePeers>, BoxError> {
        let fanout_limit = fanout_limit.map_or(constants::GET_ADDR_FANOUT, |requested_limit| {
            min(requested_limit, constants::GET_ADDR_FANOUT)
        });
        debug!(?fanout_limit, "sending GetPeers requests");

        // Launch requests
        let mut pending_responses = FuturesUnordered::new();
        for attempt in 0..fanout_limit {
            // Before every request except the first, let other tasks run,
            // so we're more likely to choose a different peer.
            //
            // TODO: move fanouts into the PeerSet, so we always choose different peers (#2214)
            if attempt != 0 {
                tokio::task::yield_now().await;
            }

            let ready_service = self.peer_service.ready().await?;
            let response = ready_service.call(Request::Peers);
            pending_responses.push(response);
        }

        // Process responses
        let mut more_peers = None;
        let mut address_book_updates = FuturesUnordered::new();
        while let Some(response) = pending_responses.next().await {
            let addrs = match response {
                Ok(Response::Peers(addrs)) => addrs,
                Ok(_) => unreachable!("Peers requests always return Peers responses"),
                Err(e) => {
                    // since we do a fanout, and new updates are triggered by
                    // each demand, we can ignore errors in individual responses
                    trace!(?e, "got error in GetPeers request");
                    continue;
                }
            };

            trace!(
                addr_count = ?addrs.len(),
                ?addrs,
                "got response to GetPeers"
            );

            let addrs = validate_addrs(addrs, DateTime32::now());
            address_book_updates.push(self.send_addrs(addrs));
            more_peers = Some(MorePeers);
        }

        // Wait until all the address book updates have finished
        while address_book_updates.next().await.is_some() {}

        Ok(more_peers)
    }

    /// Add new `addrs` to the address book.
    async fn send_addrs(&self, addrs: impl IntoIterator<Item = MetaAddr>) {
        // # Security
        //
        // New gossiped peers are rate-limited because:
        // - Zebra initiates requests for new gossiped peers
        // - the fanout is limited
        // - the number of addresses per peer is limited
        let addrs: Vec<MetaAddrChange> = addrs
            .into_iter()
            .map(MetaAddr::new_gossiped_change)
            .map(|maybe_addr| maybe_addr.expect("Received gossiped peers always have services set"))
            .collect();

        debug!(count = ?addrs.len(), "sending gossiped addresses to the address book");

        // Don't bother spawning a task if there are no addresses left.
        if addrs.is_empty() {
            return;
        }

        // # Correctness
        //
        // Spawn address book accesses on a blocking thread,
        // to avoid deadlocks (see #1976).
        //
        // Extend handles duplicate addresses internally.
        let address_book = self.address_book.clone();
        let span = Span::current();
        tokio::task::spawn_blocking(move || {
            span.in_scope(|| address_book.lock().unwrap().extend(addrs))
        })
        .wait_for_panics()
        .await
    }

    /// Returns the next candidate for a connection attempt, if any are available.
    ///
    /// Returns peers in reconnection order, based on
    /// [`AddressBook::reconnection_peers`].
    ///
    /// Skips peers that have recently been active, attempted, or failed.
    ///
    /// ## Correctness
    ///
    /// `AttemptPending` peers will become [`Responded`] if they respond, or
    /// become `Failed` if they time out or provide a bad response.
    ///
    /// Live [`Responded`] peers will stay live if they keep responding, or
    /// become a reconnection candidate if they stop responding.
    ///
    /// ## Security
    ///
    /// Zebra resists distributed denial of service attacks by making sure that
    /// new peer connections are initiated at least
    /// [`MIN_OUTBOUND_PEER_CONNECTION_INTERVAL`][constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL]
    /// apart. If `next()` has recently provided a peer, then its future will sleep
    /// until the rate-limit has passed.
    ///
    /// [`Responded`]: crate::PeerAddrState::Responded
    pub async fn next(&mut self) -> Option<MetaAddr> {
        // Correctness: To avoid hangs, computation in the critical section should be kept to a minimum.
        let address_book = self.address_book.clone();
        let next_peer = move || -> Option<MetaAddr> {
            let mut guard = address_book.lock().unwrap();

            // Now we have the lock, get the current time
            let instant_now = std::time::Instant::now();
            let chrono_now = Utc::now();

            // It's okay to return without sleeping here, because we're returning
            // `None`. We only need to sleep before yielding an address.
            let next_peer = guard.reconnection_peers(instant_now, chrono_now).next()?;

            // TODO: only mark the peer as AttemptPending when it is actually used (#1976)
            //
            // If the future is dropped before `next` returns, the peer will be marked as AttemptPending,
            // even if its address is not actually used for a connection.
            //
            // We could send a reconnect change to the AddressBookUpdater when the peer is actually used,
            // but channel order is not guaranteed, so we could accidentally re-use the same peer.
            let next_peer = MetaAddr::new_reconnect(next_peer.addr);
            guard.update(next_peer)
        };

        // Correctness: Spawn address book accesses on a blocking thread, to avoid deadlocks (see #1976).
        let span = Span::current();
        let next_peer = tokio::task::spawn_blocking(move || span.in_scope(next_peer))
            .wait_for_panics()
            .await?;

        // Security: rate-limit new outbound peer connections
        sleep_until(self.min_next_handshake).await;
        self.min_next_handshake = Instant::now() + constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL;

        Some(next_peer)
    }

    /// Returns a new handle to the shared address book used by this `CandidateSet`.
    #[cfg(any(test, feature = "proptest-impl"))]
    #[allow(dead_code)]
    pub async fn address_book(&self) -> Arc<std::sync::Mutex<AddressBook>> {
        Arc::clone(&self.address_book)
    }
}

/// Validate new `addrs` before they are added to the address book.
///
/// `last_seen_limit` is the maximum permitted last seen time, typically
/// the current time.
///
/// When the data in an address is invalid, this function can:
/// - modify the address data, or
/// - delete the address.
///
/// # Security
///
/// Untrusted last seen times are adjusted so they are not in the future. This stops
/// malicious peers keeping all their addresses at the front of the connection
/// queue. Honest peers with future clock skew also get adjusted.
///
/// All addresses are rejected if any calculated times overflow or underflow.
fn validate_addrs(
    addrs: impl IntoIterator<Item = MetaAddr>,
    last_seen_limit: DateTime32,
) -> impl Iterator<Item = MetaAddr> {
    // Note: The address book handles duplicate addresses internally,
    // so we don't need to de-duplicate addresses here.

    // TODO:
    // We should eventually implement these checks in this function:
    // - Zebra should ignore peers that are older than 3 weeks (part of #1865)
    // - Zebra should count back 3 weeks from the newest peer timestamp sent
    //   by the other peer, to compensate for clock skew

    // The time limit check needs every address at once, so buffer them.
    let mut checked_addrs = addrs.into_iter().collect::<Vec<MetaAddr>>();

    limit_last_seen_times(&mut checked_addrs, last_seen_limit);

    checked_addrs.into_iter()
}

/// Make sure every reported `last_seen` time is at or before `last_seen_limit`.
///
/// If the newest time is after the limit, all times are moved back by the same
/// offset, so the newest time lands exactly on the limit.
///
/// If moving the oldest time back by that offset would underflow, all addresses
/// are considered invalid, and `addrs` is cleared.
fn limit_last_seen_times(addrs: &mut Vec<MetaAddr>, last_seen_limit: DateTime32) {
    let last_seen_of = |meta_addr: &MetaAddr| {
        meta_addr
            .untrusted_last_seen()
            .expect("unexpected missing last seen: should be provided by deserialization")
    };

    let oldest_seen = addrs
        .iter()
        .map(last_seen_of)
        .min()
        .unwrap_or(DateTime32::MIN);
    let newest_seen = addrs
        .iter()
        .map(last_seen_of)
        .max()
        .unwrap_or(DateTime32::MAX);

    // Nothing to do unless at least one time is in the future.
    if newest_seen <= last_seen_limit {
        return;
    }

    // If any time is in the future, adjust all times, to compensate for clock skew on honest peers
    let offset = newest_seen
        .checked_duration_since(last_seen_limit)
        .expect("unexpected underflow: just checked newest_seen is greater");

    // Check for underflow
    if oldest_seen.checked_sub(offset).is_none() {
        // An underflow will occur, so reject all gossiped peers
        addrs.clear();
        return;
    }

    // No underflow is possible, so apply offset to all addresses
    for addr in addrs.iter_mut() {
        let adjusted_last_seen = addr
            .untrusted_last_seen()
            .expect("unexpected missing last seen: should be provided by deserialization")
            .checked_sub(offset)
            .expect("unexpected underflow: just checked oldest_seen");

        addr.set_untrusted_last_seen(adjusted_last_seen);
    }
}