zebra_network/peer/
connection.rs

1//! Zebra's per-peer connection state machine.
2//!
3//! Maps the external Zcash/Bitcoin protocol to Zebra's internal request/response
4//! protocol.
5//!
6//! This module contains a lot of undocumented state, assumptions and invariants.
7//! And it's unclear if these assumptions match the `zcashd` implementation.
8//! It should be refactored into a cleaner set of request/response pairs (#1515).
9
10use std::{borrow::Cow, collections::HashSet, fmt, pin::Pin, sync::Arc, time::Instant};
11
12use futures::{future::Either, prelude::*};
13use rand::{seq::SliceRandom, thread_rng, Rng};
14use tokio::time::{sleep, Sleep};
15use tower::{Service, ServiceExt};
16use tracing_futures::Instrument;
17
18use zebra_chain::{
19    block::{self, Block},
20    serialization::SerializationError,
21    transaction::{UnminedTx, UnminedTxId},
22};
23
24use crate::{
25    constants::{
26        self, MAX_ADDRS_IN_MESSAGE, MAX_OVERLOAD_DROP_PROBABILITY, MIN_OVERLOAD_DROP_PROBABILITY,
27        OVERLOAD_PROTECTION_INTERVAL, PEER_ADDR_RESPONSE_LIMIT,
28    },
29    meta_addr::MetaAddr,
30    peer::{
31        connection::peer_tx::PeerTx, error::AlreadyErrored, ClientRequest, ClientRequestReceiver,
32        ConnectionInfo, ErrorSlot, InProgressClientRequest, MustUseClientResponseSender, PeerError,
33        SharedPeerError,
34    },
35    peer_set::ConnectionTracker,
36    protocol::{
37        external::{types::Nonce, InventoryHash, Message},
38        internal::{InventoryResponse, Request, Response},
39    },
40    BoxError, PeerSocketAddr, MAX_TX_INV_IN_SENT_MESSAGE,
41};
42
43use InventoryResponse::*;
44
45mod peer_tx;
46
47#[cfg(test)]
48mod tests;
49
/// The response-tracking state for a single in-progress client request.
///
/// Each variant (other than [`Handler::Finished`]) records which kind of peer
/// message would complete the request, plus any partial results collected so far.
/// Peer messages are matched against the current variant in `process_message`.
#[derive(Debug)]
pub(super) enum Handler {
    /// Indicates that the handler has finished processing the request.
    /// An error here is scoped to the request.
    Finished(Result<Response, PeerError>),
    /// Waiting for a `Message::Pong` carrying this nonce.
    Ping(Nonce),
    /// Waiting for a `Message::Addr` containing peer addresses.
    Peers,
    /// Waiting for a `Message::Inv` that contains only block hashes.
    FindBlocks,
    /// Waiting for a `Message::Headers`.
    FindHeaders,
    /// Waiting for `Message::Block`s (or a `Message::NotFound`) for the requested hashes.
    BlocksByHash {
        /// Requested block hashes that have not been received yet.
        pending_hashes: HashSet<block::Hash>,
        /// Requested blocks received so far.
        blocks: Vec<Arc<Block>>,
    },
    /// Waiting for `Message::Tx`s (or a `Message::NotFound`) for the requested IDs.
    TransactionsById {
        /// Requested transaction IDs that have not been received yet.
        pending_ids: HashSet<UnminedTxId>,
        /// Requested transactions received so far.
        transactions: Vec<UnminedTx>,
    },
    /// Waiting for a `Message::Inv` in which every item is an unmined transaction ID.
    MempoolTransactionIds,
}
69
70impl fmt::Display for Handler {
71    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
72        f.write_str(&match self {
73            Handler::Finished(Ok(response)) => format!("Finished({response})"),
74            Handler::Finished(Err(error)) => format!("Finished({error})"),
75
76            Handler::Ping(_) => "Ping".to_string(),
77            Handler::Peers => "Peers".to_string(),
78
79            Handler::FindBlocks => "FindBlocks".to_string(),
80            Handler::FindHeaders => "FindHeaders".to_string(),
81            Handler::BlocksByHash {
82                pending_hashes,
83                blocks,
84            } => format!(
85                "BlocksByHash {{ pending_hashes: {}, blocks: {} }}",
86                pending_hashes.len(),
87                blocks.len()
88            ),
89
90            Handler::TransactionsById {
91                pending_ids,
92                transactions,
93            } => format!(
94                "TransactionsById {{ pending_ids: {}, transactions: {} }}",
95                pending_ids.len(),
96                transactions.len()
97            ),
98            Handler::MempoolTransactionIds => "MempoolTransactionIds".to_string(),
99        })
100    }
101}
102
103impl Handler {
104    /// Returns the Zebra internal handler type as a string.
105    pub fn command(&self) -> Cow<'static, str> {
106        match self {
107            Handler::Finished(Ok(response)) => format!("Finished({})", response.command()).into(),
108            Handler::Finished(Err(error)) => format!("Finished({})", error.kind()).into(),
109
110            Handler::Ping(_) => "Ping".into(),
111            Handler::Peers => "Peers".into(),
112
113            Handler::FindBlocks => "FindBlocks".into(),
114            Handler::FindHeaders => "FindHeaders".into(),
115
116            Handler::BlocksByHash { .. } => "BlocksByHash".into(),
117            Handler::TransactionsById { .. } => "TransactionsById".into(),
118
119            Handler::MempoolTransactionIds => "MempoolTransactionIds".into(),
120        }
121    }
122
    /// Try to handle `msg` as a response to a client request, possibly consuming
    /// it in the process.
    ///
    /// This function is where we statefully interpret Bitcoin/Zcash messages
    /// into responses to messages in the internal request/response protocol.
    /// This conversion is done by a sequence of (request, message) match arms,
    /// each of which contains the conversion logic for that pair.
    ///
    /// Taking ownership of the message means that we can pass ownership of its
    /// contents to responses without additional copies.  If the message is not
    /// interpretable as a response, we return ownership to the caller.
    ///
    /// Unexpected messages are left unprocessed, and may be rejected later.
    ///
    /// `addr` responses are limited to avoid peer set takeover. Any excess
    /// addresses are stored in `cached_addrs`.
    ///
    /// # Parameters
    ///
    /// - `msg`: the peer message to interpret.
    /// - `cached_addrs`: the connection's address cache, updated when a `Peers` request
    ///   receives an `Addr` message.
    /// - `transient_addr`: attached to every `Available` block or transaction in the
    ///   response.
    ///
    /// # Returns
    ///
    /// `None` if the message was consumed as (part of) a response, or `Some(msg)` if the
    /// message was not used by the handler. `self` is updated to the next handler state,
    /// which is [`Handler::Finished`] when the request is complete or has failed.
    fn process_message(
        &mut self,
        msg: Message,
        cached_addrs: &mut Vec<MetaAddr>,
        transient_addr: Option<PeerSocketAddr>,
    ) -> Option<Message> {
        let mut ignored_msg = None;
        // Temporarily move the handler out of `self`, so the match arms can take ownership
        // of its fields. The placeholder is always overwritten by the match result below.
        // TODO: can this be avoided?
        let tmp_state = std::mem::replace(self, Handler::Finished(Ok(Response::Nil)));

        debug!(handler = %tmp_state, %msg, "received peer response to Zebra request");

        *self = match (tmp_state, msg) {
            (Handler::Ping(req_nonce), Message::Pong(rsp_nonce)) => {
                if req_nonce == rsp_nonce {
                    Handler::Finished(Ok(Response::Nil))
                } else {
                    // A pong with a different nonce is consumed, and we keep waiting
                    // for the matching pong.
                    Handler::Ping(req_nonce)
                }
            }

            (Handler::Peers, Message::Addr(new_addrs)) => {
                // Security: This method performs security-sensitive operations, see its comments
                // for details.
                let response_addrs =
                    Handler::update_addr_cache(cached_addrs, &new_addrs, PEER_ADDR_RESPONSE_LIMIT);

                debug!(
                    new_addrs = new_addrs.len(),
                    response_addrs = response_addrs.len(),
                    remaining_addrs = cached_addrs.len(),
                    PEER_ADDR_RESPONSE_LIMIT,
                    "responding to Peers request using new and cached addresses",
                );

                Handler::Finished(Ok(Response::Peers(response_addrs)))
            }

            // `zcashd` returns requested transactions in a single batch of messages.
            // Other transaction or non-transaction messages can come before or after the batch.
            // After the transaction batch, `zcashd` sends `notfound` if any transactions are missing:
            // https://github.com/zcash/zcash/blob/e7b425298f6d9a54810cb7183f00be547e4d9415/src/main.cpp#L5617
            (
                Handler::TransactionsById {
                    mut pending_ids,
                    mut transactions,
                },
                Message::Tx(transaction),
            ) => {
                // assumptions:
                //   - the transaction messages are sent in a single continuous batch
                //   - missing transactions are silently skipped
                //     (there is no `notfound` message at the end of the batch)
                //
                // NOTE(review): the second assumption conflicts with the comment above this
                // arm, which says `zcashd` sends `notfound` for missing transactions.
                // Confirm which one is current.
                if pending_ids.remove(&transaction.id) {
                    // we are in the middle of the continuous transaction messages
                    transactions.push(transaction);
                } else {
                    // We got a transaction we didn't ask for. If the caller doesn't know any of the
                    // transactions, they should have sent a `notfound` with all the hashes, rather
                    // than an unsolicited transaction.
                    //
                    // So either:
                    // 1. The peer implements the protocol badly, skipping `notfound`.
                    //    We should cancel the request, so we don't hang waiting for transactions
                    //    that will never arrive.
                    // 2. The peer sent an unsolicited transaction.
                    //    We should ignore the transaction, and wait for the actual response.
                    //
                    // We end the request, so we don't hang on bad peers (case 1). But we keep the
                    // connection open, so the inbound service can process transactions from good
                    // peers (case 2).
                    ignored_msg = Some(Message::Tx(transaction));
                }

                if ignored_msg.is_some() && transactions.is_empty() {
                    // If we didn't get anything we wanted, retry the request.
                    // (This handler only reports the error, it does not retry by itself.)
                    let missing_transaction_ids = pending_ids.into_iter().map(Into::into).collect();
                    Handler::Finished(Err(PeerError::NotFoundResponse(missing_transaction_ids)))
                } else if pending_ids.is_empty() || ignored_msg.is_some() {
                    // If we got some of what we wanted, let the internal client know.
                    // Any IDs that are still pending are reported as missing.
                    let available = transactions
                        .into_iter()
                        .map(|t| InventoryResponse::Available((t, transient_addr)));
                    let missing = pending_ids.into_iter().map(InventoryResponse::Missing);

                    Handler::Finished(Ok(Response::Transactions(
                        available.chain(missing).collect(),
                    )))
                } else {
                    // Keep on waiting for more.
                    Handler::TransactionsById {
                        pending_ids,
                        transactions,
                    }
                }
            }
            // `zcashd` peers actually return this response
            (
                Handler::TransactionsById {
                    pending_ids,
                    transactions,
                },
                Message::NotFound(missing_invs),
            ) => {
                // assumptions:
                //   - the peer eventually returns a transaction or a `notfound` entry
                //     for each hash
                //   - all `notfound` entries are contained in a single message
                //   - the `notfound` message comes after the transaction messages
                //
                // If we're in sync with the peer, then the `notfound` should contain the remaining
                // hashes from the handler. If we're not in sync with the peer, we should return
                // what we got so far.
                let missing_transaction_ids: HashSet<_> = transaction_ids(&missing_invs).collect();
                if missing_transaction_ids != pending_ids {
                    trace!(?missing_invs, ?missing_transaction_ids, ?pending_ids);
                    // This is logged at debug level, with the details at trace level.
                    debug!("unexpected notfound message from peer: all remaining transaction hashes should be listed in the notfound. Using partial received transactions as the peer response");
                }
                if missing_transaction_ids.len() != missing_invs.len() {
                    trace!(?missing_invs, ?missing_transaction_ids, ?pending_ids);
                    debug!("unexpected notfound message from peer: notfound contains duplicate hashes or non-transaction hashes. Using partial received transactions as the peer response");
                }

                // The response is built from the handler's own `pending_ids`,
                // not from the hashes listed in the `notfound` message.
                if transactions.is_empty() {
                    // If we didn't get anything we wanted, retry the request.
                    let missing_transaction_ids = pending_ids.into_iter().map(Into::into).collect();
                    Handler::Finished(Err(PeerError::NotFoundResponse(missing_transaction_ids)))
                } else {
                    // If we got some of what we wanted, let the internal client know.
                    let available = transactions
                        .into_iter()
                        .map(|t| InventoryResponse::Available((t, transient_addr)));
                    let missing = pending_ids.into_iter().map(InventoryResponse::Missing);

                    Handler::Finished(Ok(Response::Transactions(
                        available.chain(missing).collect(),
                    )))
                }
            }

            // `zcashd` returns requested blocks in a single batch of messages.
            // Other blocks or non-blocks messages can come before or after the batch.
            // `zcashd` silently skips missing blocks, rather than sending a final `notfound` message.
            // https://github.com/zcash/zcash/blob/e7b425298f6d9a54810cb7183f00be547e4d9415/src/main.cpp#L5523
            (
                Handler::BlocksByHash {
                    mut pending_hashes,
                    mut blocks,
                },
                Message::Block(block),
            ) => {
                // assumptions:
                //   - the block messages are sent in a single continuous batch
                //   - missing blocks are silently skipped
                //     (there is no `notfound` message at the end of the batch)
                if pending_hashes.remove(&block.hash()) {
                    // we are in the middle of the continuous block messages
                    blocks.push(block);
                } else {
                    // We got a block we didn't ask for.
                    //
                    // So either:
                    // 1. The response is for a previously cancelled block request.
                    //    We should treat that block as an inbound gossiped block,
                    //    and wait for the actual response.
                    // 2. The peer doesn't know any of the blocks we asked for.
                    //    We should cancel the request, so we don't hang waiting for blocks that
                    //    will never arrive.
                    // 3. The peer sent an unsolicited block.
                    //    We should treat that block as an inbound gossiped block,
                    //    and wait for the actual response.
                    //
                    // We ignore the message, so we don't desynchronize with the peer. This happens
                    // when we cancel a request and send a second different request, but receive a
                    // response for the first request. If we ended the request then, we could send
                    // a third request to the peer, and end up having to end that request as well
                    // when the response for the second request arrives.
                    //
                    // Ignoring the message gives us a chance to synchronize back to the correct
                    // request. If that doesn't happen, this request times out.
                    //
                    // In case 2, if peers respond with a `notfound` message,
                    // the cascading errors don't happen. The `notfound` message cancels our request,
                    // and we know we are in sync with the peer.
                    //
                    // Zebra sends `notfound` in response to block requests, but `zcashd` doesn't.
                    // So we need this message workaround, and the related inventory workarounds.
                    ignored_msg = Some(Message::Block(block));
                }

                // Unlike the transaction arm, an unsolicited block never ends the request.
                if pending_hashes.is_empty() {
                    // If we got everything we wanted, let the internal client know.
                    let available = blocks
                        .into_iter()
                        .map(|block| InventoryResponse::Available((block, transient_addr)));
                    Handler::Finished(Ok(Response::Blocks(available.collect())))
                } else {
                    // Keep on waiting for all the blocks we wanted, until we get them or time out.
                    Handler::BlocksByHash {
                        pending_hashes,
                        blocks,
                    }
                }
            }
            // peers are allowed to return this response, but `zcashd` never does
            (
                Handler::BlocksByHash {
                    pending_hashes,
                    blocks,
                },
                Message::NotFound(missing_invs),
            ) => {
                // assumptions:
                //   - the peer eventually returns a block or a `notfound` entry
                //     for each hash
                //   - all `notfound` entries are contained in a single message
                //   - the `notfound` message comes after the block messages
                //
                // If we're in sync with the peer, then the `notfound` should contain the remaining
                // hashes from the handler. If we're not in sync with the peer, we should return
                // what we got so far, and log an error.
                let missing_blocks: HashSet<_> = block_hashes(&missing_invs).collect();
                if missing_blocks != pending_hashes {
                    trace!(?missing_invs, ?missing_blocks, ?pending_hashes);
                    // This is logged at debug level, with the details at trace level.
                    debug!("unexpected notfound message from peer: all remaining block hashes should be listed in the notfound. Using partial received blocks as the peer response");
                }
                if missing_blocks.len() != missing_invs.len() {
                    trace!(?missing_invs, ?missing_blocks, ?pending_hashes);
                    debug!("unexpected notfound message from peer: notfound contains duplicate hashes or non-block hashes. Using partial received blocks as the peer response");
                }

                // The response is built from the handler's own `pending_hashes`,
                // not from the hashes listed in the `notfound` message.
                if blocks.is_empty() {
                    // If we didn't get anything we wanted, retry the request.
                    let missing_block_hashes = pending_hashes.into_iter().map(Into::into).collect();
                    Handler::Finished(Err(PeerError::NotFoundResponse(missing_block_hashes)))
                } else {
                    // If we got some of what we wanted, let the internal client know.
                    let available = blocks
                        .into_iter()
                        .map(|block| InventoryResponse::Available((block, transient_addr)));
                    let missing = pending_hashes.into_iter().map(InventoryResponse::Missing);

                    Handler::Finished(Ok(Response::Blocks(available.chain(missing).collect())))
                }
            }

            // TODO:
            // - use `any(inv)` rather than `all(inv)`?
            //
            // An `inv` is only a `FindBlocks` response if every item is a block hash.
            // Other `inv` messages fall through to the default arm.
            (Handler::FindBlocks, Message::Inv(items))
                if items
                    .iter()
                    .all(|item| matches!(item, InventoryHash::Block(_))) =>
            {
                Handler::Finished(Ok(Response::BlockHashes(
                    block_hashes(&items[..]).collect(),
                )))
            }
            (Handler::FindHeaders, Message::Headers(headers)) => {
                Handler::Finished(Ok(Response::BlockHeaders(headers)))
            }

            // An `inv` is only a `MempoolTransactionIds` response if every item has an
            // unmined transaction ID. Other `inv` messages fall through to the default arm.
            (Handler::MempoolTransactionIds, Message::Inv(items))
                if items.iter().all(|item| item.unmined_tx_id().is_some()) =>
            {
                Handler::Finished(Ok(Response::TransactionIds(
                    transaction_ids(&items).collect(),
                )))
            }

            // By default, messages are not responses.
            // The handler state is restored unchanged, and the message is returned to the caller.
            (state, msg) => {
                trace!(?msg, "did not interpret message as response");
                ignored_msg = Some(msg);
                state
            }
        };

        ignored_msg
    }
420
421    /// Adds `new_addrs` to the `cached_addrs` cache, then takes and returns `response_size`
422    /// addresses from that cache.
423    ///
424    /// `cached_addrs` can be empty if the cache is empty. `new_addrs` can be empty or `None` if
425    /// there are no new addresses. `response_size` can be zero or `None` if there is no response
426    /// needed.
427    fn update_addr_cache<'new>(
428        cached_addrs: &mut Vec<MetaAddr>,
429        new_addrs: impl IntoIterator<Item = &'new MetaAddr>,
430        response_size: impl Into<Option<usize>>,
431    ) -> Vec<MetaAddr> {
432        // # Peer Set Reliability
433        //
434        // Newly received peers are added to the cache, so that we can use them if the connection
435        // doesn't respond to our getaddr requests.
436        //
437        // Add the new addresses to the end of the cache.
438        cached_addrs.extend(new_addrs);
439
440        // # Security
441        //
442        // We limit how many peer addresses we take from each peer, so that our address book
443        // and outbound connections aren't controlled by a single peer (#1869). We randomly select
444        // peers, so the remote peer can't control which addresses we choose by changing the order
445        // in the messages they send.
446        let response_size = response_size.into().unwrap_or_default();
447
448        let mut temp_cache = Vec::new();
449        std::mem::swap(cached_addrs, &mut temp_cache);
450
451        // The response is fully shuffled, remaining is partially shuffled.
452        let (response, remaining) = temp_cache.partial_shuffle(&mut thread_rng(), response_size);
453
454        // # Security
455        //
456        // The cache size is limited to avoid memory denial of service.
457        //
458        // It's ok to just partially shuffle the cache, because it doesn't actually matter which
459        // peers we drop. Having excess peers is rare, because most peers only send one large
460        // unsolicited peer message when they first connect.
461        *cached_addrs = remaining.to_vec();
462        cached_addrs.truncate(MAX_ADDRS_IN_MESSAGE);
463
464        response.to_vec()
465    }
466}
467
/// The state of a connection's current request or response.
///
/// The `AwaitingResponse` variant carries a response sender that must be used
/// before the state is dropped, which is why the whole type is `#[must_use]`.
#[derive(Debug)]
#[must_use = "AwaitingResponse.tx.send() must be called before drop"]
pub(super) enum State {
    /// Awaiting a client request or a peer message.
    AwaitingRequest,
    /// Awaiting a peer message we can interpret as a response to a client request.
    AwaitingResponse {
        /// Tracks which peer messages complete the pending request,
        /// and any partial response collected so far.
        handler: Handler,
        /// The sender for the pending request's response.
        /// Per the `must_use` message above, `send()` must be called on it before drop.
        tx: MustUseClientResponseSender,
        /// A tracing span for the pending request.
        // NOTE(review): presumably the span of the originating client request — confirm
        // against the code that constructs this variant.
        span: tracing::Span,
    },
    /// A failure has occurred and we are shutting down the connection.
    Failed,
}
482
483impl fmt::Display for State {
484    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
485        f.write_str(&match self {
486            State::AwaitingRequest => "AwaitingRequest".to_string(),
487            State::AwaitingResponse { handler, .. } => {
488                format!("AwaitingResponse({handler})")
489            }
490            State::Failed => "Failed".to_string(),
491        })
492    }
493}
494
495impl State {
496    /// Returns the Zebra internal state as a string.
497    pub fn command(&self) -> Cow<'static, str> {
498        match self {
499            State::AwaitingRequest => "AwaitingRequest".into(),
500            State::AwaitingResponse { handler, .. } => {
501                format!("AwaitingResponse({})", handler.command()).into()
502            }
503            State::Failed => "Failed".into(),
504        }
505    }
506}
507
/// The outcome of mapping an inbound [`Message`] to a [`Request`].
///
/// A [`Request`] can be converted directly into [`InboundMessage::AsRequest`]
/// using `From`/`Into`.
#[derive(Clone, Debug, Eq, PartialEq)]
#[must_use = "inbound messages must be handled"]
pub enum InboundMessage {
    /// The message was mapped to an inbound [`Request`].
    AsRequest(Request),

    /// The message was consumed by the mapping method.
    ///
    /// For example, it could be cached, treated as an error,
    /// or an internally handled [`Message::Ping`].
    Consumed,

    /// The message was not used by the inbound message handler.
    Unused,
}
524
525impl From<Request> for InboundMessage {
526    fn from(request: Request) -> Self {
527        InboundMessage::AsRequest(request)
528    }
529}
530
/// The channels, services, and associated state for a peer connection.
///
/// # Type parameters
///
/// - `S`: the type of the `inbound` service stored in `svc`.
/// - `Tx`: the sink that carries outbound [`Message`]s to the peer, wrapped by `peer_tx`.
pub struct Connection<S, Tx>
where
    Tx: Sink<Message, Error = SerializationError> + Unpin,
{
    /// The metadata for the connected peer `service`.
    ///
    /// This field is used for debugging.
    pub connection_info: Arc<ConnectionInfo>,

    /// The state of this connection's current request or response.
    pub(super) state: State,

    /// A timeout for a client request. This is stored separately from
    /// State so that we can move the future out of it independently of
    /// other state handling.
    ///
    /// `None` when no timer is set; a new connection starts without one.
    pub(super) request_timer: Option<Pin<Box<Sleep>>>,

    /// Unused peers from recent `addr` or `addrv2` messages from this peer.
    /// Also holds the initial addresses sent in `version` messages, or guessed from the remote IP.
    ///
    /// When peers send solicited or unsolicited peer advertisements, Zebra puts them in this cache.
    ///
    /// When Zebra's components request peers, some cached peers are randomly selected,
    /// consumed, and returned as a modified response. This works around `zcashd`'s address
    /// response rate-limit.
    ///
    /// The cache size is limited to avoid denial of service attacks.
    pub(super) cached_addrs: Vec<MetaAddr>,

    /// The `inbound` service, used to answer requests from this connection's peer.
    pub(super) svc: S,

    /// A channel for requests that Zebra's internal services want to send to remote peers.
    ///
    /// This channel accepts [`Request`]s, and produces [`InProgressClientRequest`]s.
    pub(super) client_rx: ClientRequestReceiver,

    /// A slot for an error shared between the Connection and the Client that uses it.
    ///
    /// `None` unless the connection or client have errored.
    pub(super) error_slot: ErrorSlot,

    /// A channel for sending Zcash messages to the connected peer.
    ///
    /// This channel accepts [`Message`]s.
    ///
    /// The corresponding peer message receiver is passed to [`Connection::run`].
    pub(super) peer_tx: PeerTx<Tx>,

    /// A connection tracker that reduces the open connection count when dropped.
    /// Used to limit the number of open connections in Zebra.
    ///
    /// This field does nothing until it is dropped.
    ///
    /// # Security
    ///
    /// If this connection tracker or `Connection`s are leaked,
    /// the number of active connections will appear higher than it actually is.
    /// If enough connections leak, Zebra will stop making new connections.
    // This field is never read: it only exists so the tracker is dropped with the connection.
    #[allow(dead_code)]
    pub(super) connection_tracker: ConnectionTracker,

    /// The metrics label for this peer. Usually the remote IP and port.
    pub(super) metrics_label: String,

    /// The state for this peer, when the metrics were last updated.
    ///
    /// `None` on a new connection.
    pub(super) last_metrics_state: Option<Cow<'static, str>>,

    /// The time of the last overload error response from the inbound
    /// service to a request from this connection,
    /// or None if this connection hasn't yet received an overload error.
    last_overload_time: Option<Instant>,
}
605
606impl<S, Tx> fmt::Debug for Connection<S, Tx>
607where
608    Tx: Sink<Message, Error = SerializationError> + Unpin,
609{
610    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
611        // skip the channels, they don't tell us anything useful
612        f.debug_struct(std::any::type_name::<Connection<S, Tx>>())
613            .field("connection_info", &self.connection_info)
614            .field("state", &self.state)
615            .field("request_timer", &self.request_timer)
616            .field("cached_addrs", &self.cached_addrs.len())
617            .field("error_slot", &self.error_slot)
618            .field("metrics_label", &self.metrics_label)
619            .field("last_metrics_state", &self.last_metrics_state)
620            .field("last_overload_time", &self.last_overload_time)
621            .finish()
622    }
623}
624
625impl<S, Tx> Connection<S, Tx>
626where
627    Tx: Sink<Message, Error = SerializationError> + Unpin,
628{
629    /// Return a new connection from its channels, services, shared state, and metadata.
630    pub(crate) fn new(
631        inbound_service: S,
632        client_rx: futures::channel::mpsc::Receiver<ClientRequest>,
633        error_slot: ErrorSlot,
634        peer_tx: Tx,
635        connection_tracker: ConnectionTracker,
636        connection_info: Arc<ConnectionInfo>,
637        initial_cached_addrs: Vec<MetaAddr>,
638    ) -> Self {
639        let metrics_label = connection_info.connected_addr.get_transient_addr_label();
640
641        Connection {
642            connection_info,
643            state: State::AwaitingRequest,
644            request_timer: None,
645            cached_addrs: initial_cached_addrs,
646            svc: inbound_service,
647            client_rx: client_rx.into(),
648            error_slot,
649            peer_tx: peer_tx.into(),
650            connection_tracker,
651            metrics_label,
652            last_metrics_state: None,
653            last_overload_time: None,
654        }
655    }
656}
657
658impl<S, Tx> Connection<S, Tx>
659where
660    S: Service<Request, Response = Response, Error = BoxError>,
661    S::Error: Into<BoxError>,
662    Tx: Sink<Message, Error = SerializationError> + Unpin,
663{
664    /// Consume this `Connection` to form a spawnable future containing its event loop.
665    ///
666    /// `peer_rx` is a channel for receiving Zcash [`Message`]s from the connected peer.
667    /// The corresponding peer message receiver is [`Connection.peer_tx`].
668    pub async fn run<Rx>(mut self, mut peer_rx: Rx)
669    where
670        Rx: Stream<Item = Result<Message, SerializationError>> + Unpin,
671    {
672        // At a high level, the event loop we want is as follows: we check for any
673        // incoming messages from the remote peer, check if they should be interpreted
674        // as a response to a pending client request, and if not, interpret them as a
675        // request from the remote peer to our node.
676        //
677        // We also need to handle those client requests in the first place. The client
678        // requests are received from the corresponding `peer::Client` over a bounded
679        // channel (with bound 1, to minimize buffering), but there is no relationship
680        // between the stream of client requests and the stream of peer messages, so we
681        // cannot ignore one kind while waiting on the other. Moreover, we cannot accept
682        // a second client request while the first one is still pending.
683        //
684        // To do this, we inspect the current request state.
685        //
686        // If there is no pending request, we wait on either an incoming peer message or
687        // an incoming request, whichever comes first.
688        //
689        // If there is a pending request, we wait only on an incoming peer message, and
690        // check whether it can be interpreted as a response to the pending request.
691        //
692        // TODO: turn this comment into a module-level comment, after splitting the module.
693        loop {
694            self.update_state_metrics(None);
695
696            match self.state {
697                State::AwaitingRequest => {
698                    trace!("awaiting client request or peer message");
699                    // # Correctness
700                    //
701                    // Currently, select prefers the first future if multiple futures are ready.
702                    // We use this behaviour to prioritise messages on each individual peer
703                    // connection in this order:
704                    // - incoming messages from the remote peer, then
705                    // - outgoing messages to the remote peer.
706                    //
707                    // This improves the performance of peer responses to Zebra requests, and new
708                    // peer requests to Zebra's inbound service.
709                    //
710                    // `futures::StreamExt::next()` is cancel-safe:
711                    // <https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety>
712                    // This means that messages from the future that isn't selected stay in the stream,
713                    // and they will be returned next time the future is checked.
714                    //
715                    // If an inbound peer message arrives at a ready peer that also has a pending
716                    // request from Zebra, we want to process the peer's message first.
717                    // If we process the Zebra request first:
718                    // - we could misinterpret the inbound peer message as a response to the Zebra
719                    //   request, or
720                    // - if the peer message is a request to Zebra, and we put the peer in the
721                    //   AwaitingResponse state, then we'll correctly ignore the simultaneous Zebra
722                    //   request. (Zebra services make multiple requests or retry, so this is ok.)
723                    //
724                    // # Security
725                    //
726                    // If a peer sends an uninterrupted series of messages, it will delay any new
727                    // requests from Zebra to that individual peer. This is behaviour we want,
728                    // because:
729                    // - any responses to Zebra's requests to that peer would be slow or timeout,
730                    // - the peer will eventually fail a Zebra keepalive check and get disconnected,
731                    // - if there are too many inbound messages overall, the inbound service will
732                    //   return an overload error and the peer will be disconnected.
733                    //
734                    // Messages to other peers will continue to be processed concurrently. Some
735                    // Zebra services might be temporarily delayed until the peer times out, if a
736                    // request to that peer is sent by the service, and the service blocks until
737                    // the request completes (or times out).
738                    match future::select(peer_rx.next(), self.client_rx.next()).await {
739                        Either::Left((None, _)) => {
740                            self.fail_with(PeerError::ConnectionClosed).await;
741                        }
742                        Either::Left((Some(Err(e)), _)) => self.fail_with(e).await,
743                        Either::Left((Some(Ok(msg)), _)) => {
744                            let unhandled_msg = self.handle_message_as_request(msg).await;
745
746                            if let Some(unhandled_msg) = unhandled_msg {
747                                debug!(
748                                    %unhandled_msg,
749                                    "ignoring unhandled request while awaiting a request"
750                                );
751                            }
752                        }
753                        Either::Right((None, _)) => {
754                            trace!("client_rx closed, ending connection");
755
756                            // There are no requests to be flushed,
757                            // but we need to set an error and update metrics.
758                            // (We don't want to log this error, because it's normal behaviour.)
759                            self.shutdown_async(PeerError::ClientDropped).await;
760                            break;
761                        }
762                        Either::Right((Some(req), _)) => {
763                            let span = req.span.clone();
764                            self.handle_client_request(req).instrument(span).await
765                        }
766                    }
767                }
768
769                // Check whether the handler is finished before waiting for a response message,
770                // because the response might be `Nil` or synthetic.
771                State::AwaitingResponse {
772                    handler: Handler::Finished(_),
773                    ref span,
774                    ..
775                } => {
776                    // We have to get rid of the span reference so we can tamper with the state.
777                    let span = span.clone();
778                    trace!(
779                        parent: &span,
780                        "returning completed response to client request"
781                    );
782
783                    // Replace the state with a temporary value,
784                    // so we can take ownership of the response sender.
785                    let tmp_state = std::mem::replace(&mut self.state, State::Failed);
786
787                    if let State::AwaitingResponse {
788                        handler: Handler::Finished(response),
789                        tx,
790                        ..
791                    } = tmp_state
792                    {
793                        if let Ok(response) = response.as_ref() {
794                            debug!(%response, "finished receiving peer response to Zebra request");
795                            // Add a metric for inbound responses to outbound requests.
796                            metrics::counter!(
797                                "zebra.net.in.responses",
798                                "command" => response.command(),
799                                "addr" => self.metrics_label.clone(),
800                            )
801                            .increment(1);
802                        } else {
803                            debug!(error = ?response, "error in peer response to Zebra request");
804                        }
805
806                        let _ = tx.send(response.map_err(Into::into));
807                    } else {
808                        unreachable!("already checked for AwaitingResponse");
809                    }
810
811                    self.state = State::AwaitingRequest;
812                }
813
814                // We're awaiting a response to a client request,
815                // so wait on either a peer message, or on a request cancellation.
816                State::AwaitingResponse {
817                    ref span,
818                    ref mut tx,
819                    ..
820                } => {
821                    // we have to get rid of the span reference so we can tamper with the state
822                    let span = span.clone();
823                    trace!(parent: &span, "awaiting response to client request");
824                    let timer_ref = self
825                        .request_timer
826                        .as_mut()
827                        .expect("timeout must be set while awaiting response");
828
829                    // # Security
830                    //
831                    // select() prefers the first future if multiple futures are ready.
832                    //
833                    // If multiple futures are ready, we want the priority for each individual
834                    // connection to be:
835                    // - cancellation, then
836                    // - timeout, then
837                    // - peer responses.
838                    //
839                    // (Messages to other peers are processed concurrently.)
840                    //
841                    // This makes sure a peer can't block disconnection or timeouts by sending too
842                    // many messages. It also avoids doing work to process messages after a
843                    // connection has failed.
844                    let cancel = future::select(tx.cancellation(), timer_ref);
845                    match future::select(cancel, peer_rx.next())
846                        .instrument(span.clone())
847                        .await
848                    {
849                        Either::Right((None, _)) => {
850                            self.fail_with(PeerError::ConnectionClosed).await
851                        }
852                        Either::Right((Some(Err(e)), _)) => self.fail_with(e).await,
853                        Either::Right((Some(Ok(peer_msg)), _cancel)) => {
854                            self.update_state_metrics(format!("Out::Rsp::{}", peer_msg.command()));
855
856                            // Try to process the message using the handler.
857                            // This extremely awkward construction avoids
858                            // keeping a live reference to handler across the
859                            // call to handle_message_as_request, which takes
860                            // &mut self. This is a sign that we don't properly
861                            // factor the state required for inbound and
862                            // outbound requests.
863                            let request_msg = match self.state {
864                                State::AwaitingResponse {
865                                    ref mut handler, ..
866                                } => span.in_scope(|| handler.process_message(peer_msg, &mut self.cached_addrs, self.connection_info.connected_addr.get_transient_addr())),
867                                _ => unreachable!("unexpected state after AwaitingResponse: {:?}, peer_msg: {:?}, client_receiver: {:?}",
868                                                  self.state,
869                                                  peer_msg,
870                                                  self.client_rx,
871                                ),
872                            };
873
874                            self.update_state_metrics(None);
875
876                            // If the message was not consumed as a response,
877                            // check whether it can be handled as a request.
878                            let unused_msg = if let Some(request_msg) = request_msg {
879                                // do NOT instrument with the request span, this is
880                                // independent work
881                                self.handle_message_as_request(request_msg).await
882                            } else {
883                                None
884                            };
885
886                            if let Some(unused_msg) = unused_msg {
887                                debug!(
888                                    %unused_msg,
889                                    %self.state,
890                                    "ignoring peer message: not a response or a request",
891                                );
892                            }
893                        }
894                        Either::Left((Either::Right(_), _peer_fut)) => {
895                            trace!(parent: &span, "client request timed out");
896                            let e = PeerError::ConnectionReceiveTimeout;
897
898                            // Replace the state with a temporary value,
899                            // so we can take ownership of the response sender.
900                            self.state = match std::mem::replace(&mut self.state, State::Failed) {
901                                // Special case: ping timeouts fail the connection.
902                                State::AwaitingResponse {
903                                    handler: Handler::Ping(_),
904                                    tx,
905                                    ..
906                                } => {
907                                    // We replaced the original state, which means `fail_with` won't see it.
908                                    // So we do the state request cleanup manually.
909                                    let e = SharedPeerError::from(e);
910                                    let _ = tx.send(Err(e.clone()));
911                                    self.fail_with(e).await;
912                                    State::Failed
913                                }
914                                // Other request timeouts fail the request.
915                                State::AwaitingResponse { tx, .. } => {
916                                    let _ = tx.send(Err(e.into()));
917                                    State::AwaitingRequest
918                                }
919                                _ => unreachable!(
920                                    "unexpected failed connection state while AwaitingResponse: client_receiver: {:?}",
921                                    self.client_rx
922                                ),
923                            };
924                        }
925                        Either::Left((Either::Left(_), _peer_fut)) => {
926                            // The client receiver was dropped, so we don't need to send on `tx` here.
927                            trace!(parent: &span, "client request was cancelled");
928                            self.state = State::AwaitingRequest;
929                        }
930                    }
931                }
932
933                // This connection has failed: stop the event loop, and complete the future.
934                State::Failed => break,
935            }
936        }
937
938        // TODO: close peer_rx here, after changing it from a stream to a channel
939
940        let error = self.error_slot.try_get_error();
941        assert!(
942            error.is_some(),
943            "closing connections must call fail_with() or shutdown() to set the error slot"
944        );
945
946        self.update_state_metrics(error.expect("checked is_some").to_string());
947    }
948
949    /// Fail this connection, log the failure, and shut it down.
950    /// See [`Self::shutdown_async()`] for details.
951    ///
952    /// Use [`Self::shutdown_async()`] to avoid logging the failure,
953    /// and [`Self::shutdown()`] from non-async code.
954    async fn fail_with(&mut self, error: impl Into<SharedPeerError>) {
955        let error = error.into();
956
957        debug!(
958            %error,
959            client_receiver = ?self.client_rx,
960            "failing peer service with error"
961        );
962
963        self.shutdown_async(error).await;
964    }
965
966    /// Handle an internal client request, possibly generating outgoing messages to the
967    /// remote peer.
968    ///
969    /// NOTE: the caller should use .instrument(msg.span) to instrument the function.
970    async fn handle_client_request(&mut self, req: InProgressClientRequest) {
971        trace!(?req.request);
972        use Request::*;
973        use State::*;
974        let InProgressClientRequest { request, tx, span } = req;
975
976        if tx.is_canceled() {
977            metrics::counter!("peer.canceled").increment(1);
978            debug!(state = %self.state, %request, "ignoring canceled request");
979
980            metrics::counter!(
981                "zebra.net.out.requests.canceled",
982                "command" => request.command(),
983                "addr" => self.metrics_label.clone(),
984            )
985            .increment(1);
986            self.update_state_metrics(format!("Out::Req::Canceled::{}", request.command()));
987
988            return;
989        }
990
991        debug!(state = %self.state, %request, "sending request from Zebra to peer");
992
993        // Add a metric for outbound requests.
994        metrics::counter!(
995            "zebra.net.out.requests",
996            "command" => request.command(),
997            "addr" => self.metrics_label.clone(),
998        )
999        .increment(1);
1000        self.update_state_metrics(format!("Out::Req::{}", request.command()));
1001
1002        let new_handler = match (&self.state, request) {
1003            (Failed, request) => panic!(
1004                "failed connection cannot handle new request: {:?}, client_receiver: {:?}",
1005                request,
1006                self.client_rx
1007            ),
1008            (pending @ AwaitingResponse { .. }, request) => panic!(
1009                "tried to process new request: {:?} while awaiting a response: {:?}, client_receiver: {:?}",
1010                request,
1011                pending,
1012                self.client_rx
1013            ),
1014
1015            // Take some cached addresses from the peer connection. This address cache helps
1016            // work-around a `zcashd` addr response rate-limit.
1017            (AwaitingRequest, Peers) if !self.cached_addrs.is_empty() => {
1018                // Security: This method performs security-sensitive operations, see its comments
1019                // for details.
1020                let response_addrs = Handler::update_addr_cache(&mut self.cached_addrs, None, PEER_ADDR_RESPONSE_LIMIT);
1021
1022                debug!(
1023                    response_addrs = response_addrs.len(),
1024                    remaining_addrs = self.cached_addrs.len(),
1025                    PEER_ADDR_RESPONSE_LIMIT,
1026                    "responding to Peers request using some cached addresses",
1027                );
1028
1029                Ok(Handler::Finished(Ok(Response::Peers(response_addrs))))
1030            }
1031            (AwaitingRequest, Peers) => self
1032                .peer_tx
1033                .send(Message::GetAddr)
1034                .await
1035                .map(|()| Handler::Peers),
1036
1037            (AwaitingRequest, Ping(nonce)) => self
1038                .peer_tx
1039                .send(Message::Ping(nonce))
1040                .await
1041                .map(|()| Handler::Ping(nonce)),
1042
1043            (AwaitingRequest, BlocksByHash(hashes)) => {
1044                self
1045                    .peer_tx
1046                    .send(Message::GetData(
1047                        hashes.iter().map(|h| (*h).into()).collect(),
1048                    ))
1049                    .await
1050                    .map(|()|
1051                         Handler::BlocksByHash {
1052                             blocks: Vec::with_capacity(hashes.len()),
1053                             pending_hashes: hashes,
1054                         }
1055                    )
1056            }
1057            (AwaitingRequest, TransactionsById(ids)) => {
1058                self
1059                    .peer_tx
1060                    .send(Message::GetData(
1061                        ids.iter().map(Into::into).collect(),
1062                    ))
1063                    .await
1064                    .map(|()|
1065                         Handler::TransactionsById {
1066                             transactions: Vec::with_capacity(ids.len()),
1067                             pending_ids: ids,
1068                         })
1069            }
1070
1071            (AwaitingRequest, FindBlocks { known_blocks, stop }) => {
1072                self
1073                    .peer_tx
1074                    .send(Message::GetBlocks { known_blocks, stop })
1075                    .await
1076                    .map(|()|
1077                         Handler::FindBlocks
1078                    )
1079            }
1080            (AwaitingRequest, FindHeaders { known_blocks, stop }) => {
1081                self
1082                    .peer_tx
1083                    .send(Message::GetHeaders { known_blocks, stop })
1084                    .await
1085                    .map(|()|
1086                         Handler::FindHeaders
1087                    )
1088            }
1089
1090            (AwaitingRequest, MempoolTransactionIds) => {
1091                self
1092                    .peer_tx
1093                    .send(Message::Mempool)
1094                    .await
1095                    .map(|()|
1096                         Handler::MempoolTransactionIds
1097                    )
1098            }
1099
1100            (AwaitingRequest, PushTransaction(transaction)) => {
1101                self
1102                    .peer_tx
1103                    .send(Message::Tx(transaction))
1104                    .await
1105                    .map(|()|
1106                         Handler::Finished(Ok(Response::Nil))
1107                    )
1108            }
1109            (AwaitingRequest, AdvertiseTransactionIds(hashes)) => {
1110                let max_tx_inv_in_message: usize = MAX_TX_INV_IN_SENT_MESSAGE
1111                    .try_into()
1112                    .expect("constant fits in usize");
1113
1114                // # Security
1115                //
1116                // In most cases, we try to split over-sized requests into multiple network-layer
1117                // messages. But we are unlikely to reach this limit with the default mempool
1118                // config, so a gossip like this could indicate a network amplification attack.
1119                //
1120                // This limit is particularly important here, because advertisements send the same
1121                // message to half our available peers.
1122                //
1123                // If there are thousands of transactions in the mempool, letting peers know the
1124                // exact transactions we have isn't that important, so it's ok to drop arbitrary
1125                // transaction hashes from our response.
1126                if hashes.len() > max_tx_inv_in_message {
1127                    debug!(inv_count = ?hashes.len(), ?MAX_TX_INV_IN_SENT_MESSAGE, "unusually large transaction ID gossip");
1128                }
1129
1130                let hashes = hashes.into_iter().take(max_tx_inv_in_message).map(Into::into).collect();
1131
1132                self
1133                    .peer_tx
1134                    .send(Message::Inv(hashes))
1135                    .await
1136                    .map(|()|
1137                         Handler::Finished(Ok(Response::Nil))
1138                    )
1139            }
1140            (AwaitingRequest, AdvertiseBlock(hash)) => {
1141                self
1142                    .peer_tx
1143                    .send(Message::Inv(vec![hash.into()]))
1144                    .await
1145                    .map(|()|
1146                         Handler::Finished(Ok(Response::Nil))
1147                    )
1148            }
1149        };
1150
1151        // Update the connection state with a new handler, or fail with an error.
1152        match new_handler {
1153            Ok(handler) => {
1154                self.state = AwaitingResponse { handler, span, tx };
1155                self.request_timer = Some(Box::pin(sleep(constants::REQUEST_TIMEOUT)));
1156            }
1157            Err(error) => {
1158                let error = SharedPeerError::from(error);
1159                let _ = tx.send(Err(error.clone()));
1160                self.fail_with(error).await;
1161            }
1162        };
1163    }
1164
1165    /// Handle `msg` as a request from a peer to this Zebra instance.
1166    ///
1167    /// If the message is not handled, it is returned.
1168    // This function has its own span, because we're creating a new work
1169    // context (namely, the work of processing the inbound msg as a request)
1170    #[instrument(name = "msg_as_req", skip(self, msg), fields(msg = msg.command()))]
1171    async fn handle_message_as_request(&mut self, msg: Message) -> Option<Message> {
1172        trace!(?msg);
1173        debug!(state = %self.state, %msg, "received inbound peer message");
1174
1175        self.update_state_metrics(format!("In::Msg::{}", msg.command()));
1176
1177        use InboundMessage::*;
1178
1179        let req = match msg {
1180            Message::Ping(nonce) => {
1181                trace!(?nonce, "responding to heartbeat");
1182                if let Err(e) = self.peer_tx.send(Message::Pong(nonce)).await {
1183                    self.fail_with(e).await;
1184                }
1185                Consumed
1186            }
1187            // These messages shouldn't be sent outside of a handshake.
1188            Message::Version { .. } => {
1189                self.fail_with(PeerError::DuplicateHandshake).await;
1190                Consumed
1191            }
1192            Message::Verack => {
1193                self.fail_with(PeerError::DuplicateHandshake).await;
1194                Consumed
1195            }
1196            // These messages should already be handled as a response if they
1197            // could be a response, so if we see them here, they were either
1198            // sent unsolicited, or they were sent in response to a canceled request
1199            // that we've already forgotten about.
1200            Message::Reject { .. } => {
1201                debug!(%msg, "got reject message unsolicited or from canceled request");
1202                Unused
1203            }
1204            Message::NotFound { .. } => {
1205                debug!(%msg, "got notfound message unsolicited or from canceled request");
1206                Unused
1207            }
1208            Message::Pong(_) => {
1209                debug!(%msg, "got pong message unsolicited or from canceled request");
1210                Unused
1211            }
1212            Message::Block(_) => {
1213                debug!(%msg, "got block message unsolicited or from canceled request");
1214                Unused
1215            }
1216            Message::Headers(_) => {
1217                debug!(%msg, "got headers message unsolicited or from canceled request");
1218                Unused
1219            }
1220            // These messages should never be sent by peers.
1221            Message::FilterLoad { .. } | Message::FilterAdd { .. } | Message::FilterClear => {
1222                // # Security
1223                //
1224                // Zcash connections are not authenticated, so malicious nodes can send fake messages,
1225                // with connected peers' IP addresses in the IP header.
1226                //
1227                // Since we can't verify their source, Zebra needs to ignore unexpected messages,
1228                // because closing the connection could cause a denial of service or eclipse attack.
1229                debug!(%msg, "got BIP111 message without advertising NODE_BLOOM");
1230
1231                // Ignored, but consumed because it is technically a protocol error.
1232                Consumed
1233            }
1234
1235            // # Security
1236            //
1237            // Zebra crawls the network proactively, and that's the only way peers get into our
1238            // address book. This prevents peers from filling our address book with malicious peer
1239            // addresses.
1240            Message::Addr(ref new_addrs) => {
1241                // # Peer Set Reliability
1242                //
1243                // We keep a list of the unused peer addresses sent by each connection, to work
1244                // around `zcashd`'s `getaddr` response rate-limit.
1245                let no_response =
1246                    Handler::update_addr_cache(&mut self.cached_addrs, new_addrs, None);
1247                assert_eq!(
1248                    no_response,
1249                    Vec::new(),
1250                    "peers unexpectedly taken from cache"
1251                );
1252
1253                debug!(
1254                    new_addrs = new_addrs.len(),
1255                    cached_addrs = self.cached_addrs.len(),
1256                    "adding unsolicited addresses to cached addresses",
1257                );
1258
1259                Consumed
1260            }
1261            Message::Tx(ref transaction) => Request::PushTransaction(transaction.clone()).into(),
1262            Message::Inv(ref items) => match &items[..] {
1263                // We don't expect to be advertised multiple blocks at a time,
1264                // so we ignore any advertisements of multiple blocks.
1265                [InventoryHash::Block(hash)] => Request::AdvertiseBlock(*hash).into(),
1266
1267                // Some peers advertise invs with mixed item types.
1268                // But we're just interested in the transaction invs.
1269                //
1270                // TODO: split mixed invs into multiple requests,
1271                //       but skip runs of multiple blocks.
1272                tx_ids if tx_ids.iter().any(|item| item.unmined_tx_id().is_some()) => {
1273                    Request::AdvertiseTransactionIds(transaction_ids(items).collect()).into()
1274                }
1275
1276                // Log detailed messages for ignored inv advertisement messages.
1277                [] => {
1278                    debug!(%msg, "ignoring empty inv");
1279
1280                    // This might be a minor protocol error, or it might mean "not found".
1281                    Unused
1282                }
1283                [InventoryHash::Block(_), InventoryHash::Block(_), ..] => {
1284                    debug!(%msg, "ignoring inv with multiple blocks");
1285                    Unused
1286                }
1287                _ => {
1288                    debug!(%msg, "ignoring inv with no transactions");
1289                    Unused
1290                }
1291            },
1292            Message::GetData(ref items) => match &items[..] {
1293                // Some peers advertise invs with mixed item types.
1294                // So we suspect they might do the same with getdata.
1295                //
1296                // Since we can only handle one message at a time,
1297                // we treat it as a block request if there are any blocks,
1298                // or a transaction request if there are any transactions.
1299                //
1300                // TODO: split mixed getdata into multiple requests.
1301                b_hashes
1302                    if b_hashes
1303                        .iter()
1304                        .any(|item| matches!(item, InventoryHash::Block(_))) =>
1305                {
1306                    Request::BlocksByHash(block_hashes(items).collect()).into()
1307                }
1308                tx_ids if tx_ids.iter().any(|item| item.unmined_tx_id().is_some()) => {
1309                    Request::TransactionsById(transaction_ids(items).collect()).into()
1310                }
1311
1312                // Log detailed messages for ignored getdata request messages.
1313                [] => {
1314                    debug!(%msg, "ignoring empty getdata");
1315
1316                    // This might be a minor protocol error, or it might mean "not found".
1317                    Unused
1318                }
1319                _ => {
1320                    debug!(%msg, "ignoring getdata with no blocks or transactions");
1321                    Unused
1322                }
1323            },
1324            Message::GetAddr => Request::Peers.into(),
1325            Message::GetBlocks {
1326                ref known_blocks,
1327                stop,
1328            } => Request::FindBlocks {
1329                known_blocks: known_blocks.clone(),
1330                stop,
1331            }
1332            .into(),
1333            Message::GetHeaders {
1334                ref known_blocks,
1335                stop,
1336            } => Request::FindHeaders {
1337                known_blocks: known_blocks.clone(),
1338                stop,
1339            }
1340            .into(),
1341            Message::Mempool => Request::MempoolTransactionIds.into(),
1342        };
1343
1344        // Handle the request, and return unused messages.
1345        match req {
1346            AsRequest(req) => {
1347                self.drive_peer_request(req).await;
1348                None
1349            }
1350            Consumed => None,
1351            Unused => Some(msg),
1352        }
1353    }
1354
1355    /// Given a `req` originating from the peer, drive it to completion and send
1356    /// any appropriate messages to the remote peer. If an error occurs while
1357    /// processing the request (e.g., the service is shedding load), then we call
1358    /// fail_with to terminate the entire peer connection, shrinking the number
1359    /// of connected peers.
1360    async fn drive_peer_request(&mut self, req: Request) {
1361        trace!(?req);
1362
1363        // Add a metric for inbound requests
1364        metrics::counter!(
1365            "zebra.net.in.requests",
1366            "command" => req.command(),
1367            "addr" => self.metrics_label.clone(),
1368        )
1369        .increment(1);
1370        self.update_state_metrics(format!("In::Req::{}", req.command()));
1371
1372        // Give the inbound service time to clear its queue,
1373        // before sending the next inbound request.
1374        tokio::task::yield_now().await;
1375
1376        // # Security
1377        //
1378        // Holding buffer slots for a long time can cause hangs:
1379        // <https://docs.rs/tower/latest/tower/buffer/struct.Buffer.html#a-note-on-choosing-a-bound>
1380        //
1381        // The inbound service must be called immediately after a buffer slot is reserved.
1382        //
1383        // The inbound service never times out in readiness, because the load shed layer is always
1384        // ready, and returns an error in response to the request instead.
1385        if self.svc.ready().await.is_err() {
1386            self.fail_with(PeerError::ServiceShutdown).await;
1387            return;
1388        }
1389
1390        // Inbound service request timeouts are handled by the timeout layer in `start::start()`.
1391        let rsp = match self.svc.call(req.clone()).await {
1392            Err(e) => {
1393                if e.is::<tower::load_shed::error::Overloaded>() {
1394                    // # Security
1395                    //
1396                    // The peer request queue must have a limited length.
1397                    // The buffer and load shed layers are added in `start::start()`.
1398                    tracing::debug!("inbound service is overloaded, may close connection");
1399
1400                    let now = Instant::now();
1401
1402                    self.handle_inbound_overload(req, now, PeerError::Overloaded)
1403                        .await;
1404                } else if e.is::<tower::timeout::error::Elapsed>() {
1405                    // # Security
1406                    //
1407                    // Peer requests must have a timeout.
1408                    // The timeout layer is added in `start::start()`.
1409                    tracing::info!(%req, "inbound service request timed out, may close connection");
1410
1411                    let now = Instant::now();
1412
1413                    self.handle_inbound_overload(req, now, PeerError::InboundTimeout)
1414                        .await;
1415                } else {
1416                    // We could send a reject to the remote peer, but that might cause
1417                    // them to disconnect, and we might be using them to sync blocks.
1418                    // For similar reasons, we don't want to fail_with() here - we
1419                    // only close the connection if the peer is doing something wrong.
1420                    info!(
1421                        %e,
1422                        connection_state = ?self.state,
1423                        client_receiver = ?self.client_rx,
1424                        "error processing peer request",
1425                    );
1426                    self.update_state_metrics(format!("In::Req::{}/Rsp::Error", req.command()));
1427                }
1428
1429                return;
1430            }
1431            Ok(rsp) => rsp,
1432        };
1433
1434        // Add a metric for outbound responses to inbound requests
1435        metrics::counter!(
1436            "zebra.net.out.responses",
1437            "command" => rsp.command(),
1438            "addr" => self.metrics_label.clone(),
1439        )
1440        .increment(1);
1441        self.update_state_metrics(format!("In::Rsp::{}", rsp.command()));
1442
1443        // TODO: split response handler into its own method
1444        match rsp.clone() {
1445            Response::Nil => { /* generic success, do nothing */ }
1446            Response::Peers(addrs) => {
1447                if let Err(e) = self.peer_tx.send(Message::Addr(addrs)).await {
1448                    self.fail_with(e).await;
1449                }
1450            }
1451            Response::Transactions(transactions) => {
1452                // Generate one tx message per transaction,
1453                // then a notfound message with all the missing transaction ids.
1454                let mut missing_ids = Vec::new();
1455
1456                for transaction in transactions.into_iter() {
1457                    match transaction {
1458                        Available((transaction, _)) => {
1459                            if let Err(e) = self.peer_tx.send(Message::Tx(transaction)).await {
1460                                self.fail_with(e).await;
1461                                return;
1462                            }
1463                        }
1464                        Missing(id) => missing_ids.push(id.into()),
1465                    }
1466                }
1467
1468                if !missing_ids.is_empty() {
1469                    if let Err(e) = self.peer_tx.send(Message::NotFound(missing_ids)).await {
1470                        self.fail_with(e).await;
1471                        return;
1472                    }
1473                }
1474            }
1475            Response::Blocks(blocks) => {
1476                // Generate one tx message per block,
1477                // then a notfound message with all the missing block hashes.
1478                let mut missing_hashes = Vec::new();
1479
1480                for block in blocks.into_iter() {
1481                    match block {
1482                        Available((block, _)) => {
1483                            if let Err(e) = self.peer_tx.send(Message::Block(block)).await {
1484                                self.fail_with(e).await;
1485                                return;
1486                            }
1487                        }
1488                        Missing(hash) => missing_hashes.push(hash.into()),
1489                    }
1490                }
1491
1492                if !missing_hashes.is_empty() {
1493                    if let Err(e) = self.peer_tx.send(Message::NotFound(missing_hashes)).await {
1494                        self.fail_with(e).await;
1495                        return;
1496                    }
1497                }
1498            }
1499            Response::BlockHashes(hashes) => {
1500                if let Err(e) = self
1501                    .peer_tx
1502                    .send(Message::Inv(hashes.into_iter().map(Into::into).collect()))
1503                    .await
1504                {
1505                    self.fail_with(e).await
1506                }
1507            }
1508            Response::BlockHeaders(headers) => {
1509                if let Err(e) = self.peer_tx.send(Message::Headers(headers)).await {
1510                    self.fail_with(e).await
1511                }
1512            }
1513            Response::TransactionIds(hashes) => {
1514                let max_tx_inv_in_message: usize = MAX_TX_INV_IN_SENT_MESSAGE
1515                    .try_into()
1516                    .expect("constant fits in usize");
1517
1518                // # Security
1519                //
1520                // In most cases, we try to split over-sized responses into multiple network-layer
1521                // messages. But we are unlikely to reach this limit with the default mempool
1522                // config, so a response like this could indicate a network amplification attack.
1523                //
1524                // If there are thousands of transactions in the mempool, letting peers know the
1525                // exact transactions we have isn't that important, so it's ok to drop arbitrary
1526                // transaction hashes from our response.
1527                if hashes.len() > max_tx_inv_in_message {
1528                    debug!(inv_count = ?hashes.len(), ?MAX_TX_INV_IN_SENT_MESSAGE, "unusually large transaction ID response");
1529                }
1530
1531                let hashes = hashes
1532                    .into_iter()
1533                    .take(max_tx_inv_in_message)
1534                    .map(Into::into)
1535                    .collect();
1536
1537                if let Err(e) = self.peer_tx.send(Message::Inv(hashes)).await {
1538                    self.fail_with(e).await
1539                }
1540            }
1541        }
1542
1543        debug!(state = %self.state, %req, %rsp, "sent Zebra response to peer");
1544
1545        // Give the inbound service time to clear its queue,
1546        // before checking the connection for the next inbound or outbound request.
1547        tokio::task::yield_now().await;
1548    }
1549
1550    /// Handle inbound service overload and timeout error responses by randomly terminating some
1551    /// connections.
1552    ///
1553    /// # Security
1554    ///
1555    /// When the inbound service is overloaded with requests, Zebra needs to drop some connections,
1556    /// to reduce the load on the application. But dropping every connection that receives an
1557    /// `Overloaded` error from the inbound service could cause Zebra to drop too many peer
1558    /// connections, and stop itself downloading blocks or transactions.
1559    ///
1560    /// Malicious or misbehaving peers can also overload the inbound service, and make Zebra drop
1561    /// its connections to other peers.
1562    ///
1563    /// So instead, Zebra drops some overloaded connections at random. If a connection has recently
1564    /// overloaded the inbound service, it is more likely to be dropped. This makes it harder for a
1565    /// single peer (or multiple peers) to perform a denial of service attack.
1566    ///
1567    /// The inbound connection rate-limit also makes it hard for multiple peers to perform this
1568    /// attack, because each inbound connection can only send one inbound request before its
1569    /// probability of being disconnected increases.
1570    async fn handle_inbound_overload(&mut self, req: Request, now: Instant, error: PeerError) {
1571        let prev = self.last_overload_time.replace(now);
1572        let drop_connection_probability = overload_drop_connection_probability(now, prev);
1573
1574        if thread_rng().gen::<f32>() < drop_connection_probability {
1575            if matches!(error, PeerError::Overloaded) {
1576                metrics::counter!("pool.closed.loadshed").increment(1);
1577            } else {
1578                metrics::counter!("pool.closed.inbound.timeout").increment(1);
1579            }
1580
1581            tracing::info!(
1582                drop_connection_probability = format!("{drop_connection_probability:.3}"),
1583                remote_user_agent = ?self.connection_info.remote.user_agent,
1584                negotiated_version = ?self.connection_info.negotiated_version,
1585                peer = ?self.metrics_label,
1586                last_peer_state = ?self.last_metrics_state,
1587                // TODO: remove this detailed debug info once #6506 is fixed
1588                remote_height = ?self.connection_info.remote.start_height,
1589                cached_addrs = ?self.cached_addrs.len(),
1590                connection_state = ?self.state,
1591                "inbound service {error} error, closing connection",
1592            );
1593
1594            self.update_state_metrics(format!("In::Req::{}/Rsp::{error}::Error", req.command()));
1595            self.fail_with(error).await;
1596        } else {
1597            self.update_state_metrics(format!("In::Req::{}/Rsp::{error}::Ignored", req.command()));
1598
1599            if matches!(error, PeerError::Overloaded) {
1600                metrics::counter!("pool.ignored.loadshed").increment(1);
1601            } else {
1602                metrics::counter!("pool.ignored.inbound.timeout").increment(1);
1603            }
1604        }
1605    }
1606}
1607
1608/// Returns the probability of dropping a connection where the last overload was at `prev`,
1609/// and the current overload is `now`.
1610///
1611/// # Security
1612///
1613/// Connections that haven't seen an overload error in the past OVERLOAD_PROTECTION_INTERVAL
1614/// have a small chance of being closed (MIN_OVERLOAD_DROP_PROBABILITY).
1615///
1616/// Connections that have seen a previous overload error in that time
1617/// have a higher chance of being dropped up to MAX_OVERLOAD_DROP_PROBABILITY.
1618/// This probability increases quadratically, so peers that send lots of inbound
1619/// requests are more likely to be dropped.
1620///
1621/// ## Examples
1622///
1623/// If a connection sends multiple overloads close together, it is very likely to be
1624/// disconnected. If a connection has two overloads multiple seconds apart, it is unlikely
1625/// to be disconnected.
1626fn overload_drop_connection_probability(now: Instant, prev: Option<Instant>) -> f32 {
1627    let Some(prev) = prev else {
1628        return MIN_OVERLOAD_DROP_PROBABILITY;
1629    };
1630
1631    let protection_fraction_since_last_overload =
1632        (now - prev).as_secs_f32() / OVERLOAD_PROTECTION_INTERVAL.as_secs_f32();
1633
1634    // Quadratically increase the disconnection probability for very recent overloads.
1635    // Negative values are ignored by clamping to MIN_OVERLOAD_DROP_PROBABILITY.
1636    let overload_fraction = protection_fraction_since_last_overload.powi(2);
1637
1638    let probability_range = MAX_OVERLOAD_DROP_PROBABILITY - MIN_OVERLOAD_DROP_PROBABILITY;
1639    let raw_drop_probability =
1640        MAX_OVERLOAD_DROP_PROBABILITY - (overload_fraction * probability_range);
1641
1642    raw_drop_probability.clamp(MIN_OVERLOAD_DROP_PROBABILITY, MAX_OVERLOAD_DROP_PROBABILITY)
1643}
1644
1645impl<S, Tx> Connection<S, Tx>
1646where
1647    Tx: Sink<Message, Error = SerializationError> + Unpin,
1648{
1649    /// Update the connection state metrics for this connection,
1650    /// using `extra_state_info` as additional state information.
1651    fn update_state_metrics(&mut self, extra_state_info: impl Into<Option<String>>) {
1652        let current_metrics_state = if let Some(extra_state_info) = extra_state_info.into() {
1653            format!("{}::{extra_state_info}", self.state.command()).into()
1654        } else {
1655            self.state.command()
1656        };
1657
1658        if self.last_metrics_state.as_ref() == Some(&current_metrics_state) {
1659            return;
1660        }
1661
1662        self.erase_state_metrics();
1663
1664        // Set the new state
1665        metrics::gauge!(
1666            "zebra.net.connection.state",
1667            "command" => current_metrics_state.clone(),
1668            "addr" => self.metrics_label.clone(),
1669        )
1670        .increment(1.0);
1671
1672        self.last_metrics_state = Some(current_metrics_state);
1673    }
1674
1675    /// Erase the connection state metrics for this connection.
1676    fn erase_state_metrics(&mut self) {
1677        if let Some(last_metrics_state) = self.last_metrics_state.take() {
1678            metrics::gauge!(
1679                "zebra.net.connection.state",
1680                "command" => last_metrics_state,
1681                "addr" => self.metrics_label.clone(),
1682            )
1683            .set(0.0);
1684        }
1685    }
1686
1687    /// Marks the peer as having failed with `error`, and performs connection cleanup,
1688    /// including async channel closes.
1689    ///
1690    /// If the connection has errored already, re-use the original error.
1691    /// Otherwise, fail the connection with `error`.
1692    async fn shutdown_async(&mut self, error: impl Into<SharedPeerError>) {
1693        // Close async channels first, so other tasks can start shutting down.
1694        // There's nothing we can do about errors while shutting down, and some errors are expected.
1695        //
1696        // TODO: close peer_tx and peer_rx in shutdown() and Drop, after:
1697        // - using channels instead of streams/sinks?
1698        // - exposing the underlying implementation rather than using generics and closures?
1699        // - adding peer_rx to the connection struct (optional)
1700        let _ = self.peer_tx.close().await;
1701
1702        self.shutdown(error);
1703    }
1704
1705    /// Marks the peer as having failed with `error`, and performs connection cleanup.
1706    /// See [`Self::shutdown_async()`] for details.
1707    ///
1708    /// Call [`Self::shutdown_async()`] in async code, because it can shut down more channels.
1709    fn shutdown(&mut self, error: impl Into<SharedPeerError>) {
1710        let mut error = error.into();
1711
1712        // Close channels first, so other tasks can start shutting down.
1713        self.client_rx.close();
1714
1715        // Update the shared error slot
1716        //
1717        // # Correctness
1718        //
1719        // Error slots use a threaded `std::sync::Mutex`, so accessing the slot
1720        // can block the async task's current thread. We only perform a single
1721        // slot update per `Client`. We ignore subsequent error slot updates.
1722        let slot_result = self.error_slot.try_update_error(error.clone());
1723
1724        if let Err(AlreadyErrored { original_error }) = slot_result {
1725            debug!(
1726                new_error = %error,
1727                %original_error,
1728                connection_state = ?self.state,
1729                "multiple errors on connection: \
1730                 failed connections should stop processing pending requests and responses, \
1731                 then close the connection"
1732            );
1733
1734            error = original_error;
1735        } else {
1736            debug!(%error,
1737                   connection_state = ?self.state,
1738                   "shutting down peer service with error");
1739        }
1740
1741        // Prepare to flush any pending client requests.
1742        //
1743        // We've already closed the client channel, so setting State::Failed
1744        // will make the main loop flush any pending requests.
1745        //
1746        // However, we may have an outstanding client request in State::AwaitingResponse,
1747        // so we need to deal with it first.
1748        if let State::AwaitingResponse { tx, .. } =
1749            std::mem::replace(&mut self.state, State::Failed)
1750        {
1751            // # Correctness
1752            //
1753            // We know the slot has Some(error), because we just set it above,
1754            // and the error slot is never unset.
1755            //
1756            // Accessing the error slot locks a threaded std::sync::Mutex, which
1757            // can block the current async task thread. We briefly lock the mutex
1758            // to clone the error.
1759            let _ = tx.send(Err(error.clone()));
1760        }
1761
1762        // Make the timer and metrics consistent with the Failed state.
1763        self.request_timer = None;
1764        self.update_state_metrics(None);
1765
1766        // Finally, flush pending client requests.
1767        while let Some(InProgressClientRequest { tx, span, .. }) =
1768            self.client_rx.close_and_flush_next()
1769        {
1770            trace!(
1771                parent: &span,
1772                %error,
1773                "sending an error response to a pending request on a failed connection"
1774            );
1775            let _ = tx.send(Err(error.clone()));
1776        }
1777    }
1778}
1779
1780impl<S, Tx> Drop for Connection<S, Tx>
1781where
1782    Tx: Sink<Message, Error = SerializationError> + Unpin,
1783{
1784    fn drop(&mut self) {
1785        self.shutdown(PeerError::ConnectionDropped);
1786
1787        self.erase_state_metrics();
1788    }
1789}
1790
1791/// Map a list of inventory hashes to the corresponding unmined transaction IDs.
1792/// Non-transaction inventory hashes are skipped.
1793///
1794/// v4 transactions use a legacy transaction ID, and
1795/// v5 transactions use a witnessed transaction ID.
1796fn transaction_ids(items: &'_ [InventoryHash]) -> impl Iterator<Item = UnminedTxId> + '_ {
1797    items.iter().filter_map(InventoryHash::unmined_tx_id)
1798}
1799
1800/// Map a list of inventory hashes to the corresponding block hashes.
1801/// Non-block inventory hashes are skipped.
1802fn block_hashes(items: &'_ [InventoryHash]) -> impl Iterator<Item = block::Hash> + '_ {
1803    items.iter().filter_map(InventoryHash::block_hash)
1804}