1use std::{
17    collections::BTreeMap,
18    ops::{Bound, Bound::*},
19    pin::Pin,
20    sync::{mpsc, Arc},
21    task::{Context, Poll},
22};
23
24use futures::{Future, FutureExt, TryFutureExt};
25use thiserror::Error;
26use tokio::sync::oneshot;
27use tower::{Service, ServiceExt};
28use tracing::instrument;
29
30use zebra_chain::{
31    amount::{self, DeferredPoolBalanceChange},
32    block::{self, Block},
33    parameters::{
34        checkpoint::list::CheckpointList,
35        subsidy::{block_subsidy, funding_stream_values, FundingStreamReceiver, SubsidyError},
36        Network, GENESIS_PREVIOUS_BLOCK_HASH,
37    },
38    work::equihash,
39};
40use zebra_state::{self as zs, CheckpointVerifiedBlock};
41
42use crate::{
43    block::VerifyBlockError,
44    checkpoint::types::{
45        Progress::{self, *},
46        TargetHeight::{self, *},
47    },
48    error::BlockError,
49    BoxError,
50};
51
52mod types;
53
54#[cfg(test)]
55mod tests;
56
57pub use zebra_node_services::constants::{MAX_CHECKPOINT_BYTE_COUNT, MAX_CHECKPOINT_HEIGHT_GAP};
58
59#[derive(Debug)]
61struct QueuedBlock {
62    block: CheckpointVerifiedBlock,
64    tx: oneshot::Sender<Result<block::Hash, VerifyCheckpointError>>,
66}
67
68#[derive(Debug)]
70struct RequestBlock {
71    block: CheckpointVerifiedBlock,
73    rx: oneshot::Receiver<Result<block::Hash, VerifyCheckpointError>>,
75}
76
77type QueuedBlockList = Vec<QueuedBlock>;
84
85pub const MAX_QUEUED_BLOCKS_PER_HEIGHT: usize = 4;
96
97fn progress_from_tip(
99    checkpoint_list: &CheckpointList,
100    tip: Option<(block::Height, block::Hash)>,
101) -> (Option<block::Hash>, Progress<block::Height>) {
102    match tip {
103        Some((height, hash)) => {
104            if height >= checkpoint_list.max_height() {
105                (None, Progress::FinalCheckpoint)
106            } else {
107                metrics::gauge!("checkpoint.verified.height").set(height.0 as f64);
108                metrics::gauge!("checkpoint.processing.next.height").set(height.0 as f64);
109                (Some(hash), Progress::InitialTip(height))
110            }
111        }
112        None => (None, Progress::BeforeGenesis),
114    }
115}
116
117pub struct CheckpointVerifier<S>
122where
123    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
124    S::Future: Send + 'static,
125{
126    checkpoint_list: Arc<CheckpointList>,
128
129    network: Network,
131
132    initial_tip_hash: Option<block::Hash>,
134
135    state_service: S,
137
138    queued: BTreeMap<block::Height, QueuedBlockList>,
149
150    verifier_progress: Progress<block::Height>,
152
153    reset_receiver: mpsc::Receiver<Option<(block::Height, block::Hash)>>,
156    reset_sender: mpsc::Sender<Option<(block::Height, block::Hash)>>,
159
160    #[cfg(feature = "progress-bar")]
162    queued_blocks_bar: howudoin::Tx,
163
164    #[cfg(feature = "progress-bar")]
166    verified_checkpoint_bar: howudoin::Tx,
167}
168
169impl<S> std::fmt::Debug for CheckpointVerifier<S>
170where
171    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
172    S::Future: Send + 'static,
173{
174    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175        f.debug_struct("CheckpointVerifier")
176            .field("checkpoint_list", &self.checkpoint_list)
177            .field("network", &self.network)
178            .field("initial_tip_hash", &self.initial_tip_hash)
179            .field("queued", &self.queued)
180            .field("verifier_progress", &self.verifier_progress)
181            .finish()
182    }
183}
184
185impl<S> CheckpointVerifier<S>
186where
187    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
188    S::Future: Send + 'static,
189{
190    #[allow(dead_code)]
206    pub fn new(
207        network: &Network,
208        initial_tip: Option<(block::Height, block::Hash)>,
209        state_service: S,
210    ) -> Self {
211        let checkpoint_list = network.checkpoint_list();
212        let max_height = checkpoint_list.max_height();
213        tracing::info!(
214            ?max_height,
215            ?network,
216            ?initial_tip,
217            "initialising CheckpointVerifier"
218        );
219        Self::from_checkpoint_list(checkpoint_list, network, initial_tip, state_service)
220    }
221
222    #[allow(dead_code)]
234    pub(crate) fn from_list(
235        list: impl IntoIterator<Item = (block::Height, block::Hash)>,
236        network: &Network,
237        initial_tip: Option<(block::Height, block::Hash)>,
238        state_service: S,
239    ) -> Result<Self, VerifyCheckpointError> {
240        Ok(Self::from_checkpoint_list(
241            CheckpointList::from_list(list)
242                .map(Arc::new)
243                .map_err(VerifyCheckpointError::CheckpointList)?,
244            network,
245            initial_tip,
246            state_service,
247        ))
248    }
249
250    pub(crate) fn from_checkpoint_list(
258        checkpoint_list: Arc<CheckpointList>,
259        network: &Network,
260        initial_tip: Option<(block::Height, block::Hash)>,
261        state_service: S,
262    ) -> Self {
263        let (initial_tip_hash, verifier_progress) =
266            progress_from_tip(&checkpoint_list, initial_tip);
267
268        let (sender, receiver) = mpsc::channel();
269
270        #[cfg(feature = "progress-bar")]
271        let queued_blocks_bar = howudoin::new_root().label("Checkpoint Queue Height");
272
273        #[cfg(feature = "progress-bar")]
274        let verified_checkpoint_bar =
275            howudoin::new_with_parent(queued_blocks_bar.id()).label("Verified Checkpoints");
276
277        let verifier = CheckpointVerifier {
278            checkpoint_list,
279            network: network.clone(),
280            initial_tip_hash,
281            state_service,
282            queued: BTreeMap::new(),
283            verifier_progress,
284            reset_receiver: receiver,
285            reset_sender: sender,
286            #[cfg(feature = "progress-bar")]
287            queued_blocks_bar,
288            #[cfg(feature = "progress-bar")]
289            verified_checkpoint_bar,
290        };
291
292        if verifier_progress.is_final_checkpoint() {
293            verifier.finish_diagnostics();
294        } else {
295            verifier.verified_checkpoint_diagnostics(verifier_progress.height());
296        }
297
298        verifier
299    }
300
301    fn queued_block_diagnostics(&self, height: block::Height, hash: block::Hash) {
303        let max_queued_height = self
304            .queued
305            .keys()
306            .next_back()
307            .expect("queued has at least one entry");
308
309        metrics::gauge!("checkpoint.queued.max.height").set(max_queued_height.0 as f64);
310
311        let is_checkpoint = self.checkpoint_list.contains(height);
312        tracing::debug!(?height, ?hash, ?is_checkpoint, "queued block");
313
314        #[cfg(feature = "progress-bar")]
315        if matches!(howudoin::cancelled(), Some(true)) {
316            self.finish_diagnostics();
317        } else {
318            self.queued_blocks_bar
319                .set_pos(max_queued_height.0)
320                .set_len(u64::from(self.checkpoint_list.max_height().0));
321        }
322    }
323
324    fn verified_checkpoint_diagnostics(&self, verified_height: impl Into<Option<block::Height>>) {
326        let Some(verified_height) = verified_height.into() else {
327            return;
330        };
331
332        metrics::gauge!("checkpoint.verified.height").set(verified_height.0 as f64);
333
334        let checkpoint_index = self.checkpoint_list.prev_checkpoint_index(verified_height);
335        let checkpoint_count = self.checkpoint_list.len();
336
337        metrics::gauge!("checkpoint.verified.count").set(checkpoint_index as f64);
338
339        tracing::debug!(
340            ?verified_height,
341            ?checkpoint_index,
342            ?checkpoint_count,
343            "verified checkpoint",
344        );
345
346        #[cfg(feature = "progress-bar")]
347        if matches!(howudoin::cancelled(), Some(true)) {
348            self.finish_diagnostics();
349        } else {
350            self.verified_checkpoint_bar
351                .set_pos(u64::try_from(checkpoint_index).expect("fits in u64"))
352                .set_len(u64::try_from(checkpoint_count).expect("fits in u64"));
353        }
354    }
355
356    fn finish_diagnostics(&self) {
358        #[cfg(feature = "progress-bar")]
359        {
360            self.queued_blocks_bar.close();
361            self.verified_checkpoint_bar.close();
362        }
363    }
364
365    fn reset_progress(&mut self, tip: Option<(block::Height, block::Hash)>) {
367        let (initial_tip_hash, verifier_progress) = progress_from_tip(&self.checkpoint_list, tip);
368        self.initial_tip_hash = initial_tip_hash;
369        self.verifier_progress = verifier_progress;
370
371        self.verified_checkpoint_diagnostics(verifier_progress.height());
372    }
373
374    fn previous_checkpoint_height(&self) -> Progress<block::Height> {
384        self.verifier_progress
385    }
386
387    fn current_start_bound(&self) -> Option<Bound<block::Height>> {
391        match self.previous_checkpoint_height() {
392            BeforeGenesis => Some(Unbounded),
393            InitialTip(height) | PreviousCheckpoint(height) => Some(Excluded(height)),
394            FinalCheckpoint => None,
395        }
396    }
397
398    fn target_checkpoint_height(&self) -> TargetHeight {
410        let start_height = match self.previous_checkpoint_height() {
412            BeforeGenesis if !self.queued.contains_key(&block::Height(0)) => {
414                tracing::trace!("Waiting for genesis block");
415                metrics::counter!("checkpoint.waiting.count").increment(1);
416                return WaitingForBlocks;
417            }
418            BeforeGenesis => block::Height(0),
419            InitialTip(height) | PreviousCheckpoint(height) => height,
420            FinalCheckpoint => return FinishedVerifying,
421        };
422
423        let mut pending_height = start_height;
436        for (&height, _) in self.queued.range((Excluded(pending_height), Unbounded)) {
437            if height == block::Height(pending_height.0 + 1) {
439                pending_height = height;
440            } else {
441                let gap = height.0 - pending_height.0;
442                tracing::trace!(contiguous_height = ?pending_height,
444                                next_height = ?height,
445                                ?gap,
446                                "Waiting for more checkpoint blocks");
447                break;
448            }
449        }
450        metrics::gauge!("checkpoint.queued.continuous.height").set(pending_height.0 as f64);
451
452        let start = self.current_start_bound().expect(
454            "if verification has finished, we should have returned earlier in the function",
455        );
456        let target_checkpoint = self
459            .checkpoint_list
460            .max_height_in_range((start, Included(pending_height)));
461
462        tracing::trace!(
463            checkpoint_start = ?start,
464            highest_contiguous_block = ?pending_height,
465            ?target_checkpoint
466        );
467
468        if let Some(block::Height(target_checkpoint)) = target_checkpoint {
469            metrics::gauge!("checkpoint.processing.next.height").set(target_checkpoint as f64);
470        } else {
471            metrics::gauge!("checkpoint.processing.next.height").set(start_height.0 as f64);
473            metrics::counter!("checkpoint.waiting.count").increment(1);
474        }
475
476        target_checkpoint
477            .map(Checkpoint)
478            .unwrap_or(WaitingForBlocks)
479    }
480
481    fn previous_checkpoint_hash(&self) -> Progress<block::Hash> {
485        match self.previous_checkpoint_height() {
486            BeforeGenesis => BeforeGenesis,
487            InitialTip(_) => self
488                .initial_tip_hash
489                .map(InitialTip)
490                .expect("initial tip height must have an initial tip hash"),
491            PreviousCheckpoint(height) => self
492                .checkpoint_list
493                .hash(height)
494                .map(PreviousCheckpoint)
495                .expect("every checkpoint height must have a hash"),
496            FinalCheckpoint => FinalCheckpoint,
497        }
498    }
499
500    fn check_height(&self, height: block::Height) -> Result<(), VerifyCheckpointError> {
509        if height > self.checkpoint_list.max_height() {
510            Err(VerifyCheckpointError::TooHigh {
511                height,
512                max_height: self.checkpoint_list.max_height(),
513            })?;
514        }
515
516        match self.previous_checkpoint_height() {
517            BeforeGenesis => {}
519            InitialTip(previous_height) | PreviousCheckpoint(previous_height)
521                if (height <= previous_height) =>
522            {
523                let e = Err(VerifyCheckpointError::AlreadyVerified {
524                    height,
525                    verified_height: previous_height,
526                });
527                tracing::trace!(?e);
528                e?;
529            }
530            InitialTip(_) | PreviousCheckpoint(_) => {}
531            FinalCheckpoint => Err(VerifyCheckpointError::Finished)?,
533        };
534
535        Ok(())
536    }
537
538    fn update_progress(&mut self, verified_height: block::Height) {
540        if let Some(max_height) = self.queued.keys().next_back() {
541            metrics::gauge!("checkpoint.queued.max.height").set(max_height.0 as f64);
542        } else {
543            metrics::gauge!("checkpoint.queued.max.height").set(f64::NAN);
545        }
546        metrics::gauge!("checkpoint.queued_slots").set(self.queued.len() as f64);
547
548        if self.check_height(verified_height).is_err() {
556            return;
557        }
558
559        if verified_height == self.checkpoint_list.max_height() {
561            self.verifier_progress = FinalCheckpoint;
562
563            tracing::info!(
564                final_checkpoint_height = ?verified_height,
565                "verified final checkpoint: starting full validation",
566            );
567
568            self.verified_checkpoint_diagnostics(verified_height);
569            self.finish_diagnostics();
570        } else if self.checkpoint_list.contains(verified_height) {
571            self.verifier_progress = PreviousCheckpoint(verified_height);
572            self.initial_tip_hash = None;
574
575            self.verified_checkpoint_diagnostics(verified_height);
576        }
577    }
578
579    fn check_block(
592        &self,
593        block: Arc<Block>,
594    ) -> Result<CheckpointVerifiedBlock, VerifyCheckpointError> {
595        let hash = block.hash();
596        let height = block
597            .coinbase_height()
598            .ok_or(VerifyCheckpointError::CoinbaseHeight { hash })?;
599        self.check_height(height)?;
600
601        if self.network.disable_pow() {
602            crate::block::check::difficulty_threshold_is_valid(
603                &block.header,
604                &self.network,
605                &height,
606                &hash,
607            )?;
608        } else {
609            crate::block::check::difficulty_is_valid(&block.header, &self.network, &height, &hash)?;
610            crate::block::check::equihash_solution_is_valid(&block.header)?;
611        }
612
613        let expected_deferred_amount = if height > self.network.slow_start_interval() {
616            funding_stream_values(height, &self.network, block_subsidy(height, &self.network)?)?
618                .remove(&FundingStreamReceiver::Deferred)
619        } else {
620            None
621        };
622
623        let deferred_pool_balance_change = expected_deferred_amount
624            .unwrap_or_default()
625            .checked_sub(self.network.lockbox_disbursement_total_amount(height))
626            .map(DeferredPoolBalanceChange::new);
627
628        let block = CheckpointVerifiedBlock::new(block, Some(hash), deferred_pool_balance_change);
630
631        crate::block::check::merkle_root_validity(
632            &self.network,
633            &block.block,
634            &block.transaction_hashes,
635        )?;
636
637        Ok(block)
638    }
639
640    #[allow(clippy::unwrap_in_result)]
651    fn queue_block(&mut self, block: Arc<Block>) -> Result<RequestBlock, VerifyCheckpointError> {
652        let (tx, rx) = oneshot::channel();
654
655        let block = self.check_block(block)?;
657        let height = block.height;
658        let hash = block.hash;
659
660        let new_qblock = QueuedBlock {
661            block: block.clone(),
662            tx,
663        };
664        let req_block = RequestBlock { block, rx };
665
666        let qblocks = self
670            .queued
671            .entry(height)
672            .or_insert_with(|| QueuedBlockList::with_capacity(1));
673
674        for qb in qblocks.iter_mut() {
677            if qb.block.hash == hash {
678                let e = VerifyCheckpointError::NewerRequest { height, hash };
679                tracing::trace!(?e, "failing older of duplicate requests");
680
681                let old = std::mem::replace(qb, new_qblock);
690                let _ = old.tx.send(Err(e));
691                return Ok(req_block);
692            }
693        }
694
695        if qblocks.len() >= MAX_QUEUED_BLOCKS_PER_HEIGHT {
697            let e = VerifyCheckpointError::QueuedLimit;
698            tracing::warn!(?e);
699            return Err(e);
700        }
701
702        qblocks.reserve_exact(1);
705        qblocks.push(new_qblock);
706
707        self.queued_block_diagnostics(height, hash);
708
709        Ok(req_block)
710    }
711
712    #[allow(clippy::unwrap_in_result)]
716    fn process_height(
717        &mut self,
718        height: block::Height,
719        expected_hash: block::Hash,
720    ) -> Option<QueuedBlock> {
721        let mut qblocks = self
722            .queued
723            .remove(&height)
724            .expect("the current checkpoint range has continuous Vec<QueuedBlock>s");
725        assert!(
726            !qblocks.is_empty(),
727            "the current checkpoint range has continuous Blocks"
728        );
729
730        if let Some(checkpoint_hash) = self.checkpoint_list.hash(height) {
732            assert_eq!(expected_hash, checkpoint_hash,
737                           "checkpoints in the range should match: bad checkpoint list, zebra bug, or bad chain"
738                );
739        }
740
741        let mut valid_qblock = None;
748        for qblock in qblocks.drain(..) {
749            if qblock.block.hash == expected_hash {
750                if valid_qblock.is_none() {
751                    valid_qblock = Some(qblock);
753                } else {
754                    unreachable!("unexpected duplicate block {:?} {:?}: duplicate blocks should be rejected before being queued",
755                                 height, qblock.block.hash);
756                }
757            } else {
758                tracing::info!(?height, ?qblock.block.hash, ?expected_hash,
759                               "Side chain hash at height in CheckpointVerifier");
760                let _ = qblock
761                    .tx
762                    .send(Err(VerifyCheckpointError::UnexpectedSideChain {
763                        found: qblock.block.hash,
764                        expected: expected_hash,
765                    }));
766            }
767        }
768
769        valid_qblock
770    }
771
772    fn process_checkpoint_range(&mut self) {
780        let previous_checkpoint_hash = match self.previous_checkpoint_hash() {
790            BeforeGenesis => GENESIS_PREVIOUS_BLOCK_HASH,
794            InitialTip(hash) | PreviousCheckpoint(hash) => hash,
795            FinalCheckpoint => return,
796        };
797        let (target_checkpoint_height, mut expected_hash) = match self.target_checkpoint_height() {
799            Checkpoint(height) => (
800                height,
801                self.checkpoint_list
802                    .hash(height)
803                    .expect("every checkpoint height must have a hash"),
804            ),
805            WaitingForBlocks => {
806                return;
807            }
808            FinishedVerifying => {
809                unreachable!("the FinalCheckpoint case should have returned earlier")
810            }
811        };
812
813        let old_prev_check_height = self.previous_checkpoint_height();
816
817        let current_range = (
819            self.current_start_bound()
820                .expect("earlier code checks if verification has finished"),
821            Included(target_checkpoint_height),
822        );
823        let range_heights: Vec<block::Height> = self
824            .queued
825            .range_mut(current_range)
826            .rev()
827            .map(|(key, _)| *key)
828            .collect();
829        let mut rev_valid_blocks = Vec::new();
831
832        for current_height in range_heights {
834            let valid_qblock = self.process_height(current_height, expected_hash);
835            if let Some(qblock) = valid_qblock {
836                expected_hash = qblock.block.block.header.previous_block_hash;
837                rev_valid_blocks.push(qblock);
841            } else {
842                tracing::info!(
845                    ?current_height,
846                    ?current_range,
847                    "No valid blocks at height in CheckpointVerifier"
848                );
849
850                for vblock in rev_valid_blocks.drain(..).rev() {
857                    self.queued
858                        .entry(vblock.block.height)
859                        .or_default()
860                        .push(vblock);
861                }
862
863                assert_eq!(
865                    self.previous_checkpoint_height(),
866                    old_prev_check_height,
867                    "we must not change the previous checkpoint on failure"
868                );
869                let current_target = self.target_checkpoint_height();
873                assert!(
874                    current_target == WaitingForBlocks
875                        || current_target < Checkpoint(target_checkpoint_height),
876                    "we must decrease or eliminate our target on failure"
877                );
878
879                return;
881            }
882        }
883
884        assert_eq!(
887            expected_hash, previous_checkpoint_hash,
888            "the previous checkpoint should match: bad checkpoint list, zebra bug, or bad chain"
889        );
890
891        let block_count = rev_valid_blocks.len();
892        tracing::info!(?block_count, ?current_range, "verified checkpoint range");
893        metrics::counter!("checkpoint.verified.block.count").increment(block_count as u64);
894
895        for qblock in rev_valid_blocks.drain(..).rev() {
898            let _ = qblock.tx.send(Ok(qblock.block.hash));
900        }
901
902        self.update_progress(target_checkpoint_height);
904
905        let new_progress = self.previous_checkpoint_height();
907        assert!(
908            new_progress > old_prev_check_height,
909            "we must make progress on success"
910        );
911        if new_progress == FinalCheckpoint {
913            assert_eq!(
914                target_checkpoint_height,
915                self.checkpoint_list.max_height(),
916                "we finish at the maximum checkpoint"
917            );
918        } else {
919            assert_eq!(
920                new_progress,
921                PreviousCheckpoint(target_checkpoint_height),
922                "the new previous checkpoint must match the old target"
923            );
924        }
925        let new_target = self.target_checkpoint_height();
929        assert!(
930            new_target == WaitingForBlocks || new_target == FinishedVerifying,
931            "processing must cover all available checkpoints"
932        );
933    }
934}
935
936impl<S> Drop for CheckpointVerifier<S>
938where
939    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
940    S::Future: Send + 'static,
941{
942    fn drop(&mut self) {
947        self.finish_diagnostics();
948
949        let drop_keys: Vec<_> = self.queued.keys().cloned().collect();
950        for key in drop_keys {
951            let mut qblocks = self
952                .queued
953                .remove(&key)
954                .expect("each entry is only removed once");
955            for qblock in qblocks.drain(..) {
956                let _ = qblock.tx.send(Err(VerifyCheckpointError::Dropped));
958            }
959        }
960    }
961}
962
963#[derive(Debug, Error)]
964#[allow(missing_docs)]
965pub enum VerifyCheckpointError {
966    #[error("checkpoint request after the final checkpoint has been verified")]
967    Finished,
968    #[error("block at {height:?} is higher than the maximum checkpoint {max_height:?}")]
969    TooHigh {
970        height: block::Height,
971        max_height: block::Height,
972    },
973    #[error("block {height:?} is less than or equal to the verified tip {verified_height:?}")]
974    AlreadyVerified {
975        height: block::Height,
976        verified_height: block::Height,
977    },
978    #[error("rejected older of duplicate verification requests for block at {height:?} {hash:?}")]
979    NewerRequest {
980        height: block::Height,
981        hash: block::Hash,
982    },
983    #[error("the block {hash:?} does not have a coinbase height")]
984    CoinbaseHeight { hash: block::Hash },
985    #[error("merkle root {actual:?} does not match expected {expected:?}")]
986    BadMerkleRoot {
987        actual: block::merkle::Root,
988        expected: block::merkle::Root,
989    },
990    #[error("duplicate transactions in block")]
991    DuplicateTransaction,
992    #[error("checkpoint verifier was dropped")]
993    Dropped,
994    #[error(transparent)]
995    CommitCheckpointVerified(BoxError),
996    #[error(transparent)]
997    Tip(BoxError),
998    #[error(transparent)]
999    CheckpointList(BoxError),
1000    #[error(transparent)]
1001    VerifyBlock(VerifyBlockError),
1002    #[error("invalid block subsidy: {0}")]
1003    SubsidyError(#[from] SubsidyError),
1004    #[error("invalid amount: {0}")]
1005    AmountError(#[from] amount::Error),
1006    #[error("too many queued blocks at this height")]
1007    QueuedLimit,
1008    #[error("the block hash does not match the chained checkpoint hash, expected {expected:?} found {found:?}")]
1009    UnexpectedSideChain {
1010        expected: block::Hash,
1011        found: block::Hash,
1012    },
1013    #[error("zebra is shutting down")]
1014    ShuttingDown,
1015}
1016
1017impl From<VerifyBlockError> for VerifyCheckpointError {
1018    fn from(err: VerifyBlockError) -> VerifyCheckpointError {
1019        VerifyCheckpointError::VerifyBlock(err)
1020    }
1021}
1022
1023impl From<BlockError> for VerifyCheckpointError {
1024    fn from(err: BlockError) -> VerifyCheckpointError {
1025        VerifyCheckpointError::VerifyBlock(err.into())
1026    }
1027}
1028
1029impl From<equihash::Error> for VerifyCheckpointError {
1030    fn from(err: equihash::Error) -> VerifyCheckpointError {
1031        VerifyCheckpointError::VerifyBlock(err.into())
1032    }
1033}
1034
1035impl VerifyCheckpointError {
1036    pub fn is_duplicate_request(&self) -> bool {
1039        match self {
1040            VerifyCheckpointError::AlreadyVerified { .. } => true,
1041            VerifyCheckpointError::NewerRequest { .. } => true,
1043            VerifyCheckpointError::VerifyBlock(block_error) => block_error.is_duplicate_request(),
1044            _ => false,
1045        }
1046    }
1047
1048    pub fn misbehavior_score(&self) -> u32 {
1050        match self {
1052            VerifyCheckpointError::VerifyBlock(verify_block_error) => {
1053                verify_block_error.misbehavior_score()
1054            }
1055            VerifyCheckpointError::SubsidyError(_)
1056            | VerifyCheckpointError::CoinbaseHeight { .. }
1057            | VerifyCheckpointError::DuplicateTransaction
1058            | VerifyCheckpointError::AmountError(_) => 100,
1059            _other => 0,
1060        }
1061    }
1062}
1063
1064impl<S> Service<Arc<Block>> for CheckpointVerifier<S>
1068where
1069    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
1070    S::Future: Send + 'static,
1071{
1072    type Response = block::Hash;
1073    type Error = VerifyCheckpointError;
1074    type Future =
1075        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1076
1077    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1078        Poll::Ready(Ok(()))
1079    }
1080
1081    #[instrument(name = "checkpoint", skip(self, block))]
1082    fn call(&mut self, block: Arc<Block>) -> Self::Future {
1083        if let Ok(tip) = self.reset_receiver.try_recv() {
1086            self.reset_progress(tip);
1087        }
1088
1089        if let FinalCheckpoint = self.previous_checkpoint_height() {
1091            return async { Err(VerifyCheckpointError::Finished) }.boxed();
1092        }
1093
1094        let req_block = match self.queue_block(block) {
1095            Ok(req_block) => req_block,
1096            Err(e) => return async { Err(e) }.boxed(),
1097        };
1098
1099        self.process_checkpoint_range();
1100
1101        metrics::gauge!("checkpoint.queued_slots").set(self.queued.len() as f64);
1102
1103        let state_service = self.state_service.clone();
1128        let commit_checkpoint_verified = tokio::spawn(async move {
1129            let hash = req_block
1130                .rx
1131                .await
1132                .map_err(Into::into)
1133                .map_err(VerifyCheckpointError::CommitCheckpointVerified)
1134                .expect("CheckpointVerifier does not leave dangling receivers")?;
1135
1136            match state_service
1139                .oneshot(zs::Request::CommitCheckpointVerifiedBlock(req_block.block))
1140                .map_err(VerifyCheckpointError::CommitCheckpointVerified)
1141                .await?
1142            {
1143                zs::Response::Committed(committed_hash) => {
1144                    assert_eq!(committed_hash, hash, "state must commit correct hash");
1145                    Ok(hash)
1146                }
1147                _ => unreachable!("wrong response for CommitCheckpointVerifiedBlock"),
1148            }
1149        });
1150
1151        let state_service = self.state_service.clone();
1152        let reset_sender = self.reset_sender.clone();
1153        async move {
1154            let result = commit_checkpoint_verified.await;
1155            let result = if zebra_chain::shutdown::is_shutting_down() {
1163                Err(VerifyCheckpointError::ShuttingDown)
1164            } else {
1165                result.expect("commit_checkpoint_verified should not panic")
1166            };
1167            if result.is_err() {
1168                let tip = match state_service
1172                    .oneshot(zs::Request::Tip)
1173                    .await
1174                    .map_err(VerifyCheckpointError::Tip)?
1175                {
1176                    zs::Response::Tip(tip) => tip,
1177                    _ => unreachable!("wrong response for Tip"),
1178                };
1179                let _ = reset_sender.send(tip);
1182            }
1183            result
1184        }
1185        .boxed()
1186    }
1187}