zebra_network/peer/handshake.rs
//! Initial [`Handshake`]s with Zebra peers over a `PeerTransport`.

use std::{
    cmp::min,
    fmt,
    future::Future,
    net::{Ipv4Addr, SocketAddr},
    panic,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use chrono::{TimeZone, Utc};
use futures::{channel::oneshot, future, pin_mut, FutureExt, SinkExt, StreamExt};
use indexmap::IndexSet;
use tokio::{
    io::{AsyncRead, AsyncWrite},
    sync::broadcast,
    task::JoinError,
    time::{error, timeout, Instant},
};
use tokio_stream::wrappers::IntervalStream;
use tokio_util::codec::Framed;
use tower::Service;
use tracing::{span, Level, Span};
use tracing_futures::Instrument;

use zebra_chain::{
    chain_tip::{ChainTip, NoChainTip},
    parameters::Network,
    serialization::{DateTime32, SerializationError},
};

use crate::{
    constants,
    meta_addr::MetaAddrChange,
    peer::{
        CancelHeartbeatTask, Client, ClientRequest, Connection, ErrorSlot, HandshakeError,
        MinimumPeerVersion, PeerError,
    },
    peer_set::{ConnectionTracker, InventoryChange},
    protocol::{
        external::{types::*, AddrInVersion, Codec, InventoryHash, Message},
        internal::{Request, Response},
    },
    types::MetaAddr,
    BoxError, Config, PeerSocketAddr, VersionMessage,
};

#[cfg(test)]
mod tests;

/// A [`Service`] that handshakes with a remote peer and constructs a
/// client/server pair.
///
/// CORRECTNESS
///
/// To avoid hangs, each handshake (or its connector) should be:
/// - launched in a separate task, and
/// - wrapped in a timeout.
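///
/// A minimal sketch of that pattern (the `handshaker` and `request` names are
/// assumptions for illustration, not a prescribed API):
///
/// ```ignore
/// // Drive each handshake in its own task, wrapped in a timeout,
/// // so a stalled peer can't hang the caller.
/// let handshake_task = tokio::spawn(async move {
///     tokio::time::timeout(constants::HANDSHAKE_TIMEOUT, handshaker.call(request)).await
/// });
/// ```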
pub struct Handshake<S, C = NoChainTip>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
{
    config: Config,
    user_agent: String,
    our_services: PeerServices,
    relay: bool,

    inbound_service: S,
    address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
    inv_collector: broadcast::Sender<InventoryChange>,
    minimum_peer_version: MinimumPeerVersion<C>,
    nonces: Arc<futures::lock::Mutex<IndexSet<Nonce>>>,

    parent_span: Span,
}

impl<S, C> fmt::Debug for Handshake<S, C>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // skip the channels, they don't tell us anything useful
        f.debug_struct(std::any::type_name::<Handshake<S, C>>())
            .field("config", &self.config)
            .field("user_agent", &self.user_agent)
            .field("our_services", &self.our_services)
            .field("relay", &self.relay)
            .field("minimum_peer_version", &self.minimum_peer_version)
            .field("parent_span", &self.parent_span)
            .finish()
    }
}

impl<S, C> Clone for Handshake<S, C>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
{
    fn clone(&self) -> Self {
        Self {
            config: self.config.clone(),
            user_agent: self.user_agent.clone(),
            our_services: self.our_services,
            relay: self.relay,
            inbound_service: self.inbound_service.clone(),
            address_book_updater: self.address_book_updater.clone(),
            inv_collector: self.inv_collector.clone(),
            minimum_peer_version: self.minimum_peer_version.clone(),
            nonces: self.nonces.clone(),
            parent_span: self.parent_span.clone(),
        }
    }
}

/// The metadata for a peer connection.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ConnectionInfo {
    /// The connected peer address, if known.
    /// This address might not be valid for outbound connections.
    ///
    /// Peers can be connected via a transient inbound or proxy address,
    /// which will appear as the connected address to the OS and Zebra.
    pub connected_addr: ConnectedAddr,

    /// The network protocol [`VersionMessage`] sent by the remote peer.
    pub remote: VersionMessage,

    /// The network protocol version negotiated with the remote peer.
    ///
    /// Derived from `remote.version` and the
    /// [current `zebra_network` protocol version](constants::CURRENT_NETWORK_PROTOCOL_VERSION).
    pub negotiated_version: Version,
}

/// The peer address that we are handshaking with.
///
/// Typically, we can rely on outbound addresses, but inbound addresses don't
/// give us enough information to reconnect to that peer.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum ConnectedAddr {
    /// The address we used to make a direct outbound connection.
    ///
    /// In an honest network, a Zcash peer is listening on this exact address
    /// and port.
    OutboundDirect {
        /// The connected outbound remote address and port.
        addr: PeerSocketAddr,
    },

    /// The address we received from the OS, when a remote peer directly
    /// connected to our Zcash listener port.
    ///
    /// In an honest network, a Zcash peer might be listening on this address,
    /// if its outbound address is the same as its listener address. But the port
    /// is an ephemeral outbound TCP port, not a listener port.
    InboundDirect {
        /// The connected inbound remote address and ephemeral port.
        ///
        /// The IP address might be the address of a Zcash peer, but the port is an ephemeral port.
        addr: PeerSocketAddr,
    },

    /// The proxy address we used to make an outbound connection.
    ///
    /// The proxy address can be used by many connections, but our own ephemeral
    /// outbound address and port can be used as an identifier for the duration
    /// of this connection.
    OutboundProxy {
        /// The remote address and port of the proxy.
        proxy_addr: SocketAddr,

        /// The local address and transient port we used to connect to the proxy.
        transient_local_addr: SocketAddr,
    },

    /// The address we received from the OS, when a remote peer connected via an
    /// inbound proxy.
    ///
    /// The proxy's ephemeral outbound address can be used as an identifier for
    /// the duration of this connection.
    InboundProxy {
        /// The local address and transient port we used to connect to the proxy.
        transient_addr: SocketAddr,
    },

    /// An isolated connection, where we deliberately don't have any connection metadata.
    Isolated,
    //
    // TODO: handle Tor onion addresses
}

/// Get an unspecified IPv4 address for `network`
pub fn get_unspecified_ipv4_addr(network: Network) -> SocketAddr {
    (Ipv4Addr::UNSPECIFIED, network.default_port()).into()
}

use ConnectedAddr::*;

impl ConnectedAddr {
    /// Returns a new outbound directly connected addr.
    pub fn new_outbound_direct(addr: PeerSocketAddr) -> ConnectedAddr {
        OutboundDirect { addr }
    }

    /// Returns a new inbound directly connected addr.
    pub fn new_inbound_direct(addr: PeerSocketAddr) -> ConnectedAddr {
        InboundDirect { addr }
    }

    /// Returns a new outbound connected addr via `proxy`.
    ///
    /// `local_addr` is the ephemeral local address of the connection.
    #[allow(unused)]
    pub fn new_outbound_proxy(proxy: SocketAddr, local_addr: SocketAddr) -> ConnectedAddr {
        OutboundProxy {
            proxy_addr: proxy,
            transient_local_addr: local_addr,
        }
    }

    /// Returns a new inbound connected addr from `proxy`.
    //
    // TODO: distinguish between direct listeners and proxy listeners in the
    // rest of zebra-network
    #[allow(unused)]
    pub fn new_inbound_proxy(proxy: SocketAddr) -> ConnectedAddr {
        InboundProxy {
            transient_addr: proxy,
        }
    }

    /// Returns a new isolated connected addr, with no metadata.
    pub fn new_isolated() -> ConnectedAddr {
        Isolated
    }

    /// Returns a `PeerSocketAddr` that can be used to track this connection in the
    /// `AddressBook`.
    ///
    /// `None` for proxy connections and isolated connections.
    ///
    /// # Correctness
    ///
    /// This address can be used for reconnection attempts, or as a permanent
    /// identifier.
    ///
    /// # Security
    ///
    /// This address must not depend on the canonical address from the `Version`
    /// message. Otherwise, malicious peers could interfere with other peers'
    /// `AddressBook` state.
    ///
    /// TODO: remove the `get_` from these methods (Rust style avoids `get` prefixes)
    pub fn get_address_book_addr(&self) -> Option<PeerSocketAddr> {
        match self {
            OutboundDirect { addr } | InboundDirect { addr } => Some(*addr),
            // TODO: consider using the canonical address of the peer to track
            // outbound proxy connections
            OutboundProxy { .. } | InboundProxy { .. } | Isolated => None,
        }
    }

    /// Returns a `PeerSocketAddr` that can be used to temporarily identify a
    /// connection.
    ///
    /// Isolated connections must not change Zebra's peer set or address book
    /// state, so they do not have an identifier.
    ///
    /// # Correctness
    ///
    /// The returned address is only valid while the original connection is
    /// open. It must not be used in the `AddressBook`, for outbound connection
    /// attempts, or as a permanent identifier.
    ///
    /// # Security
    ///
    /// This address must not depend on the canonical address from the `Version`
    /// message. Otherwise, malicious peers could interfere with other peers'
    /// `PeerSet` state.
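    ///
    /// A sketch of the expected behavior (the addresses are illustrative, and
    /// this assumes `PeerSocketAddr` can be parsed from a string):
    ///
    /// ```ignore
    /// let addr: PeerSocketAddr = "203.0.113.1:8233".parse().unwrap();
    /// // Direct connections use the remote address itself.
    /// assert_eq!(
    ///     ConnectedAddr::new_outbound_direct(addr).get_transient_addr(),
    ///     Some(addr),
    /// );
    /// // Isolated connections have no identifier at all.
    /// assert_eq!(ConnectedAddr::new_isolated().get_transient_addr(), None);
    /// ```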
    pub fn get_transient_addr(&self) -> Option<PeerSocketAddr> {
        match self {
            OutboundDirect { addr } => Some(*addr),
            InboundDirect { addr } => Some(*addr),
            OutboundProxy {
                transient_local_addr,
                ..
            } => Some(PeerSocketAddr::from(*transient_local_addr)),
            InboundProxy { transient_addr } => Some(PeerSocketAddr::from(*transient_addr)),
            Isolated => None,
        }
    }

    /// Returns the metrics label for this connection's address.
    pub fn get_transient_addr_label(&self) -> String {
        self.get_transient_addr()
            .map_or_else(|| "isolated".to_string(), |addr| addr.to_string())
    }

    /// Returns a short label for the kind of connection.
    pub fn get_short_kind_label(&self) -> &'static str {
        match self {
            OutboundDirect { .. } => "Out",
            InboundDirect { .. } => "In",
            OutboundProxy { .. } => "ProxOut",
            InboundProxy { .. } => "ProxIn",
            Isolated => "Isol",
        }
    }

    /// Returns a list of alternate remote peer addresses, which can be used for
    /// reconnection attempts.
    ///
    /// Uses the connected address, and the remote canonical address.
    ///
    /// Skips duplicates. If this is an outbound connection, also skips the
    /// remote address that we're currently connected to.
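    ///
    /// A sketch of the outbound fixup behavior (the addresses are illustrative,
    /// and this assumes `PeerSocketAddr` can be parsed from a string):
    ///
    /// ```ignore
    /// let connected =
    ///     ConnectedAddr::new_outbound_direct("203.0.113.1:8233".parse().unwrap());
    /// // An unspecified canonical IP and zero port are fixed up using the
    /// // outbound address, so the fixed-up address duplicates it and is skipped.
    /// let alternates: Vec<_> = connected
    ///     .get_alternate_addrs("0.0.0.0:0".parse().unwrap())
    ///     .collect();
    /// assert!(alternates.is_empty());
    /// ```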
    pub fn get_alternate_addrs(
        &self,
        mut canonical_remote: PeerSocketAddr,
    ) -> impl Iterator<Item = PeerSocketAddr> {
        let addrs = match self {
            OutboundDirect { addr } => {
                // Fixup unspecified addresses and ports using known good data
                if canonical_remote.ip().is_unspecified() {
                    canonical_remote.set_ip(addr.ip());
                }
                if canonical_remote.port() == 0 {
                    canonical_remote.set_port(addr.port());
                }

                // Try the canonical remote address, if it is different from the
                // outbound address (which we already have in our address book)
                if &canonical_remote != addr {
                    vec![canonical_remote]
                } else {
                    // we didn't learn a new address from the handshake:
                    // it's the same as the outbound address, which is already in our address book
                    Vec::new()
                }
            }

            InboundDirect { addr } => {
                // Use the IP from the TCP connection, and the port the peer told us
                let maybe_addr = SocketAddr::new(addr.ip(), canonical_remote.port()).into();

                // Try both addresses, but remove one duplicate if they match
                if canonical_remote != maybe_addr {
                    vec![canonical_remote, maybe_addr]
                } else {
                    vec![canonical_remote]
                }
            }

            // Proxy addresses can't be used for reconnection attempts, but we
            // can try the canonical remote address
            OutboundProxy { .. } | InboundProxy { .. } => vec![canonical_remote],

            // Hide all metadata for isolated connections
            Isolated => Vec::new(),
        };

        addrs.into_iter()
    }

    /// Returns true if the [`ConnectedAddr`] was created for an inbound connection.
    pub fn is_inbound(&self) -> bool {
        matches!(self, InboundDirect { .. } | InboundProxy { .. })
    }
}

impl fmt::Debug for ConnectedAddr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let kind = self.get_short_kind_label();
        let addr = self.get_transient_addr_label();

        if matches!(self, Isolated) {
            f.write_str(kind)
        } else {
            f.debug_tuple(kind).field(&addr).finish()
        }
    }
}

/// A builder for `Handshake`.
pub struct Builder<S, C = NoChainTip>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
{
    config: Option<Config>,
    our_services: Option<PeerServices>,
    user_agent: Option<String>,
    relay: Option<bool>,

    inbound_service: Option<S>,
    address_book_updater: Option<tokio::sync::mpsc::Sender<MetaAddrChange>>,
    inv_collector: Option<broadcast::Sender<InventoryChange>>,
    latest_chain_tip: C,
}

impl<S, C> Builder<S, C>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
{
    /// Provide a config. Mandatory.
    pub fn with_config(mut self, config: Config) -> Self {
        self.config = Some(config);
        self
    }

    /// Provide a service to handle inbound requests. Mandatory.
    pub fn with_inbound_service(mut self, inbound_service: S) -> Self {
        self.inbound_service = Some(inbound_service);
        self
    }

    /// Provide a channel for registering inventory advertisements. Optional.
    ///
    /// This channel takes transient remote addresses, which the `PeerSet` uses
    /// to look up peers that have specific inventory.
    pub fn with_inventory_collector(
        mut self,
        inv_collector: broadcast::Sender<InventoryChange>,
    ) -> Self {
        self.inv_collector = Some(inv_collector);
        self
    }

    /// Provide a hook for timestamp collection. Optional.
    ///
    /// This channel takes `MetaAddr`s, permanent addresses which can be used to
    /// make outbound connections to peers.
    pub fn with_address_book_updater(
        mut self,
        address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
    ) -> Self {
        self.address_book_updater = Some(address_book_updater);
        self
    }

    /// Provide the services this node advertises to other peers. Optional.
    ///
    /// If this is unset, the node will advertise itself as a client.
    pub fn with_advertised_services(mut self, services: PeerServices) -> Self {
        self.our_services = Some(services);
        self
    }

    /// Provide this node's user agent. Optional.
    ///
    /// This must be a valid BIP14 string. If it is unset, the user-agent will be empty.
    pub fn with_user_agent(mut self, user_agent: String) -> Self {
        self.user_agent = Some(user_agent);
        self
    }

    /// Provide a realtime endpoint to obtain the current best chain tip block height. Optional.
    ///
    /// If this is unset, the minimum accepted protocol version for peer connections is kept
    /// constant over network upgrade activations.
    ///
    /// Use [`NoChainTip`] to explicitly provide no chain tip.
    pub fn with_latest_chain_tip<NewC>(self, latest_chain_tip: NewC) -> Builder<S, NewC>
    where
        NewC: ChainTip + Clone + Send + 'static,
    {
        Builder {
            latest_chain_tip,

            // TODO: Until Rust RFC 2528 reaches stable, we can't do `..self`
            config: self.config,
            inbound_service: self.inbound_service,
            address_book_updater: self.address_book_updater,
            our_services: self.our_services,
            user_agent: self.user_agent,
            relay: self.relay,
            inv_collector: self.inv_collector,
        }
    }

    /// Whether to request that peers relay transactions to our node. Optional.
    ///
    /// If this is unset, the node will not request transactions.
    pub fn want_transactions(mut self, relay: bool) -> Self {
        self.relay = Some(relay);
        self
    }

    /// Consume this builder and produce a [`Handshake`].
    ///
    /// Returns an error only if any mandatory field was unset.
    pub fn finish(self) -> Result<Handshake<S, C>, &'static str> {
        let config = self.config.ok_or("did not specify config")?;
        let inbound_service = self
            .inbound_service
            .ok_or("did not specify inbound service")?;
        let inv_collector = self.inv_collector.unwrap_or_else(|| {
            let (tx, _) = broadcast::channel(100);
            tx
        });
        let address_book_updater = self.address_book_updater.unwrap_or_else(|| {
            // No `AddressBookUpdater` for timestamp collection was passed, so create a stub
            // channel. Dropping the receiver means sends will fail, but we don't care.
            let (tx, _rx) = tokio::sync::mpsc::channel(1);
            tx
        });
        let nonces = Arc::new(futures::lock::Mutex::new(IndexSet::new()));
        let user_agent = self.user_agent.unwrap_or_default();
        let our_services = self.our_services.unwrap_or_else(PeerServices::empty);
        let relay = self.relay.unwrap_or(false);
        let network = config.network.clone();
        let minimum_peer_version = MinimumPeerVersion::new(self.latest_chain_tip, &network);

        Ok(Handshake {
            config,
            user_agent,
            our_services,
            relay,
            inbound_service,
            address_book_updater,
            inv_collector,
            minimum_peer_version,
            nonces,
            parent_span: Span::current(),
        })
    }
}

impl<S> Handshake<S, NoChainTip>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
{
    /// Create a builder that configures a [`Handshake`] service.
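    ///
    /// A minimal usage sketch (the `config` and `inbound_service` values are
    /// assumptions for illustration):
    ///
    /// ```ignore
    /// let handshake = Handshake::builder()
    ///     .with_config(config)
    ///     .with_inbound_service(inbound_service)
    ///     .want_transactions(true)
    ///     .finish()
    ///     .expect("mandatory config and inbound service were provided");
    /// ```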
    pub fn builder() -> Builder<S, NoChainTip> {
        // We don't derive `Default` because the derive inserts a `where S:
        // Default` bound even though `Option<S>` implements `Default` even if
        // `S` does not.
        Builder {
            config: None,
            our_services: None,
            user_agent: None,
            relay: None,
            inbound_service: None,
            address_book_updater: None,
            inv_collector: None,
            latest_chain_tip: NoChainTip,
        }
    }
}

/// Negotiate the Zcash network protocol version with the remote peer at `connected_addr`, using
/// the connection `peer_conn`.
///
/// We split `Handshake` into its components before calling this function, to avoid infectious
/// `Sync` bounds on the returned future.
///
/// Returns the [`VersionMessage`] sent by the remote peer, and the [`Version`] negotiated with the
/// remote peer, inside a [`ConnectionInfo`] struct.
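///
/// A sketch of a call site, mirroring the one in `Handshake::call` below
/// (the argument values are assumptions for illustration):
///
/// ```ignore
/// let connection_info = negotiate_version(
///     &mut peer_conn,
///     &connected_addr,
///     config,
///     nonces,
///     user_agent,
///     our_services,
///     relay,
///     minimum_peer_version,
/// )
/// .await?;
/// ```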
#[allow(clippy::too_many_arguments)]
pub async fn negotiate_version<PeerTransport>(
    peer_conn: &mut Framed<PeerTransport, Codec>,
    connected_addr: &ConnectedAddr,
    config: Config,
    nonces: Arc<futures::lock::Mutex<IndexSet<Nonce>>>,
    user_agent: String,
    our_services: PeerServices,
    relay: bool,
    mut minimum_peer_version: MinimumPeerVersion<impl ChainTip>,
) -> Result<Arc<ConnectionInfo>, HandshakeError>
where
    PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    // Create a random nonce for this connection
    let local_nonce = Nonce::default();

    // Insert the nonce for this handshake into the shared nonce set.
    // Each connection has its own connection state, and handshakes execute concurrently.
    //
    // # Correctness
    //
    // It is ok to wait for the lock here, because handshakes have a short
    // timeout, and the async mutex will be released when the task times
    // out.
    {
        let mut locked_nonces = nonces.lock().await;

        // Duplicate nonces are very rare, because they require a 64-bit random number collision,
        // and the nonce set is limited to a few hundred entries.
        let is_unique_nonce = locked_nonces.insert(local_nonce);
        if !is_unique_nonce {
            return Err(HandshakeError::LocalDuplicateNonce);
        }

        // # Security
        //
        // Limit the amount of memory used for nonces.
        // Nonces can be left in the set if the connection fails or times out between
        // the nonce being inserted, and it being removed.
        //
        // Zebra has strict connection limits, so we limit the number of nonces to
        // the configured connection limit.
        // This is a tradeoff between:
        // - avoiding memory denial of service attacks which make large numbers of connections,
        //   for example, 100 failed inbound connections take 1 second.
        // - memory usage: 16 bytes per `Nonce`, 3.2 kB for 200 nonces
        // - collision probability: two hundred 64-bit nonces have a very low collision probability
        //   <https://en.wikipedia.org/wiki/Birthday_problem#Probability_of_a_shared_birthday_(collision)>
        while locked_nonces.len() > config.peerset_total_connection_limit() {
            locked_nonces.shift_remove_index(0);
        }

        std::mem::drop(locked_nonces);
    }

    // Don't leak our exact clock skew to our peers. On the other hand,
    // we can't deviate too much, or zcashd will get confused.
    // Inspection of the zcashd source code reveals that the timestamp
    // is only ever used at the end of parsing the version message, in
    //
    //     pfrom->nTimeOffset = timeWarning.AddTimeData(pfrom->addr, nTime, GetTime());
    //
    // AddTimeData is defined in src/timedata.cpp and is a no-op as long
    // as the difference between the specified timestamp and the
    // zcashd's local time is less than TIMEDATA_WARNING_THRESHOLD, set
    // to 10 * 60 seconds (10 minutes).
    //
    // nTimeOffset is peer metadata that is never used, except for
    // statistics.
    //
    // To try to stay within the range where zcashd will ignore our clock skew,
    // truncate the timestamp to the nearest 5 minutes.
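    //
    // For example (illustrative numbers): if `now` is 1_700_000_123 seconds,
    // then `now.rem_euclid(5 * 60)` is 23, so we send 1_700_000_100, the
    // previous 5-minute boundary.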
    let now = Utc::now().timestamp();
    let timestamp = Utc
        .timestamp_opt(now - now.rem_euclid(5 * 60), 0)
        .single()
        .expect("in-range number of seconds and valid nanosecond");

    let (their_addr, our_services, our_listen_addr) = match connected_addr {
        // Version messages require an address, so we use
        // an unspecified address for Isolated connections
        Isolated => {
            let unspec_ipv4 = get_unspecified_ipv4_addr(config.network);
            (unspec_ipv4.into(), PeerServices::empty(), unspec_ipv4)
        }
        _ => {
            let their_addr = connected_addr
                .get_transient_addr()
                .expect("non-Isolated connections have a remote addr");

            // Include the configured external address in our version message, if there is one;
            // otherwise, include our listen address.
            let advertise_addr = match config.external_addr {
                Some(external_addr) => {
                    info!(?their_addr, ?config.listen_addr, "using external address for Version messages");
                    external_addr
                }
                None => config.listen_addr,
            };

            (their_addr, our_services, advertise_addr)
        }
    };

    let our_version = VersionMessage {
        version: constants::CURRENT_NETWORK_PROTOCOL_VERSION,
        services: our_services,
        timestamp,
        address_recv: AddrInVersion::new(their_addr, PeerServices::NODE_NETWORK),
        // TODO: detect external address (#1893)
        address_from: AddrInVersion::new(our_listen_addr, our_services),
        nonce: local_nonce,
        user_agent: user_agent.clone(),
        start_height: minimum_peer_version.chain_tip_height(),
        relay,
    }
    .into();

    debug!(?our_version, "sending initial version message");
    peer_conn.send(our_version).await?;

    let mut remote_msg = peer_conn
        .next()
        .await
        .ok_or(HandshakeError::ConnectionClosed)??;

    // Wait for next message if the one we got is not Version
    let remote: VersionMessage = loop {
        match remote_msg {
            Message::Version(version_message) => {
                debug!(?version_message, "got version message from remote peer");
                break version_message;
            }
            _ => {
                remote_msg = peer_conn
                    .next()
                    .await
                    .ok_or(HandshakeError::ConnectionClosed)??;
                debug!(?remote_msg, "ignoring non-version message from remote peer");
            }
        }
    };

    let remote_address_services = remote.address_from.untrusted_services();
    if remote_address_services != remote.services {
        info!(
            ?remote.services,
            ?remote_address_services,
            ?remote.user_agent,
            "peer with inconsistent version services and version address services",
        );
    }

    // Check for nonce reuse, indicating self-connection
    //
    // # Correctness
    //
    // We must wait for the lock before we continue with the connection, to avoid
    // self-connection. If the connection times out, the async lock will be
    // released.
    //
    // # Security
    //
    // We don't remove the nonce here, because peers that observe our network traffic could
    // maliciously remove nonces, and force us to make self-connections.
    let nonce_reuse = nonces.lock().await.contains(&remote.nonce);
    if nonce_reuse {
        info!(?connected_addr, "rejecting self-connection attempt");
        Err(HandshakeError::RemoteNonceReuse)?;
    }

    // # Security
    //
    // Reject connections to peers on old versions, because they might not know about all
    // network upgrades and could lead to chain forks or slower block propagation.
    let min_version = minimum_peer_version.current();

    if remote.version < min_version {
        debug!(
            remote_ip = ?their_addr,
            ?remote.version,
            ?min_version,
            ?remote.user_agent,
            "disconnecting from peer with obsolete network protocol version",
        );

        // the value is the number of rejected handshakes, by peer IP and protocol version
        metrics::counter!(
            "zcash.net.peers.obsolete",
            "remote_ip" => their_addr.to_string(),
            "remote_version" => remote.version.to_string(),
            "min_version" => min_version.to_string(),
            "user_agent" => remote.user_agent.clone(),
        )
        .increment(1);

        // the value is the remote version of the most recent rejected handshake from each peer
        metrics::gauge!(
            "zcash.net.peers.version.obsolete",
            "remote_ip" => their_addr.to_string(),
        )
        .set(remote.version.0 as f64);

        // Disconnect if peer is using an obsolete version.
        return Err(HandshakeError::ObsoleteVersion(remote.version));
    }

    let negotiated_version = min(constants::CURRENT_NETWORK_PROTOCOL_VERSION, remote.version);

    // Limit containing struct size, and avoid multiple duplicates of 300+ bytes of data.
    let connection_info = Arc::new(ConnectionInfo {
        connected_addr: *connected_addr,
        remote,
        negotiated_version,
    });

    debug!(
        remote_ip = ?their_addr,
        ?connection_info.remote.version,
        ?negotiated_version,
        ?min_version,
        ?connection_info.remote.user_agent,
        "negotiated network protocol version with peer",
    );

    // the value is the number of connected handshakes, by peer IP and protocol version
    metrics::counter!(
        "zcash.net.peers.connected",
        "remote_ip" => their_addr.to_string(),
        "remote_version" => connection_info.remote.version.to_string(),
        "negotiated_version" => negotiated_version.to_string(),
        "min_version" => min_version.to_string(),
        "user_agent" => connection_info.remote.user_agent.clone(),
    )
    .increment(1);

    // the value is the remote version of the most recent connected handshake from each peer
    metrics::gauge!(
        "zcash.net.peers.version.connected",
        "remote_ip" => their_addr.to_string(),
    )
    .set(connection_info.remote.version.0 as f64);

    peer_conn.send(Message::Verack).await?;

    let mut remote_msg = peer_conn
        .next()
        .await
        .ok_or(HandshakeError::ConnectionClosed)??;

    // Wait for next message if the one we got is not Verack
    loop {
        match remote_msg {
            Message::Verack => {
                debug!(?remote_msg, "got verack message from remote peer");
                break;
            }
            _ => {
                remote_msg = peer_conn
                    .next()
                    .await
                    .ok_or(HandshakeError::ConnectionClosed)??;
                debug!(?remote_msg, "ignoring non-verack message from remote peer");
            }
        }
    }

    Ok(connection_info)
}

/// A handshake request.
/// Contains the information needed to handshake with the peer.
pub struct HandshakeRequest<PeerTransport>
where
    PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    /// The tokio [`TcpStream`](tokio::net::TcpStream) or Tor
    /// `arti_client::DataStream` to the peer.
    // Use [`arti_client::DataStream`] when #5492 is done.
    pub data_stream: PeerTransport,

    /// The address of the peer, and other related information.
    pub connected_addr: ConnectedAddr,

    /// A connection tracker that reduces the open connection count when dropped.
    ///
    /// Used to limit the number of open connections in Zebra.
    pub connection_tracker: ConnectionTracker,
}

impl<S, PeerTransport, C> Service<HandshakeRequest<PeerTransport>> for Handshake<S, C>
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send,
    C: ChainTip + Clone + Send + 'static,
    PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    type Response = Client;
    type Error = BoxError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: HandshakeRequest<PeerTransport>) -> Self::Future {
        let HandshakeRequest {
            data_stream,
            connected_addr,
            connection_tracker,
        } = req;

        let negotiator_span = debug_span!("negotiator", peer = ?connected_addr);
        // set the peer connection span's parent to the global span, as it
        // should exist independently of its creation source (inbound
        // connection, crawler, initial peer, ...)
        let connection_span =
            span!(parent: &self.parent_span, Level::INFO, "", peer = ?connected_addr);

        // Clone these upfront, so they can be moved into the future.
        let nonces = self.nonces.clone();
        let inbound_service = self.inbound_service.clone();
        let address_book_updater = self.address_book_updater.clone();
        let inv_collector = self.inv_collector.clone();
        let config = self.config.clone();
        let user_agent = self.user_agent.clone();
        let our_services = self.our_services;
        let relay = self.relay;
        let minimum_peer_version = self.minimum_peer_version.clone();

        // # Security
        //
        // `zebra_network::init()` implements a connection timeout on this future.
        // Any code outside this future does not have a timeout.
        let fut = async move {
            debug!(
                addr = ?connected_addr,
                "negotiating protocol version with remote peer"
            );

            let mut peer_conn = Framed::new(
                data_stream,
                Codec::builder()
                    .for_network(&config.network)
                    .with_metrics_addr_label(connected_addr.get_transient_addr_label())
                    .finish(),
            );

            let connection_info = negotiate_version(
                &mut peer_conn,
                &connected_addr,
                config,
                nonces,
                user_agent,
                our_services,
                relay,
                minimum_peer_version,
            )
            .await?;

            let remote_services = connection_info.remote.services;

            // The handshake succeeded: update the peer status from AttemptPending to Responded,
            // and send initial connection info.
            if let Some(book_addr) = connected_addr.get_address_book_addr() {
                // the collector doesn't depend on network activity,
                // so this await should not hang
                let _ = address_book_updater
                    .send(MetaAddr::new_connected(
                        book_addr,
                        &remote_services,
                        connected_addr.is_inbound(),
                    ))
                    .await;
            }

            // Reconfigure the codec to use the negotiated version.
            //
            // TODO: The tokio documentation says not to do this while any frames are still being processed.
            // Since we don't know that here, another way might be to release the tcp
            // stream from the unversioned Framed wrapper and construct a new one with a versioned codec.
            let bare_codec = peer_conn.codec_mut();
            bare_codec.reconfigure_version(connection_info.negotiated_version);

            debug!("constructing client, spawning server");

            // These channels communicate between the inbound and outbound halves of the connection,
            // and between the different connection tasks. We create separate tasks and channels
            // for each new connection.
            let (server_tx, server_rx) = futures::channel::mpsc::channel(0);
            let (shutdown_tx, shutdown_rx) = oneshot::channel();
            let error_slot = ErrorSlot::default();

            let (peer_tx, peer_rx) = peer_conn.split();

            // Instrument the peer's rx and tx streams.

            let inner_conn_span = connection_span.clone();
            let peer_tx = peer_tx.with(move |msg: Message| {
                let span = debug_span!(parent: inner_conn_span.clone(), "outbound_metric");
                // Add a metric for outbound messages.
                metrics::counter!(
                    "zcash.net.out.messages",
                    "command" => msg.command(),
                    "addr" => connected_addr.get_transient_addr_label(),
                )
                .increment(1);
                // We need to use future::ready rather than an async block here,
                // because we need the sink to be Unpin, and the With<Fut, ...>
                // returned by .with is Unpin only if Fut is Unpin, and the
                // futures generated by async blocks are not Unpin.
                future::ready(Ok(msg)).instrument(span)
            });

            // CORRECTNESS
            //
            // Ping/Pong messages and every error must update the peer address state via
            // the inbound_ts_collector.
            //
            // The heartbeat task sends regular Ping/Pong messages,
            // and it ends the connection if the heartbeat times out.
            // So we can just track peer activity based on Ping and Pong.
            // (This significantly improves performance, by reducing the number of
            // time-related system calls.)
            let inbound_ts_collector = address_book_updater.clone();
            let inbound_inv_collector = inv_collector.clone();
            let ts_inner_conn_span = connection_span.clone();
            let inv_inner_conn_span = connection_span.clone();
            let peer_rx = peer_rx
                .then(move |msg| {
                    // Add a metric for inbound messages and errors.
                    // Fire a timestamp or failure event.
                    let inbound_ts_collector = inbound_ts_collector.clone();
                    let span =
                        debug_span!(parent: ts_inner_conn_span.clone(), "inbound_ts_collector");

                    async move {
                        match &msg {
                            Ok(msg) => {
                                metrics::counter!(
                                    "zcash.net.in.messages",
                                    "command" => msg.command(),
                                    "addr" => connected_addr.get_transient_addr_label(),
                                )
                                .increment(1);

                                // # Security
                                //
                                // Peer messages are not rate-limited, so we can't send anything
                                // to a shared channel or do anything expensive here.
                            }
                            Err(err) => {
                                metrics::counter!(
                                    "zebra.net.in.errors",
                                    "error" => err.to_string(),
                                    "addr" => connected_addr.get_transient_addr_label(),
                                )
                                .increment(1);

                                // # Security
                                //
                                // Peer errors are rate-limited because:
                                // - opening connections is rate-limited
                                // - the number of connections is limited
                                // - after the first error, the peer is disconnected
                                if let Some(book_addr) = connected_addr.get_address_book_addr() {
                                    let _ = inbound_ts_collector
                                        .send(MetaAddr::new_errored(book_addr, remote_services))
                                        .await;
                                }
                            }
                        }
                        msg
                    }
                    .instrument(span)
                })
                .then(move |msg| {
                    let inbound_inv_collector = inbound_inv_collector.clone();
                    let span = debug_span!(parent: inv_inner_conn_span.clone(), "inventory_filter");
                    register_inventory_status(msg, connected_addr, inbound_inv_collector)
                        .instrument(span)
                })
                .boxed();

            // If we've learned potential peer addresses from the inbound connection remote address
            // or the handshake version message, add those addresses to the peer cache for this
            // peer.
            //
            // # Security
            //
            // We can't add these alternate addresses directly to the address book. If we did,
            // malicious peers could interfere with the address book state of other peers by
            // providing their addresses in `Version` messages. Or they could fill the address book
            // with fake addresses.
            //
            // These peer addresses are rate-limited because:
            // - opening connections is rate-limited
            // - these addresses are put in the peer address cache
            // - the peer address cache is only used when Zebra requests addresses from that peer
            let remote_canonical_addr = connection_info.remote.address_from.addr();
            let alternate_addrs = connected_addr
                .get_alternate_addrs(remote_canonical_addr)
                .map(|addr| {
                    // Assume the connecting node is a server node, and it's available now.
                    MetaAddr::new_gossiped_meta_addr(
                        addr,
                        PeerServices::NODE_NETWORK,
                        DateTime32::now(),
                    )
                });

            let server = Connection::new(
                inbound_service,
                server_rx,
                error_slot.clone(),
                peer_tx,
                connection_tracker,
                connection_info.clone(),
                alternate_addrs.collect(),
            );

            let connection_task = tokio::spawn(
                server
                    .run(peer_rx)
                    .instrument(connection_span.clone())
                    .boxed(),
            );

            let heartbeat_task = tokio::spawn(
                send_periodic_heartbeats_with_shutdown_handle(
                    connected_addr,
                    shutdown_rx,
                    server_tx.clone(),
                    address_book_updater.clone(),
                )
                .instrument(tracing::debug_span!(parent: connection_span, "heartbeat"))
                .boxed(),
            );

            let client = Client {
                connection_info,
                shutdown_tx: Some(shutdown_tx),
                server_tx,
                inv_collector,
                error_slot,
                connection_task,
                heartbeat_task,
            };

            Ok(client)
        };

        // Correctness: As a defence-in-depth against hangs, wrap the entire handshake in a timeout.
        let fut = timeout(constants::HANDSHAKE_TIMEOUT, fut);

        // Spawn a new task to drive this handshake, forwarding panics to the calling task.
        tokio::spawn(fut.instrument(negotiator_span))
            .map(
                |join_result: Result<
                    Result<Result<Client, HandshakeError>, error::Elapsed>,
                    JoinError,
                >| {
                    match join_result {
                        Ok(Ok(Ok(connection_client))) => Ok(connection_client),
                        Ok(Ok(Err(handshake_error))) => Err(handshake_error.into()),
                        Ok(Err(timeout_error)) => Err(timeout_error.into()),
                        Err(join_error) => match join_error.try_into_panic() {
                            // Forward panics to the calling task
                            Ok(panic_reason) => panic::resume_unwind(panic_reason),
                            Err(join_error) => Err(join_error.into()),
                        },
                    }
                },
            )
            .boxed()
    }
}

/// Register any advertised or missing inventory in `msg` for `connected_addr`.
pub(crate) async fn register_inventory_status(
    msg: Result<Message, SerializationError>,
    connected_addr: ConnectedAddr,
    inv_collector: broadcast::Sender<InventoryChange>,
) -> Result<Message, SerializationError> {
    match (&msg, connected_addr.get_transient_addr()) {
        (Ok(Message::Inv(advertised)), Some(transient_addr)) => {
            // We ignore inventory messages with more than one
            // block, because they are most likely replies to a
            // query, rather than a newly gossiped block.
            //
            // (We process inventory messages with any number of
            // transactions.)
            //
            // https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html#inventory-monitoring
            //
            // Note: zcashd has a bug where it merges queued inv messages of
            // the same or different types. Zebra compensates by sending `notfound`
            // responses to the inv collector. (#2156, #1768)
            //
            // (We can't split `inv`s, because that fills the inventory registry
            // with useless entries that the whole network has, making it large and slow.)
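            //
            // For example: an `inv` with exactly one block hash is registered
            // by the first arm below, while an `inv` with two or more blocks
            // falls through to the second arm, which registers only its
            // transaction entries.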
            match advertised.as_slice() {
                [advertised @ InventoryHash::Block(_)] => {
                    debug!(
                        ?advertised,
                        "registering gossiped advertised block inventory for peer"
                    );

                    // The peer set and inv collector use the peer's remote
                    // address as an identifier
                    // If all receivers have been dropped, `send` returns an error.
                    // When that happens, Zebra is shutting down, so we want to ignore this error.
                    let _ = inv_collector
                        .send(InventoryChange::new_available(*advertised, transient_addr));
                }
                advertised => {
                    let advertised = advertised
                        .iter()
                        .filter(|advertised| advertised.unmined_tx_id().is_some());

                    debug!(
                        ?advertised,
                        "registering advertised unmined transaction inventory for peer",
                    );

                    if let Some(change) =
                        InventoryChange::new_available_multi(advertised, transient_addr)
                    {
                        // Ignore channel errors that should only happen during shutdown.
                        let _ = inv_collector.send(change);
                    }
                }
            }
        }

        (Ok(Message::NotFound(missing)), Some(transient_addr)) => {
            // Ignore Errors and the unsupported FilteredBlock type
            let missing = missing.iter().filter(|missing| {
                missing.unmined_tx_id().is_some() || missing.block_hash().is_some()
            });

            debug!(?missing, "registering missing inventory for peer");

            if let Some(change) = InventoryChange::new_missing_multi(missing, transient_addr) {
                let _ = inv_collector.send(change);
            }
        }
        _ => {}
    }

    msg
}

/// Send periodic heartbeats to `server_tx`, and update the peer status through
/// `heartbeat_ts_collector`.
///
/// # Correctness
///
/// To prevent hangs:
/// - every await that depends on the network must have a timeout (or interval)
/// - every error/shutdown must update the address book state and return
///
/// The address book state can be updated via `ClientRequest.tx`, or the
/// heartbeat_ts_collector.
///
/// Returning from this function terminates the connection's heartbeat task.
async fn send_periodic_heartbeats_with_shutdown_handle(
    connected_addr: ConnectedAddr,
    shutdown_rx: oneshot::Receiver<CancelHeartbeatTask>,
    server_tx: futures::channel::mpsc::Sender<ClientRequest>,
    heartbeat_ts_collector: tokio::sync::mpsc::Sender<MetaAddrChange>,
) -> Result<(), BoxError> {
    use futures::future::Either;

    let heartbeat_run_loop = send_periodic_heartbeats_run_loop(
        connected_addr,
        server_tx,
        heartbeat_ts_collector.clone(),
    );

    pin_mut!(shutdown_rx);
    pin_mut!(heartbeat_run_loop);

    // CORRECTNESS
    //
    // Currently, select prefers the first future if multiple
    // futures are ready.
    //
    // Starvation is impossible here, because interval has a
    // slow rate, and shutdown is a oneshot. If both futures
    // are ready, we want the shutdown to take priority over
    // sending a useless heartbeat.
    match future::select(shutdown_rx, heartbeat_run_loop).await {
        Either::Left((Ok(CancelHeartbeatTask), _unused_run_loop)) => {
            tracing::trace!("shutting down because Client requested shut down");
            handle_heartbeat_shutdown(
                PeerError::ClientCancelledHeartbeatTask,
                &heartbeat_ts_collector,
                &connected_addr,
            )
            .await
        }
        Either::Left((Err(oneshot::Canceled), _unused_run_loop)) => {
            tracing::trace!("shutting down because Client was dropped");
            handle_heartbeat_shutdown(
                PeerError::ClientDropped,
                &heartbeat_ts_collector,
                &connected_addr,
            )
            .await
        }
        Either::Right((result, _unused_shutdown)) => {
            tracing::trace!("shutting down due to heartbeat failure");
            // heartbeat_timeout() already sent an error on the timestamp collector channel

            result
        }
    }
}

/// Send periodic heartbeats to `server_tx`, and update the peer status through
/// `heartbeat_ts_collector`.
///
/// See `send_periodic_heartbeats_with_shutdown_handle` for details.
async fn send_periodic_heartbeats_run_loop(
    connected_addr: ConnectedAddr,
    mut server_tx: futures::channel::mpsc::Sender<ClientRequest>,
    heartbeat_ts_collector: tokio::sync::mpsc::Sender<MetaAddrChange>,
) -> Result<(), BoxError> {
    // Don't send the first heartbeat immediately - we've just completed the handshake!
    let mut interval = tokio::time::interval_at(
        Instant::now() + constants::HEARTBEAT_INTERVAL,
        constants::HEARTBEAT_INTERVAL,
    );
    // If the heartbeat is delayed, also delay all future heartbeats.
    // (Shorter heartbeat intervals just add load, without any benefit.)
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    let mut interval_stream = IntervalStream::new(interval);

    while let Some(_instant) = interval_stream.next().await {
        // We've reached another heartbeat interval without
        // shutting down, so do a heartbeat request.
        let heartbeat = send_one_heartbeat(&mut server_tx);
        heartbeat_timeout(heartbeat, &heartbeat_ts_collector, &connected_addr).await?;

        // # Security
        //
        // Peer heartbeats are rate-limited because:
        // - opening connections is rate-limited
        // - the number of connections is limited
        // - Zebra initiates each heartbeat using a timer
        if let Some(book_addr) = connected_addr.get_address_book_addr() {
            // the collector doesn't depend on network activity,
            // so this await should not hang
            let _ = heartbeat_ts_collector
                .send(MetaAddr::new_responded(book_addr))
                .await;
        }
    }

    unreachable!("unexpected IntervalStream termination")
}

/// Send one heartbeat using `server_tx`.
async fn send_one_heartbeat(
    server_tx: &mut futures::channel::mpsc::Sender<ClientRequest>,
) -> Result<(), BoxError> {
    // We just reached a heartbeat interval, so start sending
    // a heartbeat.
    let (tx, rx) = oneshot::channel();

    // Try to send the heartbeat request
    let request = Request::Ping(Nonce::default());
    tracing::trace!(?request, "queueing heartbeat request");
    match server_tx.try_send(ClientRequest {
        request,
        tx,
        // we're not requesting inventory, so we don't need to update the registry
        inv_collector: None,
        transient_addr: None,
        span: tracing::Span::current(),
    }) {
        Ok(()) => {}
        Err(e) => {
            if e.is_disconnected() {
                Err(PeerError::ConnectionClosed)?;
            } else if e.is_full() {
                // Send the message when the Client becomes ready.
                // If sending takes too long, the heartbeat timeout will elapse
                // and close the connection, reducing our load to busy peers.
                server_tx.send(e.into_inner()).await?;
            } else {
                // we need to map unexpected error types to PeerErrors
                warn!(?e, "unexpected try_send error");
                Err(e)?;
            };
        }
    }

    // Flush the heartbeat request from the queue
    server_tx.flush().await?;
    tracing::trace!("sent heartbeat request");

    // Heartbeats are checked internally to the
    // connection logic, but we need to wait on the
    // response to avoid canceling the request.
    rx.await??;
    tracing::trace!("got heartbeat response");

    Ok(())
}

/// Wrap `fut` in a timeout, handling any inner or outer errors using
/// `handle_heartbeat_error`.
async fn heartbeat_timeout<F, T>(
    fut: F,
    address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>,
    connected_addr: &ConnectedAddr,
) -> Result<T, BoxError>
where
    F: Future<Output = Result<T, BoxError>>,
{
    let t = match timeout(constants::HEARTBEAT_INTERVAL, fut).await {
        Ok(inner_result) => {
            handle_heartbeat_error(inner_result, address_book_updater, connected_addr).await?
        }
        Err(elapsed) => {
            handle_heartbeat_error(Err(elapsed), address_book_updater, connected_addr).await?
        }
    };

    Ok(t)
}

/// If `result.is_err()`, mark `connected_addr` as failed using `address_book_updater`.
async fn handle_heartbeat_error<T, E>(
    result: Result<T, E>,
    address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>,
    connected_addr: &ConnectedAddr,
) -> Result<T, E>
where
    E: std::fmt::Debug,
{
    match result {
        Ok(t) => Ok(t),
        Err(err) => {
            tracing::debug!(?err, "heartbeat error, shutting down");

            // # Security
            //
            // Peer errors and shutdowns are rate-limited because:
            // - opening connections is rate-limited
            // - the number of connections is limited
            // - after the first error or shutdown, the peer is disconnected
            if let Some(book_addr) = connected_addr.get_address_book_addr() {
                let _ = address_book_updater
                    .send(MetaAddr::new_errored(book_addr, None))
                    .await;
            }
            Err(err)
        }
    }
}

/// Mark `connected_addr` as shut down using `address_book_updater`.
async fn handle_heartbeat_shutdown(
    peer_error: PeerError,
    address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>,
    connected_addr: &ConnectedAddr,
) -> Result<(), BoxError> {
    tracing::debug!(?peer_error, "client shutdown, shutting down heartbeat");

    if let Some(book_addr) = connected_addr.get_address_book_addr() {
        let _ = address_book_updater
            .send(MetaAddr::new_shutdown(book_addr))
            .await;
    }

    Err(peer_error.into())
}