zebra_network/peer/handshake.rs

//! Initial [`Handshake`]s with Zebra peers over a `PeerTransport`.

use std::{
    cmp::min,
    fmt,
    future::Future,
    net::{Ipv4Addr, SocketAddr},
    panic,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use chrono::{TimeZone, Utc};
use futures::{channel::oneshot, future, pin_mut, FutureExt, SinkExt, StreamExt};
use indexmap::IndexSet;
use tokio::{
    io::{AsyncRead, AsyncWrite},
    sync::broadcast,
    task::JoinError,
    time::{error, timeout, Instant},
};
use tokio_stream::wrappers::IntervalStream;
use tokio_util::codec::Framed;
use tower::Service;
use tracing::{span, Level, Span};
use tracing_futures::Instrument;

use zebra_chain::{
    chain_tip::{ChainTip, NoChainTip},
    parameters::Network,
    serialization::{DateTime32, SerializationError},
};

use crate::{
    constants,
    meta_addr::MetaAddrChange,
    peer::{
        CancelHeartbeatTask, Client, ClientRequest, Connection, ErrorSlot, HandshakeError,
        MinimumPeerVersion, PeerError,
    },
    peer_set::{ConnectionTracker, InventoryChange},
    protocol::{
        external::{types::*, AddrInVersion, Codec, InventoryHash, Message},
        internal::{Request, Response},
    },
    types::MetaAddr,
    BoxError, Config, PeerSocketAddr, VersionMessage,
};

#[cfg(test)]
mod tests;

/// A [`Service`] that handshakes with a remote peer and constructs a
/// client/server pair.
///
/// CORRECTNESS
///
/// To avoid hangs, each handshake (or its connector) should be:
/// - launched in a separate task, and
/// - wrapped in a timeout.
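///
/// # Example
///
/// A sketch of safe handshake usage, assuming a configured `handshake` service
/// and a `request: HandshakeRequest<_>` (names are illustrative, and the
/// example is not checked by doctests):
///
/// ```ignore
/// use tower::Service;
///
/// // Launch the handshake in its own task, wrapped in a timeout.
/// let handshake_task = tokio::spawn(async move {
///     tokio::time::timeout(constants::HANDSHAKE_TIMEOUT, handshake.call(request)).await
/// });
/// ```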
pub struct Handshake<S, C = NoChainTip>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
{
    config: Config,
    user_agent: String,
    our_services: PeerServices,
    relay: bool,

    inbound_service: S,
    address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
    inv_collector: broadcast::Sender<InventoryChange>,
    minimum_peer_version: MinimumPeerVersion<C>,
    nonces: Arc<futures::lock::Mutex<IndexSet<Nonce>>>,

    parent_span: Span,
}

impl<S, C> fmt::Debug for Handshake<S, C>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // skip the channels, they don't tell us anything useful
        f.debug_struct(std::any::type_name::<Handshake<S, C>>())
            .field("config", &self.config)
            .field("user_agent", &self.user_agent)
            .field("our_services", &self.our_services)
            .field("relay", &self.relay)
            .field("minimum_peer_version", &self.minimum_peer_version)
            .field("parent_span", &self.parent_span)
            .finish()
    }
}

impl<S, C> Clone for Handshake<S, C>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
{
    fn clone(&self) -> Self {
        Self {
            config: self.config.clone(),
            user_agent: self.user_agent.clone(),
            our_services: self.our_services,
            relay: self.relay,
            inbound_service: self.inbound_service.clone(),
            address_book_updater: self.address_book_updater.clone(),
            inv_collector: self.inv_collector.clone(),
            minimum_peer_version: self.minimum_peer_version.clone(),
            nonces: self.nonces.clone(),
            parent_span: self.parent_span.clone(),
        }
    }
}

/// The metadata for a peer connection.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ConnectionInfo {
    /// The connected peer address, if known.
    /// This address might not be valid for outbound connections.
    ///
    /// Peers can be connected via a transient inbound or proxy address,
    /// which will appear as the connected address to the OS and Zebra.
    pub connected_addr: ConnectedAddr,

    /// The network protocol [`VersionMessage`] sent by the remote peer.
    pub remote: VersionMessage,

    /// The network protocol version negotiated with the remote peer.
    ///
    /// Derived from `remote.version` and the
    /// [current `zebra_network` protocol version](constants::CURRENT_NETWORK_PROTOCOL_VERSION).
    pub negotiated_version: Version,
}

/// The peer address that we are handshaking with.
///
/// Typically, we can rely on outbound addresses, but inbound addresses don't
/// give us enough information to reconnect to that peer.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum ConnectedAddr {
    /// The address we used to make a direct outbound connection.
    ///
    /// In an honest network, a Zcash peer is listening on this exact address
    /// and port.
    OutboundDirect {
        /// The connected outbound remote address and port.
        addr: PeerSocketAddr,
    },

    /// The address we received from the OS, when a remote peer directly
    /// connected to our Zcash listener port.
    ///
    /// In an honest network, a Zcash peer might be listening on this address,
    /// if its outbound address is the same as its listener address. But the port
    /// is an ephemeral outbound TCP port, not a listener port.
    InboundDirect {
        /// The connected inbound remote address and ephemeral port.
        ///
        /// The IP address might be the address of a Zcash peer, but the port is an ephemeral port.
        addr: PeerSocketAddr,
    },

    /// The proxy address we used to make an outbound connection.
    ///
    /// The proxy address can be used by many connections, but our own ephemeral
    /// outbound address and port can be used as an identifier for the duration
    /// of this connection.
    OutboundProxy {
        /// The remote address and port of the proxy.
        proxy_addr: SocketAddr,

        /// The local address and transient port we used to connect to the proxy.
        transient_local_addr: SocketAddr,
    },

    /// The address we received from the OS, when a remote peer connected via an
    /// inbound proxy.
    ///
    /// The proxy's ephemeral outbound address can be used as an identifier for
    /// the duration of this connection.
    InboundProxy {
        /// The local address and transient port we used to connect to the proxy.
        transient_addr: SocketAddr,
    },

    /// An isolated connection, where we deliberately don't have any connection metadata.
    Isolated,
    //
    // TODO: handle Tor onion addresses
}

/// Returns an unspecified IPv4 socket address, using the default port for `network`.
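///
/// # Example
///
/// On Mainnet, where the default port is 8233 (illustrative, not checked by
/// doctests):
///
/// ```ignore
/// let addr = get_unspecified_ipv4_addr(Network::Mainnet);
/// assert_eq!(addr.to_string(), "0.0.0.0:8233");
/// ```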
pub fn get_unspecified_ipv4_addr(network: Network) -> SocketAddr {
    (Ipv4Addr::UNSPECIFIED, network.default_port()).into()
}

use ConnectedAddr::*;

impl ConnectedAddr {
    /// Returns a new outbound directly connected addr.
    pub fn new_outbound_direct(addr: PeerSocketAddr) -> ConnectedAddr {
        OutboundDirect { addr }
    }

    /// Returns a new inbound directly connected addr.
    pub fn new_inbound_direct(addr: PeerSocketAddr) -> ConnectedAddr {
        InboundDirect { addr }
    }

    /// Returns a new outbound connected addr via `proxy`.
    ///
    /// `local_addr` is the ephemeral local address of the connection.
    #[allow(unused)]
    pub fn new_outbound_proxy(proxy: SocketAddr, local_addr: SocketAddr) -> ConnectedAddr {
        OutboundProxy {
            proxy_addr: proxy,
            transient_local_addr: local_addr,
        }
    }

    /// Returns a new inbound connected addr from `proxy`.
    //
    // TODO: distinguish between direct listeners and proxy listeners in the
    //       rest of zebra-network
    #[allow(unused)]
    pub fn new_inbound_proxy(proxy: SocketAddr) -> ConnectedAddr {
        InboundProxy {
            transient_addr: proxy,
        }
    }

    /// Returns a new isolated connected addr, with no metadata.
    pub fn new_isolated() -> ConnectedAddr {
        Isolated
    }

    /// Returns a `PeerSocketAddr` that can be used to track this connection in the
    /// `AddressBook`.
    ///
    /// `None` for proxy connections and isolated connections.
    ///
    /// # Correctness
    ///
    /// This address can be used for reconnection attempts, or as a permanent
    /// identifier.
    ///
    /// # Security
    ///
    /// This address must not depend on the canonical address from the `Version`
    /// message. Otherwise, malicious peers could interfere with other peers'
    /// `AddressBook` state.
    //
    // TODO: remove the `get_` from these methods (Rust style avoids `get` prefixes)
    pub fn get_address_book_addr(&self) -> Option<PeerSocketAddr> {
        match self {
            OutboundDirect { addr } | InboundDirect { addr } => Some(*addr),
            // TODO: consider using the canonical address of the peer to track
            //       outbound proxy connections
            OutboundProxy { .. } | InboundProxy { .. } | Isolated => None,
        }
    }

    /// Returns a `PeerSocketAddr` that can be used to temporarily identify a
    /// connection.
    ///
    /// Isolated connections must not change Zebra's peer set or address book
    /// state, so they do not have an identifier.
    ///
    /// # Correctness
    ///
    /// The returned address is only valid while the original connection is
    /// open. It must not be used in the `AddressBook`, for outbound connection
    /// attempts, or as a permanent identifier.
    ///
    /// # Security
    ///
    /// This address must not depend on the canonical address from the `Version`
    /// message. Otherwise, malicious peers could interfere with other peers'
    /// `PeerSet` state.
    pub fn get_transient_addr(&self) -> Option<PeerSocketAddr> {
        match self {
            OutboundDirect { addr } => Some(*addr),
            InboundDirect { addr } => Some(*addr),
            OutboundProxy {
                transient_local_addr,
                ..
            } => Some(PeerSocketAddr::from(*transient_local_addr)),
            InboundProxy { transient_addr } => Some(PeerSocketAddr::from(*transient_addr)),
            Isolated => None,
        }
    }

    /// Returns the metrics label for this connection's address.
    pub fn get_transient_addr_label(&self) -> String {
        self.get_transient_addr()
            .map_or_else(|| "isolated".to_string(), |addr| addr.to_string())
    }

    /// Returns a short label for the kind of connection.
    pub fn get_short_kind_label(&self) -> &'static str {
        match self {
            OutboundDirect { .. } => "Out",
            InboundDirect { .. } => "In",
            OutboundProxy { .. } => "ProxOut",
            InboundProxy { .. } => "ProxIn",
            Isolated => "Isol",
        }
    }

    /// Returns a list of alternate remote peer addresses, which can be used for
    /// reconnection attempts.
    ///
    /// Uses the connected address, and the remote canonical address.
    ///
    /// Skips duplicates. If this is an outbound connection, also skips the
    /// remote address that we're currently connected to.
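    ///
    /// # Example
    ///
    /// A sketch of the outbound fixup behaviour, using illustrative addresses
    /// (not checked by doctests):
    ///
    /// ```ignore
    /// // The peer advertised an unspecified canonical address, so its IP and
    /// // port are filled in from the connected outbound address. The fixed-up
    /// // address then matches the connected address, so no alternate is returned.
    /// let connected = ConnectedAddr::new_outbound_direct("192.0.2.1:8233".parse().unwrap());
    /// let alternates = connected.get_alternate_addrs("0.0.0.0:0".parse().unwrap());
    /// assert_eq!(alternates.count(), 0);
    /// ```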
    pub fn get_alternate_addrs(
        &self,
        mut canonical_remote: PeerSocketAddr,
    ) -> impl Iterator<Item = PeerSocketAddr> {
        let addrs = match self {
            OutboundDirect { addr } => {
                // Fixup unspecified addresses and ports using known good data
                if canonical_remote.ip().is_unspecified() {
                    canonical_remote.set_ip(addr.ip());
                }
                if canonical_remote.port() == 0 {
                    canonical_remote.set_port(addr.port());
                }

                // Try the canonical remote address, if it is different from the
                // outbound address (which we already have in our address book)
                if &canonical_remote != addr {
                    vec![canonical_remote]
                } else {
                    // we didn't learn a new address from the handshake:
                    // it's the same as the outbound address, which is already in our address book
                    Vec::new()
                }
            }

            InboundDirect { addr } => {
                // Use the IP from the TCP connection, and the port the peer told us
                let maybe_addr = SocketAddr::new(addr.ip(), canonical_remote.port()).into();

                // Try both addresses, but remove one duplicate if they match
                if canonical_remote != maybe_addr {
                    vec![canonical_remote, maybe_addr]
                } else {
                    vec![canonical_remote]
                }
            }

            // Proxy addresses can't be used for reconnection attempts, but we
            // can try the canonical remote address
            OutboundProxy { .. } | InboundProxy { .. } => vec![canonical_remote],

            // Hide all metadata for isolated connections
            Isolated => Vec::new(),
        };

        addrs.into_iter()
    }

    /// Returns true if the [`ConnectedAddr`] was created for an inbound connection.
    pub fn is_inbound(&self) -> bool {
        matches!(self, InboundDirect { .. } | InboundProxy { .. })
    }
}

impl fmt::Debug for ConnectedAddr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let kind = self.get_short_kind_label();
        let addr = self.get_transient_addr_label();

        if matches!(self, Isolated) {
            f.write_str(kind)
        } else {
            f.debug_tuple(kind).field(&addr).finish()
        }
    }
}

/// A builder for `Handshake`.
pub struct Builder<S, C = NoChainTip>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
{
    config: Option<Config>,
    our_services: Option<PeerServices>,
    user_agent: Option<String>,
    relay: Option<bool>,

    inbound_service: Option<S>,
    address_book_updater: Option<tokio::sync::mpsc::Sender<MetaAddrChange>>,
    inv_collector: Option<broadcast::Sender<InventoryChange>>,
    latest_chain_tip: C,
}

impl<S, C> Builder<S, C>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
{
    /// Provide a config.  Mandatory.
    pub fn with_config(mut self, config: Config) -> Self {
        self.config = Some(config);
        self
    }

    /// Provide a service to handle inbound requests. Mandatory.
    pub fn with_inbound_service(mut self, inbound_service: S) -> Self {
        self.inbound_service = Some(inbound_service);
        self
    }

    /// Provide a channel for registering inventory advertisements. Optional.
    ///
    /// This channel takes transient remote addresses, which the `PeerSet` uses
    /// to look up peers that have specific inventory.
    pub fn with_inventory_collector(
        mut self,
        inv_collector: broadcast::Sender<InventoryChange>,
    ) -> Self {
        self.inv_collector = Some(inv_collector);
        self
    }

    /// Provide a hook for timestamp collection. Optional.
    ///
    /// This channel takes `MetaAddr`s, permanent addresses which can be used to
    /// make outbound connections to peers.
    pub fn with_address_book_updater(
        mut self,
        address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
    ) -> Self {
        self.address_book_updater = Some(address_book_updater);
        self
    }

    /// Provide the services this node advertises to other peers.  Optional.
    ///
    /// If this is unset, the node will advertise itself as a client.
    pub fn with_advertised_services(mut self, services: PeerServices) -> Self {
        self.our_services = Some(services);
        self
    }

    /// Provide this node's user agent.  Optional.
    ///
    /// This must be a valid BIP14 string.  If it is unset, the user-agent will be empty.
    pub fn with_user_agent(mut self, user_agent: String) -> Self {
        self.user_agent = Some(user_agent);
        self
    }

    /// Provide a realtime endpoint to obtain the current best chain tip block height. Optional.
    ///
    /// If this is unset, the minimum accepted protocol version for peer connections is kept
    /// constant over network upgrade activations.
    ///
    /// Use [`NoChainTip`] to explicitly provide no chain tip.
    pub fn with_latest_chain_tip<NewC>(self, latest_chain_tip: NewC) -> Builder<S, NewC>
    where
        NewC: ChainTip + Clone + Send + 'static,
    {
        Builder {
            latest_chain_tip,

            // TODO: Until Rust RFC 2528 reaches stable, we can't do `..self`
            config: self.config,
            inbound_service: self.inbound_service,
            address_book_updater: self.address_book_updater,
            our_services: self.our_services,
            user_agent: self.user_agent,
            relay: self.relay,
            inv_collector: self.inv_collector,
        }
    }

    /// Whether to request that peers relay transactions to our node.  Optional.
    ///
    /// If this is unset, the node will not request transactions.
    pub fn want_transactions(mut self, relay: bool) -> Self {
        self.relay = Some(relay);
        self
    }

    /// Consume this builder and produce a [`Handshake`].
    ///
    /// Returns an error only if any mandatory field was unset.
    pub fn finish(self) -> Result<Handshake<S, C>, &'static str> {
        let config = self.config.ok_or("did not specify config")?;
        let inbound_service = self
            .inbound_service
            .ok_or("did not specify inbound service")?;
        let inv_collector = self.inv_collector.unwrap_or_else(|| {
            let (tx, _) = broadcast::channel(100);
            tx
        });
        let address_book_updater = self.address_book_updater.unwrap_or_else(|| {
            // No `AddressBookUpdater` for timestamp collection was passed, so create a stub
            // channel. Dropping the receiver means sends will fail, but we don't care.
            let (tx, _rx) = tokio::sync::mpsc::channel(1);
            tx
        });
        let nonces = Arc::new(futures::lock::Mutex::new(IndexSet::new()));
        let user_agent = self.user_agent.unwrap_or_default();
        let our_services = self.our_services.unwrap_or_else(PeerServices::empty);
        let relay = self.relay.unwrap_or(false);
        let network = config.network.clone();
        let minimum_peer_version = MinimumPeerVersion::new(self.latest_chain_tip, &network);

        Ok(Handshake {
            config,
            user_agent,
            our_services,
            relay,
            inbound_service,
            address_book_updater,
            inv_collector,
            minimum_peer_version,
            nonces,
            parent_span: Span::current(),
        })
    }
}

impl<S> Handshake<S, NoChainTip>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
{
    /// Create a builder that configures a [`Handshake`] service.
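    ///
    /// # Example
    ///
    /// A sketch of a typical configuration, assuming an `inbound_service` tower
    /// service and a `config` (not checked by doctests):
    ///
    /// ```ignore
    /// let handshake = Handshake::builder()
    ///     .with_config(config)
    ///     .with_inbound_service(inbound_service)
    ///     .with_user_agent("/Zebra:example/".to_string())
    ///     .want_transactions(true)
    ///     .finish()
    ///     .expect("all mandatory fields are set");
    /// ```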
    pub fn builder() -> Builder<S, NoChainTip> {
        // We don't derive `Default` because the derive inserts a `where S:
        // Default` bound even though `Option<S>` implements `Default` even if
        // `S` does not.
        Builder {
            config: None,
            our_services: None,
            user_agent: None,
            relay: None,
            inbound_service: None,
            address_book_updater: None,
            inv_collector: None,
            latest_chain_tip: NoChainTip,
        }
    }
}

/// Negotiate the Zcash network protocol version with the remote peer at `connected_addr`, using
/// the connection `peer_conn`.
///
/// We split `Handshake` into its components before calling this function, to avoid infectious
/// `Sync` bounds on the returned future.
///
/// Returns the [`VersionMessage`] sent by the remote peer, and the [`Version`] negotiated with the
/// remote peer, inside a [`ConnectionInfo`] struct.
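///
/// The exchange follows the standard Zcash network handshake; non-`version`
/// and non-`verack` messages received during the handshake are ignored:
///
/// ```text
/// Local                        Remote
///   |  ------ version ------>    |
///   |  <----- version -------    |
///   |  ------ verack ------->    |
///   |  <----- verack --------    |
/// ```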
#[allow(clippy::too_many_arguments)]
pub async fn negotiate_version<PeerTransport>(
    peer_conn: &mut Framed<PeerTransport, Codec>,
    connected_addr: &ConnectedAddr,
    config: Config,
    nonces: Arc<futures::lock::Mutex<IndexSet<Nonce>>>,
    user_agent: String,
    our_services: PeerServices,
    relay: bool,
    mut minimum_peer_version: MinimumPeerVersion<impl ChainTip>,
) -> Result<Arc<ConnectionInfo>, HandshakeError>
where
    PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    // Create a random nonce for this connection
    let local_nonce = Nonce::default();

    // Insert the nonce for this handshake into the shared nonce set.
    // Each connection has its own connection state, and handshakes execute concurrently.
    //
    // # Correctness
    //
    // It is ok to wait for the lock here, because handshakes have a short
    // timeout, and the async mutex will be released when the task times
    // out.
    {
        let mut locked_nonces = nonces.lock().await;

        // Duplicate nonces are very rare, because they require a 64-bit random number collision,
        // and the nonce set is limited to a few hundred entries.
        let is_unique_nonce = locked_nonces.insert(local_nonce);
        if !is_unique_nonce {
            return Err(HandshakeError::LocalDuplicateNonce);
        }

        // # Security
        //
        // Limit the amount of memory used for nonces.
        // Nonces can be left in the set if the connection fails or times out between
        // the nonce being inserted, and it being removed.
        //
        // Zebra has strict connection limits, so we limit the number of nonces to
        // the configured connection limit.
        // This is a tradeoff between:
        // - avoiding memory denial of service attacks which make large numbers of connections,
        //   for example, 100 failed inbound connections take 1 second.
        // - memory usage: 16 bytes per `Nonce`, 3.2 kB for 200 nonces
        // - collision probability: two hundred 64-bit nonces have a very low collision probability
        //   <https://en.wikipedia.org/wiki/Birthday_problem#Probability_of_a_shared_birthday_(collision)>
        while locked_nonces.len() > config.peerset_total_connection_limit() {
            locked_nonces.shift_remove_index(0);
        }

        std::mem::drop(locked_nonces);
    }

    // Don't leak our exact clock skew to our peers. On the other hand,
    // we can't deviate too much, or zcashd will get confused.
    // Inspection of the zcashd source code reveals that the timestamp
    // is only ever used at the end of parsing the version message, in
    //
    // pfrom->nTimeOffset = timeWarning.AddTimeData(pfrom->addr, nTime, GetTime());
    //
    // AddTimeData is defined in src/timedata.cpp and is a no-op as long
    // as the difference between the specified timestamp and the
    // zcashd's local time is less than TIMEDATA_WARNING_THRESHOLD, set
    // to 10 * 60 seconds (10 minutes).
    //
    // nTimeOffset is peer metadata that is never used, except for
    // statistics.
    //
    // To try to stay within the range where zcashd will ignore our clock skew,
    // truncate the timestamp to the nearest 5 minutes.
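    //
    // For example, 14:37:12 UTC is sent as 14:35:00 UTC. (Unix timestamps count
    // seconds from midnight UTC, so multiples of 5 * 60 seconds line up with
    // 5-minute wall-clock boundaries.)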
    let now = Utc::now().timestamp();
    let timestamp = Utc
        .timestamp_opt(now - now.rem_euclid(5 * 60), 0)
        .single()
        .expect("in-range number of seconds and valid nanosecond");

    let (their_addr, our_services, our_listen_addr) = match connected_addr {
        // Version messages require an address, so we use
        // an unspecified address for Isolated connections
        Isolated => {
            let unspec_ipv4 = get_unspecified_ipv4_addr(config.network);
            (unspec_ipv4.into(), PeerServices::empty(), unspec_ipv4)
        }
        _ => {
            let their_addr = connected_addr
                .get_transient_addr()
                .expect("non-Isolated connections have a remote addr");

            // Include the configured external address in our version message, if any; otherwise, include our listen address.
            let advertise_addr = match config.external_addr {
                Some(external_addr) => {
                    info!(?their_addr, ?config.listen_addr, "using external address for Version messages");
                    external_addr
                }
                None => config.listen_addr,
            };

            (their_addr, our_services, advertise_addr)
        }
    };

    let our_version = VersionMessage {
        version: constants::CURRENT_NETWORK_PROTOCOL_VERSION,
        services: our_services,
        timestamp,
        address_recv: AddrInVersion::new(their_addr, PeerServices::NODE_NETWORK),
        // TODO: detect external address (#1893)
        address_from: AddrInVersion::new(our_listen_addr, our_services),
        nonce: local_nonce,
        user_agent: user_agent.clone(),
        start_height: minimum_peer_version.chain_tip_height(),
        relay,
    }
    .into();

    debug!(?our_version, "sending initial version message");
    peer_conn.send(our_version).await?;

    let mut remote_msg = peer_conn
        .next()
        .await
        .ok_or(HandshakeError::ConnectionClosed)??;

    // Wait for next message if the one we got is not Version
    let remote: VersionMessage = loop {
        match remote_msg {
            Message::Version(version_message) => {
                debug!(?version_message, "got version message from remote peer");
                break version_message;
            }
            _ => {
                debug!(?remote_msg, "ignoring non-version message from remote peer");
                remote_msg = peer_conn
                    .next()
                    .await
                    .ok_or(HandshakeError::ConnectionClosed)??;
            }
        }
    };

    let remote_address_services = remote.address_from.untrusted_services();
    if remote_address_services != remote.services {
        info!(
            ?remote.services,
            ?remote_address_services,
            ?remote.user_agent,
            "peer with inconsistent version services and version address services",
        );
    }

    // Check for nonce reuse, indicating self-connection
    //
    // # Correctness
    //
    // We must wait for the lock before we continue with the connection, to avoid
    // self-connection. If the connection times out, the async lock will be
    // released.
    //
    // # Security
    //
    // We don't remove the nonce here, because peers that observe our network traffic could
    // maliciously remove nonces, and force us to make self-connections.
    let nonce_reuse = nonces.lock().await.contains(&remote.nonce);
    if nonce_reuse {
        info!(?connected_addr, "rejecting self-connection attempt");
        Err(HandshakeError::RemoteNonceReuse)?;
    }

    // # Security
    //
    // Reject connections to peers on old versions, because they might not know about all
    // network upgrades and could lead to chain forks or slower block propagation.
    let min_version = minimum_peer_version.current();

    if remote.version < min_version {
        debug!(
            remote_ip = ?their_addr,
            ?remote.version,
            ?min_version,
            ?remote.user_agent,
            "disconnecting from peer with obsolete network protocol version",
        );

        // the value is the number of rejected handshakes, by peer IP and protocol version
        metrics::counter!(
            "zcash.net.peers.obsolete",
            "remote_ip" => their_addr.to_string(),
            "remote_version" => remote.version.to_string(),
            "min_version" => min_version.to_string(),
            "user_agent" => remote.user_agent.clone(),
        )
        .increment(1);

        // the value is the remote version of the most recent rejected handshake from each peer
        metrics::gauge!(
            "zcash.net.peers.version.obsolete",
            "remote_ip" => their_addr.to_string(),
        )
        .set(remote.version.0 as f64);

        // Disconnect if peer is using an obsolete version.
        return Err(HandshakeError::ObsoleteVersion(remote.version));
    }

    let negotiated_version = min(constants::CURRENT_NETWORK_PROTOCOL_VERSION, remote.version);

    // Limit containing struct size, and avoid multiple duplicates of 300+ bytes of data.
    let connection_info = Arc::new(ConnectionInfo {
        connected_addr: *connected_addr,
        remote,
        negotiated_version,
    });

    debug!(
        remote_ip = ?their_addr,
        ?connection_info.remote.version,
        ?negotiated_version,
        ?min_version,
        ?connection_info.remote.user_agent,
        "negotiated network protocol version with peer",
    );

    // the value is the number of connected handshakes, by peer IP and protocol version
    metrics::counter!(
        "zcash.net.peers.connected",
        "remote_ip" => their_addr.to_string(),
        "remote_version" => connection_info.remote.version.to_string(),
        "negotiated_version" => negotiated_version.to_string(),
        "min_version" => min_version.to_string(),
        "user_agent" => connection_info.remote.user_agent.clone(),
    )
    .increment(1);

    // the value is the remote version of the most recent connected handshake from each peer
    metrics::gauge!(
        "zcash.net.peers.version.connected",
        "remote_ip" => their_addr.to_string(),
    )
    .set(connection_info.remote.version.0 as f64);

    peer_conn.send(Message::Verack).await?;

    let mut remote_msg = peer_conn
        .next()
        .await
        .ok_or(HandshakeError::ConnectionClosed)??;

    // Wait for next message if the one we got is not Verack
    loop {
        match remote_msg {
            Message::Verack => {
                debug!(?remote_msg, "got verack message from remote peer");
                break;
            }
            _ => {
                debug!(?remote_msg, "ignoring non-verack message from remote peer");
                remote_msg = peer_conn
                    .next()
                    .await
                    .ok_or(HandshakeError::ConnectionClosed)??;
            }
        }
    }

    Ok(connection_info)
}

/// A handshake request.
/// Contains the information needed to handshake with the peer.
pub struct HandshakeRequest<PeerTransport>
where
    PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    /// The tokio [`TcpStream`](tokio::net::TcpStream) or Tor
    /// `arti_client::DataStream` to the peer.
    // Use [`arti_client::DataStream`] when #5492 is done.
    pub data_stream: PeerTransport,

    /// The address of the peer, and other related information.
    pub connected_addr: ConnectedAddr,

    /// A connection tracker that reduces the open connection count when dropped.
    ///
    /// Used to limit the number of open connections in Zebra.
    pub connection_tracker: ConnectionTracker,
}

impl<S, PeerTransport, C> Service<HandshakeRequest<PeerTransport>> for Handshake<S, C>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
    PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    type Response = Client;
    type Error = BoxError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: HandshakeRequest<PeerTransport>) -> Self::Future {
        let HandshakeRequest {
            data_stream,
            connected_addr,
            connection_tracker,
        } = req;

        let negotiator_span = debug_span!("negotiator", peer = ?connected_addr);
        // set the peer connection span's parent to the global span, as it
        // should exist independently of its creation source (inbound
        // connection, crawler, initial peer, ...)
        let connection_span =
            span!(parent: &self.parent_span, Level::INFO, "", peer = ?connected_addr);

        // Clone these upfront, so they can be moved into the future.
        let nonces = self.nonces.clone();
        let inbound_service = self.inbound_service.clone();
        let address_book_updater = self.address_book_updater.clone();
        let inv_collector = self.inv_collector.clone();
        let config = self.config.clone();
        let user_agent = self.user_agent.clone();
        let our_services = self.our_services;
        let relay = self.relay;
        let minimum_peer_version = self.minimum_peer_version.clone();

        // # Security
        //
        // `zebra_network::init()` implements a connection timeout on this future.
        // Any code outside this future does not have a timeout.
        let fut = async move {
            debug!(
                addr = ?connected_addr,
                "negotiating protocol version with remote peer"
            );

            let mut peer_conn = Framed::new(
                data_stream,
                Codec::builder()
                    .for_network(&config.network)
                    .with_metrics_addr_label(connected_addr.get_transient_addr_label())
                    .finish(),
            );

            let connection_info = negotiate_version(
                &mut peer_conn,
                &connected_addr,
                config,
                nonces,
                user_agent,
                our_services,
                relay,
                minimum_peer_version,
            )
            .await?;

            let remote_services = connection_info.remote.services;

            // The handshake succeeded: update the peer status from AttemptPending to Responded,
            // and send initial connection info.
            if let Some(book_addr) = connected_addr.get_address_book_addr() {
                // the collector doesn't depend on network activity,
                // so this await should not hang
                let _ = address_book_updater
                    .send(MetaAddr::new_connected(
                        book_addr,
                        &remote_services,
                        connected_addr.is_inbound(),
                    ))
                    .await;
            }

            // Reconfigure the codec to use the negotiated version.
            //
            // TODO: The tokio documentation says not to do this while any frames are still being processed.
            // Since we don't know that here, another way might be to release the tcp
            // stream from the unversioned Framed wrapper and construct a new one with a versioned codec.
            let bare_codec = peer_conn.codec_mut();
            bare_codec.reconfigure_version(connection_info.negotiated_version);

            debug!("constructing client, spawning server");

            // These channels communicate between the inbound and outbound halves of the connection,
            // and between the different connection tasks. We create separate tasks and channels
            // for each new connection.
            let (server_tx, server_rx) = futures::channel::mpsc::channel(0);
            let (shutdown_tx, shutdown_rx) = oneshot::channel();
            let error_slot = ErrorSlot::default();

            let (peer_tx, peer_rx) = peer_conn.split();

            // Instrument the peer's rx and tx streams.

            let inner_conn_span = connection_span.clone();
            let peer_tx = peer_tx.with(move |msg: Message| {
                let span = debug_span!(parent: inner_conn_span.clone(), "outbound_metric");
                // Add a metric for outbound messages.
                metrics::counter!(
                    "zcash.net.out.messages",
                    "command" => msg.command(),
                    "addr" => connected_addr.get_transient_addr_label(),
                )
                .increment(1);
                // We need to use future::ready rather than an async block here,
                // because we need the sink to be Unpin, and the With<Fut, ...>
                // returned by .with is Unpin only if Fut is Unpin, and the
                // futures generated by async blocks are not Unpin.
                future::ready(Ok(msg)).instrument(span)
            });

            // CORRECTNESS
            //
            // Ping/Pong messages and every error must update the peer address state via
            // the inbound_ts_collector.
            //
            // The heartbeat task sends regular Ping/Pong messages,
            // and it ends the connection if the heartbeat times out.
            // So we can just track peer activity based on Ping and Pong.
            // (This significantly improves performance, by reducing time system calls.)
            let inbound_ts_collector = address_book_updater.clone();
            let inbound_inv_collector = inv_collector.clone();
            let ts_inner_conn_span = connection_span.clone();
            let inv_inner_conn_span = connection_span.clone();
            let peer_rx = peer_rx
                .then(move |msg| {
                    // Add a metric for inbound messages and errors.
                    // Fire a timestamp or failure event.
                    let inbound_ts_collector = inbound_ts_collector.clone();
                    let span =
                        debug_span!(parent: ts_inner_conn_span.clone(), "inbound_ts_collector");

                    async move {
                        match &msg {
                            Ok(msg) => {
                                metrics::counter!(
                                    "zcash.net.in.messages",
                                    "command" => msg.command(),
                                    "addr" => connected_addr.get_transient_addr_label(),
                                )
                                .increment(1);

                                // # Security
                                //
                                // Peer messages are not rate-limited, so we can't send anything
                                // to a shared channel or do anything expensive here.
                            }
                            Err(err) => {
                                metrics::counter!(
                                    "zebra.net.in.errors",
                                    "error" => err.to_string(),
                                    "addr" => connected_addr.get_transient_addr_label(),
                                )
                                .increment(1);

                                // # Security
                                //
                                // Peer errors are rate-limited because:
                                // - opening connections is rate-limited
                                // - the number of connections is limited
                                // - after the first error, the peer is disconnected
                                if let Some(book_addr) = connected_addr.get_address_book_addr() {
                                    let _ = inbound_ts_collector
                                        .send(MetaAddr::new_errored(book_addr, remote_services))
                                        .await;
                                }
                            }
                        }
                        msg
                    }
                    .instrument(span)
                })
                .then(move |msg| {
                    let inbound_inv_collector = inbound_inv_collector.clone();
                    let span = debug_span!(parent: inv_inner_conn_span.clone(), "inventory_filter");
                    register_inventory_status(msg, connected_addr, inbound_inv_collector)
                        .instrument(span)
                })
                .boxed();

            // If we've learned potential peer addresses from the inbound connection remote address
            // or the handshake version message, add those addresses to the peer cache for this
            // peer.
            //
            // # Security
            //
            // We can't add these alternate addresses directly to the address book. If we did,
            // malicious peers could interfere with the address book state of other peers by
            // providing their addresses in `Version` messages. Or they could fill the address book
            // with fake addresses.
            //
            // These peer addresses are rate-limited because:
            // - opening connections is rate-limited
            // - these addresses are put in the peer address cache
            // - the peer address cache is only used when Zebra requests addresses from that peer
            let remote_canonical_addr = connection_info.remote.address_from.addr();
            let alternate_addrs = connected_addr
                .get_alternate_addrs(remote_canonical_addr)
                .map(|addr| {
                    // Assume the connecting node is a server node, and it's available now.
                    MetaAddr::new_gossiped_meta_addr(
                        addr,
                        PeerServices::NODE_NETWORK,
                        DateTime32::now(),
                    )
                });

            let server = Connection::new(
                inbound_service,
                server_rx,
                error_slot.clone(),
                peer_tx,
                connection_tracker,
                connection_info.clone(),
                alternate_addrs.collect(),
            );

            let connection_task = tokio::spawn(
                server
                    .run(peer_rx)
                    .instrument(connection_span.clone())
                    .boxed(),
            );

            let heartbeat_task = tokio::spawn(
                send_periodic_heartbeats_with_shutdown_handle(
                    connected_addr,
                    shutdown_rx,
                    server_tx.clone(),
                    address_book_updater.clone(),
                )
                .instrument(tracing::debug_span!(parent: connection_span, "heartbeat"))
                .boxed(),
            );

            let client = Client {
                connection_info,
                shutdown_tx: Some(shutdown_tx),
                server_tx,
                inv_collector,
                error_slot,
                connection_task,
                heartbeat_task,
            };

            Ok(client)
        };

        // Correctness: As a defence-in-depth against hangs, wrap the entire handshake in a timeout.
        let fut = timeout(constants::HANDSHAKE_TIMEOUT, fut);

        // Spawn a new task to drive this handshake, forwarding panics to the calling task.
        tokio::spawn(fut.instrument(negotiator_span))
            .map(
                |join_result: Result<
                    Result<Result<Client, HandshakeError>, error::Elapsed>,
                    JoinError,
                >| {
                    match join_result {
                        Ok(Ok(Ok(connection_client))) => Ok(connection_client),
                        Ok(Ok(Err(handshake_error))) => Err(handshake_error.into()),
                        Ok(Err(timeout_error)) => Err(timeout_error.into()),
                        Err(join_error) => match join_error.try_into_panic() {
                            // Forward panics to the calling task
                            Ok(panic_reason) => panic::resume_unwind(panic_reason),
                            Err(join_error) => Err(join_error.into()),
                        },
                    }
                },
            )
            .boxed()
    }
}

/// Register any advertised or missing inventory in `msg` for `connected_addr`.
pub(crate) async fn register_inventory_status(
    msg: Result<Message, SerializationError>,
    connected_addr: ConnectedAddr,
    inv_collector: broadcast::Sender<InventoryChange>,
) -> Result<Message, SerializationError> {
    match (&msg, connected_addr.get_transient_addr()) {
        (Ok(Message::Inv(advertised)), Some(transient_addr)) => {
            // We ignore inventory messages with more than one
            // block, because they are most likely replies to a
            // query, rather than a newly gossiped block.
            //
            // (We process inventory messages with any number of
            // transactions.)
            //
            // https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html#inventory-monitoring
            //
            // Note: zcashd has a bug where it merges queued inv messages of
            // the same or different types. Zebra compensates by sending `notfound`
            // responses to the inv collector. (#2156, #1768)
            //
            // (We can't split `inv`s, because that fills the inventory registry
            // with useless entries that the whole network has, making it large and slow.)
            match advertised.as_slice() {
                [advertised @ InventoryHash::Block(_)] => {
                    debug!(
                        ?advertised,
                        "registering gossiped advertised block inventory for peer"
                    );

                    // The peer set and inv collector use the peer's remote
                    // address as an identifier
                    // If all receivers have been dropped, `send` returns an error.
                    // When that happens, Zebra is shutting down, so we want to ignore this error.
                    let _ = inv_collector
                        .send(InventoryChange::new_available(*advertised, transient_addr));
                }
                advertised => {
                    let advertised = advertised
                        .iter()
                        .filter(|advertised| advertised.unmined_tx_id().is_some());

                    debug!(
                        ?advertised,
                        "registering advertised unmined transaction inventory for peer",
                    );

                    if let Some(change) =
                        InventoryChange::new_available_multi(advertised, transient_addr)
                    {
                        // Ignore channel errors that should only happen during shutdown.
                        let _ = inv_collector.send(change);
                    }
                }
            }
        }

        (Ok(Message::NotFound(missing)), Some(transient_addr)) => {
            // Ignore Errors and the unsupported FilteredBlock type
            let missing = missing.iter().filter(|missing| {
                missing.unmined_tx_id().is_some() || missing.block_hash().is_some()
            });

            debug!(?missing, "registering missing inventory for peer");

            if let Some(change) = InventoryChange::new_missing_multi(missing, transient_addr) {
                let _ = inv_collector.send(change);
            }
        }
        _ => {}
    }

    msg
}

/// Send periodic heartbeats to `server_tx`, and update the peer status through
/// `heartbeat_ts_collector`.
///
/// # Correctness
///
/// To prevent hangs:
/// - every await that depends on the network must have a timeout (or interval)
/// - every error/shutdown must update the address book state and return
///
/// The address book state can be updated via `ClientRequest.tx`, or the
/// heartbeat_ts_collector.
///
/// Returning from this function terminates the connection's heartbeat task.
async fn send_periodic_heartbeats_with_shutdown_handle(
    connected_addr: ConnectedAddr,
    shutdown_rx: oneshot::Receiver<CancelHeartbeatTask>,
    server_tx: futures::channel::mpsc::Sender<ClientRequest>,
    heartbeat_ts_collector: tokio::sync::mpsc::Sender<MetaAddrChange>,
) -> Result<(), BoxError> {
    use futures::future::Either;

    let heartbeat_run_loop = send_periodic_heartbeats_run_loop(
        connected_addr,
        server_tx,
        heartbeat_ts_collector.clone(),
    );

    pin_mut!(shutdown_rx);
    pin_mut!(heartbeat_run_loop);

    // CORRECTNESS
    //
    // Currently, select prefers the first future if multiple
    // futures are ready.
    //
    // Starvation is impossible here, because interval has a
    // slow rate, and shutdown is a oneshot. If both futures
    // are ready, we want the shutdown to take priority over
    // sending a useless heartbeat.
    match future::select(shutdown_rx, heartbeat_run_loop).await {
        Either::Left((Ok(CancelHeartbeatTask), _unused_run_loop)) => {
            tracing::trace!("shutting down because Client requested shut down");
            handle_heartbeat_shutdown(
                PeerError::ClientCancelledHeartbeatTask,
                &heartbeat_ts_collector,
                &connected_addr,
            )
            .await
        }
        Either::Left((Err(oneshot::Canceled), _unused_run_loop)) => {
            tracing::trace!("shutting down because Client was dropped");
            handle_heartbeat_shutdown(
                PeerError::ClientDropped,
                &heartbeat_ts_collector,
                &connected_addr,
            )
            .await
        }
        Either::Right((result, _unused_shutdown)) => {
            tracing::trace!("shutting down due to heartbeat failure");
            // heartbeat_timeout() already sent an error on the timestamp collector channel

            result
        }
    }
}

/// Send periodic heartbeats to `server_tx`, and update the peer status through
/// `heartbeat_ts_collector`.
///
/// See `send_periodic_heartbeats_with_shutdown_handle` for details.
async fn send_periodic_heartbeats_run_loop(
    connected_addr: ConnectedAddr,
    mut server_tx: futures::channel::mpsc::Sender<ClientRequest>,
    heartbeat_ts_collector: tokio::sync::mpsc::Sender<MetaAddrChange>,
) -> Result<(), BoxError> {
    // Don't send the first heartbeat immediately - we've just completed the handshake!
    let mut interval = tokio::time::interval_at(
        Instant::now() + constants::HEARTBEAT_INTERVAL,
        constants::HEARTBEAT_INTERVAL,
    );
    // If the heartbeat is delayed, also delay all future heartbeats.
    // (Shorter heartbeat intervals just add load, without any benefit.)
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    let mut interval_stream = IntervalStream::new(interval);

    while let Some(_instant) = interval_stream.next().await {
        // We've reached another heartbeat interval without
        // shutting down, so do a heartbeat request.
        let heartbeat = send_one_heartbeat(&mut server_tx);
        heartbeat_timeout(heartbeat, &heartbeat_ts_collector, &connected_addr).await?;

        // # Security
        //
        // Peer heartbeats are rate-limited because:
        // - opening connections is rate-limited
        // - the number of connections is limited
        // - Zebra initiates each heartbeat using a timer
        if let Some(book_addr) = connected_addr.get_address_book_addr() {
            // the collector doesn't depend on network activity,
            // so this await should not hang
            let _ = heartbeat_ts_collector
                .send(MetaAddr::new_responded(book_addr))
                .await;
        }
    }

    unreachable!("unexpected IntervalStream termination")
}

/// Send one heartbeat using `server_tx`.
async fn send_one_heartbeat(
    server_tx: &mut futures::channel::mpsc::Sender<ClientRequest>,
) -> Result<(), BoxError> {
    // We just reached a heartbeat interval, so start sending
    // a heartbeat.
    let (tx, rx) = oneshot::channel();

    // Try to send the heartbeat request
    let request = Request::Ping(Nonce::default());
    tracing::trace!(?request, "queueing heartbeat request");
    match server_tx.try_send(ClientRequest {
        request,
        tx,
        // we're not requesting inventory, so we don't need to update the registry
        inv_collector: None,
        transient_addr: None,
        span: tracing::Span::current(),
    }) {
        Ok(()) => {}
        Err(e) => {
            if e.is_disconnected() {
                Err(PeerError::ConnectionClosed)?;
            } else if e.is_full() {
                // Send the message when the Client becomes ready.
                // If sending takes too long, the heartbeat timeout will elapse
                // and close the connection, reducing our load to busy peers.
                server_tx.send(e.into_inner()).await?;
            } else {
                // we need to map unexpected error types to PeerErrors
                warn!(?e, "unexpected try_send error");
                Err(e)?;
            };
        }
    }

    // Flush the heartbeat request from the queue
    server_tx.flush().await?;
    tracing::trace!("sent heartbeat request");

    // Heartbeats are checked internally to the
    // connection logic, but we need to wait on the
    // response to avoid canceling the request.
    rx.await??;
    tracing::trace!("got heartbeat response");

    Ok(())
}

/// Wrap `fut` in a timeout, handling any inner or outer errors using
/// `handle_heartbeat_error`.
async fn heartbeat_timeout<F, T>(
    fut: F,
    address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>,
    connected_addr: &ConnectedAddr,
) -> Result<T, BoxError>
where
    F: Future<Output = Result<T, BoxError>>,
{
    let t = match timeout(constants::HEARTBEAT_INTERVAL, fut).await {
        Ok(inner_result) => {
            handle_heartbeat_error(inner_result, address_book_updater, connected_addr).await?
        }
        Err(elapsed) => {
            handle_heartbeat_error(Err(elapsed), address_book_updater, connected_addr).await?
        }
    };

    Ok(t)
}

/// If `result.is_err()`, mark `connected_addr` as failed using `address_book_updater`.
async fn handle_heartbeat_error<T, E>(
    result: Result<T, E>,
    address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>,
    connected_addr: &ConnectedAddr,
) -> Result<T, E>
where
    E: std::fmt::Debug,
{
    match result {
        Ok(t) => Ok(t),
        Err(err) => {
            tracing::debug!(?err, "heartbeat error, shutting down");

            // # Security
            //
            // Peer errors and shutdowns are rate-limited because:
            // - opening connections is rate-limited
            // - the number of connections is limited
            // - after the first error or shutdown, the peer is disconnected
            if let Some(book_addr) = connected_addr.get_address_book_addr() {
                let _ = address_book_updater
                    .send(MetaAddr::new_errored(book_addr, None))
                    .await;
            }
            Err(err)
        }
    }
}

/// Mark `connected_addr` as shut down using `address_book_updater`.
async fn handle_heartbeat_shutdown(
    peer_error: PeerError,
    address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>,
    connected_addr: &ConnectedAddr,
) -> Result<(), BoxError> {
    tracing::debug!(?peer_error, "client shutdown, shutting down heartbeat");

    if let Some(book_addr) = connected_addr.get_address_book_addr() {
        let _ = address_book_updater
            .send(MetaAddr::new_shutdown(book_addr))
            .await;
    }

    Err(peer_error.into())
}