1use std::{
6 collections::{BTreeSet, HashMap},
7 mem,
8 sync::Arc,
9};
10
11use indexmap::IndexMap;
12use zebra_chain::{
13 block::{self, Block, Hash, Height},
14 parameters::Network,
15 sprout::{self},
16 transparent,
17};
18
19use crate::{
20 constants::{MAX_INVALIDATED_BLOCKS, MAX_NON_FINALIZED_CHAIN_FORKS},
21 error::ReconsiderError,
22 request::{ContextuallyVerifiedBlock, FinalizableBlock},
23 service::{check, finalized_state::ZebraDb},
24 BoxError, SemanticallyVerifiedBlock, ValidateContextError,
25};
26
27mod chain;
28
29#[cfg(test)]
30mod tests;
31
32pub(crate) use chain::{Chain, SpendingTransactionId};
33
/// The in-memory state for blocks that have not been finalized yet.
///
/// Holds the set of candidate chains, the blocks that were explicitly
/// invalidated, and the bookkeeping used for metrics and progress bars.
pub struct NonFinalizedState {
    /// The candidate chains, ordered by `Chain`'s `Ord` implementation.
    ///
    /// The last (greatest) element is treated as the best chain, and the
    /// first (smallest) element is the one evicted when there are more than
    /// `MAX_NON_FINALIZED_CHAIN_FORKS` chains.
    chain_set: BTreeSet<Arc<Chain>>,

    /// Blocks removed by `invalidate_block`, keyed by the height of the first
    /// removed block.
    ///
    /// Entries are kept in insertion order, and the oldest entries are evicted
    /// once there are more than `MAX_INVALIDATED_BLOCKS` of them.
    invalidated_blocks: IndexMap<Height, Arc<Vec<ContextuallyVerifiedBlock>>>,

    /// The network this state was created for.
    pub network: Network,

    /// Whether this instance updates metrics and progress bars.
    ///
    /// Set to `true` by `new`, and to `false` by `disable_metrics`.
    should_count_metrics: bool,

    /// The progress bar that shows the number of chains in `chain_set`.
    ///
    /// Created lazily by `update_metrics_bars`, and never copied by `clone`.
    #[cfg(feature = "progress-bar")]
    chain_count_bar: Option<howudoin::Tx>,

    /// One progress bar per chain, in best-chain-first order.
    ///
    /// Resized by `update_metrics_bars` to match the number of chains.
    #[cfg(feature = "progress-bar")]
    chain_fork_length_bars: Vec<howudoin::Tx>,
}
81
82impl std::fmt::Debug for NonFinalizedState {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 let mut f = f.debug_struct("NonFinalizedState");
85
86 f.field("chain_set", &self.chain_set)
87 .field("network", &self.network);
88
89 f.field("should_count_metrics", &self.should_count_metrics);
90
91 f.finish()
92 }
93}
94
95impl Clone for NonFinalizedState {
96 fn clone(&self) -> Self {
97 Self {
98 chain_set: self.chain_set.clone(),
99 network: self.network.clone(),
100 invalidated_blocks: self.invalidated_blocks.clone(),
101 should_count_metrics: self.should_count_metrics,
102 #[cfg(feature = "progress-bar")]
104 chain_count_bar: None,
105 #[cfg(feature = "progress-bar")]
106 chain_fork_length_bars: Vec::new(),
107 }
108 }
109}
110
111impl NonFinalizedState {
112 pub fn new(network: &Network) -> NonFinalizedState {
114 NonFinalizedState {
115 chain_set: Default::default(),
116 network: network.clone(),
117 invalidated_blocks: Default::default(),
118 should_count_metrics: true,
119 #[cfg(feature = "progress-bar")]
120 chain_count_bar: None,
121 #[cfg(feature = "progress-bar")]
122 chain_fork_length_bars: Vec::new(),
123 }
124 }
125
/// Returns `true` if the chains and the network of `self` and `other` match.
///
/// Chains are compared pairwise, in `chain_set` order, using
/// `Chain::eq_internal_state`. Invalidated blocks and metrics state are not
/// compared.
#[cfg(any(test, feature = "proptest-impl"))]
#[allow(dead_code)]
pub fn eq_internal_state(&self, other: &NonFinalizedState) -> bool {
    // `zip` stops at the shorter set, so different lengths must be rejected first.
    if self.chain_set.len() != other.chain_set.len() {
        return false;
    }

    let chains_match = self
        .chain_set
        .iter()
        .zip(&other.chain_set)
        .all(|(ours, theirs)| ours.eq_internal_state(theirs));

    chains_match && self.network == other.network
}
150
151 pub fn chain_iter(&self) -> impl Iterator<Item = &Arc<Chain>> {
155 self.chain_set.iter().rev()
156 }
157
158 fn insert_with<F>(&mut self, chain: Arc<Chain>, chain_filter: F)
161 where
162 F: FnOnce(&mut BTreeSet<Arc<Chain>>),
163 {
164 self.chain_set.insert(chain);
165
166 chain_filter(&mut self.chain_set);
167
168 while self.chain_set.len() > MAX_NON_FINALIZED_CHAIN_FORKS {
169 self.chain_set.pop_first();
171 }
172
173 self.update_metrics_bars();
174 }
175
176 fn insert(&mut self, chain: Arc<Chain>) {
178 self.insert_with(chain, |_ignored_chain| { })
179 }
180
181 pub fn finalize(&mut self) -> FinalizableBlock {
184 #[allow(clippy::mutable_key_type)]
188 let chains = mem::take(&mut self.chain_set);
189 let mut chains = chains.into_iter();
190
191 let mut best_chain = chains.next_back().expect("there's at least one chain");
193
194 let mut_best_chain = Arc::make_mut(&mut best_chain);
196
197 let side_chains = chains;
199
200 let (best_chain_root, root_treestate) = mut_best_chain.pop_root();
203
204 if !best_chain.is_empty() {
206 self.insert(best_chain);
207 }
208
209 for mut side_chain in side_chains.rev() {
211 if side_chain.non_finalized_root_hash() != best_chain_root.hash {
212 drop(side_chain);
215
216 continue;
217 }
218
219 let mut_side_chain = Arc::make_mut(&mut side_chain);
223
224 let (side_chain_root, _treestate) = mut_side_chain.pop_root();
226 assert_eq!(side_chain_root.hash, best_chain_root.hash);
227
228 self.insert(side_chain);
230 }
231
232 self.invalidated_blocks
234 .retain(|height, _blocks| *height >= best_chain_root.height);
235
236 self.update_metrics_for_chains();
237
238 FinalizableBlock::new(best_chain_root, root_treestate)
240 }
241
242 #[tracing::instrument(level = "debug", skip(self, finalized_state, prepared))]
246 pub fn commit_block(
247 &mut self,
248 prepared: SemanticallyVerifiedBlock,
249 finalized_state: &ZebraDb,
250 ) -> Result<(), ValidateContextError> {
251 let parent_hash = prepared.block.header.previous_block_hash;
252 let (height, hash) = (prepared.height, prepared.hash);
253
254 let parent_chain = self.parent_chain(parent_hash)?;
255
256 let modified_chain = self.validate_and_commit(parent_chain, prepared, finalized_state)?;
259
260 self.insert_with(modified_chain, |chain_set| {
265 chain_set.retain(|chain| chain.non_finalized_tip_hash() != parent_hash)
266 });
267
268 self.update_metrics_for_committed_block(height, hash);
269
270 Ok(())
271 }
272
273 #[allow(clippy::unwrap_in_result)]
276 pub fn invalidate_block(&mut self, block_hash: Hash) -> Result<block::Hash, BoxError> {
277 let Some(chain) = self.find_chain(|chain| chain.contains_block_hash(block_hash)) else {
278 return Err("block hash not found in any non-finalized chain".into());
279 };
280
281 let invalidated_blocks = if chain.non_finalized_root_hash() == block_hash {
282 self.chain_set.remove(&chain);
283 chain.blocks.values().cloned().collect()
284 } else {
285 let (new_chain, invalidated_blocks) = chain
286 .invalidate_block(block_hash)
287 .expect("already checked that chain contains hash");
288
289 self.insert_with(Arc::new(new_chain.clone()), |chain_set| {
292 chain_set.retain(|c| !c.contains_block_hash(block_hash))
293 });
294
295 invalidated_blocks
296 };
297
298 self.invalidated_blocks.insert(
300 invalidated_blocks
301 .first()
302 .expect("should not be empty")
303 .clone()
304 .height,
305 Arc::new(invalidated_blocks),
306 );
307
308 while self.invalidated_blocks.len() > MAX_INVALIDATED_BLOCKS {
309 self.invalidated_blocks.shift_remove_index(0);
310 }
311
312 self.update_metrics_for_chains();
313 self.update_metrics_bars();
314
315 Ok(block_hash)
316 }
317
318 pub fn reconsider_block(
322 &mut self,
323 block_hash: block::Hash,
324 finalized_state: &ZebraDb,
325 ) -> Result<Vec<block::Hash>, ReconsiderError> {
326 let height = self
328 .invalidated_blocks
329 .iter()
330 .find_map(|(height, blocks)| {
331 if blocks.first()?.hash == block_hash {
332 Some(height)
333 } else {
334 None
335 }
336 })
337 .ok_or(ReconsiderError::MissingInvalidatedBlock(block_hash))?;
338
339 let invalidated_blocks = Arc::unwrap_or_clone(
340 self.invalidated_blocks
341 .clone()
342 .shift_remove(height)
343 .ok_or(ReconsiderError::MissingInvalidatedBlock(block_hash))?,
344 );
345
346 let invalidated_block_hashes = invalidated_blocks
347 .iter()
348 .map(|block| block.hash)
349 .collect::<Vec<_>>();
350
351 let invalidated_root = invalidated_blocks
354 .first()
355 .ok_or(ReconsiderError::InvalidatedBlocksEmpty)?;
356
357 let root_parent_hash = invalidated_root.block.header.previous_block_hash;
358
359 let chain_result = if root_parent_hash == finalized_state.finalized_tip_hash() {
362 let chain = Chain::new(
363 &self.network,
364 finalized_state
365 .finalized_tip_height()
366 .ok_or(ReconsiderError::ParentChainNotFound(block_hash))?,
367 finalized_state.sprout_tree_for_tip(),
368 finalized_state.sapling_tree_for_tip(),
369 finalized_state.orchard_tree_for_tip(),
370 finalized_state.history_tree(),
371 finalized_state.finalized_value_pool(),
372 );
373 Arc::new(chain)
374 } else {
375 self.parent_chain(root_parent_hash)
378 .map_err(|_| ReconsiderError::ParentChainNotFound(block_hash))?
379 };
380
381 let mut modified_chain = Arc::unwrap_or_clone(chain_result);
382 for block in invalidated_blocks {
383 modified_chain = modified_chain.push(block)?;
384 }
385
386 let (height, hash) = modified_chain.non_finalized_tip();
387
388 if let Some(best_chain_root_height) = finalized_state.finalized_tip_height() {
391 self.invalidated_blocks
392 .retain(|height, _blocks| *height >= best_chain_root_height);
393 }
394
395 self.insert_with(Arc::new(modified_chain), |chain_set| {
396 chain_set.retain(|chain| chain.non_finalized_tip_hash() != root_parent_hash)
397 });
398
399 self.update_metrics_for_committed_block(height, hash);
400
401 Ok(invalidated_block_hashes)
402 }
403
404 #[tracing::instrument(level = "debug", skip(self, finalized_state, prepared))]
407 #[allow(clippy::unwrap_in_result)]
408 pub fn commit_new_chain(
409 &mut self,
410 prepared: SemanticallyVerifiedBlock,
411 finalized_state: &ZebraDb,
412 ) -> Result<(), ValidateContextError> {
413 let finalized_tip_height = finalized_state.finalized_tip_height();
414
415 #[cfg(not(test))]
417 let finalized_tip_height = finalized_tip_height.expect("finalized state contains blocks");
418 #[cfg(test)]
419 let finalized_tip_height = finalized_tip_height.unwrap_or(zebra_chain::block::Height(0));
420
421 let chain = Chain::new(
422 &self.network,
423 finalized_tip_height,
424 finalized_state.sprout_tree_for_tip(),
425 finalized_state.sapling_tree_for_tip(),
426 finalized_state.orchard_tree_for_tip(),
427 finalized_state.history_tree(),
428 finalized_state.finalized_value_pool(),
429 );
430
431 let (height, hash) = (prepared.height, prepared.hash);
432
433 let chain = self.validate_and_commit(Arc::new(chain), prepared, finalized_state)?;
435
436 self.insert(chain);
438 self.update_metrics_for_committed_block(height, hash);
439
440 Ok(())
441 }
442
443 #[tracing::instrument(level = "debug", skip(self, finalized_state, new_chain))]
449 fn validate_and_commit(
450 &self,
451 new_chain: Arc<Chain>,
452 prepared: SemanticallyVerifiedBlock,
453 finalized_state: &ZebraDb,
454 ) -> Result<Arc<Chain>, ValidateContextError> {
455 if self.invalidated_blocks.contains_key(&prepared.height) {
456 return Err(ValidateContextError::BlockPreviouslyInvalidated {
457 block_hash: prepared.hash,
458 });
459 }
460
461 let spent_utxos = check::utxo::transparent_spend(
465 &prepared,
466 &new_chain.unspent_utxos(),
467 &new_chain.spent_utxos,
468 finalized_state,
469 )?;
470
471 check::anchors::block_sapling_orchard_anchors_refer_to_final_treestates(
473 finalized_state,
474 &new_chain,
475 &prepared,
476 )?;
477
478 let sprout_final_treestates = check::anchors::block_fetch_sprout_final_treestates(
480 finalized_state,
481 &new_chain,
482 &prepared,
483 );
484
485 let contextual = ContextuallyVerifiedBlock::with_block_and_spent_utxos(
487 prepared.clone(),
488 spent_utxos.clone(),
489 )
490 .map_err(|value_balance_error| {
491 ValidateContextError::CalculateBlockChainValueChange {
492 value_balance_error,
493 height: prepared.height,
494 block_hash: prepared.hash,
495 transaction_count: prepared.block.transactions.len(),
496 spent_utxo_count: spent_utxos.len(),
497 }
498 })?;
499
500 Self::validate_and_update_parallel(new_chain, contextual, sprout_final_treestates)
501 }
502
503 #[allow(clippy::unwrap_in_result)]
505 #[tracing::instrument(skip(new_chain, sprout_final_treestates))]
506 fn validate_and_update_parallel(
507 new_chain: Arc<Chain>,
508 contextual: ContextuallyVerifiedBlock,
509 sprout_final_treestates: HashMap<sprout::tree::Root, Arc<sprout::tree::NoteCommitmentTree>>,
510 ) -> Result<Arc<Chain>, ValidateContextError> {
511 let mut block_commitment_result = None;
512 let mut sprout_anchor_result = None;
513 let mut chain_push_result = None;
514
515 let block = contextual.block.clone();
517 let network = new_chain.network();
518 let history_tree = new_chain.history_block_commitment_tree();
519
520 let block2 = contextual.block.clone();
521 let height = contextual.height;
522 let transaction_hashes = contextual.transaction_hashes.clone();
523
524 rayon::in_place_scope_fifo(|scope| {
525 scope.spawn_fifo(|_scope| {
526 block_commitment_result = Some(check::block_commitment_is_valid_for_chain_history(
527 block,
528 &network,
529 &history_tree,
530 ));
531 });
532
533 scope.spawn_fifo(|_scope| {
534 sprout_anchor_result =
535 Some(check::anchors::block_sprout_anchors_refer_to_treestates(
536 sprout_final_treestates,
537 block2,
538 transaction_hashes,
539 height,
540 ));
541 });
542
543 scope.spawn_fifo(|_scope| {
549 let new_chain = Arc::try_unwrap(new_chain)
552 .unwrap_or_else(|shared_chain| (*shared_chain).clone());
553 chain_push_result = Some(new_chain.push(contextual).map(Arc::new));
554 });
555 });
556
557 block_commitment_result.expect("scope has finished")?;
559 sprout_anchor_result.expect("scope has finished")?;
560
561 chain_push_result.expect("scope has finished")
562 }
563
564 pub fn best_chain_len(&self) -> Option<u32> {
567 Some(self.best_chain()?.blocks.len() as u32)
570 }
571
572 #[allow(dead_code)]
575 pub fn any_chain_contains(&self, hash: &block::Hash) -> bool {
576 self.chain_set
577 .iter()
578 .rev()
579 .any(|chain| chain.height_by_hash.contains_key(hash))
580 }
581
582 pub fn find_chain<P>(&self, mut predicate: P) -> Option<Arc<Chain>>
587 where
588 P: FnMut(&Chain) -> bool,
589 {
590 self.chain_set
592 .iter()
593 .rev()
594 .find(|chain| predicate(chain))
595 .cloned()
596 }
597
598 pub fn any_utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::Utxo> {
603 self.chain_set
604 .iter()
605 .rev()
606 .find_map(|chain| chain.created_utxo(outpoint))
607 }
608
609 #[allow(dead_code)]
611 pub fn any_block_by_hash(&self, hash: block::Hash) -> Option<Arc<Block>> {
612 for chain in self.chain_set.iter().rev() {
614 if let Some(prepared) = chain
615 .height_by_hash
616 .get(&hash)
617 .and_then(|height| chain.blocks.get(height))
618 {
619 return Some(prepared.block.clone());
620 }
621 }
622
623 None
624 }
625
626 #[allow(dead_code)]
628 pub fn any_prev_block_hash_for_hash(&self, hash: block::Hash) -> Option<block::Hash> {
629 self.any_block_by_hash(hash)
631 .map(|block| block.header.previous_block_hash)
632 }
633
634 #[allow(dead_code)]
636 pub fn best_hash(&self, height: block::Height) -> Option<block::Hash> {
637 self.best_chain()?
638 .blocks
639 .get(&height)
640 .map(|prepared| prepared.hash)
641 }
642
643 #[allow(dead_code)]
645 pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> {
646 let best_chain = self.best_chain()?;
647 let height = best_chain.non_finalized_tip_height();
648 let hash = best_chain.non_finalized_tip_hash();
649
650 Some((height, hash))
651 }
652
653 #[allow(dead_code)]
655 pub fn best_tip_block(&self) -> Option<&ContextuallyVerifiedBlock> {
656 let best_chain = self.best_chain()?;
657
658 best_chain.tip_block()
659 }
660
661 #[allow(dead_code)]
663 pub fn best_height_by_hash(&self, hash: block::Hash) -> Option<block::Height> {
664 let best_chain = self.best_chain()?;
665 let height = *best_chain.height_by_hash.get(&hash)?;
666 Some(height)
667 }
668
669 #[allow(dead_code)]
671 pub fn any_height_by_hash(&self, hash: block::Hash) -> Option<block::Height> {
672 for chain in self.chain_set.iter().rev() {
673 if let Some(height) = chain.height_by_hash.get(&hash) {
674 return Some(*height);
675 }
676 }
677
678 None
679 }
680
/// Returns `true` if the best chain contains `sprout_nullifier`.
///
/// Returns `false` if there are no chains.
#[cfg(any(test, feature = "proptest-impl"))]
#[allow(dead_code)]
pub fn best_contains_sprout_nullifier(&self, sprout_nullifier: &sprout::Nullifier) -> bool {
    self.best_chain()
        .is_some_and(|best_chain| best_chain.sprout_nullifiers.contains_key(sprout_nullifier))
}
689
/// Returns `true` if the best chain contains `sapling_nullifier`.
///
/// Returns `false` if there are no chains.
#[cfg(any(test, feature = "proptest-impl"))]
#[allow(dead_code)]
pub fn best_contains_sapling_nullifier(
    &self,
    sapling_nullifier: &zebra_chain::sapling::Nullifier,
) -> bool {
    self.best_chain()
        .is_some_and(|best_chain| best_chain.sapling_nullifiers.contains_key(sapling_nullifier))
}
705
/// Returns `true` if the best chain contains `orchard_nullifier`.
///
/// Returns `false` if there are no chains.
#[cfg(any(test, feature = "proptest-impl"))]
#[allow(dead_code)]
pub fn best_contains_orchard_nullifier(
    &self,
    orchard_nullifier: &zebra_chain::orchard::Nullifier,
) -> bool {
    self.best_chain()
        .is_some_and(|best_chain| best_chain.orchard_nullifiers.contains_key(orchard_nullifier))
}
721
722 pub fn best_chain(&self) -> Option<&Arc<Chain>> {
724 self.chain_iter().next()
725 }
726
727 pub fn chain_count(&self) -> usize {
729 self.chain_set.len()
730 }
731
732 pub fn invalidated_blocks(&self) -> IndexMap<Height, Arc<Vec<ContextuallyVerifiedBlock>>> {
734 self.invalidated_blocks.clone()
735 }
736
737 fn parent_chain(&self, parent_hash: block::Hash) -> Result<Arc<Chain>, ValidateContextError> {
742 match self.find_chain(|chain| chain.non_finalized_tip_hash() == parent_hash) {
743 Some(chain) => Ok(chain.clone()),
745 None => {
747 let fork_chain = self
750 .chain_set
751 .iter()
752 .rev()
753 .find_map(|chain| chain.fork(parent_hash))
754 .ok_or(ValidateContextError::NotReadyToBeCommitted)?;
755
756 Ok(Arc::new(fork_chain))
757 }
758 }
759 }
760
761 fn should_count_metrics(&self) -> bool {
763 self.should_count_metrics
764 }
765
766 fn update_metrics_for_committed_block(&self, height: block::Height, hash: block::Hash) {
768 if !self.should_count_metrics() {
769 return;
770 }
771
772 metrics::counter!("state.memory.committed.block.count").increment(1);
773 metrics::gauge!("state.memory.committed.block.height").set(height.0 as f64);
774
775 if self
776 .best_chain()
777 .expect("metrics are only updated after initialization")
778 .non_finalized_tip_hash()
779 == hash
780 {
781 metrics::counter!("state.memory.best.committed.block.count").increment(1);
782 metrics::gauge!("state.memory.best.committed.block.height").set(height.0 as f64);
783 }
784
785 self.update_metrics_for_chains();
786 }
787
788 fn update_metrics_for_chains(&self) {
790 if !self.should_count_metrics() {
791 return;
792 }
793
794 metrics::gauge!("state.memory.chain.count").set(self.chain_set.len() as f64);
795 metrics::gauge!("state.memory.best.chain.length",)
796 .set(self.best_chain_len().unwrap_or_default() as f64);
797 }
798
/// Updates the chain count bar and the per-chain fork length bars.
///
/// Does nothing if metrics are disabled, or if the `progress-bar` feature
/// is not enabled. If the progress bars have been cancelled, metrics are
/// disabled for this instance instead.
fn update_metrics_bars(&mut self) {
    if !self.should_count_metrics() {
        // Without the `progress-bar` feature this `return` is the last
        // statement in the function, which is why the lint is allowed.
        #[allow(clippy::needless_return)]
        return;
    }

    #[cfg(feature = "progress-bar")]
    {
        use std::cmp::Ordering::*;

        // Stop updating, and close any existing bars, once the bars are cancelled.
        if matches!(howudoin::cancelled(), Some(true)) {
            self.disable_metrics();
            return;
        }

        // Create the root bar the first time it is needed.
        if self.chain_count_bar.is_none() {
            self.chain_count_bar = Some(howudoin::new_root().label("Chain Forks"));
        }

        let chain_count_bar = self
            .chain_count_bar
            .as_ref()
            .expect("just initialized if missing");
        // The height just below the best chain's root block.
        // NOTE(review): this subtraction assumes the root height is never
        // zero — confirm against callers.
        let finalized_tip_height = self
            .best_chain()
            .map(|chain| chain.non_finalized_root_height().0 - 1);

        chain_count_bar.set_pos(u64::try_from(self.chain_count()).expect("fits in u64"));
        if let Some(finalized_tip_height) = finalized_tip_height {
            chain_count_bar.desc(format!("Finalized Root {finalized_tip_height}"));
        }

        let prev_length_bars = self.chain_fork_length_bars.len();

        // Keep exactly one fork length bar per chain.
        match self.chain_count().cmp(&prev_length_bars) {
            // Add child bars for the new chains.
            Greater => self
                .chain_fork_length_bars
                .resize_with(self.chain_count(), || {
                    howudoin::new_with_parent(chain_count_bar.id())
                }),
            // Close the bars that no longer have a chain.
            Less => {
                let redundant_bars = self.chain_fork_length_bars.split_off(self.chain_count());
                for bar in redundant_bars {
                    bar.close();
                }
            }
            Equal => {}
        }

        // Bars are paired with chains in best-chain-first order, and every
        // bar is fully rewritten on each update.
        for (chain_length_bar, chain) in
            std::iter::zip(self.chain_fork_length_bars.iter(), self.chain_iter())
        {
            // Use the chain's tip height when it has no last fork height.
            let fork_height = chain
                .last_fork_height
                .unwrap_or_else(|| chain.non_finalized_tip_height())
                .0;

            chain_length_bar
                .label(format!("Fork {fork_height}"))
                .set_pos(u64::try_from(chain.len()).expect("fits in u64"));
            let mut desc = String::new();

            // Only describe the recent fork when the chain reports one.
            if let Some(recent_fork_height) = chain.recent_fork_height() {
                let recent_fork_length = chain
                    .recent_fork_length()
                    .expect("just checked recent fork height");

                let mut plural = "s";
                if recent_fork_length == 1 {
                    plural = "";
                }

                desc.push_str(&format!(
                    " at {recent_fork_height:?} + {recent_fork_length} block{plural}"
                ));
            }

            // An empty description is set when there is no recent fork.
            chain_length_bar.desc(desc);
        }
    }
}
902
903 pub fn disable_metrics(&mut self) {
905 self.should_count_metrics = false;
906
907 #[cfg(feature = "progress-bar")]
908 {
909 let count_bar = self.chain_count_bar.take().into_iter();
910 let fork_bars = self.chain_fork_length_bars.drain(..);
911 count_bar.chain(fork_bars).for_each(howudoin::Tx::close);
912 }
913 }
914}
915
916impl Drop for NonFinalizedState {
917 fn drop(&mut self) {
918 self.disable_metrics();
919 }
920}