zebrad/components/sync.rs

//! The syncer downloads and verifies large numbers of blocks from peers to Zebra.
//!
//! It is used when Zebra is a long way behind the current chain tip.

use std::{cmp::max, collections::HashSet, convert, pin::Pin, task::Poll, time::Duration};

use color_eyre::eyre::{eyre, Report};
use futures::stream::{FuturesUnordered, StreamExt};
use indexmap::IndexSet;
use serde::{Deserialize, Serialize};
use tokio::{
    sync::{mpsc, watch},
    task::JoinError,
    time::{sleep, timeout},
};
use tower::{
    builder::ServiceBuilder, hedge::Hedge, limit::ConcurrencyLimit, retry::Retry, timeout::Timeout,
    Service, ServiceExt,
};

use zebra_chain::{
    block::{self, Height, HeightDiff},
    chain_tip::ChainTip,
};
use zebra_network::{self as zn, PeerSocketAddr};
use zebra_state as zs;

use crate::{
    components::sync::downloads::BlockDownloadVerifyError, config::ZebradConfig, BoxError,
};

mod downloads;
pub mod end_of_support;
mod gossip;
mod progress;
mod recent_sync_lengths;
mod status;

#[cfg(test)]
mod tests;

use downloads::{AlwaysHedge, Downloads};

pub use downloads::VERIFICATION_PIPELINE_SCALING_MULTIPLIER;
pub use gossip::{gossip_best_tip_block_hashes, BlockGossipError};
pub use progress::show_block_chain_progress;
pub use recent_sync_lengths::RecentSyncLengths;
pub use status::SyncStatus;

/// Controls the number of peers used for each ObtainTips and ExtendTips request.
const FANOUT: usize = 3;

/// Controls how many times we will retry each block download.
///
/// Limiting download retries is important because it defends against peers who
/// feed us bad hashes. But spurious failures of valid blocks cause the syncer to
/// restart from the previous checkpoint, potentially re-downloading blocks.
///
/// We also hedge requests, so we may retry up to twice this many times. Hedged
/// retries may be concurrent; inner retries are sequential.
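///
/// For example, with a retry limit of 3, a single block download may be
/// attempted up to 6 times in total across the hedged pair of requests.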
const BLOCK_DOWNLOAD_RETRY_LIMIT: usize = 3;

/// A lower bound on the user-specified checkpoint verification concurrency limit.
///
/// Set to the maximum checkpoint interval, so the pipeline holds around a checkpoint's
/// worth of blocks.
///
/// ## Security
///
/// If a malicious node is chosen for an ObtainTips or ExtendTips request, it can
/// provide up to 500 malicious block hashes. These block hashes will be
/// distributed across all available peers. Assuming there are around 50 connected
/// peers, the malicious node will receive approximately 10 of those block requests.
///
/// Malicious deserialized blocks can take up a large amount of RAM, see
/// [`super::inbound::downloads::MAX_INBOUND_CONCURRENCY`] and #1880 for details.
/// So we want to keep the lookahead limit reasonably small.
///
/// Once these malicious blocks start failing validation, the syncer will cancel all
/// the pending download and verify tasks, drop all the blocks, and start a new
/// ObtainTips with a new set of peers.
pub const MIN_CHECKPOINT_CONCURRENCY_LIMIT: usize = zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP;

/// The default for the user-specified lookahead limit.
///
/// See [`MIN_CHECKPOINT_CONCURRENCY_LIMIT`] for details.
pub const DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT: usize = MAX_TIPS_RESPONSE_HASH_COUNT * 2;

/// A lower bound on the user-specified concurrency limit.
///
/// If the concurrency limit is 0, Zebra can't download or verify any blocks.
pub const MIN_CONCURRENCY_LIMIT: usize = 1;

/// The expected maximum number of hashes in an ObtainTips or ExtendTips response.
///
/// This is used to allow block heights that are slightly beyond the lookahead limit,
/// but still limit the number of blocks in the pipeline between the downloader and
/// the state.
///
/// See [`MIN_CHECKPOINT_CONCURRENCY_LIMIT`] for details.
pub const MAX_TIPS_RESPONSE_HASH_COUNT: usize = 500;

/// Controls how long we wait for a tips response to return.
///
/// ## Correctness
///
/// If this timeout is removed (or set too high), the syncer will sometimes hang.
///
/// If this timeout is set too low, the syncer will sometimes get stuck in a
/// failure loop.
pub const TIPS_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6);

/// Controls how long we wait between gossiping successive blocks or transactions.
///
/// ## Correctness
///
/// If this timeout is set too high, blocks and transactions won't propagate through
/// the network efficiently.
///
/// If this timeout is set too low, the peer set and remote peers can get overloaded.
pub const PEER_GOSSIP_DELAY: Duration = Duration::from_secs(7);

/// Controls how long we wait for a block download request to complete.
///
/// This timeout makes sure that the syncer doesn't hang when:
///   - the lookahead queue is full, and
///   - we are waiting for a request that is stuck.
///
/// See [`BLOCK_VERIFY_TIMEOUT`] for details.
///
/// ## Correctness
///
/// If this timeout is removed (or set too high), the syncer will sometimes hang.
///
/// If this timeout is set too low, the syncer will sometimes get stuck in a
/// failure loop.
///
/// We set the timeout so that it requires under 1 Mbps bandwidth for a full 2 MB block.
pub(super) const BLOCK_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(20);

/// Controls how long we wait for a block verify request to complete.
///
/// This timeout makes sure that the syncer doesn't hang when:
///  - the lookahead queue is full, and
///  - all pending verifications:
///    - are waiting on a missing download request,
///    - are waiting on a download or verify request that has failed, but we have
///      deliberately ignored the error,
///    - are for blocks a long way ahead of the current tip, or
///    - are for invalid blocks which will never verify, because they depend on
///      missing blocks or transactions.
///
/// These conditions can happen during normal operation - they are not bugs.
///
/// This timeout also mitigates or hides the following kinds of bugs:
///  - all pending verifications:
///    - are waiting on a download or verify request that has failed, but we have
///      accidentally dropped the error,
///    - are waiting on a download request that has hung inside Zebra,
///    - are on tokio threads that are waiting for blocked operations.
///
/// ## Correctness
///
/// If this timeout is removed (or set too high), the syncer will sometimes hang.
///
/// If this timeout is set too low, the syncer will sometimes get stuck in a
/// failure loop.
///
/// We've observed spurious 15 minute timeouts when a lot of blocks are being committed to
/// the state. But there are also some blocks that seem to hang entirely, and never return.
///
/// So we allow about half the spurious timeout, which might cause some re-downloads.
pub(super) const BLOCK_VERIFY_TIMEOUT: Duration = Duration::from_secs(8 * 60);

/// A shorter timeout used for the first few blocks after the final checkpoint.
///
/// This is a workaround for bug #5125, where the first fully validated blocks
/// after the final checkpoint fail with a timeout, due to a UTXO race condition.
const FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT: Duration = Duration::from_secs(2 * 60);

/// The number of blocks after the final checkpoint that get the shorter timeout.
///
/// We've only seen this error on the first few blocks after the final checkpoint.
const FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT_LIMIT: HeightDiff = 100;

/// Controls how long we wait to restart syncing after finishing a sync run.
///
/// This delay should be long enough to:
///   - allow zcashd peers to process pending requests. If the node only has a
///     few peers, we want to clear as much peer state as possible. In
///     particular, zcashd sends "next block range" hints, based on zcashd's
///     internal model of our sync progress. But we want to discard these hints,
///     so they don't get confused with ObtainTips and ExtendTips responses, and
///   - allow in-progress downloads to time out.
///
/// This delay is particularly important on instances with slow or unreliable
/// networks, and on testnet, which has a small number of slow peers.
///
/// Using a prime number makes sure that syncer fanouts don't synchronise with other crawls.
///
/// ## Correctness
///
/// If this delay is removed (or set too low), the syncer will
/// sometimes get stuck in a failure loop, due to leftover downloads from
/// previous sync runs.
const SYNC_RESTART_DELAY: Duration = Duration::from_secs(67);

/// Controls how long we wait to retry a failed attempt to download
/// and verify the genesis block.
///
/// This timeout gives the crawler time to find better peers.
///
/// ## Security
///
/// If this timeout is removed (or set too low), Zebra will immediately retry
/// to download and verify the genesis block from its peers. This can cause
/// a denial of service on those peers.
///
/// If this timeout is too short, old or buggy nodes will keep making useless
/// network requests. If there are a lot of them, it could overwhelm the network.
const GENESIS_TIMEOUT_RETRY: Duration = Duration::from_secs(10);

/// Sync configuration section.
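///
/// A sketch of how these fields might appear in the `[sync]` section of
/// `zebrad.toml` (the values shown are the defaults described below, not
/// tuning recommendations):
///
/// ```toml
/// [sync]
/// download_concurrency_limit = 50
/// checkpoint_verify_concurrency_limit = 1000
/// full_verify_concurrency_limit = 20
/// parallel_cpu_threads = 0
/// ```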
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(deny_unknown_fields, default)]
pub struct Config {
    /// The number of parallel block download requests.
    ///
    /// This is set to a low value by default, to avoid task and
    /// network contention. Increasing this value may improve
    /// performance on machines with a fast network connection.
    #[serde(alias = "max_concurrent_block_requests")]
    pub download_concurrency_limit: usize,

    /// The number of blocks submitted in parallel to the checkpoint verifier.
    ///
    /// Increasing this limit increases the buffer size, so it reduces
    /// the impact of an individual block request failing. However, it
    /// also increases memory and CPU usage if block validation stalls,
    /// or there are some large blocks in the pipeline.
    ///
    /// The block size limit is 2MB, so in theory, this could represent multiple
    /// gigabytes of data, if we downloaded arbitrary blocks. However,
    /// because we randomly load balance outbound requests, and separate
    /// block download from obtaining block hashes, an adversary would
    /// have to control a significant fraction of our peers to lead us
    /// astray.
    ///
    /// For reliable checkpoint syncing, Zebra enforces a
    /// [`MIN_CHECKPOINT_CONCURRENCY_LIMIT`].
    ///
    /// This is set to a high value by default, to avoid verification pipeline stalls.
    /// Decreasing this value reduces RAM usage.
    #[serde(alias = "lookahead_limit")]
    pub checkpoint_verify_concurrency_limit: usize,

    /// The number of blocks submitted in parallel to the full verifier.
    ///
    /// This is set to a low value by default, to avoid verification timeouts on large blocks.
    /// Increasing this value may improve performance on machines with many cores.
    pub full_verify_concurrency_limit: usize,

    /// The number of threads used to verify signatures, proofs, and other CPU-intensive code.
    ///
    /// If the number of threads is not configured or zero, Zebra uses the number of logical cores.
    /// If the number of logical cores can't be detected, Zebra uses one thread.
    /// For details, see [the `rayon` documentation](https://docs.rs/rayon/latest/rayon/struct.ThreadPoolBuilder.html#method.num_threads).
    pub parallel_cpu_threads: usize,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            // 2/3 of the default outbound peer limit.
            download_concurrency_limit: 50,

            // A few max-length checkpoints.
            checkpoint_verify_concurrency_limit: DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT,

            // This default is deliberately very low, so Zebra can verify a few large blocks in under 60 seconds,
            // even on machines with only a few cores.
            //
            // This lets users see the committed block height changing in every progress log,
            // and avoids hangs due to out-of-order verifications flooding the CPUs.
            //
            // TODO:
            // - limit full verification concurrency based on block transaction counts?
            // - move more disk work to blocking tokio threads,
            //   and CPU work to the rayon thread pool inside blocking tokio threads
            full_verify_concurrency_limit: 20,

            // Use one thread per CPU.
            //
            // If this causes tokio executor starvation, move CPU-intensive tasks to rayon threads,
            // or reserve a few cores for tokio threads, based on `num_cpus()`.
            parallel_cpu_threads: 0,
        }
    }
}

/// Helps work around defects in the Bitcoin protocol by checking whether
/// the returned hashes actually extend a chain tip.
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)]
struct CheckedTip {
    tip: block::Hash,
    expected_next: block::Hash,
}

/// Polls the network to determine whether further blocks are available and
/// downloads them.
///
/// This component is used for initial block sync, but the `Inbound` service is
/// responsible for participating in the gossip protocols used for block
/// diffusion.
pub struct ChainSync<ZN, ZS, ZV, ZSTip>
where
    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError>
        + Send
        + Sync
        + Clone
        + 'static,
    ZN::Future: Send,
    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError>
        + Send
        + Sync
        + Clone
        + 'static,
    ZS::Future: Send,
    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
        + Send
        + Sync
        + Clone
        + 'static,
    ZV::Future: Send,
    ZSTip: ChainTip + Clone + Send + 'static,
{
    // Configuration
    //
    /// The genesis hash for the configured network
    genesis_hash: block::Hash,

    /// The largest block height for the checkpoint verifier, based on the current config.
    max_checkpoint_height: Height,

    /// The configured checkpoint verification concurrency limit, after applying the minimum limit.
    checkpoint_verify_concurrency_limit: usize,

    /// The configured full verification concurrency limit, after applying the minimum limit.
    full_verify_concurrency_limit: usize,

    // Services
    //
    /// A network service which is used to perform ObtainTips and ExtendTips
    /// requests.
    ///
    /// Has no retry logic, because failover is handled using fanout.
    tip_network: Timeout<ZN>,

    /// A service which downloads and verifies blocks, using the provided
    /// network and verifier services.
    downloads: Pin<
        Box<
            Downloads<
                Hedge<ConcurrencyLimit<Retry<zn::RetryLimit, Timeout<ZN>>>, AlwaysHedge>,
                Timeout<ZV>,
                ZSTip,
            >,
        >,
    >,

    /// The cached block chain state.
    state: ZS,

    /// Allows efficient access to the best tip of the blockchain.
    latest_chain_tip: ZSTip,

    // Internal sync state
    //
    /// The tips that the syncer is currently following.
    prospective_tips: HashSet<CheckedTip>,

    /// The lengths of recent sync responses.
    recent_syncs: RecentSyncLengths,

    /// Receiver that is `true` when the downloader is past the lookahead limit.
    /// This is based on the downloaded block height and the state tip height.
    past_lookahead_limit_receiver: zs::WatchReceiver<bool>,

    /// Sender for reporting peer addresses that advertised unexpectedly invalid blocks.
    misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,
}

impl<ZN, ZS, ZV, ZSTip> ChainSync<ZN, ZS, ZV, ZSTip>
where
    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError>
        + Send
        + Sync
        + Clone
        + 'static,
    ZN::Future: Send,
    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError>
        + Send
        + Sync
        + Clone
        + 'static,
    ZS::Future: Send,
    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
        + Send
        + Sync
        + Clone
        + 'static,
    ZV::Future: Send,
    ZSTip: ChainTip + Clone + Send + 'static,
{
    /// Returns a new syncer instance, using:
    ///  - chain: the zebra-chain `Network` to download (Mainnet or Testnet)
    ///  - peers: the zebra-network peers to contact for downloads
    ///  - verifier: the zebra-consensus verifier that checks the chain
    ///  - state: the zebra-state that stores the chain
    ///  - latest_chain_tip: the latest chain tip from `state`
    ///
    /// Also returns a [`SyncStatus`] to check if the syncer has likely reached the chain tip.
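    ///
    /// ## Example
    ///
    /// A minimal sketch of constructing and starting the syncer, assuming
    /// `config`, `max_checkpoint_height`, `peers`, `verifier`, `state`, and
    /// `latest_chain_tip` are already built (the channel capacity here is
    /// illustrative):
    ///
    /// ```ignore
    /// let (misbehavior_sender, _misbehavior_rx) = tokio::sync::mpsc::channel(100);
    /// let (chain_sync, sync_status) = ChainSync::new(
    ///     &config,
    ///     max_checkpoint_height,
    ///     peers,
    ///     verifier,
    ///     state,
    ///     latest_chain_tip,
    ///     misbehavior_sender,
    /// );
    ///
    /// // `sync` loops until it hits an unrecoverable error.
    /// tokio::spawn(chain_sync.sync());
    /// ```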
    pub fn new(
        config: &ZebradConfig,
        max_checkpoint_height: Height,
        peers: ZN,
        verifier: ZV,
        state: ZS,
        latest_chain_tip: ZSTip,
        misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,
    ) -> (Self, SyncStatus) {
        let mut download_concurrency_limit = config.sync.download_concurrency_limit;
        let mut checkpoint_verify_concurrency_limit =
            config.sync.checkpoint_verify_concurrency_limit;
        let mut full_verify_concurrency_limit = config.sync.full_verify_concurrency_limit;

        if download_concurrency_limit < MIN_CONCURRENCY_LIMIT {
            warn!(
                "configured download concurrency limit {} too low, increasing to {}",
                config.sync.download_concurrency_limit, MIN_CONCURRENCY_LIMIT,
            );

            download_concurrency_limit = MIN_CONCURRENCY_LIMIT;
        }

        if checkpoint_verify_concurrency_limit < MIN_CHECKPOINT_CONCURRENCY_LIMIT {
            warn!(
                "configured checkpoint verify concurrency limit {} too low, increasing to {}",
                config.sync.checkpoint_verify_concurrency_limit, MIN_CHECKPOINT_CONCURRENCY_LIMIT,
            );

            checkpoint_verify_concurrency_limit = MIN_CHECKPOINT_CONCURRENCY_LIMIT;
        }

        if full_verify_concurrency_limit < MIN_CONCURRENCY_LIMIT {
            warn!(
                "configured full verify concurrency limit {} too low, increasing to {}",
                config.sync.full_verify_concurrency_limit, MIN_CONCURRENCY_LIMIT,
            );

            full_verify_concurrency_limit = MIN_CONCURRENCY_LIMIT;
        }

        let tip_network = Timeout::new(peers.clone(), TIPS_RESPONSE_TIMEOUT);

        // The Hedge middleware is the outermost layer, hedging requests
        // between two retry-wrapped networks.  The innermost timeout
        // layer is relatively unimportant, because slow requests will
        // probably be preemptively hedged.
        //
        // The Hedge goes outside the Retry, because the Retry layer
        // abstracts away spurious failures from individual peers
        // making a less-fallible network service, and the Hedge layer
        // tries to reduce latency of that less-fallible service.
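        //
        // The `Hedge::new` arguments below are: the retry-wrapped service, the
        // hedge policy, the minimum number of recorded latencies before hedging
        // starts (20), the latency percentile that triggers a hedged request
        // (0.95), and the rolling window over which latencies are measured
        // (2 * SYNC_RESTART_DELAY).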
        let block_network = Hedge::new(
            ServiceBuilder::new()
                .concurrency_limit(download_concurrency_limit)
                .retry(zn::RetryLimit::new(BLOCK_DOWNLOAD_RETRY_LIMIT))
                .timeout(BLOCK_DOWNLOAD_TIMEOUT)
                .service(peers),
            AlwaysHedge,
            20,
            0.95,
            2 * SYNC_RESTART_DELAY,
        );

        // We apply a timeout to the verifier to avoid hangs due to missing earlier blocks.
        let verifier = Timeout::new(verifier, BLOCK_VERIFY_TIMEOUT);

        let (sync_status, recent_syncs) = SyncStatus::new();

        let (past_lookahead_limit_sender, past_lookahead_limit_receiver) = watch::channel(false);
        let past_lookahead_limit_receiver = zs::WatchReceiver::new(past_lookahead_limit_receiver);

        let downloads = Box::pin(Downloads::new(
            block_network,
            verifier,
            latest_chain_tip.clone(),
            past_lookahead_limit_sender,
            max(
                checkpoint_verify_concurrency_limit,
                full_verify_concurrency_limit,
            ),
            max_checkpoint_height,
        ));

        let new_syncer = Self {
            genesis_hash: config.network.network.genesis_hash(),
            max_checkpoint_height,
            checkpoint_verify_concurrency_limit,
            full_verify_concurrency_limit,
            tip_network,
            downloads,
            state,
            latest_chain_tip,
            prospective_tips: HashSet::new(),
            recent_syncs,
            past_lookahead_limit_receiver,
            misbehavior_sender,
        };

        (new_syncer, sync_status)
    }

    /// Runs the syncer to synchronize the chain and keep it synchronized.
    #[instrument(skip(self))]
    pub async fn sync(mut self) -> Result<(), Report> {
        // We can't download the genesis block using our normal algorithm,
        // due to protocol limitations
        self.request_genesis().await?;

        loop {
            if self.try_to_sync().await.is_err() {
                self.downloads.cancel_all();
            }

            self.update_metrics();

            info!(
                timeout = ?SYNC_RESTART_DELAY,
                state_tip = ?self.latest_chain_tip.best_tip_height(),
                "waiting to restart sync"
            );
            sleep(SYNC_RESTART_DELAY).await;
        }
    }

    /// Tries to synchronize the chain as far as it can.
    ///
    /// Obtains some prospective tips and iteratively tries to extend them and download the missing
    /// blocks.
    ///
    /// Returns `Ok` if it was able to synchronize as much of the chain as it could, and then ran
    /// out of prospective tips. This happens when synchronization finishes or if Zebra ended up
    /// following a fork. Either way, Zebra should attempt to obtain some more tips.
    ///
    /// Returns `Err` if there was an unrecoverable error and restarting the synchronization is
    /// necessary. This includes outer timeouts, where an entire syncing step takes an extremely
    /// long time. (These usually indicate hangs.)
    #[instrument(skip(self))]
    async fn try_to_sync(&mut self) -> Result<(), Report> {
        self.prospective_tips = HashSet::new();

        info!(
            state_tip = ?self.latest_chain_tip.best_tip_height(),
            "starting sync, obtaining new tips"
        );
        let mut extra_hashes = timeout(SYNC_RESTART_DELAY, self.obtain_tips())
            .await
            .map_err(Into::into)
            // TODO: replace with flatten() when it stabilises (#70142)
            .and_then(convert::identity)
            .map_err(|e| {
                info!("temporary error obtaining tips: {:#}", e);
                e
            })?;
        self.update_metrics();

        while !self.prospective_tips.is_empty() || !extra_hashes.is_empty() {
            // Avoid hangs due to service readiness or other internal operations
            extra_hashes = timeout(BLOCK_VERIFY_TIMEOUT, self.try_to_sync_once(extra_hashes))
                .await
                .map_err(Into::into)
                // TODO: replace with flatten() when it stabilises (#70142)
                .and_then(convert::identity)?;
        }

        info!("exhausted prospective tip set");

        Ok(())
    }

    /// Tries to synchronize the chain once, using the existing `extra_hashes`.
    ///
    /// Tries to extend the existing tips and download the missing blocks.
    ///
    /// Returns `Ok(extra_hashes)` if it was able to extend once and synchronize some of the chain.
    /// Returns `Err` if there was an unrecoverable error and restarting the synchronization is
    /// necessary.
    #[instrument(skip(self, extra_hashes))]
    async fn try_to_sync_once(
        &mut self,
        mut extra_hashes: IndexSet<block::Hash>,
    ) -> Result<IndexSet<block::Hash>, Report> {
        // Check whether any block tasks are currently ready.
        while let Poll::Ready(Some(rsp)) = futures::poll!(self.downloads.next()) {
            // Some temporary errors are ignored, and syncing continues with other blocks.
            // If it turns out they were actually important, syncing will run out of blocks, and
            // the syncer will reset itself.
            self.handle_block_response(rsp)?;
        }
        self.update_metrics();

        // Pause new downloads while the syncer or downloader is past its lookahead limit.
        //
        // To avoid a deadlock or long waits for blocks to expire, we ignore the download
        // lookahead limit when there are only a small number of blocks waiting.
        while self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len())
            || (self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len()) / 2
                && self.past_lookahead_limit_receiver.cloned_watch_data())
        {
            trace!(
                tips.len = self.prospective_tips.len(),
                in_flight = self.downloads.in_flight(),
                extra_hashes = extra_hashes.len(),
                lookahead_limit = self.lookahead_limit(extra_hashes.len()),
                state_tip = ?self.latest_chain_tip.best_tip_height(),
                "waiting for pending blocks",
            );

            let response = self.downloads.next().await.expect("downloads is nonempty");

            self.handle_block_response(response)?;
            self.update_metrics();
        }

        // Once we're below the lookahead limit, we can request more blocks or hashes.
        if !extra_hashes.is_empty() {
            debug!(
                tips.len = self.prospective_tips.len(),
                in_flight = self.downloads.in_flight(),
                extra_hashes = extra_hashes.len(),
                lookahead_limit = self.lookahead_limit(extra_hashes.len()),
                state_tip = ?self.latest_chain_tip.best_tip_height(),
                "requesting more blocks",
            );

            let response = self.request_blocks(extra_hashes).await;
            extra_hashes = Self::handle_hash_response(response)?;
        } else {
            info!(
                tips.len = self.prospective_tips.len(),
                in_flight = self.downloads.in_flight(),
                extra_hashes = extra_hashes.len(),
                lookahead_limit = self.lookahead_limit(extra_hashes.len()),
                state_tip = ?self.latest_chain_tip.best_tip_height(),
                "extending tips",
            );

            extra_hashes = self.extend_tips().await.map_err(|e| {
                info!("temporary error extending tips: {:#}", e);
                e
            })?;
        }
        self.update_metrics();

        Ok(extra_hashes)
    }

    /// Given a block locator list, fans out requests for subsequent hashes to
    /// multiple peers.
    #[instrument(skip(self))]
    async fn obtain_tips(&mut self) -> Result<IndexSet<block::Hash>, Report> {
        let block_locator = self
            .state
            .ready()
            .await
            .map_err(|e| eyre!(e))?
            .call(zebra_state::Request::BlockLocator)
            .await
            .map(|response| match response {
                zebra_state::Response::BlockLocator(block_locator) => block_locator,
                _ => unreachable!(
                    "GetBlockLocator request can only result in Response::BlockLocator"
                ),
            })
            .map_err(|e| eyre!(e))?;

        debug!(
            tip = ?block_locator.first().expect("we have at least one block locator object"),
            ?block_locator,
            "got block locator and trying to obtain new chain tips"
        );

        let mut requests = FuturesUnordered::new();
        for attempt in 0..FANOUT {
            if attempt > 0 {
                // Let other tasks run, so we're more likely to choose a different peer.
                //
                // TODO: move fanouts into the PeerSet, so we always choose different peers (#2214)
                tokio::task::yield_now().await;
            }

            let ready_tip_network = self.tip_network.ready().await;
            requests.push(tokio::spawn(ready_tip_network.map_err(|e| eyre!(e))?.call(
                zn::Request::FindBlocks {
                    known_blocks: block_locator.clone(),
                    stop: None,
                },
            )));
        }

        let mut download_set = IndexSet::new();
        while let Some(res) = requests.next().await {
            match res
                .unwrap_or_else(|e @ JoinError { .. }| {
                    if e.is_panic() {
                        panic!("panic in obtain tips task: {e:?}");
                    } else {
                        info!(
                            "task error during obtain tips task: {e:?}, \
                             is Zebra shutting down?"
                        );
                        Err(e.into())
                    }
                })
                .map_err::<Report, _>(|e| eyre!(e))
            {
                Ok(zn::Response::BlockHashes(hashes)) => {
                    trace!(?hashes);

                    // zcashd sometimes appends an unrelated hash at the start
                    // or end of its response.
                    //
                    // We can't discard the first hash, because it might be a
                    // block we want to download. So we just accept any
                    // out-of-order first hashes.

                    // We use the last hash for the tip, and we want to avoid bad
                    // tips. So we discard the last hash. (We don't need to worry
                    // about missed downloads, because we will pick them up again
                    // in ExtendTips.)
                    let hashes = match hashes.as_slice() {
                        [] => continue,
                        [rest @ .., _last] => rest,
                    };

                    let mut first_unknown = None;
                    for (i, &hash) in hashes.iter().enumerate() {
                        if !self.state_contains(hash).await? {
                            first_unknown = Some(i);
                            break;
                        }
                    }

                    debug!(hashes.len = ?hashes.len(), ?first_unknown);

                    let unknown_hashes = if let Some(index) = first_unknown {
                        &hashes[index..]
                    } else {
                        continue;
                    };

                    trace!(?unknown_hashes);

                    let new_tip = if let Some(end) = unknown_hashes.rchunks_exact(2).next() {
                        CheckedTip {
                            tip: end[0],
                            expected_next: end[1],
                        }
                    } else {
                        debug!("discarding response that extends only one block");
                        continue;
                    };

                    // Make sure we get the same tips, regardless of the
                    // order of peer responses
                    if !download_set.contains(&new_tip.expected_next) {
                        debug!(?new_tip,
                                        "adding new prospective tip, and removing existing tips in the new block hash list");
                        self.prospective_tips
                            .retain(|t| !unknown_hashes.contains(&t.expected_next));
                        self.prospective_tips.insert(new_tip);
                    } else {
                        debug!(
                            ?new_tip,
                            "discarding prospective tip: already in download set"
                        );
                    }

                    // security: the first response determines our download order
                    //
                    // TODO: can we make the download order independent of response order?
                    let prev_download_len = download_set.len();
                    download_set.extend(unknown_hashes);
                    let new_download_len = download_set.len();
                    let new_hashes = new_download_len - prev_download_len;
                    debug!(new_hashes, "added hashes to download set");
                    metrics::histogram!("sync.obtain.response.hash.count")
                        .record(new_hashes as f64);
                }
                Ok(_) => unreachable!("network returned wrong response"),
                // We ignore this error because we made multiple fanout requests.
                Err(e) => debug!(?e),
            }
        }

        debug!(?self.prospective_tips);

        // Check that the new tips we got are actually unknown.
        for hash in &download_set {
            debug!(?hash, "checking if state contains hash");
            if self.state_contains(*hash).await? {
                return Err(eyre!("queued download of hash behind our chain tip"));
            }
        }

        let new_downloads = download_set.len();
        debug!(new_downloads, "queueing new downloads");
        metrics::gauge!("sync.obtain.queued.hash.count").set(new_downloads as f64);

        // security: use the actual number of new downloads from all peers,
        // so the last peer to respond can't toggle our mempool
        self.recent_syncs.push_obtain_tips_length(new_downloads);

        let response = self.request_blocks(download_set).await;

        Self::handle_hash_response(response).map_err(Into::into)
    }

    #[instrument(skip(self))]
    async fn extend_tips(&mut self) -> Result<IndexSet<block::Hash>, Report> {
        let tips = std::mem::take(&mut self.prospective_tips);

        let mut download_set = IndexSet::new();
        debug!(tips = ?tips.len(), "trying to extend chain tips");
        for tip in tips {
            debug!(?tip, "asking peers to extend chain tip");
            let mut responses = FuturesUnordered::new();
            for attempt in 0..FANOUT {
                if attempt > 0 {
                    // Let other tasks run, so we're more likely to choose a different peer.
                    //
                    // TODO: move fanouts into the PeerSet, so we always choose different peers (#2214)
                    tokio::task::yield_now().await;
                }

                let ready_tip_network = self.tip_network.ready().await;
                responses.push(tokio::spawn(ready_tip_network.map_err(|e| eyre!(e))?.call(
                    zn::Request::FindBlocks {
                        known_blocks: vec![tip.tip],
                        stop: None,
                    },
                )));
            }
            while let Some(res) = responses.next().await {
                match res
                    .expect("panic in spawned extend tips request")
                    .map_err::<Report, _>(|e| eyre!(e))
                {
                    Ok(zn::Response::BlockHashes(hashes)) => {
                        debug!(first = ?hashes.first(), len = ?hashes.len());
                        trace!(?hashes);

                        // zcashd sometimes appends an unrelated hash at the
                        // start or end of its response. Check the first hash
                        // against the previous response, and discard mismatches.
                        let unknown_hashes = match hashes.as_slice() {
                            [expected_hash, rest @ ..] if expected_hash == &tip.expected_next => {
                                rest
                            }
                            // If the first hash doesn't match, retry with the second.
                            [first_hash, expected_hash, rest @ ..]
                                if expected_hash == &tip.expected_next =>
                            {
                                debug!(?first_hash,
                                                ?tip.expected_next,
                                                ?tip.tip,
                                                "unexpected first hash, but the second matches: using the hashes after the match");
                                rest
                            }
                            // We ignore these responses
                            [] => continue,
                            [single_hash] => {
                                debug!(?single_hash,
                                                ?tip.expected_next,
                                                ?tip.tip,
                                                "discarding response containing a single unexpected hash");
                                continue;
                            }
                            [first_hash, second_hash, rest @ ..] => {
                                debug!(?first_hash,
                                                ?second_hash,
                                                rest_len = ?rest.len(),
                                                ?tip.expected_next,
                                                ?tip.tip,
                                                "discarding response that starts with two unexpected hashes");
                                continue;
                            }
                        };

                        // We use the last hash for the tip, and we want to avoid
                        // bad tips. So we discard the last hash. (We don't need
                        // to worry about missed downloads, because we will pick
                        // them up again in the next ExtendTips.)
                        let unknown_hashes = match unknown_hashes {
                            [] => continue,
                            [rest @ .., _last] => rest,
                        };

                        let new_tip = if let Some(end) = unknown_hashes.rchunks_exact(2).next() {
                            CheckedTip {
                                tip: end[0],
                                expected_next: end[1],
                            }
                        } else {
                            debug!("discarding response that extends only one block");
                            continue;
                        };

                        trace!(?unknown_hashes);

                        // Make sure we get the same tips, regardless of the
                        // order of peer responses
                        if !download_set.contains(&new_tip.expected_next) {
                            debug!(?new_tip,
                                            "adding new prospective tip, and removing any existing tips in the new block hash list");
                            self.prospective_tips
                                .retain(|t| !unknown_hashes.contains(&t.expected_next));
                            self.prospective_tips.insert(new_tip);
                        } else {
                            debug!(
                                ?new_tip,
                                "discarding prospective tip: already in download set"
                            );
                        }

                        // security: the first response determines our download order
                        //
                        // TODO: can we make the download order independent of response order?
                        let prev_download_len = download_set.len();
                        download_set.extend(unknown_hashes);
                        let new_download_len = download_set.len();
                        let new_hashes = new_download_len - prev_download_len;
                        debug!(new_hashes, "added hashes to download set");
                        metrics::histogram!("sync.extend.response.hash.count")
                            .record(new_hashes as f64);
                    }
                    Ok(_) => unreachable!("network returned wrong response"),
                    // We ignore this error because we made multiple fanout requests.
                    Err(e) => debug!(?e),
                }
            }
        }

        let new_downloads = download_set.len();
        debug!(new_downloads, "queueing new downloads");
        metrics::gauge!("sync.extend.queued.hash.count").set(new_downloads as f64);

        // security: use the actual number of new downloads from all peers,
        // so the last peer to respond can't toggle our mempool
        self.recent_syncs.push_extend_tips_length(new_downloads);

        let response = self.request_blocks(download_set).await;

        Self::handle_hash_response(response).map_err(Into::into)
    }

    /// Download and verify the genesis block, if it isn't currently known to
    /// our node.
    async fn request_genesis(&mut self) -> Result<(), Report> {
        // Due to Bitcoin protocol limitations, we can't request the genesis
        // block using our standard tip-following algorithm:
        //  - getblocks requires at least one hash
        //  - responses start with the block *after* the requested block, and
        //  - the genesis hash is used as a placeholder for "no matches".
        //
        // So we just download and verify the genesis block here.
        while !self.state_contains(self.genesis_hash).await? {
            info!("starting genesis block download and verify");

            let response = timeout(SYNC_RESTART_DELAY, self.request_genesis_once())
                .await
                .map_err(Into::into);

            // 3 layers of results is not ideal, but we need the timeout on the outside.
            match response {
                Ok(Ok(Ok(response))) => self
                    .handle_block_response(Ok(response))
                    .expect("never returns Err for Ok"),
                // Handle fatal errors
                Ok(Err(fatal_error)) => Err(fatal_error)?,
                // Handle timeouts and block errors
                Err(error) | Ok(Ok(Err(error))) => {
                    // TODO: exit syncer on permanent service errors (NetworkError, VerifierError)
                    if Self::should_restart_sync(&error) {
                        warn!(
                            ?error,
                            "could not download or verify genesis block, retrying"
                        );
                    } else {
                        info!(
                            ?error,
                            "temporary error downloading or verifying genesis block, retrying"
                        );
                    }

                    tokio::time::sleep(GENESIS_TIMEOUT_RETRY).await;
                }
            }
        }

        Ok(())
    }

    /// Try to download and verify the genesis block once.
    ///
    /// Fatal errors are returned in the outer result, temporary errors in the inner one.
    async fn request_genesis_once(
        &mut self,
    ) -> Result<Result<(Height, block::Hash), BlockDownloadVerifyError>, Report> {
        let response = self.downloads.download_and_verify(self.genesis_hash).await;
        Self::handle_response(response).map_err(|e| eyre!(e))?;

        let response = self.downloads.next().await.expect("downloads is nonempty");

        Ok(response)
    }

    /// Queue download and verify tasks for each block that isn't currently known to our node.
    ///
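    /// Queues at most the current lookahead limit's worth of hashes and returns
    /// the rest: for example, with a lookahead limit of 100 and 120 new hashes,
    /// the first 100 are queued for download, and the remaining 20 are returned
    /// as extra hashes.
    ///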
    /// TODO: turn obtain and extend tips into a separate task, which sends hashes via a channel?
    async fn request_blocks(
        &mut self,
        mut hashes: IndexSet<block::Hash>,
    ) -> Result<IndexSet<block::Hash>, BlockDownloadVerifyError> {
        let lookahead_limit = self.lookahead_limit(hashes.len());

        debug!(
            hashes.len = hashes.len(),
            ?lookahead_limit,
            "requesting blocks",
        );

        let extra_hashes = if hashes.len() > lookahead_limit {
            hashes.split_off(lookahead_limit)
        } else {
            IndexSet::new()
        };

        for hash in hashes.into_iter() {
            self.downloads.download_and_verify(hash).await?;
        }

        Ok(extra_hashes)
    }

    /// The configured lookahead limit, based on the currently verified height,
    /// and the number of hashes we haven't queued yet.
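    ///
    /// For example, if `max_checkpoint_height` is 2,000,000, the verified height
    /// is 1,999,990, and there are 50 new hashes, the syncer is just crossing
    /// into full verification, so the limit below works out to
    /// `full_verify_concurrency_limit + (1_999_990 + 50 - 2_000_000)`:
    /// the full verification limit plus the 40 hashes past the checkpoint.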
    fn lookahead_limit(&self, new_hashes: usize) -> usize {
        let max_checkpoint_height: usize = self
            .max_checkpoint_height
            .0
            .try_into()
            .expect("fits in usize");

        // When the state is empty, we want to verify using checkpoints
        let verified_height: usize = self
            .latest_chain_tip
            .best_tip_height()
            .unwrap_or(Height(0))
            .0
            .try_into()
            .expect("fits in usize");

        if verified_height >= max_checkpoint_height {
            self.full_verify_concurrency_limit
        } else if (verified_height + new_hashes) >= max_checkpoint_height {
            // If we're just about to start full verification, allow enough for the remaining checkpoint,
            // and also enough for a separate full verification lookahead.
            let checkpoint_hashes = verified_height + new_hashes - max_checkpoint_height;

            self.full_verify_concurrency_limit + checkpoint_hashes
        } else {
            self.checkpoint_verify_concurrency_limit
        }
    }

    /// Handles a response for a requested block.
    ///
    /// See [`Self::handle_response`] for more details.
    #[allow(unknown_lints)]
    fn handle_block_response(
        &mut self,
        response: Result<(Height, block::Hash), BlockDownloadVerifyError>,
    ) -> Result<(), BlockDownloadVerifyError> {
        match response {
            Ok((height, hash)) => {
                trace!(?height, ?hash, "verified and committed block to state");

                return Ok(());
            }

            Err(BlockDownloadVerifyError::Invalid {
                ref error,
                advertiser_addr: Some(advertiser_addr),
                ..
            }) if error.misbehavior_score() != 0 => {
                let _ = self
                    .misbehavior_sender
                    .try_send((advertiser_addr, error.misbehavior_score()));
            }

            Err(_) => {}
        };

        Self::handle_response(response)
    }

    /// Handles a response to block hash submission, passing through any extra hashes.
    ///
    /// See [`Self::handle_response`] for more details.
    #[allow(unknown_lints)]
    fn handle_hash_response(
        response: Result<IndexSet<block::Hash>, BlockDownloadVerifyError>,
    ) -> Result<IndexSet<block::Hash>, BlockDownloadVerifyError> {
        match response {
            Ok(extra_hashes) => Ok(extra_hashes),
            Err(_) => Self::handle_response(response).map(|()| IndexSet::new()),
        }
    }

    /// Handles a response to a syncer request.
    ///
    /// Returns `Ok` if the request was successful, or if an expected error occurred,
    /// so that the synchronization can continue normally.
    ///
    /// Returns `Err` if an unexpected error occurred, to force the synchronizer to restart.
    #[allow(unknown_lints)]
    fn handle_response<T>(
        response: Result<T, BlockDownloadVerifyError>,
    ) -> Result<(), BlockDownloadVerifyError> {
        match response {
            Ok(_t) => Ok(()),
            Err(error) => {
                // TODO: exit syncer on permanent service errors (NetworkError, VerifierError)
                if Self::should_restart_sync(&error) {
                    Err(error)
                } else {
                    Ok(())
                }
            }
        }
    }

    /// Returns `true` if the hash is present in the state, and `false` otherwise.
    pub(crate) async fn state_contains(&mut self, hash: block::Hash) -> Result<bool, Report> {
        match self
            .state
            .ready()
            .await
            .map_err(|e| eyre!(e))?
            .call(zebra_state::Request::KnownBlock(hash))
            .await
            .map_err(|e| eyre!(e))?
        {
            zs::Response::KnownBlock(loc) => Ok(loc.is_some()),
            _ => unreachable!("wrong response to known block request"),
        }
    }

    fn update_metrics(&mut self) {
        metrics::gauge!("sync.prospective_tips.len",).set(self.prospective_tips.len() as f64);
        metrics::gauge!("sync.downloads.in_flight",).set(self.downloads.in_flight() as f64);
    }

    /// Return if the sync should be restarted based on the given error
    /// from the block downloader and verifier stream.
    fn should_restart_sync(e: &BlockDownloadVerifyError) -> bool {
        match e {
            // Structural matches: downcasts
            BlockDownloadVerifyError::Invalid { error, .. } if error.is_duplicate_request() => {
                debug!(error = ?e, "block was already verified, possibly from a previous sync run, continuing");
                false
            }

            // Structural matches: direct
            BlockDownloadVerifyError::CancelledDuringDownload { .. }
            | BlockDownloadVerifyError::CancelledDuringVerification { .. } => {
                debug!(error = ?e, "block verification was cancelled, continuing");
                false
            }
            BlockDownloadVerifyError::BehindTipHeightLimit { .. } => {
                debug!(
                    error = ?e,
                    "block height is behind the current state tip, \
                     assuming the syncer will eventually catch up to the state, continuing"
                );
                false
            }
            BlockDownloadVerifyError::DuplicateBlockQueuedForDownload { .. } => {
                debug!(
                    error = ?e,
                    "queued duplicate block hash for download, \
                     assuming the syncer will eventually resolve duplicates, continuing"
                );
                false
            }

            // String matches
            //
            // We want to match VerifyChainError::Block(VerifyBlockError::Commit(ref source)),
            // but that type is boxed.
            // TODO:
            // - turn this check into a function on VerifyChainError, like is_duplicate_request()
            BlockDownloadVerifyError::Invalid { error, .. }
                if format!("{error:?}").contains("block is already committed to the state")
                    || format!("{error:?}")
                        .contains("block has already been sent to be committed to the state") =>
            {
                // TODO: improve this by checking the type (#2908)
                debug!(error = ?e, "block is already committed or pending a commit, possibly from a previous sync run, continuing");
                false
            }
            BlockDownloadVerifyError::DownloadFailed { ref error, .. }
                if format!("{error:?}").contains("NotFound") =>
            {
                // Covers these errors:
                // - NotFoundResponse
                // - NotFoundRegistry
                //
                // TODO: improve this by checking the type (#2908)
                //       restart after a certain number of NotFound errors?
                debug!(error = ?e, "block was not found, possibly from a peer that doesn't have the block yet, continuing");
                false
            }

            _ => {
                // download_and_verify downcasts errors from the block verifier
                // into VerifyChainError, and puts the result inside one of the
                // BlockDownloadVerifyError enumerations. This downcast could
                // become incorrect e.g. after some refactoring, and it is difficult
                // to write a test to check it. The test below is a best-effort
                // attempt to catch if that happens and log it.
                //
                // TODO: add a proper test and remove this
                // https://github.com/ZcashFoundation/zebra/issues/2909
                let err_str = format!("{e:?}");
                if err_str.contains("AlreadyVerified")
                    || err_str.contains("AlreadyInChain")
                    || err_str.contains("block is already committed to the state")
                    || err_str.contains("block has already been sent to be committed to the state")
                    || err_str.contains("NotFound")
                {
                    error!(?e,
                        "a BlockDownloadVerifyError that should have been filtered out was detected, \
                        which possibly indicates a programming error in the downcast inside \
                        zebrad::components::sync::downloads::Downloads::download_and_verify"
                    )
                }

                warn!(?e, "error downloading and verifying block");
                true
            }
        }
    }
}