zebra_network/peer_set/initialize.rs

//! A peer set whose size is dynamically determined by resource constraints.
//!
//! The [`PeerSet`] implementation is adapted from the one in [tower::Balance][tower-balance].
//!
//! [tower-balance]: https://github.com/tower-rs/tower/tree/master/tower/src/balance

use std::{
    collections::{BTreeMap, HashMap, HashSet},
    convert::Infallible,
    net::{IpAddr, SocketAddr},
    pin::Pin,
    sync::Arc,
    time::Duration,
};

use futures::{
    future::{self, FutureExt},
    sink::SinkExt,
    stream::{FuturesUnordered, StreamExt},
    Future, TryFutureExt,
};
use indexmap::IndexMap;
use rand::seq::SliceRandom;
use tokio::{
    net::{TcpListener, TcpStream},
    sync::{broadcast, mpsc, watch},
    time::{sleep, Instant},
};
use tokio_stream::wrappers::IntervalStream;
use tower::{
    buffer::Buffer, discover::Change, layer::Layer, util::BoxService, Service, ServiceExt,
};
use tracing_futures::Instrument;

use zebra_chain::{chain_tip::ChainTip, diagnostic::task::WaitForPanics};

use crate::{
    address_book_updater::{AddressBookUpdater, MIN_CHANNEL_SIZE},
    constants,
    meta_addr::{MetaAddr, MetaAddrChange},
    peer::{
        self, address_is_valid_for_inbound_listeners, HandshakeRequest, MinimumPeerVersion,
        OutboundConnectorRequest, PeerPreference,
    },
    peer_cache_updater::peer_cache_updater,
    peer_set::{set::MorePeers, ActiveConnectionCounter, CandidateSet, ConnectionTracker, PeerSet},
    AddressBook, BoxError, Config, PeerSocketAddr, Request, Response,
};

#[cfg(test)]
mod tests;

mod recent_by_ip;

/// A successful outbound peer connection attempt or inbound connection handshake.
///
/// The [`Handshake`](peer::Handshake) service returns a [`Result`]. Only successful connections
/// should be sent on the channel. Errors should be logged or ignored.
///
/// We don't allow any errors in this type, because:
/// - The connection limits don't include failed connections
/// - tower::Discover interprets an error as stream termination
type DiscoveredPeer = (PeerSocketAddr, peer::Client);

/// Initialize a peer set, using a network `config`, `inbound_service`,
/// and `latest_chain_tip`.
///
/// The peer set abstracts away peer management to provide a
/// [`tower::Service`] representing "the network" that load-balances requests
/// over available peers.  The peer set automatically crawls the network to
/// find more peer addresses and opportunistically connects to new peers.
///
/// Each peer connection's message handling is isolated from other
/// connections, unlike in `zcashd`.  The peer connection first attempts to
/// interpret inbound messages as part of a response to a previously-issued
/// request.  Otherwise, inbound messages are interpreted as requests and sent
/// to the supplied `inbound_service`.
///
/// Wrapping the `inbound_service` in [`tower::load_shed`] middleware will
/// cause the peer set to shrink when the inbound service is unable to keep up
/// with the volume of inbound requests.
///
/// Use [`NoChainTip`][1] to explicitly provide no chain tip receiver.
///
/// In addition to returning a service for outbound requests, this method
/// returns a shared [`AddressBook`] updated with last-seen timestamps for
/// connected peers. The shared address book should be accessed using a
/// [blocking thread](https://docs.rs/tokio/1.15.0/tokio/task/index.html#blocking-and-yielding),
/// to avoid async task deadlocks.
///
/// # Panics
///
/// If `config.peerset_initial_target_size` is zero.
/// (zebra-network expects to be able to connect to at least one peer.)
///
/// [1]: zebra_chain::chain_tip::NoChainTip
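///
/// # Example
///
/// A minimal usage sketch (not compiled as a doctest). It assumes a stub
/// inbound service built with `tower::service_fn`, and a hypothetical
/// user agent string:
///
/// ```ignore
/// use zebra_chain::chain_tip::NoChainTip;
/// use zebra_network::{init, BoxError, Config, Response};
///
/// // A stub inbound service: every peer request gets an empty response.
/// let inbound_service = tower::service_fn(|_req| async { Ok::<_, BoxError>(Response::Nil) });
///
/// let (peer_set, address_book, misbehavior_tx) = init(
///     Config::default(),
///     inbound_service,
///     NoChainTip,
///     "/Example:1.0/".to_string(),
/// )
/// .await;
/// ```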
pub async fn init<S, C>(
    config: Config,
    inbound_service: S,
    latest_chain_tip: C,
    user_agent: String,
) -> (
    Buffer<BoxService<Request, Response, BoxError>, Request>,
    Arc<std::sync::Mutex<AddressBook>>,
    mpsc::Sender<(PeerSocketAddr, u32)>,
)
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + Sync + 'static,
    S::Future: Send + 'static,
    C: ChainTip + Clone + Send + Sync + 'static,
{
    let (tcp_listener, listen_addr) = open_listener(&config).await;

    let (
        address_book,
        bans_receiver,
        address_book_updater,
        address_metrics,
        address_book_updater_guard,
    ) = AddressBookUpdater::spawn(&config, listen_addr);

    let (misbehavior_tx, mut misbehavior_rx) = mpsc::channel(
        // Leave enough room for a misbehaviour update on every peer connection
        // before the channel is drained.
        config
            .peerset_total_connection_limit()
            .max(MIN_CHANNEL_SIZE),
    );

    let misbehaviour_updater = address_book_updater.clone();
    tokio::spawn(
        async move {
            let mut misbehaviors: HashMap<PeerSocketAddr, u32> = HashMap::new();
            // Batch misbehaviour updates so peers can't keep the address book mutex locked
            // by repeatedly sending invalid blocks or transactions.
            let mut flush_timer =
                IntervalStream::new(tokio::time::interval(Duration::from_secs(30)));

            loop {
                tokio::select! {
                    msg = misbehavior_rx.recv() => match msg {
                        Some((peer_addr, score_increment)) => *misbehaviors
                            .entry(peer_addr)
                            .or_default()
                            += score_increment,
                        None => break,
                    },

                    _ = flush_timer.next() => {
                        for (addr, score_increment) in misbehaviors.drain() {
                            let _ = misbehaviour_updater
                                .send(MetaAddrChange::UpdateMisbehavior {
                                    addr,
                                    score_increment
                                })
                                .await;
                        }
                    },
                };
            }

            tracing::warn!("exiting misbehavior update batch task");
        }
        .in_current_span(),
    );

    // Create a broadcast channel for peer inventory advertisements.
    // If it reaches capacity, this channel drops older inventory advertisements.
    //
    // When Zebra is at the chain tip with an up-to-date mempool,
    // we expect to have at most 1 new transaction per connected peer,
    // and 1-2 new blocks across the entire network.
    // (The block syncer and mempool crawler handle bulk fetches of blocks and transactions.)
    let (inv_sender, inv_receiver) = broadcast::channel(config.peerset_total_connection_limit());

    // Construct services that handle inbound handshakes and perform outbound
    // handshakes. These use the same handshake service internally to detect
    // self-connection attempts. Both are decorated with a tower TimeoutLayer to
    // enforce timeouts as specified in the Config.
    let (listen_handshaker, outbound_connector) = {
        use tower::timeout::TimeoutLayer;
        let hs_timeout = TimeoutLayer::new(constants::HANDSHAKE_TIMEOUT);
        use crate::protocol::external::types::PeerServices;
        let hs = peer::Handshake::builder()
            .with_config(config.clone())
            .with_inbound_service(inbound_service)
            .with_inventory_collector(inv_sender)
            .with_address_book_updater(address_book_updater.clone())
            .with_advertised_services(PeerServices::NODE_NETWORK)
            .with_user_agent(user_agent)
            .with_latest_chain_tip(latest_chain_tip.clone())
            .want_transactions(true)
            .finish()
            .expect("configured all required parameters");
        (
            hs_timeout.layer(hs.clone()),
            hs_timeout.layer(peer::Connector::new(hs)),
        )
    };

    // Create an mpsc channel for peer changes,
    // based on the maximum number of inbound and outbound peers.
    //
    // The connection limit does not apply to errors,
    // so they need to be handled before sending to this channel.
    let (peerset_tx, peerset_rx) =
        futures::channel::mpsc::channel::<DiscoveredPeer>(config.peerset_total_connection_limit());

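    // `tower::discover::Discover` treats an `Err` item as stream termination
    // (see `DiscoveredPeer` above), so the error type here is `Infallible`:
    // every discovered peer is sent as an `Ok(Change::Insert(..))`.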
    let discovered_peers = peerset_rx.map(|(address, client)| {
        Result::<_, Infallible>::Ok(Change::Insert(address, client.into()))
    });

    // Create an mpsc channel for peerset demand signaling,
    // based on the maximum number of outbound peers.
    let (mut demand_tx, demand_rx) =
        futures::channel::mpsc::channel::<MorePeers>(config.peerset_outbound_connection_limit());

    // Create a oneshot to send background task JoinHandles to the peer set
    let (handle_tx, handle_rx) = tokio::sync::oneshot::channel();

    // Connect the rx end to a PeerSet, wrapping new peers in load instruments.
    let peer_set = PeerSet::new(
        &config,
        discovered_peers,
        demand_tx.clone(),
        handle_rx,
        inv_receiver,
        bans_receiver.clone(),
        address_metrics,
        MinimumPeerVersion::new(latest_chain_tip, &config.network),
        None,
    );
    let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE);

    // Connect peerset_tx to the 3 peer sources:
    //
    // 1. Incoming peer connections, via a listener.
    let listen_fut = accept_inbound_connections(
        config.clone(),
        tcp_listener,
        constants::MIN_INBOUND_PEER_CONNECTION_INTERVAL,
        listen_handshaker,
        peerset_tx.clone(),
        bans_receiver,
    );
    let listen_guard = tokio::spawn(listen_fut.in_current_span());

    // 2. Initial peers, specified in the config and cached on disk.
    let initial_peers_fut = add_initial_peers(
        config.clone(),
        outbound_connector.clone(),
        peerset_tx.clone(),
        address_book_updater.clone(),
    );
    let initial_peers_join = tokio::spawn(initial_peers_fut.in_current_span());

    // 3. Outgoing peers we connect to in response to load.
    let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone());

    // Wait for the initial seed peer count
    let mut active_outbound_connections = initial_peers_join
        .wait_for_panics()
        .await
        .expect("unexpected error connecting to initial peers");
    let active_initial_peer_count = active_outbound_connections.update_count();

    // We need to await candidates.update() here,
    // because zcashd rate-limits `addr`/`addrv2` messages per connection,
    // and if we only have one initial peer,
    // we need to ensure that its `Response::Addr` is used by the crawler.
    //
    // TODO: this might not be needed after we added the Connection peer address cache,
    //       try removing it in a future release?
    info!(
        ?active_initial_peer_count,
        "sending initial request for peers"
    );
    let _ = candidates.update_initial(active_initial_peer_count).await;

    // Compute remaining connections to open.
    let demand_count = config
        .peerset_initial_target_size
        .saturating_sub(active_outbound_connections.update_count());

    for _ in 0..demand_count {
        let _ = demand_tx.try_send(MorePeers);
    }

    // Start the peer crawler
    let crawl_fut = crawl_and_dial(
        config.clone(),
        demand_tx,
        demand_rx,
        candidates,
        outbound_connector,
        peerset_tx,
        active_outbound_connections,
        address_book_updater,
    );
    let crawl_guard = tokio::spawn(crawl_fut.in_current_span());

    // Start the peer disk cache updater
    let peer_cache_updater_fut = peer_cache_updater(config, address_book.clone());
    let peer_cache_updater_guard = tokio::spawn(peer_cache_updater_fut.in_current_span());

    handle_tx
        .send(vec![
            listen_guard,
            crawl_guard,
            address_book_updater_guard,
            peer_cache_updater_guard,
        ])
        .unwrap();

    (peer_set, address_book, misbehavior_tx)
}

/// Use the provided `outbound_connector` to connect to the configured DNS seeder and
/// disk cache initial peers, then send the resulting peer connections over `peerset_tx`.
///
/// Also sends every initial peer address to the `address_book_updater`.
#[instrument(skip(config, outbound_connector, peerset_tx, address_book_updater))]
async fn add_initial_peers<S>(
    config: Config,
    outbound_connector: S,
    mut peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
    address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
) -> Result<ActiveConnectionCounter, BoxError>
where
    S: Service<
            OutboundConnectorRequest,
            Response = (PeerSocketAddr, peer::Client),
            Error = BoxError,
        > + Clone
        + Send
        + 'static,
    S::Future: Send + 'static,
{
    let initial_peers = limit_initial_peers(&config, address_book_updater).await;

    let mut handshake_success_total: usize = 0;
    let mut handshake_error_total: usize = 0;

    let mut active_outbound_connections = ActiveConnectionCounter::new_counter_with(
        config.peerset_outbound_connection_limit(),
        "Outbound Connections",
    );

    // TODO: update when we add Tor peers or other kinds of addresses.
    let ipv4_peer_count = initial_peers.iter().filter(|ip| ip.is_ipv4()).count();
    let ipv6_peer_count = initial_peers.iter().filter(|ip| ip.is_ipv6()).count();
    info!(
        ?ipv4_peer_count,
        ?ipv6_peer_count,
        "connecting to initial peer set"
    );

    // # Security
    //
    // Resists distributed denial of service attacks by making sure that
    // new peer connections are initiated at least `MIN_OUTBOUND_PEER_CONNECTION_INTERVAL` apart.
    //
    // # Correctness
    //
    // Each `FuturesUnordered` can hold one `Buffer` or `Batch` reservation for
    // an indefinite period. We can use `FuturesUnordered` without filling
    // the underlying network buffers, because we immediately drive this
    // single `FuturesUnordered` to completion, and handshakes have a short timeout.
    let mut handshakes: FuturesUnordered<_> = initial_peers
        .into_iter()
        .enumerate()
        .map(|(i, addr)| {
            let connection_tracker = active_outbound_connections.track_connection();
            let req = OutboundConnectorRequest {
                addr,
                connection_tracker,
            };
            let outbound_connector = outbound_connector.clone();

            // Spawn a new task to make the outbound connection.
            tokio::spawn(
                async move {
                    // Only spawn one outbound connector per
                    // `MIN_OUTBOUND_PEER_CONNECTION_INTERVAL`,
                    // by sleeping for the interval multiplied by the peer's index in the list.
                    sleep(
                        constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL.saturating_mul(i as u32),
                    )
                    .await;

                    // As soon as we create the connector future,
                    // the handshake starts running as a spawned task.
                    outbound_connector
                        .oneshot(req)
                        .map_err(move |e| (addr, e))
                        .await
                }
                .in_current_span(),
            )
            .wait_for_panics()
        })
        .collect();

    while let Some(handshake_result) = handshakes.next().await {
        match handshake_result {
            Ok(change) => {
                handshake_success_total += 1;
                debug!(
                    ?handshake_success_total,
                    ?handshake_error_total,
                    ?change,
                    "an initial peer handshake succeeded"
                );

                // The connection limit makes sure this send doesn't block
                peerset_tx.send(change).await?;
            }
            Err((addr, ref e)) => {
                handshake_error_total += 1;

                // this is verbose, but it's better than just hanging with no output when there are errors
                let mut expected_error = false;
                if let Some(io_error) = e.downcast_ref::<tokio::io::Error>() {
                    // Some systems only have IPv4, or only have IPv6,
                    // so these errors are not particularly interesting.
                    if io_error.kind() == tokio::io::ErrorKind::AddrNotAvailable {
                        expected_error = true;
                    }
                }

                if expected_error {
                    debug!(
                        successes = ?handshake_success_total,
                        errors = ?handshake_error_total,
                        ?addr,
                        ?e,
                        "an initial peer connection failed"
                    );
                } else {
                    info!(
                        successes = ?handshake_success_total,
                        errors = ?handshake_error_total,
                        ?addr,
                        %e,
                        "an initial peer connection failed"
                    );
                }
            }
        }

        // Security: Let other tasks run after each connection is processed.
        //
        // Avoids remote peers starving other Zebra tasks using initial connection successes or errors.
        tokio::task::yield_now().await;
    }

    let outbound_connections = active_outbound_connections.update_count();
    info!(
        ?handshake_success_total,
        ?handshake_error_total,
        ?outbound_connections,
        "finished connecting to initial seed and disk cache peers"
    );

    Ok(active_outbound_connections)
}

/// Limit the number of `initial_peers` address entries to the configured
/// `peerset_initial_target_size`.
///
/// Returns randomly chosen entries from the provided set of addresses,
/// in a random order.
///
/// Also sends every initial peer to the `address_book_updater`.
async fn limit_initial_peers(
    config: &Config,
    address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
) -> HashSet<PeerSocketAddr> {
    let all_peers: HashSet<PeerSocketAddr> = config.initial_peers().await;
    let mut preferred_peers: BTreeMap<PeerPreference, Vec<PeerSocketAddr>> = BTreeMap::new();

    let all_peers_count = all_peers.len();
    if all_peers_count > config.peerset_initial_target_size {
        info!(
            "limiting the initial peers list from {} to {}",
            all_peers_count, config.peerset_initial_target_size,
        );
    }

    // Filter out invalid initial peers, and prioritise valid peers for initial connections.
    // (This treats initial peers the same way we treat gossiped peers.)
    for peer_addr in all_peers {
        let preference = PeerPreference::new(peer_addr, config.network.clone());

        match preference {
            Ok(preference) => preferred_peers
                .entry(preference)
                .or_default()
                .push(peer_addr),
            Err(error) => info!(
                ?peer_addr,
                ?error,
                "invalid initial peer from DNS seeder, configured IP address, or disk cache",
            ),
        }
    }

    // Send every initial peer to the address book, in preferred order.
    // (This treats initial peers the same way we treat gossiped peers.)
    //
    // # Security
    //
    // Initial peers are limited because:
    // - the number of initial peers is limited
    // - this code only runs once at startup
    for peer in preferred_peers.values().flatten() {
        let peer_addr = MetaAddr::new_initial_peer(*peer);
        // `send` only waits when the channel is full.
        // The address book updater runs in its own thread, so we will only wait for a short time.
        let _ = address_book_updater.send(peer_addr).await;
    }

    // Split out the `initial_peers` that will be shuffled and returned,
    // choosing preferred peers first.
    let mut initial_peers: HashSet<PeerSocketAddr> = HashSet::new();
    for better_peers in preferred_peers.values() {
        let mut better_peers = better_peers.clone();
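        // `partial_shuffle` moves up to `amount` randomly chosen entries to the
        // front of the slice, and returns `(chosen, remainder)` slices.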
        let (chosen_peers, _unused_peers) = better_peers.partial_shuffle(
            &mut rand::thread_rng(),
            config.peerset_initial_target_size - initial_peers.len(),
        );

        initial_peers.extend(chosen_peers.iter());

        if initial_peers.len() >= config.peerset_initial_target_size {
            break;
        }
    }

    initial_peers
}

/// Open a peer connection listener on `config.listen_addr`,
/// returning the opened [`TcpListener`], and the address it is bound to.
///
/// If the listener is configured to use an automatically chosen port (port `0`),
/// then the returned address will contain the actual port.
///
/// # Panics
///
/// If opening the listener fails.
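///
/// # Example
///
/// A sketch of binding to an OS-assigned port (hypothetical config values,
/// not compiled as a doctest):
///
/// ```ignore
/// let mut config = Config::default();
/// config.listen_addr = "127.0.0.1:0".parse().unwrap();
///
/// // The OS picks a free port, and the returned address contains it.
/// let (listener, local_addr) = open_listener(&config).await;
/// assert_ne!(local_addr.port(), 0);
/// ```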
#[instrument(skip(config), fields(addr = ?config.listen_addr))]
pub(crate) async fn open_listener(config: &Config) -> (TcpListener, SocketAddr) {
    // Warn if we're configured using the wrong network port.
    if let Err(wrong_addr) =
        address_is_valid_for_inbound_listeners(config.listen_addr, config.network.clone())
    {
        warn!(
            "We are configured with address {} on {:?}, but it could cause network issues. \
             The default port for {:?} is {}. Error: {wrong_addr:?}",
            config.listen_addr,
            config.network,
            config.network,
            config.network.default_port(),
        );
    }

    info!(
        "Trying to open Zcash protocol endpoint at {}...",
        config.listen_addr
    );
    let listener_result = TcpListener::bind(config.listen_addr).await;

    let listener = match listener_result {
        Ok(l) => l,
        Err(e) => panic!(
            "Opening Zcash network protocol listener {:?} failed: {e:?}. \
             Hint: Check if another zebrad or zcashd process is running. \
             Try changing the network listen_addr in the Zebra config.",
            config.listen_addr,
        ),
    };

    let local_addr = listener
        .local_addr()
        .expect("unexpected missing local addr for open listener");
    info!("Opened Zcash protocol endpoint at {}", local_addr);

    (listener, local_addr)
}

/// Listens for peer connections on `addr`, then sets up each connection as a
/// Zcash peer.
///
/// Uses `handshaker` to perform a Zcash network protocol handshake, and sends
/// the [`peer::Client`] result over `peerset_tx`.
///
/// Limits the number of active inbound connections based on `config`,
/// and waits `min_inbound_peer_connection_interval` between connections.
#[instrument(skip(config, listener, handshaker, peerset_tx), fields(listener_addr = ?listener.local_addr()))]
async fn accept_inbound_connections<S>(
    config: Config,
    listener: TcpListener,
    min_inbound_peer_connection_interval: Duration,
    handshaker: S,
    peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
    bans_receiver: watch::Receiver<Arc<IndexMap<IpAddr, std::time::Instant>>>,
) -> Result<(), BoxError>
where
    S: Service<peer::HandshakeRequest<TcpStream>, Response = peer::Client, Error = BoxError>
        + Clone,
    S::Future: Send + 'static,
{
    let mut recent_inbound_connections =
        recent_by_ip::RecentByIp::new(None, Some(config.max_connections_per_ip));

    let mut active_inbound_connections = ActiveConnectionCounter::new_counter_with(
        config.peerset_inbound_connection_limit(),
        "Inbound Connections",
    );

    let mut handshakes: FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>> =
        FuturesUnordered::new();
    // Keeping an unresolved future in the pool means the stream never terminates.
    handshakes.push(future::pending().boxed());

    loop {
        // Check for panics in finished tasks, before accepting new connections
        let inbound_result = tokio::select! {
            biased;
            next_handshake_res = handshakes.next() => match next_handshake_res {
                // The task has already sent the peer change to the peer set.
                Some(()) => continue,
                None => unreachable!("handshakes never terminates, because it contains a future that never resolves"),
            },

            // This future must wait until new connections are available: it can't have a timeout.
            inbound_result = listener.accept() => inbound_result,
        };

        if let Ok((tcp_stream, addr)) = inbound_result {
            let addr: PeerSocketAddr = addr.into();

            if bans_receiver.borrow().clone().contains_key(&addr.ip()) {
                debug!(?addr, "banned inbound connection attempt");
                std::mem::drop(tcp_stream);
                continue;
            }

            if active_inbound_connections.update_count()
                >= config.peerset_inbound_connection_limit()
                || recent_inbound_connections.is_past_limit_or_add(addr.ip())
            {
                // Too many open inbound connections or pending handshakes already.
                // Close the connection.
                std::mem::drop(tcp_stream);
                // Allow invalid connections to be cleared quickly,
                // but still put a limit on our CPU and network usage from failed connections.
                tokio::time::sleep(constants::MIN_INBOUND_PEER_FAILED_CONNECTION_INTERVAL).await;
                continue;
            }

            // The peer already opened a connection to us.
            // So we want to increment the connection count as soon as possible.
            let connection_tracker = active_inbound_connections.track_connection();
            debug!(
                inbound_connections = ?active_inbound_connections.update_count(),
                "handshaking on an open inbound peer connection"
            );

            let handshake_task = accept_inbound_handshake(
                addr,
                handshaker.clone(),
                tcp_stream,
                connection_tracker,
                peerset_tx.clone(),
            )
            .await?
            .wait_for_panics();

            handshakes.push(handshake_task);

            // Rate-limit inbound connection handshakes.
            // But sleep longer after a successful connection,
            // so we can clear out failed connections at a higher rate.
            //
            // If there is a flood of connections,
            // this stops Zebra overloading the network with handshake data.
            //
            // Zebra can't control how many queued connections are waiting,
            // but most OSes also limit the number of queued inbound connections on a listener port.
            tokio::time::sleep(min_inbound_peer_connection_interval).await;
        } else {
            // Allow invalid connections to be cleared quickly,
            // but still put a limit on our CPU and network usage from failed connections.
            debug!(?inbound_result, "error accepting inbound connection");
            tokio::time::sleep(constants::MIN_INBOUND_PEER_FAILED_CONNECTION_INTERVAL).await;
        }

        // Security: Let other tasks run after each connection is processed.
        //
        // Avoids remote peers starving other Zebra tasks using inbound connection successes or
        // errors.
        //
        // Preventing a denial of service is important in this code, so we want to sleep *and* make
        // the next connection after other tasks have run. (Sleeps are not guaranteed to do that.)
        tokio::task::yield_now().await;
    }
}

/// Set up a new inbound connection as a Zcash peer.
///
/// Uses `handshaker` to perform a Zcash network protocol handshake, and sends
/// the [`peer::Client`] result over `peerset_tx`.
//
// TODO: when we support inbound proxies, distinguish between proxied listeners and
//       direct listeners in the span generated by this instrument macro
#[instrument(skip(handshaker, tcp_stream, connection_tracker, peerset_tx))]
async fn accept_inbound_handshake<S>(
    addr: PeerSocketAddr,
    mut handshaker: S,
    tcp_stream: TcpStream,
    connection_tracker: ConnectionTracker,
    peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
) -> Result<tokio::task::JoinHandle<()>, BoxError>
where
    S: Service<peer::HandshakeRequest<TcpStream>, Response = peer::Client, Error = BoxError>
        + Clone,
    S::Future: Send + 'static,
{
    let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr);

    debug!("got incoming connection");

    // # Correctness
    //
    // Holding the drop guard returned by Span::enter across .await points will
    // result in incorrect traces if it yields.
    //
    // This await is okay because the handshaker's `poll_ready` method always returns Ready.
    handshaker.ready().await?;

    // Construct a handshake future, but do not drive it yet...
    let handshake = handshaker.call(HandshakeRequest {
        data_stream: tcp_stream,
        connected_addr,
        connection_tracker,
    });
    // ... instead, spawn a new task to handle this connection
    let mut peerset_tx = peerset_tx.clone();

    let handshake_task = tokio::spawn(
        async move {
            let handshake_result = handshake.await;

            if let Ok(client) = handshake_result {
                // The connection limit makes sure this send doesn't block
                let _ = peerset_tx.send((addr, client)).await;
            } else {
                debug!(?handshake_result, "error handshaking with inbound peer");
            }
        }
        .in_current_span(),
    );

    Ok(handshake_task)
}

/// An action that the peer crawler can take.
enum CrawlerAction {
    /// Drop the demand signal because there are too many pending handshakes.
    DemandDrop,
    /// Initiate a handshake to the next candidate peer in response to demand.
    ///
    /// If there are no available candidates, crawl existing peers.
    DemandHandshakeOrCrawl,
    /// Crawl existing peers for more peers in response to a timer `tick`.
    TimerCrawl { tick: Instant },
    /// Clear a finished handshake.
    HandshakeFinished,
    /// Clear a finished demand crawl (DemandHandshakeOrCrawl with no peers).
    DemandCrawlFinished,
    /// Clear a finished TimerCrawl.
    TimerCrawlFinished,
}

/// Given a channel `demand_rx` that signals a need for new peers, try to find
/// and connect to new peers, and send the resulting `peer::Client`s through the
/// `peerset_tx` channel.
///
/// Crawl for new peers every `config.crawl_new_peer_interval`.
/// Also crawl whenever there is demand, but no new peers in `candidates`.
/// After crawling, try to connect to one new peer using `outbound_connector`.
///
/// If a handshake fails, restore the unused demand signal by sending it to
/// `demand_tx`.
///
/// The crawler terminates when `candidates.update()` or `peerset_tx` returns a
/// permanent internal error. Transient errors and individual peer errors should
/// be handled within the crawler.
///
/// Uses `active_outbound_connections` to limit the number of active outbound connections
/// across both the initial peers and crawler. The limit is based on `config`.
#[allow(clippy::too_many_arguments)]
#[instrument(
    skip(
        config,
        demand_tx,
        demand_rx,
        candidates,
        outbound_connector,
        peerset_tx,
        active_outbound_connections,
        address_book_updater,
    ),
    fields(
        new_peer_interval = ?config.crawl_new_peer_interval,
    )
)]
async fn crawl_and_dial<C, S>(
    config: Config,
    demand_tx: futures::channel::mpsc::Sender<MorePeers>,
    mut demand_rx: futures::channel::mpsc::Receiver<MorePeers>,
    candidates: CandidateSet<S>,
    outbound_connector: C,
    peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
    mut active_outbound_connections: ActiveConnectionCounter,
    address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
) -> Result<(), BoxError>
where
    C: Service<
            OutboundConnectorRequest,
            Response = (PeerSocketAddr, peer::Client),
            Error = BoxError,
        > + Clone
        + Send
        + 'static,
    C::Future: Send + 'static,
    S: Service<Request, Response = Response, Error = BoxError> + Send + Sync + 'static,
    S::Future: Send + 'static,
{
    use CrawlerAction::*;

    info!(
        crawl_new_peer_interval = ?config.crawl_new_peer_interval,
        outbound_connections = ?active_outbound_connections.update_count(),
        "starting the peer address crawler",
    );

    // # Concurrency
    //
    // Allow tasks using the candidate set to be spawned, so they can run concurrently.
    // Previously, Zebra has had deadlocks and long hangs caused by running dependent
    // candidate set futures in the same async task.
    let candidates = Arc::new(futures::lock::Mutex::new(candidates));

    // This contains both crawl and handshake tasks.
    let mut handshakes: FuturesUnordered<
        Pin<Box<dyn Future<Output = Result<CrawlerAction, BoxError>> + Send>>,
    > = FuturesUnordered::new();
    // <FuturesUnordered as Stream> returns None when empty.
    // Keeping an unresolved future in the pool means the stream never terminates.
    handshakes.push(future::pending().boxed());

    let mut crawl_timer = tokio::time::interval(config.crawl_new_peer_interval);
    // If the crawl is delayed, also delay all future crawls.
    // (Shorter intervals just add load, without any benefit.)
    crawl_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    let mut crawl_timer = IntervalStream::new(crawl_timer).map(|tick| TimerCrawl { tick });

    // # Concurrency
    //
    // To avoid hangs and starvation, the crawler must spawn a separate task for each crawl
    // and handshake, so they can make progress independently (and avoid deadlocking each other).
    loop {
        metrics::gauge!("crawler.in_flight_handshakes").set(
            handshakes
                .len()
                .checked_sub(1)
                .expect("the pool always contains an unresolved future") as f64,
        );

        let crawler_action = tokio::select! {
            biased;
            // Check for completed handshakes first, because the rest of the app needs them.
            // Pending handshakes are limited by the connection limit.
            next_handshake_res = handshakes.next() => next_handshake_res.expect(
                "handshakes never terminates, because it contains a future that never resolves"
            ),
            // The timer is rate-limited
            next_timer = crawl_timer.next() => Ok(next_timer.expect("timers never terminate")),
            // Turn any new demand into an action, based on the crawler's current state.
            //
            // # Concurrency
            //
            // Demand is potentially unlimited, so it must go last in a biased select!.
            next_demand = demand_rx.next() => next_demand.ok_or("demand stream closed, is Zebra shutting down?".into()).map(|MorePeers|{
                if active_outbound_connections.update_count() >= config.peerset_outbound_connection_limit() {
                    // Too many open outbound connections or pending handshakes already
                    DemandDrop
                } else {
                    DemandHandshakeOrCrawl
                }
            })
        };

        match crawler_action {
            // Dummy actions
            Ok(DemandDrop) => {
                // This is set to trace level because when the peerset is
                // congested it can generate a lot of demand signal very rapidly.
                trace!("too many open connections or in-flight handshakes, dropping demand signal");
            }

            // Spawned tasks
            Ok(DemandHandshakeOrCrawl) => {
                let candidates = candidates.clone();
                let outbound_connector = outbound_connector.clone();
                let peerset_tx = peerset_tx.clone();
                let address_book_updater = address_book_updater.clone();
                let demand_tx = demand_tx.clone();

                // Increment the connection count before we spawn the connection.
                let outbound_connection_tracker = active_outbound_connections.track_connection();
                let outbound_connections = active_outbound_connections.update_count();
                debug!(?outbound_connections, "opening an outbound peer connection");

                // Spawn each handshake or crawl into an independent task, so handshakes can make
                // progress while crawls are running.
                //
                // # Concurrency
                //
                // The peer crawler must be able to make progress even if some handshakes are
                // rate-limited. So the async mutex and next peer timeout are awaited inside the
                // spawned task.
                let handshake_or_crawl_handle = tokio::spawn(
                    async move {
                        // Try to get the next available peer for a handshake.
                        //
                        // candidates.next() has a short timeout, and briefly holds the address
                        // book lock, so it shouldn't hang.
                        //
                        // Hold the lock for as short a time as possible.
                        let candidate = { candidates.lock().await.next().await };

                        if let Some(candidate) = candidate {
                            // we don't need to spawn here, because there's nothing running concurrently
                            dial(
                                candidate,
                                outbound_connector,
                                outbound_connection_tracker,
                                outbound_connections,
                                peerset_tx,
                                address_book_updater,
                                demand_tx,
                            )
                            .await?;

                            Ok(HandshakeFinished)
                        } else {
                            // There weren't any peers, so try to get more peers.
                            debug!("demand for peers but no available candidates");

                            crawl(candidates, demand_tx, false).await?;

                            Ok(DemandCrawlFinished)
                        }
                    }
                    .in_current_span(),
                )
                .wait_for_panics();

                handshakes.push(handshake_or_crawl_handle);
            }
            Ok(TimerCrawl { tick }) => {
                let candidates = candidates.clone();
                let demand_tx = demand_tx.clone();
                let should_always_dial = active_outbound_connections.update_count() == 0;

                let crawl_handle = tokio::spawn(
                    async move {
                        debug!(
                            ?tick,
                            "crawling for more peers in response to the crawl timer"
                        );

                        crawl(candidates, demand_tx, should_always_dial).await?;

                        Ok(TimerCrawlFinished)
                    }
                    .in_current_span(),
                )
                .wait_for_panics();

                handshakes.push(crawl_handle);
            }

            // Completed spawned tasks
            Ok(HandshakeFinished) => {
                // Already logged in dial()
            }
            Ok(DemandCrawlFinished) => {
                // This is set to trace level because when the peerset is
                // congested it can generate a lot of demand signal very rapidly.
                trace!("demand-based crawl finished");
            }
            Ok(TimerCrawlFinished) => {
                debug!("timer-based crawl finished");
            }

            // Fatal errors and shutdowns
            Err(error) => {
                info!(?error, "crawler task exiting due to an error");
                return Err(error);
            }
        }

        // Security: Let other tasks run after each crawler action is processed.
        //
        // Avoids remote peers starving other Zebra tasks using outbound connection errors.
        tokio::task::yield_now().await;
    }
}

/// Try to get more peers using `candidates`, then queue a connection attempt using `demand_tx`.
/// If there were no new peers and `should_always_dial` is false, the connection attempt is skipped.
#[instrument(skip(candidates, demand_tx))]
async fn crawl<S>(
    candidates: Arc<futures::lock::Mutex<CandidateSet<S>>>,
    mut demand_tx: futures::channel::mpsc::Sender<MorePeers>,
    should_always_dial: bool,
) -> Result<(), BoxError>
where
    S: Service<Request, Response = Response, Error = BoxError> + Send + Sync + 'static,
    S::Future: Send + 'static,
{
    // update() has timeouts, and briefly holds the address book
    // lock, so it shouldn't hang.
    // Try to get new peers, holding the lock for as short a time as possible.
    let result = {
        let result = candidates.lock().await.update().await;
        std::mem::drop(candidates);
        result
    };
    let more_peers = match result {
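        // Dial even if the update found no new peers, when the caller asked us
        // to always dial (the timer crawl sets this when there are no active
        // outbound connections).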
        Ok(more_peers) => more_peers.or_else(|| should_always_dial.then_some(MorePeers)),
        Err(e) => {
            info!(
                ?e,
                "candidate set returned an error, is Zebra shutting down?"
            );
            return Err(e);
        }
    };

    // If we got more peers, try to connect to a new peer on our next loop.
    //
    // # Security
    //
    // Update attempts are rate-limited by the candidate set,
    // and we only try peers if there was actually an update.
    //
    // So if all peers have had a recent attempt, and there was a recent update
    // with no peers, the channel will drain. This prevents useless update attempt
    // loops.
    if let Some(more_peers) = more_peers {
        if let Err(send_error) = demand_tx.try_send(more_peers) {
            if send_error.is_disconnected() {
                // Zebra is shutting down
                return Err(send_error.into());
            }
        }
    }

    Ok(())
}

/// Try to connect to `candidate` using `outbound_connector`.
/// Uses `outbound_connection_tracker` to track the active connection count.
///
/// On success, sends peers to `peerset_tx`.
/// On failure, marks the peer as failed in the address book,
/// then re-adds demand to `demand_tx`.
#[instrument(skip(
    outbound_connector,
    outbound_connection_tracker,
    outbound_connections,
    peerset_tx,
    address_book_updater,
    demand_tx
))]
async fn dial<C>(
    candidate: MetaAddr,
    mut outbound_connector: C,
    outbound_connection_tracker: ConnectionTracker,
    outbound_connections: usize,
    mut peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
    address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
    mut demand_tx: futures::channel::mpsc::Sender<MorePeers>,
) -> Result<(), BoxError>
where
    C: Service<
            OutboundConnectorRequest,
            Response = (PeerSocketAddr, peer::Client),
            Error = BoxError,
        > + Clone
        + Send
        + 'static,
    C::Future: Send + 'static,
{
    // If Zebra only has a few connections, we log connection failures at info level,
    // so users can diagnose and fix the problem. This defines the threshold for info logs.
    const MAX_CONNECTIONS_FOR_INFO_LOG: usize = 5;

    // # Correctness
    //
    // To avoid hangs, the dialer must only await:
    // - functions that return immediately, or
    // - functions that have a reasonable timeout

    debug!(?candidate.addr, "attempting outbound connection in response to demand");

    // the connector is always ready, so this can't hang
    let outbound_connector = outbound_connector.ready().await?;

    let req = OutboundConnectorRequest {
        addr: candidate.addr,
        connection_tracker: outbound_connection_tracker,
    };

    // the handshake has timeouts, so it shouldn't hang
    let handshake_result = outbound_connector.call(req).map(Into::into).await;

    match handshake_result {
        Ok((address, client)) => {
            debug!(?candidate.addr, "successfully dialed new peer");

            // The connection limit makes sure this send doesn't block.
            peerset_tx.send((address, client)).await?;
        }
        // The connection was never opened, or it failed the handshake and was dropped.
        Err(error) => {
            // Silence verbose info logs in production, but keep logs if the number of connections is low.
            // Also silence them completely in tests.
            if outbound_connections <= MAX_CONNECTIONS_FOR_INFO_LOG && !cfg!(test) {
                info!(?error, ?candidate.addr, "failed to make outbound connection to peer");
            } else {
                debug!(?error, ?candidate.addr, "failed to make outbound connection to peer");
            }
            report_failed(address_book_updater.clone(), candidate).await;

            // The demand signal that was taken out of the queue to attempt to connect to the
            // failed candidate never turned into a connection, so add it back.
            //
            // # Security
            //
            // Handshake failures are rate-limited by peer attempt timeouts.
            if let Err(send_error) = demand_tx.try_send(MorePeers) {
                if send_error.is_disconnected() {
                    // Zebra is shutting down
                    return Err(send_error.into());
                }
            }
        }
    }

    Ok(())
}

/// Report `addr` to the `address_book_updater` as a failed peer.
#[instrument(skip(address_book_updater))]
async fn report_failed(
    address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
    addr: MetaAddr,
) {
    // The connection info is the same as what's already in the address book.
    let addr = MetaAddr::new_errored(addr.addr, None);

    // Ignore send errors on Zebra shutdown.
    let _ = address_book_updater.send(addr).await;
}