1use std::{
17 collections::BTreeMap,
18 ops::{Bound, Bound::*},
19 pin::Pin,
20 sync::{mpsc, Arc},
21 task::{Context, Poll},
22};
23
24use futures::{Future, FutureExt, TryFutureExt};
25use thiserror::Error;
26use tokio::sync::oneshot;
27use tower::{Service, ServiceExt};
28use tracing::instrument;
29
30use zebra_chain::{
31 amount::{self, DeferredPoolBalanceChange},
32 block::{self, Block},
33 parameters::{
34 subsidy::{block_subsidy, funding_stream_values, FundingStreamReceiver, SubsidyError},
35 Network, GENESIS_PREVIOUS_BLOCK_HASH,
36 },
37 work::equihash,
38};
39use zebra_state::{self as zs, CheckpointVerifiedBlock};
40
41use crate::{
42 block::VerifyBlockError,
43 checkpoint::types::{
44 Progress::{self, *},
45 TargetHeight::{self, *},
46 },
47 error::BlockError,
48 BoxError, ParameterCheckpoint as _,
49};
50
51pub(crate) mod list;
52mod types;
53
54#[cfg(test)]
55mod tests;
56
57pub use zebra_node_services::constants::{MAX_CHECKPOINT_BYTE_COUNT, MAX_CHECKPOINT_HEIGHT_GAP};
58
59pub use list::CheckpointList;
60
/// A block held in the verifier's queue, together with the channel used to
/// report its verification result.
#[derive(Debug)]
struct QueuedBlock {
    /// The queued block, as produced by `check_block`.
    block: CheckpointVerifiedBlock,
    /// Sending half of the result channel: receives the block hash on success,
    /// or the error that caused the block to be rejected.
    tx: oneshot::Sender<Result<block::Hash, VerifyCheckpointError>>,
}
69
/// The caller-side counterpart of a `QueuedBlock`: the same block, paired with
/// the receiving half of the result channel.
#[derive(Debug)]
struct RequestBlock {
    /// The block that was queued for verification.
    block: CheckpointVerifiedBlock,
    /// Receiving half of the result channel; resolves once the queued block
    /// has been accepted or rejected.
    rx: oneshot::Receiver<Result<block::Hash, VerifyCheckpointError>>,
}
78
/// The list of blocks queued at a single height (the value type of the
/// verifier's `queued` map).
type QueuedBlockList = Vec<QueuedBlock>;
86
/// The maximum number of distinct blocks kept in the queue for one height.
///
/// `queue_block` rejects a new block with `VerifyCheckpointError::QueuedLimit`
/// once the list for its height already holds this many entries.
pub const MAX_QUEUED_BLOCKS_PER_HEIGHT: usize = 4;
98
99fn progress_from_tip(
101 checkpoint_list: &CheckpointList,
102 tip: Option<(block::Height, block::Hash)>,
103) -> (Option<block::Hash>, Progress<block::Height>) {
104 match tip {
105 Some((height, hash)) => {
106 if height >= checkpoint_list.max_height() {
107 (None, Progress::FinalCheckpoint)
108 } else {
109 metrics::gauge!("checkpoint.verified.height").set(height.0 as f64);
110 metrics::gauge!("checkpoint.processing.next.height").set(height.0 as f64);
111 (Some(hash), Progress::InitialTip(height))
112 }
113 }
114 None => (None, Progress::BeforeGenesis),
116 }
117}
118
/// A block verifier that queues incoming blocks and accepts them in ranges
/// ending at known checkpoint hashes, committing accepted blocks to a state
/// service.
pub struct CheckpointVerifier<S>
where
    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
    S::Future: Send + 'static,
{
    /// The checkpoint heights and hashes that blocks are verified against.
    checkpoint_list: CheckpointList,

    /// The network passed in at construction; used by the block checks.
    network: Network,

    /// The hash of the initial tip, kept while the progress is `InitialTip`
    /// and cleared once a later checkpoint has been verified.
    initial_tip_hash: Option<block::Hash>,

    /// The state service used to commit verified blocks and to query the tip.
    state_service: S,

    /// Blocks waiting for verification, grouped by height.
    queued: BTreeMap<block::Height, QueuedBlockList>,

    /// How far verification has progressed.
    verifier_progress: Progress<block::Height>,

    /// Receives a state tip to reset the progress to; checked at the start of
    /// every `call`.
    reset_receiver: mpsc::Receiver<Option<(block::Height, block::Hash)>>,
    /// Sender for the reset channel; cloned into each response future so it
    /// can request a reset after a failed commit.
    reset_sender: mpsc::Sender<Option<(block::Height, block::Hash)>>,

    /// Progress bar tracking the highest queued block height.
    #[cfg(feature = "progress-bar")]
    queued_blocks_bar: howudoin::Tx,

    /// Progress bar tracking the number of verified checkpoints.
    #[cfg(feature = "progress-bar")]
    verified_checkpoint_bar: howudoin::Tx,
}
170
171impl<S> std::fmt::Debug for CheckpointVerifier<S>
172where
173 S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
174 S::Future: Send + 'static,
175{
176 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177 f.debug_struct("CheckpointVerifier")
178 .field("checkpoint_list", &self.checkpoint_list)
179 .field("network", &self.network)
180 .field("initial_tip_hash", &self.initial_tip_hash)
181 .field("queued", &self.queued)
182 .field("verifier_progress", &self.verifier_progress)
183 .finish()
184 }
185}
186
187impl<S> CheckpointVerifier<S>
188where
189 S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
190 S::Future: Send + 'static,
191{
192 #[allow(dead_code)]
208 pub fn new(
209 network: &Network,
210 initial_tip: Option<(block::Height, block::Hash)>,
211 state_service: S,
212 ) -> Self {
213 let checkpoint_list = network.checkpoint_list();
214 let max_height = checkpoint_list.max_height();
215 tracing::info!(
216 ?max_height,
217 ?network,
218 ?initial_tip,
219 "initialising CheckpointVerifier"
220 );
221 Self::from_checkpoint_list(checkpoint_list, network, initial_tip, state_service)
222 }
223
224 #[allow(dead_code)]
236 pub(crate) fn from_list(
237 list: impl IntoIterator<Item = (block::Height, block::Hash)>,
238 network: &Network,
239 initial_tip: Option<(block::Height, block::Hash)>,
240 state_service: S,
241 ) -> Result<Self, VerifyCheckpointError> {
242 Ok(Self::from_checkpoint_list(
243 CheckpointList::from_list(list).map_err(VerifyCheckpointError::CheckpointList)?,
244 network,
245 initial_tip,
246 state_service,
247 ))
248 }
249
/// Creates a verifier from an already-built `checkpoint_list`.
///
/// The starting progress and initial tip hash are derived from `initial_tip`
/// by `progress_from_tip`, the block queue starts out empty, and a new reset
/// channel is created. The diagnostics are updated once before returning.
pub(crate) fn from_checkpoint_list(
    checkpoint_list: CheckpointList,
    network: &Network,
    initial_tip: Option<(block::Height, block::Hash)>,
    state_service: S,
) -> Self {
    let (initial_tip_hash, verifier_progress) =
        progress_from_tip(&checkpoint_list, initial_tip);

    // Channel that carries "reset to this tip" requests back to the verifier.
    let (sender, receiver) = mpsc::channel();

    #[cfg(feature = "progress-bar")]
    let queued_blocks_bar = howudoin::new_root().label("Checkpoint Queue Height");

    // The verified-checkpoints bar is created as a child of the queue bar.
    #[cfg(feature = "progress-bar")]
    let verified_checkpoint_bar =
        howudoin::new_with_parent(queued_blocks_bar.id()).label("Verified Checkpoints");

    let verifier = CheckpointVerifier {
        checkpoint_list,
        network: network.clone(),
        initial_tip_hash,
        state_service,
        queued: BTreeMap::new(),
        verifier_progress,
        reset_receiver: receiver,
        reset_sender: sender,
        #[cfg(feature = "progress-bar")]
        queued_blocks_bar,
        #[cfg(feature = "progress-bar")]
        verified_checkpoint_bar,
    };

    // If the tip is already at the final checkpoint there is nothing left to
    // show; otherwise report the starting height.
    if verifier_progress.is_final_checkpoint() {
        verifier.finish_diagnostics();
    } else {
        verifier.verified_checkpoint_diagnostics(verifier_progress.height());
    }

    verifier
}
300
/// Updates metrics, logs and (when enabled) the queue progress bar after a
/// block at `height` with `hash` has been added to the queue.
///
/// # Panics
///
/// Panics if the queue is empty.
fn queued_block_diagnostics(&self, height: block::Height, hash: block::Hash) {
    // The map is ordered by height, so the last key is the highest queued height.
    let max_queued_height = self
        .queued
        .keys()
        .next_back()
        .expect("queued has at least one entry");

    metrics::gauge!("checkpoint.queued.max.height").set(max_queued_height.0 as f64);

    let is_checkpoint = self.checkpoint_list.contains(height);
    tracing::debug!(?height, ?hash, ?is_checkpoint, "queued block");

    // Close the bars if the progress display was cancelled, otherwise move
    // the queue bar to the highest queued height.
    #[cfg(feature = "progress-bar")]
    if matches!(howudoin::cancelled(), Some(true)) {
        self.finish_diagnostics();
    } else {
        self.queued_blocks_bar
            .set_pos(max_queued_height.0)
            .set_len(u64::from(self.checkpoint_list.max_height().0));
    }
}
323
/// Updates metrics, logs and (when enabled) the verified-checkpoints progress
/// bar for `verified_height`.
///
/// Accepts either a height or an `Option` of one; does nothing when the
/// height is `None`.
fn verified_checkpoint_diagnostics(&self, verified_height: impl Into<Option<block::Height>>) {
    let Some(verified_height) = verified_height.into() else {
        return;
    };

    metrics::gauge!("checkpoint.verified.height").set(verified_height.0 as f64);

    let checkpoint_index = self.checkpoint_list.prev_checkpoint_index(verified_height);
    let checkpoint_count = self.checkpoint_list.len();

    metrics::gauge!("checkpoint.verified.count").set(checkpoint_index as f64);

    tracing::debug!(
        ?verified_height,
        ?checkpoint_index,
        ?checkpoint_count,
        "verified checkpoint",
    );

    // Close the bars if the progress display was cancelled, otherwise show
    // the checkpoint index out of the total checkpoint count.
    #[cfg(feature = "progress-bar")]
    if matches!(howudoin::cancelled(), Some(true)) {
        self.finish_diagnostics();
    } else {
        self.verified_checkpoint_bar
            .set_pos(u64::try_from(checkpoint_index).expect("fits in u64"))
            .set_len(u64::try_from(checkpoint_count).expect("fits in u64"));
    }
}
355
/// Closes both progress bars.
///
/// This is a no-op when the `progress-bar` feature is disabled.
fn finish_diagnostics(&self) {
    #[cfg(feature = "progress-bar")]
    {
        self.queued_blocks_bar.close();
        self.verified_checkpoint_bar.close();
    }
}
364
365 fn reset_progress(&mut self, tip: Option<(block::Height, block::Hash)>) {
367 let (initial_tip_hash, verifier_progress) = progress_from_tip(&self.checkpoint_list, tip);
368 self.initial_tip_hash = initial_tip_hash;
369 self.verifier_progress = verifier_progress;
370
371 self.verified_checkpoint_diagnostics(verifier_progress.height());
372 }
373
/// Returns the verifier's current progress (a copy of `verifier_progress`).
fn previous_checkpoint_height(&self) -> Progress<block::Height> {
    self.verifier_progress
}
386
387 fn current_start_bound(&self) -> Option<Bound<block::Height>> {
391 match self.previous_checkpoint_height() {
392 BeforeGenesis => Some(Unbounded),
393 InitialTip(height) | PreviousCheckpoint(height) => Some(Excluded(height)),
394 FinalCheckpoint => None,
395 }
396 }
397
/// Works out which checkpoint, if any, can be verified with the blocks that
/// are currently queued.
///
/// Returns:
/// - `FinishedVerifying` if the final checkpoint has already been verified;
/// - `WaitingForBlocks` if the genesis block is still needed, or if no
///   checkpoint lies within the contiguous run of queued heights;
/// - `Checkpoint(height)` for the highest checkpoint within that run.
///
/// Also updates the `checkpoint.queued.continuous.height`,
/// `checkpoint.processing.next.height` and `checkpoint.waiting.count` metrics.
fn target_checkpoint_height(&self) -> TargetHeight {
    // The height from which the contiguous run of queued blocks is measured.
    let start_height = match self.previous_checkpoint_height() {
        // Nothing is verified yet and the genesis block is not queued.
        BeforeGenesis if !self.queued.contains_key(&block::Height(0)) => {
            tracing::trace!("Waiting for genesis block");
            metrics::counter!("checkpoint.waiting.count").increment(1);
            return WaitingForBlocks;
        }
        BeforeGenesis => block::Height(0),
        InitialTip(height) | PreviousCheckpoint(height) => height,
        FinalCheckpoint => return FinishedVerifying,
    };

    // Walk the queued heights above `start_height` in ascending order,
    // stopping at the first gap.
    let mut pending_height = start_height;
    for (&height, _) in self.queued.range((Excluded(pending_height), Unbounded)) {
        if height == block::Height(pending_height.0 + 1) {
            // Still contiguous: extend the run.
            pending_height = height;
        } else {
            let gap = height.0 - pending_height.0;
            tracing::trace!(contiguous_height = ?pending_height,
                            next_height = ?height,
                            ?gap,
                            "Waiting for more checkpoint blocks");
            break;
        }
    }
    metrics::gauge!("checkpoint.queued.continuous.height").set(pending_height.0 as f64);

    let start = self.current_start_bound().expect(
        "if verification has finished, we should have returned earlier in the function",
    );
    // The highest checkpoint between the start bound and the end of the
    // contiguous run (inclusive).
    let target_checkpoint = self
        .checkpoint_list
        .max_height_in_range((start, Included(pending_height)));

    tracing::trace!(
        checkpoint_start = ?start,
        highest_contiguous_block = ?pending_height,
        ?target_checkpoint
    );

    if let Some(block::Height(target_checkpoint)) = target_checkpoint {
        metrics::gauge!("checkpoint.processing.next.height").set(target_checkpoint as f64);
    } else {
        // No checkpoint is reachable yet: report the start height instead.
        metrics::gauge!("checkpoint.processing.next.height").set(start_height.0 as f64);
        metrics::counter!("checkpoint.waiting.count").increment(1);
    }

    target_checkpoint
        .map(Checkpoint)
        .unwrap_or(WaitingForBlocks)
}
480
481 fn previous_checkpoint_hash(&self) -> Progress<block::Hash> {
485 match self.previous_checkpoint_height() {
486 BeforeGenesis => BeforeGenesis,
487 InitialTip(_) => self
488 .initial_tip_hash
489 .map(InitialTip)
490 .expect("initial tip height must have an initial tip hash"),
491 PreviousCheckpoint(height) => self
492 .checkpoint_list
493 .hash(height)
494 .map(PreviousCheckpoint)
495 .expect("every checkpoint height must have a hash"),
496 FinalCheckpoint => FinalCheckpoint,
497 }
498 }
499
500 fn check_height(&self, height: block::Height) -> Result<(), VerifyCheckpointError> {
509 if height > self.checkpoint_list.max_height() {
510 Err(VerifyCheckpointError::TooHigh {
511 height,
512 max_height: self.checkpoint_list.max_height(),
513 })?;
514 }
515
516 match self.previous_checkpoint_height() {
517 BeforeGenesis => {}
519 InitialTip(previous_height) | PreviousCheckpoint(previous_height)
521 if (height <= previous_height) =>
522 {
523 let e = Err(VerifyCheckpointError::AlreadyVerified {
524 height,
525 verified_height: previous_height,
526 });
527 tracing::trace!(?e);
528 e?;
529 }
530 InitialTip(_) | PreviousCheckpoint(_) => {}
531 FinalCheckpoint => Err(VerifyCheckpointError::Finished)?,
533 };
534
535 Ok(())
536 }
537
538 fn update_progress(&mut self, verified_height: block::Height) {
540 if let Some(max_height) = self.queued.keys().next_back() {
541 metrics::gauge!("checkpoint.queued.max.height").set(max_height.0 as f64);
542 } else {
543 metrics::gauge!("checkpoint.queued.max.height").set(f64::NAN);
545 }
546 metrics::gauge!("checkpoint.queued_slots").set(self.queued.len() as f64);
547
548 if self.check_height(verified_height).is_err() {
556 return;
557 }
558
559 if verified_height == self.checkpoint_list.max_height() {
561 self.verifier_progress = FinalCheckpoint;
562
563 tracing::info!(
564 final_checkpoint_height = ?verified_height,
565 "verified final checkpoint: starting full validation",
566 );
567
568 self.verified_checkpoint_diagnostics(verified_height);
569 self.finish_diagnostics();
570 } else if self.checkpoint_list.contains(verified_height) {
571 self.verifier_progress = PreviousCheckpoint(verified_height);
572 self.initial_tip_hash = None;
574
575 self.verified_checkpoint_diagnostics(verified_height);
576 }
577 }
578
/// Runs the checks that can be made on `block` by itself, and converts it
/// into a `CheckpointVerifiedBlock`.
///
/// In order, this:
/// 1. reads the coinbase height and validates it with `check_height`;
/// 2. checks the difficulty threshold only (when the network disables proof
///    of work), or the difficulty and the Equihash solution;
/// 3. computes the deferred pool balance change for the height;
/// 4. builds the `CheckpointVerifiedBlock` and checks its Merkle root.
///
/// # Errors
///
/// Returns `CoinbaseHeight` if the block has no coinbase height, and
/// otherwise propagates the error of whichever check fails.
fn check_block(
    &self,
    block: Arc<Block>,
) -> Result<CheckpointVerifiedBlock, VerifyCheckpointError> {
    let hash = block.hash();
    let height = block
        .coinbase_height()
        .ok_or(VerifyCheckpointError::CoinbaseHeight { hash })?;
    self.check_height(height)?;

    if self.network.disable_pow() {
        crate::block::check::difficulty_threshold_is_valid(
            &block.header,
            &self.network,
            &height,
            &hash,
        )?;
    } else {
        crate::block::check::difficulty_is_valid(&block.header, &self.network, &height, &hash)?;
        crate::block::check::equihash_solution_is_valid(&block.header)?;
    }

    // The deferred funding stream value is only looked up for heights above
    // the network's slow start interval.
    let expected_deferred_amount = if height > self.network.slow_start_interval() {
        funding_stream_values(height, &self.network, block_subsidy(height, &self.network)?)?
            .remove(&FundingStreamReceiver::Deferred)
    } else {
        None
    };

    // Deferred amount (defaulting when absent) minus the lockbox disbursement
    // total for this height; `None` if the subtraction fails.
    let deferred_pool_balance_change = expected_deferred_amount
        .unwrap_or_default()
        .checked_sub(self.network.lockbox_disbursement_total_amount(height))
        .map(DeferredPoolBalanceChange::new);

    // Built only after the difficulty checks above have passed.
    let block = CheckpointVerifiedBlock::new(block, Some(hash), deferred_pool_balance_change);

    crate::block::check::merkle_root_validity(
        &self.network,
        &block.block,
        &block.transaction_hashes,
    )?;

    Ok(block)
}
639
/// Checks `block` and adds it to the queue at its height.
///
/// Returns a `RequestBlock` whose receiver resolves when the queued block is
/// later accepted or rejected.
///
/// If a block with the same hash is already queued at that height, the older
/// entry is replaced and its sender is sent a `NewerRequest` error.
///
/// # Errors
///
/// Returns any error from `check_block`, or `QueuedLimit` if the height
/// already holds `MAX_QUEUED_BLOCKS_PER_HEIGHT` blocks with other hashes.
#[allow(clippy::unwrap_in_result)]
fn queue_block(&mut self, block: Arc<Block>) -> Result<RequestBlock, VerifyCheckpointError> {
    // Channel used to deliver the verification result for this request.
    let (tx, rx) = oneshot::channel();

    let block = self.check_block(block)?;
    let height = block.height;
    let hash = block.hash;

    // The queue keeps the sender, the caller gets the receiver.
    let new_qblock = QueuedBlock {
        block: block.clone(),
        tx,
    };
    let req_block = RequestBlock { block, rx };

    // New per-height lists start with room for a single block.
    let qblocks = self
        .queued
        .entry(height)
        .or_insert_with(|| QueuedBlockList::with_capacity(1));

    for qb in qblocks.iter_mut() {
        if qb.block.hash == hash {
            let e = VerifyCheckpointError::NewerRequest { height, hash };
            tracing::trace!(?e, "failing older of duplicate requests");

            // Swap in the whole new entry (block and sender), then fail the
            // old request. A send error only means nobody is listening.
            let old = std::mem::replace(qb, new_qblock);
            let _ = old.tx.send(Err(e));
            return Ok(req_block);
        }
    }

    // Bound the number of different blocks queued at one height.
    if qblocks.len() >= MAX_QUEUED_BLOCKS_PER_HEIGHT {
        let e = VerifyCheckpointError::QueuedLimit;
        tracing::warn!(?e);
        return Err(e);
    }

    // Grow the list by exactly one slot if needed, then add the block.
    qblocks.reserve_exact(1);
    qblocks.push(new_qblock);

    self.queued_block_diagnostics(height, hash);

    Ok(req_block)
}
711
/// Removes all blocks queued at `height` and returns the one whose hash is
/// `expected_hash`, if there is one.
///
/// Every other block at that height is rejected: its sender is sent an
/// `UnexpectedSideChain` error.
///
/// # Panics
///
/// Panics if nothing is queued at `height`, if `height` is a checkpoint whose
/// hash differs from `expected_hash`, or if two queued blocks both have the
/// expected hash.
#[allow(clippy::unwrap_in_result)]
fn process_height(
    &mut self,
    height: block::Height,
    expected_hash: block::Hash,
) -> Option<QueuedBlock> {
    let mut qblocks = self
        .queued
        .remove(&height)
        .expect("the current checkpoint range has continuous Vec<QueuedBlock>s");
    assert!(
        !qblocks.is_empty(),
        "the current checkpoint range has continuous Blocks"
    );

    // At a checkpoint height, the expected hash must be the checkpoint hash.
    if let Some(checkpoint_hash) = self.checkpoint_list.hash(height) {
        assert_eq!(expected_hash, checkpoint_hash,
            "checkpoints in the range should match: bad checkpoint list, zebra bug, or bad chain"
        );
    }

    // Keep the matching block, and fail every block with a different hash.
    let mut valid_qblock = None;
    for qblock in qblocks.drain(..) {
        if qblock.block.hash == expected_hash {
            if valid_qblock.is_none() {
                valid_qblock = Some(qblock);
            } else {
                unreachable!("unexpected duplicate block {:?} {:?}: duplicate blocks should be rejected before being queued",
                             height, qblock.block.hash);
            }
        } else {
            tracing::info!(?height, ?qblock.block.hash, ?expected_hash,
                           "Side chain hash at height in CheckpointVerifier");
            // A send error only means the requester is no longer listening.
            let _ = qblock
                .tx
                .send(Err(VerifyCheckpointError::UnexpectedSideChain {
                    found: qblock.block.hash,
                    expected: expected_hash,
                }));
        }
    }

    valid_qblock
}
771
/// Tries to verify the queued blocks from just above the previous checkpoint
/// (or initial tip, or genesis) up to the current target checkpoint.
///
/// The range is walked from the target checkpoint downwards, following each
/// block's `previous_block_hash`. If every height has a block with the
/// expected hash, all of those blocks are sent `Ok` with their hash and the
/// progress moves to the target checkpoint. If some height has no matching
/// block, the matching blocks found so far are put back into the queue and
/// the progress is left unchanged.
///
/// Does nothing if verification has finished or if no checkpoint is reachable
/// with the blocks queued so far.
///
/// # Panics
///
/// Panics if one of the internal consistency assertions fails, including
/// when the walked chain does not end at the previous checkpoint hash.
fn process_checkpoint_range(&mut self) {
    // The hash the bottom of the verified range has to link back to.
    let previous_checkpoint_hash = match self.previous_checkpoint_hash() {
        BeforeGenesis => GENESIS_PREVIOUS_BLOCK_HASH,
        InitialTip(hash) | PreviousCheckpoint(hash) => hash,
        FinalCheckpoint => return,
    };
    // The walk starts at the target checkpoint, expecting its checkpoint hash.
    let (target_checkpoint_height, mut expected_hash) = match self.target_checkpoint_height() {
        Checkpoint(height) => (
            height,
            self.checkpoint_list
                .hash(height)
                .expect("every checkpoint height must have a hash"),
        ),
        WaitingForBlocks => {
            return;
        }
        FinishedVerifying => {
            unreachable!("the FinalCheckpoint case should have returned earlier")
        }
    };

    // Remembered so the assertions below can compare against it.
    let old_prev_check_height = self.previous_checkpoint_height();

    let current_range = (
        self.current_start_bound()
            .expect("earlier code checks if verification has finished"),
        Included(target_checkpoint_height),
    );
    // The queued heights in the range, highest first.
    let range_heights: Vec<block::Height> = self
        .queued
        .range_mut(current_range)
        .rev()
        .map(|(key, _)| *key)
        .collect();
    // Matching blocks found so far, highest height first.
    let mut rev_valid_blocks = Vec::new();

    for current_height in range_heights {
        let valid_qblock = self.process_height(current_height, expected_hash);
        if let Some(qblock) = valid_qblock {
            // The next (lower) block must be this block's parent.
            expected_hash = qblock.block.block.header.previous_block_hash;
            rev_valid_blocks.push(qblock);
        } else {
            tracing::info!(
                ?current_height,
                ?current_range,
                "No valid blocks at height in CheckpointVerifier"
            );

            // The chain is broken at this height: return the matching
            // blocks to the queue, lowest height first.
            for vblock in rev_valid_blocks.drain(..).rev() {
                self.queued
                    .entry(vblock.block.height)
                    .or_default()
                    .push(vblock);
            }

            assert_eq!(
                self.previous_checkpoint_height(),
                old_prev_check_height,
                "we must not change the previous checkpoint on failure"
            );
            let current_target = self.target_checkpoint_height();
            assert!(
                current_target == WaitingForBlocks
                    || current_target < Checkpoint(target_checkpoint_height),
                "we must decrease or eliminate our target on failure"
            );

            return;
        }
    }

    // The lowest block in the range must link to the previous checkpoint.
    assert_eq!(
        expected_hash, previous_checkpoint_hash,
        "the previous checkpoint should match: bad checkpoint list, zebra bug, or bad chain"
    );

    let block_count = rev_valid_blocks.len();
    tracing::info!(?block_count, ?current_range, "verified checkpoint range");
    metrics::counter!("checkpoint.verified.block.count").increment(block_count as u64);

    // Report success for every block, lowest height first. A send error only
    // means the requester is no longer listening.
    for qblock in rev_valid_blocks.drain(..).rev() {
        let _ = qblock.tx.send(Ok(qblock.block.hash));
    }

    self.update_progress(target_checkpoint_height);

    // Consistency checks on the new progress and the new target.
    let new_progress = self.previous_checkpoint_height();
    assert!(
        new_progress > old_prev_check_height,
        "we must make progress on success"
    );
    if new_progress == FinalCheckpoint {
        assert_eq!(
            target_checkpoint_height,
            self.checkpoint_list.max_height(),
            "we finish at the maximum checkpoint"
        );
    } else {
        assert_eq!(
            new_progress,
            PreviousCheckpoint(target_checkpoint_height),
            "the new previous checkpoint must match the old target"
        );
    }
    let new_target = self.target_checkpoint_height();
    assert!(
        new_target == WaitingForBlocks || new_target == FinishedVerifying,
        "processing must cover all available checkpoints"
    );
}
934}
935
936impl<S> Drop for CheckpointVerifier<S>
938where
939 S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
940 S::Future: Send + 'static,
941{
942 fn drop(&mut self) {
947 self.finish_diagnostics();
948
949 let drop_keys: Vec<_> = self.queued.keys().cloned().collect();
950 for key in drop_keys {
951 let mut qblocks = self
952 .queued
953 .remove(&key)
954 .expect("each entry is only removed once");
955 for qblock in qblocks.drain(..) {
956 let _ = qblock.tx.send(Err(VerifyCheckpointError::Dropped));
958 }
959 }
960 }
961}
962
/// The errors returned by the checkpoint verifier.
#[derive(Debug, Error)]
#[allow(missing_docs)]
pub enum VerifyCheckpointError {
    // The final checkpoint has already been verified.
    #[error("checkpoint request after the final checkpoint has been verified")]
    Finished,
    // The block height is above the highest checkpoint.
    #[error("block at {height:?} is higher than the maximum checkpoint {max_height:?}")]
    TooHigh {
        height: block::Height,
        max_height: block::Height,
    },
    // The block height is at or below the verified height.
    #[error("block {height:?} is less than or equal to the verified tip {verified_height:?}")]
    AlreadyVerified {
        height: block::Height,
        verified_height: block::Height,
    },
    // Sent to an older queued request when a newer request for the same
    // block hash replaces it.
    #[error("rejected older of duplicate verification requests for block at {height:?} {hash:?}")]
    NewerRequest {
        height: block::Height,
        hash: block::Hash,
    },
    // The block has no coinbase height.
    #[error("the block {hash:?} does not have a coinbase height")]
    CoinbaseHeight { hash: block::Hash },
    #[error("merkle root {actual:?} does not match expected {expected:?}")]
    BadMerkleRoot {
        actual: block::merkle::Root,
        expected: block::merkle::Root,
    },
    #[error("duplicate transactions in block")]
    DuplicateTransaction,
    // Sent to every block still queued when the verifier is dropped.
    #[error("checkpoint verifier was dropped")]
    Dropped,
    // Waiting for the verification result, or committing the block to the
    // state service, failed.
    #[error(transparent)]
    CommitCheckpointVerified(BoxError),
    // Querying the state tip failed.
    #[error(transparent)]
    Tip(BoxError),
    // The supplied checkpoint list was rejected.
    #[error(transparent)]
    CheckpointList(BoxError),
    // An error converted from a block verification error.
    #[error(transparent)]
    VerifyBlock(VerifyBlockError),
    #[error("invalid block subsidy: {0}")]
    SubsidyError(#[from] SubsidyError),
    #[error("invalid amount: {0}")]
    AmountError(#[from] amount::Error),
    // The queue already holds the maximum number of blocks at this height.
    #[error("too many queued blocks at this height")]
    QueuedLimit,
    // A queued block's hash differs from the hash expected at its height.
    #[error("the block hash does not match the chained checkpoint hash, expected {expected:?} found {found:?}")]
    UnexpectedSideChain {
        expected: block::Hash,
        found: block::Hash,
    },
    // Returned instead of the commit result while shutting down.
    #[error("zebra is shutting down")]
    ShuttingDown,
}
1016
1017impl From<VerifyBlockError> for VerifyCheckpointError {
1018 fn from(err: VerifyBlockError) -> VerifyCheckpointError {
1019 VerifyCheckpointError::VerifyBlock(err)
1020 }
1021}
1022
1023impl From<BlockError> for VerifyCheckpointError {
1024 fn from(err: BlockError) -> VerifyCheckpointError {
1025 VerifyCheckpointError::VerifyBlock(err.into())
1026 }
1027}
1028
1029impl From<equihash::Error> for VerifyCheckpointError {
1030 fn from(err: equihash::Error) -> VerifyCheckpointError {
1031 VerifyCheckpointError::VerifyBlock(err.into())
1032 }
1033}
1034
1035impl VerifyCheckpointError {
1036 pub fn is_duplicate_request(&self) -> bool {
1039 match self {
1040 VerifyCheckpointError::AlreadyVerified { .. } => true,
1041 VerifyCheckpointError::NewerRequest { .. } => true,
1043 VerifyCheckpointError::VerifyBlock(block_error) => block_error.is_duplicate_request(),
1044 _ => false,
1045 }
1046 }
1047
1048 pub fn misbehavior_score(&self) -> u32 {
1050 match self {
1052 VerifyCheckpointError::VerifyBlock(verify_block_error) => {
1053 verify_block_error.misbehavior_score()
1054 }
1055 VerifyCheckpointError::SubsidyError(_)
1056 | VerifyCheckpointError::CoinbaseHeight { .. }
1057 | VerifyCheckpointError::DuplicateTransaction
1058 | VerifyCheckpointError::AmountError(_) => 100,
1059 _other => 0,
1060 }
1061 }
1062}
1063
/// The verifier as a `tower` service: each request is a block, and each
/// response is the hash of that block once it has been verified and
/// committed to the state service.
impl<S> Service<Arc<Block>> for CheckpointVerifier<S>
where
    S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
    S::Future: Send + 'static,
{
    type Response = block::Hash;
    type Error = VerifyCheckpointError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    /// Always ready: incoming blocks are queued rather than applying backpressure.
    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Queues `block`, verifies any checkpoint range that is now complete,
    /// and returns a future for the block's result.
    ///
    /// The commit to the state service runs in a spawned task. If the result
    /// is an error, the returned future asks the state service for its tip
    /// and sends it over the reset channel, so that a later `call` resets the
    /// verifier's progress to that tip.
    #[instrument(name = "checkpoint", skip(self, block))]
    fn call(&mut self, block: Arc<Block>) -> Self::Future {
        // Apply a pending reset request, if there is one.
        if let Ok(tip) = self.reset_receiver.try_recv() {
            self.reset_progress(tip);
        }

        // Reject every block once the final checkpoint has been verified.
        if let FinalCheckpoint = self.previous_checkpoint_height() {
            return async { Err(VerifyCheckpointError::Finished) }.boxed();
        }

        let req_block = match self.queue_block(block) {
            Ok(req_block) => req_block,
            Err(e) => return async { Err(e) }.boxed(),
        };

        self.process_checkpoint_range();

        metrics::gauge!("checkpoint.queued_slots").set(self.queued.len() as f64);

        // Wait for the verification result and commit the block inside a
        // spawned task, rather than inside the returned future.
        let state_service = self.state_service.clone();
        let commit_checkpoint_verified = tokio::spawn(async move {
            // Panics if the sender was dropped without sending a result;
            // a rejection sent by the verifier is returned as an error.
            let hash = req_block
                .rx
                .await
                .map_err(Into::into)
                .map_err(VerifyCheckpointError::CommitCheckpointVerified)
                .expect("CheckpointVerifier does not leave dangling receivers")?;

            match state_service
                .oneshot(zs::Request::CommitCheckpointVerifiedBlock(req_block.block))
                .map_err(VerifyCheckpointError::CommitCheckpointVerified)
                .await?
            {
                zs::Response::Committed(committed_hash) => {
                    assert_eq!(committed_hash, hash, "state must commit correct hash");
                    Ok(hash)
                }
                _ => unreachable!("wrong response for CommitCheckpointVerifiedBlock"),
            }
        });

        let state_service = self.state_service.clone();
        let reset_sender = self.reset_sender.clone();
        async move {
            let result = commit_checkpoint_verified.await;
            // While shutting down, report `ShuttingDown` instead of
            // unwrapping the task's join result, which would panic on a
            // join error.
            let result = if zebra_chain::shutdown::is_shutting_down() {
                Err(VerifyCheckpointError::ShuttingDown)
            } else {
                result.expect("commit_checkpoint_verified should not panic")
            };
            if result.is_err() {
                // Fetch the state tip and ask the verifier to reset to it.
                let tip = match state_service
                    .oneshot(zs::Request::Tip)
                    .await
                    .map_err(VerifyCheckpointError::Tip)?
                {
                    zs::Response::Tip(tip) => tip,
                    _ => unreachable!("wrong response for Tip"),
                };
                // A failed send is deliberately ignored.
                let _ = reset_sender.send(tip);
            }
            result
        }
        .boxed()
    }
}