zebrad/components/
inbound.rs

//! The inbound service handles requests from Zebra's peers.
//!
//! It downloads and verifies gossiped blocks and mempool transactions,
//! when Zebra is close to the chain tip.
//!
//! It also responds to peer requests for blocks, transactions, and peer addresses.
7
8use std::{
9    collections::HashSet,
10    future::Future,
11    pin::Pin,
12    sync::Arc,
13    task::{Context, Poll},
14    time::Duration,
15};
16
17use futures::{
18    future::{FutureExt, TryFutureExt},
19    stream::Stream,
20};
21use tokio::sync::oneshot::{self, error::TryRecvError};
22use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, ServiceExt};
23
24use zebra_network::{self as zn, PeerSocketAddr};
25use zebra_state::{self as zs};
26
27use zebra_chain::{
28    block::{self, Block},
29    serialization::ZcashSerialize,
30    transaction::UnminedTxId,
31};
32use zebra_consensus::{router::RouterError, VerifyBlockError};
33use zebra_network::{AddressBook, InventoryResponse};
34use zebra_node_services::mempool;
35
36use crate::BoxError;
37
38// Re-use the syncer timeouts for consistency.
39use super::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};
40
41use InventoryResponse::*;
42
43mod cached_peer_addr_response;
44pub(crate) mod downloads;
45
46use cached_peer_addr_response::CachedPeerAddrResponse;
47
48#[cfg(test)]
49mod tests;
50
51use downloads::Downloads as BlockDownloads;
52
/// The maximum amount of time an inbound service response can take.
///
/// If the response takes longer than this time, it will be cancelled,
/// and the peer might be disconnected.
pub const MAX_INBOUND_RESPONSE_TIME: Duration = Duration::from_secs(5);
58
/// The number of bytes the [`Inbound`] service will queue in response to a single block or
/// transaction request, before ignoring any additional block or transaction IDs in that request.
///
/// The limit is checked before each item is added, so a single item larger than
/// the limit can still be sent on its own.
///
/// This is the same as `zcashd`'s default send buffer limit:
/// <https://github.com/zcash/zcash/blob/829dd94f9d253bb705f9e194f13cb8ca8e545e1e/src/net.h#L84>
/// as used in `ProcessGetData()`:
/// <https://github.com/zcash/zcash/blob/829dd94f9d253bb705f9e194f13cb8ca8e545e1e/src/main.cpp#L6410-L6412>
pub const GETDATA_SENT_BYTES_LIMIT: usize = 1_000_000;
67
/// The maximum number of blocks the [`Inbound`] service will queue in response to a block request,
/// before ignoring any additional block IDs in that request.
///
/// This is the same as `zcashd`'s request limit:
/// <https://github.com/zcash/zcash/blob/829dd94f9d253bb705f9e194f13cb8ca8e545e1e/src/main.h#L108>
///
/// (Zebra's request limit is one block in transit per peer, because it fans out block requests to
/// many peers instead of just a few peers.)
pub const GETDATA_MAX_BLOCK_COUNT: usize = 16;
77
78type BlockDownloadPeerSet =
79    Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
80type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
81type Mempool = Buffer<BoxService<mempool::Request, mempool::Response, BoxError>, mempool::Request>;
82type SemanticBlockVerifier = Buffer<
83    BoxService<zebra_consensus::Request, block::Hash, RouterError>,
84    zebra_consensus::Request,
85>;
86type GossipedBlockDownloads =
87    BlockDownloads<Timeout<BlockDownloadPeerSet>, Timeout<SemanticBlockVerifier>, State>;
88
89/// The services used by the [`Inbound`] service.
90pub struct InboundSetupData {
91    /// A shared list of peer addresses.
92    pub address_book: Arc<std::sync::Mutex<AddressBook>>,
93
94    /// A service that can be used to download gossiped blocks.
95    pub block_download_peer_set: BlockDownloadPeerSet,
96
97    /// A service that verifies downloaded blocks.
98    ///
99    /// Given to `Inbound.block_downloads` after the required services are set up.
100    pub block_verifier: SemanticBlockVerifier,
101
102    /// A service that manages transactions in the memory pool.
103    pub mempool: Mempool,
104
105    /// A service that manages cached blockchain state.
106    pub state: State,
107
108    /// Allows efficient access to the best tip of the blockchain.
109    pub latest_chain_tip: zs::LatestChainTip,
110
111    /// A channel to send misbehavior reports to the [`AddressBook`].
112    pub misbehavior_sender: tokio::sync::mpsc::Sender<(PeerSocketAddr, u32)>,
113}
114
115/// Tracks the internal state of the [`Inbound`] service during setup.
116pub enum Setup {
117    /// Waiting for service setup to complete.
118    ///
119    /// All requests are ignored.
120    Pending {
121        // Configuration
122        //
123        /// The configured full verification concurrency limit.
124        full_verify_concurrency_limit: usize,
125
126        // Services
127        //
128        /// A oneshot channel used to receive required services,
129        /// after they are set up.
130        setup: oneshot::Receiver<InboundSetupData>,
131    },
132
133    /// Setup is complete.
134    ///
135    /// All requests are answered.
136    Initialized {
137        // Services
138        //
139        /// An owned partial list of peer addresses used as a `GetAddr` response, and
140        /// a shared list of peer addresses used to periodically refresh the partial list.
141        ///
142        /// Refreshed from the address book in `poll_ready` method
143        /// after [`CACHED_ADDRS_REFRESH_INTERVAL`](cached_peer_addr_response::CACHED_ADDRS_REFRESH_INTERVAL).
144        cached_peer_addr_response: CachedPeerAddrResponse,
145
146        /// A `futures::Stream` that downloads and verifies gossiped blocks.
147        block_downloads: Pin<Box<GossipedBlockDownloads>>,
148
149        /// A service that manages transactions in the memory pool.
150        mempool: Mempool,
151
152        /// A service that manages cached blockchain state.
153        state: State,
154
155        /// A channel to send misbehavior reports to the [`AddressBook`].
156        misbehavior_sender: tokio::sync::mpsc::Sender<(PeerSocketAddr, u32)>,
157    },
158
159    /// Temporary state used in the inbound service's internal initialization code.
160    ///
161    /// If this state occurs outside the service initialization code, the service panics.
162    FailedInit,
163
164    /// Setup failed, because the setup channel permanently failed.
165    /// The service keeps returning readiness errors for every request.
166    FailedRecv {
167        /// The original channel error.
168        error: SharedRecvError,
169    },
170}
171
172/// A wrapper around `Arc<TryRecvError>` that implements `Error`.
173#[derive(thiserror::Error, Debug, Clone)]
174#[error(transparent)]
175pub struct SharedRecvError(Arc<TryRecvError>);
176
177impl From<TryRecvError> for SharedRecvError {
178    fn from(source: TryRecvError) -> Self {
179        Self(Arc::new(source))
180    }
181}
182
183/// Uses the node state to respond to inbound peer requests.
184///
185/// This service, wrapped in appropriate middleware, is passed to
186/// `zebra_network::init` to respond to inbound peer requests.
187///
188/// The `Inbound` service is responsible for:
189///
190/// - supplying network data like peer addresses to other nodes;
191/// - supplying chain data like blocks to other nodes;
192/// - supplying mempool transactions to other nodes;
193/// - receiving gossiped transactions; and
194/// - receiving gossiped blocks.
195///
196/// Because the `Inbound` service is responsible for participating in the gossip
197/// protocols used for transaction and block diffusion, there is a potential
198/// overlap with the `ChainSync` and `Mempool` components.
199///
200/// The division of responsibility is that:
201///
202/// The `ChainSync` and `Mempool` components are *internally driven*,
203/// periodically polling the network to check for new blocks or transactions.
204///
205/// The `Inbound` service is *externally driven*, responding to block gossip
206/// by attempting to download and validate advertised blocks.
207///
208/// Gossiped transactions are forwarded to the mempool downloader,
209/// which unifies polled and gossiped transactions into a single download list.
210pub struct Inbound {
211    /// Provides service dependencies, if they are available.
212    ///
213    /// Some services are unavailable until Zebra has completed setup.
214    setup: Setup,
215}
216
217impl Inbound {
218    /// Create a new inbound service.
219    ///
220    /// Dependent services are sent via the `setup` channel after initialization.
221    pub fn new(
222        full_verify_concurrency_limit: usize,
223        setup: oneshot::Receiver<InboundSetupData>,
224    ) -> Inbound {
225        Inbound {
226            setup: Setup::Pending {
227                full_verify_concurrency_limit,
228                setup,
229            },
230        }
231    }
232
233    /// Remove `self.setup`, temporarily replacing it with an invalid state.
234    fn take_setup(&mut self) -> Setup {
235        let mut setup = Setup::FailedInit;
236        std::mem::swap(&mut self.setup, &mut setup);
237        setup
238    }
239}
240
241impl Service<zn::Request> for Inbound {
242    type Response = zn::Response;
243    type Error = zn::BoxError;
244    type Future =
245        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
246
247    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
248        // Check whether the setup is finished, but don't wait for it to
249        // become ready before reporting readiness. We expect to get it "soon",
250        // and reporting unreadiness might cause unwanted load-shedding, since
251        // the load-shed middleware is unable to distinguish being unready due
252        // to load from being unready while waiting on setup.
253
254        // Every setup variant handler must provide a result
255        let result;
256
257        self.setup = match self.take_setup() {
258            Setup::Pending {
259                full_verify_concurrency_limit,
260                mut setup,
261            } => match setup.try_recv() {
262                Ok(setup_data) => {
263                    let InboundSetupData {
264                        address_book,
265                        block_download_peer_set,
266                        block_verifier,
267                        mempool,
268                        state,
269                        latest_chain_tip,
270                        misbehavior_sender,
271                    } = setup_data;
272
273                    let cached_peer_addr_response = CachedPeerAddrResponse::new(address_book);
274
275                    let block_downloads = Box::pin(BlockDownloads::new(
276                        full_verify_concurrency_limit,
277                        Timeout::new(block_download_peer_set, BLOCK_DOWNLOAD_TIMEOUT),
278                        Timeout::new(block_verifier, BLOCK_VERIFY_TIMEOUT),
279                        state.clone(),
280                        latest_chain_tip,
281                    ));
282
283                    result = Ok(());
284                    Setup::Initialized {
285                        cached_peer_addr_response,
286                        block_downloads,
287                        mempool,
288                        state,
289                        misbehavior_sender,
290                    }
291                }
292                Err(TryRecvError::Empty) => {
293                    // There's no setup data yet, so keep waiting for it.
294                    //
295                    // We could use Future::poll() to get a waker and return Poll::Pending here.
296                    // But we want to drop excess requests during startup instead. Otherwise,
297                    // the inbound service gets overloaded, and starts disconnecting peers.
298                    result = Ok(());
299                    Setup::Pending {
300                        full_verify_concurrency_limit,
301                        setup,
302                    }
303                }
304                Err(error @ TryRecvError::Closed) => {
305                    // Mark the service as failed, because setup failed
306                    error!(?error, "inbound setup failed");
307                    let error: SharedRecvError = error.into();
308                    result = Err(error.clone().into());
309                    Setup::FailedRecv { error }
310                }
311            },
312            // Make sure previous setups were left in a valid state
313            Setup::FailedInit => unreachable!("incomplete previous Inbound initialization"),
314            // If setup failed, report service failure
315            Setup::FailedRecv { error } => {
316                result = Err(error.clone().into());
317                Setup::FailedRecv { error }
318            }
319            // Clean up completed download tasks, ignoring their results
320            Setup::Initialized {
321                cached_peer_addr_response,
322                mut block_downloads,
323                mempool,
324                state,
325                misbehavior_sender,
326            } => {
327                // # Correctness
328                //
329                // Clear the stream but ignore the final Pending return value.
330                // If we returned Pending here, and there were no waiting block downloads,
331                // then inbound requests would wait for the next block download, and hang forever.
332                while let Poll::Ready(Some(result)) = block_downloads.as_mut().poll_next(cx) {
333                    let Err((err, Some(advertiser_addr))) = result else {
334                        continue;
335                    };
336
337                    let Ok(err) = err.downcast::<VerifyBlockError>() else {
338                        continue;
339                    };
340
341                    if err.misbehavior_score() != 0 {
342                        let _ =
343                            misbehavior_sender.try_send((advertiser_addr, err.misbehavior_score()));
344                    }
345                }
346
347                result = Ok(());
348
349                Setup::Initialized {
350                    cached_peer_addr_response,
351                    block_downloads,
352                    mempool,
353                    state,
354                    misbehavior_sender,
355                }
356            }
357        };
358
359        // Make sure we're leaving the setup in a valid state
360        if matches!(self.setup, Setup::FailedInit) {
361            unreachable!("incomplete Inbound initialization after poll_ready state handling");
362        }
363
364        // TODO:
365        //  * do we want to propagate backpressure from the download queue or its outbound network?
366        //    currently, the download queue waits for the outbound network in the download future,
367        //    and drops new requests after it reaches a hard-coded limit. This is the
368        //    "load shed directly" pattern from #1618.
369        //  * currently, the state service is always ready, unless its buffer is full.
370        //    So we might also want to propagate backpressure from its buffer.
371        //  * poll_ready needs to be implemented carefully, to avoid hangs or deadlocks.
372        //    See #1593 for details.
373        Poll::Ready(result)
374    }
375
376    /// Call the inbound service.
377    ///
378    /// Errors indicate that the peer has done something wrong or unexpected,
379    /// and will cause callers to disconnect from the remote peer.
380    #[instrument(name = "inbound", skip(self, req))]
381    fn call(&mut self, req: zn::Request) -> Self::Future {
382        let (cached_peer_addr_response, block_downloads, mempool, state) = match &mut self.setup {
383            Setup::Initialized {
384                cached_peer_addr_response,
385                block_downloads,
386                mempool,
387                state,
388                misbehavior_sender: _,
389            } => (cached_peer_addr_response, block_downloads, mempool, state),
390            _ => {
391                debug!("ignoring request from remote peer during setup");
392                return async { Ok(zn::Response::Nil) }.boxed();
393            }
394        };
395
396        match req {
397            zn::Request::Peers => {
398                // # Security
399                //
400                // We truncate the list to not reveal our entire peer set in one call.
401                // But we don't monitor repeated requests and the results are shuffled,
402                // a crawler could just send repeated queries and get the full list.
403                //
404                // # Correctness
405                //
406                // If the address book is busy, try again inside the future. If it can't be locked
407                // twice, ignore the request.
408                cached_peer_addr_response.try_refresh();
409                let response = cached_peer_addr_response.value();
410
411                async move {
412                    Ok(response)
413                }.boxed()
414            }
415            zn::Request::BlocksByHash(hashes) => {
416                // We return an available or missing response to each inventory request,
417                // unless the request is empty, or it reaches a response limit.
418                if hashes.is_empty() {
419                    return async { Ok(zn::Response::Nil) }.boxed();
420                }
421
422                let state = state.clone();
423
424                async move {
425                    let mut blocks: Vec<InventoryResponse<(Arc<Block>, Option<PeerSocketAddr>), block::Hash>> = Vec::new();
426                    let mut total_size = 0;
427
428                    // Ignore any block hashes past the response limit.
429                    // This saves us expensive database lookups.
430                    for &hash in hashes.iter().take(GETDATA_MAX_BLOCK_COUNT) {
431                        // We check the limit after including at least one block, so that we can
432                        // send blocks greater than 1 MB (but only one at a time)
433                        if total_size >= GETDATA_SENT_BYTES_LIMIT {
434                            break;
435                        }
436
437                        let response = state.clone().ready().await?.call(zs::Request::Block(hash.into())).await?;
438
439                        // Add the block responses to the list, while updating the size limit.
440                        //
441                        // If there was a database error, return the error,
442                        // and stop processing further chunks.
443                        match response {
444                            zs::Response::Block(Some(block)) => {
445                                // If checking the serialized size of the block performs badly,
446                                // return the size from the state using a wrapper type.
447                                total_size += block.zcash_serialized_size();
448
449                                blocks.push(Available((block, None)))
450                            },
451                            // We don't need to limit the size of the missing block IDs list,
452                            // because it is already limited to the size of the getdata request
453                            // sent by the peer. (Their content and encodings are the same.)
454                            zs::Response::Block(None) => blocks.push(Missing(hash)),
455                            _ => unreachable!("wrong response from state"),
456                        }
457
458                    }
459
460                    // The network layer handles splitting this response into multiple `block`
461                    // messages, and a `notfound` message if needed.
462                    Ok(zn::Response::Blocks(blocks))
463                }.boxed()
464            }
465            zn::Request::TransactionsById(req_tx_ids) => {
466                // We return an available or missing response to each inventory request,
467                // unless the request is empty, or it reaches a response limit.
468                if req_tx_ids.is_empty() {
469                    return async { Ok(zn::Response::Nil) }.boxed();
470                }
471
472                let request = mempool::Request::TransactionsById(req_tx_ids.clone());
473                mempool.clone().oneshot(request).map_ok(move |resp| {
474                    let mut total_size = 0;
475
476                    let transactions = match resp {
477                        mempool::Response::Transactions(transactions) => transactions,
478                        _ => unreachable!("Mempool component should always respond to a `TransactionsById` request with a `Transactions` response"),
479                    };
480
481                    // Work out which transaction IDs were missing.
482                    let available_tx_ids: HashSet<UnminedTxId> = transactions.iter().map(|tx| tx.id).collect();
483                    // We don't need to limit the size of the missing transaction IDs list,
484                    // because it is already limited to the size of the getdata request
485                    // sent by the peer. (Their content and encodings are the same.)
486                    let missing = req_tx_ids.into_iter().filter(|tx_id| !available_tx_ids.contains(tx_id)).map(Missing);
487
488                    // If we skip sending some transactions because the limit has been reached,
489                    // they aren't reported as missing. This matches `zcashd`'s behaviour:
490                    // <https://github.com/zcash/zcash/blob/829dd94f9d253bb705f9e194f13cb8ca8e545e1e/src/main.cpp#L6410-L6412>
491                    let available = transactions.into_iter().take_while(|tx| {
492                        // We check the limit after including the transaction,
493                        // so that we can send transactions greater than 1 MB
494                        // (but only one at a time)
495                        let within_limit = total_size < GETDATA_SENT_BYTES_LIMIT;
496
497                        total_size += tx.size;
498
499                        within_limit
500                    }).map(|tx| Available((tx, None)));
501
502                    // The network layer handles splitting this response into multiple `tx`
503                    // messages, and a `notfound` message if needed.
504                    zn::Response::Transactions(available.chain(missing).collect())
505                }).boxed()
506            }
507            // Find* responses are already size-limited by the state.
508            zn::Request::FindBlocks { known_blocks, stop } => {
509                let request = zs::Request::FindBlockHashes { known_blocks, stop };
510                state.clone().oneshot(request).map_ok(|resp| match resp {
511                    zs::Response::BlockHashes(hashes) if hashes.is_empty() => zn::Response::Nil,
512                    zs::Response::BlockHashes(hashes) => zn::Response::BlockHashes(hashes),
513                    _ => unreachable!("zebra-state should always respond to a `FindBlockHashes` request with a `BlockHashes` response"),
514                })
515                    .boxed()
516            }
517            zn::Request::FindHeaders { known_blocks, stop } => {
518                let request = zs::Request::FindBlockHeaders { known_blocks, stop };
519                state.clone().oneshot(request).map_ok(|resp| match resp {
520                    zs::Response::BlockHeaders(headers) if headers.is_empty() => zn::Response::Nil,
521                    zs::Response::BlockHeaders(headers) => zn::Response::BlockHeaders(headers),
522                    _ => unreachable!("zebra-state should always respond to a `FindBlockHeaders` request with a `BlockHeaders` response"),
523                })
524                    .boxed()
525            }
526            zn::Request::PushTransaction(transaction) => {
527                mempool
528                    .clone()
529                    .oneshot(mempool::Request::Queue(vec![transaction.into()]))
530                    // The response just indicates if processing was queued or not; ignore it
531                    .map_ok(|_resp| zn::Response::Nil)
532                    .boxed()
533            }
534            zn::Request::AdvertiseTransactionIds(transactions) => {
535                let transactions = transactions.into_iter().map(Into::into).collect();
536                mempool
537                    .clone()
538                    .oneshot(mempool::Request::Queue(transactions))
539                    // The response just indicates if processing was queued or not; ignore it
540                    .map_ok(|_resp| zn::Response::Nil)
541                    .boxed()
542            }
543            zn::Request::AdvertiseBlock(hash) => {
544                block_downloads.download_and_verify(hash);
545                async { Ok(zn::Response::Nil) }.boxed()
546            }
547            // The size of this response is limited by the `Connection` state machine in the network layer
548            zn::Request::MempoolTransactionIds => {
549                mempool.clone().oneshot(mempool::Request::TransactionIds).map_ok(|resp| match resp {
550                    mempool::Response::TransactionIds(transaction_ids) if transaction_ids.is_empty() => zn::Response::Nil,
551                    mempool::Response::TransactionIds(transaction_ids) => zn::Response::TransactionIds(transaction_ids.into_iter().collect()),
552                    _ => unreachable!("Mempool component should always respond to a `TransactionIds` request with a `TransactionIds` response"),
553                })
554                    .boxed()
555            }
556            zn::Request::Ping(_) => {
557                unreachable!("ping requests are handled internally");
558            }
559        }
560    }
561}