zebrad/components/inbound.rs
//! The inbound service handles requests from Zebra's peers.
//!
//! It downloads and verifies gossiped blocks and mempool transactions,
//! when Zebra is close to the chain tip.
//!
//! It also responds to peer requests for blocks, transactions, and peer addresses.

use std::{
    collections::HashSet,
    future::Future,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
    time::Duration,
};

use futures::{
    future::{FutureExt, TryFutureExt},
    stream::Stream,
};
use tokio::sync::oneshot::{self, error::TryRecvError};
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, ServiceExt};

use zebra_network::{self as zn, PeerSocketAddr};
use zebra_state::{self as zs};

use zebra_chain::{
    block::{self, Block},
    serialization::ZcashSerialize,
    transaction::UnminedTxId,
};
use zebra_consensus::{router::RouterError, VerifyBlockError};
use zebra_network::{AddressBook, InventoryResponse};
use zebra_node_services::mempool;

use crate::BoxError;

// Re-use the syncer timeouts for consistency.
use super::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};

use InventoryResponse::*;

mod cached_peer_addr_response;
pub(crate) mod downloads;

use cached_peer_addr_response::CachedPeerAddrResponse;

#[cfg(test)]
mod tests;

use downloads::Downloads as BlockDownloads;

/// The maximum amount of time an inbound service response can take.
///
/// If the response takes longer than this time, it will be cancelled,
/// and the peer might be disconnected.
pub const MAX_INBOUND_RESPONSE_TIME: Duration = Duration::from_secs(5);

/// The number of bytes the [`Inbound`] service will queue in response to a single block or
/// transaction request, before ignoring any additional block or transaction IDs in that request.
///
/// This is the same as `zcashd`'s default send buffer limit:
/// <https://github.com/zcash/zcash/blob/829dd94f9d253bb705f9e194f13cb8ca8e545e1e/src/net.h#L84>
/// as used in `ProcessGetData()`:
/// <https://github.com/zcash/zcash/blob/829dd94f9d253bb705f9e194f13cb8ca8e545e1e/src/main.cpp#L6410-L6412>
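///
/// Informal worked example, derived from the request handlers below: because this limit is
/// only checked after at least one item has been added to the response, a request for
/// sixteen 2 MB blocks returns a single block, and the remaining hashes are skipped
/// without being reported as missing.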
pub const GETDATA_SENT_BYTES_LIMIT: usize = 1_000_000;

/// The maximum number of blocks the [`Inbound`] service will queue in response to a block request,
/// before ignoring any additional block IDs in that request.
///
/// This is the same as `zcashd`'s request limit:
/// <https://github.com/zcash/zcash/blob/829dd94f9d253bb705f9e194f13cb8ca8e545e1e/src/main.h#L108>
///
/// (Zebra's request limit is one block in transit per peer, because it fans out block requests to
/// many peers instead of just a few peers.)
pub const GETDATA_MAX_BLOCK_COUNT: usize = 16;

type BlockDownloadPeerSet =
    Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
type Mempool = Buffer<BoxService<mempool::Request, mempool::Response, BoxError>, mempool::Request>;
type SemanticBlockVerifier = Buffer<
    BoxService<zebra_consensus::Request, block::Hash, RouterError>,
    zebra_consensus::Request,
>;
type GossipedBlockDownloads =
    BlockDownloads<Timeout<BlockDownloadPeerSet>, Timeout<SemanticBlockVerifier>, State>;

/// The services used by the [`Inbound`] service.
pub struct InboundSetupData {
    /// A shared list of peer addresses.
    pub address_book: Arc<std::sync::Mutex<AddressBook>>,

    /// A service that can be used to download gossiped blocks.
    pub block_download_peer_set: BlockDownloadPeerSet,

    /// A service that verifies downloaded blocks.
    ///
    /// Given to `Inbound.block_downloads` after the required services are set up.
    pub block_verifier: SemanticBlockVerifier,

    /// A service that manages transactions in the memory pool.
    pub mempool: Mempool,

    /// A service that manages cached blockchain state.
    pub state: State,

    /// Allows efficient access to the best tip of the blockchain.
    pub latest_chain_tip: zs::LatestChainTip,

    /// A channel to send misbehavior reports to the [`AddressBook`].
    pub misbehavior_sender: tokio::sync::mpsc::Sender<(PeerSocketAddr, u32)>,
}

/// Tracks the internal state of the [`Inbound`] service during setup.
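///
/// Rough lifecycle sketch (informal, matching the `poll_ready` implementation below):
/// `Pending` becomes `Initialized` once the setup data arrives on the channel, or
/// `FailedRecv` if the setup channel is closed. `FailedInit` is only used transiently
/// inside `poll_ready` while the state is swapped out.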
pub enum Setup {
    /// Waiting for service setup to complete.
    ///
    /// All requests are ignored.
    Pending {
        // Configuration
        //
        /// The configured full verification concurrency limit.
        full_verify_concurrency_limit: usize,

        // Services
        //
        /// A oneshot channel used to receive required services,
        /// after they are set up.
        setup: oneshot::Receiver<InboundSetupData>,
    },

    /// Setup is complete.
    ///
    /// All requests are answered.
    Initialized {
        // Services
        //
        /// An owned partial list of peer addresses used as a `GetAddr` response, and
        /// a shared list of peer addresses used to periodically refresh the partial list.
        ///
        /// Refreshed from the address book in the `poll_ready` method
        /// after [`CACHED_ADDRS_REFRESH_INTERVAL`](cached_peer_addr_response::CACHED_ADDRS_REFRESH_INTERVAL).
        cached_peer_addr_response: CachedPeerAddrResponse,

        /// A `futures::Stream` that downloads and verifies gossiped blocks.
        block_downloads: Pin<Box<GossipedBlockDownloads>>,

        /// A service that manages transactions in the memory pool.
        mempool: Mempool,

        /// A service that manages cached blockchain state.
        state: State,

        /// A channel to send misbehavior reports to the [`AddressBook`].
        misbehavior_sender: tokio::sync::mpsc::Sender<(PeerSocketAddr, u32)>,
    },

    /// Temporary state used in the inbound service's internal initialization code.
    ///
    /// If this state occurs outside the service initialization code, the service panics.
    FailedInit,

    /// Setup failed, because the setup channel permanently failed.
    /// The service keeps returning readiness errors for every request.
    FailedRecv {
        /// The original channel error.
        error: SharedRecvError,
    },
}

/// A wrapper around `Arc<TryRecvError>` that implements `Error`.
#[derive(thiserror::Error, Debug, Clone)]
#[error(transparent)]
pub struct SharedRecvError(Arc<TryRecvError>);

impl From<TryRecvError> for SharedRecvError {
    fn from(source: TryRecvError) -> Self {
        Self(Arc::new(source))
    }
}

/// Uses the node state to respond to inbound peer requests.
///
/// This service, wrapped in appropriate middleware, is passed to
/// `zebra_network::init` to respond to inbound peer requests.
///
/// The `Inbound` service is responsible for:
///
/// - supplying network data like peer addresses to other nodes;
/// - supplying chain data like blocks to other nodes;
/// - supplying mempool transactions to other nodes;
/// - receiving gossiped transactions; and
/// - receiving gossiped blocks.
///
/// Because the `Inbound` service is responsible for participating in the gossip
/// protocols used for transaction and block diffusion, there is a potential
/// overlap with the `ChainSync` and `Mempool` components.
///
/// The division of responsibility is that:
///
/// The `ChainSync` and `Mempool` components are *internally driven*,
/// periodically polling the network to check for new blocks or transactions.
///
/// The `Inbound` service is *externally driven*, responding to block gossip
/// by attempting to download and validate advertised blocks.
///
/// Gossiped transactions are forwarded to the mempool downloader,
/// which unifies polled and gossiped transactions into a single download list.
pub struct Inbound {
    /// Provides service dependencies, if they are available.
    ///
    /// Some services are unavailable until Zebra has completed setup.
    setup: Setup,
}

impl Inbound {
    /// Create a new inbound service.
    ///
    /// Dependent services are sent via the `setup` channel after initialization.
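    ///
    /// A simplified wiring sketch (hypothetical variable names; the real startup code
    /// lives elsewhere in `zebrad`):
    ///
    /// ```ignore
    /// let (setup_tx, setup_rx) = tokio::sync::oneshot::channel();
    /// let inbound = Inbound::new(full_verify_concurrency_limit, setup_rx);
    /// // Wrap `inbound` in middleware, pass it to the network layer, then send the
    /// // `InboundSetupData` once the mempool, state, and verifier services exist:
    /// // setup_tx.send(setup_data).expect("inbound service was not dropped");
    /// ```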
    pub fn new(
        full_verify_concurrency_limit: usize,
        setup: oneshot::Receiver<InboundSetupData>,
    ) -> Inbound {
        Inbound {
            setup: Setup::Pending {
                full_verify_concurrency_limit,
                setup,
            },
        }
    }

    /// Remove `self.setup`, temporarily replacing it with an invalid state.
    fn take_setup(&mut self) -> Setup {
        let mut setup = Setup::FailedInit;
        std::mem::swap(&mut self.setup, &mut setup);
        setup
    }
}

impl Service<zn::Request> for Inbound {
    type Response = zn::Response;
    type Error = zn::BoxError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Check whether the setup is finished, but don't wait for it to
        // become ready before reporting readiness. We expect to get it "soon",
        // and reporting unreadiness might cause unwanted load-shedding, since
        // the load-shed middleware is unable to distinguish being unready due
        // to load from being unready while waiting on setup.

        // Every setup variant handler must provide a result
        let result;

        self.setup = match self.take_setup() {
            Setup::Pending {
                full_verify_concurrency_limit,
                mut setup,
            } => match setup.try_recv() {
                Ok(setup_data) => {
                    let InboundSetupData {
                        address_book,
                        block_download_peer_set,
                        block_verifier,
                        mempool,
                        state,
                        latest_chain_tip,
                        misbehavior_sender,
                    } = setup_data;

                    let cached_peer_addr_response = CachedPeerAddrResponse::new(address_book);

                    let block_downloads = Box::pin(BlockDownloads::new(
                        full_verify_concurrency_limit,
                        Timeout::new(block_download_peer_set, BLOCK_DOWNLOAD_TIMEOUT),
                        Timeout::new(block_verifier, BLOCK_VERIFY_TIMEOUT),
                        state.clone(),
                        latest_chain_tip,
                    ));

                    result = Ok(());
                    Setup::Initialized {
                        cached_peer_addr_response,
                        block_downloads,
                        mempool,
                        state,
                        misbehavior_sender,
                    }
                }
                Err(TryRecvError::Empty) => {
                    // There's no setup data yet, so keep waiting for it.
                    //
                    // We could use Future::poll() to get a waker and return Poll::Pending here.
                    // But we want to drop excess requests during startup instead. Otherwise,
                    // the inbound service gets overloaded, and starts disconnecting peers.
                    result = Ok(());
                    Setup::Pending {
                        full_verify_concurrency_limit,
                        setup,
                    }
                }
                Err(error @ TryRecvError::Closed) => {
                    // Mark the service as failed, because setup failed
                    error!(?error, "inbound setup failed");
                    let error: SharedRecvError = error.into();
                    result = Err(error.clone().into());
                    Setup::FailedRecv { error }
                }
            },
            // Make sure previous setups were left in a valid state
            Setup::FailedInit => unreachable!("incomplete previous Inbound initialization"),
            // If setup failed, report service failure
            Setup::FailedRecv { error } => {
                result = Err(error.clone().into());
                Setup::FailedRecv { error }
            }
            // Clean up completed download tasks, ignoring their results
            Setup::Initialized {
                cached_peer_addr_response,
                mut block_downloads,
                mempool,
                state,
                misbehavior_sender,
            } => {
                // # Correctness
                //
                // Clear the stream but ignore the final Pending return value.
                // If we returned Pending here, and there were no waiting block downloads,
                // then inbound requests would wait for the next block download, and hang forever.
                while let Poll::Ready(Some(result)) = block_downloads.as_mut().poll_next(cx) {
                    let Err((err, Some(advertiser_addr))) = result else {
                        continue;
                    };

                    let Ok(err) = err.downcast::<VerifyBlockError>() else {
                        continue;
                    };

                    if err.misbehavior_score() != 0 {
                        let _ =
                            misbehavior_sender.try_send((advertiser_addr, err.misbehavior_score()));
                    }
                }

                result = Ok(());

                Setup::Initialized {
                    cached_peer_addr_response,
                    block_downloads,
                    mempool,
                    state,
                    misbehavior_sender,
                }
            }
        };

        // Make sure we're leaving the setup in a valid state
        if matches!(self.setup, Setup::FailedInit) {
            unreachable!("incomplete Inbound initialization after poll_ready state handling");
        }

        // TODO:
        //  * do we want to propagate backpressure from the download queue or its outbound network?
        //    currently, the download queue waits for the outbound network in the download future,
        //    and drops new requests after it reaches a hard-coded limit. This is the
        //    "load shed directly" pattern from #1618.
        //  * currently, the state service is always ready, unless its buffer is full.
        //    So we might also want to propagate backpressure from its buffer.
        //  * poll_ready needs to be implemented carefully, to avoid hangs or deadlocks.
        //    See #1593 for details.
        Poll::Ready(result)
    }

    /// Call the inbound service.
    ///
    /// Errors indicate that the peer has done something wrong or unexpected,
    /// and will cause callers to disconnect from the remote peer.
    #[instrument(name = "inbound", skip(self, req))]
    fn call(&mut self, req: zn::Request) -> Self::Future {
        let (cached_peer_addr_response, block_downloads, mempool, state) = match &mut self.setup {
            Setup::Initialized {
                cached_peer_addr_response,
                block_downloads,
                mempool,
                state,
                misbehavior_sender: _,
            } => (cached_peer_addr_response, block_downloads, mempool, state),
            _ => {
                debug!("ignoring request from remote peer during setup");
                return async { Ok(zn::Response::Nil) }.boxed();
            }
        };

        match req {
            zn::Request::Peers => {
                // # Security
                //
                // We truncate the list to avoid revealing our entire peer set in one call.
                // But since we don't monitor repeated requests and the results are shuffled,
                // a crawler could still send repeated queries and eventually collect the full list.
                //
                // # Correctness
                //
                // If the address book is busy, try again inside the future. If it can't be locked
                // twice, ignore the request.
                cached_peer_addr_response.try_refresh();
                let response = cached_peer_addr_response.value();

                async move { Ok(response) }.boxed()
            }
            zn::Request::BlocksByHash(hashes) => {
                // We return an available or missing response to each inventory request,
                // unless the request is empty, or it reaches a response limit.
                if hashes.is_empty() {
                    return async { Ok(zn::Response::Nil) }.boxed();
                }

                let state = state.clone();

                async move {
                    let mut blocks: Vec<
                        InventoryResponse<(Arc<Block>, Option<PeerSocketAddr>), block::Hash>,
                    > = Vec::new();
                    let mut total_size = 0;

                    // Ignore any block hashes past the response limit.
                    // This saves us expensive database lookups.
                    for &hash in hashes.iter().take(GETDATA_MAX_BLOCK_COUNT) {
                        // We check the limit after including at least one block, so that we can
                        // send blocks greater than 1 MB (but only one at a time)
                        if total_size >= GETDATA_SENT_BYTES_LIMIT {
                            break;
                        }

                        let response = state
                            .clone()
                            .ready()
                            .await?
                            .call(zs::Request::Block(hash.into()))
                            .await?;

                        // Add the block responses to the list, while updating the total
                        // response size.
                        //
                        // If there was a database error, return the error,
                        // and stop processing further chunks.
                        match response {
                            zs::Response::Block(Some(block)) => {
                                // If checking the serialized size of the block performs badly,
                                // return the size from the state using a wrapper type.
                                total_size += block.zcash_serialized_size();

                                blocks.push(Available((block, None)))
                            },
                            // We don't need to limit the size of the missing block IDs list,
                            // because it is already limited to the size of the getdata request
                            // sent by the peer. (Their content and encodings are the same.)
                            zs::Response::Block(None) => blocks.push(Missing(hash)),
                            _ => unreachable!("wrong response from state"),
                        }
                    }

                    // The network layer handles splitting this response into multiple `block`
                    // messages, and a `notfound` message if needed.
                    Ok(zn::Response::Blocks(blocks))
                }.boxed()
            }
            zn::Request::TransactionsById(req_tx_ids) => {
                // We return an available or missing response to each inventory request,
                // unless the request is empty, or it reaches a response limit.
                if req_tx_ids.is_empty() {
                    return async { Ok(zn::Response::Nil) }.boxed();
                }

                let request = mempool::Request::TransactionsById(req_tx_ids.clone());
                mempool.clone().oneshot(request).map_ok(move |resp| {
                    let mut total_size = 0;

                    let transactions = match resp {
                        mempool::Response::Transactions(transactions) => transactions,
                        _ => unreachable!("Mempool component should always respond to a `TransactionsById` request with a `Transactions` response"),
                    };

                    // Work out which transaction IDs were missing.
                    let available_tx_ids: HashSet<UnminedTxId> =
                        transactions.iter().map(|tx| tx.id).collect();
                    // We don't need to limit the size of the missing transaction IDs list,
                    // because it is already limited to the size of the getdata request
                    // sent by the peer. (Their content and encodings are the same.)
                    let missing = req_tx_ids
                        .into_iter()
                        .filter(|tx_id| !available_tx_ids.contains(tx_id))
                        .map(Missing);

                    // If we skip sending some transactions because the limit has been reached,
                    // they aren't reported as missing. This matches `zcashd`'s behaviour:
                    // <https://github.com/zcash/zcash/blob/829dd94f9d253bb705f9e194f13cb8ca8e545e1e/src/main.cpp#L6410-L6412>
                    let available = transactions
                        .into_iter()
                        .take_while(|tx| {
                            // We check the limit after including the transaction,
                            // so that we can send transactions greater than 1 MB
                            // (but only one at a time)
                            let within_limit = total_size < GETDATA_SENT_BYTES_LIMIT;

                            total_size += tx.size;

                            within_limit
                        })
                        .map(|tx| Available((tx, None)));

                    // The network layer handles splitting this response into multiple `tx`
                    // messages, and a `notfound` message if needed.
                    zn::Response::Transactions(available.chain(missing).collect())
                }).boxed()
            }
            // Find* responses are already size-limited by the state.
            zn::Request::FindBlocks { known_blocks, stop } => {
                let request = zs::Request::FindBlockHashes { known_blocks, stop };
                state.clone().oneshot(request).map_ok(|resp| match resp {
                    zs::Response::BlockHashes(hashes) if hashes.is_empty() => zn::Response::Nil,
                    zs::Response::BlockHashes(hashes) => zn::Response::BlockHashes(hashes),
                    _ => unreachable!("zebra-state should always respond to a `FindBlockHashes` request with a `BlockHashes` response"),
                })
                .boxed()
            }
            zn::Request::FindHeaders { known_blocks, stop } => {
                let request = zs::Request::FindBlockHeaders { known_blocks, stop };
                state.clone().oneshot(request).map_ok(|resp| match resp {
                    zs::Response::BlockHeaders(headers) if headers.is_empty() => zn::Response::Nil,
                    zs::Response::BlockHeaders(headers) => zn::Response::BlockHeaders(headers),
                    _ => unreachable!("zebra-state should always respond to a `FindBlockHeaders` request with a `BlockHeaders` response"),
                })
                .boxed()
            }
            zn::Request::PushTransaction(transaction) => {
                mempool
                    .clone()
                    .oneshot(mempool::Request::Queue(vec![transaction.into()]))
                    // The response just indicates if processing was queued or not; ignore it
                    .map_ok(|_resp| zn::Response::Nil)
                    .boxed()
            }
            zn::Request::AdvertiseTransactionIds(transactions) => {
                let transactions = transactions.into_iter().map(Into::into).collect();
                mempool
                    .clone()
                    .oneshot(mempool::Request::Queue(transactions))
                    // The response just indicates if processing was queued or not; ignore it
                    .map_ok(|_resp| zn::Response::Nil)
                    .boxed()
            }
            zn::Request::AdvertiseBlock(hash) => {
                block_downloads.download_and_verify(hash);
                async { Ok(zn::Response::Nil) }.boxed()
            }
            // The size of this response is limited by the `Connection` state machine in the network layer
            zn::Request::MempoolTransactionIds => {
                mempool.clone().oneshot(mempool::Request::TransactionIds).map_ok(|resp| match resp {
                    mempool::Response::TransactionIds(transaction_ids) if transaction_ids.is_empty() => zn::Response::Nil,
                    mempool::Response::TransactionIds(transaction_ids) => zn::Response::TransactionIds(transaction_ids.into_iter().collect()),
                    _ => unreachable!("Mempool component should always respond to a `TransactionIds` request with a `TransactionIds` response"),
                })
                .boxed()
            }
            zn::Request::Ping(_) => {
                unreachable!("ping requests are handled internally");
            }
        }
    }
}