zebra_consensus/
checkpoint.rs

1//! Checkpoint-based block verification.
2//!
3//! Checkpoint-based verification uses a list of checkpoint hashes to
4//! speed up the initial chain sync for Zebra. This list is distributed
5//! with Zebra.
6//!
7//! The checkpoint verifier queues pending blocks. Once there is a
8//! chain from the previous checkpoint to a target checkpoint, it
9//! verifies all the blocks in that chain, and sends accepted blocks to
10//! the state service as finalized chain state, skipping the majority of
11//! contextual verification checks.
12//!
13//! Verification starts at the first checkpoint, which is the genesis
14//! block for the configured network.
15
16use std::{
17    collections::BTreeMap,
18    ops::{Bound, Bound::*},
19    pin::Pin,
20    sync::{mpsc, Arc},
21    task::{Context, Poll},
22};
23
24use futures::{Future, FutureExt, TryFutureExt};
25use thiserror::Error;
26use tokio::sync::oneshot;
27use tower::{Service, ServiceExt};
28use tracing::instrument;
29
30use zebra_chain::{
31    amount,
32    block::{self, Block},
33    parameters::{
34        subsidy::{block_subsidy, funding_stream_values, FundingStreamReceiver, SubsidyError},
35        Network, GENESIS_PREVIOUS_BLOCK_HASH,
36    },
37    work::equihash,
38};
39use zebra_state::{self as zs, CheckpointVerifiedBlock};
40
41use crate::{
42    block::VerifyBlockError,
43    checkpoint::types::{
44        Progress::{self, *},
45        TargetHeight::{self, *},
46    },
47    error::BlockError,
48    BoxError, ParameterCheckpoint as _,
49};
50
51pub(crate) mod list;
52mod types;
53
54#[cfg(test)]
55mod tests;
56
57pub use zebra_node_services::constants::{MAX_CHECKPOINT_BYTE_COUNT, MAX_CHECKPOINT_HEIGHT_GAP};
58
59pub use list::CheckpointList;
60
61/// An unverified block, which is in the queue for checkpoint verification.
62#[derive(Debug)]
63struct QueuedBlock {
64    /// The block, with additional precalculated data.
65    block: CheckpointVerifiedBlock,
66    /// The transmitting end of the oneshot channel for this block's result.
67    tx: oneshot::Sender<Result<block::Hash, VerifyCheckpointError>>,
68}
69
70/// The unverified block, with a receiver for the [`QueuedBlock`]'s result.
71#[derive(Debug)]
72struct RequestBlock {
73    /// The block, with additional precalculated data.
74    block: CheckpointVerifiedBlock,
75    /// The receiving end of the oneshot channel for this block's result.
76    rx: oneshot::Receiver<Result<block::Hash, VerifyCheckpointError>>,
77}
78
79/// A list of unverified blocks at a particular height.
80///
81/// Typically contains a single block, but might contain more if a peer
82/// has an old chain fork. (Or sends us a bad block.)
83///
84/// The CheckpointVerifier avoids creating zero-block lists.
85type QueuedBlockList = Vec<QueuedBlock>;
86
/// The largest number of blocks that can be queued at a single height.
///
/// Choosing this value trades off two concerns:
/// - rejecting bad blocks: a larger queue needs fewer network retries, at the
///   cost of slightly more CPU during verification,
/// - avoiding a memory DoS: a smaller queue uses less memory.
///
/// Overall memory usage is controlled by the sync service, because it decides
/// which blocks are downloaded. The verifier services reduce memory usage as
/// they process blocks, by committing them to the disk state (or by dropping
/// invalid blocks).
pub const MAX_QUEUED_BLOCKS_PER_HEIGHT: usize = 4;
98
99/// Convert a tip into its hash and matching progress.
100fn progress_from_tip(
101    checkpoint_list: &CheckpointList,
102    tip: Option<(block::Height, block::Hash)>,
103) -> (Option<block::Hash>, Progress<block::Height>) {
104    match tip {
105        Some((height, hash)) => {
106            if height >= checkpoint_list.max_height() {
107                (None, Progress::FinalCheckpoint)
108            } else {
109                metrics::gauge!("checkpoint.verified.height").set(height.0 as f64);
110                metrics::gauge!("checkpoint.processing.next.height").set(height.0 as f64);
111                (Some(hash), Progress::InitialTip(height))
112            }
113        }
114        // We start by verifying the genesis block, by itself
115        None => (None, Progress::BeforeGenesis),
116    }
117}
118
119/// A checkpointing block verifier.
120///
121/// Verifies blocks using a supplied list of checkpoints. There must be at
122/// least one checkpoint for the genesis block.
123pub struct CheckpointVerifier<S>
124where
125    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
126    S::Future: Send + 'static,
127{
128    /// The checkpoint list for this verifier.
129    checkpoint_list: CheckpointList,
130
131    /// The network rules used by this verifier.
132    network: Network,
133
134    /// The hash of the initial tip, if any.
135    initial_tip_hash: Option<block::Hash>,
136
137    /// The underlying state service, possibly wrapped in other services.
138    state_service: S,
139
140    /// A queue of unverified blocks.
141    ///
142    /// Contains a list of unverified blocks at each block height. In most cases,
143    /// the checkpoint verifier will store zero or one block at each height.
144    ///
145    /// Blocks are verified in order, once there is a chain from the previous
146    /// checkpoint to a target checkpoint.
147    ///
148    /// The first checkpoint does not have any ancestors, so it only verifies the
149    /// genesis block.
150    queued: BTreeMap<block::Height, QueuedBlockList>,
151
152    /// The current progress of this verifier.
153    verifier_progress: Progress<block::Height>,
154
155    /// A channel to receive requests to reset the verifier,
156    /// receiving the tip of the state.
157    reset_receiver: mpsc::Receiver<Option<(block::Height, block::Hash)>>,
158    /// A channel to send requests to reset the verifier,
159    /// passing the tip of the state.
160    reset_sender: mpsc::Sender<Option<(block::Height, block::Hash)>>,
161
162    /// Queued block height progress transmitter.
163    #[cfg(feature = "progress-bar")]
164    queued_blocks_bar: howudoin::Tx,
165
166    /// Verified checkpoint progress transmitter.
167    #[cfg(feature = "progress-bar")]
168    verified_checkpoint_bar: howudoin::Tx,
169}
170
171impl<S> std::fmt::Debug for CheckpointVerifier<S>
172where
173    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
174    S::Future: Send + 'static,
175{
176    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177        f.debug_struct("CheckpointVerifier")
178            .field("checkpoint_list", &self.checkpoint_list)
179            .field("network", &self.network)
180            .field("initial_tip_hash", &self.initial_tip_hash)
181            .field("queued", &self.queued)
182            .field("verifier_progress", &self.verifier_progress)
183            .finish()
184    }
185}
186
187impl<S> CheckpointVerifier<S>
188where
189    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
190    S::Future: Send + 'static,
191{
192    /// Return a checkpoint verification service for `network`, using the
193    /// hard-coded checkpoint list, and the provided `state_service`.
194    ///
195    /// If `initial_tip` is Some(_), the verifier starts at that initial tip.
196    /// The initial tip can be between the checkpoints in the hard-coded
197    /// checkpoint list.
198    ///
199    /// The checkpoint verifier holds a state service of type `S`, into which newly
200    /// verified blocks will be committed. This state is pluggable to allow for
201    /// testing or instrumentation.
202    ///
203    /// This function should be called only once for a particular network, rather
204    /// than constructing multiple verification services for the same network. To
205    /// clone a CheckpointVerifier, you might need to wrap it in a
206    /// `tower::Buffer` service.
207    #[allow(dead_code)]
208    pub fn new(
209        network: &Network,
210        initial_tip: Option<(block::Height, block::Hash)>,
211        state_service: S,
212    ) -> Self {
213        let checkpoint_list = network.checkpoint_list();
214        let max_height = checkpoint_list.max_height();
215        tracing::info!(
216            ?max_height,
217            ?network,
218            ?initial_tip,
219            "initialising CheckpointVerifier"
220        );
221        Self::from_checkpoint_list(checkpoint_list, network, initial_tip, state_service)
222    }
223
224    /// Return a checkpoint verification service using `list`, `network`,
225    /// `initial_tip`, and `state_service`.
226    ///
227    /// Assumes that the provided genesis checkpoint is correct.
228    ///
229    /// Callers should prefer `CheckpointVerifier::new`, which uses the
230    /// hard-coded checkpoint lists, or `CheckpointList::from_list` if you need
231    /// to specify a custom checkpoint list. See those functions for more
232    /// details.
233    ///
234    /// This function is designed for use in tests.
235    #[allow(dead_code)]
236    pub(crate) fn from_list(
237        list: impl IntoIterator<Item = (block::Height, block::Hash)>,
238        network: &Network,
239        initial_tip: Option<(block::Height, block::Hash)>,
240        state_service: S,
241    ) -> Result<Self, VerifyCheckpointError> {
242        Ok(Self::from_checkpoint_list(
243            CheckpointList::from_list(list).map_err(VerifyCheckpointError::CheckpointList)?,
244            network,
245            initial_tip,
246            state_service,
247        ))
248    }
249
250    /// Return a checkpoint verification service using `checkpoint_list`,
251    /// `network`, `initial_tip`, and `state_service`.
252    ///
253    /// Assumes that the provided genesis checkpoint is correct.
254    ///
255    /// Callers should prefer `CheckpointVerifier::new`, which uses the
256    /// hard-coded checkpoint lists. See that function for more details.
257    pub(crate) fn from_checkpoint_list(
258        checkpoint_list: CheckpointList,
259        network: &Network,
260        initial_tip: Option<(block::Height, block::Hash)>,
261        state_service: S,
262    ) -> Self {
263        // All the initialisers should call this function, so we only have to
264        // change fields or default values in one place.
265        let (initial_tip_hash, verifier_progress) =
266            progress_from_tip(&checkpoint_list, initial_tip);
267
268        let (sender, receiver) = mpsc::channel();
269
270        #[cfg(feature = "progress-bar")]
271        let queued_blocks_bar = howudoin::new_root().label("Checkpoint Queue Height");
272
273        #[cfg(feature = "progress-bar")]
274        let verified_checkpoint_bar =
275            howudoin::new_with_parent(queued_blocks_bar.id()).label("Verified Checkpoints");
276
277        let verifier = CheckpointVerifier {
278            checkpoint_list,
279            network: network.clone(),
280            initial_tip_hash,
281            state_service,
282            queued: BTreeMap::new(),
283            verifier_progress,
284            reset_receiver: receiver,
285            reset_sender: sender,
286            #[cfg(feature = "progress-bar")]
287            queued_blocks_bar,
288            #[cfg(feature = "progress-bar")]
289            verified_checkpoint_bar,
290        };
291
292        if verifier_progress.is_final_checkpoint() {
293            verifier.finish_diagnostics();
294        } else {
295            verifier.verified_checkpoint_diagnostics(verifier_progress.height());
296        }
297
298        verifier
299    }
300
301    /// Update diagnostics for queued blocks.
302    fn queued_block_diagnostics(&self, height: block::Height, hash: block::Hash) {
303        let max_queued_height = self
304            .queued
305            .keys()
306            .next_back()
307            .expect("queued has at least one entry");
308
309        metrics::gauge!("checkpoint.queued.max.height").set(max_queued_height.0 as f64);
310
311        let is_checkpoint = self.checkpoint_list.contains(height);
312        tracing::debug!(?height, ?hash, ?is_checkpoint, "queued block");
313
314        #[cfg(feature = "progress-bar")]
315        if matches!(howudoin::cancelled(), Some(true)) {
316            self.finish_diagnostics();
317        } else {
318            self.queued_blocks_bar
319                .set_pos(max_queued_height.0)
320                .set_len(u64::from(self.checkpoint_list.max_height().0));
321        }
322    }
323
324    /// Update diagnostics for verified checkpoints.
325    fn verified_checkpoint_diagnostics(&self, verified_height: impl Into<Option<block::Height>>) {
326        let Some(verified_height) = verified_height.into() else {
327            // We don't know if we have already finished, or haven't started yet,
328            // so don't register any progress
329            return;
330        };
331
332        metrics::gauge!("checkpoint.verified.height").set(verified_height.0 as f64);
333
334        let checkpoint_index = self.checkpoint_list.prev_checkpoint_index(verified_height);
335        let checkpoint_count = self.checkpoint_list.len();
336
337        metrics::gauge!("checkpoint.verified.count").set(checkpoint_index as f64);
338
339        tracing::debug!(
340            ?verified_height,
341            ?checkpoint_index,
342            ?checkpoint_count,
343            "verified checkpoint",
344        );
345
346        #[cfg(feature = "progress-bar")]
347        if matches!(howudoin::cancelled(), Some(true)) {
348            self.finish_diagnostics();
349        } else {
350            self.verified_checkpoint_bar
351                .set_pos(u64::try_from(checkpoint_index).expect("fits in u64"))
352                .set_len(u64::try_from(checkpoint_count).expect("fits in u64"));
353        }
354    }
355
356    /// Finish checkpoint verifier diagnostics.
357    fn finish_diagnostics(&self) {
358        #[cfg(feature = "progress-bar")]
359        {
360            self.queued_blocks_bar.close();
361            self.verified_checkpoint_bar.close();
362        }
363    }
364
365    /// Reset the verifier progress back to given tip.
366    fn reset_progress(&mut self, tip: Option<(block::Height, block::Hash)>) {
367        let (initial_tip_hash, verifier_progress) = progress_from_tip(&self.checkpoint_list, tip);
368        self.initial_tip_hash = initial_tip_hash;
369        self.verifier_progress = verifier_progress;
370
371        self.verified_checkpoint_diagnostics(verifier_progress.height());
372    }
373
374    /// Return the current verifier's progress.
375    ///
376    /// If verification has not started yet, returns `BeforeGenesis`,
377    /// or `InitialTip(height)` if there were cached verified blocks.
378    ///
379    /// If verification is ongoing, returns `PreviousCheckpoint(height)`.
380    /// `height` increases as checkpoints are verified.
381    ///
382    /// If verification has finished, returns `FinalCheckpoint`.
383    fn previous_checkpoint_height(&self) -> Progress<block::Height> {
384        self.verifier_progress
385    }
386
387    /// Return the start of the current checkpoint range.
388    ///
389    /// Returns None if verification has finished.
390    fn current_start_bound(&self) -> Option<Bound<block::Height>> {
391        match self.previous_checkpoint_height() {
392            BeforeGenesis => Some(Unbounded),
393            InitialTip(height) | PreviousCheckpoint(height) => Some(Excluded(height)),
394            FinalCheckpoint => None,
395        }
396    }
397
398    /// Return the target checkpoint height that we want to verify.
399    ///
400    /// If we need more blocks, returns `WaitingForBlocks`.
401    ///
402    /// If the queued blocks are continuous from the previous checkpoint to a
403    /// target checkpoint, returns `Checkpoint(height)`. The target checkpoint
404    /// can be multiple checkpoints ahead of the previous checkpoint.
405    ///
406    /// `height` increases as checkpoints are verified.
407    ///
408    /// If verification has finished, returns `FinishedVerifying`.
409    fn target_checkpoint_height(&self) -> TargetHeight {
410        // Find the height we want to start searching at
411        let start_height = match self.previous_checkpoint_height() {
412            // Check if we have the genesis block as a special case, to simplify the loop
413            BeforeGenesis if !self.queued.contains_key(&block::Height(0)) => {
414                tracing::trace!("Waiting for genesis block");
415                metrics::counter!("checkpoint.waiting.count").increment(1);
416                return WaitingForBlocks;
417            }
418            BeforeGenesis => block::Height(0),
419            InitialTip(height) | PreviousCheckpoint(height) => height,
420            FinalCheckpoint => return FinishedVerifying,
421        };
422
423        // Find the end of the continuous sequence of blocks, starting at the
424        // last verified checkpoint. If there is no verified checkpoint, start
425        // *after* the genesis block (which we checked above).
426        //
427        // If `btree_map::Range` implements `ExactSizeIterator`, it would be
428        // much faster to walk the checkpoint list, and compare the length of
429        // the `btree_map::Range` to the block height difference between
430        // checkpoints. (In maps, keys are unique, so we don't need to check
431        // each height value.)
432        //
433        // But at the moment, this implementation is slightly faster, because
434        // it stops after the first gap.
435        let mut pending_height = start_height;
436        for (&height, _) in self.queued.range((Excluded(pending_height), Unbounded)) {
437            // If the queued blocks are continuous.
438            if height == block::Height(pending_height.0 + 1) {
439                pending_height = height;
440            } else {
441                let gap = height.0 - pending_height.0;
442                // Try to log a useful message when checkpointing has issues
443                tracing::trace!(contiguous_height = ?pending_height,
444                                next_height = ?height,
445                                ?gap,
446                                "Waiting for more checkpoint blocks");
447                break;
448            }
449        }
450        metrics::gauge!("checkpoint.queued.continuous.height").set(pending_height.0 as f64);
451
452        // Now find the start of the checkpoint range
453        let start = self.current_start_bound().expect(
454            "if verification has finished, we should have returned earlier in the function",
455        );
456        // Find the highest checkpoint below pending_height, excluding any
457        // previously verified checkpoints
458        let target_checkpoint = self
459            .checkpoint_list
460            .max_height_in_range((start, Included(pending_height)));
461
462        tracing::trace!(
463            checkpoint_start = ?start,
464            highest_contiguous_block = ?pending_height,
465            ?target_checkpoint
466        );
467
468        if let Some(block::Height(target_checkpoint)) = target_checkpoint {
469            metrics::gauge!("checkpoint.processing.next.height").set(target_checkpoint as f64);
470        } else {
471            // Use the start height if there is no potential next checkpoint
472            metrics::gauge!("checkpoint.processing.next.height").set(start_height.0 as f64);
473            metrics::counter!("checkpoint.waiting.count").increment(1);
474        }
475
476        target_checkpoint
477            .map(Checkpoint)
478            .unwrap_or(WaitingForBlocks)
479    }
480
481    /// Return the most recently verified checkpoint's hash.
482    ///
483    /// See `previous_checkpoint_height()` for details.
484    fn previous_checkpoint_hash(&self) -> Progress<block::Hash> {
485        match self.previous_checkpoint_height() {
486            BeforeGenesis => BeforeGenesis,
487            InitialTip(_) => self
488                .initial_tip_hash
489                .map(InitialTip)
490                .expect("initial tip height must have an initial tip hash"),
491            PreviousCheckpoint(height) => self
492                .checkpoint_list
493                .hash(height)
494                .map(PreviousCheckpoint)
495                .expect("every checkpoint height must have a hash"),
496            FinalCheckpoint => FinalCheckpoint,
497        }
498    }
499
500    /// Check that `height` is valid and able to be verified.
501    ///
502    /// Returns an error if:
503    ///  - the block's height is greater than the maximum checkpoint
504    ///  - there are no checkpoints
505    ///  - the block's height is less than or equal to the previously verified
506    ///    checkpoint
507    ///  - verification has finished
508    fn check_height(&self, height: block::Height) -> Result<(), VerifyCheckpointError> {
509        if height > self.checkpoint_list.max_height() {
510            Err(VerifyCheckpointError::TooHigh {
511                height,
512                max_height: self.checkpoint_list.max_height(),
513            })?;
514        }
515
516        match self.previous_checkpoint_height() {
517            // Any height is valid
518            BeforeGenesis => {}
519            // Greater heights are valid
520            InitialTip(previous_height) | PreviousCheckpoint(previous_height)
521                if (height <= previous_height) =>
522            {
523                let e = Err(VerifyCheckpointError::AlreadyVerified {
524                    height,
525                    verified_height: previous_height,
526                });
527                tracing::trace!(?e);
528                e?;
529            }
530            InitialTip(_) | PreviousCheckpoint(_) => {}
531            // We're finished, so no checkpoint height is valid
532            FinalCheckpoint => Err(VerifyCheckpointError::Finished)?,
533        };
534
535        Ok(())
536    }
537
538    /// Increase the current checkpoint height to `verified_height`,
539    fn update_progress(&mut self, verified_height: block::Height) {
540        if let Some(max_height) = self.queued.keys().next_back() {
541            metrics::gauge!("checkpoint.queued.max.height").set(max_height.0 as f64);
542        } else {
543            // use f64::NAN as a sentinel value for "None", because 0 is a valid height
544            metrics::gauge!("checkpoint.queued.max.height").set(f64::NAN);
545        }
546        metrics::gauge!("checkpoint.queued_slots").set(self.queued.len() as f64);
547
548        // Ignore blocks that are below the previous checkpoint, or otherwise
549        // have invalid heights.
550        //
551        // We ignore out-of-order verification, such as:
552        //  - the height is less than the previous checkpoint height, or
553        //  - the previous checkpoint height is the maximum height (checkpoint verifies are finished),
554        // because futures might not resolve in height order.
555        if self.check_height(verified_height).is_err() {
556            return;
557        }
558
559        // Ignore heights that aren't checkpoint heights
560        if verified_height == self.checkpoint_list.max_height() {
561            self.verifier_progress = FinalCheckpoint;
562
563            tracing::info!(
564                final_checkpoint_height = ?verified_height,
565                "verified final checkpoint: starting full validation",
566            );
567
568            self.verified_checkpoint_diagnostics(verified_height);
569            self.finish_diagnostics();
570        } else if self.checkpoint_list.contains(verified_height) {
571            self.verifier_progress = PreviousCheckpoint(verified_height);
572            // We're done with the initial tip hash now
573            self.initial_tip_hash = None;
574
575            self.verified_checkpoint_diagnostics(verified_height);
576        }
577    }
578
579    /// Check that the block height, proof of work, and Merkle root are valid.
580    ///
581    /// Returns a [`CheckpointVerifiedBlock`] with precalculated block data.
582    ///
583    /// ## Security
584    ///
585    /// Checking the proof of work makes resource exhaustion attacks harder to
586    /// carry out, because malicious blocks require a valid proof of work.
587    ///
588    /// Checking the Merkle root ensures that the block hash binds the block
589    /// contents. To prevent malleability (CVE-2012-2459), we also need to check
590    /// whether the transaction hashes are unique.
591    fn check_block(
592        &self,
593        block: Arc<Block>,
594    ) -> Result<CheckpointVerifiedBlock, VerifyCheckpointError> {
595        let hash = block.hash();
596        let height = block
597            .coinbase_height()
598            .ok_or(VerifyCheckpointError::CoinbaseHeight { hash })?;
599        self.check_height(height)?;
600
601        if self.network.disable_pow() {
602            crate::block::check::difficulty_threshold_is_valid(
603                &block.header,
604                &self.network,
605                &height,
606                &hash,
607            )?;
608        } else {
609            crate::block::check::difficulty_is_valid(&block.header, &self.network, &height, &hash)?;
610            crate::block::check::equihash_solution_is_valid(&block.header)?;
611        }
612
613        // We can't get the block subsidy for blocks with heights in the slow start interval, so we
614        // omit the calculation of the expected deferred amount.
615        let expected_deferred_amount = if height > self.network.slow_start_interval() {
616            // See [ZIP-1015](https://zips.z.cash/zip-1015).
617            funding_stream_values(height, &self.network, block_subsidy(height, &self.network)?)?
618                .remove(&FundingStreamReceiver::Deferred)
619        } else {
620            None
621        };
622
623        // don't do precalculation until the block passes basic difficulty checks
624        let block = CheckpointVerifiedBlock::new(block, Some(hash), expected_deferred_amount);
625
626        crate::block::check::merkle_root_validity(
627            &self.network,
628            &block.block,
629            &block.transaction_hashes,
630        )?;
631
632        Ok(block)
633    }
634
635    /// Queue `block` for verification.
636    ///
637    /// On success, returns a [`RequestBlock`] containing the block,
638    /// precalculated request data, and the queued result receiver.
639    ///
640    /// Verification will finish when the chain to the next checkpoint is
641    /// complete, and the caller will be notified via the channel.
642    ///
643    /// If the block does not pass basic validity checks,
644    /// returns an error immediately.
645    #[allow(clippy::unwrap_in_result)]
646    fn queue_block(&mut self, block: Arc<Block>) -> Result<RequestBlock, VerifyCheckpointError> {
647        // Set up a oneshot channel to send results
648        let (tx, rx) = oneshot::channel();
649
650        // Check that the height and Merkle roots are valid.
651        let block = self.check_block(block)?;
652        let height = block.height;
653        let hash = block.hash;
654
655        let new_qblock = QueuedBlock {
656            block: block.clone(),
657            tx,
658        };
659        let req_block = RequestBlock { block, rx };
660
661        // Since we're using Arc<Block>, each entry is a single pointer to the
662        // Arc. But there are a lot of QueuedBlockLists in the queue, so we keep
663        // allocations as small as possible.
664        let qblocks = self
665            .queued
666            .entry(height)
667            .or_insert_with(|| QueuedBlockList::with_capacity(1));
668
669        // Replace older requests with newer ones.
670        // The newer block is ok, the older block is an error.
671        for qb in qblocks.iter_mut() {
672            if qb.block.hash == hash {
673                let e = VerifyCheckpointError::NewerRequest { height, hash };
674                tracing::trace!(?e, "failing older of duplicate requests");
675
676                // ## Security
677                //
678                // Replace the entire queued block.
679                //
680                // We don't check the authorizing data hash until checkpoint blocks reach the state.
681                // So signatures, proofs, or scripts could be different,
682                // even if the block hash is the same.
683
684                let old = std::mem::replace(qb, new_qblock);
685                let _ = old.tx.send(Err(e));
686                return Ok(req_block);
687            }
688        }
689
690        // Memory DoS resistance: limit the queued blocks at each height
691        if qblocks.len() >= MAX_QUEUED_BLOCKS_PER_HEIGHT {
692            let e = VerifyCheckpointError::QueuedLimit;
693            tracing::warn!(?e);
694            return Err(e);
695        }
696
697        // Add the block to the list of queued blocks at this height
698        // This is a no-op for the first block in each QueuedBlockList.
699        qblocks.reserve_exact(1);
700        qblocks.push(new_qblock);
701
702        self.queued_block_diagnostics(height, hash);
703
704        Ok(req_block)
705    }
706
707    /// During checkpoint range processing, process all the blocks at `height`.
708    ///
709    /// Returns the first valid block. If there is no valid block, returns None.
710    #[allow(clippy::unwrap_in_result)]
711    fn process_height(
712        &mut self,
713        height: block::Height,
714        expected_hash: block::Hash,
715    ) -> Option<QueuedBlock> {
716        let mut qblocks = self
717            .queued
718            .remove(&height)
719            .expect("the current checkpoint range has continuous Vec<QueuedBlock>s");
720        assert!(
721            !qblocks.is_empty(),
722            "the current checkpoint range has continuous Blocks"
723        );
724
725        // Check interim checkpoints
726        if let Some(checkpoint_hash) = self.checkpoint_list.hash(height) {
727            // We assume the checkpoints are valid. And we have verified back
728            // from the target checkpoint, so the last block must also be valid.
729            // This is probably a bad checkpoint list, a zebra bug, or a bad
730            // chain (in a testing mode like regtest).
731            assert_eq!(expected_hash, checkpoint_hash,
732                           "checkpoints in the range should match: bad checkpoint list, zebra bug, or bad chain"
733                );
734        }
735
736        // Find a queued block at this height, which is part of the hash chain.
737        //
738        // There are two possible outcomes here:
739        //   - one of the blocks matches the chain (the common case)
740        //   - no blocks match the chain, verification has failed for this range
741        // If there are any side-chain blocks, they fail validation.
742        let mut valid_qblock = None;
743        for qblock in qblocks.drain(..) {
744            if qblock.block.hash == expected_hash {
745                if valid_qblock.is_none() {
746                    // The first valid block at the current height
747                    valid_qblock = Some(qblock);
748                } else {
749                    unreachable!("unexpected duplicate block {:?} {:?}: duplicate blocks should be rejected before being queued",
750                                 height, qblock.block.hash);
751                }
752            } else {
753                tracing::info!(?height, ?qblock.block.hash, ?expected_hash,
754                               "Side chain hash at height in CheckpointVerifier");
755                let _ = qblock
756                    .tx
757                    .send(Err(VerifyCheckpointError::UnexpectedSideChain {
758                        found: qblock.block.hash,
759                        expected: expected_hash,
760                    }));
761            }
762        }
763
764        valid_qblock
765    }
766
767    /// Try to verify from the previous checkpoint to a target checkpoint.
768    ///
769    /// Send `Ok` for the blocks that are in the chain, and `Err` for side-chain
770    /// blocks.
771    ///
772    /// Does nothing if we are waiting for more blocks, or if verification has
773    /// finished.
774    fn process_checkpoint_range(&mut self) {
775        // If this code shows up in profiles, we can try the following
776        // optimisations:
777        //   - only check the chain when the length of the queue is greater
778        //     than or equal to the length of a checkpoint interval
779        //     (note: the genesis checkpoint interval is only one block long)
780        //   - cache the height of the last continuous chain as a new field in
781        //     self, and start at that height during the next check.
782
783        // Return early if verification has finished
784        let previous_checkpoint_hash = match self.previous_checkpoint_hash() {
785            // Since genesis blocks are hard-coded in zcashd, and not verified
786            // like other blocks, the genesis parent hash is set by the
787            // consensus parameters.
788            BeforeGenesis => GENESIS_PREVIOUS_BLOCK_HASH,
789            InitialTip(hash) | PreviousCheckpoint(hash) => hash,
790            FinalCheckpoint => return,
791        };
792        // Return early if we're still waiting for more blocks
793        let (target_checkpoint_height, mut expected_hash) = match self.target_checkpoint_height() {
794            Checkpoint(height) => (
795                height,
796                self.checkpoint_list
797                    .hash(height)
798                    .expect("every checkpoint height must have a hash"),
799            ),
800            WaitingForBlocks => {
801                return;
802            }
803            FinishedVerifying => {
804                unreachable!("the FinalCheckpoint case should have returned earlier")
805            }
806        };
807
808        // Keep the old previous checkpoint height, to make sure we're making
809        // progress
810        let old_prev_check_height = self.previous_checkpoint_height();
811
812        // Work out which blocks and checkpoints we're checking
813        let current_range = (
814            self.current_start_bound()
815                .expect("earlier code checks if verification has finished"),
816            Included(target_checkpoint_height),
817        );
818        let range_heights: Vec<block::Height> = self
819            .queued
820            .range_mut(current_range)
821            .rev()
822            .map(|(key, _)| *key)
823            .collect();
824        // A list of pending valid blocks, in reverse chain order
825        let mut rev_valid_blocks = Vec::new();
826
827        // Check all the blocks, and discard all the bad blocks
828        for current_height in range_heights {
829            let valid_qblock = self.process_height(current_height, expected_hash);
830            if let Some(qblock) = valid_qblock {
831                expected_hash = qblock.block.block.header.previous_block_hash;
832                // Add the block to the end of the pending block list
833                // (since we're walking the chain backwards, the list is
834                // in reverse chain order)
835                rev_valid_blocks.push(qblock);
836            } else {
837                // The last block height we processed did not have any blocks
838                // with a matching hash, so chain verification has failed.
839                tracing::info!(
840                    ?current_height,
841                    ?current_range,
842                    "No valid blocks at height in CheckpointVerifier"
843                );
844
845                // We kept all the matching blocks down to this height, in
846                // anticipation of the chain verifying. But the chain is
847                // incomplete, so we have to put them back in the queue.
848                //
849                // The order here shouldn't matter, but add the blocks in
850                // height order, for consistency.
851                for vblock in rev_valid_blocks.drain(..).rev() {
852                    self.queued
853                        .entry(vblock.block.height)
854                        .or_default()
855                        .push(vblock);
856                }
857
858                // Make sure the current progress hasn't changed
859                assert_eq!(
860                    self.previous_checkpoint_height(),
861                    old_prev_check_height,
862                    "we must not change the previous checkpoint on failure"
863                );
864                // We've reduced the target
865                //
866                // This check should be cheap, because we just reduced the target
867                let current_target = self.target_checkpoint_height();
868                assert!(
869                    current_target == WaitingForBlocks
870                        || current_target < Checkpoint(target_checkpoint_height),
871                    "we must decrease or eliminate our target on failure"
872                );
873
874                // Stop verifying, and wait for the next valid block
875                return;
876            }
877        }
878
879        // The checkpoint and the parent hash must match.
880        // See the detailed checkpoint comparison comment above.
881        assert_eq!(
882            expected_hash, previous_checkpoint_hash,
883            "the previous checkpoint should match: bad checkpoint list, zebra bug, or bad chain"
884        );
885
886        let block_count = rev_valid_blocks.len();
887        tracing::info!(?block_count, ?current_range, "verified checkpoint range");
888        metrics::counter!("checkpoint.verified.block.count").increment(block_count as u64);
889
890        // All the blocks we've kept are valid, so let's verify them
891        // in height order.
892        for qblock in rev_valid_blocks.drain(..).rev() {
893            // Sending can fail, but there's nothing we can do about it.
894            let _ = qblock.tx.send(Ok(qblock.block.hash));
895        }
896
897        // Finally, update the checkpoint bounds
898        self.update_progress(target_checkpoint_height);
899
900        // Ensure that we're making progress
901        let new_progress = self.previous_checkpoint_height();
902        assert!(
903            new_progress > old_prev_check_height,
904            "we must make progress on success"
905        );
906        // We met the old target
907        if new_progress == FinalCheckpoint {
908            assert_eq!(
909                target_checkpoint_height,
910                self.checkpoint_list.max_height(),
911                "we finish at the maximum checkpoint"
912            );
913        } else {
914            assert_eq!(
915                new_progress,
916                PreviousCheckpoint(target_checkpoint_height),
917                "the new previous checkpoint must match the old target"
918            );
919        }
920        // We processed all available checkpoints
921        //
922        // We've cleared the target range, so this check should be cheap
923        let new_target = self.target_checkpoint_height();
924        assert!(
925            new_target == WaitingForBlocks || new_target == FinishedVerifying,
926            "processing must cover all available checkpoints"
927        );
928    }
929}
930
931/// CheckpointVerifier rejects pending futures on drop.
932impl<S> Drop for CheckpointVerifier<S>
933where
934    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
935    S::Future: Send + 'static,
936{
937    /// Send an error on `tx` for any `QueuedBlock`s that haven't been verified.
938    ///
939    /// We can't implement `Drop` on QueuedBlock, because `send()` consumes
940    /// `tx`. And `tx` doesn't implement `Copy` or `Default` (for `take()`).
941    fn drop(&mut self) {
942        self.finish_diagnostics();
943
944        let drop_keys: Vec<_> = self.queued.keys().cloned().collect();
945        for key in drop_keys {
946            let mut qblocks = self
947                .queued
948                .remove(&key)
949                .expect("each entry is only removed once");
950            for qblock in qblocks.drain(..) {
951                // Sending can fail, but there's nothing we can do about it.
952                let _ = qblock.tx.send(Err(VerifyCheckpointError::Dropped));
953            }
954        }
955    }
956}
957
/// The errors that the checkpoint verifier can return for a block.
///
/// Variants marked `#[error(transparent)]` forward the display message of the
/// error they wrap.
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum VerifyCheckpointError {
    /// A block was submitted after the final checkpoint had been verified.
    ///
    /// The `Service::call` implementation returns this immediately once the
    /// verifier's progress is `FinalCheckpoint`.
    #[error("checkpoint request after the final checkpoint has been verified")]
    Finished,
    /// The block's height is above the maximum checkpoint height.
    #[error("block at {height:?} is higher than the maximum checkpoint {max_height:?}")]
    TooHigh {
        height: block::Height,
        max_height: block::Height,
    },
    /// The block's height is at or below the already-verified tip.
    ///
    /// `is_duplicate_request` treats this as a duplicate request.
    #[error("block {height:?} is less than or equal to the verified tip {verified_height:?}")]
    AlreadyVerified {
        height: block::Height,
        verified_height: block::Height,
    },
    /// An older verification request was rejected because a duplicate
    /// request for the same block arrived.
    ///
    /// `is_duplicate_request` treats this as a duplicate request.
    #[error("rejected older of duplicate verification requests for block at {height:?} {hash:?}")]
    NewerRequest {
        height: block::Height,
        hash: block::Hash,
    },
    /// The block with this hash has no coinbase height.
    #[error("the block {hash:?} does not have a coinbase height")]
    CoinbaseHeight { hash: block::Hash },
    /// The block's actual merkle root differs from the expected one.
    #[error("merkle root {actual:?} does not match expected {expected:?}")]
    BadMerkleRoot {
        actual: block::merkle::Root,
        expected: block::merkle::Root,
    },
    /// The block contains duplicate transactions.
    #[error("duplicate transactions in block")]
    DuplicateTransaction,
    /// The verifier was dropped while this block was still queued.
    ///
    /// Sent to every remaining queued block by the verifier's `Drop` impl.
    #[error("checkpoint verifier was dropped")]
    Dropped,
    /// Receiving the verification result, or committing the verified block
    /// to the state service, failed with the wrapped error.
    #[error(transparent)]
    CommitCheckpointVerified(BoxError),
    /// The state service's `Tip` request failed with the wrapped error.
    #[error(transparent)]
    Tip(BoxError),
    /// Presumably an error involving the checkpoint list; this variant is
    /// constructed outside this part of the file (TODO confirm).
    #[error(transparent)]
    CheckpointList(BoxError),
    /// A wrapped block verification error.
    ///
    /// The `From` impls for `VerifyBlockError`, `BlockError`, and
    /// `equihash::Error` all produce this variant.
    #[error(transparent)]
    VerifyBlock(VerifyBlockError),
    /// The block subsidy is invalid.
    #[error("invalid block subsidy: {0}")]
    SubsidyError(#[from] SubsidyError),
    /// An amount is invalid.
    #[error("invalid amount: {0}")]
    AmountError(#[from] amount::Error),
    /// Too many blocks are already queued at this block's height.
    #[error("too many queued blocks at this height")]
    QueuedLimit,
    /// The queued block's hash is not the hash that the checkpoint hash
    /// chain expects at its height.
    ///
    /// Sent to queued blocks that do not match the expected hash while a
    /// checkpoint range is being processed.
    #[error("the block hash does not match the chained checkpoint hash, expected {expected:?} found {found:?}")]
    UnexpectedSideChain {
        expected: block::Hash,
        found: block::Hash,
    },
    /// Zebra is shutting down.
    ///
    /// Returned by the future from `Service::call` when
    /// `zebra_chain::shutdown::is_shutting_down()` is true after the commit
    /// task finishes.
    #[error("zebra is shutting down")]
    ShuttingDown,
}
1011
1012impl From<VerifyBlockError> for VerifyCheckpointError {
1013    fn from(err: VerifyBlockError) -> VerifyCheckpointError {
1014        VerifyCheckpointError::VerifyBlock(err)
1015    }
1016}
1017
1018impl From<BlockError> for VerifyCheckpointError {
1019    fn from(err: BlockError) -> VerifyCheckpointError {
1020        VerifyCheckpointError::VerifyBlock(err.into())
1021    }
1022}
1023
1024impl From<equihash::Error> for VerifyCheckpointError {
1025    fn from(err: equihash::Error) -> VerifyCheckpointError {
1026        VerifyCheckpointError::VerifyBlock(err.into())
1027    }
1028}
1029
1030impl VerifyCheckpointError {
1031    /// Returns `true` if this is definitely a duplicate request.
1032    /// Some duplicate requests might not be detected, and therefore return `false`.
1033    pub fn is_duplicate_request(&self) -> bool {
1034        match self {
1035            VerifyCheckpointError::AlreadyVerified { .. } => true,
1036            // TODO: make this duplicate-incomplete
1037            VerifyCheckpointError::NewerRequest { .. } => true,
1038            VerifyCheckpointError::VerifyBlock(block_error) => block_error.is_duplicate_request(),
1039            _ => false,
1040        }
1041    }
1042
1043    /// Returns a suggested misbehaviour score increment for a certain error.
1044    pub fn misbehavior_score(&self) -> u32 {
1045        // TODO: Adjust these values based on zcashd (#9258).
1046        match self {
1047            VerifyCheckpointError::VerifyBlock(verify_block_error) => {
1048                verify_block_error.misbehavior_score()
1049            }
1050            VerifyCheckpointError::SubsidyError(_)
1051            | VerifyCheckpointError::CoinbaseHeight { .. }
1052            | VerifyCheckpointError::DuplicateTransaction
1053            | VerifyCheckpointError::AmountError(_) => 100,
1054            _other => 0,
1055        }
1056    }
1057}
1058
1059/// The CheckpointVerifier service implementation.
1060///
1061/// After verification, the block futures resolve to their hashes.
1062impl<S> Service<Arc<Block>> for CheckpointVerifier<S>
1063where
1064    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
1065    S::Future: Send + 'static,
1066{
1067    type Response = block::Hash;
1068    type Error = VerifyCheckpointError;
1069    type Future =
1070        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1071
1072    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1073        Poll::Ready(Ok(()))
1074    }
1075
1076    #[instrument(name = "checkpoint", skip(self, block))]
1077    fn call(&mut self, block: Arc<Block>) -> Self::Future {
1078        // Reset the verifier back to the state tip if requested
1079        // (e.g. due to an error when committing a block to the state)
1080        if let Ok(tip) = self.reset_receiver.try_recv() {
1081            self.reset_progress(tip);
1082        }
1083
1084        // Immediately reject all incoming blocks that arrive after we've finished.
1085        if let FinalCheckpoint = self.previous_checkpoint_height() {
1086            return async { Err(VerifyCheckpointError::Finished) }.boxed();
1087        }
1088
1089        let req_block = match self.queue_block(block) {
1090            Ok(req_block) => req_block,
1091            Err(e) => return async { Err(e) }.boxed(),
1092        };
1093
1094        self.process_checkpoint_range();
1095
1096        metrics::gauge!("checkpoint.queued_slots").set(self.queued.len() as f64);
1097
1098        // Because the checkpoint verifier duplicates state from the state
1099        // service (it tracks which checkpoints have been verified), we must
1100        // commit blocks transactionally on a per-checkpoint basis. Otherwise,
1101        // the checkpoint verifier's state could desync from the underlying
1102        // state service. Among other problems, this could cause the checkpoint
1103        // verifier to reject blocks not already in the state as
1104        // already-verified.
1105        //
1106        // # Dropped Receivers
1107        //
1108        // To commit blocks transactionally on a per-checkpoint basis, we must
1109        // commit all verified blocks in a checkpoint range, regardless of
1110        // whether or not the response futures for each block were dropped.
1111        //
1112        // We accomplish this by spawning a new task containing the
1113        // commit-if-verified logic. This task will always execute, except if
1114        // the program is interrupted, in which case there is no longer a
1115        // checkpoint verifier to keep in sync with the state.
1116        //
1117        // # State Commit Failures
1118        //
1119        // If the state commit fails due to corrupt block data,
1120        // we don't reject the entire checkpoint.
1121        // Instead, we reset the verifier to the successfully committed state tip.
1122        let state_service = self.state_service.clone();
1123        let commit_checkpoint_verified = tokio::spawn(async move {
1124            let hash = req_block
1125                .rx
1126                .await
1127                .map_err(Into::into)
1128                .map_err(VerifyCheckpointError::CommitCheckpointVerified)
1129                .expect("CheckpointVerifier does not leave dangling receivers")?;
1130
1131            // We use a `ServiceExt::oneshot`, so that every state service
1132            // `poll_ready` has a corresponding `call`. See #1593.
1133            match state_service
1134                .oneshot(zs::Request::CommitCheckpointVerifiedBlock(req_block.block))
1135                .map_err(VerifyCheckpointError::CommitCheckpointVerified)
1136                .await?
1137            {
1138                zs::Response::Committed(committed_hash) => {
1139                    assert_eq!(committed_hash, hash, "state must commit correct hash");
1140                    Ok(hash)
1141                }
1142                _ => unreachable!("wrong response for CommitCheckpointVerifiedBlock"),
1143            }
1144        });
1145
1146        let state_service = self.state_service.clone();
1147        let reset_sender = self.reset_sender.clone();
1148        async move {
1149            let result = commit_checkpoint_verified.await;
1150            // Avoid a panic on shutdown
1151            //
1152            // When `zebrad` is terminated using Ctrl-C, the `commit_checkpoint_verified` task
1153            // can return a `JoinError::Cancelled`. We expect task cancellation on shutdown,
1154            // so we don't need to panic here. The persistent state is correct even when the
1155            // task is cancelled, because block data is committed inside transactions, in
1156            // height order.
1157            let result = if zebra_chain::shutdown::is_shutting_down() {
1158                Err(VerifyCheckpointError::ShuttingDown)
1159            } else {
1160                result.expect("commit_checkpoint_verified should not panic")
1161            };
1162            if result.is_err() {
1163                // If there was an error committing the block, then this verifier
1164                // will be out of sync with the state. In that case, reset
1165                // its progress back to the state tip.
1166                let tip = match state_service
1167                    .oneshot(zs::Request::Tip)
1168                    .await
1169                    .map_err(VerifyCheckpointError::Tip)?
1170                {
1171                    zs::Response::Tip(tip) => tip,
1172                    _ => unreachable!("wrong response for Tip"),
1173                };
1174                // Ignore errors since send() can fail only when the verifier
1175                // is being dropped, and then it doesn't matter anymore.
1176                let _ = reset_sender.send(tip);
1177            }
1178            result
1179        }
1180        .boxed()
1181    }
1182}