zebra_consensus/
checkpoint.rs

1//! Checkpoint-based block verification.
2//!
3//! Checkpoint-based verification uses a list of checkpoint hashes to
4//! speed up the initial chain sync for Zebra. This list is distributed
5//! with Zebra.
6//!
7//! The checkpoint verifier queues pending blocks. Once there is a
8//! chain from the previous checkpoint to a target checkpoint, it
9//! verifies all the blocks in that chain, and sends accepted blocks to
10//! the state service as finalized chain state, skipping the majority of
11//! contextual verification checks.
12//!
13//! Verification starts at the first checkpoint, which is the genesis
14//! block for the configured network.
15
16use std::{
17    collections::BTreeMap,
18    ops::{Bound, Bound::*},
19    pin::Pin,
20    sync::{mpsc, Arc},
21    task::{Context, Poll},
22};
23
24use futures::{Future, FutureExt, TryFutureExt};
25use thiserror::Error;
26use tokio::sync::oneshot;
27use tower::{Service, ServiceExt};
28use tracing::instrument;
29
30use zebra_chain::{
31    amount::{self, DeferredPoolBalanceChange},
32    block::{self, Block},
33    parameters::{
34        subsidy::{block_subsidy, funding_stream_values, FundingStreamReceiver, SubsidyError},
35        Network, GENESIS_PREVIOUS_BLOCK_HASH,
36    },
37    work::equihash,
38};
39use zebra_state::{self as zs, CheckpointVerifiedBlock};
40
41use crate::{
42    block::VerifyBlockError,
43    checkpoint::types::{
44        Progress::{self, *},
45        TargetHeight::{self, *},
46    },
47    error::BlockError,
48    BoxError, ParameterCheckpoint as _,
49};
50
51pub(crate) mod list;
52mod types;
53
54#[cfg(test)]
55mod tests;
56
57pub use zebra_node_services::constants::{MAX_CHECKPOINT_BYTE_COUNT, MAX_CHECKPOINT_HEIGHT_GAP};
58
59pub use list::CheckpointList;
60
61/// An unverified block, which is in the queue for checkpoint verification.
62#[derive(Debug)]
63struct QueuedBlock {
64    /// The block, with additional precalculated data.
65    block: CheckpointVerifiedBlock,
66    /// The transmitting end of the oneshot channel for this block's result.
67    tx: oneshot::Sender<Result<block::Hash, VerifyCheckpointError>>,
68}
69
70/// The unverified block, with a receiver for the [`QueuedBlock`]'s result.
71#[derive(Debug)]
72struct RequestBlock {
73    /// The block, with additional precalculated data.
74    block: CheckpointVerifiedBlock,
75    /// The receiving end of the oneshot channel for this block's result.
76    rx: oneshot::Receiver<Result<block::Hash, VerifyCheckpointError>>,
77}
78
79/// A list of unverified blocks at a particular height.
80///
81/// Typically contains a single block, but might contain more if a peer
82/// has an old chain fork. (Or sends us a bad block.)
83///
84/// The CheckpointVerifier avoids creating zero-block lists.
85type QueuedBlockList = Vec<QueuedBlock>;
86
87/// The maximum number of queued blocks at any one height.
88///
89/// This value is a tradeoff between:
90/// - rejecting bad blocks: if we queue more blocks, we need fewer network
91///   retries, but use a bit more CPU when verifying,
92/// - avoiding a memory DoS: if we queue fewer blocks, we use less memory.
93///
94/// Memory usage is controlled by the sync service, because it controls block
95/// downloads. When the verifier services process blocks, they reduce memory
96/// usage by committing blocks to the disk state. (Or dropping invalid blocks.)
97pub const MAX_QUEUED_BLOCKS_PER_HEIGHT: usize = 4;
98
99/// Convert a tip into its hash and matching progress.
100fn progress_from_tip(
101    checkpoint_list: &CheckpointList,
102    tip: Option<(block::Height, block::Hash)>,
103) -> (Option<block::Hash>, Progress<block::Height>) {
104    match tip {
105        Some((height, hash)) => {
106            if height >= checkpoint_list.max_height() {
107                (None, Progress::FinalCheckpoint)
108            } else {
109                metrics::gauge!("checkpoint.verified.height").set(height.0 as f64);
110                metrics::gauge!("checkpoint.processing.next.height").set(height.0 as f64);
111                (Some(hash), Progress::InitialTip(height))
112            }
113        }
114        // We start by verifying the genesis block, by itself
115        None => (None, Progress::BeforeGenesis),
116    }
117}
118
119/// A checkpointing block verifier.
120///
121/// Verifies blocks using a supplied list of checkpoints. There must be at
122/// least one checkpoint for the genesis block.
123pub struct CheckpointVerifier<S>
124where
125    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
126    S::Future: Send + 'static,
127{
128    /// The checkpoint list for this verifier.
129    checkpoint_list: CheckpointList,
130
131    /// The network rules used by this verifier.
132    network: Network,
133
134    /// The hash of the initial tip, if any.
135    initial_tip_hash: Option<block::Hash>,
136
137    /// The underlying state service, possibly wrapped in other services.
138    state_service: S,
139
140    /// A queue of unverified blocks.
141    ///
142    /// Contains a list of unverified blocks at each block height. In most cases,
143    /// the checkpoint verifier will store zero or one block at each height.
144    ///
145    /// Blocks are verified in order, once there is a chain from the previous
146    /// checkpoint to a target checkpoint.
147    ///
148    /// The first checkpoint does not have any ancestors, so it only verifies the
149    /// genesis block.
150    queued: BTreeMap<block::Height, QueuedBlockList>,
151
152    /// The current progress of this verifier.
153    verifier_progress: Progress<block::Height>,
154
155    /// A channel to receive requests to reset the verifier,
156    /// receiving the tip of the state.
157    reset_receiver: mpsc::Receiver<Option<(block::Height, block::Hash)>>,
158    /// A channel to send requests to reset the verifier,
159    /// passing the tip of the state.
160    reset_sender: mpsc::Sender<Option<(block::Height, block::Hash)>>,
161
162    /// Queued block height progress transmitter.
163    #[cfg(feature = "progress-bar")]
164    queued_blocks_bar: howudoin::Tx,
165
166    /// Verified checkpoint progress transmitter.
167    #[cfg(feature = "progress-bar")]
168    verified_checkpoint_bar: howudoin::Tx,
169}
170
171impl<S> std::fmt::Debug for CheckpointVerifier<S>
172where
173    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
174    S::Future: Send + 'static,
175{
176    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177        f.debug_struct("CheckpointVerifier")
178            .field("checkpoint_list", &self.checkpoint_list)
179            .field("network", &self.network)
180            .field("initial_tip_hash", &self.initial_tip_hash)
181            .field("queued", &self.queued)
182            .field("verifier_progress", &self.verifier_progress)
183            .finish()
184    }
185}
186
187impl<S> CheckpointVerifier<S>
188where
189    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
190    S::Future: Send + 'static,
191{
192    /// Return a checkpoint verification service for `network`, using the
193    /// hard-coded checkpoint list, and the provided `state_service`.
194    ///
195    /// If `initial_tip` is Some(_), the verifier starts at that initial tip.
196    /// The initial tip can be between the checkpoints in the hard-coded
197    /// checkpoint list.
198    ///
199    /// The checkpoint verifier holds a state service of type `S`, into which newly
200    /// verified blocks will be committed. This state is pluggable to allow for
201    /// testing or instrumentation.
202    ///
203    /// This function should be called only once for a particular network, rather
204    /// than constructing multiple verification services for the same network. To
205    /// clone a CheckpointVerifier, you might need to wrap it in a
206    /// `tower::Buffer` service.
207    #[allow(dead_code)]
208    pub fn new(
209        network: &Network,
210        initial_tip: Option<(block::Height, block::Hash)>,
211        state_service: S,
212    ) -> Self {
213        let checkpoint_list = network.checkpoint_list();
214        let max_height = checkpoint_list.max_height();
215        tracing::info!(
216            ?max_height,
217            ?network,
218            ?initial_tip,
219            "initialising CheckpointVerifier"
220        );
221        Self::from_checkpoint_list(checkpoint_list, network, initial_tip, state_service)
222    }
223
224    /// Return a checkpoint verification service using `list`, `network`,
225    /// `initial_tip`, and `state_service`.
226    ///
227    /// Assumes that the provided genesis checkpoint is correct.
228    ///
229    /// Callers should prefer `CheckpointVerifier::new`, which uses the
230    /// hard-coded checkpoint lists, or `CheckpointList::from_list` if you need
231    /// to specify a custom checkpoint list. See those functions for more
232    /// details.
233    ///
234    /// This function is designed for use in tests.
235    #[allow(dead_code)]
236    pub(crate) fn from_list(
237        list: impl IntoIterator<Item = (block::Height, block::Hash)>,
238        network: &Network,
239        initial_tip: Option<(block::Height, block::Hash)>,
240        state_service: S,
241    ) -> Result<Self, VerifyCheckpointError> {
242        Ok(Self::from_checkpoint_list(
243            CheckpointList::from_list(list).map_err(VerifyCheckpointError::CheckpointList)?,
244            network,
245            initial_tip,
246            state_service,
247        ))
248    }
249
250    /// Return a checkpoint verification service using `checkpoint_list`,
251    /// `network`, `initial_tip`, and `state_service`.
252    ///
253    /// Assumes that the provided genesis checkpoint is correct.
254    ///
255    /// Callers should prefer `CheckpointVerifier::new`, which uses the
256    /// hard-coded checkpoint lists. See that function for more details.
257    pub(crate) fn from_checkpoint_list(
258        checkpoint_list: CheckpointList,
259        network: &Network,
260        initial_tip: Option<(block::Height, block::Hash)>,
261        state_service: S,
262    ) -> Self {
263        // All the initialisers should call this function, so we only have to
264        // change fields or default values in one place.
265        let (initial_tip_hash, verifier_progress) =
266            progress_from_tip(&checkpoint_list, initial_tip);
267
268        let (sender, receiver) = mpsc::channel();
269
270        #[cfg(feature = "progress-bar")]
271        let queued_blocks_bar = howudoin::new_root().label("Checkpoint Queue Height");
272
273        #[cfg(feature = "progress-bar")]
274        let verified_checkpoint_bar =
275            howudoin::new_with_parent(queued_blocks_bar.id()).label("Verified Checkpoints");
276
277        let verifier = CheckpointVerifier {
278            checkpoint_list,
279            network: network.clone(),
280            initial_tip_hash,
281            state_service,
282            queued: BTreeMap::new(),
283            verifier_progress,
284            reset_receiver: receiver,
285            reset_sender: sender,
286            #[cfg(feature = "progress-bar")]
287            queued_blocks_bar,
288            #[cfg(feature = "progress-bar")]
289            verified_checkpoint_bar,
290        };
291
292        if verifier_progress.is_final_checkpoint() {
293            verifier.finish_diagnostics();
294        } else {
295            verifier.verified_checkpoint_diagnostics(verifier_progress.height());
296        }
297
298        verifier
299    }
300
301    /// Update diagnostics for queued blocks.
302    fn queued_block_diagnostics(&self, height: block::Height, hash: block::Hash) {
303        let max_queued_height = self
304            .queued
305            .keys()
306            .next_back()
307            .expect("queued has at least one entry");
308
309        metrics::gauge!("checkpoint.queued.max.height").set(max_queued_height.0 as f64);
310
311        let is_checkpoint = self.checkpoint_list.contains(height);
312        tracing::debug!(?height, ?hash, ?is_checkpoint, "queued block");
313
314        #[cfg(feature = "progress-bar")]
315        if matches!(howudoin::cancelled(), Some(true)) {
316            self.finish_diagnostics();
317        } else {
318            self.queued_blocks_bar
319                .set_pos(max_queued_height.0)
320                .set_len(u64::from(self.checkpoint_list.max_height().0));
321        }
322    }
323
324    /// Update diagnostics for verified checkpoints.
325    fn verified_checkpoint_diagnostics(&self, verified_height: impl Into<Option<block::Height>>) {
326        let Some(verified_height) = verified_height.into() else {
327            // We don't know if we have already finished, or haven't started yet,
328            // so don't register any progress
329            return;
330        };
331
332        metrics::gauge!("checkpoint.verified.height").set(verified_height.0 as f64);
333
334        let checkpoint_index = self.checkpoint_list.prev_checkpoint_index(verified_height);
335        let checkpoint_count = self.checkpoint_list.len();
336
337        metrics::gauge!("checkpoint.verified.count").set(checkpoint_index as f64);
338
339        tracing::debug!(
340            ?verified_height,
341            ?checkpoint_index,
342            ?checkpoint_count,
343            "verified checkpoint",
344        );
345
346        #[cfg(feature = "progress-bar")]
347        if matches!(howudoin::cancelled(), Some(true)) {
348            self.finish_diagnostics();
349        } else {
350            self.verified_checkpoint_bar
351                .set_pos(u64::try_from(checkpoint_index).expect("fits in u64"))
352                .set_len(u64::try_from(checkpoint_count).expect("fits in u64"));
353        }
354    }
355
356    /// Finish checkpoint verifier diagnostics.
357    fn finish_diagnostics(&self) {
358        #[cfg(feature = "progress-bar")]
359        {
360            self.queued_blocks_bar.close();
361            self.verified_checkpoint_bar.close();
362        }
363    }
364
365    /// Reset the verifier progress back to given tip.
366    fn reset_progress(&mut self, tip: Option<(block::Height, block::Hash)>) {
367        let (initial_tip_hash, verifier_progress) = progress_from_tip(&self.checkpoint_list, tip);
368        self.initial_tip_hash = initial_tip_hash;
369        self.verifier_progress = verifier_progress;
370
371        self.verified_checkpoint_diagnostics(verifier_progress.height());
372    }
373
374    /// Return the current verifier's progress.
375    ///
376    /// If verification has not started yet, returns `BeforeGenesis`,
377    /// or `InitialTip(height)` if there were cached verified blocks.
378    ///
379    /// If verification is ongoing, returns `PreviousCheckpoint(height)`.
380    /// `height` increases as checkpoints are verified.
381    ///
382    /// If verification has finished, returns `FinalCheckpoint`.
383    fn previous_checkpoint_height(&self) -> Progress<block::Height> {
384        self.verifier_progress
385    }
386
387    /// Return the start of the current checkpoint range.
388    ///
389    /// Returns None if verification has finished.
390    fn current_start_bound(&self) -> Option<Bound<block::Height>> {
391        match self.previous_checkpoint_height() {
392            BeforeGenesis => Some(Unbounded),
393            InitialTip(height) | PreviousCheckpoint(height) => Some(Excluded(height)),
394            FinalCheckpoint => None,
395        }
396    }
397
398    /// Return the target checkpoint height that we want to verify.
399    ///
400    /// If we need more blocks, returns `WaitingForBlocks`.
401    ///
402    /// If the queued blocks are continuous from the previous checkpoint to a
403    /// target checkpoint, returns `Checkpoint(height)`. The target checkpoint
404    /// can be multiple checkpoints ahead of the previous checkpoint.
405    ///
406    /// `height` increases as checkpoints are verified.
407    ///
408    /// If verification has finished, returns `FinishedVerifying`.
409    fn target_checkpoint_height(&self) -> TargetHeight {
410        // Find the height we want to start searching at
411        let start_height = match self.previous_checkpoint_height() {
412            // Check if we have the genesis block as a special case, to simplify the loop
413            BeforeGenesis if !self.queued.contains_key(&block::Height(0)) => {
414                tracing::trace!("Waiting for genesis block");
415                metrics::counter!("checkpoint.waiting.count").increment(1);
416                return WaitingForBlocks;
417            }
418            BeforeGenesis => block::Height(0),
419            InitialTip(height) | PreviousCheckpoint(height) => height,
420            FinalCheckpoint => return FinishedVerifying,
421        };
422
423        // Find the end of the continuous sequence of blocks, starting at the
424        // last verified checkpoint. If there is no verified checkpoint, start
425        // *after* the genesis block (which we checked above).
426        //
427        // If `btree_map::Range` implements `ExactSizeIterator`, it would be
428        // much faster to walk the checkpoint list, and compare the length of
429        // the `btree_map::Range` to the block height difference between
430        // checkpoints. (In maps, keys are unique, so we don't need to check
431        // each height value.)
432        //
433        // But at the moment, this implementation is slightly faster, because
434        // it stops after the first gap.
435        let mut pending_height = start_height;
436        for (&height, _) in self.queued.range((Excluded(pending_height), Unbounded)) {
437            // If the queued blocks are continuous.
438            if height == block::Height(pending_height.0 + 1) {
439                pending_height = height;
440            } else {
441                let gap = height.0 - pending_height.0;
442                // Try to log a useful message when checkpointing has issues
443                tracing::trace!(contiguous_height = ?pending_height,
444                                next_height = ?height,
445                                ?gap,
446                                "Waiting for more checkpoint blocks");
447                break;
448            }
449        }
450        metrics::gauge!("checkpoint.queued.continuous.height").set(pending_height.0 as f64);
451
452        // Now find the start of the checkpoint range
453        let start = self.current_start_bound().expect(
454            "if verification has finished, we should have returned earlier in the function",
455        );
456        // Find the highest checkpoint below pending_height, excluding any
457        // previously verified checkpoints
458        let target_checkpoint = self
459            .checkpoint_list
460            .max_height_in_range((start, Included(pending_height)));
461
462        tracing::trace!(
463            checkpoint_start = ?start,
464            highest_contiguous_block = ?pending_height,
465            ?target_checkpoint
466        );
467
468        if let Some(block::Height(target_checkpoint)) = target_checkpoint {
469            metrics::gauge!("checkpoint.processing.next.height").set(target_checkpoint as f64);
470        } else {
471            // Use the start height if there is no potential next checkpoint
472            metrics::gauge!("checkpoint.processing.next.height").set(start_height.0 as f64);
473            metrics::counter!("checkpoint.waiting.count").increment(1);
474        }
475
476        target_checkpoint
477            .map(Checkpoint)
478            .unwrap_or(WaitingForBlocks)
479    }
480
481    /// Return the most recently verified checkpoint's hash.
482    ///
483    /// See `previous_checkpoint_height()` for details.
484    fn previous_checkpoint_hash(&self) -> Progress<block::Hash> {
485        match self.previous_checkpoint_height() {
486            BeforeGenesis => BeforeGenesis,
487            InitialTip(_) => self
488                .initial_tip_hash
489                .map(InitialTip)
490                .expect("initial tip height must have an initial tip hash"),
491            PreviousCheckpoint(height) => self
492                .checkpoint_list
493                .hash(height)
494                .map(PreviousCheckpoint)
495                .expect("every checkpoint height must have a hash"),
496            FinalCheckpoint => FinalCheckpoint,
497        }
498    }
499
500    /// Check that `height` is valid and able to be verified.
501    ///
502    /// Returns an error if:
503    ///  - the block's height is greater than the maximum checkpoint
504    ///  - there are no checkpoints
505    ///  - the block's height is less than or equal to the previously verified
506    ///    checkpoint
507    ///  - verification has finished
508    fn check_height(&self, height: block::Height) -> Result<(), VerifyCheckpointError> {
509        if height > self.checkpoint_list.max_height() {
510            Err(VerifyCheckpointError::TooHigh {
511                height,
512                max_height: self.checkpoint_list.max_height(),
513            })?;
514        }
515
516        match self.previous_checkpoint_height() {
517            // Any height is valid
518            BeforeGenesis => {}
519            // Greater heights are valid
520            InitialTip(previous_height) | PreviousCheckpoint(previous_height)
521                if (height <= previous_height) =>
522            {
523                let e = Err(VerifyCheckpointError::AlreadyVerified {
524                    height,
525                    verified_height: previous_height,
526                });
527                tracing::trace!(?e);
528                e?;
529            }
530            InitialTip(_) | PreviousCheckpoint(_) => {}
531            // We're finished, so no checkpoint height is valid
532            FinalCheckpoint => Err(VerifyCheckpointError::Finished)?,
533        };
534
535        Ok(())
536    }
537
538    /// Increase the current checkpoint height to `verified_height`,
539    fn update_progress(&mut self, verified_height: block::Height) {
540        if let Some(max_height) = self.queued.keys().next_back() {
541            metrics::gauge!("checkpoint.queued.max.height").set(max_height.0 as f64);
542        } else {
543            // use f64::NAN as a sentinel value for "None", because 0 is a valid height
544            metrics::gauge!("checkpoint.queued.max.height").set(f64::NAN);
545        }
546        metrics::gauge!("checkpoint.queued_slots").set(self.queued.len() as f64);
547
548        // Ignore blocks that are below the previous checkpoint, or otherwise
549        // have invalid heights.
550        //
551        // We ignore out-of-order verification, such as:
552        //  - the height is less than the previous checkpoint height, or
553        //  - the previous checkpoint height is the maximum height (checkpoint verifies are finished),
554        // because futures might not resolve in height order.
555        if self.check_height(verified_height).is_err() {
556            return;
557        }
558
559        // Ignore heights that aren't checkpoint heights
560        if verified_height == self.checkpoint_list.max_height() {
561            self.verifier_progress = FinalCheckpoint;
562
563            tracing::info!(
564                final_checkpoint_height = ?verified_height,
565                "verified final checkpoint: starting full validation",
566            );
567
568            self.verified_checkpoint_diagnostics(verified_height);
569            self.finish_diagnostics();
570        } else if self.checkpoint_list.contains(verified_height) {
571            self.verifier_progress = PreviousCheckpoint(verified_height);
572            // We're done with the initial tip hash now
573            self.initial_tip_hash = None;
574
575            self.verified_checkpoint_diagnostics(verified_height);
576        }
577    }
578
579    /// Check that the block height, proof of work, and Merkle root are valid.
580    ///
581    /// Returns a [`CheckpointVerifiedBlock`] with precalculated block data.
582    ///
583    /// ## Security
584    ///
585    /// Checking the proof of work makes resource exhaustion attacks harder to
586    /// carry out, because malicious blocks require a valid proof of work.
587    ///
588    /// Checking the Merkle root ensures that the block hash binds the block
589    /// contents. To prevent malleability (CVE-2012-2459), we also need to check
590    /// whether the transaction hashes are unique.
591    fn check_block(
592        &self,
593        block: Arc<Block>,
594    ) -> Result<CheckpointVerifiedBlock, VerifyCheckpointError> {
595        let hash = block.hash();
596        let height = block
597            .coinbase_height()
598            .ok_or(VerifyCheckpointError::CoinbaseHeight { hash })?;
599        self.check_height(height)?;
600
601        if self.network.disable_pow() {
602            crate::block::check::difficulty_threshold_is_valid(
603                &block.header,
604                &self.network,
605                &height,
606                &hash,
607            )?;
608        } else {
609            crate::block::check::difficulty_is_valid(&block.header, &self.network, &height, &hash)?;
610            crate::block::check::equihash_solution_is_valid(&block.header)?;
611        }
612
613        // We can't get the block subsidy for blocks with heights in the slow start interval, so we
614        // omit the calculation of the expected deferred amount.
615        let expected_deferred_amount = if height > self.network.slow_start_interval() {
616            // See [ZIP-1015](https://zips.z.cash/zip-1015).
617            funding_stream_values(height, &self.network, block_subsidy(height, &self.network)?)?
618                .remove(&FundingStreamReceiver::Deferred)
619        } else {
620            None
621        };
622
623        let deferred_pool_balance_change = expected_deferred_amount
624            .unwrap_or_default()
625            .checked_sub(self.network.lockbox_disbursement_total_amount(height))
626            .map(DeferredPoolBalanceChange::new);
627
628        // don't do precalculation until the block passes basic difficulty checks
629        let block = CheckpointVerifiedBlock::new(block, Some(hash), deferred_pool_balance_change);
630
631        crate::block::check::merkle_root_validity(
632            &self.network,
633            &block.block,
634            &block.transaction_hashes,
635        )?;
636
637        Ok(block)
638    }
639
640    /// Queue `block` for verification.
641    ///
642    /// On success, returns a [`RequestBlock`] containing the block,
643    /// precalculated request data, and the queued result receiver.
644    ///
645    /// Verification will finish when the chain to the next checkpoint is
646    /// complete, and the caller will be notified via the channel.
647    ///
648    /// If the block does not pass basic validity checks,
649    /// returns an error immediately.
650    #[allow(clippy::unwrap_in_result)]
651    fn queue_block(&mut self, block: Arc<Block>) -> Result<RequestBlock, VerifyCheckpointError> {
652        // Set up a oneshot channel to send results
653        let (tx, rx) = oneshot::channel();
654
655        // Check that the height and Merkle roots are valid.
656        let block = self.check_block(block)?;
657        let height = block.height;
658        let hash = block.hash;
659
660        let new_qblock = QueuedBlock {
661            block: block.clone(),
662            tx,
663        };
664        let req_block = RequestBlock { block, rx };
665
666        // Since we're using Arc<Block>, each entry is a single pointer to the
667        // Arc. But there are a lot of QueuedBlockLists in the queue, so we keep
668        // allocations as small as possible.
669        let qblocks = self
670            .queued
671            .entry(height)
672            .or_insert_with(|| QueuedBlockList::with_capacity(1));
673
674        // Replace older requests with newer ones.
675        // The newer block is ok, the older block is an error.
676        for qb in qblocks.iter_mut() {
677            if qb.block.hash == hash {
678                let e = VerifyCheckpointError::NewerRequest { height, hash };
679                tracing::trace!(?e, "failing older of duplicate requests");
680
681                // ## Security
682                //
683                // Replace the entire queued block.
684                //
685                // We don't check the authorizing data hash until checkpoint blocks reach the state.
686                // So signatures, proofs, or scripts could be different,
687                // even if the block hash is the same.
688
689                let old = std::mem::replace(qb, new_qblock);
690                let _ = old.tx.send(Err(e));
691                return Ok(req_block);
692            }
693        }
694
695        // Memory DoS resistance: limit the queued blocks at each height
696        if qblocks.len() >= MAX_QUEUED_BLOCKS_PER_HEIGHT {
697            let e = VerifyCheckpointError::QueuedLimit;
698            tracing::warn!(?e);
699            return Err(e);
700        }
701
702        // Add the block to the list of queued blocks at this height
703        // This is a no-op for the first block in each QueuedBlockList.
704        qblocks.reserve_exact(1);
705        qblocks.push(new_qblock);
706
707        self.queued_block_diagnostics(height, hash);
708
709        Ok(req_block)
710    }
711
712    /// During checkpoint range processing, process all the blocks at `height`.
713    ///
714    /// Returns the first valid block. If there is no valid block, returns None.
715    #[allow(clippy::unwrap_in_result)]
716    fn process_height(
717        &mut self,
718        height: block::Height,
719        expected_hash: block::Hash,
720    ) -> Option<QueuedBlock> {
721        let mut qblocks = self
722            .queued
723            .remove(&height)
724            .expect("the current checkpoint range has continuous Vec<QueuedBlock>s");
725        assert!(
726            !qblocks.is_empty(),
727            "the current checkpoint range has continuous Blocks"
728        );
729
730        // Check interim checkpoints
731        if let Some(checkpoint_hash) = self.checkpoint_list.hash(height) {
732            // We assume the checkpoints are valid. And we have verified back
733            // from the target checkpoint, so the last block must also be valid.
734            // This is probably a bad checkpoint list, a zebra bug, or a bad
735            // chain (in a testing mode like regtest).
736            assert_eq!(expected_hash, checkpoint_hash,
737                           "checkpoints in the range should match: bad checkpoint list, zebra bug, or bad chain"
738                );
739        }
740
741        // Find a queued block at this height, which is part of the hash chain.
742        //
743        // There are two possible outcomes here:
744        //   - one of the blocks matches the chain (the common case)
745        //   - no blocks match the chain, verification has failed for this range
746        // If there are any side-chain blocks, they fail validation.
747        let mut valid_qblock = None;
748        for qblock in qblocks.drain(..) {
749            if qblock.block.hash == expected_hash {
750                if valid_qblock.is_none() {
751                    // The first valid block at the current height
752                    valid_qblock = Some(qblock);
753                } else {
754                    unreachable!("unexpected duplicate block {:?} {:?}: duplicate blocks should be rejected before being queued",
755                                 height, qblock.block.hash);
756                }
757            } else {
758                tracing::info!(?height, ?qblock.block.hash, ?expected_hash,
759                               "Side chain hash at height in CheckpointVerifier");
760                let _ = qblock
761                    .tx
762                    .send(Err(VerifyCheckpointError::UnexpectedSideChain {
763                        found: qblock.block.hash,
764                        expected: expected_hash,
765                    }));
766            }
767        }
768
769        valid_qblock
770    }
771
772    /// Try to verify from the previous checkpoint to a target checkpoint.
773    ///
774    /// Send `Ok` for the blocks that are in the chain, and `Err` for side-chain
775    /// blocks.
776    ///
777    /// Does nothing if we are waiting for more blocks, or if verification has
778    /// finished.
779    fn process_checkpoint_range(&mut self) {
780        // If this code shows up in profiles, we can try the following
781        // optimisations:
782        //   - only check the chain when the length of the queue is greater
783        //     than or equal to the length of a checkpoint interval
784        //     (note: the genesis checkpoint interval is only one block long)
785        //   - cache the height of the last continuous chain as a new field in
786        //     self, and start at that height during the next check.
787
788        // Return early if verification has finished
789        let previous_checkpoint_hash = match self.previous_checkpoint_hash() {
790            // Since genesis blocks are hard-coded in zcashd, and not verified
791            // like other blocks, the genesis parent hash is set by the
792            // consensus parameters.
793            BeforeGenesis => GENESIS_PREVIOUS_BLOCK_HASH,
794            InitialTip(hash) | PreviousCheckpoint(hash) => hash,
795            FinalCheckpoint => return,
796        };
797        // Return early if we're still waiting for more blocks
798        let (target_checkpoint_height, mut expected_hash) = match self.target_checkpoint_height() {
799            Checkpoint(height) => (
800                height,
801                self.checkpoint_list
802                    .hash(height)
803                    .expect("every checkpoint height must have a hash"),
804            ),
805            WaitingForBlocks => {
806                return;
807            }
808            FinishedVerifying => {
809                unreachable!("the FinalCheckpoint case should have returned earlier")
810            }
811        };
812
813        // Keep the old previous checkpoint height, to make sure we're making
814        // progress
815        let old_prev_check_height = self.previous_checkpoint_height();
816
817        // Work out which blocks and checkpoints we're checking
818        let current_range = (
819            self.current_start_bound()
820                .expect("earlier code checks if verification has finished"),
821            Included(target_checkpoint_height),
822        );
823        let range_heights: Vec<block::Height> = self
824            .queued
825            .range_mut(current_range)
826            .rev()
827            .map(|(key, _)| *key)
828            .collect();
829        // A list of pending valid blocks, in reverse chain order
830        let mut rev_valid_blocks = Vec::new();
831
832        // Check all the blocks, and discard all the bad blocks
833        for current_height in range_heights {
834            let valid_qblock = self.process_height(current_height, expected_hash);
835            if let Some(qblock) = valid_qblock {
836                expected_hash = qblock.block.block.header.previous_block_hash;
837                // Add the block to the end of the pending block list
838                // (since we're walking the chain backwards, the list is
839                // in reverse chain order)
840                rev_valid_blocks.push(qblock);
841            } else {
842                // The last block height we processed did not have any blocks
843                // with a matching hash, so chain verification has failed.
844                tracing::info!(
845                    ?current_height,
846                    ?current_range,
847                    "No valid blocks at height in CheckpointVerifier"
848                );
849
850                // We kept all the matching blocks down to this height, in
851                // anticipation of the chain verifying. But the chain is
852                // incomplete, so we have to put them back in the queue.
853                //
854                // The order here shouldn't matter, but add the blocks in
855                // height order, for consistency.
856                for vblock in rev_valid_blocks.drain(..).rev() {
857                    self.queued
858                        .entry(vblock.block.height)
859                        .or_default()
860                        .push(vblock);
861                }
862
863                // Make sure the current progress hasn't changed
864                assert_eq!(
865                    self.previous_checkpoint_height(),
866                    old_prev_check_height,
867                    "we must not change the previous checkpoint on failure"
868                );
869                // We've reduced the target
870                //
871                // This check should be cheap, because we just reduced the target
872                let current_target = self.target_checkpoint_height();
873                assert!(
874                    current_target == WaitingForBlocks
875                        || current_target < Checkpoint(target_checkpoint_height),
876                    "we must decrease or eliminate our target on failure"
877                );
878
879                // Stop verifying, and wait for the next valid block
880                return;
881            }
882        }
883
884        // The checkpoint and the parent hash must match.
885        // See the detailed checkpoint comparison comment above.
886        assert_eq!(
887            expected_hash, previous_checkpoint_hash,
888            "the previous checkpoint should match: bad checkpoint list, zebra bug, or bad chain"
889        );
890
891        let block_count = rev_valid_blocks.len();
892        tracing::info!(?block_count, ?current_range, "verified checkpoint range");
893        metrics::counter!("checkpoint.verified.block.count").increment(block_count as u64);
894
895        // All the blocks we've kept are valid, so let's verify them
896        // in height order.
897        for qblock in rev_valid_blocks.drain(..).rev() {
898            // Sending can fail, but there's nothing we can do about it.
899            let _ = qblock.tx.send(Ok(qblock.block.hash));
900        }
901
902        // Finally, update the checkpoint bounds
903        self.update_progress(target_checkpoint_height);
904
905        // Ensure that we're making progress
906        let new_progress = self.previous_checkpoint_height();
907        assert!(
908            new_progress > old_prev_check_height,
909            "we must make progress on success"
910        );
911        // We met the old target
912        if new_progress == FinalCheckpoint {
913            assert_eq!(
914                target_checkpoint_height,
915                self.checkpoint_list.max_height(),
916                "we finish at the maximum checkpoint"
917            );
918        } else {
919            assert_eq!(
920                new_progress,
921                PreviousCheckpoint(target_checkpoint_height),
922                "the new previous checkpoint must match the old target"
923            );
924        }
925        // We processed all available checkpoints
926        //
927        // We've cleared the target range, so this check should be cheap
928        let new_target = self.target_checkpoint_height();
929        assert!(
930            new_target == WaitingForBlocks || new_target == FinishedVerifying,
931            "processing must cover all available checkpoints"
932        );
933    }
934}
935
936/// CheckpointVerifier rejects pending futures on drop.
937impl<S> Drop for CheckpointVerifier<S>
938where
939    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
940    S::Future: Send + 'static,
941{
942    /// Send an error on `tx` for any `QueuedBlock`s that haven't been verified.
943    ///
944    /// We can't implement `Drop` on QueuedBlock, because `send()` consumes
945    /// `tx`. And `tx` doesn't implement `Copy` or `Default` (for `take()`).
946    fn drop(&mut self) {
947        self.finish_diagnostics();
948
949        let drop_keys: Vec<_> = self.queued.keys().cloned().collect();
950        for key in drop_keys {
951            let mut qblocks = self
952                .queued
953                .remove(&key)
954                .expect("each entry is only removed once");
955            for qblock in qblocks.drain(..) {
956                // Sending can fail, but there's nothing we can do about it.
957                let _ = qblock.tx.send(Err(VerifyCheckpointError::Dropped));
958            }
959        }
960    }
961}
962
963#[derive(Debug, Error)]
964#[allow(missing_docs)]
965pub enum VerifyCheckpointError {
966    #[error("checkpoint request after the final checkpoint has been verified")]
967    Finished,
968    #[error("block at {height:?} is higher than the maximum checkpoint {max_height:?}")]
969    TooHigh {
970        height: block::Height,
971        max_height: block::Height,
972    },
973    #[error("block {height:?} is less than or equal to the verified tip {verified_height:?}")]
974    AlreadyVerified {
975        height: block::Height,
976        verified_height: block::Height,
977    },
978    #[error("rejected older of duplicate verification requests for block at {height:?} {hash:?}")]
979    NewerRequest {
980        height: block::Height,
981        hash: block::Hash,
982    },
983    #[error("the block {hash:?} does not have a coinbase height")]
984    CoinbaseHeight { hash: block::Hash },
985    #[error("merkle root {actual:?} does not match expected {expected:?}")]
986    BadMerkleRoot {
987        actual: block::merkle::Root,
988        expected: block::merkle::Root,
989    },
990    #[error("duplicate transactions in block")]
991    DuplicateTransaction,
992    #[error("checkpoint verifier was dropped")]
993    Dropped,
994    #[error(transparent)]
995    CommitCheckpointVerified(BoxError),
996    #[error(transparent)]
997    Tip(BoxError),
998    #[error(transparent)]
999    CheckpointList(BoxError),
1000    #[error(transparent)]
1001    VerifyBlock(VerifyBlockError),
1002    #[error("invalid block subsidy: {0}")]
1003    SubsidyError(#[from] SubsidyError),
1004    #[error("invalid amount: {0}")]
1005    AmountError(#[from] amount::Error),
1006    #[error("too many queued blocks at this height")]
1007    QueuedLimit,
1008    #[error("the block hash does not match the chained checkpoint hash, expected {expected:?} found {found:?}")]
1009    UnexpectedSideChain {
1010        expected: block::Hash,
1011        found: block::Hash,
1012    },
1013    #[error("zebra is shutting down")]
1014    ShuttingDown,
1015}
1016
1017impl From<VerifyBlockError> for VerifyCheckpointError {
1018    fn from(err: VerifyBlockError) -> VerifyCheckpointError {
1019        VerifyCheckpointError::VerifyBlock(err)
1020    }
1021}
1022
1023impl From<BlockError> for VerifyCheckpointError {
1024    fn from(err: BlockError) -> VerifyCheckpointError {
1025        VerifyCheckpointError::VerifyBlock(err.into())
1026    }
1027}
1028
1029impl From<equihash::Error> for VerifyCheckpointError {
1030    fn from(err: equihash::Error) -> VerifyCheckpointError {
1031        VerifyCheckpointError::VerifyBlock(err.into())
1032    }
1033}
1034
1035impl VerifyCheckpointError {
1036    /// Returns `true` if this is definitely a duplicate request.
1037    /// Some duplicate requests might not be detected, and therefore return `false`.
1038    pub fn is_duplicate_request(&self) -> bool {
1039        match self {
1040            VerifyCheckpointError::AlreadyVerified { .. } => true,
1041            // TODO: make this duplicate-incomplete
1042            VerifyCheckpointError::NewerRequest { .. } => true,
1043            VerifyCheckpointError::VerifyBlock(block_error) => block_error.is_duplicate_request(),
1044            _ => false,
1045        }
1046    }
1047
1048    /// Returns a suggested misbehaviour score increment for a certain error.
1049    pub fn misbehavior_score(&self) -> u32 {
1050        // TODO: Adjust these values based on zcashd (#9258).
1051        match self {
1052            VerifyCheckpointError::VerifyBlock(verify_block_error) => {
1053                verify_block_error.misbehavior_score()
1054            }
1055            VerifyCheckpointError::SubsidyError(_)
1056            | VerifyCheckpointError::CoinbaseHeight { .. }
1057            | VerifyCheckpointError::DuplicateTransaction
1058            | VerifyCheckpointError::AmountError(_) => 100,
1059            _other => 0,
1060        }
1061    }
1062}
1063
1064/// The CheckpointVerifier service implementation.
1065///
1066/// After verification, the block futures resolve to their hashes.
1067impl<S> Service<Arc<Block>> for CheckpointVerifier<S>
1068where
1069    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
1070    S::Future: Send + 'static,
1071{
1072    type Response = block::Hash;
1073    type Error = VerifyCheckpointError;
1074    type Future =
1075        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1076
1077    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1078        Poll::Ready(Ok(()))
1079    }
1080
1081    #[instrument(name = "checkpoint", skip(self, block))]
1082    fn call(&mut self, block: Arc<Block>) -> Self::Future {
1083        // Reset the verifier back to the state tip if requested
1084        // (e.g. due to an error when committing a block to the state)
1085        if let Ok(tip) = self.reset_receiver.try_recv() {
1086            self.reset_progress(tip);
1087        }
1088
1089        // Immediately reject all incoming blocks that arrive after we've finished.
1090        if let FinalCheckpoint = self.previous_checkpoint_height() {
1091            return async { Err(VerifyCheckpointError::Finished) }.boxed();
1092        }
1093
1094        let req_block = match self.queue_block(block) {
1095            Ok(req_block) => req_block,
1096            Err(e) => return async { Err(e) }.boxed(),
1097        };
1098
1099        self.process_checkpoint_range();
1100
1101        metrics::gauge!("checkpoint.queued_slots").set(self.queued.len() as f64);
1102
1103        // Because the checkpoint verifier duplicates state from the state
1104        // service (it tracks which checkpoints have been verified), we must
1105        // commit blocks transactionally on a per-checkpoint basis. Otherwise,
1106        // the checkpoint verifier's state could desync from the underlying
1107        // state service. Among other problems, this could cause the checkpoint
1108        // verifier to reject blocks not already in the state as
1109        // already-verified.
1110        //
1111        // # Dropped Receivers
1112        //
1113        // To commit blocks transactionally on a per-checkpoint basis, we must
1114        // commit all verified blocks in a checkpoint range, regardless of
1115        // whether or not the response futures for each block were dropped.
1116        //
1117        // We accomplish this by spawning a new task containing the
1118        // commit-if-verified logic. This task will always execute, except if
1119        // the program is interrupted, in which case there is no longer a
1120        // checkpoint verifier to keep in sync with the state.
1121        //
1122        // # State Commit Failures
1123        //
1124        // If the state commit fails due to corrupt block data,
1125        // we don't reject the entire checkpoint.
1126        // Instead, we reset the verifier to the successfully committed state tip.
1127        let state_service = self.state_service.clone();
1128        let commit_checkpoint_verified = tokio::spawn(async move {
1129            let hash = req_block
1130                .rx
1131                .await
1132                .map_err(Into::into)
1133                .map_err(VerifyCheckpointError::CommitCheckpointVerified)
1134                .expect("CheckpointVerifier does not leave dangling receivers")?;
1135
1136            // We use a `ServiceExt::oneshot`, so that every state service
1137            // `poll_ready` has a corresponding `call`. See #1593.
1138            match state_service
1139                .oneshot(zs::Request::CommitCheckpointVerifiedBlock(req_block.block))
1140                .map_err(VerifyCheckpointError::CommitCheckpointVerified)
1141                .await?
1142            {
1143                zs::Response::Committed(committed_hash) => {
1144                    assert_eq!(committed_hash, hash, "state must commit correct hash");
1145                    Ok(hash)
1146                }
1147                _ => unreachable!("wrong response for CommitCheckpointVerifiedBlock"),
1148            }
1149        });
1150
1151        let state_service = self.state_service.clone();
1152        let reset_sender = self.reset_sender.clone();
1153        async move {
1154            let result = commit_checkpoint_verified.await;
1155            // Avoid a panic on shutdown
1156            //
1157            // When `zebrad` is terminated using Ctrl-C, the `commit_checkpoint_verified` task
1158            // can return a `JoinError::Cancelled`. We expect task cancellation on shutdown,
1159            // so we don't need to panic here. The persistent state is correct even when the
1160            // task is cancelled, because block data is committed inside transactions, in
1161            // height order.
1162            let result = if zebra_chain::shutdown::is_shutting_down() {
1163                Err(VerifyCheckpointError::ShuttingDown)
1164            } else {
1165                result.expect("commit_checkpoint_verified should not panic")
1166            };
1167            if result.is_err() {
1168                // If there was an error committing the block, then this verifier
1169                // will be out of sync with the state. In that case, reset
1170                // its progress back to the state tip.
1171                let tip = match state_service
1172                    .oneshot(zs::Request::Tip)
1173                    .await
1174                    .map_err(VerifyCheckpointError::Tip)?
1175                {
1176                    zs::Response::Tip(tip) => tip,
1177                    _ => unreachable!("wrong response for Tip"),
1178                };
1179                // Ignore errors since send() can fail only when the verifier
1180                // is being dropped, and then it doesn't matter anymore.
1181                let _ = reset_sender.send(tip);
1182            }
1183            result
1184        }
1185        .boxed()
1186    }
1187}