zebra_network/peer/connection.rs
1//! Zebra's per-peer connection state machine.
2//!
3//! Maps the external Zcash/Bitcoin protocol to Zebra's internal request/response
4//! protocol.
5//!
//! This module contains a lot of undocumented state, assumptions, and invariants,
//! and it is unclear whether these assumptions match the `zcashd` implementation.
8//! It should be refactored into a cleaner set of request/response pairs (#1515).
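//!
//! As a rough illustration of the mapping (see `Handler` and
//! `Connection::handle_client_request` below for the authoritative logic):
//!
//! ```text
//! internal Request::Ping(nonce)     -> outbound Message::Ping(nonce)
//! inbound  Message::Pong(nonce)     -> internal Response::Nil
//! internal Request::BlocksByHash    -> outbound Message::GetData(..)
//! inbound  Message::Block(..) batch -> internal Response::Blocks(..)
//! ```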
9
10use std::{borrow::Cow, collections::HashSet, fmt, pin::Pin, sync::Arc, time::Instant};
11
12use futures::{future::Either, prelude::*};
13use rand::{seq::SliceRandom, thread_rng, Rng};
14use tokio::time::{sleep, Sleep};
15use tower::{Service, ServiceExt};
16use tracing_futures::Instrument;
17
18use zebra_chain::{
19 block::{self, Block},
20 serialization::SerializationError,
21 transaction::{UnminedTx, UnminedTxId},
22};
23
24use crate::{
25 constants::{
26 self, MAX_ADDRS_IN_MESSAGE, MAX_OVERLOAD_DROP_PROBABILITY, MIN_OVERLOAD_DROP_PROBABILITY,
27 OVERLOAD_PROTECTION_INTERVAL, PEER_ADDR_RESPONSE_LIMIT,
28 },
29 meta_addr::MetaAddr,
30 peer::{
31 connection::peer_tx::PeerTx, error::AlreadyErrored, ClientRequest, ClientRequestReceiver,
32 ConnectionInfo, ErrorSlot, InProgressClientRequest, MustUseClientResponseSender, PeerError,
33 SharedPeerError,
34 },
35 peer_set::ConnectionTracker,
36 protocol::{
37 external::{types::Nonce, InventoryHash, Message},
38 internal::{InventoryResponse, Request, Response},
39 },
40 BoxError, PeerSocketAddr, MAX_TX_INV_IN_SENT_MESSAGE,
41};
42
43use InventoryResponse::*;
44
45mod peer_tx;
46
47#[cfg(test)]
48mod tests;
49
50#[derive(Debug)]
51pub(super) enum Handler {
52 /// Indicates that the handler has finished processing the request.
53 /// An error here is scoped to the request.
54 Finished(Result<Response, PeerError>),
55 Ping(Nonce),
56 Peers,
57 FindBlocks,
58 FindHeaders,
59 BlocksByHash {
60 pending_hashes: HashSet<block::Hash>,
61 blocks: Vec<Arc<Block>>,
62 },
63 TransactionsById {
64 pending_ids: HashSet<UnminedTxId>,
65 transactions: Vec<UnminedTx>,
66 },
67 MempoolTransactionIds,
68}
69
70impl fmt::Display for Handler {
71 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
72 f.write_str(&match self {
73 Handler::Finished(Ok(response)) => format!("Finished({response})"),
74 Handler::Finished(Err(error)) => format!("Finished({error})"),
75
76 Handler::Ping(_) => "Ping".to_string(),
77 Handler::Peers => "Peers".to_string(),
78
79 Handler::FindBlocks => "FindBlocks".to_string(),
80 Handler::FindHeaders => "FindHeaders".to_string(),
81 Handler::BlocksByHash {
82 pending_hashes,
83 blocks,
84 } => format!(
85 "BlocksByHash {{ pending_hashes: {}, blocks: {} }}",
86 pending_hashes.len(),
87 blocks.len()
88 ),
89
90 Handler::TransactionsById {
91 pending_ids,
92 transactions,
93 } => format!(
94 "TransactionsById {{ pending_ids: {}, transactions: {} }}",
95 pending_ids.len(),
96 transactions.len()
97 ),
98 Handler::MempoolTransactionIds => "MempoolTransactionIds".to_string(),
99 })
100 }
101}
102
103impl Handler {
104 /// Returns the Zebra internal handler type as a string.
105 pub fn command(&self) -> Cow<'static, str> {
106 match self {
107 Handler::Finished(Ok(response)) => format!("Finished({})", response.command()).into(),
108 Handler::Finished(Err(error)) => format!("Finished({})", error.kind()).into(),
109
110 Handler::Ping(_) => "Ping".into(),
111 Handler::Peers => "Peers".into(),
112
113 Handler::FindBlocks => "FindBlocks".into(),
114 Handler::FindHeaders => "FindHeaders".into(),
115
116 Handler::BlocksByHash { .. } => "BlocksByHash".into(),
117 Handler::TransactionsById { .. } => "TransactionsById".into(),
118
119 Handler::MempoolTransactionIds => "MempoolTransactionIds".into(),
120 }
121 }
122
123 /// Try to handle `msg` as a response to a client request, possibly consuming
124 /// it in the process.
125 ///
126 /// This function is where we statefully interpret Bitcoin/Zcash messages
127 /// into responses to messages in the internal request/response protocol.
128 /// This conversion is done by a sequence of (request, message) match arms,
129 /// each of which contains the conversion logic for that pair.
130 ///
131 /// Taking ownership of the message means that we can pass ownership of its
132 /// contents to responses without additional copies. If the message is not
133 /// interpretable as a response, we return ownership to the caller.
134 ///
135 /// Unexpected messages are left unprocessed, and may be rejected later.
136 ///
137 /// `addr` responses are limited to avoid peer set takeover. Any excess
138 /// addresses are stored in `cached_addrs`.
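    ///
    /// A minimal illustrative sketch of a `ping` round trip (not compiled as a
    /// doctest, since this method is private):
    ///
    /// ```ignore
    /// let nonce = Nonce::default();
    /// let mut handler = Handler::Ping(nonce);
    /// let mut cached_addrs = Vec::new();
    ///
    /// // A `pong` with the matching nonce completes the request with `Response::Nil`.
    /// let unused = handler.process_message(Message::Pong(nonce), &mut cached_addrs, None);
    /// assert!(unused.is_none());
    /// assert!(matches!(handler, Handler::Finished(Ok(Response::Nil))));
    /// ```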
139 fn process_message(
140 &mut self,
141 msg: Message,
142 cached_addrs: &mut Vec<MetaAddr>,
143 transient_addr: Option<PeerSocketAddr>,
144 ) -> Option<Message> {
145 let mut ignored_msg = None;
146 // TODO: can this be avoided?
147 let tmp_state = std::mem::replace(self, Handler::Finished(Ok(Response::Nil)));
148
149 debug!(handler = %tmp_state, %msg, "received peer response to Zebra request");
150
151 *self = match (tmp_state, msg) {
152 (Handler::Ping(req_nonce), Message::Pong(rsp_nonce)) => {
153 if req_nonce == rsp_nonce {
154 Handler::Finished(Ok(Response::Nil))
155 } else {
156 Handler::Ping(req_nonce)
157 }
158 }
159
160 (Handler::Peers, Message::Addr(new_addrs)) => {
161 // Security: This method performs security-sensitive operations, see its comments
162 // for details.
163 let response_addrs =
164 Handler::update_addr_cache(cached_addrs, &new_addrs, PEER_ADDR_RESPONSE_LIMIT);
165
166 debug!(
167 new_addrs = new_addrs.len(),
168 response_addrs = response_addrs.len(),
169 remaining_addrs = cached_addrs.len(),
170 PEER_ADDR_RESPONSE_LIMIT,
171 "responding to Peers request using new and cached addresses",
172 );
173
174 Handler::Finished(Ok(Response::Peers(response_addrs)))
175 }
176
177 // `zcashd` returns requested transactions in a single batch of messages.
178 // Other transaction or non-transaction messages can come before or after the batch.
179 // After the transaction batch, `zcashd` sends `notfound` if any transactions are missing:
180 // https://github.com/zcash/zcash/blob/e7b425298f6d9a54810cb7183f00be547e4d9415/src/main.cpp#L5617
181 (
182 Handler::TransactionsById {
183 mut pending_ids,
184 mut transactions,
185 },
186 Message::Tx(transaction),
187 ) => {
                // assumptions:
                //   - the transaction messages are sent in a single continuous batch
                //   - `zcashd` sends a trailing `notfound` for any missing transactions,
                //     but some peers skip it, so we also end the request early when we
                //     receive a transaction we didn't ask for (see below)
192 if pending_ids.remove(&transaction.id) {
193 // we are in the middle of the continuous transaction messages
194 transactions.push(transaction);
195 } else {
196 // We got a transaction we didn't ask for. If the caller doesn't know any of the
197 // transactions, they should have sent a `notfound` with all the hashes, rather
198 // than an unsolicited transaction.
199 //
200 // So either:
201 // 1. The peer implements the protocol badly, skipping `notfound`.
202 // We should cancel the request, so we don't hang waiting for transactions
203 // that will never arrive.
204 // 2. The peer sent an unsolicited transaction.
205 // We should ignore the transaction, and wait for the actual response.
206 //
207 // We end the request, so we don't hang on bad peers (case 1). But we keep the
208 // connection open, so the inbound service can process transactions from good
209 // peers (case 2).
210 ignored_msg = Some(Message::Tx(transaction));
211 }
212
213 if ignored_msg.is_some() && transactions.is_empty() {
214 // If we didn't get anything we wanted, retry the request.
215 let missing_transaction_ids = pending_ids.into_iter().map(Into::into).collect();
216 Handler::Finished(Err(PeerError::NotFoundResponse(missing_transaction_ids)))
217 } else if pending_ids.is_empty() || ignored_msg.is_some() {
218 // If we got some of what we wanted, let the internal client know.
219 let available = transactions
220 .into_iter()
221 .map(|t| InventoryResponse::Available((t, transient_addr)));
222 let missing = pending_ids.into_iter().map(InventoryResponse::Missing);
223
224 Handler::Finished(Ok(Response::Transactions(
225 available.chain(missing).collect(),
226 )))
227 } else {
228 // Keep on waiting for more.
229 Handler::TransactionsById {
230 pending_ids,
231 transactions,
232 }
233 }
234 }
235 // `zcashd` peers actually return this response
236 (
237 Handler::TransactionsById {
238 pending_ids,
239 transactions,
240 },
241 Message::NotFound(missing_invs),
242 ) => {
243 // assumptions:
244 // - the peer eventually returns a transaction or a `notfound` entry
245 // for each hash
246 // - all `notfound` entries are contained in a single message
247 // - the `notfound` message comes after the transaction messages
248 //
249 // If we're in sync with the peer, then the `notfound` should contain the remaining
250 // hashes from the handler. If we're not in sync with the peer, we should return
251 // what we got so far.
252 let missing_transaction_ids: HashSet<_> = transaction_ids(&missing_invs).collect();
253 if missing_transaction_ids != pending_ids {
254 trace!(?missing_invs, ?missing_transaction_ids, ?pending_ids);
                    // these mismatches are logged at debug level, so they shouldn't be too noisy
256 debug!("unexpected notfound message from peer: all remaining transaction hashes should be listed in the notfound. Using partial received transactions as the peer response");
257 }
258 if missing_transaction_ids.len() != missing_invs.len() {
259 trace!(?missing_invs, ?missing_transaction_ids, ?pending_ids);
260 debug!("unexpected notfound message from peer: notfound contains duplicate hashes or non-transaction hashes. Using partial received transactions as the peer response");
261 }
262
263 if transactions.is_empty() {
264 // If we didn't get anything we wanted, retry the request.
265 let missing_transaction_ids = pending_ids.into_iter().map(Into::into).collect();
266 Handler::Finished(Err(PeerError::NotFoundResponse(missing_transaction_ids)))
267 } else {
268 // If we got some of what we wanted, let the internal client know.
269 let available = transactions
270 .into_iter()
271 .map(|t| InventoryResponse::Available((t, transient_addr)));
272 let missing = pending_ids.into_iter().map(InventoryResponse::Missing);
273
274 Handler::Finished(Ok(Response::Transactions(
275 available.chain(missing).collect(),
276 )))
277 }
278 }
279
280 // `zcashd` returns requested blocks in a single batch of messages.
            // Other block or non-block messages can come before or after the batch.
282 // `zcashd` silently skips missing blocks, rather than sending a final `notfound` message.
283 // https://github.com/zcash/zcash/blob/e7b425298f6d9a54810cb7183f00be547e4d9415/src/main.cpp#L5523
284 (
285 Handler::BlocksByHash {
286 mut pending_hashes,
287 mut blocks,
288 },
289 Message::Block(block),
290 ) => {
291 // assumptions:
292 // - the block messages are sent in a single continuous batch
293 // - missing blocks are silently skipped
294 // (there is no `notfound` message at the end of the batch)
295 if pending_hashes.remove(&block.hash()) {
296 // we are in the middle of the continuous block messages
297 blocks.push(block);
298 } else {
299 // We got a block we didn't ask for.
300 //
301 // So either:
302 // 1. The response is for a previously cancelled block request.
303 // We should treat that block as an inbound gossiped block,
304 // and wait for the actual response.
305 // 2. The peer doesn't know any of the blocks we asked for.
306 // We should cancel the request, so we don't hang waiting for blocks that
307 // will never arrive.
308 // 3. The peer sent an unsolicited block.
309 // We should treat that block as an inbound gossiped block,
310 // and wait for the actual response.
311 //
312 // We ignore the message, so we don't desynchronize with the peer. This happens
313 // when we cancel a request and send a second different request, but receive a
314 // response for the first request. If we ended the request then, we could send
315 // a third request to the peer, and end up having to end that request as well
316 // when the response for the second request arrives.
317 //
318 // Ignoring the message gives us a chance to synchronize back to the correct
319 // request. If that doesn't happen, this request times out.
320 //
321 // In case 2, if peers respond with a `notfound` message,
322 // the cascading errors don't happen. The `notfound` message cancels our request,
323 // and we know we are in sync with the peer.
324 //
325 // Zebra sends `notfound` in response to block requests, but `zcashd` doesn't.
326 // So we need this message workaround, and the related inventory workarounds.
327 ignored_msg = Some(Message::Block(block));
328 }
329
330 if pending_hashes.is_empty() {
331 // If we got everything we wanted, let the internal client know.
332 let available = blocks
333 .into_iter()
334 .map(|block| InventoryResponse::Available((block, transient_addr)));
335 Handler::Finished(Ok(Response::Blocks(available.collect())))
336 } else {
337 // Keep on waiting for all the blocks we wanted, until we get them or time out.
338 Handler::BlocksByHash {
339 pending_hashes,
340 blocks,
341 }
342 }
343 }
344 // peers are allowed to return this response, but `zcashd` never does
345 (
346 Handler::BlocksByHash {
347 pending_hashes,
348 blocks,
349 },
350 Message::NotFound(missing_invs),
351 ) => {
352 // assumptions:
353 // - the peer eventually returns a block or a `notfound` entry
354 // for each hash
355 // - all `notfound` entries are contained in a single message
356 // - the `notfound` message comes after the block messages
357 //
358 // If we're in sync with the peer, then the `notfound` should contain the remaining
359 // hashes from the handler. If we're not in sync with the peer, we should return
360 // what we got so far, and log an error.
361 let missing_blocks: HashSet<_> = block_hashes(&missing_invs).collect();
362 if missing_blocks != pending_hashes {
363 trace!(?missing_invs, ?missing_blocks, ?pending_hashes);
                    // these mismatches are logged at debug level, so they shouldn't be too noisy
365 debug!("unexpected notfound message from peer: all remaining block hashes should be listed in the notfound. Using partial received blocks as the peer response");
366 }
367 if missing_blocks.len() != missing_invs.len() {
368 trace!(?missing_invs, ?missing_blocks, ?pending_hashes);
369 debug!("unexpected notfound message from peer: notfound contains duplicate hashes or non-block hashes. Using partial received blocks as the peer response");
370 }
371
372 if blocks.is_empty() {
373 // If we didn't get anything we wanted, retry the request.
374 let missing_block_hashes = pending_hashes.into_iter().map(Into::into).collect();
375 Handler::Finished(Err(PeerError::NotFoundResponse(missing_block_hashes)))
376 } else {
377 // If we got some of what we wanted, let the internal client know.
378 let available = blocks
379 .into_iter()
380 .map(|block| InventoryResponse::Available((block, transient_addr)));
381 let missing = pending_hashes.into_iter().map(InventoryResponse::Missing);
382
383 Handler::Finished(Ok(Response::Blocks(available.chain(missing).collect())))
384 }
385 }
386
387 // TODO:
388 // - use `any(inv)` rather than `all(inv)`?
389 (Handler::FindBlocks, Message::Inv(items))
390 if items
391 .iter()
392 .all(|item| matches!(item, InventoryHash::Block(_))) =>
393 {
394 Handler::Finished(Ok(Response::BlockHashes(
395 block_hashes(&items[..]).collect(),
396 )))
397 }
398 (Handler::FindHeaders, Message::Headers(headers)) => {
399 Handler::Finished(Ok(Response::BlockHeaders(headers)))
400 }
401
402 (Handler::MempoolTransactionIds, Message::Inv(items))
403 if items.iter().all(|item| item.unmined_tx_id().is_some()) =>
404 {
405 Handler::Finished(Ok(Response::TransactionIds(
406 transaction_ids(&items).collect(),
407 )))
408 }
409
410 // By default, messages are not responses.
411 (state, msg) => {
412 trace!(?msg, "did not interpret message as response");
413 ignored_msg = Some(msg);
414 state
415 }
416 };
417
418 ignored_msg
419 }
420
421 /// Adds `new_addrs` to the `cached_addrs` cache, then takes and returns `response_size`
422 /// addresses from that cache.
423 ///
424 /// `cached_addrs` can be empty if the cache is empty. `new_addrs` can be empty or `None` if
425 /// there are no new addresses. `response_size` can be zero or `None` if there is no response
426 /// needed.
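    ///
    /// An illustrative sketch of the two call patterns used in this module (not
    /// compiled as a doctest, since this method is private):
    ///
    /// ```ignore
    /// // Cache unsolicited addresses without taking a response:
    /// let taken = Handler::update_addr_cache(&mut cached_addrs, &new_addrs, None);
    /// assert!(taken.is_empty());
    ///
    /// // Take a limited, randomly selected response for a `Peers` request:
    /// let response = Handler::update_addr_cache(&mut cached_addrs, None, PEER_ADDR_RESPONSE_LIMIT);
    /// assert!(response.len() <= PEER_ADDR_RESPONSE_LIMIT);
    /// ```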
427 fn update_addr_cache<'new>(
428 cached_addrs: &mut Vec<MetaAddr>,
429 new_addrs: impl IntoIterator<Item = &'new MetaAddr>,
430 response_size: impl Into<Option<usize>>,
431 ) -> Vec<MetaAddr> {
432 // # Peer Set Reliability
433 //
434 // Newly received peers are added to the cache, so that we can use them if the connection
435 // doesn't respond to our getaddr requests.
436 //
437 // Add the new addresses to the end of the cache.
438 cached_addrs.extend(new_addrs);
439
440 // # Security
441 //
442 // We limit how many peer addresses we take from each peer, so that our address book
443 // and outbound connections aren't controlled by a single peer (#1869). We randomly select
444 // peers, so the remote peer can't control which addresses we choose by changing the order
445 // in the messages they send.
446 let response_size = response_size.into().unwrap_or_default();
447
448 let mut temp_cache = Vec::new();
449 std::mem::swap(cached_addrs, &mut temp_cache);
450
451 // The response is fully shuffled, remaining is partially shuffled.
452 let (response, remaining) = temp_cache.partial_shuffle(&mut thread_rng(), response_size);
453
454 // # Security
455 //
456 // The cache size is limited to avoid memory denial of service.
457 //
458 // It's ok to just partially shuffle the cache, because it doesn't actually matter which
459 // peers we drop. Having excess peers is rare, because most peers only send one large
460 // unsolicited peer message when they first connect.
461 *cached_addrs = remaining.to_vec();
462 cached_addrs.truncate(MAX_ADDRS_IN_MESSAGE);
463
464 response.to_vec()
465 }
466}
467
468#[derive(Debug)]
469#[must_use = "AwaitingResponse.tx.send() must be called before drop"]
470pub(super) enum State {
471 /// Awaiting a client request or a peer message.
472 AwaitingRequest,
473 /// Awaiting a peer message we can interpret as a response to a client request.
474 AwaitingResponse {
475 handler: Handler,
476 tx: MustUseClientResponseSender,
477 span: tracing::Span,
478 },
479 /// A failure has occurred and we are shutting down the connection.
480 Failed,
481}
482
483impl fmt::Display for State {
484 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
485 f.write_str(&match self {
486 State::AwaitingRequest => "AwaitingRequest".to_string(),
487 State::AwaitingResponse { handler, .. } => {
488 format!("AwaitingResponse({handler})")
489 }
490 State::Failed => "Failed".to_string(),
491 })
492 }
493}
494
495impl State {
496 /// Returns the Zebra internal state as a string.
497 pub fn command(&self) -> Cow<'static, str> {
498 match self {
499 State::AwaitingRequest => "AwaitingRequest".into(),
500 State::AwaitingResponse { handler, .. } => {
501 format!("AwaitingResponse({})", handler.command()).into()
502 }
503 State::Failed => "Failed".into(),
504 }
505 }
506}
507
508/// The outcome of mapping an inbound [`Message`] to a [`Request`].
509#[derive(Clone, Debug, Eq, PartialEq)]
510#[must_use = "inbound messages must be handled"]
511pub enum InboundMessage {
512 /// The message was mapped to an inbound [`Request`].
513 AsRequest(Request),
514
515 /// The message was consumed by the mapping method.
516 ///
517 /// For example, it could be cached, treated as an error,
518 /// or an internally handled [`Message::Ping`].
519 Consumed,
520
521 /// The message was not used by the inbound message handler.
522 Unused,
523}
524
525impl From<Request> for InboundMessage {
526 fn from(request: Request) -> Self {
527 InboundMessage::AsRequest(request)
528 }
529}
530
531/// The channels, services, and associated state for a peer connection.
532pub struct Connection<S, Tx>
533where
534 Tx: Sink<Message, Error = SerializationError> + Unpin,
535{
536 /// The metadata for the connected peer `service`.
537 ///
538 /// This field is used for debugging.
539 pub connection_info: Arc<ConnectionInfo>,
540
541 /// The state of this connection's current request or response.
542 pub(super) state: State,
543
544 /// A timeout for a client request. This is stored separately from
545 /// State so that we can move the future out of it independently of
546 /// other state handling.
547 pub(super) request_timer: Option<Pin<Box<Sleep>>>,
548
549 /// Unused peers from recent `addr` or `addrv2` messages from this peer.
550 /// Also holds the initial addresses sent in `version` messages, or guessed from the remote IP.
551 ///
552 /// When peers send solicited or unsolicited peer advertisements, Zebra puts them in this cache.
553 ///
554 /// When Zebra's components request peers, some cached peers are randomly selected,
555 /// consumed, and returned as a modified response. This works around `zcashd`'s address
556 /// response rate-limit.
557 ///
558 /// The cache size is limited to avoid denial of service attacks.
559 pub(super) cached_addrs: Vec<MetaAddr>,
560
561 /// The `inbound` service, used to answer requests from this connection's peer.
562 pub(super) svc: S,
563
564 /// A channel for requests that Zebra's internal services want to send to remote peers.
565 ///
566 /// This channel accepts [`Request`]s, and produces [`InProgressClientRequest`]s.
567 pub(super) client_rx: ClientRequestReceiver,
568
569 /// A slot for an error shared between the Connection and the Client that uses it.
570 ///
571 /// `None` unless the connection or client have errored.
572 pub(super) error_slot: ErrorSlot,
573
574 /// A channel for sending Zcash messages to the connected peer.
575 ///
576 /// This channel accepts [`Message`]s.
577 ///
578 /// The corresponding peer message receiver is passed to [`Connection::run`].
579 pub(super) peer_tx: PeerTx<Tx>,
580
581 /// A connection tracker that reduces the open connection count when dropped.
582 /// Used to limit the number of open connections in Zebra.
583 ///
584 /// This field does nothing until it is dropped.
585 ///
586 /// # Security
587 ///
588 /// If this connection tracker or `Connection`s are leaked,
589 /// the number of active connections will appear higher than it actually is.
590 /// If enough connections leak, Zebra will stop making new connections.
591 #[allow(dead_code)]
592 pub(super) connection_tracker: ConnectionTracker,
593
594 /// The metrics label for this peer. Usually the remote IP and port.
595 pub(super) metrics_label: String,
596
597 /// The state for this peer, when the metrics were last updated.
598 pub(super) last_metrics_state: Option<Cow<'static, str>>,
599
600 /// The time of the last overload error response from the inbound
601 /// service to a request from this connection,
602 /// or None if this connection hasn't yet received an overload error.
603 last_overload_time: Option<Instant>,
604}
605
606impl<S, Tx> fmt::Debug for Connection<S, Tx>
607where
608 Tx: Sink<Message, Error = SerializationError> + Unpin,
609{
610 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
611 // skip the channels, they don't tell us anything useful
612 f.debug_struct(std::any::type_name::<Connection<S, Tx>>())
613 .field("connection_info", &self.connection_info)
614 .field("state", &self.state)
615 .field("request_timer", &self.request_timer)
616 .field("cached_addrs", &self.cached_addrs.len())
617 .field("error_slot", &self.error_slot)
618 .field("metrics_label", &self.metrics_label)
619 .field("last_metrics_state", &self.last_metrics_state)
620 .field("last_overload_time", &self.last_overload_time)
621 .finish()
622 }
623}
624
625impl<S, Tx> Connection<S, Tx>
626where
627 Tx: Sink<Message, Error = SerializationError> + Unpin,
628{
629 /// Return a new connection from its channels, services, shared state, and metadata.
630 pub(crate) fn new(
631 inbound_service: S,
632 client_rx: futures::channel::mpsc::Receiver<ClientRequest>,
633 error_slot: ErrorSlot,
634 peer_tx: Tx,
635 connection_tracker: ConnectionTracker,
636 connection_info: Arc<ConnectionInfo>,
637 initial_cached_addrs: Vec<MetaAddr>,
638 ) -> Self {
639 let metrics_label = connection_info.connected_addr.get_transient_addr_label();
640
641 Connection {
642 connection_info,
643 state: State::AwaitingRequest,
644 request_timer: None,
645 cached_addrs: initial_cached_addrs,
646 svc: inbound_service,
647 client_rx: client_rx.into(),
648 error_slot,
649 peer_tx: peer_tx.into(),
650 connection_tracker,
651 metrics_label,
652 last_metrics_state: None,
653 last_overload_time: None,
654 }
655 }
656}
657
658impl<S, Tx> Connection<S, Tx>
659where
660 S: Service<Request, Response = Response, Error = BoxError>,
661 S::Error: Into<BoxError>,
662 Tx: Sink<Message, Error = SerializationError> + Unpin,
663{
664 /// Consume this `Connection` to form a spawnable future containing its event loop.
665 ///
666 /// `peer_rx` is a channel for receiving Zcash [`Message`]s from the connected peer.
    /// The corresponding peer message sender is [`Connection.peer_tx`].
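    ///
    /// A rough usage sketch (not the actual handshake code), assuming `connection`
    /// and a framed `peer_rx` message stream have already been set up:
    ///
    /// ```ignore
    /// // The returned future runs until the connection fails or the client is dropped.
    /// tokio::spawn(connection.run(peer_rx));
    /// ```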
668 pub async fn run<Rx>(mut self, mut peer_rx: Rx)
669 where
670 Rx: Stream<Item = Result<Message, SerializationError>> + Unpin,
671 {
672 // At a high level, the event loop we want is as follows: we check for any
673 // incoming messages from the remote peer, check if they should be interpreted
674 // as a response to a pending client request, and if not, interpret them as a
675 // request from the remote peer to our node.
676 //
677 // We also need to handle those client requests in the first place. The client
678 // requests are received from the corresponding `peer::Client` over a bounded
679 // channel (with bound 1, to minimize buffering), but there is no relationship
680 // between the stream of client requests and the stream of peer messages, so we
681 // cannot ignore one kind while waiting on the other. Moreover, we cannot accept
682 // a second client request while the first one is still pending.
683 //
684 // To do this, we inspect the current request state.
685 //
686 // If there is no pending request, we wait on either an incoming peer message or
687 // an incoming request, whichever comes first.
688 //
689 // If there is a pending request, we wait only on an incoming peer message, and
690 // check whether it can be interpreted as a response to the pending request.
691 //
692 // TODO: turn this comment into a module-level comment, after splitting the module.
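        //
        // In outline, the loop below is roughly (orientation only, not executable):
        //
        //   loop {
        //       match state {
        //           AwaitingRequest                   => select(peer message, client request),
        //           AwaitingResponse { Finished, .. } => send the response to the client,
        //           AwaitingResponse { .. }           => select(cancel or timeout, peer message),
        //           Failed                            => break,
        //       }
        //   }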
693 loop {
694 self.update_state_metrics(None);
695
696 match self.state {
697 State::AwaitingRequest => {
698 trace!("awaiting client request or peer message");
699 // # Correctness
700 //
701 // Currently, select prefers the first future if multiple futures are ready.
702 // We use this behaviour to prioritise messages on each individual peer
703 // connection in this order:
704 // - incoming messages from the remote peer, then
705 // - outgoing messages to the remote peer.
706 //
707 // This improves the performance of peer responses to Zebra requests, and new
708 // peer requests to Zebra's inbound service.
709 //
710 // `futures::StreamExt::next()` is cancel-safe:
711 // <https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety>
712 // This means that messages from the future that isn't selected stay in the stream,
713 // and they will be returned next time the future is checked.
714 //
715 // If an inbound peer message arrives at a ready peer that also has a pending
716 // request from Zebra, we want to process the peer's message first.
717 // If we process the Zebra request first:
718 // - we could misinterpret the inbound peer message as a response to the Zebra
719 // request, or
720 // - if the peer message is a request to Zebra, and we put the peer in the
721 // AwaitingResponse state, then we'll correctly ignore the simultaneous Zebra
722 // request. (Zebra services make multiple requests or retry, so this is ok.)
723 //
724 // # Security
725 //
726 // If a peer sends an uninterrupted series of messages, it will delay any new
727 // requests from Zebra to that individual peer. This is behaviour we want,
728 // because:
729 // - any responses to Zebra's requests to that peer would be slow or timeout,
730 // - the peer will eventually fail a Zebra keepalive check and get disconnected,
731 // - if there are too many inbound messages overall, the inbound service will
732 // return an overload error and the peer will be disconnected.
733 //
734 // Messages to other peers will continue to be processed concurrently. Some
735 // Zebra services might be temporarily delayed until the peer times out, if a
736 // request to that peer is sent by the service, and the service blocks until
737 // the request completes (or times out).
738 match future::select(peer_rx.next(), self.client_rx.next()).await {
739 Either::Left((None, _)) => {
740 self.fail_with(PeerError::ConnectionClosed).await;
741 }
742 Either::Left((Some(Err(e)), _)) => self.fail_with(e).await,
743 Either::Left((Some(Ok(msg)), _)) => {
744 let unhandled_msg = self.handle_message_as_request(msg).await;
745
746 if let Some(unhandled_msg) = unhandled_msg {
747 debug!(
748 %unhandled_msg,
749 "ignoring unhandled request while awaiting a request"
750 );
751 }
752 }
753 Either::Right((None, _)) => {
754 trace!("client_rx closed, ending connection");
755
756 // There are no requests to be flushed,
757 // but we need to set an error and update metrics.
758 // (We don't want to log this error, because it's normal behaviour.)
759 self.shutdown_async(PeerError::ClientDropped).await;
760 break;
761 }
762 Either::Right((Some(req), _)) => {
763 let span = req.span.clone();
764 self.handle_client_request(req).instrument(span).await
765 }
766 }
767 }
768
769 // Check whether the handler is finished before waiting for a response message,
770 // because the response might be `Nil` or synthetic.
771 State::AwaitingResponse {
772 handler: Handler::Finished(_),
773 ref span,
774 ..
775 } => {
776 // We have to get rid of the span reference so we can tamper with the state.
777 let span = span.clone();
778 trace!(
779 parent: &span,
780 "returning completed response to client request"
781 );
782
783 // Replace the state with a temporary value,
784 // so we can take ownership of the response sender.
785 let tmp_state = std::mem::replace(&mut self.state, State::Failed);
786
787 if let State::AwaitingResponse {
788 handler: Handler::Finished(response),
789 tx,
790 ..
791 } = tmp_state
792 {
793 if let Ok(response) = response.as_ref() {
794 debug!(%response, "finished receiving peer response to Zebra request");
795 // Add a metric for inbound responses to outbound requests.
796 metrics::counter!(
797 "zebra.net.in.responses",
798 "command" => response.command(),
799 "addr" => self.metrics_label.clone(),
800 )
801 .increment(1);
802 } else {
803 debug!(error = ?response, "error in peer response to Zebra request");
804 }
805
806 let _ = tx.send(response.map_err(Into::into));
807 } else {
808 unreachable!("already checked for AwaitingResponse");
809 }
810
811 self.state = State::AwaitingRequest;
812 }
813
814 // We're awaiting a response to a client request,
815 // so wait on either a peer message, or on a request cancellation.
816 State::AwaitingResponse {
817 ref span,
818 ref mut tx,
819 ..
820 } => {
821 // we have to get rid of the span reference so we can tamper with the state
822 let span = span.clone();
823 trace!(parent: &span, "awaiting response to client request");
824 let timer_ref = self
825 .request_timer
826 .as_mut()
827 .expect("timeout must be set while awaiting response");
828
829 // # Security
830 //
831 // select() prefers the first future if multiple futures are ready.
832 //
833 // If multiple futures are ready, we want the priority for each individual
834 // connection to be:
835 // - cancellation, then
836 // - timeout, then
837 // - peer responses.
838 //
839 // (Messages to other peers are processed concurrently.)
840 //
841 // This makes sure a peer can't block disconnection or timeouts by sending too
842 // many messages. It also avoids doing work to process messages after a
843 // connection has failed.
844 let cancel = future::select(tx.cancellation(), timer_ref);
845 match future::select(cancel, peer_rx.next())
846 .instrument(span.clone())
847 .await
848 {
849 Either::Right((None, _)) => {
850 self.fail_with(PeerError::ConnectionClosed).await
851 }
852 Either::Right((Some(Err(e)), _)) => self.fail_with(e).await,
853 Either::Right((Some(Ok(peer_msg)), _cancel)) => {
854 self.update_state_metrics(format!("Out::Rsp::{}", peer_msg.command()));
855
856 // Try to process the message using the handler.
857 // This extremely awkward construction avoids
858 // keeping a live reference to handler across the
859 // call to handle_message_as_request, which takes
860 // &mut self. This is a sign that we don't properly
861 // factor the state required for inbound and
862 // outbound requests.
863 let request_msg = match self.state {
864 State::AwaitingResponse {
865 ref mut handler, ..
866 } => span.in_scope(|| handler.process_message(peer_msg, &mut self.cached_addrs, self.connection_info.connected_addr.get_transient_addr())),
867 _ => unreachable!("unexpected state after AwaitingResponse: {:?}, peer_msg: {:?}, client_receiver: {:?}",
868 self.state,
869 peer_msg,
870 self.client_rx,
871 ),
872 };
873
874 self.update_state_metrics(None);
875
876 // If the message was not consumed as a response,
877 // check whether it can be handled as a request.
878 let unused_msg = if let Some(request_msg) = request_msg {
879 // do NOT instrument with the request span, this is
880 // independent work
881 self.handle_message_as_request(request_msg).await
882 } else {
883 None
884 };
885
886 if let Some(unused_msg) = unused_msg {
887 debug!(
888 %unused_msg,
889 %self.state,
890 "ignoring peer message: not a response or a request",
891 );
892 }
893 }
894 Either::Left((Either::Right(_), _peer_fut)) => {
895 trace!(parent: &span, "client request timed out");
896 let e = PeerError::ConnectionReceiveTimeout;
897
898 // Replace the state with a temporary value,
899 // so we can take ownership of the response sender.
900 self.state = match std::mem::replace(&mut self.state, State::Failed) {
901 // Special case: ping timeouts fail the connection.
902 State::AwaitingResponse {
903 handler: Handler::Ping(_),
904 tx,
905 ..
906 } => {
907 // We replaced the original state, which means `fail_with` won't see it.
908 // So we do the state request cleanup manually.
909 let e = SharedPeerError::from(e);
910 let _ = tx.send(Err(e.clone()));
911 self.fail_with(e).await;
912 State::Failed
913 }
914 // Other request timeouts fail the request.
915 State::AwaitingResponse { tx, .. } => {
916 let _ = tx.send(Err(e.into()));
917 State::AwaitingRequest
918 }
919 _ => unreachable!(
920 "unexpected failed connection state while AwaitingResponse: client_receiver: {:?}",
921 self.client_rx
922 ),
923 };
924 }
925 Either::Left((Either::Left(_), _peer_fut)) => {
926 // The client receiver was dropped, so we don't need to send on `tx` here.
927 trace!(parent: &span, "client request was cancelled");
928 self.state = State::AwaitingRequest;
929 }
930 }
931 }
932
933 // This connection has failed: stop the event loop, and complete the future.
934 State::Failed => break,
935 }
936 }
937
938 // TODO: close peer_rx here, after changing it from a stream to a channel
939
940 let error = self.error_slot.try_get_error();
941 assert!(
942 error.is_some(),
943 "closing connections must call fail_with() or shutdown() to set the error slot"
944 );
945
946 self.update_state_metrics(error.expect("checked is_some").to_string());
947 }
948
949 /// Fail this connection, log the failure, and shut it down.
950 /// See [`Self::shutdown_async()`] for details.
951 ///
952 /// Use [`Self::shutdown_async()`] to avoid logging the failure,
953 /// and [`Self::shutdown()`] from non-async code.
954 async fn fail_with(&mut self, error: impl Into<SharedPeerError>) {
955 let error = error.into();
956
957 debug!(
958 %error,
959 client_receiver = ?self.client_rx,
960 "failing peer service with error"
961 );
962
963 self.shutdown_async(error).await;
964 }
965
966 /// Handle an internal client request, possibly generating outgoing messages to the
967 /// remote peer.
968 ///
969 /// NOTE: the caller should use .instrument(msg.span) to instrument the function.
970 async fn handle_client_request(&mut self, req: InProgressClientRequest) {
971 trace!(?req.request);
972 use Request::*;
973 use State::*;
974 let InProgressClientRequest { request, tx, span } = req;
975
976 if tx.is_canceled() {
977 metrics::counter!("peer.canceled").increment(1);
978 debug!(state = %self.state, %request, "ignoring canceled request");
979
980 metrics::counter!(
981 "zebra.net.out.requests.canceled",
982 "command" => request.command(),
983 "addr" => self.metrics_label.clone(),
984 )
985 .increment(1);
986 self.update_state_metrics(format!("Out::Req::Canceled::{}", request.command()));
987
988 return;
989 }
990
991 debug!(state = %self.state, %request, "sending request from Zebra to peer");
992
993 // Add a metric for outbound requests.
994 metrics::counter!(
995 "zebra.net.out.requests",
996 "command" => request.command(),
997 "addr" => self.metrics_label.clone(),
998 )
999 .increment(1);
1000 self.update_state_metrics(format!("Out::Req::{}", request.command()));
1001
1002 let new_handler = match (&self.state, request) {
1003 (Failed, request) => panic!(
1004 "failed connection cannot handle new request: {:?}, client_receiver: {:?}",
1005 request,
1006 self.client_rx
1007 ),
1008 (pending @ AwaitingResponse { .. }, request) => panic!(
1009 "tried to process new request: {:?} while awaiting a response: {:?}, client_receiver: {:?}",
1010 request,
1011 pending,
1012 self.client_rx
1013 ),
1014
1015 // Take some cached addresses from the peer connection. This address cache helps
            // work around a `zcashd` addr response rate-limit.
1017 (AwaitingRequest, Peers) if !self.cached_addrs.is_empty() => {
1018 // Security: This method performs security-sensitive operations, see its comments
1019 // for details.
1020 let response_addrs = Handler::update_addr_cache(&mut self.cached_addrs, None, PEER_ADDR_RESPONSE_LIMIT);
1021
1022 debug!(
1023 response_addrs = response_addrs.len(),
1024 remaining_addrs = self.cached_addrs.len(),
1025 PEER_ADDR_RESPONSE_LIMIT,
1026 "responding to Peers request using some cached addresses",
1027 );
1028
1029 Ok(Handler::Finished(Ok(Response::Peers(response_addrs))))
1030 }
1031 (AwaitingRequest, Peers) => self
1032 .peer_tx
1033 .send(Message::GetAddr)
1034 .await
1035 .map(|()| Handler::Peers),
1036
1037 (AwaitingRequest, Ping(nonce)) => self
1038 .peer_tx
1039 .send(Message::Ping(nonce))
1040 .await
1041 .map(|()| Handler::Ping(nonce)),
1042
1043 (AwaitingRequest, BlocksByHash(hashes)) => {
1044 self
1045 .peer_tx
1046 .send(Message::GetData(
1047 hashes.iter().map(|h| (*h).into()).collect(),
1048 ))
1049 .await
1050 .map(|()|
1051 Handler::BlocksByHash {
1052 blocks: Vec::with_capacity(hashes.len()),
1053 pending_hashes: hashes,
1054 }
1055 )
1056 }
1057 (AwaitingRequest, TransactionsById(ids)) => {
1058 self
1059 .peer_tx
1060 .send(Message::GetData(
1061 ids.iter().map(Into::into).collect(),
1062 ))
1063 .await
1064 .map(|()|
1065 Handler::TransactionsById {
1066 transactions: Vec::with_capacity(ids.len()),
1067 pending_ids: ids,
1068 })
1069 }
1070
1071 (AwaitingRequest, FindBlocks { known_blocks, stop }) => {
1072 self
1073 .peer_tx
1074 .send(Message::GetBlocks { known_blocks, stop })
1075 .await
1076 .map(|()|
1077 Handler::FindBlocks
1078 )
1079 }
1080 (AwaitingRequest, FindHeaders { known_blocks, stop }) => {
1081 self
1082 .peer_tx
1083 .send(Message::GetHeaders { known_blocks, stop })
1084 .await
1085 .map(|()|
1086 Handler::FindHeaders
1087 )
1088 }
1089
1090 (AwaitingRequest, MempoolTransactionIds) => {
1091 self
1092 .peer_tx
1093 .send(Message::Mempool)
1094 .await
1095 .map(|()|
1096 Handler::MempoolTransactionIds
1097 )
1098 }
1099
1100 (AwaitingRequest, PushTransaction(transaction)) => {
1101 self
1102 .peer_tx
1103 .send(Message::Tx(transaction))
1104 .await
1105 .map(|()|
1106 Handler::Finished(Ok(Response::Nil))
1107 )
1108 }
1109 (AwaitingRequest, AdvertiseTransactionIds(hashes)) => {
1110 let max_tx_inv_in_message: usize = MAX_TX_INV_IN_SENT_MESSAGE
1111 .try_into()
1112 .expect("constant fits in usize");
1113
1114 // # Security
1115 //
1116 // In most cases, we try to split over-sized requests into multiple network-layer
1117 // messages. But we are unlikely to reach this limit with the default mempool
1118 // config, so a gossip like this could indicate a network amplification attack.
1119 //
1120 // This limit is particularly important here, because advertisements send the same
1121 // message to half our available peers.
1122 //
1123 // If there are thousands of transactions in the mempool, letting peers know the
1124 // exact transactions we have isn't that important, so it's ok to drop arbitrary
1125 // transaction hashes from our response.
1126 if hashes.len() > max_tx_inv_in_message {
1127 debug!(inv_count = ?hashes.len(), ?MAX_TX_INV_IN_SENT_MESSAGE, "unusually large transaction ID gossip");
1128 }
1129
1130 let hashes = hashes.into_iter().take(max_tx_inv_in_message).map(Into::into).collect();
1131
1132 self
1133 .peer_tx
1134 .send(Message::Inv(hashes))
1135 .await
1136 .map(|()|
1137 Handler::Finished(Ok(Response::Nil))
1138 )
1139 }
1140 (AwaitingRequest, AdvertiseBlock(hash)) => {
1141 self
1142 .peer_tx
1143 .send(Message::Inv(vec![hash.into()]))
1144 .await
1145 .map(|()|
1146 Handler::Finished(Ok(Response::Nil))
1147 )
1148 }
1149 };
1150
1151 // Update the connection state with a new handler, or fail with an error.
1152 match new_handler {
1153 Ok(handler) => {
1154 self.state = AwaitingResponse { handler, span, tx };
1155 self.request_timer = Some(Box::pin(sleep(constants::REQUEST_TIMEOUT)));
1156 }
1157 Err(error) => {
1158 let error = SharedPeerError::from(error);
1159 let _ = tx.send(Err(error.clone()));
1160 self.fail_with(error).await;
1161 }
1162 };
1163 }
1164
1165 /// Handle `msg` as a request from a peer to this Zebra instance.
1166 ///
1167 /// If the message is not handled, it is returned.
1168 // This function has its own span, because we're creating a new work
1169 // context (namely, the work of processing the inbound msg as a request)
1170 #[instrument(name = "msg_as_req", skip(self, msg), fields(msg = msg.command()))]
1171 async fn handle_message_as_request(&mut self, msg: Message) -> Option<Message> {
1172 trace!(?msg);
1173 debug!(state = %self.state, %msg, "received inbound peer message");
1174
1175 self.update_state_metrics(format!("In::Msg::{}", msg.command()));
1176
1177 use InboundMessage::*;
1178
1179 let req = match msg {
1180 Message::Ping(nonce) => {
1181 trace!(?nonce, "responding to heartbeat");
1182 if let Err(e) = self.peer_tx.send(Message::Pong(nonce)).await {
1183 self.fail_with(e).await;
1184 }
1185 Consumed
1186 }
1187 // These messages shouldn't be sent outside of a handshake.
1188 Message::Version { .. } => {
1189 self.fail_with(PeerError::DuplicateHandshake).await;
1190 Consumed
1191 }
1192 Message::Verack => {
1193 self.fail_with(PeerError::DuplicateHandshake).await;
1194 Consumed
1195 }
1196 // These messages should already be handled as a response if they
1197 // could be a response, so if we see them here, they were either
1198 // sent unsolicited, or they were sent in response to a canceled request
1199 // that we've already forgotten about.
1200 Message::Reject { .. } => {
1201 debug!(%msg, "got reject message unsolicited or from canceled request");
1202 Unused
1203 }
1204 Message::NotFound { .. } => {
1205 debug!(%msg, "got notfound message unsolicited or from canceled request");
1206 Unused
1207 }
1208 Message::Pong(_) => {
1209 debug!(%msg, "got pong message unsolicited or from canceled request");
1210 Unused
1211 }
1212 Message::Block(_) => {
1213 debug!(%msg, "got block message unsolicited or from canceled request");
1214 Unused
1215 }
1216 Message::Headers(_) => {
1217 debug!(%msg, "got headers message unsolicited or from canceled request");
1218 Unused
1219 }
1220 // These messages should never be sent by peers.
1221 Message::FilterLoad { .. } | Message::FilterAdd { .. } | Message::FilterClear => {
1222 // # Security
1223 //
1224 // Zcash connections are not authenticated, so malicious nodes can send fake messages,
1225 // with connected peers' IP addresses in the IP header.
1226 //
1227 // Since we can't verify their source, Zebra needs to ignore unexpected messages,
1228 // because closing the connection could cause a denial of service or eclipse attack.
1229 debug!(%msg, "got BIP111 message without advertising NODE_BLOOM");
1230
1231 // Ignored, but consumed because it is technically a protocol error.
1232 Consumed
1233 }
1234
1235 // # Security
1236 //
1237 // Zebra crawls the network proactively, and that's the only way peers get into our
1238 // address book. This prevents peers from filling our address book with malicious peer
1239 // addresses.
1240 Message::Addr(ref new_addrs) => {
1241 // # Peer Set Reliability
1242 //
1243 // We keep a list of the unused peer addresses sent by each connection, to work
1244 // around `zcashd`'s `getaddr` response rate-limit.
1245 let no_response =
1246 Handler::update_addr_cache(&mut self.cached_addrs, new_addrs, None);
1247 assert_eq!(
1248 no_response,
1249 Vec::new(),
1250 "peers unexpectedly taken from cache"
1251 );
1252
1253 debug!(
1254 new_addrs = new_addrs.len(),
1255 cached_addrs = self.cached_addrs.len(),
1256 "adding unsolicited addresses to cached addresses",
1257 );
1258
1259 Consumed
1260 }
1261 Message::Tx(ref transaction) => Request::PushTransaction(transaction.clone()).into(),
1262 Message::Inv(ref items) => match &items[..] {
1263 // We don't expect to be advertised multiple blocks at a time,
1264 // so we ignore any advertisements of multiple blocks.
1265 [InventoryHash::Block(hash)] => Request::AdvertiseBlock(*hash).into(),
1266
1267 // Some peers advertise invs with mixed item types.
1268 // But we're just interested in the transaction invs.
1269 //
1270 // TODO: split mixed invs into multiple requests,
1271 // but skip runs of multiple blocks.
1272 tx_ids if tx_ids.iter().any(|item| item.unmined_tx_id().is_some()) => {
1273 Request::AdvertiseTransactionIds(transaction_ids(items).collect()).into()
1274 }
1275
1276 // Log detailed messages for ignored inv advertisement messages.
1277 [] => {
1278 debug!(%msg, "ignoring empty inv");
1279
1280 // This might be a minor protocol error, or it might mean "not found".
1281 Unused
1282 }
1283 [InventoryHash::Block(_), InventoryHash::Block(_), ..] => {
1284 debug!(%msg, "ignoring inv with multiple blocks");
1285 Unused
1286 }
1287 _ => {
1288 debug!(%msg, "ignoring inv with no transactions");
1289 Unused
1290 }
1291 },
1292 Message::GetData(ref items) => match &items[..] {
1293 // Some peers advertise invs with mixed item types.
1294 // So we suspect they might do the same with getdata.
1295 //
1296 // Since we can only handle one message at a time,
1297 // we treat it as a block request if there are any blocks,
1298 // or a transaction request if there are any transactions.
1299 //
1300 // TODO: split mixed getdata into multiple requests.
1301 b_hashes
1302 if b_hashes
1303 .iter()
1304 .any(|item| matches!(item, InventoryHash::Block(_))) =>
1305 {
1306 Request::BlocksByHash(block_hashes(items).collect()).into()
1307 }
1308 tx_ids if tx_ids.iter().any(|item| item.unmined_tx_id().is_some()) => {
1309 Request::TransactionsById(transaction_ids(items).collect()).into()
1310 }
1311
1312 // Log detailed messages for ignored getdata request messages.
1313 [] => {
1314 debug!(%msg, "ignoring empty getdata");
1315
1316 // This might be a minor protocol error, or it might mean "not found".
1317 Unused
1318 }
1319 _ => {
1320 debug!(%msg, "ignoring getdata with no blocks or transactions");
1321 Unused
1322 }
1323 },
1324 Message::GetAddr => Request::Peers.into(),
1325 Message::GetBlocks {
1326 ref known_blocks,
1327 stop,
1328 } => Request::FindBlocks {
1329 known_blocks: known_blocks.clone(),
1330 stop,
1331 }
1332 .into(),
1333 Message::GetHeaders {
1334 ref known_blocks,
1335 stop,
1336 } => Request::FindHeaders {
1337 known_blocks: known_blocks.clone(),
1338 stop,
1339 }
1340 .into(),
1341 Message::Mempool => Request::MempoolTransactionIds.into(),
1342 };
1343
1344 // Handle the request, and return unused messages.
1345 match req {
1346 AsRequest(req) => {
1347 self.drive_peer_request(req).await;
1348 None
1349 }
1350 Consumed => None,
1351 Unused => Some(msg),
1352 }
1353 }
1354
    /// Given a `req` originating from the peer, drive it to completion and send
    /// any appropriate messages to the remote peer. If an error occurs while
    /// processing the request (e.g., the service is shedding load), we may
    /// terminate the entire peer connection via `fail_with`, shrinking the
    /// number of connected peers (see [`Self::handle_inbound_overload`]).
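    ///
    /// In outline (illustrative only, error handling elided):
    ///
    /// ```ignore
    /// self.svc.ready().await?;                     // reserve an inbound buffer slot
    /// let rsp = self.svc.call(req.clone()).await?; // overload and timeout errors are special-cased
    /// // ...then the match below translates the Response into one or more peer Messages.
    /// ```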
1360 async fn drive_peer_request(&mut self, req: Request) {
1361 trace!(?req);
1362
1363 // Add a metric for inbound requests
1364 metrics::counter!(
1365 "zebra.net.in.requests",
1366 "command" => req.command(),
1367 "addr" => self.metrics_label.clone(),
1368 )
1369 .increment(1);
1370 self.update_state_metrics(format!("In::Req::{}", req.command()));
1371
1372 // Give the inbound service time to clear its queue,
1373 // before sending the next inbound request.
1374 tokio::task::yield_now().await;
1375
1376 // # Security
1377 //
1378 // Holding buffer slots for a long time can cause hangs:
1379 // <https://docs.rs/tower/latest/tower/buffer/struct.Buffer.html#a-note-on-choosing-a-bound>
1380 //
1381 // The inbound service must be called immediately after a buffer slot is reserved.
1382 //
1383 // The inbound service never times out in readiness, because the load shed layer is always
1384 // ready, and returns an error in response to the request instead.
1385 if self.svc.ready().await.is_err() {
1386 self.fail_with(PeerError::ServiceShutdown).await;
1387 return;
1388 }
1389
1390 // Inbound service request timeouts are handled by the timeout layer in `start::start()`.
1391 let rsp = match self.svc.call(req.clone()).await {
1392 Err(e) => {
1393 if e.is::<tower::load_shed::error::Overloaded>() {
1394 // # Security
1395 //
1396 // The peer request queue must have a limited length.
1397 // The buffer and load shed layers are added in `start::start()`.
1398 tracing::debug!("inbound service is overloaded, may close connection");
1399
1400 let now = Instant::now();
1401
1402 self.handle_inbound_overload(req, now, PeerError::Overloaded)
1403 .await;
1404 } else if e.is::<tower::timeout::error::Elapsed>() {
1405 // # Security
1406 //
1407 // Peer requests must have a timeout.
1408 // The timeout layer is added in `start::start()`.
1409 tracing::info!(%req, "inbound service request timed out, may close connection");
1410
1411 let now = Instant::now();
1412
1413 self.handle_inbound_overload(req, now, PeerError::InboundTimeout)
1414 .await;
1415 } else {
1416 // We could send a reject to the remote peer, but that might cause
1417 // them to disconnect, and we might be using them to sync blocks.
1418 // For similar reasons, we don't want to fail_with() here - we
1419 // only close the connection if the peer is doing something wrong.
1420 info!(
1421 %e,
1422 connection_state = ?self.state,
1423 client_receiver = ?self.client_rx,
1424 "error processing peer request",
1425 );
1426 self.update_state_metrics(format!("In::Req::{}/Rsp::Error", req.command()));
1427 }
1428
1429 return;
1430 }
1431 Ok(rsp) => rsp,
1432 };
1433
1434 // Add a metric for outbound responses to inbound requests
1435 metrics::counter!(
1436 "zebra.net.out.responses",
1437 "command" => rsp.command(),
1438 "addr" => self.metrics_label.clone(),
1439 )
1440 .increment(1);
1441 self.update_state_metrics(format!("In::Rsp::{}", rsp.command()));
1442
1443 // TODO: split response handler into its own method
1444 match rsp.clone() {
1445 Response::Nil => { /* generic success, do nothing */ }
1446 Response::Peers(addrs) => {
1447 if let Err(e) = self.peer_tx.send(Message::Addr(addrs)).await {
1448 self.fail_with(e).await;
1449 }
1450 }
1451 Response::Transactions(transactions) => {
1452 // Generate one tx message per transaction,
1453 // then a notfound message with all the missing transaction ids.
1454 let mut missing_ids = Vec::new();
1455
1456 for transaction in transactions.into_iter() {
1457 match transaction {
1458 Available((transaction, _)) => {
1459 if let Err(e) = self.peer_tx.send(Message::Tx(transaction)).await {
1460 self.fail_with(e).await;
1461 return;
1462 }
1463 }
1464 Missing(id) => missing_ids.push(id.into()),
1465 }
1466 }
1467
1468 if !missing_ids.is_empty() {
1469 if let Err(e) = self.peer_tx.send(Message::NotFound(missing_ids)).await {
1470 self.fail_with(e).await;
1471 return;
1472 }
1473 }
1474 }
1475 Response::Blocks(blocks) => {
                // Generate one block message per block,
1477 // then a notfound message with all the missing block hashes.
1478 let mut missing_hashes = Vec::new();
1479
1480 for block in blocks.into_iter() {
1481 match block {
1482 Available((block, _)) => {
1483 if let Err(e) = self.peer_tx.send(Message::Block(block)).await {
1484 self.fail_with(e).await;
1485 return;
1486 }
1487 }
1488 Missing(hash) => missing_hashes.push(hash.into()),
1489 }
1490 }
1491
1492 if !missing_hashes.is_empty() {
1493 if let Err(e) = self.peer_tx.send(Message::NotFound(missing_hashes)).await {
1494 self.fail_with(e).await;
1495 return;
1496 }
1497 }
1498 }
1499 Response::BlockHashes(hashes) => {
1500 if let Err(e) = self
1501 .peer_tx
1502 .send(Message::Inv(hashes.into_iter().map(Into::into).collect()))
1503 .await
1504 {
1505 self.fail_with(e).await
1506 }
1507 }
1508 Response::BlockHeaders(headers) => {
1509 if let Err(e) = self.peer_tx.send(Message::Headers(headers)).await {
1510 self.fail_with(e).await
1511 }
1512 }
1513 Response::TransactionIds(hashes) => {
1514 let max_tx_inv_in_message: usize = MAX_TX_INV_IN_SENT_MESSAGE
1515 .try_into()
1516 .expect("constant fits in usize");
1517
1518 // # Security
1519 //
1520 // In most cases, we try to split over-sized responses into multiple network-layer
1521 // messages. But we are unlikely to reach this limit with the default mempool
1522 // config, so a response like this could indicate a network amplification attack.
1523 //
1524 // If there are thousands of transactions in the mempool, letting peers know the
1525 // exact transactions we have isn't that important, so it's ok to drop arbitrary
1526 // transaction hashes from our response.
1527 if hashes.len() > max_tx_inv_in_message {
1528 debug!(inv_count = ?hashes.len(), ?MAX_TX_INV_IN_SENT_MESSAGE, "unusually large transaction ID response");
1529 }
1530
1531 let hashes = hashes
1532 .into_iter()
1533 .take(max_tx_inv_in_message)
1534 .map(Into::into)
1535 .collect();
1536
1537 if let Err(e) = self.peer_tx.send(Message::Inv(hashes)).await {
1538 self.fail_with(e).await
1539 }
1540 }
1541 }
1542
1543 debug!(state = %self.state, %req, %rsp, "sent Zebra response to peer");
1544
1545 // Give the inbound service time to clear its queue,
1546 // before checking the connection for the next inbound or outbound request.
1547 tokio::task::yield_now().await;
1548 }
1549
1550 /// Handle inbound service overload and timeout error responses by randomly terminating some
1551 /// connections.
1552 ///
1553 /// # Security
1554 ///
1555 /// When the inbound service is overloaded with requests, Zebra needs to drop some connections,
1556 /// to reduce the load on the application. But dropping every connection that receives an
1557 /// `Overloaded` error from the inbound service could cause Zebra to drop too many peer
1558 /// connections, and stop itself downloading blocks or transactions.
1559 ///
1560 /// Malicious or misbehaving peers can also overload the inbound service, and make Zebra drop
1561 /// its connections to other peers.
1562 ///
1563 /// So instead, Zebra drops some overloaded connections at random. If a connection has recently
1564 /// overloaded the inbound service, it is more likely to be dropped. This makes it harder for a
1565 /// single peer (or multiple peers) to perform a denial of service attack.
1566 ///
1567 /// The inbound connection rate-limit also makes it hard for multiple peers to perform this
1568 /// attack, because each inbound connection can only send one inbound request before its
1569 /// probability of being disconnected increases.
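    ///
    /// In outline, the decision is a single biased coin flip (illustrative only):
    ///
    /// ```ignore
    /// let prev = self.last_overload_time.replace(now);
    /// if thread_rng().gen::<f32>() < overload_drop_connection_probability(now, prev) {
    ///     self.fail_with(error).await; // randomly close this connection
    /// } else {
    ///     // otherwise, ignore the error and keep the connection open
    /// }
    /// ```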
1570 async fn handle_inbound_overload(&mut self, req: Request, now: Instant, error: PeerError) {
1571 let prev = self.last_overload_time.replace(now);
1572 let drop_connection_probability = overload_drop_connection_probability(now, prev);
1573
1574 if thread_rng().gen::<f32>() < drop_connection_probability {
1575 if matches!(error, PeerError::Overloaded) {
1576 metrics::counter!("pool.closed.loadshed").increment(1);
1577 } else {
1578 metrics::counter!("pool.closed.inbound.timeout").increment(1);
1579 }
1580
1581 tracing::info!(
1582 drop_connection_probability = format!("{drop_connection_probability:.3}"),
1583 remote_user_agent = ?self.connection_info.remote.user_agent,
1584 negotiated_version = ?self.connection_info.negotiated_version,
1585 peer = ?self.metrics_label,
1586 last_peer_state = ?self.last_metrics_state,
1587 // TODO: remove this detailed debug info once #6506 is fixed
1588 remote_height = ?self.connection_info.remote.start_height,
1589 cached_addrs = ?self.cached_addrs.len(),
1590 connection_state = ?self.state,
1591 "inbound service {error} error, closing connection",
1592 );
1593
1594 self.update_state_metrics(format!("In::Req::{}/Rsp::{error}::Error", req.command()));
1595 self.fail_with(error).await;
1596 } else {
1597 self.update_state_metrics(format!("In::Req::{}/Rsp::{error}::Ignored", req.command()));
1598
1599 if matches!(error, PeerError::Overloaded) {
1600 metrics::counter!("pool.ignored.loadshed").increment(1);
1601 } else {
1602 metrics::counter!("pool.ignored.inbound.timeout").increment(1);
1603 }
1604 }
1605 }
1606}
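
// The transaction ID response above is truncated with `Iterator::take` rather than split:
// when the mempool holds more IDs than fit in one `inv` message, the extra IDs are simply
// dropped. This is a minimal sketch of that truncation, not part of the original module,
// using plain integers and a made-up limit instead of MAX_TX_INV_IN_SENT_MESSAGE.
#[cfg(test)]
mod inventory_truncation_sketch {
    #[test]
    fn oversized_responses_are_truncated_not_split() {
        // Hypothetical limit, much smaller than the real constant.
        let max_tx_inv_in_message: usize = 3;

        let hashes = vec![10, 20, 30, 40, 50];

        let truncated: Vec<u32> = hashes.into_iter().take(max_tx_inv_in_message).collect();

        // Only the first `max_tx_inv_in_message` entries are sent; the rest are dropped.
        assert_eq!(truncated, vec![10, 20, 30]);
    }
}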

/// Returns the probability of dropping a connection where the last overload was at `prev`,
/// and the current overload is `now`.
///
/// # Security
///
/// Connections that haven't seen an overload error in the past OVERLOAD_PROTECTION_INTERVAL
/// have a small chance of being closed (MIN_OVERLOAD_DROP_PROBABILITY).
///
/// Connections that have seen a previous overload error within that interval have a higher
/// chance of being dropped, up to MAX_OVERLOAD_DROP_PROBABILITY. The drop probability falls
/// off quadratically with the time since the last overload, so peers that overload the
/// inbound service repeatedly in quick succession are the most likely to be dropped.
///
/// ## Examples
///
/// If a connection sends multiple overloads close together, it is very likely to be
/// disconnected. If a connection has two overloads multiple seconds apart, it is unlikely
/// to be disconnected.
fn overload_drop_connection_probability(now: Instant, prev: Option<Instant>) -> f32 {
    let Some(prev) = prev else {
        return MIN_OVERLOAD_DROP_PROBABILITY;
    };

    let protection_fraction_since_last_overload =
        (now - prev).as_secs_f32() / OVERLOAD_PROTECTION_INTERVAL.as_secs_f32();

    // Quadratically increase the disconnection probability for very recent overloads.
    // If the previous overload is outside the protection interval, the raw probability can
    // fall below the minimum (or below zero); the clamp below restores it to
    // MIN_OVERLOAD_DROP_PROBABILITY.
    let overload_fraction = protection_fraction_since_last_overload.powi(2);

    let probability_range = MAX_OVERLOAD_DROP_PROBABILITY - MIN_OVERLOAD_DROP_PROBABILITY;
    let raw_drop_probability =
        MAX_OVERLOAD_DROP_PROBABILITY - (overload_fraction * probability_range);

    raw_drop_probability.clamp(MIN_OVERLOAD_DROP_PROBABILITY, MAX_OVERLOAD_DROP_PROBABILITY)
}
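
// The boundary behaviour of the drop probability is easiest to see with a small example.
// This is a minimal sketch, not part of the original module: it only relies on the function
// above and the constants imported at the top of this file, and it assumes
// MAX_OVERLOAD_DROP_PROBABILITY is greater than MIN_OVERLOAD_DROP_PROBABILITY.
#[cfg(test)]
mod overload_drop_probability_sketch {
    use super::*;

    #[test]
    fn drop_probability_decreases_with_time_since_last_overload() {
        let now = Instant::now();

        // No previous overload: the connection only has the minimum drop probability.
        assert_eq!(
            overload_drop_connection_probability(now, None),
            MIN_OVERLOAD_DROP_PROBABILITY
        );

        // A previous overload at the same instant: the fraction is zero, so the raw
        // probability is exactly the maximum.
        assert_eq!(
            overload_drop_connection_probability(now, Some(now)),
            MAX_OVERLOAD_DROP_PROBABILITY
        );

        // A previous overload well outside the protection interval: the quadratic term
        // pushes the raw probability below the minimum, and the clamp restores it.
        let later = now + OVERLOAD_PROTECTION_INTERVAL * 2;
        assert_eq!(
            overload_drop_connection_probability(later, Some(now)),
            MIN_OVERLOAD_DROP_PROBABILITY
        );
    }
}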

impl<S, Tx> Connection<S, Tx>
where
    Tx: Sink<Message, Error = SerializationError> + Unpin,
{
    /// Update the connection state metrics for this connection,
    /// using `extra_state_info` as additional state information.
    fn update_state_metrics(&mut self, extra_state_info: impl Into<Option<String>>) {
        let current_metrics_state = if let Some(extra_state_info) = extra_state_info.into() {
            format!("{}::{extra_state_info}", self.state.command()).into()
        } else {
            self.state.command()
        };

        if self.last_metrics_state.as_ref() == Some(&current_metrics_state) {
            return;
        }

        self.erase_state_metrics();

        // Set the new state
        metrics::gauge!(
            "zebra.net.connection.state",
            "command" => current_metrics_state.clone(),
            "addr" => self.metrics_label.clone(),
        )
        .increment(1.0);

        self.last_metrics_state = Some(current_metrics_state);
    }

    /// Erase the connection state metrics for this connection.
    fn erase_state_metrics(&mut self) {
        if let Some(last_metrics_state) = self.last_metrics_state.take() {
            metrics::gauge!(
                "zebra.net.connection.state",
                "command" => last_metrics_state,
                "addr" => self.metrics_label.clone(),
            )
            .set(0.0);
        }
    }

    /// Marks the peer as having failed with `error`, and performs connection cleanup,
    /// including async channel closes.
    ///
    /// If the connection has errored already, re-use the original error.
    /// Otherwise, fail the connection with `error`.
    async fn shutdown_async(&mut self, error: impl Into<SharedPeerError>) {
        // Close async channels first, so other tasks can start shutting down.
        // There's nothing we can do about errors while shutting down, and some errors are expected.
        //
        // TODO: close peer_tx and peer_rx in shutdown() and Drop, after:
        // - using channels instead of streams/sinks?
        // - exposing the underlying implementation rather than using generics and closures?
        // - adding peer_rx to the connection struct (optional)
        let _ = self.peer_tx.close().await;

        self.shutdown(error);
    }

    /// Marks the peer as having failed with `error`, and performs connection cleanup.
    /// See [`Self::shutdown_async()`] for details.
    ///
    /// Call [`Self::shutdown_async()`] in async code, because it can shut down more channels.
    fn shutdown(&mut self, error: impl Into<SharedPeerError>) {
        let mut error = error.into();

        // Close channels first, so other tasks can start shutting down.
        self.client_rx.close();

        // Update the shared error slot
        //
        // # Correctness
        //
        // Error slots use a threaded `std::sync::Mutex`, so accessing the slot
        // can block the async task's current thread. We only perform a single
        // slot update per `Client`. We ignore subsequent error slot updates.
        let slot_result = self.error_slot.try_update_error(error.clone());

        if let Err(AlreadyErrored { original_error }) = slot_result {
            debug!(
                new_error = %error,
                %original_error,
                connection_state = ?self.state,
                "multiple errors on connection: \
                 failed connections should stop processing pending requests and responses, \
                 then close the connection"
            );

            error = original_error;
        } else {
            debug!(%error,
                connection_state = ?self.state,
                "shutting down peer service with error");
        }

        // Prepare to flush any pending client requests.
        //
        // We've already closed the client channel, so setting State::Failed
        // will make the main loop flush any pending requests.
        //
        // However, we may have an outstanding client request in State::AwaitingResponse,
        // so we need to deal with it first.
        if let State::AwaitingResponse { tx, .. } =
            std::mem::replace(&mut self.state, State::Failed)
        {
            // # Correctness
            //
            // We know the slot has Some(error), because we just set it above,
            // and the error slot is never unset.
            //
            // Accessing the error slot locks a threaded std::sync::Mutex, which
            // can block the current async task thread. We briefly lock the mutex
            // to clone the error.
            let _ = tx.send(Err(error.clone()));
        }

        // Make the timer and metrics consistent with the Failed state.
        self.request_timer = None;
        self.update_state_metrics(None);

        // Finally, flush pending client requests.
        while let Some(InProgressClientRequest { tx, span, .. }) =
            self.client_rx.close_and_flush_next()
        {
            trace!(
                parent: &span,
                %error,
                "sending an error response to a pending request on a failed connection"
            );
            let _ = tx.send(Err(error.clone()));
        }
    }
}
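
// `shutdown` above relies on the "first error wins" behaviour of `ErrorSlot::try_update_error`:
// the first error recorded for a connection is the one reported to all pending requests.
// The hypothetical `FirstErrorSlot` below is a self-contained sketch of that pattern using
// only std types; it is illustrative only and is not the real `ErrorSlot` from `peer::error`.
#[cfg(test)]
mod first_error_wins_sketch {
    use std::sync::Mutex;

    /// A slot that keeps only the first error stored in it.
    #[derive(Default)]
    struct FirstErrorSlot {
        error: Mutex<Option<String>>,
    }

    impl FirstErrorSlot {
        /// Stores `new_error` if the slot is empty.
        /// Returns the original error if the slot was already set.
        fn try_update_error(&self, new_error: String) -> Result<(), String> {
            let mut slot = self.error.lock().expect("mutex is not poisoned");

            match slot.clone() {
                Some(original_error) => Err(original_error),
                None => {
                    *slot = Some(new_error);
                    Ok(())
                }
            }
        }
    }

    #[test]
    fn only_the_first_error_is_kept() {
        let slot = FirstErrorSlot::default();

        assert_eq!(slot.try_update_error("connection dropped".to_string()), Ok(()));

        // A later error does not overwrite the slot, matching how `shutdown` falls back
        // to `original_error` when it sees `AlreadyErrored`.
        assert_eq!(
            slot.try_update_error("serialization error".to_string()),
            Err("connection dropped".to_string())
        );
    }
}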

impl<S, Tx> Drop for Connection<S, Tx>
where
    Tx: Sink<Message, Error = SerializationError> + Unpin,
{
    fn drop(&mut self) {
        self.shutdown(PeerError::ConnectionDropped);

        self.erase_state_metrics();
    }
}

/// Map a list of inventory hashes to the corresponding unmined transaction IDs.
/// Non-transaction inventory hashes are skipped.
///
/// v4 transactions use a legacy transaction ID, and
/// v5 transactions use a witnessed transaction ID.
fn transaction_ids(items: &'_ [InventoryHash]) -> impl Iterator<Item = UnminedTxId> + '_ {
    items.iter().filter_map(InventoryHash::unmined_tx_id)
}

/// Map a list of inventory hashes to the corresponding block hashes.
/// Non-block inventory hashes are skipped.
fn block_hashes(items: &'_ [InventoryHash]) -> impl Iterator<Item = block::Hash> + '_ {
    items.iter().filter_map(InventoryHash::block_hash)
}
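
// `transaction_ids` and `block_hashes` both use the same pattern: `filter_map` with an
// accessor that returns `Some` only for the inventory variant of interest. The hypothetical
// `InventoryKind` enum below is a self-contained sketch of that pattern; it is not the real
// `InventoryHash` type from `protocol::external`.
#[cfg(test)]
mod inventory_filter_sketch {
    /// A simplified stand-in for an inventory entry: either a block ID or a transaction ID.
    #[derive(Clone, Copy, Debug, PartialEq)]
    enum InventoryKind {
        Block(u32),
        Transaction(u32),
    }

    impl InventoryKind {
        /// Returns the block ID, if this entry is a block.
        fn block_id(&self) -> Option<u32> {
            match self {
                InventoryKind::Block(id) => Some(*id),
                InventoryKind::Transaction(_) => None,
            }
        }
    }

    #[test]
    fn filter_map_keeps_only_the_requested_kind() {
        let items = [
            InventoryKind::Block(1),
            InventoryKind::Transaction(2),
            InventoryKind::Block(3),
        ];

        // Mirrors `block_hashes`: non-block entries are silently skipped.
        let block_ids: Vec<u32> = items.iter().filter_map(InventoryKind::block_id).collect();

        assert_eq!(block_ids, vec![1, 3]);
    }
}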