zebrad/components/inbound/
downloads.rs

1//! A download stream that handles gossiped blocks from peers.
2
3use std::{
4    collections::HashMap,
5    pin::Pin,
6    task::{Context, Poll},
7};
8
9use futures::{
10    future::TryFutureExt,
11    ready,
12    stream::{FuturesUnordered, Stream},
13};
14use pin_project::pin_project;
15use tokio::{sync::oneshot, task::JoinHandle};
16use tower::{Service, ServiceExt};
17use tracing_futures::Instrument;
18
19use zebra_chain::{
20    block::{self, HeightDiff},
21    chain_tip::ChainTip,
22};
23use zebra_network::{self as zn, PeerSocketAddr};
24use zebra_state as zs;
25
26use crate::components::sync::MIN_CONCURRENCY_LIMIT;
27
/// The boxed error type returned by the network, verifier, and state services
/// used by [`Downloads`].
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// The maximum number of concurrent inbound download and verify tasks.
/// Also the maximum lookahead limit for gossiped block heights, before block verification.
///
/// [`Downloads::new`] clamps its configured concurrency limit to at most this value.
///
/// We expect the syncer to download and verify checkpoints, so this bound
/// can be small.
///
/// ## Security
///
/// We use a small concurrency limit, to prevent memory denial-of-service
/// attacks.
///
/// The maximum block size is 2 million bytes. A deserialized malicious
/// block with ~225_000 transparent outputs can take up 9MB of RAM.
/// So the maximum inbound queue usage is `MAX_INBOUND_CONCURRENCY * 9 MB`,
/// which is about 270 MB for the current limit of 30.
/// (See #1880 for more details.)
///
/// Malicious blocks will eventually timeout or fail contextual validation.
/// Once validation fails, the block is dropped, and its memory is deallocated.
///
/// Since Zebra keeps an `inv` index, inbound downloads for malicious blocks
/// will be directed to the malicious node that originally gossiped the hash.
/// Therefore, this attack can be carried out by a single malicious node.
pub const MAX_INBOUND_CONCURRENCY: usize = 30;
53
/// The action taken in response to a peer's gossiped block hash.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DownloadAction {
    /// The block hash was successfully queued for download and verification.
    AddedToQueue,

    /// The block hash is already queued, so this request was ignored.
    ///
    /// Another peer has already gossiped the same hash to us.
    AlreadyQueued,

    /// The queue is at capacity, so this request was ignored.
    ///
    /// The sync service should discover this block later, when we are closer
    /// to the tip. The queue's capacity is the concurrency limit passed to
    /// [`Downloads::new`], clamped to at most [`MAX_INBOUND_CONCURRENCY`].
    FullQueue,
}
70
/// Manages download and verification of blocks gossiped to this peer.
///
/// Blocks are queued with [`Downloads::download_and_verify`], and each task's
/// result is yielded by this type's [`Stream`] implementation.
#[pin_project]
#[derive(Debug)]
pub struct Downloads<ZN, ZV, ZS>
where
    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
    ZN::Future: Send,
    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
        + Send
        + Clone
        + 'static,
    ZV::Future: Send,
    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
    ZS::Future: Send,
{
    // Configuration
    //
    /// The configured full verification concurrency limit, after clamping it to
    /// `MIN_CONCURRENCY_LIMIT..=MAX_INBOUND_CONCURRENCY`.
    ///
    /// Limits the number of pending tasks, and how far above the best tip a
    /// gossiped block's height can be.
    full_verify_concurrency_limit: usize,

    // Services
    //
    /// A service that forwards requests to connected peers, and returns their
    /// responses.
    network: ZN,

    /// A service that verifies downloaded blocks.
    verifier: ZV,

    /// A service that manages cached blockchain state.
    state: ZS,

    /// Allows efficient access to the best tip of the blockchain.
    latest_chain_tip: zs::LatestChainTip,

    // Internal downloads state
    //
    /// A list of pending block download and verify tasks.
    ///
    /// Each task resolves to the verified block hash, or to an error paired with
    /// the block hash and the advertising peer's address (if the failure is
    /// attributed to that peer).
    #[pin]
    pending: FuturesUnordered<
        JoinHandle<Result<block::Hash, (BoxError, block::Hash, Option<PeerSocketAddr>)>>,
    >,

    /// A list of channels that can be used to cancel pending block download and
    /// verify tasks.
    ///
    /// Keyed by block hash. An entry is added when a task is queued, and removed
    /// when the stream yields that task's result. Dropping a sender (including by
    /// dropping this map) also cancels its task, if it has not finished yet.
    cancel_handles: HashMap<block::Hash, oneshot::Sender<()>>,
}
118
119impl<ZN, ZV, ZS> Stream for Downloads<ZN, ZV, ZS>
120where
121    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
122    ZN::Future: Send,
123    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
124        + Send
125        + Clone
126        + 'static,
127    ZV::Future: Send,
128    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
129    ZS::Future: Send,
130{
131    type Item = Result<block::Hash, (BoxError, Option<PeerSocketAddr>)>;
132
133    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
134        let this = self.project();
135        // CORRECTNESS
136        //
137        // The current task must be scheduled for wakeup every time we return
138        // `Poll::Pending`.
139        //
140        // If no download and verify tasks have exited since the last poll, this
141        // task is scheduled for wakeup when the next task becomes ready.
142        //
143        // TODO: this would be cleaner with poll_map (#2693)
144        if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
145            match join_result.expect("block download and verify tasks must not panic") {
146                Ok(hash) => {
147                    this.cancel_handles.remove(&hash);
148                    Poll::Ready(Some(Ok(hash)))
149                }
150                Err((e, hash, advertiser_addr)) => {
151                    this.cancel_handles.remove(&hash);
152                    Poll::Ready(Some(Err((e, advertiser_addr))))
153                }
154            }
155        } else {
156            Poll::Ready(None)
157        }
158    }
159
160    fn size_hint(&self) -> (usize, Option<usize>) {
161        self.pending.size_hint()
162    }
163}
164
165impl<ZN, ZV, ZS> Downloads<ZN, ZV, ZS>
166where
167    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
168    ZN::Future: Send,
169    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
170        + Send
171        + Clone
172        + 'static,
173    ZV::Future: Send,
174    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
175    ZS::Future: Send,
176{
177    /// Initialize a new download stream with the provided `network`, `verifier`, and `state` services.
178    /// The `latest_chain_tip` must be linked to the provided `state` service.
179    ///
180    /// The [`Downloads`] stream is agnostic to the network policy, so retry and
181    /// timeout limits should be applied to the `network` service passed into
182    /// this constructor.
183    pub fn new(
184        full_verify_concurrency_limit: usize,
185        network: ZN,
186        verifier: ZV,
187        state: ZS,
188        latest_chain_tip: zs::LatestChainTip,
189    ) -> Self {
190        // The syncer already warns about the minimum.
191        let full_verify_concurrency_limit =
192            full_verify_concurrency_limit.clamp(MIN_CONCURRENCY_LIMIT, MAX_INBOUND_CONCURRENCY);
193
194        Self {
195            full_verify_concurrency_limit,
196            network,
197            verifier,
198            state,
199            latest_chain_tip,
200            pending: FuturesUnordered::new(),
201            cancel_handles: HashMap::new(),
202        }
203    }
204
205    /// Queue a block for download and verification.
206    ///
207    /// Returns the action taken in response to the queue request.
208    #[instrument(skip(self, hash), fields(hash = %hash))]
209    pub fn download_and_verify(&mut self, hash: block::Hash) -> DownloadAction {
210        if self.cancel_handles.contains_key(&hash) {
211            debug!(
212                ?hash,
213                queue_len = self.pending.len(),
214                concurrency_limit = self.full_verify_concurrency_limit,
215                "block hash already queued for inbound download: ignored block",
216            );
217
218            metrics::gauge!("gossip.queued.block.count").set(self.pending.len() as f64);
219            metrics::counter!("gossip.already.queued.dropped.block.hash.count").increment(1);
220
221            return DownloadAction::AlreadyQueued;
222        }
223
224        if self.pending.len() >= self.full_verify_concurrency_limit {
225            debug!(
226                ?hash,
227                queue_len = self.pending.len(),
228                concurrency_limit = self.full_verify_concurrency_limit,
229                "too many blocks queued for inbound download: ignored block",
230            );
231
232            metrics::gauge!("gossip.queued.block.count").set(self.pending.len() as f64);
233            metrics::counter!("gossip.full.queue.dropped.block.hash.count").increment(1);
234
235            return DownloadAction::FullQueue;
236        }
237
238        // This oneshot is used to signal cancellation to the download task.
239        let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>();
240
241        let state = self.state.clone();
242        let network = self.network.clone();
243        let verifier = self.verifier.clone();
244        let latest_chain_tip = self.latest_chain_tip.clone();
245        let full_verify_concurrency_limit = self.full_verify_concurrency_limit;
246
247        let fut = async move {
248            // Check if the block is already in the state.
249            match state.oneshot(zs::Request::KnownBlock(hash)).await {
250                Ok(zs::Response::KnownBlock(None)) => Ok(()),
251                Ok(zs::Response::KnownBlock(Some(_))) => Err("already present".into()),
252                Ok(_) => unreachable!("wrong response"),
253                Err(e) => Err(e),
254            }
255            .map_err(|e| (e, None))?;
256
257            let (block, advertiser_addr) = if let zn::Response::Blocks(blocks) = network
258                .oneshot(zn::Request::BlocksByHash(std::iter::once(hash).collect()))
259                .await
260                .map_err(|e| (e, None))?
261            {
262                assert_eq!(
263                    blocks.len(),
264                    1,
265                    "wrong number of blocks in response to a single hash",
266                );
267
268                blocks
269                    .first()
270                    .expect("just checked length")
271                    .available()
272                    .expect(
273                        "unexpected missing block status: single block failures should be errors",
274                    )
275            } else {
276                unreachable!("wrong response to block request");
277            };
278            metrics::counter!("gossip.downloaded.block.count").increment(1);
279
280            // # Security & Performance
281            //
282            // Reject blocks that are too far ahead of our tip,
283            // and blocks that are behind the finalized tip.
284            //
285            // Avoids denial of service attacks. Also reduces wasted work on high blocks
286            // that will timeout before being verified, and low blocks that can never be finalized.
287            let tip_height = latest_chain_tip.best_tip_height();
288
289            let max_lookahead_height = if let Some(tip_height) = tip_height {
290                let lookahead = HeightDiff::try_from(full_verify_concurrency_limit)
291                    .expect("fits in HeightDiff");
292                (tip_height + lookahead).expect("tip is much lower than Height::MAX")
293            } else {
294                let genesis_lookahead =
295                    u32::try_from(full_verify_concurrency_limit - 1).expect("fits in u32");
296                block::Height(genesis_lookahead)
297            };
298
299            // Get the finalized tip height, assuming we're using the non-finalized state.
300            //
301            // It doesn't matter if we're a few blocks off here, because blocks this low
302            // are part of a fork with much less work. So they would be rejected anyway.
303            //
304            // And if we're still checkpointing, the checkpointer will reject blocks behind
305            // the finalized tip anyway.
306            //
307            // TODO: get the actual finalized tip height
308            let min_accepted_height = tip_height
309                .map(|tip_height| {
310                    block::Height(tip_height.0.saturating_sub(zs::MAX_BLOCK_REORG_HEIGHT))
311                })
312                .unwrap_or(block::Height(0));
313
314            let block_height = block
315                .coinbase_height()
316                .ok_or_else(|| {
317                    debug!(
318                        ?hash,
319                        "gossiped block with no height: dropped downloaded block"
320                    );
321                    metrics::counter!("gossip.no.height.dropped.block.count").increment(1);
322
323                    BoxError::from("gossiped block with no height")
324                })
325                .map_err(|e| (e, None))?;
326
327            if block_height > max_lookahead_height {
328                debug!(
329                    ?hash,
330                    ?block_height,
331                    ?tip_height,
332                    ?max_lookahead_height,
333                    lookahead_limit = full_verify_concurrency_limit,
334                    "gossiped block height too far ahead of the tip: dropped downloaded block",
335                );
336                metrics::counter!("gossip.max.height.limit.dropped.block.count").increment(1);
337
338                Err("gossiped block height too far ahead").map_err(|e| (e.into(), None))?;
339            } else if block_height < min_accepted_height {
340                debug!(
341                    ?hash,
342                    ?block_height,
343                    ?tip_height,
344                    ?min_accepted_height,
345                    behind_tip_limit = ?zs::MAX_BLOCK_REORG_HEIGHT,
346                    "gossiped block height behind the finalized tip: dropped downloaded block",
347                );
348                metrics::counter!("gossip.min.height.limit.dropped.block.count").increment(1);
349
350                Err("gossiped block height behind the finalized tip")
351                    .map_err(|e| (e.into(), None))?;
352            }
353
354            verifier
355                .oneshot(zebra_consensus::Request::Commit(block))
356                .await
357                .map(|hash| (hash, block_height))
358                .map_err(|e| (e, advertiser_addr))
359        }
360        .map_ok(|(hash, height)| {
361            info!(?height, "downloaded and verified gossiped block");
362            metrics::counter!("gossip.verified.block.count").increment(1);
363            hash
364        })
365        // Tack the hash onto the error so we can remove the cancel handle
366        // on failure as well as on success.
367        .map_err(move |(e, advertiser_addr)| (e, hash, advertiser_addr))
368        .in_current_span();
369
370        let task = tokio::spawn(async move {
371            // Prefer the cancel handle if both are ready.
372            tokio::select! {
373                biased;
374                _ = &mut cancel_rx => {
375                    trace!("task cancelled prior to completion");
376                    metrics::counter!("gossip.cancelled.count").increment(1);
377                    Err(("canceled".into(), hash, None))
378                }
379                verification = fut => verification,
380            }
381        });
382
383        self.pending.push(task);
384        assert!(
385            self.cancel_handles.insert(hash, cancel_tx).is_none(),
386            "blocks are only queued once"
387        );
388
389        debug!(
390            ?hash,
391            queue_len = self.pending.len(),
392            concurrency_limit = self.full_verify_concurrency_limit,
393            "queued hash for download",
394        );
395        metrics::gauge!("gossip.queued.block.count").set(self.pending.len() as f64);
396
397        DownloadAction::AddedToQueue
398    }
399}