zebrad/components/sync.rs

//! The syncer downloads and verifies large numbers of blocks from peers to Zebra.
//!
//! It is used when Zebra is a long way behind the current chain tip.

use std::{cmp::max, collections::HashSet, convert, pin::Pin, task::Poll, time::Duration};

use color_eyre::eyre::{eyre, Report};
use futures::stream::{FuturesUnordered, StreamExt};
use indexmap::IndexSet;
use serde::{Deserialize, Serialize};
use tokio::{
    sync::{mpsc, watch},
    task::JoinError,
    time::{sleep, timeout},
};
use tower::{
    builder::ServiceBuilder, hedge::Hedge, limit::ConcurrencyLimit, retry::Retry, timeout::Timeout,
    Service, ServiceExt,
};

use zebra_chain::{
    block::{self, Height, HeightDiff},
    chain_tip::ChainTip,
};
use zebra_consensus::ParameterCheckpoint as _;
use zebra_network::{self as zn, PeerSocketAddr};
use zebra_state as zs;

use crate::{
    components::sync::downloads::BlockDownloadVerifyError, config::ZebradConfig, BoxError,
};

mod downloads;
pub mod end_of_support;
mod gossip;
mod progress;
mod recent_sync_lengths;
mod status;

#[cfg(test)]
mod tests;

use downloads::{AlwaysHedge, Downloads};

pub use downloads::VERIFICATION_PIPELINE_SCALING_MULTIPLIER;
pub use gossip::{gossip_best_tip_block_hashes, BlockGossipError};
pub use progress::show_block_chain_progress;
pub use recent_sync_lengths::RecentSyncLengths;
pub use status::SyncStatus;

/// Controls the number of peers used for each ObtainTips and ExtendTips request.
const FANOUT: usize = 3;

/// Controls how many times we will retry each block download.
///
/// Allowing block downloads to fail is important, because it defends against
/// peers who feed us bad hashes. But spurious failures of valid blocks cause the
/// syncer to restart from the previous checkpoint, potentially re-downloading
/// blocks.
///
/// We also hedge requests, so we may retry up to twice this many times. Hedged
/// retries may be concurrent; inner retries are sequential.
const BLOCK_DOWNLOAD_RETRY_LIMIT: usize = 3;

/// A lower bound on the user-specified checkpoint verification concurrency limit.
///
/// Set to the maximum checkpoint interval, so the pipeline holds around a checkpoint's
/// worth of blocks.
///
/// ## Security
///
/// If a malicious node is chosen for an ObtainTips or ExtendTips request, it can
/// provide up to 500 malicious block hashes. These block hashes will be
/// distributed across all available peers. Assuming there are around 50 connected
/// peers, the malicious node will receive approximately 10 of those block requests.
///
/// Malicious deserialized blocks can take up a large amount of RAM, see
/// [`super::inbound::downloads::MAX_INBOUND_CONCURRENCY`] and #1880 for details.
/// So we want to keep the lookahead limit reasonably small.
///
/// Once these malicious blocks start failing validation, the syncer will cancel all
/// the pending download and verify tasks, drop all the blocks, and start a new
/// ObtainTips with a new set of peers.
pub const MIN_CHECKPOINT_CONCURRENCY_LIMIT: usize = zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP;

/// The default for the user-specified lookahead limit.
///
/// See [`MIN_CHECKPOINT_CONCURRENCY_LIMIT`] for details.
pub const DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT: usize = MAX_TIPS_RESPONSE_HASH_COUNT * 2;

/// A lower bound on the user-specified concurrency limit.
///
/// If the concurrency limit is 0, Zebra can't download or verify any blocks.
pub const MIN_CONCURRENCY_LIMIT: usize = 1;

/// The expected maximum number of hashes in an ObtainTips or ExtendTips response.
///
/// This is used to allow block heights that are slightly beyond the lookahead limit,
/// but still limit the number of blocks in the pipeline between the downloader and
/// the state.
///
/// See [`MIN_CHECKPOINT_CONCURRENCY_LIMIT`] for details.
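///
/// 500 hashes is also the `getblocks` response limit inherited from the Bitcoin
/// protocol: a peer returns at most 500 block hashes per request.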
pub const MAX_TIPS_RESPONSE_HASH_COUNT: usize = 500;

/// Controls how long we wait for a tips response to return.
///
/// ## Correctness
///
/// If this timeout is removed (or set too high), the syncer will sometimes hang.
///
/// If this timeout is set too low, the syncer will sometimes get stuck in a
/// failure loop.
pub const TIPS_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6);

/// Controls how long we wait between gossiping successive blocks or transactions.
///
/// ## Correctness
///
/// If this timeout is set too high, blocks and transactions won't propagate through
/// the network efficiently.
///
/// If this timeout is set too low, the peer set and remote peers can get overloaded.
pub const PEER_GOSSIP_DELAY: Duration = Duration::from_secs(7);

/// Controls how long we wait for a block download request to complete.
///
/// This timeout makes sure that the syncer doesn't hang when:
///   - the lookahead queue is full, and
///   - we are waiting for a request that is stuck.
///
/// See [`BLOCK_VERIFY_TIMEOUT`] for details.
///
/// ## Correctness
///
/// If this timeout is removed (or set too high), the syncer will sometimes hang.
///
/// If this timeout is set too low, the syncer will sometimes get stuck in a
/// failure loop.
///
/// We set the timeout so that it requires under 1 Mbps bandwidth for a full 2 MB block.
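/// (A 2 MB block downloaded over 20 seconds is 0.1 MB/s, or 0.8 Mbps.)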
pub(super) const BLOCK_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(20);

/// Controls how long we wait for a block verify request to complete.
///
/// This timeout makes sure that the syncer doesn't hang when:
///  - the lookahead queue is full, and
///  - all pending verifications:
///    - are waiting on a missing download request,
///    - are waiting on a download or verify request that has failed, but we have
///      deliberately ignored the error,
///    - are for blocks a long way ahead of the current tip, or
///    - are for invalid blocks which will never verify, because they depend on
///      missing blocks or transactions.
///
/// These conditions can happen during normal operation - they are not bugs.
///
/// This timeout also mitigates or hides the following kinds of bugs:
///  - all pending verifications:
///    - are waiting on a download or verify request that has failed, but we have
///      accidentally dropped the error,
///    - are waiting on a download request that has hung inside Zebra,
///    - are on tokio threads that are waiting for blocked operations.
///
/// ## Correctness
///
/// If this timeout is removed (or set too high), the syncer will sometimes hang.
///
/// If this timeout is set too low, the syncer will sometimes get stuck in a
/// failure loop.
///
/// We've observed spurious 15 minute timeouts when a lot of blocks are being committed to
/// the state. But there are also some blocks that seem to hang entirely, and never return.
///
/// So we allow about half the spurious timeout, which might cause some re-downloads.
pub(super) const BLOCK_VERIFY_TIMEOUT: Duration = Duration::from_secs(8 * 60);

/// A shorter timeout used for the first few blocks after the final checkpoint.
///
/// This is a workaround for bug #5125, where the first fully validated blocks
/// after the final checkpoint fail with a timeout, due to a UTXO race condition.
const FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT: Duration = Duration::from_secs(2 * 60);

/// The number of blocks after the final checkpoint that get the shorter timeout.
///
/// We've only seen this error on the first few blocks after the final checkpoint.
const FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT_LIMIT: HeightDiff = 100;

/// Controls how long we wait to restart syncing after finishing a sync run.
///
/// This delay should be long enough to:
///   - allow zcashd peers to process pending requests. If the node only has a
///     few peers, we want to clear as much peer state as possible. In
///     particular, zcashd sends "next block range" hints, based on zcashd's
///     internal model of our sync progress. But we want to discard these hints,
///     so they don't get confused with ObtainTips and ExtendTips responses, and
///   - allow in-progress downloads to time out.
///
/// This delay is particularly important on instances with slow or unreliable
/// networks, and on testnet, which has a small number of slow peers.
///
/// Using a prime number makes sure that syncer fanouts don't synchronise with other crawls.
///
/// ## Correctness
///
/// If this delay is removed (or set too low), the syncer will
/// sometimes get stuck in a failure loop, due to leftover downloads from
/// previous sync runs.
const SYNC_RESTART_DELAY: Duration = Duration::from_secs(67);

/// Controls how long we wait to retry a failed attempt to download
/// and verify the genesis block.
///
/// This timeout gives the crawler time to find better peers.
///
/// ## Security
///
/// If this timeout is removed (or set too low), Zebra will immediately retry
/// to download and verify the genesis block from its peers. This can cause
/// a denial of service on those peers.
///
/// If this timeout is too short, old or buggy nodes will keep making useless
/// network requests. If there are a lot of them, it could overwhelm the network.
const GENESIS_TIMEOUT_RETRY: Duration = Duration::from_secs(10);

/// Sync configuration section.
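///
/// A sketch of the corresponding `zebrad.toml` section, assuming the standard
/// `[sync]` table name (the values shown are the defaults below, for
/// illustration only):
///
/// ```toml
/// [sync]
/// download_concurrency_limit = 50
/// checkpoint_verify_concurrency_limit = 1000
/// full_verify_concurrency_limit = 20
/// parallel_cpu_threads = 0
/// ```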
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(deny_unknown_fields, default)]
pub struct Config {
    /// The number of parallel block download requests.
    ///
    /// This is set to a low value by default, to avoid task and
    /// network contention. Increasing this value may improve
    /// performance on machines with a fast network connection.
    #[serde(alias = "max_concurrent_block_requests")]
    pub download_concurrency_limit: usize,

    /// The number of blocks submitted in parallel to the checkpoint verifier.
    ///
    /// Increasing this limit increases the buffer size, so it reduces
    /// the impact of an individual block request failing. However, it
    /// also increases memory and CPU usage if block validation stalls,
    /// or there are some large blocks in the pipeline.
    ///
    /// The block size limit is 2 MB, so in theory, this could represent multiple
    /// gigabytes of data, if we downloaded arbitrary blocks. However,
    /// because we randomly load balance outbound requests, and separate
    /// block download from obtaining block hashes, an adversary would
    /// have to control a significant fraction of our peers to lead us
    /// astray.
    ///
    /// For reliable checkpoint syncing, Zebra enforces a
    /// [`MIN_CHECKPOINT_CONCURRENCY_LIMIT`].
    ///
    /// This is set to a high value by default, to avoid verification pipeline stalls.
    /// Decreasing this value reduces RAM usage.
    #[serde(alias = "lookahead_limit")]
    pub checkpoint_verify_concurrency_limit: usize,

    /// The number of blocks submitted in parallel to the full verifier.
    ///
    /// This is set to a low value by default, to avoid verification timeouts on large blocks.
    /// Increasing this value may improve performance on machines with many cores.
    pub full_verify_concurrency_limit: usize,

    /// The number of threads used to verify signatures, proofs, and other CPU-intensive code.
    ///
    /// If the number of threads is not configured or zero, Zebra uses the number of logical cores.
    /// If the number of logical cores can't be detected, Zebra uses one thread.
    /// For details, see [the `rayon` documentation](https://docs.rs/rayon/latest/rayon/struct.ThreadPoolBuilder.html#method.num_threads).
    pub parallel_cpu_threads: usize,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            // 2/3 of the default outbound peer limit.
            download_concurrency_limit: 50,

            // A few max-length checkpoints.
            checkpoint_verify_concurrency_limit: DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT,

            // This default is deliberately very low, so Zebra can verify a few large blocks in under 60 seconds,
            // even on machines with only a few cores.
            //
            // This lets users see the committed block height changing in every progress log,
            // and avoids hangs due to out-of-order verifications flooding the CPUs.
            //
            // TODO:
            // - limit full verification concurrency based on block transaction counts?
            // - move more disk work to blocking tokio threads,
            //   and CPU work to the rayon thread pool inside blocking tokio threads
            full_verify_concurrency_limit: 20,

            // Use one thread per CPU.
            //
            // If this causes tokio executor starvation, move CPU-intensive tasks to rayon threads,
            // or reserve a few cores for tokio threads, based on `num_cpus()`.
            parallel_cpu_threads: 0,
        }
    }
}

/// Helps work around defects in the bitcoin protocol by checking whether
/// the returned hashes actually extend a chain tip.
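///
/// For example, if a trimmed response ends `[.., a, b]`, then `tip` is `a` and
/// `expected_next` is `b`: a follow-up response that extends this tip is
/// expected to start with `b`.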
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)]
struct CheckedTip {
    tip: block::Hash,
    expected_next: block::Hash,
}

pub struct ChainSync<ZN, ZS, ZV, ZSTip>
where
    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError>
        + Send
        + Sync
        + Clone
        + 'static,
    ZN::Future: Send,
    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError>
        + Send
        + Sync
        + Clone
        + 'static,
    ZS::Future: Send,
    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
        + Send
        + Sync
        + Clone
        + 'static,
    ZV::Future: Send,
    ZSTip: ChainTip + Clone + Send + 'static,
{
    // Configuration
    //
    /// The genesis hash for the configured network
    genesis_hash: block::Hash,

    /// The largest block height for the checkpoint verifier, based on the current config.
    max_checkpoint_height: Height,

    /// The configured checkpoint verification concurrency limit, after applying the minimum limit.
    checkpoint_verify_concurrency_limit: usize,

    /// The configured full verification concurrency limit, after applying the minimum limit.
    full_verify_concurrency_limit: usize,

    // Services
    //
    /// A network service which is used to perform ObtainTips and ExtendTips
    /// requests.
    ///
    /// Has no retry logic, because failover is handled using fanout.
    tip_network: Timeout<ZN>,

    /// A service which downloads and verifies blocks, using the provided
    /// network and verifier services.
    downloads: Pin<
        Box<
            Downloads<
                Hedge<ConcurrencyLimit<Retry<zn::RetryLimit, Timeout<ZN>>>, AlwaysHedge>,
                Timeout<ZV>,
                ZSTip,
            >,
        >,
    >,

    /// The cached block chain state.
    state: ZS,

    /// Allows efficient access to the best tip of the blockchain.
    latest_chain_tip: ZSTip,

    // Internal sync state
    //
    /// The tips that the syncer is currently following.
    prospective_tips: HashSet<CheckedTip>,

    /// The lengths of recent sync responses.
    recent_syncs: RecentSyncLengths,

    /// Receiver that is `true` when the downloader is past the lookahead limit.
    /// This is based on the downloaded block height and the state tip height.
    past_lookahead_limit_receiver: zs::WatchReceiver<bool>,

    /// Sender for reporting peer addresses that advertised unexpectedly invalid transactions.
    misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,
}

/// Polls the network to determine whether further blocks are available and
/// downloads them.
///
/// This component is used for initial block sync, but the `Inbound` service is
/// responsible for participating in the gossip protocols used for block
/// diffusion.
impl<ZN, ZS, ZV, ZSTip> ChainSync<ZN, ZS, ZV, ZSTip>
where
    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError>
        + Send
        + Sync
        + Clone
        + 'static,
    ZN::Future: Send,
    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError>
        + Send
        + Sync
        + Clone
        + 'static,
    ZS::Future: Send,
    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
        + Send
        + Sync
        + Clone
        + 'static,
    ZV::Future: Send,
    ZSTip: ChainTip + Clone + Send + 'static,
{
    /// Returns a new syncer instance, using:
    ///  - config: the zebrad config, including the network to download (Mainnet or Testnet)
    ///  - peers: the zebra-network peers to contact for downloads
    ///  - verifier: the zebra-consensus verifier that checks the chain
    ///  - state: the zebra-state that stores the chain
    ///  - latest_chain_tip: the latest chain tip from `state`
    ///
    /// Also returns a [`SyncStatus`] to check if the syncer has likely reached the chain tip.
    pub fn new(
        config: &ZebradConfig,
        max_checkpoint_height: Height,
        peers: ZN,
        verifier: ZV,
        state: ZS,
        latest_chain_tip: ZSTip,
        misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,
    ) -> (Self, SyncStatus) {
        let mut download_concurrency_limit = config.sync.download_concurrency_limit;
        let mut checkpoint_verify_concurrency_limit =
            config.sync.checkpoint_verify_concurrency_limit;
        let mut full_verify_concurrency_limit = config.sync.full_verify_concurrency_limit;

        if download_concurrency_limit < MIN_CONCURRENCY_LIMIT {
            warn!(
                "configured download concurrency limit {} too low, increasing to {}",
                config.sync.download_concurrency_limit, MIN_CONCURRENCY_LIMIT,
            );

            download_concurrency_limit = MIN_CONCURRENCY_LIMIT;
        }

        if checkpoint_verify_concurrency_limit < MIN_CHECKPOINT_CONCURRENCY_LIMIT {
            warn!(
                "configured checkpoint verify concurrency limit {} too low, increasing to {}",
                config.sync.checkpoint_verify_concurrency_limit, MIN_CHECKPOINT_CONCURRENCY_LIMIT,
            );

            checkpoint_verify_concurrency_limit = MIN_CHECKPOINT_CONCURRENCY_LIMIT;
        }

        if full_verify_concurrency_limit < MIN_CONCURRENCY_LIMIT {
            warn!(
                "configured full verify concurrency limit {} too low, increasing to {}",
                config.sync.full_verify_concurrency_limit, MIN_CONCURRENCY_LIMIT,
            );

            full_verify_concurrency_limit = MIN_CONCURRENCY_LIMIT;
        }

        let tip_network = Timeout::new(peers.clone(), TIPS_RESPONSE_TIMEOUT);

        // The Hedge middleware is the outermost layer, hedging requests
        // between two retry-wrapped networks.  The innermost timeout
        // layer is relatively unimportant, because slow requests will
        // probably be preemptively hedged.
        //
        // The Hedge goes outside the Retry, because the Retry layer
        // abstracts away spurious failures from individual peers
        // making a less-fallible network service, and the Hedge layer
        // tries to reduce latency of that less-fallible service.
        let block_network = Hedge::new(
            ServiceBuilder::new()
                .concurrency_limit(download_concurrency_limit)
                .retry(zn::RetryLimit::new(BLOCK_DOWNLOAD_RETRY_LIMIT))
                .timeout(BLOCK_DOWNLOAD_TIMEOUT)
                .service(peers),
            AlwaysHedge,
            // only start hedging after this many requests have been measured,
            20,
            // hedge requests slower than this percentile of recent latencies,
            0.95,
            // and rotate the latency statistics over this period.
            2 * SYNC_RESTART_DELAY,
        );

        // We apply a timeout to the verifier to avoid hangs due to missing earlier blocks.
        let verifier = Timeout::new(verifier, BLOCK_VERIFY_TIMEOUT);

        let (sync_status, recent_syncs) = SyncStatus::new();

        let (past_lookahead_limit_sender, past_lookahead_limit_receiver) = watch::channel(false);
        let past_lookahead_limit_receiver = zs::WatchReceiver::new(past_lookahead_limit_receiver);

        let downloads = Box::pin(Downloads::new(
            block_network,
            verifier,
            latest_chain_tip.clone(),
            past_lookahead_limit_sender,
            max(
                checkpoint_verify_concurrency_limit,
                full_verify_concurrency_limit,
            ),
            max_checkpoint_height,
        ));

        let new_syncer = Self {
            genesis_hash: config.network.network.genesis_hash(),
            max_checkpoint_height,
            checkpoint_verify_concurrency_limit,
            full_verify_concurrency_limit,
            tip_network,
            downloads,
            state,
            latest_chain_tip,
            prospective_tips: HashSet::new(),
            recent_syncs,
            past_lookahead_limit_receiver,
            misbehavior_sender,
        };

        (new_syncer, sync_status)
    }

    /// Runs the syncer to synchronize the chain and keep it synchronized.
    #[instrument(skip(self))]
    pub async fn sync(mut self) -> Result<(), Report> {
        // We can't download the genesis block using our normal algorithm,
        // due to protocol limitations
        self.request_genesis().await?;

        loop {
            if self.try_to_sync().await.is_err() {
                self.downloads.cancel_all();
            }

            self.update_metrics();

            info!(
                timeout = ?SYNC_RESTART_DELAY,
                state_tip = ?self.latest_chain_tip.best_tip_height(),
                "waiting to restart sync"
            );
            sleep(SYNC_RESTART_DELAY).await;
        }
    }

    /// Tries to synchronize the chain as far as it can.
    ///
    /// Obtains some prospective tips and iteratively tries to extend them and download the missing
    /// blocks.
    ///
    /// Returns `Ok` if it was able to synchronize as much of the chain as it could, and then ran
    /// out of prospective tips. This happens when synchronization finishes or if Zebra ended up
    /// following a fork. Either way, Zebra should attempt to obtain some more tips.
    ///
    /// Returns `Err` if there was an unrecoverable error and restarting the synchronization is
    /// necessary. This includes outer timeouts, where an entire syncing step takes an extremely
    /// long time. (These usually indicate hangs.)
    #[instrument(skip(self))]
    async fn try_to_sync(&mut self) -> Result<(), Report> {
        self.prospective_tips = HashSet::new();

        info!(
            state_tip = ?self.latest_chain_tip.best_tip_height(),
            "starting sync, obtaining new tips"
        );
        let mut extra_hashes = timeout(SYNC_RESTART_DELAY, self.obtain_tips())
            .await
            .map_err(Into::into)
            // TODO: replace with flatten() when it stabilises (#70142)
            .and_then(convert::identity)
            .map_err(|e| {
                info!("temporary error obtaining tips: {:#}", e);
                e
            })?;
        self.update_metrics();

        while !self.prospective_tips.is_empty() || !extra_hashes.is_empty() {
            // Avoid hangs due to service readiness or other internal operations
            extra_hashes = timeout(BLOCK_VERIFY_TIMEOUT, self.try_to_sync_once(extra_hashes))
                .await
                .map_err(Into::into)
                // TODO: replace with flatten() when it stabilises (#70142)
                .and_then(convert::identity)?;
        }

        info!("exhausted prospective tip set");

        Ok(())
    }

    /// Tries to synchronize the chain once, using the existing `extra_hashes`.
    ///
    /// Tries to extend the existing tips and download the missing blocks.
    ///
    /// Returns `Ok(extra_hashes)` if it was able to extend once and synchronize some of the chain.
    /// Returns `Err` if there was an unrecoverable error and restarting the synchronization is
    /// necessary.
    #[instrument(skip(self))]
    async fn try_to_sync_once(
        &mut self,
        mut extra_hashes: IndexSet<block::Hash>,
    ) -> Result<IndexSet<block::Hash>, Report> {
        // Check whether any block tasks are currently ready.
        while let Poll::Ready(Some(rsp)) = futures::poll!(self.downloads.next()) {
            // Some temporary errors are ignored, and syncing continues with other blocks.
            // If it turns out they were actually important, syncing will run out of blocks, and
            // the syncer will reset itself.
            self.handle_block_response(rsp)?;
        }
        self.update_metrics();

        // Pause new downloads while the syncer or downloader are past their lookahead limits.
        //
        // To avoid a deadlock or long waits for blocks to expire, we ignore the download
        // lookahead limit when there are only a small number of blocks waiting.
        while self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len())
            || (self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len()) / 2
                && self.past_lookahead_limit_receiver.cloned_watch_data())
        {
            trace!(
                tips.len = self.prospective_tips.len(),
                in_flight = self.downloads.in_flight(),
                extra_hashes = extra_hashes.len(),
                lookahead_limit = self.lookahead_limit(extra_hashes.len()),
                state_tip = ?self.latest_chain_tip.best_tip_height(),
                "waiting for pending blocks",
            );

            let response = self.downloads.next().await.expect("downloads is nonempty");

            self.handle_block_response(response)?;
            self.update_metrics();
        }

        // Once we're below the lookahead limit, we can request more blocks or hashes.
        if !extra_hashes.is_empty() {
            debug!(
                tips.len = self.prospective_tips.len(),
                in_flight = self.downloads.in_flight(),
                extra_hashes = extra_hashes.len(),
                lookahead_limit = self.lookahead_limit(extra_hashes.len()),
                state_tip = ?self.latest_chain_tip.best_tip_height(),
                "requesting more blocks",
            );

            let response = self.request_blocks(extra_hashes).await;
            extra_hashes = Self::handle_hash_response(response)?;
        } else {
            info!(
                tips.len = self.prospective_tips.len(),
                in_flight = self.downloads.in_flight(),
                extra_hashes = extra_hashes.len(),
                lookahead_limit = self.lookahead_limit(extra_hashes.len()),
                state_tip = ?self.latest_chain_tip.best_tip_height(),
                "extending tips",
            );

            extra_hashes = self.extend_tips().await.map_err(|e| {
                info!("temporary error extending tips: {:#}", e);
                e
            })?;
        }
        self.update_metrics();

        Ok(extra_hashes)
    }

    /// Given a block_locator list, fan out requests for subsequent hashes to
    /// multiple peers.
    #[instrument(skip(self))]
    async fn obtain_tips(&mut self) -> Result<IndexSet<block::Hash>, Report> {
        let block_locator = self
            .state
            .ready()
            .await
            .map_err(|e| eyre!(e))?
            .call(zebra_state::Request::BlockLocator)
            .await
            .map(|response| match response {
                zebra_state::Response::BlockLocator(block_locator) => block_locator,
                _ => unreachable!(
                    "GetBlockLocator request can only result in Response::BlockLocator"
                ),
            })
            .map_err(|e| eyre!(e))?;

        debug!(
            tip = ?block_locator.first().expect("we have at least one block locator object"),
            ?block_locator,
            "got block locator and trying to obtain new chain tips"
        );

        let mut requests = FuturesUnordered::new();
        for attempt in 0..FANOUT {
            if attempt > 0 {
                // Let other tasks run, so we're more likely to choose a different peer.
                //
                // TODO: move fanouts into the PeerSet, so we always choose different peers (#2214)
                tokio::task::yield_now().await;
            }

            let ready_tip_network = self.tip_network.ready().await;
            requests.push(tokio::spawn(ready_tip_network.map_err(|e| eyre!(e))?.call(
                zn::Request::FindBlocks {
                    known_blocks: block_locator.clone(),
                    stop: None,
                },
            )));
        }

        let mut download_set = IndexSet::new();
        while let Some(res) = requests.next().await {
            match res
                .unwrap_or_else(|e @ JoinError { .. }| {
                    if e.is_panic() {
                        panic!("panic in obtain tips task: {e:?}");
                    } else {
                        info!("task error during obtain tips task: {e:?}, is Zebra shutting down?");
                        Err(e.into())
                    }
                })
                .map_err::<Report, _>(|e| eyre!(e))
            {
                Ok(zn::Response::BlockHashes(hashes)) => {
                    trace!(?hashes);

                    // zcashd sometimes appends an unrelated hash at the start
                    // or end of its response.
                    //
                    // We can't discard the first hash, because it might be a
                    // block we want to download. So we just accept any
                    // out-of-order first hashes.

                    // We use the last hash for the tip, and we want to avoid bad
                    // tips. So we discard the last hash. (We don't need to worry
                    // about missed downloads, because we will pick them up again
                    // in ExtendTips.)
                    let hashes = match hashes.as_slice() {
                        [] => continue,
                        [rest @ .., _last] => rest,
                    };

                    let mut first_unknown = None;
                    for (i, &hash) in hashes.iter().enumerate() {
                        if !self.state_contains(hash).await? {
                            first_unknown = Some(i);
                            break;
                        }
                    }

                    debug!(hashes.len = ?hashes.len(), ?first_unknown);

                    let unknown_hashes = if let Some(index) = first_unknown {
                        &hashes[index..]
                    } else {
                        continue;
                    };

                    trace!(?unknown_hashes);
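                    // `rchunks_exact(2).next()` yields the last two unknown hashes:
                    // for a trimmed hash list `[a, b, c, d]`, `end` is `[c, d]`, so
                    // the new tip is `c`, and `d` is the hash expected to extend it.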
                    let new_tip = if let Some(end) = unknown_hashes.rchunks_exact(2).next() {
                        CheckedTip {
                            tip: end[0],
                            expected_next: end[1],
                        }
                    } else {
                        debug!("discarding response that extends only one block");
                        continue;
                    };

                    // Make sure we get the same tips, regardless of the
                    // order of peer responses
                    if !download_set.contains(&new_tip.expected_next) {
                        debug!(
                            ?new_tip,
                            "adding new prospective tip, and removing existing tips in the new block hash list"
                        );
                        self.prospective_tips
                            .retain(|t| !unknown_hashes.contains(&t.expected_next));
                        self.prospective_tips.insert(new_tip);
                    } else {
                        debug!(
                            ?new_tip,
                            "discarding prospective tip: already in download set"
                        );
                    }

                    // security: the first response determines our download order
                    //
                    // TODO: can we make the download order independent of response order?
                    let prev_download_len = download_set.len();
                    download_set.extend(unknown_hashes);
                    let new_download_len = download_set.len();
                    let new_hashes = new_download_len - prev_download_len;
                    debug!(new_hashes, "added hashes to download set");
                    metrics::histogram!("sync.obtain.response.hash.count")
                        .record(new_hashes as f64);
                }
                Ok(_) => unreachable!("network returned wrong response"),
                // We ignore this error because we made multiple fanout requests.
                Err(e) => debug!(?e),
            }
        }

        debug!(?self.prospective_tips);

        // Check that the new tips we got are actually unknown.
        for hash in &download_set {
            debug!(?hash, "checking if state contains hash");
            if self.state_contains(*hash).await? {
                return Err(eyre!("queued download of hash behind our chain tip"));
            }
        }

        let new_downloads = download_set.len();
        debug!(new_downloads, "queueing new downloads");
        metrics::gauge!("sync.obtain.queued.hash.count").set(new_downloads as f64);

        // security: use the actual number of new downloads from all peers,
        // so the last peer to respond can't toggle our mempool
        self.recent_syncs.push_obtain_tips_length(new_downloads);

        let response = self.request_blocks(download_set).await;

        Self::handle_hash_response(response).map_err(Into::into)
    }

    #[instrument(skip(self))]
    async fn extend_tips(&mut self) -> Result<IndexSet<block::Hash>, Report> {
        let tips = std::mem::take(&mut self.prospective_tips);

        let mut download_set = IndexSet::new();
        debug!(tips = ?tips.len(), "trying to extend chain tips");
        for tip in tips {
            debug!(?tip, "asking peers to extend chain tip");
            let mut responses = FuturesUnordered::new();
            for attempt in 0..FANOUT {
                if attempt > 0 {
                    // Let other tasks run, so we're more likely to choose a different peer.
                    //
                    // TODO: move fanouts into the PeerSet, so we always choose different peers (#2214)
                    tokio::task::yield_now().await;
                }

                let ready_tip_network = self.tip_network.ready().await;
                responses.push(tokio::spawn(ready_tip_network.map_err(|e| eyre!(e))?.call(
                    zn::Request::FindBlocks {
                        known_blocks: vec![tip.tip],
                        stop: None,
                    },
                )));
            }
            while let Some(res) = responses.next().await {
                match res
                    .expect("panic in spawned extend tips request")
                    .map_err::<Report, _>(|e| eyre!(e))
                {
                    Ok(zn::Response::BlockHashes(hashes)) => {
                        debug!(first = ?hashes.first(), len = ?hashes.len());
                        trace!(?hashes);

                        // zcashd sometimes appends an unrelated hash at the
                        // start or end of its response. Check the first hash
                        // against the previous response, and discard mismatches.
                        let unknown_hashes = match hashes.as_slice() {
                            [expected_hash, rest @ ..] if expected_hash == &tip.expected_next => {
                                rest
                            }
                            // If the first hash doesn't match, retry with the second.
                            [first_hash, expected_hash, rest @ ..]
                                if expected_hash == &tip.expected_next =>
                            {
                                debug!(
                                    ?first_hash,
                                    ?tip.expected_next,
                                    ?tip.tip,
                                    "unexpected first hash, but the second matches: using the hashes after the match"
                                );
                                rest
                            }
                            // We ignore these responses
                            [] => continue,
                            [single_hash] => {
                                debug!(
                                    ?single_hash,
                                    ?tip.expected_next,
                                    ?tip.tip,
                                    "discarding response containing a single unexpected hash"
                                );
                                continue;
                            }
                            [first_hash, second_hash, rest @ ..] => {
                                debug!(
                                    ?first_hash,
                                    ?second_hash,
                                    rest_len = ?rest.len(),
                                    ?tip.expected_next,
                                    ?tip.tip,
                                    "discarding response that starts with two unexpected hashes"
                                );
                                continue;
                            }
                        };

                        // We use the last hash for the tip, and we want to avoid
                        // bad tips. So we discard the last hash. (We don't need
                        // to worry about missed downloads, because we will pick
                        // them up again in the next ExtendTips.)
                        let unknown_hashes = match unknown_hashes {
                            [] => continue,
                            [rest @ .., _last] => rest,
                        };

                        let new_tip = if let Some(end) = unknown_hashes.rchunks_exact(2).next() {
                            CheckedTip {
                                tip: end[0],
                                expected_next: end[1],
                            }
                        } else {
                            debug!("discarding response that extends only one block");
                            continue;
                        };

                        trace!(?unknown_hashes);

                        // Make sure we get the same tips, regardless of the
                        // order of peer responses
                        if !download_set.contains(&new_tip.expected_next) {
                            debug!(
                                ?new_tip,
                                "adding new prospective tip, and removing any existing tips in the new block hash list"
                            );
                            self.prospective_tips
                                .retain(|t| !unknown_hashes.contains(&t.expected_next));
                            self.prospective_tips.insert(new_tip);
                        } else {
                            debug!(
                                ?new_tip,
                                "discarding prospective tip: already in download set"
                            );
                        }

                        // security: the first response determines our download order
                        //
                        // TODO: can we make the download order independent of response order?
                        let prev_download_len = download_set.len();
                        download_set.extend(unknown_hashes);
                        let new_download_len = download_set.len();
                        let new_hashes = new_download_len - prev_download_len;
                        debug!(new_hashes, "added hashes to download set");
                        metrics::histogram!("sync.extend.response.hash.count")
                            .record(new_hashes as f64);
                    }
                    Ok(_) => unreachable!("network returned wrong response"),
                    // We ignore this error because we made multiple fanout requests.
                    Err(e) => debug!(?e),
                }
            }
        }

        let new_downloads = download_set.len();
        debug!(new_downloads, "queueing new downloads");
        metrics::gauge!("sync.extend.queued.hash.count").set(new_downloads as f64);

        // security: use the actual number of new downloads from all peers,
        // so the last peer to respond can't toggle our mempool
        self.recent_syncs.push_extend_tips_length(new_downloads);

        let response = self.request_blocks(download_set).await;

        Self::handle_hash_response(response).map_err(Into::into)
    }

    /// Download and verify the genesis block, if it isn't currently known to
    /// our node.
    async fn request_genesis(&mut self) -> Result<(), Report> {
        // Due to Bitcoin protocol limitations, we can't request the genesis
        // block using our standard tip-following algorithm:
        //  - getblocks requires at least one hash
        //  - responses start with the block *after* the requested block, and
        //  - the genesis hash is used as a placeholder for "no matches".
        //
        // So we just download and verify the genesis block here.
        while !self.state_contains(self.genesis_hash).await? {
            info!("starting genesis block download and verify");

            let response = timeout(SYNC_RESTART_DELAY, self.request_genesis_once())
                .await
                .map_err(Into::into);

            // 3 layers of results is not ideal, but we need the timeout on the outside.
            match response {
                Ok(Ok(Ok(response))) => self
                    .handle_block_response(Ok(response))
                    .expect("never returns Err for Ok"),
                // Handle fatal errors
                Ok(Err(fatal_error)) => Err(fatal_error)?,
                // Handle timeouts and block errors
                Err(error) | Ok(Ok(Err(error))) => {
                    // TODO: exit syncer on permanent service errors (NetworkError, VerifierError)
                    if Self::should_restart_sync(&error) {
                        warn!(
                            ?error,
                            "could not download or verify genesis block, retrying"
                        );
                    } else {
                        info!(
                            ?error,
                            "temporary error downloading or verifying genesis block, retrying"
                        );
                    }

                    tokio::time::sleep(GENESIS_TIMEOUT_RETRY).await;
                }
            }
        }

        Ok(())
    }

    /// Try to download and verify the genesis block once.
    ///
    /// Fatal errors are returned in the outer result, temporary errors in the inner one.
    async fn request_genesis_once(
        &mut self,
    ) -> Result<Result<(Height, block::Hash), BlockDownloadVerifyError>, Report> {
        let response = self.downloads.download_and_verify(self.genesis_hash).await;
        Self::handle_response(response).map_err(|e| eyre!(e))?;

        let response = self.downloads.next().await.expect("downloads is nonempty");

        Ok(response)
    }

    /// Queue download and verify tasks for each block that isn't currently known to our node.
    ///
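    /// Hashes past the configured lookahead limit are split off and returned to
    /// the caller. For example (illustrative numbers only): with 600 hashes and a
    /// lookahead limit of 420, the first 420 hashes are queued for download, and
    /// the remaining 180 are returned as extra hashes.
    ///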
    /// TODO: turn obtain and extend tips into a separate task, which sends hashes via a channel?
    async fn request_blocks(
        &mut self,
        mut hashes: IndexSet<block::Hash>,
    ) -> Result<IndexSet<block::Hash>, BlockDownloadVerifyError> {
        let lookahead_limit = self.lookahead_limit(hashes.len());

        debug!(
            hashes.len = hashes.len(),
            ?lookahead_limit,
            "requesting blocks",
        );

        let extra_hashes = if hashes.len() > lookahead_limit {
            hashes.split_off(lookahead_limit)
        } else {
            IndexSet::new()
        };

        for hash in hashes.into_iter() {
            self.downloads.download_and_verify(hash).await?;
        }

        Ok(extra_hashes)
    }

    /// The configured lookahead limit, based on the currently verified height,
    /// and the number of hashes we haven't queued yet.
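    ///
    /// Worked example (illustrative numbers only): with a final checkpoint at
    /// height 2_000_000, a verified tip at height 1_999_900, and 500 new hashes,
    /// the middle branch below applies, so the limit is
    /// `full_verify_concurrency_limit + (1_999_900 + 500 - 2_000_000)`,
    /// which is 20 + 400 = 420 blocks with the default config.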
    fn lookahead_limit(&self, new_hashes: usize) -> usize {
        let max_checkpoint_height: usize = self
            .max_checkpoint_height
            .0
            .try_into()
            .expect("fits in usize");

        // When the state is empty, we want to verify using checkpoints
        let verified_height: usize = self
            .latest_chain_tip
            .best_tip_height()
            .unwrap_or(Height(0))
            .0
            .try_into()
            .expect("fits in usize");

        if verified_height >= max_checkpoint_height {
            self.full_verify_concurrency_limit
        } else if (verified_height + new_hashes) >= max_checkpoint_height {
            // If we're just about to start full verification, allow enough for the remaining checkpoint,
            // and also enough for a separate full verification lookahead.
            let checkpoint_hashes = verified_height + new_hashes - max_checkpoint_height;

            self.full_verify_concurrency_limit + checkpoint_hashes
        } else {
            self.checkpoint_verify_concurrency_limit
        }
    }

    /// Handles a response for a requested block.
    ///
    /// See [`Self::handle_response`] for more details.
    #[allow(unknown_lints)]
    fn handle_block_response(
        &mut self,
        response: Result<(Height, block::Hash), BlockDownloadVerifyError>,
    ) -> Result<(), BlockDownloadVerifyError> {
        match response {
            Ok((height, hash)) => {
                trace!(?height, ?hash, "verified and committed block to state");

                return Ok(());
            }

            Err(BlockDownloadVerifyError::Invalid {
                ref error,
                advertiser_addr: Some(advertiser_addr),
                ..
            }) if error.misbehavior_score() != 0 => {
                let _ = self
                    .misbehavior_sender
                    .try_send((advertiser_addr, error.misbehavior_score()));
            }

            Err(_) => {}
        };

        Self::handle_response(response)
    }

    /// Handles a response to block hash submission, passing through any extra hashes.
    ///
    /// See [`Self::handle_response`] for more details.
    #[allow(unknown_lints)]
    fn handle_hash_response(
        response: Result<IndexSet<block::Hash>, BlockDownloadVerifyError>,
    ) -> Result<IndexSet<block::Hash>, BlockDownloadVerifyError> {
        match response {
            Ok(extra_hashes) => Ok(extra_hashes),
            Err(_) => Self::handle_response(response).map(|()| IndexSet::new()),
        }
    }

    /// Handles a response to a syncer request.
    ///
    /// Returns `Ok` if the request was successful, or if an expected error occurred,
    /// so that the synchronization can continue normally.
    ///
    /// Returns `Err` if an unexpected error occurred, to force the synchronizer to restart.
    #[allow(unknown_lints)]
    fn handle_response<T>(
        response: Result<T, BlockDownloadVerifyError>,
    ) -> Result<(), BlockDownloadVerifyError> {
        match response {
            Ok(_t) => Ok(()),
            Err(error) => {
                // TODO: exit syncer on permanent service errors (NetworkError, VerifierError)
                if Self::should_restart_sync(&error) {
                    Err(error)
                } else {
                    Ok(())
                }
            }
        }
    }

    /// Returns `true` if the hash is present in the state, and `false`
    /// if the hash is not present in the state.
    pub(crate) async fn state_contains(&mut self, hash: block::Hash) -> Result<bool, Report> {
        match self
            .state
            .ready()
            .await
            .map_err(|e| eyre!(e))?
            .call(zebra_state::Request::KnownBlock(hash))
            .await
            .map_err(|e| eyre!(e))?
        {
            zs::Response::KnownBlock(loc) => Ok(loc.is_some()),
            _ => unreachable!("wrong response to known block request"),
        }
    }

    fn update_metrics(&mut self) {
        metrics::gauge!("sync.prospective_tips.len").set(self.prospective_tips.len() as f64);
        metrics::gauge!("sync.downloads.in_flight").set(self.downloads.in_flight() as f64);
    }

    /// Return if the sync should be restarted based on the given error
    /// from the block downloader and verifier stream.
    fn should_restart_sync(e: &BlockDownloadVerifyError) -> bool {
        match e {
            // Structural matches: downcasts
            BlockDownloadVerifyError::Invalid { error, .. } if error.is_duplicate_request() => {
                debug!(error = ?e, "block was already verified, possibly from a previous sync run, continuing");
                false
            }

            // Structural matches: direct
            BlockDownloadVerifyError::CancelledDuringDownload { .. }
            | BlockDownloadVerifyError::CancelledDuringVerification { .. } => {
                debug!(error = ?e, "block verification was cancelled, continuing");
                false
            }
            BlockDownloadVerifyError::BehindTipHeightLimit { .. } => {
                debug!(
                    error = ?e,
                    "block height is behind the current state tip, \
                     assuming the syncer will eventually catch up to the state, continuing"
                );
                false
            }
            BlockDownloadVerifyError::DuplicateBlockQueuedForDownload { .. } => {
                debug!(
                    error = ?e,
                    "queued duplicate block hash for download, \
                     assuming the syncer will eventually resolve duplicates, continuing"
                );
                false
            }

            // String matches
            //
            // We want to match VerifyChainError::Block(VerifyBlockError::Commit(ref source)),
            // but that type is boxed.
            // TODO:
            // - turn this check into a function on VerifyChainError, like is_duplicate_request()
            BlockDownloadVerifyError::Invalid { error, .. }
                if format!("{error:?}").contains("block is already committed to the state")
                    || format!("{error:?}")
                        .contains("block has already been sent to be committed to the state") =>
            {
                // TODO: improve this by checking the type (#2908)
                debug!(error = ?e, "block is already committed or pending a commit, possibly from a previous sync run, continuing");
                false
            }
            BlockDownloadVerifyError::DownloadFailed { ref error, .. }
                if format!("{error:?}").contains("NotFound") =>
            {
                // Covers these errors:
                // - NotFoundResponse
                // - NotFoundRegistry
                //
                // TODO: improve this by checking the type (#2908)
                //       restart after a certain number of NotFound errors?
                debug!(error = ?e, "block was not found, possibly from a peer that doesn't have the block yet, continuing");
                false
            }

            _ => {
                // download_and_verify downcasts errors from the block verifier
                // into VerifyChainError, and puts the result inside one of the
                // BlockDownloadVerifyError enumerations. This downcast could
                // become incorrect e.g. after some refactoring, and it is difficult
                // to write a test to check it. The test below is a best-effort
                // attempt to catch if that happens and log it.
                //
                // TODO: add a proper test and remove this
                // https://github.com/ZcashFoundation/zebra/issues/2909
                let err_str = format!("{e:?}");
                if err_str.contains("AlreadyVerified")
                    || err_str.contains("AlreadyInChain")
                    || err_str.contains("block is already committed to the state")
                    || err_str.contains("block has already been sent to be committed to the state")
                    || err_str.contains("NotFound")
                {
                    error!(?e,
                        "a BlockDownloadVerifyError that should have been filtered out was detected, \
                        which possibly indicates a programming error in the downcast inside \
                        zebrad::components::sync::downloads::Downloads::download_and_verify"
                    )
                }

                warn!(?e, "error downloading and verifying block");
                true
            }
        }
    }
}