zebrad/components/sync/downloads.rs

//! A download stream for Zebra's block syncer.

use std::{
    collections::HashMap,
    convert,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use futures::{
    future::{FutureExt, TryFutureExt},
    ready,
    stream::{FuturesUnordered, Stream},
};
use pin_project::pin_project;
use thiserror::Error;
use tokio::{
    sync::{oneshot, watch},
    task::JoinHandle,
    time::timeout,
};
use tower::{hedge, Service, ServiceExt};
use tracing_futures::Instrument;

use zebra_chain::{
    block::{self, Height, HeightDiff},
    chain_tip::ChainTip,
};
use zebra_network::{self as zn, PeerSocketAddr};
use zebra_state as zs;

use crate::components::sync::{
    FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT, FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT_LIMIT,
};

type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// A multiplier used to calculate the extra number of blocks we allow in the
/// verifier, state, and block commit pipelines, on top of the lookahead limit.
///
/// The extra number of blocks is calculated using
/// `lookahead_limit * VERIFICATION_PIPELINE_SCALING_MULTIPLIER`.
///
/// This allows the verifier and state queues, and the block commit channel,
/// to hold a few extra tip responses' worth of blocks,
/// even if the syncer queue is full. Any unused capacity is shared between both queues.
///
/// If this capacity is exceeded, the downloader will tell the syncer to pause new downloads.
///
/// Since the syncer queue is limited to the `lookahead_limit`,
/// the rest of the capacity is reserved for the other queues.
/// There is no reserved capacity for the syncer queue:
/// if the other queues stay full, the syncer will eventually time out and reset.
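///
/// A minimal sketch of the capacity calculation, assuming a hypothetical
/// `lookahead_limit` of 400 (the actual value comes from the sync config):
///
/// ```ignore
/// let lookahead_limit: usize = 400;
/// // Extra blocks allowed in the verifier, state, and commit pipelines:
/// let extra = lookahead_limit * VERIFICATION_PIPELINE_SCALING_MULTIPLIER;
/// assert_eq!(extra, 800);
/// // Total pipeline capacity before the downloader pauses new downloads:
/// assert_eq!(lookahead_limit + extra, 1200);
/// ```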
pub const VERIFICATION_PIPELINE_SCALING_MULTIPLIER: usize = 2;

/// The maximum height difference between Zebra's state tip and a downloaded block.
/// Blocks higher than this will get dropped and return an error.
pub const VERIFICATION_PIPELINE_DROP_LIMIT: HeightDiff = 50_000;

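/// A [`hedge::Policy`] that hedges every request.
///
/// Hedging races a second copy of a slow request against the original, and
/// uses whichever response arrives first.
///
/// A minimal wiring sketch, assuming a hypothetical `block_network` service
/// and illustrative tuning values (the real construction lives in the syncer
/// setup code):
///
/// ```ignore
/// use std::time::Duration;
/// use tower::hedge::Hedge;
///
/// let hedged_network = Hedge::new(
///     block_network,
///     AlwaysHedge,
///     20,                      // latency samples needed before hedging starts
///     0.95,                    // percentile above which a request is hedged
///     Duration::from_secs(30), // rolling latency measurement window
/// );
/// ```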
#[derive(Copy, Clone, Debug)]
pub(super) struct AlwaysHedge;

impl<Request: Clone> hedge::Policy<Request> for AlwaysHedge {
    fn can_retry(&self, _req: &Request) -> bool {
        true
    }
    fn clone_request(&self, req: &Request) -> Option<Request> {
        Some(req.clone())
    }
}

/// Errors that can occur while downloading and verifying a block.
#[derive(Error, Debug)]
#[allow(dead_code)]
pub enum BlockDownloadVerifyError {
    #[error("permanent readiness error from the network service: {error:?}")]
    NetworkServiceError {
        #[source]
        error: BoxError,
    },

    #[error("permanent readiness error from the verifier service: {error:?}")]
    VerifierServiceError {
        #[source]
        error: BoxError,
    },

    #[error("duplicate block hash queued for download: {hash:?}")]
    DuplicateBlockQueuedForDownload { hash: block::Hash },

    #[error("error downloading block: {error:?} {hash:?}")]
    DownloadFailed {
        #[source]
        error: BoxError,
        hash: block::Hash,
    },
    /// A downloaded block was a long way ahead of the state chain tip.
    /// This error should be very rare during normal operation.
    ///
    /// We need to reset the syncer on this error, to allow the verifier and state to catch up,
    /// or to prevent it from following a bad chain.
    ///
    /// If we don't reset the syncer on this error, it will continue downloading blocks from a bad
    /// chain, or blocks far ahead of the current state tip.
    #[error("downloaded block was too far ahead of the chain tip: {height:?} {hash:?}")]
    AboveLookaheadHeightLimit {
        height: block::Height,
        hash: block::Hash,
    },

    #[error("downloaded block was too far behind the chain tip: {height:?} {hash:?}")]
    BehindTipHeightLimit {
        height: block::Height,
        hash: block::Hash,
    },

    #[error("downloaded block had an invalid height: {hash:?}")]
    InvalidHeight { hash: block::Hash },

    #[error("block failed consensus validation: {error:?} {height:?} {hash:?}")]
    Invalid {
        #[source]
        error: zebra_consensus::router::RouterError,
        height: block::Height,
        hash: block::Hash,
        advertiser_addr: Option<PeerSocketAddr>,
    },

    #[error("block validation request failed: {error:?} {height:?} {hash:?}")]
    ValidationRequestError {
        #[source]
        error: BoxError,
        height: block::Height,
        hash: block::Hash,
    },

    #[error("block download & verification was cancelled during download: {hash:?}")]
    CancelledDuringDownload { hash: block::Hash },

    #[error(
        "block download & verification was cancelled while waiting for the verifier service \
         to become ready: {height:?} {hash:?}"
    )]
    CancelledAwaitingVerifierReadiness {
        height: block::Height,
        hash: block::Hash,
    },

    #[error(
        "block download & verification was cancelled during verification: {height:?} {hash:?}"
    )]
    CancelledDuringVerification {
        height: block::Height,
        hash: block::Hash,
    },

    #[error(
        "timeout during service readiness, download, verification, or internal downloader operation"
    )]
    Timeout,
}

impl From<tokio::time::error::Elapsed> for BlockDownloadVerifyError {
    fn from(_value: tokio::time::error::Elapsed) -> Self {
        BlockDownloadVerifyError::Timeout
    }
}

/// Represents a [`Stream`] of download and verification tasks during chain sync.
#[pin_project]
#[derive(Debug)]
pub struct Downloads<ZN, ZV, ZSTip>
where
    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Sync + 'static,
    ZN::Future: Send,
    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
        + Send
        + Sync
        + Clone
        + 'static,
    ZV::Future: Send,
    ZSTip: ChainTip + Clone + Send + 'static,
{
    // Services
    //
    /// A service that forwards requests to connected peers, and returns their
    /// responses.
    network: ZN,

    /// A service that verifies downloaded blocks.
    verifier: ZV,

    /// Allows efficient access to the best tip of the blockchain.
    latest_chain_tip: ZSTip,

    // Configuration
    //
    /// The configured lookahead limit, after applying the minimum limit.
    lookahead_limit: usize,

    /// The largest block height for the checkpoint verifier, based on the current config.
    max_checkpoint_height: Height,

    // Shared syncer state
    //
    /// Sender that is set to `true` when the downloader is past the lookahead limit.
    /// This is based on the downloaded block height and the state tip height.
    past_lookahead_limit_sender: Arc<std::sync::Mutex<watch::Sender<bool>>>,

    /// Receiver for `past_lookahead_limit_sender`, which is used to avoid accessing the mutex.
    past_lookahead_limit_receiver: zs::WatchReceiver<bool>,

    // Internal downloads state
    //
    /// A list of pending block download and verify tasks.
    #[pin]
    pending: FuturesUnordered<
        JoinHandle<Result<(Height, block::Hash), (BlockDownloadVerifyError, block::Hash)>>,
    >,

    /// A list of channels that can be used to cancel pending block download and
    /// verify tasks.
    cancel_handles: HashMap<block::Hash, oneshot::Sender<()>>,
}

impl<ZN, ZV, ZSTip> Stream for Downloads<ZN, ZV, ZSTip>
where
    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Sync + 'static,
    ZN::Future: Send,
    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
        + Send
        + Sync
        + Clone
        + 'static,
    ZV::Future: Send,
    ZSTip: ChainTip + Clone + Send + 'static,
{
    type Item = Result<(Height, block::Hash), BlockDownloadVerifyError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let this = self.project();
        // CORRECTNESS
        //
        // The current task must be scheduled for wakeup every time we return
        // `Poll::Pending`.
        //
        // If no download and verify tasks have exited since the last poll, this
        // task is scheduled for wakeup when the next task becomes ready.
        //
        // TODO: this would be cleaner with poll_map (#2693)
        if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
            match join_result.expect("block download and verify tasks must not panic") {
                Ok((height, hash)) => {
                    this.cancel_handles.remove(&hash);

                    Poll::Ready(Some(Ok((height, hash))))
                }
                Err((e, hash)) => {
                    this.cancel_handles.remove(&hash);
                    Poll::Ready(Some(Err(e)))
                }
            }
        } else {
            Poll::Ready(None)
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.pending.size_hint()
    }
}

impl<ZN, ZV, ZSTip> Downloads<ZN, ZV, ZSTip>
where
    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Sync + 'static,
    ZN::Future: Send,
    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
        + Send
        + Sync
        + Clone
        + 'static,
    ZV::Future: Send,
    ZSTip: ChainTip + Clone + Send + 'static,
{
    /// Initialize a new download stream with the provided `network` and
    /// `verifier` services.
    ///
    /// Uses the `latest_chain_tip` and `lookahead_limit` to drop blocks
    /// that are too far ahead of the current state tip.
    /// Uses `max_checkpoint_height` to work around a known block timeout (#5125).
    ///
    /// The [`Downloads`] stream is agnostic to the network policy, so retry and
    /// timeout limits should be applied to the `network` service passed into
    /// this constructor.
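    ///
    /// A minimal construction sketch, assuming hypothetical `network`,
    /// `verifier`, and `latest_chain_tip` services and config values:
    ///
    /// ```ignore
    /// // The sender half is shared with the syncer, which pauses new
    /// // downloads while the watched value is `true`.
    /// let (past_lookahead_limit_sender, _receiver) = tokio::sync::watch::channel(false);
    ///
    /// let downloads = Downloads::new(
    ///     network,
    ///     verifier,
    ///     latest_chain_tip,
    ///     past_lookahead_limit_sender,
    ///     lookahead_limit,
    ///     max_checkpoint_height,
    /// );
    /// ```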
    pub fn new(
        network: ZN,
        verifier: ZV,
        latest_chain_tip: ZSTip,
        past_lookahead_limit_sender: watch::Sender<bool>,
        lookahead_limit: usize,
        max_checkpoint_height: Height,
    ) -> Self {
        let past_lookahead_limit_receiver =
            zs::WatchReceiver::new(past_lookahead_limit_sender.subscribe());

        Self {
            network,
            verifier,
            latest_chain_tip,
            lookahead_limit,
            max_checkpoint_height,
            past_lookahead_limit_sender: Arc::new(std::sync::Mutex::new(
                past_lookahead_limit_sender,
            )),
            past_lookahead_limit_receiver,
            pending: FuturesUnordered::new(),
            cancel_handles: HashMap::new(),
        }
    }

    /// Queue a block for download and verification.
    ///
    /// This method waits for the network service to become ready, queues the
    /// download request, and then returns immediately, without waiting for the
    /// download to complete. It returns an error only if the network service fails.
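    ///
    /// A usage sketch, assuming a `downloads` stream and a block `hash` to
    /// fetch (error handling elided):
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// // Queue the download; the work runs in a spawned task.
    /// downloads.download_and_verify(hash).await?;
    ///
    /// // Poll the stream to observe the download and verify result.
    /// if let Some(result) = downloads.next().await {
    ///     let (height, hash) = result?;
    /// }
    /// ```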
    #[instrument(level = "debug", skip(self), fields(%hash))]
    pub async fn download_and_verify(
        &mut self,
        hash: block::Hash,
    ) -> Result<(), BlockDownloadVerifyError> {
        if self.cancel_handles.contains_key(&hash) {
            metrics::counter!("sync.already.queued.dropped.block.hash.count").increment(1);
            return Err(BlockDownloadVerifyError::DuplicateBlockQueuedForDownload { hash });
        }

        // We construct the block requests sequentially, waiting for the peer
        // set to be ready to process each request. This ensures that we start
        // block downloads in the order we want them (though they may resolve
        // out of order), and it means that we respect backpressure. Otherwise,
        // if we waited for readiness and did the service call in the spawned
        // tasks, all of the spawned tasks would race each other waiting for the
        // network to become ready.
        let block_req = self
            .network
            .ready()
            .await
            .map_err(|error| BlockDownloadVerifyError::NetworkServiceError { error })?
            .call(zn::Request::BlocksByHash(std::iter::once(hash).collect()));

        // This oneshot is used to signal cancellation to the download task.
        let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>();

        let mut verifier = self.verifier.clone();
        let latest_chain_tip = self.latest_chain_tip.clone();

        let lookahead_limit = self.lookahead_limit;
        let max_checkpoint_height = self.max_checkpoint_height;

        let past_lookahead_limit_sender = self.past_lookahead_limit_sender.clone();
        let past_lookahead_limit_receiver = self.past_lookahead_limit_receiver.clone();

        let task = tokio::spawn(
            async move {
                // Download the block.
                // Prefer the cancel handle if both are ready.
                let rsp = tokio::select! {
                    biased;
                    _ = &mut cancel_rx => {
                        trace!("task cancelled prior to download completion");
                        metrics::counter!("sync.cancelled.download.count").increment(1);
                        return Err(BlockDownloadVerifyError::CancelledDuringDownload { hash })
                    }
                    rsp = block_req => rsp.map_err(|error| BlockDownloadVerifyError::DownloadFailed { error, hash })?,
                };

                let (block, advertiser_addr) = if let zn::Response::Blocks(blocks) = rsp {
                    assert_eq!(
                        blocks.len(),
                        1,
                        "wrong number of blocks in response to a single hash"
                    );

                    blocks
                        .first()
                        .expect("just checked length")
                        .available()
                        .expect("unexpected missing block status: single block failures should be errors")
                } else {
                    unreachable!("wrong response to block request");
                };
                metrics::counter!("sync.downloaded.block.count").increment(1);

                // Security & Performance: reject blocks that are too far ahead of our tip.
                // Avoids denial of service attacks, and reduces wasted work on high blocks
                // that will time out before they are verified.
                let tip_height = latest_chain_tip.best_tip_height();

                let (lookahead_drop_height, lookahead_pause_height, lookahead_reset_height) =
                    if let Some(tip_height) = tip_height {
                        // Scale the height limit with the lookahead limit,
                        // so users with low capacity or under DoS can reduce them both.
                        let lookahead_pause = HeightDiff::try_from(
                            lookahead_limit
                                + lookahead_limit * VERIFICATION_PIPELINE_SCALING_MULTIPLIER,
                        )
                        .expect("fits in HeightDiff");

                        (
                            (tip_height + VERIFICATION_PIPELINE_DROP_LIMIT)
                                .expect("tip is much lower than Height::MAX"),
                            (tip_height + lookahead_pause)
                                .expect("tip is much lower than Height::MAX"),
                            (tip_height + lookahead_pause / 2)
                                .expect("tip is much lower than Height::MAX"),
                        )
                    } else {
                        let genesis_drop = VERIFICATION_PIPELINE_DROP_LIMIT
                            .try_into()
                            .expect("fits in u32");
                        let genesis_lookahead =
                            u32::try_from(lookahead_limit - 1).expect("fits in u32");

                        (
                            block::Height(genesis_drop),
                            block::Height(genesis_lookahead),
                            block::Height(genesis_lookahead / 2),
                        )
                    };
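
                // A worked example of the heights above, assuming a hypothetical
                // tip at height 1_000_000 and a lookahead_limit of 400:
                //   lookahead_pause        = 400 + 400 * 2      = 1_200
                //   lookahead_drop_height  = 1_000_000 + 50_000 = 1_050_000
                //   lookahead_pause_height = 1_000_000 + 1_200  = 1_001_200
                //   lookahead_reset_height = 1_000_000 + 600    = 1_000_600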

                // Get the finalized tip height, assuming we're using the non-finalized state.
                //
                // It doesn't matter if we're a few blocks off here, because blocks this low
                // are part of a fork with much less work. So they would be rejected anyway.
                //
                // And if we're still checkpointing, the checkpointer will reject blocks behind
                // the finalized tip anyway.
                //
                // TODO: get the actual finalized tip height
                let min_accepted_height = tip_height
                    .map(|tip_height| {
                        block::Height(tip_height.0.saturating_sub(zs::MAX_BLOCK_REORG_HEIGHT))
                    })
                    .unwrap_or(block::Height(0));
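
                // For example, assuming a hypothetical tip at height 1_000_000 and a
                // reorg limit of 99 blocks, min_accepted_height would be 999_901.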

                let block_height = if let Some(block_height) = block.coinbase_height() {
                    block_height
                } else {
                    debug!(
                        ?hash,
                        "synced block with no height: dropped downloaded block"
                    );
                    metrics::counter!("sync.no.height.dropped.block.count").increment(1);

                    return Err(BlockDownloadVerifyError::InvalidHeight { hash });
                };

                if block_height > lookahead_drop_height {
                    Err(BlockDownloadVerifyError::AboveLookaheadHeightLimit { height: block_height, hash })?;
                } else if block_height > lookahead_pause_height {
                    // This log can be very verbose, usually hundreds of blocks are dropped.
                    // So we only log at info level for the first above-height block.
                    if !past_lookahead_limit_receiver.cloned_watch_data() {
                        info!(
                            ?hash,
                            ?block_height,
                            ?tip_height,
                            ?lookahead_pause_height,
                            ?lookahead_reset_height,
                            lookahead_limit = ?lookahead_limit,
                            "synced block height too far ahead of the tip: \
                             waiting for downloaded blocks to commit to the state",
                        );

                        // Set the watched value to true, since we're over the limit.
                        //
                        // It is ok to block here, because we're going to pause new downloads anyway.
                        // But if Zebra is shutting down, ignore the send error.
                        let _ = past_lookahead_limit_sender
                            .lock()
                            .expect("thread panicked while holding the past_lookahead_limit_sender mutex guard")
                            .send(true);
                    } else {
                        debug!(
                            ?hash,
                            ?block_height,
                            ?tip_height,
                            ?lookahead_pause_height,
                            ?lookahead_reset_height,
                            lookahead_limit = ?lookahead_limit,
                            "synced block height too far ahead of the tip: \
                             waiting for downloaded blocks to commit to the state",
                        );
                    }

                    metrics::counter!("sync.max.height.limit.paused.count").increment(1);
                } else if block_height <= lookahead_reset_height
                    && past_lookahead_limit_receiver.cloned_watch_data()
                {
                    // Reset the watched value to false, since we're well under the limit.
                    // We need to block here, because if we don't, the syncer can hang.
                    //
                    // But if Zebra is shutting down, ignore the send error.
                    let _ = past_lookahead_limit_sender
                        .lock()
                        .expect("thread panicked while holding the past_lookahead_limit_sender mutex guard")
                        .send(false);

                    metrics::counter!("sync.max.height.limit.reset.count").increment(1);
                    metrics::counter!("sync.max.height.limit.reset.attempt.count").increment(1);
                }

                if block_height < min_accepted_height {
                    debug!(
                        ?hash,
                        ?block_height,
                        ?tip_height,
                        ?min_accepted_height,
                        behind_tip_limit = ?zs::MAX_BLOCK_REORG_HEIGHT,
                        "synced block height behind the finalized tip: dropped downloaded block"
                    );
                    metrics::counter!("sync.min.height.limit.dropped.block.count").increment(1);

                    Err(BlockDownloadVerifyError::BehindTipHeightLimit { height: block_height, hash })?;
                }

                // Wait for the verifier service to be ready.
                let readiness = verifier.ready();
                // Prefer the cancel handle if both are ready.
                let verifier = tokio::select! {
                    biased;
                    _ = &mut cancel_rx => {
                        trace!("task cancelled waiting for verifier service readiness");
                        metrics::counter!("sync.cancelled.verify.ready.count").increment(1);
                        return Err(BlockDownloadVerifyError::CancelledAwaitingVerifierReadiness { height: block_height, hash })
                    }
                    verifier = readiness => verifier,
                };

                // Verify the block.
                let mut rsp = verifier
                    .map_err(|error| BlockDownloadVerifyError::VerifierServiceError { error })?
                    .call(zebra_consensus::Request::Commit(block))
                    .boxed();

                // Add a shorter timeout to work around a known bug (#5125).
                let short_timeout_max = (max_checkpoint_height
                    + FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT_LIMIT)
                    .expect("checkpoint block height is in valid range");
                if block_height >= max_checkpoint_height && block_height <= short_timeout_max {
                    rsp = timeout(FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT, rsp)
                        .map_err(|timeout| {
                            format!("initial fully verified block timed out: retrying: {timeout:?}")
                                .into()
                        })
                        .map(|nested_result| nested_result.and_then(convert::identity))
                        .boxed();
                }

                let verification = tokio::select! {
                    biased;
                    _ = &mut cancel_rx => {
                        trace!("task cancelled prior to verification");
                        metrics::counter!("sync.cancelled.verify.count").increment(1);
                        return Err(BlockDownloadVerifyError::CancelledDuringVerification { height: block_height, hash })
                    }
                    verification = rsp => verification,
                };

                if verification.is_ok() {
                    metrics::counter!("sync.verified.block.count").increment(1);
                }

                verification
                    .map(|hash| (block_height, hash))
                    .map_err(|err| match err.downcast::<zebra_consensus::router::RouterError>() {
                        Ok(error) => BlockDownloadVerifyError::Invalid {
                            error: *error,
                            height: block_height,
                            hash,
                            advertiser_addr,
                        },
                        Err(error) => BlockDownloadVerifyError::ValidationRequestError {
                            error,
                            height: block_height,
                            hash,
                        },
                    })
            }
            .in_current_span()
            // Tack the hash onto the error so we can remove the cancel handle
            // on failure as well as on success.
            .map_err(move |e| (e, hash)),
        );

        // Try to start the spawned task before queueing the next block request.
        tokio::task::yield_now().await;

        self.pending.push(task);
        assert!(
            self.cancel_handles.insert(hash, cancel_tx).is_none(),
            "blocks are only queued once"
        );

        Ok(())
    }

    /// Cancel all running tasks and reset the downloader state.
    pub fn cancel_all(&mut self) {
        // Replace the pending task list with an empty one and drop it.
        let _ = std::mem::take(&mut self.pending);

        // Signal cancellation to all running tasks.
        // Since we already dropped the JoinHandles above, they should
        // fail silently.
        for (_hash, cancel) in self.cancel_handles.drain() {
            let _ = cancel.send(());
        }

        assert!(self.pending.is_empty());
        assert!(self.cancel_handles.is_empty());

        // Set the past lookahead limit flag to false, since the queue is empty
        // (so we're under the limit).
        //
        // It is ok to block here, because we're doing a reset and sleep anyway.
        // But if Zebra is shutting down, ignore the send error.
        let _ = self
            .past_lookahead_limit_sender
            .lock()
            .expect("thread panicked while holding the past_lookahead_limit_sender mutex guard")
            .send(false);
    }

    /// Get the number of currently in-flight download and verify tasks.
    pub fn in_flight(&mut self) -> usize {
        self.pending.len()
    }

    /// Returns true if there are no in-flight download and verify tasks.
    #[allow(dead_code)]
    pub fn is_empty(&mut self) -> bool {
        self.pending.is_empty()
    }
}