1use std::{
17 collections::BTreeMap,
18 ops::{Bound, Bound::*},
19 pin::Pin,
20 sync::{mpsc, Arc},
21 task::{Context, Poll},
22};
23
24use futures::{Future, FutureExt, TryFutureExt};
25use thiserror::Error;
26use tokio::sync::oneshot;
27use tower::{Service, ServiceExt};
28use tracing::instrument;
29
30use zebra_chain::{
31 amount,
32 block::{self, Block},
33 parameters::{
34 subsidy::{block_subsidy, funding_stream_values, FundingStreamReceiver, SubsidyError},
35 Network, GENESIS_PREVIOUS_BLOCK_HASH,
36 },
37 work::equihash,
38};
39use zebra_state::{self as zs, CheckpointVerifiedBlock};
40
41use crate::{
42 block::VerifyBlockError,
43 checkpoint::types::{
44 Progress::{self, *},
45 TargetHeight::{self, *},
46 },
47 error::BlockError,
48 BoxError, ParameterCheckpoint as _,
49};
50
51pub(crate) mod list;
52mod types;
53
54#[cfg(test)]
55mod tests;
56
57pub use zebra_node_services::constants::{MAX_CHECKPOINT_BYTE_COUNT, MAX_CHECKPOINT_HEIGHT_GAP};
58
59pub use list::CheckpointList;
60
61#[derive(Debug)]
63struct QueuedBlock {
64 block: CheckpointVerifiedBlock,
66 tx: oneshot::Sender<Result<block::Hash, VerifyCheckpointError>>,
68}
69
70#[derive(Debug)]
72struct RequestBlock {
73 block: CheckpointVerifiedBlock,
75 rx: oneshot::Receiver<Result<block::Hash, VerifyCheckpointError>>,
77}
78
79type QueuedBlockList = Vec<QueuedBlock>;
86
87pub const MAX_QUEUED_BLOCKS_PER_HEIGHT: usize = 4;
98
99fn progress_from_tip(
101 checkpoint_list: &CheckpointList,
102 tip: Option<(block::Height, block::Hash)>,
103) -> (Option<block::Hash>, Progress<block::Height>) {
104 match tip {
105 Some((height, hash)) => {
106 if height >= checkpoint_list.max_height() {
107 (None, Progress::FinalCheckpoint)
108 } else {
109 metrics::gauge!("checkpoint.verified.height").set(height.0 as f64);
110 metrics::gauge!("checkpoint.processing.next.height").set(height.0 as f64);
111 (Some(hash), Progress::InitialTip(height))
112 }
113 }
114 None => (None, Progress::BeforeGenesis),
116 }
117}
118
119pub struct CheckpointVerifier<S>
124where
125 S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
126 S::Future: Send + 'static,
127{
128 checkpoint_list: CheckpointList,
130
131 network: Network,
133
134 initial_tip_hash: Option<block::Hash>,
136
137 state_service: S,
139
140 queued: BTreeMap<block::Height, QueuedBlockList>,
151
152 verifier_progress: Progress<block::Height>,
154
155 reset_receiver: mpsc::Receiver<Option<(block::Height, block::Hash)>>,
158 reset_sender: mpsc::Sender<Option<(block::Height, block::Hash)>>,
161
162 #[cfg(feature = "progress-bar")]
164 queued_blocks_bar: howudoin::Tx,
165
166 #[cfg(feature = "progress-bar")]
168 verified_checkpoint_bar: howudoin::Tx,
169}
170
171impl<S> std::fmt::Debug for CheckpointVerifier<S>
172where
173 S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
174 S::Future: Send + 'static,
175{
176 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177 f.debug_struct("CheckpointVerifier")
178 .field("checkpoint_list", &self.checkpoint_list)
179 .field("network", &self.network)
180 .field("initial_tip_hash", &self.initial_tip_hash)
181 .field("queued", &self.queued)
182 .field("verifier_progress", &self.verifier_progress)
183 .finish()
184 }
185}
186
187impl<S> CheckpointVerifier<S>
188where
189 S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
190 S::Future: Send + 'static,
191{
192 #[allow(dead_code)]
208 pub fn new(
209 network: &Network,
210 initial_tip: Option<(block::Height, block::Hash)>,
211 state_service: S,
212 ) -> Self {
213 let checkpoint_list = network.checkpoint_list();
214 let max_height = checkpoint_list.max_height();
215 tracing::info!(
216 ?max_height,
217 ?network,
218 ?initial_tip,
219 "initialising CheckpointVerifier"
220 );
221 Self::from_checkpoint_list(checkpoint_list, network, initial_tip, state_service)
222 }
223
224 #[allow(dead_code)]
236 pub(crate) fn from_list(
237 list: impl IntoIterator<Item = (block::Height, block::Hash)>,
238 network: &Network,
239 initial_tip: Option<(block::Height, block::Hash)>,
240 state_service: S,
241 ) -> Result<Self, VerifyCheckpointError> {
242 Ok(Self::from_checkpoint_list(
243 CheckpointList::from_list(list).map_err(VerifyCheckpointError::CheckpointList)?,
244 network,
245 initial_tip,
246 state_service,
247 ))
248 }
249
250 pub(crate) fn from_checkpoint_list(
258 checkpoint_list: CheckpointList,
259 network: &Network,
260 initial_tip: Option<(block::Height, block::Hash)>,
261 state_service: S,
262 ) -> Self {
263 let (initial_tip_hash, verifier_progress) =
266 progress_from_tip(&checkpoint_list, initial_tip);
267
268 let (sender, receiver) = mpsc::channel();
269
270 #[cfg(feature = "progress-bar")]
271 let queued_blocks_bar = howudoin::new_root().label("Checkpoint Queue Height");
272
273 #[cfg(feature = "progress-bar")]
274 let verified_checkpoint_bar =
275 howudoin::new_with_parent(queued_blocks_bar.id()).label("Verified Checkpoints");
276
277 let verifier = CheckpointVerifier {
278 checkpoint_list,
279 network: network.clone(),
280 initial_tip_hash,
281 state_service,
282 queued: BTreeMap::new(),
283 verifier_progress,
284 reset_receiver: receiver,
285 reset_sender: sender,
286 #[cfg(feature = "progress-bar")]
287 queued_blocks_bar,
288 #[cfg(feature = "progress-bar")]
289 verified_checkpoint_bar,
290 };
291
292 if verifier_progress.is_final_checkpoint() {
293 verifier.finish_diagnostics();
294 } else {
295 verifier.verified_checkpoint_diagnostics(verifier_progress.height());
296 }
297
298 verifier
299 }
300
301 fn queued_block_diagnostics(&self, height: block::Height, hash: block::Hash) {
303 let max_queued_height = self
304 .queued
305 .keys()
306 .next_back()
307 .expect("queued has at least one entry");
308
309 metrics::gauge!("checkpoint.queued.max.height").set(max_queued_height.0 as f64);
310
311 let is_checkpoint = self.checkpoint_list.contains(height);
312 tracing::debug!(?height, ?hash, ?is_checkpoint, "queued block");
313
314 #[cfg(feature = "progress-bar")]
315 if matches!(howudoin::cancelled(), Some(true)) {
316 self.finish_diagnostics();
317 } else {
318 self.queued_blocks_bar
319 .set_pos(max_queued_height.0)
320 .set_len(u64::from(self.checkpoint_list.max_height().0));
321 }
322 }
323
324 fn verified_checkpoint_diagnostics(&self, verified_height: impl Into<Option<block::Height>>) {
326 let Some(verified_height) = verified_height.into() else {
327 return;
330 };
331
332 metrics::gauge!("checkpoint.verified.height").set(verified_height.0 as f64);
333
334 let checkpoint_index = self.checkpoint_list.prev_checkpoint_index(verified_height);
335 let checkpoint_count = self.checkpoint_list.len();
336
337 metrics::gauge!("checkpoint.verified.count").set(checkpoint_index as f64);
338
339 tracing::debug!(
340 ?verified_height,
341 ?checkpoint_index,
342 ?checkpoint_count,
343 "verified checkpoint",
344 );
345
346 #[cfg(feature = "progress-bar")]
347 if matches!(howudoin::cancelled(), Some(true)) {
348 self.finish_diagnostics();
349 } else {
350 self.verified_checkpoint_bar
351 .set_pos(u64::try_from(checkpoint_index).expect("fits in u64"))
352 .set_len(u64::try_from(checkpoint_count).expect("fits in u64"));
353 }
354 }
355
356 fn finish_diagnostics(&self) {
358 #[cfg(feature = "progress-bar")]
359 {
360 self.queued_blocks_bar.close();
361 self.verified_checkpoint_bar.close();
362 }
363 }
364
365 fn reset_progress(&mut self, tip: Option<(block::Height, block::Hash)>) {
367 let (initial_tip_hash, verifier_progress) = progress_from_tip(&self.checkpoint_list, tip);
368 self.initial_tip_hash = initial_tip_hash;
369 self.verifier_progress = verifier_progress;
370
371 self.verified_checkpoint_diagnostics(verifier_progress.height());
372 }
373
374 fn previous_checkpoint_height(&self) -> Progress<block::Height> {
384 self.verifier_progress
385 }
386
387 fn current_start_bound(&self) -> Option<Bound<block::Height>> {
391 match self.previous_checkpoint_height() {
392 BeforeGenesis => Some(Unbounded),
393 InitialTip(height) | PreviousCheckpoint(height) => Some(Excluded(height)),
394 FinalCheckpoint => None,
395 }
396 }
397
398 fn target_checkpoint_height(&self) -> TargetHeight {
410 let start_height = match self.previous_checkpoint_height() {
412 BeforeGenesis if !self.queued.contains_key(&block::Height(0)) => {
414 tracing::trace!("Waiting for genesis block");
415 metrics::counter!("checkpoint.waiting.count").increment(1);
416 return WaitingForBlocks;
417 }
418 BeforeGenesis => block::Height(0),
419 InitialTip(height) | PreviousCheckpoint(height) => height,
420 FinalCheckpoint => return FinishedVerifying,
421 };
422
423 let mut pending_height = start_height;
436 for (&height, _) in self.queued.range((Excluded(pending_height), Unbounded)) {
437 if height == block::Height(pending_height.0 + 1) {
439 pending_height = height;
440 } else {
441 let gap = height.0 - pending_height.0;
442 tracing::trace!(contiguous_height = ?pending_height,
444 next_height = ?height,
445 ?gap,
446 "Waiting for more checkpoint blocks");
447 break;
448 }
449 }
450 metrics::gauge!("checkpoint.queued.continuous.height").set(pending_height.0 as f64);
451
452 let start = self.current_start_bound().expect(
454 "if verification has finished, we should have returned earlier in the function",
455 );
456 let target_checkpoint = self
459 .checkpoint_list
460 .max_height_in_range((start, Included(pending_height)));
461
462 tracing::trace!(
463 checkpoint_start = ?start,
464 highest_contiguous_block = ?pending_height,
465 ?target_checkpoint
466 );
467
468 if let Some(block::Height(target_checkpoint)) = target_checkpoint {
469 metrics::gauge!("checkpoint.processing.next.height").set(target_checkpoint as f64);
470 } else {
471 metrics::gauge!("checkpoint.processing.next.height").set(start_height.0 as f64);
473 metrics::counter!("checkpoint.waiting.count").increment(1);
474 }
475
476 target_checkpoint
477 .map(Checkpoint)
478 .unwrap_or(WaitingForBlocks)
479 }
480
481 fn previous_checkpoint_hash(&self) -> Progress<block::Hash> {
485 match self.previous_checkpoint_height() {
486 BeforeGenesis => BeforeGenesis,
487 InitialTip(_) => self
488 .initial_tip_hash
489 .map(InitialTip)
490 .expect("initial tip height must have an initial tip hash"),
491 PreviousCheckpoint(height) => self
492 .checkpoint_list
493 .hash(height)
494 .map(PreviousCheckpoint)
495 .expect("every checkpoint height must have a hash"),
496 FinalCheckpoint => FinalCheckpoint,
497 }
498 }
499
500 fn check_height(&self, height: block::Height) -> Result<(), VerifyCheckpointError> {
509 if height > self.checkpoint_list.max_height() {
510 Err(VerifyCheckpointError::TooHigh {
511 height,
512 max_height: self.checkpoint_list.max_height(),
513 })?;
514 }
515
516 match self.previous_checkpoint_height() {
517 BeforeGenesis => {}
519 InitialTip(previous_height) | PreviousCheckpoint(previous_height)
521 if (height <= previous_height) =>
522 {
523 let e = Err(VerifyCheckpointError::AlreadyVerified {
524 height,
525 verified_height: previous_height,
526 });
527 tracing::trace!(?e);
528 e?;
529 }
530 InitialTip(_) | PreviousCheckpoint(_) => {}
531 FinalCheckpoint => Err(VerifyCheckpointError::Finished)?,
533 };
534
535 Ok(())
536 }
537
538 fn update_progress(&mut self, verified_height: block::Height) {
540 if let Some(max_height) = self.queued.keys().next_back() {
541 metrics::gauge!("checkpoint.queued.max.height").set(max_height.0 as f64);
542 } else {
543 metrics::gauge!("checkpoint.queued.max.height").set(f64::NAN);
545 }
546 metrics::gauge!("checkpoint.queued_slots").set(self.queued.len() as f64);
547
548 if self.check_height(verified_height).is_err() {
556 return;
557 }
558
559 if verified_height == self.checkpoint_list.max_height() {
561 self.verifier_progress = FinalCheckpoint;
562
563 tracing::info!(
564 final_checkpoint_height = ?verified_height,
565 "verified final checkpoint: starting full validation",
566 );
567
568 self.verified_checkpoint_diagnostics(verified_height);
569 self.finish_diagnostics();
570 } else if self.checkpoint_list.contains(verified_height) {
571 self.verifier_progress = PreviousCheckpoint(verified_height);
572 self.initial_tip_hash = None;
574
575 self.verified_checkpoint_diagnostics(verified_height);
576 }
577 }
578
579 fn check_block(
592 &self,
593 block: Arc<Block>,
594 ) -> Result<CheckpointVerifiedBlock, VerifyCheckpointError> {
595 let hash = block.hash();
596 let height = block
597 .coinbase_height()
598 .ok_or(VerifyCheckpointError::CoinbaseHeight { hash })?;
599 self.check_height(height)?;
600
601 if self.network.disable_pow() {
602 crate::block::check::difficulty_threshold_is_valid(
603 &block.header,
604 &self.network,
605 &height,
606 &hash,
607 )?;
608 } else {
609 crate::block::check::difficulty_is_valid(&block.header, &self.network, &height, &hash)?;
610 crate::block::check::equihash_solution_is_valid(&block.header)?;
611 }
612
613 let expected_deferred_amount = if height > self.network.slow_start_interval() {
616 funding_stream_values(height, &self.network, block_subsidy(height, &self.network)?)?
618 .remove(&FundingStreamReceiver::Deferred)
619 } else {
620 None
621 };
622
623 let block = CheckpointVerifiedBlock::new(block, Some(hash), expected_deferred_amount);
625
626 crate::block::check::merkle_root_validity(
627 &self.network,
628 &block.block,
629 &block.transaction_hashes,
630 )?;
631
632 Ok(block)
633 }
634
635 #[allow(clippy::unwrap_in_result)]
646 fn queue_block(&mut self, block: Arc<Block>) -> Result<RequestBlock, VerifyCheckpointError> {
647 let (tx, rx) = oneshot::channel();
649
650 let block = self.check_block(block)?;
652 let height = block.height;
653 let hash = block.hash;
654
655 let new_qblock = QueuedBlock {
656 block: block.clone(),
657 tx,
658 };
659 let req_block = RequestBlock { block, rx };
660
661 let qblocks = self
665 .queued
666 .entry(height)
667 .or_insert_with(|| QueuedBlockList::with_capacity(1));
668
669 for qb in qblocks.iter_mut() {
672 if qb.block.hash == hash {
673 let e = VerifyCheckpointError::NewerRequest { height, hash };
674 tracing::trace!(?e, "failing older of duplicate requests");
675
676 let old = std::mem::replace(qb, new_qblock);
685 let _ = old.tx.send(Err(e));
686 return Ok(req_block);
687 }
688 }
689
690 if qblocks.len() >= MAX_QUEUED_BLOCKS_PER_HEIGHT {
692 let e = VerifyCheckpointError::QueuedLimit;
693 tracing::warn!(?e);
694 return Err(e);
695 }
696
697 qblocks.reserve_exact(1);
700 qblocks.push(new_qblock);
701
702 self.queued_block_diagnostics(height, hash);
703
704 Ok(req_block)
705 }
706
707 #[allow(clippy::unwrap_in_result)]
711 fn process_height(
712 &mut self,
713 height: block::Height,
714 expected_hash: block::Hash,
715 ) -> Option<QueuedBlock> {
716 let mut qblocks = self
717 .queued
718 .remove(&height)
719 .expect("the current checkpoint range has continuous Vec<QueuedBlock>s");
720 assert!(
721 !qblocks.is_empty(),
722 "the current checkpoint range has continuous Blocks"
723 );
724
725 if let Some(checkpoint_hash) = self.checkpoint_list.hash(height) {
727 assert_eq!(expected_hash, checkpoint_hash,
732 "checkpoints in the range should match: bad checkpoint list, zebra bug, or bad chain"
733 );
734 }
735
736 let mut valid_qblock = None;
743 for qblock in qblocks.drain(..) {
744 if qblock.block.hash == expected_hash {
745 if valid_qblock.is_none() {
746 valid_qblock = Some(qblock);
748 } else {
749 unreachable!("unexpected duplicate block {:?} {:?}: duplicate blocks should be rejected before being queued",
750 height, qblock.block.hash);
751 }
752 } else {
753 tracing::info!(?height, ?qblock.block.hash, ?expected_hash,
754 "Side chain hash at height in CheckpointVerifier");
755 let _ = qblock
756 .tx
757 .send(Err(VerifyCheckpointError::UnexpectedSideChain {
758 found: qblock.block.hash,
759 expected: expected_hash,
760 }));
761 }
762 }
763
764 valid_qblock
765 }
766
767 fn process_checkpoint_range(&mut self) {
775 let previous_checkpoint_hash = match self.previous_checkpoint_hash() {
785 BeforeGenesis => GENESIS_PREVIOUS_BLOCK_HASH,
789 InitialTip(hash) | PreviousCheckpoint(hash) => hash,
790 FinalCheckpoint => return,
791 };
792 let (target_checkpoint_height, mut expected_hash) = match self.target_checkpoint_height() {
794 Checkpoint(height) => (
795 height,
796 self.checkpoint_list
797 .hash(height)
798 .expect("every checkpoint height must have a hash"),
799 ),
800 WaitingForBlocks => {
801 return;
802 }
803 FinishedVerifying => {
804 unreachable!("the FinalCheckpoint case should have returned earlier")
805 }
806 };
807
808 let old_prev_check_height = self.previous_checkpoint_height();
811
812 let current_range = (
814 self.current_start_bound()
815 .expect("earlier code checks if verification has finished"),
816 Included(target_checkpoint_height),
817 );
818 let range_heights: Vec<block::Height> = self
819 .queued
820 .range_mut(current_range)
821 .rev()
822 .map(|(key, _)| *key)
823 .collect();
824 let mut rev_valid_blocks = Vec::new();
826
827 for current_height in range_heights {
829 let valid_qblock = self.process_height(current_height, expected_hash);
830 if let Some(qblock) = valid_qblock {
831 expected_hash = qblock.block.block.header.previous_block_hash;
832 rev_valid_blocks.push(qblock);
836 } else {
837 tracing::info!(
840 ?current_height,
841 ?current_range,
842 "No valid blocks at height in CheckpointVerifier"
843 );
844
845 for vblock in rev_valid_blocks.drain(..).rev() {
852 self.queued
853 .entry(vblock.block.height)
854 .or_default()
855 .push(vblock);
856 }
857
858 assert_eq!(
860 self.previous_checkpoint_height(),
861 old_prev_check_height,
862 "we must not change the previous checkpoint on failure"
863 );
864 let current_target = self.target_checkpoint_height();
868 assert!(
869 current_target == WaitingForBlocks
870 || current_target < Checkpoint(target_checkpoint_height),
871 "we must decrease or eliminate our target on failure"
872 );
873
874 return;
876 }
877 }
878
879 assert_eq!(
882 expected_hash, previous_checkpoint_hash,
883 "the previous checkpoint should match: bad checkpoint list, zebra bug, or bad chain"
884 );
885
886 let block_count = rev_valid_blocks.len();
887 tracing::info!(?block_count, ?current_range, "verified checkpoint range");
888 metrics::counter!("checkpoint.verified.block.count").increment(block_count as u64);
889
890 for qblock in rev_valid_blocks.drain(..).rev() {
893 let _ = qblock.tx.send(Ok(qblock.block.hash));
895 }
896
897 self.update_progress(target_checkpoint_height);
899
900 let new_progress = self.previous_checkpoint_height();
902 assert!(
903 new_progress > old_prev_check_height,
904 "we must make progress on success"
905 );
906 if new_progress == FinalCheckpoint {
908 assert_eq!(
909 target_checkpoint_height,
910 self.checkpoint_list.max_height(),
911 "we finish at the maximum checkpoint"
912 );
913 } else {
914 assert_eq!(
915 new_progress,
916 PreviousCheckpoint(target_checkpoint_height),
917 "the new previous checkpoint must match the old target"
918 );
919 }
920 let new_target = self.target_checkpoint_height();
924 assert!(
925 new_target == WaitingForBlocks || new_target == FinishedVerifying,
926 "processing must cover all available checkpoints"
927 );
928 }
929}
930
931impl<S> Drop for CheckpointVerifier<S>
933where
934 S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
935 S::Future: Send + 'static,
936{
937 fn drop(&mut self) {
942 self.finish_diagnostics();
943
944 let drop_keys: Vec<_> = self.queued.keys().cloned().collect();
945 for key in drop_keys {
946 let mut qblocks = self
947 .queued
948 .remove(&key)
949 .expect("each entry is only removed once");
950 for qblock in qblocks.drain(..) {
951 let _ = qblock.tx.send(Err(VerifyCheckpointError::Dropped));
953 }
954 }
955 }
956}
957
958#[derive(Debug, Error)]
959#[allow(missing_docs)]
960pub enum VerifyCheckpointError {
961 #[error("checkpoint request after the final checkpoint has been verified")]
962 Finished,
963 #[error("block at {height:?} is higher than the maximum checkpoint {max_height:?}")]
964 TooHigh {
965 height: block::Height,
966 max_height: block::Height,
967 },
968 #[error("block {height:?} is less than or equal to the verified tip {verified_height:?}")]
969 AlreadyVerified {
970 height: block::Height,
971 verified_height: block::Height,
972 },
973 #[error("rejected older of duplicate verification requests for block at {height:?} {hash:?}")]
974 NewerRequest {
975 height: block::Height,
976 hash: block::Hash,
977 },
978 #[error("the block {hash:?} does not have a coinbase height")]
979 CoinbaseHeight { hash: block::Hash },
980 #[error("merkle root {actual:?} does not match expected {expected:?}")]
981 BadMerkleRoot {
982 actual: block::merkle::Root,
983 expected: block::merkle::Root,
984 },
985 #[error("duplicate transactions in block")]
986 DuplicateTransaction,
987 #[error("checkpoint verifier was dropped")]
988 Dropped,
989 #[error(transparent)]
990 CommitCheckpointVerified(BoxError),
991 #[error(transparent)]
992 Tip(BoxError),
993 #[error(transparent)]
994 CheckpointList(BoxError),
995 #[error(transparent)]
996 VerifyBlock(VerifyBlockError),
997 #[error("invalid block subsidy: {0}")]
998 SubsidyError(#[from] SubsidyError),
999 #[error("invalid amount: {0}")]
1000 AmountError(#[from] amount::Error),
1001 #[error("too many queued blocks at this height")]
1002 QueuedLimit,
1003 #[error("the block hash does not match the chained checkpoint hash, expected {expected:?} found {found:?}")]
1004 UnexpectedSideChain {
1005 expected: block::Hash,
1006 found: block::Hash,
1007 },
1008 #[error("zebra is shutting down")]
1009 ShuttingDown,
1010}
1011
1012impl From<VerifyBlockError> for VerifyCheckpointError {
1013 fn from(err: VerifyBlockError) -> VerifyCheckpointError {
1014 VerifyCheckpointError::VerifyBlock(err)
1015 }
1016}
1017
1018impl From<BlockError> for VerifyCheckpointError {
1019 fn from(err: BlockError) -> VerifyCheckpointError {
1020 VerifyCheckpointError::VerifyBlock(err.into())
1021 }
1022}
1023
1024impl From<equihash::Error> for VerifyCheckpointError {
1025 fn from(err: equihash::Error) -> VerifyCheckpointError {
1026 VerifyCheckpointError::VerifyBlock(err.into())
1027 }
1028}
1029
1030impl VerifyCheckpointError {
1031 pub fn is_duplicate_request(&self) -> bool {
1034 match self {
1035 VerifyCheckpointError::AlreadyVerified { .. } => true,
1036 VerifyCheckpointError::NewerRequest { .. } => true,
1038 VerifyCheckpointError::VerifyBlock(block_error) => block_error.is_duplicate_request(),
1039 _ => false,
1040 }
1041 }
1042
1043 pub fn misbehavior_score(&self) -> u32 {
1045 match self {
1047 VerifyCheckpointError::VerifyBlock(verify_block_error) => {
1048 verify_block_error.misbehavior_score()
1049 }
1050 VerifyCheckpointError::SubsidyError(_)
1051 | VerifyCheckpointError::CoinbaseHeight { .. }
1052 | VerifyCheckpointError::DuplicateTransaction
1053 | VerifyCheckpointError::AmountError(_) => 100,
1054 _other => 0,
1055 }
1056 }
1057}
1058
1059impl<S> Service<Arc<Block>> for CheckpointVerifier<S>
1063where
1064 S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
1065 S::Future: Send + 'static,
1066{
1067 type Response = block::Hash;
1068 type Error = VerifyCheckpointError;
1069 type Future =
1070 Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
1071
1072 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1073 Poll::Ready(Ok(()))
1074 }
1075
1076 #[instrument(name = "checkpoint", skip(self, block))]
1077 fn call(&mut self, block: Arc<Block>) -> Self::Future {
1078 if let Ok(tip) = self.reset_receiver.try_recv() {
1081 self.reset_progress(tip);
1082 }
1083
1084 if let FinalCheckpoint = self.previous_checkpoint_height() {
1086 return async { Err(VerifyCheckpointError::Finished) }.boxed();
1087 }
1088
1089 let req_block = match self.queue_block(block) {
1090 Ok(req_block) => req_block,
1091 Err(e) => return async { Err(e) }.boxed(),
1092 };
1093
1094 self.process_checkpoint_range();
1095
1096 metrics::gauge!("checkpoint.queued_slots").set(self.queued.len() as f64);
1097
1098 let state_service = self.state_service.clone();
1123 let commit_checkpoint_verified = tokio::spawn(async move {
1124 let hash = req_block
1125 .rx
1126 .await
1127 .map_err(Into::into)
1128 .map_err(VerifyCheckpointError::CommitCheckpointVerified)
1129 .expect("CheckpointVerifier does not leave dangling receivers")?;
1130
1131 match state_service
1134 .oneshot(zs::Request::CommitCheckpointVerifiedBlock(req_block.block))
1135 .map_err(VerifyCheckpointError::CommitCheckpointVerified)
1136 .await?
1137 {
1138 zs::Response::Committed(committed_hash) => {
1139 assert_eq!(committed_hash, hash, "state must commit correct hash");
1140 Ok(hash)
1141 }
1142 _ => unreachable!("wrong response for CommitCheckpointVerifiedBlock"),
1143 }
1144 });
1145
1146 let state_service = self.state_service.clone();
1147 let reset_sender = self.reset_sender.clone();
1148 async move {
1149 let result = commit_checkpoint_verified.await;
1150 let result = if zebra_chain::shutdown::is_shutting_down() {
1158 Err(VerifyCheckpointError::ShuttingDown)
1159 } else {
1160 result.expect("commit_checkpoint_verified should not panic")
1161 };
1162 if result.is_err() {
1163 let tip = match state_service
1167 .oneshot(zs::Request::Tip)
1168 .await
1169 .map_err(VerifyCheckpointError::Tip)?
1170 {
1171 zs::Response::Tip(tip) => tip,
1172 _ => unreachable!("wrong response for Tip"),
1173 };
1174 let _ = reset_sender.send(tip);
1177 }
1178 result
1179 }
1180 .boxed()
1181 }
1182}